common_meta/key/
table_route.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::sync::Arc;
18
19use serde::{Deserialize, Serialize};
20use snafu::{OptionExt, ResultExt, ensure};
21use store_api::storage::{RegionId, RegionNumber};
22use table::metadata::TableId;
23
24use crate::error::{
25    InvalidMetadataSnafu, MetadataCorruptionSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu,
26    TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu,
27};
28use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
29use crate::key::txn_helper::TxnOpGetResponseSet;
30use crate::key::{
31    DeserializedValueWithBytes, MetadataKey, MetadataValue, RegionDistribution,
32    TABLE_ROUTE_KEY_PATTERN, TABLE_ROUTE_PREFIX,
33};
34use crate::kv_backend::KvBackendRef;
35use crate::kv_backend::txn::Txn;
36use crate::rpc::router::{RegionRoute, region_distribution};
37use crate::rpc::store::BatchGetRequest;
38
/// The key under which the route of a table is stored.
///
/// The layout: `__table_route/{table_id}`.
#[derive(Debug, PartialEq)]
pub struct TableRouteKey {
    /// Id of the table whose route this key addresses.
    pub table_id: TableId,
}
46
47impl TableRouteKey {
48    pub fn new(table_id: TableId) -> Self {
49        Self { table_id }
50    }
51
52    /// Returns the range prefix of the table route key.
53    pub fn range_prefix() -> Vec<u8> {
54        format!("{}/", TABLE_ROUTE_PREFIX).into_bytes()
55    }
56}
57
/// The route of a table, which is either physical or logical.
///
/// Serialized with an internal `type` tag holding the snake_case variant name.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TableRouteValue {
    /// Route of a physical table; it carries the region routes.
    Physical(PhysicalTableRouteValue),
    /// Route of a logical table; it refers to a physical table by id.
    Logical(LogicalTableRouteValue),
}
64
/// The route of a physical table.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)]
pub struct PhysicalTableRouteValue {
    /// The routes of the regions of the table.
    pub region_routes: Vec<RegionRoute>,
    // Starts at 0 in `PhysicalTableRouteValue::new` and is incremented by one
    // each time `TableRouteValue::update` produces a new value.
    version: u64,
}
70
/// The route of a logical table.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct LogicalTableRouteValue {
    // Id of the physical table this logical table resolves to.
    physical_table_id: TableId,
    // Region ids of the logical table.
    region_ids: Vec<RegionId>,
}
76
77impl TableRouteValue {
78    /// Returns a [TableRouteValue::Physical] if `table_id` equals `physical_table_id`.
79    /// Otherwise returns a [TableRouteValue::Logical].
80    pub(crate) fn new(
81        table_id: TableId,
82        physical_table_id: TableId,
83        region_routes: Vec<RegionRoute>,
84    ) -> Self {
85        if table_id == physical_table_id {
86            TableRouteValue::physical(region_routes)
87        } else {
88            let region_routes = region_routes
89                .into_iter()
90                .map(|region| {
91                    debug_assert_eq!(region.region.id.table_id(), physical_table_id);
92                    RegionId::new(table_id, region.region.id.region_number())
93                })
94                .collect();
95            TableRouteValue::logical(physical_table_id, region_routes)
96        }
97    }
98
99    pub fn physical(region_routes: Vec<RegionRoute>) -> Self {
100        Self::Physical(PhysicalTableRouteValue::new(region_routes))
101    }
102
103    pub fn logical(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
104        Self::Logical(LogicalTableRouteValue::new(physical_table_id, region_ids))
105    }
106
107    /// Returns a new version [TableRouteValue] with `region_routes`.
108    pub fn update(&self, region_routes: Vec<RegionRoute>) -> Result<Self> {
109        ensure!(
110            self.is_physical(),
111            UnexpectedLogicalRouteTableSnafu {
112                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
113            }
114        );
115        let version = self.as_physical_table_route_ref().version;
116        Ok(Self::Physical(PhysicalTableRouteValue {
117            region_routes,
118            version: version + 1,
119        }))
120    }
121
122    /// Returns the version.
123    ///
124    /// For test purpose.
125    #[cfg(any(test, feature = "testing"))]
126    pub fn version(&self) -> Result<u64> {
127        ensure!(
128            self.is_physical(),
129            UnexpectedLogicalRouteTableSnafu {
130                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
131            }
132        );
133        Ok(self.as_physical_table_route_ref().version)
134    }
135
136    /// Returns the corresponding [RegionRoute], returns `None` if it's the specific region is not found.
137    ///
138    /// Note: It throws an error if it's a logical table
139    pub fn region_route(&self, region_id: RegionId) -> Result<Option<RegionRoute>> {
140        ensure!(
141            self.is_physical(),
142            UnexpectedLogicalRouteTableSnafu {
143                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
144            }
145        );
146        Ok(self
147            .as_physical_table_route_ref()
148            .region_routes
149            .iter()
150            .find(|route| route.region.id == region_id)
151            .cloned())
152    }
153
154    /// Returns true if it's [TableRouteValue::Physical].
155    pub fn is_physical(&self) -> bool {
156        matches!(self, TableRouteValue::Physical(_))
157    }
158
159    /// Gets the [RegionRoute]s of this [TableRouteValue::Physical].
160    pub fn region_routes(&self) -> Result<&Vec<RegionRoute>> {
161        ensure!(
162            self.is_physical(),
163            UnexpectedLogicalRouteTableSnafu {
164                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
165            }
166        );
167        Ok(&self.as_physical_table_route_ref().region_routes)
168    }
169
170    /// Returns the reference of [`PhysicalTableRouteValue`].
171    ///
172    /// # Panic
173    /// If it is not the [`PhysicalTableRouteValue`].
174    fn as_physical_table_route_ref(&self) -> &PhysicalTableRouteValue {
175        match self {
176            TableRouteValue::Physical(x) => x,
177            _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
178        }
179    }
180
181    /// Converts to [`PhysicalTableRouteValue`].
182    ///
183    /// # Panic
184    /// If it is not the [`PhysicalTableRouteValue`].
185    pub fn into_physical_table_route(self) -> PhysicalTableRouteValue {
186        match self {
187            TableRouteValue::Physical(x) => x,
188            _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
189        }
190    }
191
192    /// Converts to [`LogicalTableRouteValue`].
193    ///
194    /// # Panic
195    /// If it is not the [`LogicalTableRouteValue`].
196    pub fn into_logical_table_route(self) -> LogicalTableRouteValue {
197        match self {
198            TableRouteValue::Logical(x) => x,
199            _ => unreachable!("Mistakenly been treated as a Logical TableRoute: {self:?}"),
200        }
201    }
202
203    pub fn region_numbers(&self) -> Vec<RegionNumber> {
204        match self {
205            TableRouteValue::Physical(x) => x
206                .region_routes
207                .iter()
208                .map(|region_route| region_route.region.id.region_number())
209                .collect(),
210            TableRouteValue::Logical(x) => x
211                .region_ids()
212                .iter()
213                .map(|region_id| region_id.region_number())
214                .collect(),
215        }
216    }
217}
218
219impl MetadataValue for TableRouteValue {
220    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
221        let r = serde_json::from_slice::<TableRouteValue>(raw_value);
222        match r {
223            // Compatible with old TableRouteValue.
224            Err(e) if e.is_data() => Ok(Self::Physical(
225                serde_json::from_slice::<PhysicalTableRouteValue>(raw_value)
226                    .context(SerdeJsonSnafu)?,
227            )),
228            Ok(x) => Ok(x),
229            Err(e) => Err(e).context(SerdeJsonSnafu),
230        }
231    }
232
233    fn try_as_raw_value(&self) -> Result<Vec<u8>> {
234        serde_json::to_vec(self).context(SerdeJsonSnafu)
235    }
236}
237
238impl PhysicalTableRouteValue {
239    pub fn new(region_routes: Vec<RegionRoute>) -> Self {
240        Self {
241            region_routes,
242            version: 0,
243        }
244    }
245}
246
247impl LogicalTableRouteValue {
248    pub fn new(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
249        Self {
250            physical_table_id,
251            region_ids,
252        }
253    }
254
255    pub fn physical_table_id(&self) -> TableId {
256        self.physical_table_id
257    }
258
259    pub fn region_ids(&self) -> &Vec<RegionId> {
260        &self.region_ids
261    }
262}
263
264impl MetadataKey<'_, TableRouteKey> for TableRouteKey {
265    fn to_bytes(&self) -> Vec<u8> {
266        self.to_string().into_bytes()
267    }
268
269    fn from_bytes(bytes: &[u8]) -> Result<TableRouteKey> {
270        let key = std::str::from_utf8(bytes).map_err(|e| {
271            InvalidMetadataSnafu {
272                err_msg: format!(
273                    "TableRouteKey '{}' is not a valid UTF8 string: {e}",
274                    String::from_utf8_lossy(bytes)
275                ),
276            }
277            .build()
278        })?;
279        let captures = TABLE_ROUTE_KEY_PATTERN
280            .captures(key)
281            .context(InvalidMetadataSnafu {
282                err_msg: format!("Invalid TableRouteKey '{key}'"),
283            })?;
284        // Safety: pass the regex check above
285        let table_id = captures[1].parse::<TableId>().unwrap();
286        Ok(TableRouteKey { table_id })
287    }
288}
289
290impl Display for TableRouteKey {
291    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
292        write!(f, "{}/{}", TABLE_ROUTE_PREFIX, self.table_id)
293    }
294}
295
/// A shared, reference-counted [`TableRouteManager`].
pub type TableRouteManagerRef = Arc<TableRouteManager>;
297
/// High-level operations on table routes, built on top of [`TableRouteStorage`].
pub struct TableRouteManager {
    // Low-level access to the stored table routes.
    storage: TableRouteStorage,
}
301
302impl TableRouteManager {
303    pub fn new(kv_backend: KvBackendRef) -> Self {
304        Self {
305            storage: TableRouteStorage::new(kv_backend),
306        }
307    }
308
309    /// Returns the [TableId] recursively.
310    ///
311    /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
312    /// - the table(`logical_or_physical_table_id`) does not exist.
313    pub async fn get_physical_table_id(
314        &self,
315        logical_or_physical_table_id: TableId,
316    ) -> Result<TableId> {
317        let table_route = self
318            .storage
319            .get_inner(logical_or_physical_table_id)
320            .await?
321            .context(TableRouteNotFoundSnafu {
322                table_id: logical_or_physical_table_id,
323            })?;
324
325        match table_route {
326            TableRouteValue::Physical(_) => Ok(logical_or_physical_table_id),
327            TableRouteValue::Logical(x) => Ok(x.physical_table_id()),
328        }
329    }
330
331    /// Returns the [TableRouteValue::Physical] recursively.
332    ///
333    /// Returns a [TableRouteNotFound](error::Error::TableRouteNotFound) Error if:
334    /// - the physical table(`logical_or_physical_table_id`) does not exist
335    /// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exist.
336    pub async fn get_physical_table_route(
337        &self,
338        logical_or_physical_table_id: TableId,
339    ) -> Result<(TableId, PhysicalTableRouteValue)> {
340        let table_route = self
341            .storage
342            .get(logical_or_physical_table_id)
343            .await?
344            .context(TableRouteNotFoundSnafu {
345                table_id: logical_or_physical_table_id,
346            })?;
347
348        match table_route {
349            TableRouteValue::Physical(x) => Ok((logical_or_physical_table_id, x)),
350            TableRouteValue::Logical(x) => {
351                let physical_table_id = x.physical_table_id();
352                let physical_table_route = self.storage.get(physical_table_id).await?.context(
353                    TableRouteNotFoundSnafu {
354                        table_id: physical_table_id,
355                    },
356                )?;
357                let physical_table_route = physical_table_route.into_physical_table_route();
358                Ok((physical_table_id, physical_table_route))
359            }
360        }
361    }
362
363    /// Returns the [TableRouteValue::Physical] recursively.
364    ///
365    /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
366    /// - one of the logical tables corresponding to the physical table does not exist.
367    ///
368    /// **Notes**: it may return a subset of `logical_or_physical_table_ids`.
369    pub async fn batch_get_physical_table_routes(
370        &self,
371        logical_or_physical_table_ids: &[TableId],
372    ) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
373        let table_routes = self
374            .storage
375            .batch_get(logical_or_physical_table_ids)
376            .await?;
377        // Returns a subset of `logical_or_physical_table_ids`.
378        let table_routes = table_routes
379            .into_iter()
380            .zip(logical_or_physical_table_ids)
381            .filter_map(|(route, id)| route.map(|route| (*id, route)))
382            .collect::<HashMap<_, _>>();
383
384        let mut physical_table_routes = HashMap::with_capacity(table_routes.len());
385        let mut logical_table_ids = HashMap::with_capacity(table_routes.len());
386
387        for (table_id, table_route) in table_routes {
388            match table_route {
389                TableRouteValue::Physical(x) => {
390                    physical_table_routes.insert(table_id, x);
391                }
392                TableRouteValue::Logical(x) => {
393                    logical_table_ids.insert(table_id, x.physical_table_id());
394                }
395            }
396        }
397
398        if logical_table_ids.is_empty() {
399            return Ok(physical_table_routes);
400        }
401
402        // Finds the logical tables corresponding to the physical tables.
403        let physical_table_ids = logical_table_ids
404            .values()
405            .cloned()
406            .collect::<HashSet<_>>()
407            .into_iter()
408            .collect::<Vec<_>>();
409        let table_routes = self
410            .table_route_storage()
411            .batch_get(&physical_table_ids)
412            .await?;
413        let table_routes = table_routes
414            .into_iter()
415            .zip(physical_table_ids)
416            .filter_map(|(route, id)| route.map(|route| (id, route)))
417            .collect::<HashMap<_, _>>();
418
419        for (logical_table_id, physical_table_id) in logical_table_ids {
420            let table_route =
421                table_routes
422                    .get(&physical_table_id)
423                    .context(TableRouteNotFoundSnafu {
424                        table_id: physical_table_id,
425                    })?;
426            match table_route {
427                TableRouteValue::Physical(x) => {
428                    physical_table_routes.insert(logical_table_id, x.clone());
429                }
430                TableRouteValue::Logical(x) => {
431                    // Never get here, because we use a physical table id cannot obtain a logical table.
432                    MetadataCorruptionSnafu {
433                        err_msg: format!(
434                            "logical table {} {:?} cannot be resolved to a physical table.",
435                            logical_table_id, x
436                        ),
437                    }
438                    .fail()?;
439                }
440            }
441        }
442
443        Ok(physical_table_routes)
444    }
445
446    /// Returns [`RegionDistribution`] of the table(`table_id`).
447    pub async fn get_region_distribution(
448        &self,
449        table_id: TableId,
450    ) -> Result<Option<RegionDistribution>> {
451        self.storage
452            .get(table_id)
453            .await?
454            .map(|table_route| Ok(region_distribution(table_route.region_routes()?)))
455            .transpose()
456    }
457
458    /// Sets the staging state for a specific region.
459    ///
460    /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
461    /// - the table does not exist
462    /// - the region is not found in the table
463    pub async fn set_region_staging_state(
464        &self,
465        region_id: store_api::storage::RegionId,
466        staging: bool,
467    ) -> Result<()> {
468        let table_id = region_id.table_id();
469
470        // Get current table route with raw bytes for CAS operation
471        let current_table_route = self
472            .storage
473            .get_with_raw_bytes(table_id)
474            .await?
475            .context(TableRouteNotFoundSnafu { table_id })?;
476
477        // Clone the current route value and update the specific region
478        let new_table_route = current_table_route.inner.clone();
479
480        // Only physical tables have region routes
481        ensure!(
482            new_table_route.is_physical(),
483            UnexpectedLogicalRouteTableSnafu {
484                err_msg: format!("Cannot set staging state for logical table {table_id}"),
485            }
486        );
487
488        let region_routes = new_table_route.region_routes()?.clone();
489        let mut updated_routes = region_routes.clone();
490
491        // Find and update the specific region
492        // TODO(ruihang): maybe update them in one transaction
493        let mut region_found = false;
494        for route in &mut updated_routes {
495            if route.region.id == region_id {
496                if staging {
497                    route.set_leader_staging();
498                } else {
499                    route.clear_leader_staging();
500                }
501                region_found = true;
502                break;
503            }
504        }
505
506        ensure!(region_found, RegionNotFoundSnafu { region_id });
507
508        // Create new table route with updated region routes
509        let updated_table_route = new_table_route.update(updated_routes)?;
510
511        // Execute atomic update
512        let (txn, _) =
513            self.storage
514                .build_update_txn(table_id, &current_table_route, &updated_table_route)?;
515
516        let result = self.storage.kv_backend.txn(txn).await?;
517
518        ensure!(
519            result.succeeded,
520            MetadataCorruptionSnafu {
521                err_msg: format!(
522                    "Failed to update staging state for region {}: CAS operation failed",
523                    region_id
524                ),
525            }
526        );
527
528        Ok(())
529    }
530
531    /// Checks if a specific region is in staging state.
532    ///
533    /// Returns false if the table/region doesn't exist.
534    pub async fn is_region_staging(&self, region_id: store_api::storage::RegionId) -> Result<bool> {
535        let table_id = region_id.table_id();
536
537        let table_route = self.storage.get(table_id).await?;
538
539        match table_route {
540            Some(route) if route.is_physical() => {
541                let region_routes = route.region_routes()?;
542                for route in region_routes {
543                    if route.region.id == region_id {
544                        return Ok(route.is_leader_staging());
545                    }
546                }
547                Ok(false)
548            }
549            _ => Ok(false),
550        }
551    }
552
553    /// Returns low-level APIs.
554    pub fn table_route_storage(&self) -> &TableRouteStorage {
555        &self.storage
556    }
557}
558
/// Low-level operations of [TableRouteValue].
///
/// Reads table routes from, and builds transactions against, the wrapped kv backend.
pub struct TableRouteStorage {
    // Backend holding both the table routes and the node addresses used for remapping.
    kv_backend: KvBackendRef,
}
563
/// Result type of the decoders returned alongside the table route transactions
/// built by `build_create_txn` and `build_update_txn`.
pub type TableRouteValueDecodeResult = Result<Option<DeserializedValueWithBytes<TableRouteValue>>>;
565
566impl TableRouteStorage {
567    pub fn new(kv_backend: KvBackendRef) -> Self {
568        Self { kv_backend }
569    }
570
571    /// Builds a create table route transaction,
572    /// it expected the `__table_route/{table_id}` wasn't occupied.
573    pub fn build_create_txn(
574        &self,
575        table_id: TableId,
576        table_route_value: &TableRouteValue,
577    ) -> Result<(
578        Txn,
579        impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult + use<>,
580    )> {
581        let key = TableRouteKey::new(table_id);
582        let raw_key = key.to_bytes();
583
584        let txn = Txn::put_if_not_exists(raw_key.clone(), table_route_value.try_as_raw_value()?);
585
586        Ok((
587            txn,
588            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
589        ))
590    }
591
592    // TODO(LFC): restore its original visibility after some test utility codes are refined
593    /// Builds a update table route transaction,
594    /// it expected the remote value equals the `current_table_route_value`.
595    /// It retrieves the latest value if the comparing failed.
596    pub fn build_update_txn(
597        &self,
598        table_id: TableId,
599        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
600        new_table_route_value: &TableRouteValue,
601    ) -> Result<(
602        Txn,
603        impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult + use<>,
604    )> {
605        let key = TableRouteKey::new(table_id);
606        let raw_key = key.to_bytes();
607        let raw_value = current_table_route_value.get_raw_bytes();
608        let new_raw_value: Vec<u8> = new_table_route_value.try_as_raw_value()?;
609
610        let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
611
612        Ok((
613            txn,
614            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
615        ))
616    }
617
618    /// Returns the [`TableRouteValue`].
619    pub async fn get(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
620        let mut table_route = self.get_inner(table_id).await?;
621        if let Some(table_route) = &mut table_route {
622            self.remap_route_address(table_route).await?;
623        };
624
625        Ok(table_route)
626    }
627
628    async fn get_inner(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
629        let key = TableRouteKey::new(table_id);
630        self.kv_backend
631            .get(&key.to_bytes())
632            .await?
633            .map(|kv| TableRouteValue::try_from_raw_value(&kv.value))
634            .transpose()
635    }
636
637    /// Returns the [`TableRouteValue`] wrapped with [`DeserializedValueWithBytes`].
638    pub async fn get_with_raw_bytes(
639        &self,
640        table_id: TableId,
641    ) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
642        let mut table_route = self.get_with_raw_bytes_inner(table_id).await?;
643        if let Some(table_route) = &mut table_route {
644            self.remap_route_address(table_route).await?;
645        };
646
647        Ok(table_route)
648    }
649
650    async fn get_with_raw_bytes_inner(
651        &self,
652        table_id: TableId,
653    ) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
654        let key = TableRouteKey::new(table_id);
655        self.kv_backend
656            .get(&key.to_bytes())
657            .await?
658            .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
659            .transpose()
660    }
661
662    /// Returns batch of [`TableRouteValue`] that respects the order of `table_ids`.
663    pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
664        let raw_table_routes = self.batch_get_inner(table_ids).await?;
665
666        Ok(raw_table_routes
667            .into_iter()
668            .map(|v| v.map(|x| x.inner))
669            .collect())
670    }
671
672    /// Returns batch of [`TableRouteValue`] wrapped with [`DeserializedValueWithBytes`].
673    ///
674    /// The return value is a vector of [`Option<DeserializedValueWithBytes<TableRouteValue>>`].
675    /// Note: This method remaps the addresses of the table routes, but does not update their raw byte representations.
676    pub async fn batch_get_with_raw_bytes(
677        &self,
678        table_ids: &[TableId],
679    ) -> Result<Vec<Option<DeserializedValueWithBytes<TableRouteValue>>>> {
680        let mut raw_table_routes = self.batch_get_inner(table_ids).await?;
681        self.remap_routes_addresses(&mut raw_table_routes).await?;
682
683        Ok(raw_table_routes)
684    }
685
686    async fn batch_get_inner(
687        &self,
688        table_ids: &[TableId],
689    ) -> Result<Vec<Option<DeserializedValueWithBytes<TableRouteValue>>>> {
690        let keys = table_ids
691            .iter()
692            .map(|id| TableRouteKey::new(*id).to_bytes())
693            .collect::<Vec<_>>();
694        let resp = self
695            .kv_backend
696            .batch_get(BatchGetRequest { keys: keys.clone() })
697            .await?;
698
699        let kvs = resp
700            .kvs
701            .into_iter()
702            .map(|kv| (kv.key, kv.value))
703            .collect::<HashMap<_, _>>();
704        keys.into_iter()
705            .map(|key| {
706                if let Some(value) = kvs.get(&key) {
707                    Ok(Some(DeserializedValueWithBytes::from_inner_slice(value)?))
708                } else {
709                    Ok(None)
710                }
711            })
712            .collect()
713    }
714
715    async fn remap_routes_addresses(
716        &self,
717        table_routes: &mut [Option<DeserializedValueWithBytes<TableRouteValue>>],
718    ) -> Result<()> {
719        let keys = table_routes
720            .iter()
721            .flat_map(|table_route| {
722                table_route
723                    .as_ref()
724                    .map(|x| extract_address_keys(&x.inner))
725                    .unwrap_or_default()
726            })
727            .collect::<HashSet<_>>()
728            .into_iter()
729            .collect();
730        let node_addrs = self.get_node_addresses(keys).await?;
731        for table_route in table_routes.iter_mut().flatten() {
732            set_addresses(&node_addrs, table_route)?;
733        }
734
735        Ok(())
736    }
737
738    async fn remap_route_address(&self, table_route: &mut TableRouteValue) -> Result<()> {
739        let keys = extract_address_keys(table_route).into_iter().collect();
740        let node_addrs = self.get_node_addresses(keys).await?;
741        set_addresses(&node_addrs, table_route)?;
742
743        Ok(())
744    }
745
746    async fn get_node_addresses(
747        &self,
748        keys: Vec<Vec<u8>>,
749    ) -> Result<HashMap<u64, NodeAddressValue>> {
750        if keys.is_empty() {
751            return Ok(HashMap::default());
752        }
753
754        self.kv_backend
755            .batch_get(BatchGetRequest { keys })
756            .await?
757            .kvs
758            .into_iter()
759            .map(|kv| {
760                let node_id = NodeAddressKey::from_bytes(&kv.key)?.node_id;
761                let node_addr = NodeAddressValue::try_from_raw_value(&kv.value)?;
762                Ok((node_id, node_addr))
763            })
764            .collect()
765    }
766}
767
768fn set_addresses(
769    node_addrs: &HashMap<u64, NodeAddressValue>,
770    table_route: &mut TableRouteValue,
771) -> Result<()> {
772    let TableRouteValue::Physical(physical_table_route) = table_route else {
773        return Ok(());
774    };
775
776    for region_route in &mut physical_table_route.region_routes {
777        if let Some(leader) = &mut region_route.leader_peer
778            && let Some(node_addr) = node_addrs.get(&leader.id)
779        {
780            leader.addr = node_addr.peer.addr.clone();
781        }
782        for follower in &mut region_route.follower_peers {
783            if let Some(node_addr) = node_addrs.get(&follower.id) {
784                follower.addr = node_addr.peer.addr.clone();
785            }
786        }
787    }
788
789    Ok(())
790}
791
792fn extract_address_keys(table_route: &TableRouteValue) -> HashSet<Vec<u8>> {
793    let TableRouteValue::Physical(physical_table_route) = table_route else {
794        return HashSet::default();
795    };
796
797    physical_table_route
798        .region_routes
799        .iter()
800        .flat_map(|region_route| {
801            region_route
802                .follower_peers
803                .iter()
804                .map(|peer| NodeAddressKey::with_datanode(peer.id).to_bytes())
805                .chain(
806                    region_route
807                        .leader_peer
808                        .as_ref()
809                        .map(|leader| NodeAddressKey::with_datanode(leader.id).to_bytes()),
810                )
811        })
812        .collect()
813}
814
815#[cfg(test)]
816mod tests {
817    use std::sync::Arc;
818
819    use super::*;
820    use crate::kv_backend::memory::MemoryKvBackend;
821    use crate::kv_backend::{KvBackend, TxnService};
822    use crate::peer::Peer;
823    use crate::rpc::router::Region;
824    use crate::rpc::store::PutRequest;
825
/// Decodes a route value written in the older flat JSON layout and checks that
/// it is read back as a physical table route with both region routes intact.
#[test]
fn test_table_route_compatibility() {
    // Both entries of the payload below are identical, so build them from one recipe.
    let legacy_route = || RegionRoute {
        region: Region {
            id: RegionId::new(0, 1),
            name: "r1".to_string(),
            partition: None,
            attrs: Default::default(),
            partition_expr: Default::default(),
        },
        leader_peer: Some(Peer {
            id: 2,
            addr: "a2".to_string(),
        }),
        follower_peers: vec![],
        leader_state: None,
        leader_down_since: None,
    };

    let legacy_json = r#"{"region_routes":[{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]},{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]}],"version":0}"#;
    let decoded = TableRouteValue::try_from_raw_value(legacy_json.as_bytes()).unwrap();

    let expected = TableRouteValue::Physical(PhysicalTableRouteValue {
        region_routes: vec![legacy_route(), legacy_route()],
        version: 0,
    });

    assert_eq!(decoded, expected);
}
871
/// A table route key serializes to `__table_route/{table_id}`.
#[test]
fn test_key_serialization() {
    let encoded = TableRouteKey::new(42).to_bytes();
    assert_eq!(encoded, b"__table_route/42");
}
878
/// Parsing `__table_route/42` yields the key for table id 42.
#[test]
fn test_key_deserialization() {
    let parsed = TableRouteKey::from_bytes(b"__table_route/42").unwrap();
    assert_eq!(parsed, TableRouteKey::new(42));
}
885
886    #[tokio::test]
887    async fn test_table_route_storage_get_with_raw_bytes_empty() {
888        let kv = Arc::new(MemoryKvBackend::default());
889        let table_route_storage = TableRouteStorage::new(kv);
890        let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
891        assert!(table_route.is_none());
892    }
893
894    #[tokio::test]
895    async fn test_table_route_storage_get_with_raw_bytes() {
896        let kv = Arc::new(MemoryKvBackend::default());
897        let table_route_storage = TableRouteStorage::new(kv.clone());
898        let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
899        assert!(table_route.is_none());
900        let table_route_manager = TableRouteManager::new(kv.clone());
901        let table_route_value = TableRouteValue::Logical(LogicalTableRouteValue {
902            physical_table_id: 1023,
903            region_ids: vec![RegionId::new(1023, 1)],
904        });
905        let (txn, _) = table_route_manager
906            .table_route_storage()
907            .build_create_txn(1024, &table_route_value)
908            .unwrap();
909        let r = kv.txn(txn).await.unwrap();
910        assert!(r.succeeded);
911        let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
912        assert!(table_route.is_some());
913        let got = table_route.unwrap().inner;
914        assert_eq!(got, table_route_value);
915    }
916
917    #[tokio::test]
918    async fn test_table_route_batch_get() {
919        let kv = Arc::new(MemoryKvBackend::default());
920        let table_route_storage = TableRouteStorage::new(kv.clone());
921        let routes = table_route_storage
922            .batch_get(&[1023, 1024, 1025])
923            .await
924            .unwrap();
925
926        assert!(routes.iter().all(Option::is_none));
927        let table_route_manager = TableRouteManager::new(kv.clone());
928        let routes = [
929            (
930                1024,
931                TableRouteValue::Logical(LogicalTableRouteValue {
932                    physical_table_id: 1023,
933                    region_ids: vec![RegionId::new(1023, 1)],
934                }),
935            ),
936            (
937                1025,
938                TableRouteValue::Logical(LogicalTableRouteValue {
939                    physical_table_id: 1023,
940                    region_ids: vec![RegionId::new(1023, 2)],
941                }),
942            ),
943        ];
944        for (table_id, route) in &routes {
945            let (txn, _) = table_route_manager
946                .table_route_storage()
947                .build_create_txn(*table_id, route)
948                .unwrap();
949            let r = kv.txn(txn).await.unwrap();
950            assert!(r.succeeded);
951        }
952
953        let results = table_route_storage
954            .batch_get(&[9999, 1025, 8888, 1024])
955            .await
956            .unwrap();
957        assert!(results[0].is_none());
958        assert_eq!(results[1].as_ref().unwrap(), &routes[1].1);
959        assert!(results[2].is_none());
960        assert_eq!(results[3].as_ref().unwrap(), &routes[0].1);
961    }
962
963    #[tokio::test]
964    async fn remap_route_address_updates_addresses() {
965        let kv = Arc::new(MemoryKvBackend::default());
966        let table_route_storage = TableRouteStorage::new(kv.clone());
967        let mut table_route = TableRouteValue::Physical(PhysicalTableRouteValue {
968            region_routes: vec![RegionRoute {
969                leader_peer: Some(Peer {
970                    id: 1,
971                    ..Default::default()
972                }),
973                follower_peers: vec![Peer {
974                    id: 2,
975                    ..Default::default()
976                }],
977                ..Default::default()
978            }],
979            version: 0,
980        });
981
982        kv.put(PutRequest {
983            key: NodeAddressKey::with_datanode(1).to_bytes(),
984            value: NodeAddressValue {
985                peer: Peer {
986                    addr: "addr1".to_string(),
987                    ..Default::default()
988                },
989            }
990            .try_as_raw_value()
991            .unwrap(),
992            ..Default::default()
993        })
994        .await
995        .unwrap();
996
997        table_route_storage
998            .remap_route_address(&mut table_route)
999            .await
1000            .unwrap();
1001
1002        if let TableRouteValue::Physical(physical_table_route) = table_route {
1003            assert_eq!(
1004                physical_table_route.region_routes[0]
1005                    .leader_peer
1006                    .as_ref()
1007                    .unwrap()
1008                    .addr,
1009                "addr1"
1010            );
1011            assert_eq!(
1012                physical_table_route.region_routes[0].follower_peers[0].addr,
1013                ""
1014            );
1015        } else {
1016            panic!("Expected PhysicalTableRouteValue");
1017        }
1018    }
1019}