common_meta/key/
table_route.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::sync::Arc;
18
19use serde::{Deserialize, Serialize};
20use snafu::{ensure, OptionExt, ResultExt};
21use store_api::storage::{RegionId, RegionNumber};
22use table::metadata::TableId;
23
24use crate::error::{
25    InvalidMetadataSnafu, MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu,
26    UnexpectedLogicalRouteTableSnafu,
27};
28use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
29use crate::key::txn_helper::TxnOpGetResponseSet;
30use crate::key::{
31    DeserializedValueWithBytes, MetadataKey, MetadataValue, RegionDistribution,
32    TABLE_ROUTE_KEY_PATTERN, TABLE_ROUTE_PREFIX,
33};
34use crate::kv_backend::txn::Txn;
35use crate::kv_backend::KvBackendRef;
36use crate::rpc::router::{region_distribution, RegionRoute};
37use crate::rpc::store::BatchGetRequest;
38
/// The key under which the route of a table is stored.
///
/// The layout: `__table_route/{table_id}`.
#[derive(Debug, PartialEq)]
pub struct TableRouteKey {
    /// Id of the table whose route this key addresses.
    pub table_id: TableId,
}
46
47impl TableRouteKey {
48    pub fn new(table_id: TableId) -> Self {
49        Self { table_id }
50    }
51}
52
/// The route stored for a table.
///
/// The serde attributes below make the serialized form internally tagged: a
/// `type` field carries the snake_case name of the variant.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TableRouteValue {
    /// Route of a physical table; carries the region routes themselves.
    Physical(PhysicalTableRouteValue),
    /// Route of a logical table; refers to a physical table by id.
    Logical(LogicalTableRouteValue),
}
59
/// The route of a physical table.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)]
pub struct PhysicalTableRouteValue {
    /// The routes of the table's regions.
    pub region_routes: Vec<RegionRoute>,
    /// Starts at 0 in [`PhysicalTableRouteValue::new`]; [`TableRouteValue::update`]
    /// yields a value whose version is one higher than the current one.
    version: u64,
}
65
/// The route of a logical table.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct LogicalTableRouteValue {
    /// Id of the physical table this logical table refers to.
    physical_table_id: TableId,
    /// Region ids recorded for this logical table.
    region_ids: Vec<RegionId>,
}
71
72impl TableRouteValue {
73    /// Returns a [TableRouteValue::Physical] if `table_id` equals `physical_table_id`.
74    /// Otherwise returns a [TableRouteValue::Logical].
75    pub(crate) fn new(
76        table_id: TableId,
77        physical_table_id: TableId,
78        region_routes: Vec<RegionRoute>,
79    ) -> Self {
80        if table_id == physical_table_id {
81            TableRouteValue::physical(region_routes)
82        } else {
83            let region_routes = region_routes
84                .into_iter()
85                .map(|region| {
86                    debug_assert_eq!(region.region.id.table_id(), physical_table_id);
87                    RegionId::new(table_id, region.region.id.region_number())
88                })
89                .collect();
90            TableRouteValue::logical(physical_table_id, region_routes)
91        }
92    }
93
94    pub fn physical(region_routes: Vec<RegionRoute>) -> Self {
95        Self::Physical(PhysicalTableRouteValue::new(region_routes))
96    }
97
98    pub fn logical(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
99        Self::Logical(LogicalTableRouteValue::new(physical_table_id, region_ids))
100    }
101
102    /// Returns a new version [TableRouteValue] with `region_routes`.
103    pub fn update(&self, region_routes: Vec<RegionRoute>) -> Result<Self> {
104        ensure!(
105            self.is_physical(),
106            UnexpectedLogicalRouteTableSnafu {
107                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
108            }
109        );
110        let version = self.as_physical_table_route_ref().version;
111        Ok(Self::Physical(PhysicalTableRouteValue {
112            region_routes,
113            version: version + 1,
114        }))
115    }
116
117    /// Returns the version.
118    ///
119    /// For test purpose.
120    #[cfg(any(test, feature = "testing"))]
121    pub fn version(&self) -> Result<u64> {
122        ensure!(
123            self.is_physical(),
124            UnexpectedLogicalRouteTableSnafu {
125                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
126            }
127        );
128        Ok(self.as_physical_table_route_ref().version)
129    }
130
131    /// Returns the corresponding [RegionRoute], returns `None` if it's the specific region is not found.
132    ///
133    /// Note: It throws an error if it's a logical table
134    pub fn region_route(&self, region_id: RegionId) -> Result<Option<RegionRoute>> {
135        ensure!(
136            self.is_physical(),
137            UnexpectedLogicalRouteTableSnafu {
138                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
139            }
140        );
141        Ok(self
142            .as_physical_table_route_ref()
143            .region_routes
144            .iter()
145            .find(|route| route.region.id == region_id)
146            .cloned())
147    }
148
149    /// Returns true if it's [TableRouteValue::Physical].
150    pub fn is_physical(&self) -> bool {
151        matches!(self, TableRouteValue::Physical(_))
152    }
153
154    /// Gets the [RegionRoute]s of this [TableRouteValue::Physical].
155    pub fn region_routes(&self) -> Result<&Vec<RegionRoute>> {
156        ensure!(
157            self.is_physical(),
158            UnexpectedLogicalRouteTableSnafu {
159                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
160            }
161        );
162        Ok(&self.as_physical_table_route_ref().region_routes)
163    }
164
165    /// Returns the reference of [`PhysicalTableRouteValue`].
166    ///
167    /// # Panic
168    /// If it is not the [`PhysicalTableRouteValue`].
169    fn as_physical_table_route_ref(&self) -> &PhysicalTableRouteValue {
170        match self {
171            TableRouteValue::Physical(x) => x,
172            _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
173        }
174    }
175
176    /// Converts to [`PhysicalTableRouteValue`].
177    ///
178    /// # Panic
179    /// If it is not the [`PhysicalTableRouteValue`].
180    pub fn into_physical_table_route(self) -> PhysicalTableRouteValue {
181        match self {
182            TableRouteValue::Physical(x) => x,
183            _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
184        }
185    }
186
187    pub fn region_numbers(&self) -> Vec<RegionNumber> {
188        match self {
189            TableRouteValue::Physical(x) => x
190                .region_routes
191                .iter()
192                .map(|region_route| region_route.region.id.region_number())
193                .collect(),
194            TableRouteValue::Logical(x) => x
195                .region_ids()
196                .iter()
197                .map(|region_id| region_id.region_number())
198                .collect(),
199        }
200    }
201}
202
203impl MetadataValue for TableRouteValue {
204    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
205        let r = serde_json::from_slice::<TableRouteValue>(raw_value);
206        match r {
207            // Compatible with old TableRouteValue.
208            Err(e) if e.is_data() => Ok(Self::Physical(
209                serde_json::from_slice::<PhysicalTableRouteValue>(raw_value)
210                    .context(SerdeJsonSnafu)?,
211            )),
212            Ok(x) => Ok(x),
213            Err(e) => Err(e).context(SerdeJsonSnafu),
214        }
215    }
216
217    fn try_as_raw_value(&self) -> Result<Vec<u8>> {
218        serde_json::to_vec(self).context(SerdeJsonSnafu)
219    }
220}
221
222impl PhysicalTableRouteValue {
223    pub fn new(region_routes: Vec<RegionRoute>) -> Self {
224        Self {
225            region_routes,
226            version: 0,
227        }
228    }
229}
230
231impl LogicalTableRouteValue {
232    pub fn new(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
233        Self {
234            physical_table_id,
235            region_ids,
236        }
237    }
238
239    pub fn physical_table_id(&self) -> TableId {
240        self.physical_table_id
241    }
242
243    pub fn region_ids(&self) -> &Vec<RegionId> {
244        &self.region_ids
245    }
246}
247
248impl MetadataKey<'_, TableRouteKey> for TableRouteKey {
249    fn to_bytes(&self) -> Vec<u8> {
250        self.to_string().into_bytes()
251    }
252
253    fn from_bytes(bytes: &[u8]) -> Result<TableRouteKey> {
254        let key = std::str::from_utf8(bytes).map_err(|e| {
255            InvalidMetadataSnafu {
256                err_msg: format!(
257                    "TableRouteKey '{}' is not a valid UTF8 string: {e}",
258                    String::from_utf8_lossy(bytes)
259                ),
260            }
261            .build()
262        })?;
263        let captures = TABLE_ROUTE_KEY_PATTERN
264            .captures(key)
265            .context(InvalidMetadataSnafu {
266                err_msg: format!("Invalid TableRouteKey '{key}'"),
267            })?;
268        // Safety: pass the regex check above
269        let table_id = captures[1].parse::<TableId>().unwrap();
270        Ok(TableRouteKey { table_id })
271    }
272}
273
274impl Display for TableRouteKey {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        write!(f, "{}/{}", TABLE_ROUTE_PREFIX, self.table_id)
277    }
278}
279
/// A [`TableRouteManager`] behind an [`Arc`].
pub type TableRouteManagerRef = Arc<TableRouteManager>;

