common_meta/key/
table_route.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::sync::Arc;
18
19use serde::{Deserialize, Serialize};
20use snafu::{ensure, OptionExt, ResultExt};
21use store_api::storage::{RegionId, RegionNumber};
22use table::metadata::TableId;
23
24use crate::error::{
25    InvalidMetadataSnafu, MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu,
26    UnexpectedLogicalRouteTableSnafu,
27};
28use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
29use crate::key::txn_helper::TxnOpGetResponseSet;
30use crate::key::{
31    DeserializedValueWithBytes, MetadataKey, MetadataValue, RegionDistribution,
32    TABLE_ROUTE_KEY_PATTERN, TABLE_ROUTE_PREFIX,
33};
34use crate::kv_backend::txn::Txn;
35use crate::kv_backend::KvBackendRef;
36use crate::rpc::router::{region_distribution, RegionRoute};
37use crate::rpc::store::BatchGetRequest;
38
/// The key under which the route of a table is stored.
///
/// The layout: `__table_route/{table_id}`.
#[derive(Debug, PartialEq)]
pub struct TableRouteKey {
    /// Id of the table whose route this key addresses.
    pub table_id: TableId,
}
46
47impl TableRouteKey {
48    pub fn new(table_id: TableId) -> Self {
49        Self { table_id }
50    }
51
52    /// Returns the range prefix of the table route key.
53    pub fn range_prefix() -> Vec<u8> {
54        format!("{}/", TABLE_ROUTE_PREFIX).into_bytes()
55    }
56}
57
/// The route of a table, either physical or logical.
///
/// Serialized by serde as an internally tagged enum: the variant name is written in
/// snake_case under the `type` field.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TableRouteValue {
    /// Route of a physical table; carries the region routes themselves.
    Physical(PhysicalTableRouteValue),
    /// Route of a logical table; refers to a physical table by id.
    Logical(LogicalTableRouteValue),
}
64
/// The route of a physical table.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)]
pub struct PhysicalTableRouteValue {
    /// Routes of the regions of the table.
    pub region_routes: Vec<RegionRoute>,
    /// Version counter: set to 0 by [`PhysicalTableRouteValue::new`] and incremented by one
    /// on every [`TableRouteValue::update`].
    version: u64,
}
70
/// The route of a logical table.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct LogicalTableRouteValue {
    /// Id of the physical table this logical table refers to.
    physical_table_id: TableId,
    /// Region ids recorded for this route. [`TableRouteValue::new`] builds them from the
    /// logical table id and the region numbers of the given region routes.
    region_ids: Vec<RegionId>,
}
76
77impl TableRouteValue {
78    /// Returns a [TableRouteValue::Physical] if `table_id` equals `physical_table_id`.
79    /// Otherwise returns a [TableRouteValue::Logical].
80    pub(crate) fn new(
81        table_id: TableId,
82        physical_table_id: TableId,
83        region_routes: Vec<RegionRoute>,
84    ) -> Self {
85        if table_id == physical_table_id {
86            TableRouteValue::physical(region_routes)
87        } else {
88            let region_routes = region_routes
89                .into_iter()
90                .map(|region| {
91                    debug_assert_eq!(region.region.id.table_id(), physical_table_id);
92                    RegionId::new(table_id, region.region.id.region_number())
93                })
94                .collect();
95            TableRouteValue::logical(physical_table_id, region_routes)
96        }
97    }
98
99    pub fn physical(region_routes: Vec<RegionRoute>) -> Self {
100        Self::Physical(PhysicalTableRouteValue::new(region_routes))
101    }
102
103    pub fn logical(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
104        Self::Logical(LogicalTableRouteValue::new(physical_table_id, region_ids))
105    }
106
107    /// Returns a new version [TableRouteValue] with `region_routes`.
108    pub fn update(&self, region_routes: Vec<RegionRoute>) -> Result<Self> {
109        ensure!(
110            self.is_physical(),
111            UnexpectedLogicalRouteTableSnafu {
112                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
113            }
114        );
115        let version = self.as_physical_table_route_ref().version;
116        Ok(Self::Physical(PhysicalTableRouteValue {
117            region_routes,
118            version: version + 1,
119        }))
120    }
121
122    /// Returns the version.
123    ///
124    /// For test purpose.
125    #[cfg(any(test, feature = "testing"))]
126    pub fn version(&self) -> Result<u64> {
127        ensure!(
128            self.is_physical(),
129            UnexpectedLogicalRouteTableSnafu {
130                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
131            }
132        );
133        Ok(self.as_physical_table_route_ref().version)
134    }
135
136    /// Returns the corresponding [RegionRoute], returns `None` if it's the specific region is not found.
137    ///
138    /// Note: It throws an error if it's a logical table
139    pub fn region_route(&self, region_id: RegionId) -> Result<Option<RegionRoute>> {
140        ensure!(
141            self.is_physical(),
142            UnexpectedLogicalRouteTableSnafu {
143                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
144            }
145        );
146        Ok(self
147            .as_physical_table_route_ref()
148            .region_routes
149            .iter()
150            .find(|route| route.region.id == region_id)
151            .cloned())
152    }
153
154    /// Returns true if it's [TableRouteValue::Physical].
155    pub fn is_physical(&self) -> bool {
156        matches!(self, TableRouteValue::Physical(_))
157    }
158
159    /// Gets the [RegionRoute]s of this [TableRouteValue::Physical].
160    pub fn region_routes(&self) -> Result<&Vec<RegionRoute>> {
161        ensure!(
162            self.is_physical(),
163            UnexpectedLogicalRouteTableSnafu {
164                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
165            }
166        );
167        Ok(&self.as_physical_table_route_ref().region_routes)
168    }
169
170    /// Returns the reference of [`PhysicalTableRouteValue`].
171    ///
172    /// # Panic
173    /// If it is not the [`PhysicalTableRouteValue`].
174    fn as_physical_table_route_ref(&self) -> &PhysicalTableRouteValue {
175        match self {
176            TableRouteValue::Physical(x) => x,
177            _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
178        }
179    }
180
181    /// Converts to [`PhysicalTableRouteValue`].
182    ///
183    /// # Panic
184    /// If it is not the [`PhysicalTableRouteValue`].
185    pub fn into_physical_table_route(self) -> PhysicalTableRouteValue {
186        match self {
187            TableRouteValue::Physical(x) => x,
188            _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
189        }
190    }
191
192    pub fn region_numbers(&self) -> Vec<RegionNumber> {
193        match self {
194            TableRouteValue::Physical(x) => x
195                .region_routes
196                .iter()
197                .map(|region_route| region_route.region.id.region_number())
198                .collect(),
199            TableRouteValue::Logical(x) => x
200                .region_ids()
201                .iter()
202                .map(|region_id| region_id.region_number())
203                .collect(),
204        }
205    }
206}
207
208impl MetadataValue for TableRouteValue {
209    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
210        let r = serde_json::from_slice::<TableRouteValue>(raw_value);
211        match r {
212            // Compatible with old TableRouteValue.
213            Err(e) if e.is_data() => Ok(Self::Physical(
214                serde_json::from_slice::<PhysicalTableRouteValue>(raw_value)
215                    .context(SerdeJsonSnafu)?,
216            )),
217            Ok(x) => Ok(x),
218            Err(e) => Err(e).context(SerdeJsonSnafu),
219        }
220    }
221
222    fn try_as_raw_value(&self) -> Result<Vec<u8>> {
223        serde_json::to_vec(self).context(SerdeJsonSnafu)
224    }
225}
226
227impl PhysicalTableRouteValue {
228    pub fn new(region_routes: Vec<RegionRoute>) -> Self {
229        Self {
230            region_routes,
231            version: 0,
232        }
233    }
234}
235
236impl LogicalTableRouteValue {
237    pub fn new(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
238        Self {
239            physical_table_id,
240            region_ids,
241        }
242    }
243
244    pub fn physical_table_id(&self) -> TableId {
245        self.physical_table_id
246    }
247
248    pub fn region_ids(&self) -> &Vec<RegionId> {
249        &self.region_ids
250    }
251}
252
253impl MetadataKey<'_, TableRouteKey> for TableRouteKey {
254    fn to_bytes(&self) -> Vec<u8> {
255        self.to_string().into_bytes()
256    }
257
258    fn from_bytes(bytes: &[u8]) -> Result<TableRouteKey> {
259        let key = std::str::from_utf8(bytes).map_err(|e| {
260            InvalidMetadataSnafu {
261                err_msg: format!(
262                    "TableRouteKey '{}' is not a valid UTF8 string: {e}",
263                    String::from_utf8_lossy(bytes)
264                ),
265            }
266            .build()
267        })?;
268        let captures = TABLE_ROUTE_KEY_PATTERN
269            .captures(key)
270            .context(InvalidMetadataSnafu {
271                err_msg: format!("Invalid TableRouteKey '{key}'"),
272            })?;
273        // Safety: pass the regex check above
274        let table_id = captures[1].parse::<TableId>().unwrap();
275        Ok(TableRouteKey { table_id })
276    }
277}
278
279impl Display for TableRouteKey {
280    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
281        write!(f, "{}/{}", TABLE_ROUTE_PREFIX, self.table_id)
282    }
283}
284
/// Shared, reference-counted handle to a [`TableRouteManager`].
pub type TableRouteManagerRef = Arc<TableRouteManager>;
286
/// Resolves table routes (including logical-to-physical lookups) on top of a
/// [`TableRouteStorage`].
pub struct TableRouteManager {
    /// The low-level storage all lookups go through.
    storage: TableRouteStorage,
}
290
291impl TableRouteManager {
292    pub fn new(kv_backend: KvBackendRef) -> Self {
293        Self {
294            storage: TableRouteStorage::new(kv_backend),
295        }
296    }
297
298    /// Returns the [TableId] recursively.
299    ///
300    /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
301    /// - the table(`logical_or_physical_table_id`) does not exist.
302    pub async fn get_physical_table_id(
303        &self,
304        logical_or_physical_table_id: TableId,
305    ) -> Result<TableId> {
306        let table_route = self
307            .storage
308            .get_inner(logical_or_physical_table_id)
309            .await?
310            .context(TableRouteNotFoundSnafu {
311                table_id: logical_or_physical_table_id,
312            })?;
313
314        match table_route {
315            TableRouteValue::Physical(_) => Ok(logical_or_physical_table_id),
316            TableRouteValue::Logical(x) => Ok(x.physical_table_id()),
317        }
318    }
319
320    /// Returns the [TableRouteValue::Physical] recursively.
321    ///
322    /// Returns a [TableRouteNotFound](error::Error::TableRouteNotFound) Error if:
323    /// - the physical table(`logical_or_physical_table_id`) does not exist
324    /// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exist.
325    pub async fn get_physical_table_route(
326        &self,
327        logical_or_physical_table_id: TableId,
328    ) -> Result<(TableId, PhysicalTableRouteValue)> {
329        let table_route = self
330            .storage
331            .get(logical_or_physical_table_id)
332            .await?
333            .context(TableRouteNotFoundSnafu {
334                table_id: logical_or_physical_table_id,
335            })?;
336
337        match table_route {
338            TableRouteValue::Physical(x) => Ok((logical_or_physical_table_id, x)),
339            TableRouteValue::Logical(x) => {
340                let physical_table_id = x.physical_table_id();
341                let physical_table_route = self.storage.get(physical_table_id).await?.context(
342                    TableRouteNotFoundSnafu {
343                        table_id: physical_table_id,
344                    },
345                )?;
346                let physical_table_route = physical_table_route.into_physical_table_route();
347                Ok((physical_table_id, physical_table_route))
348            }
349        }
350    }
351
352    /// Returns the [TableRouteValue::Physical] recursively.
353    ///
354    /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
355    /// - one of the logical tables corresponding to the physical table does not exist.
356    ///
357    /// **Notes**: it may return a subset of `logical_or_physical_table_ids`.
358    pub async fn batch_get_physical_table_routes(
359        &self,
360        logical_or_physical_table_ids: &[TableId],
361    ) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
362        let table_routes = self
363            .storage
364            .batch_get(logical_or_physical_table_ids)
365            .await?;
366        // Returns a subset of `logical_or_physical_table_ids`.
367        let table_routes = table_routes
368            .into_iter()
369            .zip(logical_or_physical_table_ids)
370            .filter_map(|(route, id)| route.map(|route| (*id, route)))
371            .collect::<HashMap<_, _>>();
372
373        let mut physical_table_routes = HashMap::with_capacity(table_routes.len());
374        let mut logical_table_ids = HashMap::with_capacity(table_routes.len());
375
376        for (table_id, table_route) in table_routes {
377            match table_route {
378                TableRouteValue::Physical(x) => {
379                    physical_table_routes.insert(table_id, x);
380                }
381                TableRouteValue::Logical(x) => {
382                    logical_table_ids.insert(table_id, x.physical_table_id());
383                }
384            }
385        }
386
387        if logical_table_ids.is_empty() {
388            return Ok(physical_table_routes);
389        }
390
391        // Finds the logical tables corresponding to the physical tables.
392        let physical_table_ids = logical_table_ids
393            .values()
394            .cloned()
395            .collect::<HashSet<_>>()
396            .into_iter()
397            .collect::<Vec<_>>();
398        let table_routes = self
399            .table_route_storage()
400            .batch_get(&physical_table_ids)
401            .await?;
402        let table_routes = table_routes
403            .into_iter()
404            .zip(physical_table_ids)
405            .filter_map(|(route, id)| route.map(|route| (id, route)))
406            .collect::<HashMap<_, _>>();
407
408        for (logical_table_id, physical_table_id) in logical_table_ids {
409            let table_route =
410                table_routes
411                    .get(&physical_table_id)
412                    .context(TableRouteNotFoundSnafu {
413                        table_id: physical_table_id,
414                    })?;
415            match table_route {
416                TableRouteValue::Physical(x) => {
417                    physical_table_routes.insert(logical_table_id, x.clone());
418                }
419                TableRouteValue::Logical(x) => {
420                    // Never get here, because we use a physical table id cannot obtain a logical table.
421                    MetadataCorruptionSnafu {
422                        err_msg: format!(
423                            "logical table {} {:?} cannot be resolved to a physical table.",
424                            logical_table_id, x
425                        ),
426                    }
427                    .fail()?;
428                }
429            }
430        }
431
432        Ok(physical_table_routes)
433    }
434
435    /// Returns [`RegionDistribution`] of the table(`table_id`).
436    pub async fn get_region_distribution(
437        &self,
438        table_id: TableId,
439    ) -> Result<Option<RegionDistribution>> {
440        self.storage
441            .get(table_id)
442            .await?
443            .map(|table_route| Ok(region_distribution(table_route.region_routes()?)))
444            .transpose()
445    }
446
447    /// Returns low-level APIs.
448    pub fn table_route_storage(&self) -> &TableRouteStorage {
449        &self.storage
450    }
451}
452
/// Low-level operations of [TableRouteValue].
pub struct TableRouteStorage {
    /// The key-value backend the table routes and node addresses are read from.
    kv_backend: KvBackendRef,
}
457
/// Result type of the decoders returned by [`TableRouteStorage::build_create_txn`] and
/// [`TableRouteStorage::build_update_txn`]: an optional [`TableRouteValue`] wrapped in
/// [`DeserializedValueWithBytes`].
pub type TableRouteValueDecodeResult = Result<Option<DeserializedValueWithBytes<TableRouteValue>>>;
459
460impl TableRouteStorage {
461    pub fn new(kv_backend: KvBackendRef) -> Self {
462        Self { kv_backend }
463    }
464
465    /// Builds a create table route transaction,
466    /// it expected the `__table_route/{table_id}` wasn't occupied.
467    pub fn build_create_txn(
468        &self,
469        table_id: TableId,
470        table_route_value: &TableRouteValue,
471    ) -> Result<(
472        Txn,
473        impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult,
474    )> {
475        let key = TableRouteKey::new(table_id);
476        let raw_key = key.to_bytes();
477
478        let txn = Txn::put_if_not_exists(raw_key.clone(), table_route_value.try_as_raw_value()?);
479
480        Ok((
481            txn,
482            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
483        ))
484    }
485
486    // TODO(LFC): restore its original visibility after some test utility codes are refined
487    /// Builds a update table route transaction,
488    /// it expected the remote value equals the `current_table_route_value`.
489    /// It retrieves the latest value if the comparing failed.
490    pub fn build_update_txn(
491        &self,
492        table_id: TableId,
493        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
494        new_table_route_value: &TableRouteValue,
495    ) -> Result<(
496        Txn,
497        impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult,
498    )> {
499        let key = TableRouteKey::new(table_id);
500        let raw_key = key.to_bytes();
501        let raw_value = current_table_route_value.get_raw_bytes();
502        let new_raw_value: Vec<u8> = new_table_route_value.try_as_raw_value()?;
503
504        let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
505
506        Ok((
507            txn,
508            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
509        ))
510    }
511
512    /// Returns the [`TableRouteValue`].
513    pub async fn get(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
514        let mut table_route = self.get_inner(table_id).await?;
515        if let Some(table_route) = &mut table_route {
516            self.remap_route_address(table_route).await?;
517        };
518
519        Ok(table_route)
520    }
521
522    async fn get_inner(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
523        let key = TableRouteKey::new(table_id);
524        self.kv_backend
525            .get(&key.to_bytes())
526            .await?
527            .map(|kv| TableRouteValue::try_from_raw_value(&kv.value))
528            .transpose()
529    }
530
531    /// Returns the [`TableRouteValue`] wrapped with [`DeserializedValueWithBytes`].
532    pub async fn get_with_raw_bytes(
533        &self,
534        table_id: TableId,
535    ) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
536        let mut table_route = self.get_with_raw_bytes_inner(table_id).await?;
537        if let Some(table_route) = &mut table_route {
538            self.remap_route_address(table_route).await?;
539        };
540
541        Ok(table_route)
542    }
543
544    async fn get_with_raw_bytes_inner(
545        &self,
546        table_id: TableId,
547    ) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
548        let key = TableRouteKey::new(table_id);
549        self.kv_backend
550            .get(&key.to_bytes())
551            .await?
552            .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
553            .transpose()
554    }
555
556    /// Returns batch of [`TableRouteValue`] that respects the order of `table_ids`.
557    pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
558        let mut table_routes = self.batch_get_inner(table_ids).await?;
559        self.remap_routes_addresses(&mut table_routes).await?;
560
561        Ok(table_routes)
562    }
563
564    async fn batch_get_inner(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
565        let keys = table_ids
566            .iter()
567            .map(|id| TableRouteKey::new(*id).to_bytes())
568            .collect::<Vec<_>>();
569        let resp = self
570            .kv_backend
571            .batch_get(BatchGetRequest { keys: keys.clone() })
572            .await?;
573
574        let kvs = resp
575            .kvs
576            .into_iter()
577            .map(|kv| (kv.key, kv.value))
578            .collect::<HashMap<_, _>>();
579        keys.into_iter()
580            .map(|key| {
581                if let Some(value) = kvs.get(&key) {
582                    Ok(Some(TableRouteValue::try_from_raw_value(value)?))
583                } else {
584                    Ok(None)
585                }
586            })
587            .collect()
588    }
589
590    async fn remap_routes_addresses(
591        &self,
592        table_routes: &mut [Option<TableRouteValue>],
593    ) -> Result<()> {
594        let keys = table_routes
595            .iter()
596            .flat_map(|table_route| {
597                table_route
598                    .as_ref()
599                    .map(extract_address_keys)
600                    .unwrap_or_default()
601            })
602            .collect::<HashSet<_>>()
603            .into_iter()
604            .collect();
605        let node_addrs = self.get_node_addresses(keys).await?;
606        for table_route in table_routes.iter_mut().flatten() {
607            set_addresses(&node_addrs, table_route)?;
608        }
609
610        Ok(())
611    }
612
613    async fn remap_route_address(&self, table_route: &mut TableRouteValue) -> Result<()> {
614        let keys = extract_address_keys(table_route).into_iter().collect();
615        let node_addrs = self.get_node_addresses(keys).await?;
616        set_addresses(&node_addrs, table_route)?;
617
618        Ok(())
619    }
620
621    async fn get_node_addresses(
622        &self,
623        keys: Vec<Vec<u8>>,
624    ) -> Result<HashMap<u64, NodeAddressValue>> {
625        if keys.is_empty() {
626            return Ok(HashMap::default());
627        }
628
629        self.kv_backend
630            .batch_get(BatchGetRequest { keys })
631            .await?
632            .kvs
633            .into_iter()
634            .map(|kv| {
635                let node_id = NodeAddressKey::from_bytes(&kv.key)?.node_id;
636                let node_addr = NodeAddressValue::try_from_raw_value(&kv.value)?;
637                Ok((node_id, node_addr))
638            })
639            .collect()
640    }
641}
642
643fn set_addresses(
644    node_addrs: &HashMap<u64, NodeAddressValue>,
645    table_route: &mut TableRouteValue,
646) -> Result<()> {
647    let TableRouteValue::Physical(physical_table_route) = table_route else {
648        return Ok(());
649    };
650
651    for region_route in &mut physical_table_route.region_routes {
652        if let Some(leader) = &mut region_route.leader_peer {
653            if let Some(node_addr) = node_addrs.get(&leader.id) {
654                leader.addr = node_addr.peer.addr.clone();
655            }
656        }
657        for follower in &mut region_route.follower_peers {
658            if let Some(node_addr) = node_addrs.get(&follower.id) {
659                follower.addr = node_addr.peer.addr.clone();
660            }
661        }
662    }
663
664    Ok(())
665}
666
667fn extract_address_keys(table_route: &TableRouteValue) -> HashSet<Vec<u8>> {
668    let TableRouteValue::Physical(physical_table_route) = table_route else {
669        return HashSet::default();
670    };
671
672    physical_table_route
673        .region_routes
674        .iter()
675        .flat_map(|region_route| {
676            region_route
677                .follower_peers
678                .iter()
679                .map(|peer| NodeAddressKey::with_datanode(peer.id).to_bytes())
680                .chain(
681                    region_route
682                        .leader_peer
683                        .as_ref()
684                        .map(|leader| NodeAddressKey::with_datanode(leader.id).to_bytes()),
685                )
686        })
687        .collect()
688}
689
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::{KvBackend, TxnService};
    use crate::peer::Peer;
    use crate::rpc::router::Region;
    use crate::rpc::store::PutRequest;

    /// A value serialized without the `type` tag must still decode, as a physical route.
    #[test]
    fn test_table_route_compatibility() {
        let old_raw_v = r#"{"region_routes":[{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]},{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]}],"version":0}"#;
        let v = TableRouteValue::try_from_raw_value(old_raw_v.as_bytes()).unwrap();

        let expected_table_route = TableRouteValue::Physical(PhysicalTableRouteValue {
            region_routes: vec![
                RegionRoute {
                    region: Region {
                        id: RegionId::new(0, 1),
                        name: "r1".to_string(),
                        partition: None,
                        attrs: Default::default(),
                    },
                    leader_peer: Some(Peer {
                        id: 2,
                        addr: "a2".to_string(),
                    }),
                    follower_peers: vec![],
                    leader_state: None,
                    leader_down_since: None,
                },
                RegionRoute {
                    region: Region {
                        id: RegionId::new(0, 1),
                        name: "r1".to_string(),
                        partition: None,
                        attrs: Default::default(),
                    },
                    leader_peer: Some(Peer {
                        id: 2,
                        addr: "a2".to_string(),
                    }),
                    follower_peers: vec![],
                    leader_state: None,
                    leader_down_since: None,
                },
            ],
            version: 0,
        });

        assert_eq!(v, expected_table_route);
    }

    /// The key encodes as `__table_route/{table_id}`.
    #[test]
    fn test_key_serialization() {
        let key = TableRouteKey::new(42);
        let raw_key = key.to_bytes();
        assert_eq!(raw_key, b"__table_route/42");
    }

    /// Decoding the encoded form yields the original key.
    #[test]
    fn test_key_deserialization() {
        let expected = TableRouteKey::new(42);
        let key = TableRouteKey::from_bytes(b"__table_route/42").unwrap();
        assert_eq!(key, expected);
    }

    /// Reading a table id that was never written yields `None`.
    #[tokio::test]
    async fn test_table_route_storage_get_with_raw_bytes_empty() {
        let kv = Arc::new(MemoryKvBackend::default());
        let table_route_storage = TableRouteStorage::new(kv);
        let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
        assert!(table_route.is_none());
    }

    /// A route written through a create transaction is read back unchanged.
    #[tokio::test]
    async fn test_table_route_storage_get_with_raw_bytes() {
        let kv = Arc::new(MemoryKvBackend::default());
        let table_route_storage = TableRouteStorage::new(kv.clone());
        let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
        assert!(table_route.is_none());
        let table_route_manager = TableRouteManager::new(kv.clone());
        let table_route_value = TableRouteValue::Logical(LogicalTableRouteValue {
            physical_table_id: 1023,
            region_ids: vec![RegionId::new(1023, 1)],
        });
        let (txn, _) = table_route_manager
            .table_route_storage()
            .build_create_txn(1024, &table_route_value)
            .unwrap();
        let r = kv.txn(txn).await.unwrap();
        assert!(r.succeeded);
        let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
        assert!(table_route.is_some());
        let got = table_route.unwrap().inner;
        assert_eq!(got, table_route_value);
    }

    /// `batch_get` keeps the order of the requested ids and yields `None` for missing ones.
    #[tokio::test]
    async fn test_table_route_batch_get() {
        let kv = Arc::new(MemoryKvBackend::default());
        let table_route_storage = TableRouteStorage::new(kv.clone());
        let routes = table_route_storage
            .batch_get(&[1023, 1024, 1025])
            .await
            .unwrap();

        // Nothing has been written yet.
        assert!(routes.iter().all(Option::is_none));
        let table_route_manager = TableRouteManager::new(kv.clone());
        let routes = [
            (
                1024,
                TableRouteValue::Logical(LogicalTableRouteValue {
                    physical_table_id: 1023,
                    region_ids: vec![RegionId::new(1023, 1)],
                }),
            ),
            (
                1025,
                TableRouteValue::Logical(LogicalTableRouteValue {
                    physical_table_id: 1023,
                    region_ids: vec![RegionId::new(1023, 2)],
                }),
            ),
        ];
        for (table_id, route) in &routes {
            let (txn, _) = table_route_manager
                .table_route_storage()
                .build_create_txn(*table_id, route)
                .unwrap();
            let r = kv.txn(txn).await.unwrap();
            assert!(r.succeeded);
        }

        // Request the written ids in reverse order, interleaved with unknown ids.
        let results = table_route_storage
            .batch_get(&[9999, 1025, 8888, 1024])
            .await
            .unwrap();
        assert!(results[0].is_none());
        assert_eq!(results[1].as_ref().unwrap(), &routes[1].1);
        assert!(results[2].is_none());
        assert_eq!(results[3].as_ref().unwrap(), &routes[0].1);
    }

    /// Only peers with a stored node address get their address replaced.
    #[tokio::test]
    async fn remap_route_address_updates_addresses() {
        let kv = Arc::new(MemoryKvBackend::default());
        let table_route_storage = TableRouteStorage::new(kv.clone());
        let mut table_route = TableRouteValue::Physical(PhysicalTableRouteValue {
            region_routes: vec![RegionRoute {
                leader_peer: Some(Peer {
                    id: 1,
                    ..Default::default()
                }),
                follower_peers: vec![Peer {
                    id: 2,
                    ..Default::default()
                }],
                ..Default::default()
            }],
            version: 0,
        });

        // Store an address for datanode 1 (the leader) only.
        kv.put(PutRequest {
            key: NodeAddressKey::with_datanode(1).to_bytes(),
            value: NodeAddressValue {
                peer: Peer {
                    addr: "addr1".to_string(),
                    ..Default::default()
                },
            }
            .try_as_raw_value()
            .unwrap(),
            ..Default::default()
        })
        .await
        .unwrap();

        table_route_storage
            .remap_route_address(&mut table_route)
            .await
            .unwrap();

        if let TableRouteValue::Physical(physical_table_route) = table_route {
            assert_eq!(
                physical_table_route.region_routes[0]
                    .leader_peer
                    .as_ref()
                    .unwrap()
                    .addr,
                "addr1"
            );
            // No address is stored for datanode 2, so the follower keeps its default
            // (empty) address.
            assert_eq!(
                physical_table_route.region_routes[0].follower_peers[0].addr,
                ""
            );
        } else {
            panic!("Expected PhysicalTableRouteValue");
        }
    }
}