/// Resolves table routes on top of a [`TableRouteStorage`].
pub struct TableRouteManager {
    /// The low-level storage every lookup of this manager goes through.
    storage: TableRouteStorage,
}
285
286impl TableRouteManager {
287    pub fn new(kv_backend: KvBackendRef) -> Self {
288        Self {
289            storage: TableRouteStorage::new(kv_backend),
290        }
291    }
292
293    /// Returns the [TableId] recursively.
294    ///
295    /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
296    /// - the table(`logical_or_physical_table_id`) does not exist.
297    pub async fn get_physical_table_id(
298        &self,
299        logical_or_physical_table_id: TableId,
300    ) -> Result<TableId> {
301        let table_route = self
302            .storage
303            .get_inner(logical_or_physical_table_id)
304            .await?
305            .context(TableRouteNotFoundSnafu {
306                table_id: logical_or_physical_table_id,
307            })?;
308
309        match table_route {
310            TableRouteValue::Physical(_) => Ok(logical_or_physical_table_id),
311            TableRouteValue::Logical(x) => Ok(x.physical_table_id()),
312        }
313    }
314
315    /// Returns the [TableRouteValue::Physical] recursively.
316    ///
317    /// Returns a [TableRouteNotFound](error::Error::TableRouteNotFound) Error if:
318    /// - the physical table(`logical_or_physical_table_id`) does not exist
319    /// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exist.
320    pub async fn get_physical_table_route(
321        &self,
322        logical_or_physical_table_id: TableId,
323    ) -> Result<(TableId, PhysicalTableRouteValue)> {
324        let table_route = self
325            .storage
326            .get(logical_or_physical_table_id)
327            .await?
328            .context(TableRouteNotFoundSnafu {
329                table_id: logical_or_physical_table_id,
330            })?;
331
332        match table_route {
333            TableRouteValue::Physical(x) => Ok((logical_or_physical_table_id, x)),
334            TableRouteValue::Logical(x) => {
335                let physical_table_id = x.physical_table_id();
336                let physical_table_route = self.storage.get(physical_table_id).await?.context(
337                    TableRouteNotFoundSnafu {
338                        table_id: physical_table_id,
339                    },
340                )?;
341                let physical_table_route = physical_table_route.into_physical_table_route();
342                Ok((physical_table_id, physical_table_route))
343            }
344        }
345    }
346
347    /// Returns the [TableRouteValue::Physical] recursively.
348    ///
349    /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
350    /// - one of the logical tables corresponding to the physical table does not exist.
351    ///
352    /// **Notes**: it may return a subset of `logical_or_physical_table_ids`.
353    pub async fn batch_get_physical_table_routes(
354        &self,
355        logical_or_physical_table_ids: &[TableId],
356    ) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
357        let table_routes = self
358            .storage
359            .batch_get(logical_or_physical_table_ids)
360            .await?;
361        // Returns a subset of `logical_or_physical_table_ids`.
362        let table_routes = table_routes
363            .into_iter()
364            .zip(logical_or_physical_table_ids)
365            .filter_map(|(route, id)| route.map(|route| (*id, route)))
366            .collect::<HashMap<_, _>>();
367
368        let mut physical_table_routes = HashMap::with_capacity(table_routes.len());
369        let mut logical_table_ids = HashMap::with_capacity(table_routes.len());
370
371        for (table_id, table_route) in table_routes {
372            match table_route {
373                TableRouteValue::Physical(x) => {
374                    physical_table_routes.insert(table_id, x);
375                }
376                TableRouteValue::Logical(x) => {
377                    logical_table_ids.insert(table_id, x.physical_table_id());
378                }
379            }
380        }
381
382        if logical_table_ids.is_empty() {
383            return Ok(physical_table_routes);
384        }
385
386        // Finds the logical tables corresponding to the physical tables.
387        let physical_table_ids = logical_table_ids
388            .values()
389            .cloned()
390            .collect::<HashSet<_>>()
391            .into_iter()
392            .collect::<Vec<_>>();
393        let table_routes = self
394            .table_route_storage()
395            .batch_get(&physical_table_ids)
396            .await?;
397        let table_routes = table_routes
398            .into_iter()
399            .zip(physical_table_ids)
400            .filter_map(|(route, id)| route.map(|route| (id, route)))
401            .collect::<HashMap<_, _>>();
402
403        for (logical_table_id, physical_table_id) in logical_table_ids {
404            let table_route =
405                table_routes
406                    .get(&physical_table_id)
407                    .context(TableRouteNotFoundSnafu {
408                        table_id: physical_table_id,
409                    })?;
410            match table_route {
411                TableRouteValue::Physical(x) => {
412                    physical_table_routes.insert(logical_table_id, x.clone());
413                }
414                TableRouteValue::Logical(x) => {
415                    // Never get here, because we use a physical table id cannot obtain a logical table.
416                    MetadataCorruptionSnafu {
417                        err_msg: format!(
418                            "logical table {} {:?} cannot be resolved to a physical table.",
419                            logical_table_id, x
420                        ),
421                    }
422                    .fail()?;
423                }
424            }
425        }
426
427        Ok(physical_table_routes)
428    }
429
430    /// Returns [`RegionDistribution`] of the table(`table_id`).
431    pub async fn get_region_distribution(
432        &self,
433        table_id: TableId,
434    ) -> Result<Option<RegionDistribution>> {
435        self.storage
436            .get(table_id)
437            .await?
438            .map(|table_route| Ok(region_distribution(table_route.region_routes()?)))
439            .transpose()
440    }
441
442    /// Returns low-level APIs.
443    pub fn table_route_storage(&self) -> &TableRouteStorage {
444        &self.storage
445    }
446}
447
/// Low-level operations of [TableRouteValue].
pub struct TableRouteStorage {
    /// The key-value backend all reads and transactions go through.
    kv_backend: KvBackendRef,
}

/// The result type of the decoders returned by
/// [`TableRouteStorage::build_create_txn`] and [`TableRouteStorage::build_update_txn`].
pub type TableRouteValueDecodeResult = Result<Option<DeserializedValueWithBytes<TableRouteValue>>>;
454
455impl TableRouteStorage {
456    pub fn new(kv_backend: KvBackendRef) -> Self {
457        Self { kv_backend }
458    }
459
460    /// Builds a create table route transaction,
461    /// it expected the `__table_route/{table_id}` wasn't occupied.
462    pub fn build_create_txn(
463        &self,
464        table_id: TableId,
465        table_route_value: &TableRouteValue,
466    ) -> Result<(
467        Txn,
468        impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult,
469    )> {
470        let key = TableRouteKey::new(table_id);
471        let raw_key = key.to_bytes();
472
473        let txn = Txn::put_if_not_exists(raw_key.clone(), table_route_value.try_as_raw_value()?);
474
475        Ok((
476            txn,
477            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
478        ))
479    }
480
481    // TODO(LFC): restore its original visibility after some test utility codes are refined
482    /// Builds a update table route transaction,
483    /// it expected the remote value equals the `current_table_route_value`.
484    /// It retrieves the latest value if the comparing failed.
485    pub fn build_update_txn(
486        &self,
487        table_id: TableId,
488        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
489        new_table_route_value: &TableRouteValue,
490    ) -> Result<(
491        Txn,
492        impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult,
493    )> {
494        let key = TableRouteKey::new(table_id);
495        let raw_key = key.to_bytes();
496        let raw_value = current_table_route_value.get_raw_bytes();
497        let new_raw_value: Vec<u8> = new_table_route_value.try_as_raw_value()?;
498
499        let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
500
501        Ok((
502            txn,
503            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
504        ))
505    }
506
507    /// Returns the [`TableRouteValue`].
508    pub async fn get(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
509        let mut table_route = self.get_inner(table_id).await?;
510        if let Some(table_route) = &mut table_route {
511            self.remap_route_address(table_route).await?;
512        };
513
514        Ok(table_route)
515    }
516
517    async fn get_inner(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
518        let key = TableRouteKey::new(table_id);
519        self.kv_backend
520            .get(&key.to_bytes())
521            .await?
522            .map(|kv| TableRouteValue::try_from_raw_value(&kv.value))
523            .transpose()
524    }
525
526    /// Returns the [`TableRouteValue`] wrapped with [`DeserializedValueWithBytes`].
527    pub async fn get_with_raw_bytes(
528        &self,
529        table_id: TableId,
530    ) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
531        let mut table_route = self.get_with_raw_bytes_inner(table_id).await?;
532        if let Some(table_route) = &mut table_route {
533            self.remap_route_address(table_route).await?;
534        };
535
536        Ok(table_route)
537    }
538
539    async fn get_with_raw_bytes_inner(
540        &self,
541        table_id: TableId,
542    ) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
543        let key = TableRouteKey::new(table_id);
544        self.kv_backend
545            .get(&key.to_bytes())
546            .await?
547            .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
548            .transpose()
549    }
550
551    /// Returns batch of [`TableRouteValue`] that respects the order of `table_ids`.
552    pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
553        let mut table_routes = self.batch_get_inner(table_ids).await?;
554        self.remap_routes_addresses(&mut table_routes).await?;
555
556        Ok(table_routes)
557    }
558
559    async fn batch_get_inner(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
560        let keys = table_ids
561            .iter()
562            .map(|id| TableRouteKey::new(*id).to_bytes())
563            .collect::<Vec<_>>();
564        let resp = self
565            .kv_backend
566            .batch_get(BatchGetRequest { keys: keys.clone() })
567            .await?;
568
569        let kvs = resp
570            .kvs
571            .into_iter()
572            .map(|kv| (kv.key, kv.value))
573            .collect::<HashMap<_, _>>();
574        keys.into_iter()
575            .map(|key| {
576                if let Some(value) = kvs.get(&key) {
577                    Ok(Some(TableRouteValue::try_from_raw_value(value)?))
578                } else {
579                    Ok(None)
580                }
581            })
582            .collect()
583    }
584
585    async fn remap_routes_addresses(
586        &self,
587        table_routes: &mut [Option<TableRouteValue>],
588    ) -> Result<()> {
589        let keys = table_routes
590            .iter()
591            .flat_map(|table_route| {
592                table_route
593                    .as_ref()
594                    .map(extract_address_keys)
595                    .unwrap_or_default()
596            })
597            .collect::<HashSet<_>>()
598            .into_iter()
599            .collect();
600        let node_addrs = self.get_node_addresses(keys).await?;
601        for table_route in table_routes.iter_mut().flatten() {
602            set_addresses(&node_addrs, table_route)?;
603        }
604
605        Ok(())
606    }
607
608    async fn remap_route_address(&self, table_route: &mut TableRouteValue) -> Result<()> {
609        let keys = extract_address_keys(table_route).into_iter().collect();
610        let node_addrs = self.get_node_addresses(keys).await?;
611        set_addresses(&node_addrs, table_route)?;
612
613        Ok(())
614    }
615
616    async fn get_node_addresses(
617        &self,
618        keys: Vec<Vec<u8>>,
619    ) -> Result<HashMap<u64, NodeAddressValue>> {
620        if keys.is_empty() {
621            return Ok(HashMap::default());
622        }
623
624        self.kv_backend
625            .batch_get(BatchGetRequest { keys })
626            .await?
627            .kvs
628            .into_iter()
629            .map(|kv| {
630                let node_id = NodeAddressKey::from_bytes(&kv.key)?.node_id;
631                let node_addr = NodeAddressValue::try_from_raw_value(&kv.value)?;
632                Ok((node_id, node_addr))
633            })
634            .collect()
635    }
636}
637
638fn set_addresses(
639    node_addrs: &HashMap<u64, NodeAddressValue>,
640    table_route: &mut TableRouteValue,
641) -> Result<()> {
642    let TableRouteValue::Physical(physical_table_route) = table_route else {
643        return Ok(());
644    };
645
646    for region_route in &mut physical_table_route.region_routes {
647        if let Some(leader) = &mut region_route.leader_peer {
648            if let Some(node_addr) = node_addrs.get(&leader.id) {
649                leader.addr = node_addr.peer.addr.clone();
650            }
651        }
652        for follower in &mut region_route.follower_peers {
653            if let Some(node_addr) = node_addrs.get(&follower.id) {
654                follower.addr = node_addr.peer.addr.clone();
655            }
656        }
657    }
658
659    Ok(())
660}
661
662fn extract_address_keys(table_route: &TableRouteValue) -> HashSet<Vec<u8>> {
663    let TableRouteValue::Physical(physical_table_route) = table_route else {
664        return HashSet::default();
665    };
666
667    physical_table_route
668        .region_routes
669        .iter()
670        .flat_map(|region_route| {
671            region_route
672                .follower_peers
673                .iter()
674                .map(|peer| NodeAddressKey::with_datanode(peer.id).to_bytes())
675                .chain(
676                    region_route
677                        .leader_peer
678                        .as_ref()
679                        .map(|leader| NodeAddressKey::with_datanode(leader.id).to_bytes()),
680                )
681        })
682        .collect()
683}
684
// Unit tests for the table route key/value codecs and for `TableRouteStorage`
// reads against an in-memory kv backend.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::{KvBackend, TxnService};
    use crate::peer::Peer;
    use crate::rpc::router::Region;
    use crate::rpc::store::PutRequest;

    /// A value serialized without the `type` tag must still decode, as a
    /// physical route.
    #[test]
    fn test_table_route_compatibility() {
        let old_raw_v = r#"{"region_routes":[{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]},{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]}],"version":0}"#;
        let v = TableRouteValue::try_from_raw_value(old_raw_v.as_bytes()).unwrap();

        let expected_table_route = TableRouteValue::Physical(PhysicalTableRouteValue {
            region_routes: vec![
                RegionRoute {
                    region: Region {
                        id: RegionId::new(0, 1),
                        name: "r1".to_string(),
                        partition: None,
                        attrs: Default::default(),
                    },
                    leader_peer: Some(Peer {
                        id: 2,
                        addr: "a2".to_string(),
                    }),
                    follower_peers: vec![],
                    leader_state: None,
                    leader_down_since: None,
                },
                RegionRoute {
                    region: Region {
                        id: RegionId::new(0, 1),
                        name: "r1".to_string(),
                        partition: None,
                        attrs: Default::default(),
                    },
                    leader_peer: Some(Peer {
                        id: 2,
                        addr: "a2".to_string(),
                    }),
                    follower_peers: vec![],
                    leader_state: None,
                    leader_down_since: None,
                },
            ],
            version: 0,
        });

        assert_eq!(v, expected_table_route);
    }

    /// The key encodes to `__table_route/{table_id}`.
    #[test]
    fn test_key_serialization() {
        let key = TableRouteKey::new(42);
        let raw_key = key.to_bytes();
        assert_eq!(raw_key, b"__table_route/42");
    }

    /// The encoded form decodes back to the same key.
    #[test]
    fn test_key_deserialization() {
        let expected = TableRouteKey::new(42);
        let key = TableRouteKey::from_bytes(b"__table_route/42").unwrap();
        assert_eq!(key, expected);
    }

    /// Reading a table without a stored route yields `None`.
    #[tokio::test]
    async fn test_table_route_storage_get_with_raw_bytes_empty() {
        let kv = Arc::new(MemoryKvBackend::default());
        let table_route_storage = TableRouteStorage::new(kv);
        let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
        assert!(table_route.is_none());
    }

    /// A route written through a create transaction is read back unchanged.
    #[tokio::test]
    async fn test_table_route_storage_get_with_raw_bytes() {
        let kv = Arc::new(MemoryKvBackend::default());
        let table_route_storage = TableRouteStorage::new(kv.clone());
        let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
        assert!(table_route.is_none());
        let table_route_manager = TableRouteManager::new(kv.clone());
        let table_route_value = TableRouteValue::Logical(LogicalTableRouteValue {
            physical_table_id: 1023,
            region_ids: vec![RegionId::new(1023, 1)],
        });
        let (txn, _) = table_route_manager
            .table_route_storage()
            .build_create_txn(1024, &table_route_value)
            .unwrap();
        let r = kv.txn(txn).await.unwrap();
        assert!(r.succeeded);
        let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
        assert!(table_route.is_some());
        let got = table_route.unwrap().inner;
        assert_eq!(got, table_route_value);
    }

    /// `batch_get` answers in request order and yields `None` for ids that have
    /// no stored route.
    #[tokio::test]
    async fn test_table_route_batch_get() {
        let kv = Arc::new(MemoryKvBackend::default());
        let table_route_storage = TableRouteStorage::new(kv.clone());
        let routes = table_route_storage
            .batch_get(&[1023, 1024, 1025])
            .await
            .unwrap();

        // Nothing has been stored yet.
        assert!(routes.iter().all(Option::is_none));
        let table_route_manager = TableRouteManager::new(kv.clone());
        let routes = [
            (
                1024,
                TableRouteValue::Logical(LogicalTableRouteValue {
                    physical_table_id: 1023,
                    region_ids: vec![RegionId::new(1023, 1)],
                }),
            ),
            (
                1025,
                TableRouteValue::Logical(LogicalTableRouteValue {
                    physical_table_id: 1023,
                    region_ids: vec![RegionId::new(1023, 2)],
                }),
            ),
        ];
        for (table_id, route) in &routes {
            let (txn, _) = table_route_manager
                .table_route_storage()
                .build_create_txn(*table_id, route)
                .unwrap();
            let r = kv.txn(txn).await.unwrap();
            assert!(r.succeeded);
        }

        // Request the stored ids in a different order, interleaved with unknown ids.
        let results = table_route_storage
            .batch_get(&[9999, 1025, 8888, 1024])
            .await
            .unwrap();
        assert!(results[0].is_none());
        assert_eq!(results[1].as_ref().unwrap(), &routes[1].1);
        assert!(results[2].is_none());
        assert_eq!(results[3].as_ref().unwrap(), &routes[0].1);
    }

    /// Remapping overwrites the address of a peer that has a stored node address
    /// and leaves the other peers as they were.
    #[tokio::test]
    async fn remap_route_address_updates_addresses() {
        let kv = Arc::new(MemoryKvBackend::default());
        let table_route_storage = TableRouteStorage::new(kv.clone());
        let mut table_route = TableRouteValue::Physical(PhysicalTableRouteValue {
            region_routes: vec![RegionRoute {
                leader_peer: Some(Peer {
                    id: 1,
                    ..Default::default()
                }),
                follower_peers: vec![Peer {
                    id: 2,
                    ..Default::default()
                }],
                ..Default::default()
            }],
            version: 0,
        });

        // Only node 1 (the leader) gets a stored address.
        kv.put(PutRequest {
            key: NodeAddressKey::with_datanode(1).to_bytes(),
            value: NodeAddressValue {
                peer: Peer {
                    addr: "addr1".to_string(),
                    ..Default::default()
                },
            }
            .try_as_raw_value()
            .unwrap(),
            ..Default::default()
        })
        .await
        .unwrap();

        table_route_storage
            .remap_route_address(&mut table_route)
            .await
            .unwrap();

        if let TableRouteValue::Physical(physical_table_route) = table_route {
            assert_eq!(
                physical_table_route.region_routes[0]
                    .leader_peer
                    .as_ref()
                    .unwrap()
                    .addr,
                "addr1"
            );
            // The follower (node 2) has no stored address, so it keeps its default.
            assert_eq!(
                physical_table_route.region_routes[0].follower_peers[0].addr,
                ""
            );
        } else {
            panic!("Expected PhysicalTableRouteValue");
        }
    }
}