common_meta/key/
table_route.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::sync::Arc;
18
19use serde::{Deserialize, Serialize};
20use snafu::{ensure, OptionExt, ResultExt};
21use store_api::storage::{RegionId, RegionNumber};
22use table::metadata::TableId;
23
24use crate::error::{
25    InvalidMetadataSnafu, MetadataCorruptionSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu,
26    TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu,
27};
28use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
29use crate::key::txn_helper::TxnOpGetResponseSet;
30use crate::key::{
31    DeserializedValueWithBytes, MetadataKey, MetadataValue, RegionDistribution,
32    TABLE_ROUTE_KEY_PATTERN, TABLE_ROUTE_PREFIX,
33};
34use crate::kv_backend::txn::Txn;
35use crate::kv_backend::KvBackendRef;
36use crate::rpc::router::{region_distribution, RegionRoute};
37use crate::rpc::store::BatchGetRequest;
38
/// The metadata key under which the route of a table is stored.
///
/// The layout: `__table_route/{table_id}`.
#[derive(Debug, PartialEq)]
pub struct TableRouteKey {
    /// Id of the table whose route this key addresses.
    pub table_id: TableId,
}
46
47impl TableRouteKey {
48    pub fn new(table_id: TableId) -> Self {
49        Self { table_id }
50    }
51
52    /// Returns the range prefix of the table route key.
53    pub fn range_prefix() -> Vec<u8> {
54        format!("{}/", TABLE_ROUTE_PREFIX).into_bytes()
55    }
56}
57
/// The route of a table, which is either physical or logical.
///
/// Serde tags the serialized form with a `type` field holding the snake_case variant name.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TableRouteValue {
    /// Route of a physical table; carries the region routes.
    Physical(PhysicalTableRouteValue),
    /// Route of a logical table; refers to its physical table.
    Logical(LogicalTableRouteValue),
}
64
/// The route of a physical table.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)]
pub struct PhysicalTableRouteValue {
    /// Routes of the regions of the table.
    pub region_routes: Vec<RegionRoute>,
    /// Starts at 0 and is incremented by [`TableRouteValue::update`].
    version: u64,
}
70
/// The route of a logical table.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct LogicalTableRouteValue {
    /// Id of the physical table this logical table refers to.
    physical_table_id: TableId,
    /// Ids of the regions of the logical table.
    region_ids: Vec<RegionId>,
}
76
77impl TableRouteValue {
78    /// Returns a [TableRouteValue::Physical] if `table_id` equals `physical_table_id`.
79    /// Otherwise returns a [TableRouteValue::Logical].
80    pub(crate) fn new(
81        table_id: TableId,
82        physical_table_id: TableId,
83        region_routes: Vec<RegionRoute>,
84    ) -> Self {
85        if table_id == physical_table_id {
86            TableRouteValue::physical(region_routes)
87        } else {
88            let region_routes = region_routes
89                .into_iter()
90                .map(|region| {
91                    debug_assert_eq!(region.region.id.table_id(), physical_table_id);
92                    RegionId::new(table_id, region.region.id.region_number())
93                })
94                .collect();
95            TableRouteValue::logical(physical_table_id, region_routes)
96        }
97    }
98
99    pub fn physical(region_routes: Vec<RegionRoute>) -> Self {
100        Self::Physical(PhysicalTableRouteValue::new(region_routes))
101    }
102
103    pub fn logical(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
104        Self::Logical(LogicalTableRouteValue::new(physical_table_id, region_ids))
105    }
106
107    /// Returns a new version [TableRouteValue] with `region_routes`.
108    pub fn update(&self, region_routes: Vec<RegionRoute>) -> Result<Self> {
109        ensure!(
110            self.is_physical(),
111            UnexpectedLogicalRouteTableSnafu {
112                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
113            }
114        );
115        let version = self.as_physical_table_route_ref().version;
116        Ok(Self::Physical(PhysicalTableRouteValue {
117            region_routes,
118            version: version + 1,
119        }))
120    }
121
122    /// Returns the version.
123    ///
124    /// For test purpose.
125    #[cfg(any(test, feature = "testing"))]
126    pub fn version(&self) -> Result<u64> {
127        ensure!(
128            self.is_physical(),
129            UnexpectedLogicalRouteTableSnafu {
130                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
131            }
132        );
133        Ok(self.as_physical_table_route_ref().version)
134    }
135
136    /// Returns the corresponding [RegionRoute], returns `None` if it's the specific region is not found.
137    ///
138    /// Note: It throws an error if it's a logical table
139    pub fn region_route(&self, region_id: RegionId) -> Result<Option<RegionRoute>> {
140        ensure!(
141            self.is_physical(),
142            UnexpectedLogicalRouteTableSnafu {
143                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
144            }
145        );
146        Ok(self
147            .as_physical_table_route_ref()
148            .region_routes
149            .iter()
150            .find(|route| route.region.id == region_id)
151            .cloned())
152    }
153
154    /// Returns true if it's [TableRouteValue::Physical].
155    pub fn is_physical(&self) -> bool {
156        matches!(self, TableRouteValue::Physical(_))
157    }
158
159    /// Gets the [RegionRoute]s of this [TableRouteValue::Physical].
160    pub fn region_routes(&self) -> Result<&Vec<RegionRoute>> {
161        ensure!(
162            self.is_physical(),
163            UnexpectedLogicalRouteTableSnafu {
164                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
165            }
166        );
167        Ok(&self.as_physical_table_route_ref().region_routes)
168    }
169
170    /// Returns the reference of [`PhysicalTableRouteValue`].
171    ///
172    /// # Panic
173    /// If it is not the [`PhysicalTableRouteValue`].
174    fn as_physical_table_route_ref(&self) -> &PhysicalTableRouteValue {
175        match self {
176            TableRouteValue::Physical(x) => x,
177            _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
178        }
179    }
180
181    /// Converts to [`PhysicalTableRouteValue`].
182    ///
183    /// # Panic
184    /// If it is not the [`PhysicalTableRouteValue`].
185    pub fn into_physical_table_route(self) -> PhysicalTableRouteValue {
186        match self {
187            TableRouteValue::Physical(x) => x,
188            _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
189        }
190    }
191
192    /// Converts to [`LogicalTableRouteValue`].
193    ///
194    /// # Panic
195    /// If it is not the [`LogicalTableRouteValue`].
196    pub fn into_logical_table_route(self) -> LogicalTableRouteValue {
197        match self {
198            TableRouteValue::Logical(x) => x,
199            _ => unreachable!("Mistakenly been treated as a Logical TableRoute: {self:?}"),
200        }
201    }
202
203    pub fn region_numbers(&self) -> Vec<RegionNumber> {
204        match self {
205            TableRouteValue::Physical(x) => x
206                .region_routes
207                .iter()
208                .map(|region_route| region_route.region.id.region_number())
209                .collect(),
210            TableRouteValue::Logical(x) => x
211                .region_ids()
212                .iter()
213                .map(|region_id| region_id.region_number())
214                .collect(),
215        }
216    }
217}
218
219impl MetadataValue for TableRouteValue {
220    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
221        let r = serde_json::from_slice::<TableRouteValue>(raw_value);
222        match r {
223            // Compatible with old TableRouteValue.
224            Err(e) if e.is_data() => Ok(Self::Physical(
225                serde_json::from_slice::<PhysicalTableRouteValue>(raw_value)
226                    .context(SerdeJsonSnafu)?,
227            )),
228            Ok(x) => Ok(x),
229            Err(e) => Err(e).context(SerdeJsonSnafu),
230        }
231    }
232
233    fn try_as_raw_value(&self) -> Result<Vec<u8>> {
234        serde_json::to_vec(self).context(SerdeJsonSnafu)
235    }
236}
237
238impl PhysicalTableRouteValue {
239    pub fn new(region_routes: Vec<RegionRoute>) -> Self {
240        Self {
241            region_routes,
242            version: 0,
243        }
244    }
245}
246
247impl LogicalTableRouteValue {
248    pub fn new(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
249        Self {
250            physical_table_id,
251            region_ids,
252        }
253    }
254
255    pub fn physical_table_id(&self) -> TableId {
256        self.physical_table_id
257    }
258
259    pub fn region_ids(&self) -> &Vec<RegionId> {
260        &self.region_ids
261    }
262}
263
264impl MetadataKey<'_, TableRouteKey> for TableRouteKey {
265    fn to_bytes(&self) -> Vec<u8> {
266        self.to_string().into_bytes()
267    }
268
269    fn from_bytes(bytes: &[u8]) -> Result<TableRouteKey> {
270        let key = std::str::from_utf8(bytes).map_err(|e| {
271            InvalidMetadataSnafu {
272                err_msg: format!(
273                    "TableRouteKey '{}' is not a valid UTF8 string: {e}",
274                    String::from_utf8_lossy(bytes)
275                ),
276            }
277            .build()
278        })?;
279        let captures = TABLE_ROUTE_KEY_PATTERN
280            .captures(key)
281            .context(InvalidMetadataSnafu {
282                err_msg: format!("Invalid TableRouteKey '{key}'"),
283            })?;
284        // Safety: pass the regex check above
285        let table_id = captures[1].parse::<TableId>().unwrap();
286        Ok(TableRouteKey { table_id })
287    }
288}
289
290impl Display for TableRouteKey {
291    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
292        write!(f, "{}/{}", TABLE_ROUTE_PREFIX, self.table_id)
293    }
294}
295
/// An [`Arc`]-wrapped [`TableRouteManager`].
pub type TableRouteManagerRef = Arc<TableRouteManager>;
297
/// Resolves table routes on top of a [`TableRouteStorage`].
pub struct TableRouteManager {
    /// Low-level access to the stored table route values.
    storage: TableRouteStorage,
}
301
302impl TableRouteManager {
303    pub fn new(kv_backend: KvBackendRef) -> Self {
304        Self {
305            storage: TableRouteStorage::new(kv_backend),
306        }
307    }
308
309    /// Returns the [TableId] recursively.
310    ///
311    /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
312    /// - the table(`logical_or_physical_table_id`) does not exist.
313    pub async fn get_physical_table_id(
314        &self,
315        logical_or_physical_table_id: TableId,
316    ) -> Result<TableId> {
317        let table_route = self
318            .storage
319            .get_inner(logical_or_physical_table_id)
320            .await?
321            .context(TableRouteNotFoundSnafu {
322                table_id: logical_or_physical_table_id,
323            })?;
324
325        match table_route {
326            TableRouteValue::Physical(_) => Ok(logical_or_physical_table_id),
327            TableRouteValue::Logical(x) => Ok(x.physical_table_id()),
328        }
329    }
330
331    /// Returns the [TableRouteValue::Physical] recursively.
332    ///
333    /// Returns a [TableRouteNotFound](error::Error::TableRouteNotFound) Error if:
334    /// - the physical table(`logical_or_physical_table_id`) does not exist
335    /// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exist.
336    pub async fn get_physical_table_route(
337        &self,
338        logical_or_physical_table_id: TableId,
339    ) -> Result<(TableId, PhysicalTableRouteValue)> {
340        let table_route = self
341            .storage
342            .get(logical_or_physical_table_id)
343            .await?
344            .context(TableRouteNotFoundSnafu {
345                table_id: logical_or_physical_table_id,
346            })?;
347
348        match table_route {
349            TableRouteValue::Physical(x) => Ok((logical_or_physical_table_id, x)),
350            TableRouteValue::Logical(x) => {
351                let physical_table_id = x.physical_table_id();
352                let physical_table_route = self.storage.get(physical_table_id).await?.context(
353                    TableRouteNotFoundSnafu {
354                        table_id: physical_table_id,
355                    },
356                )?;
357                let physical_table_route = physical_table_route.into_physical_table_route();
358                Ok((physical_table_id, physical_table_route))
359            }
360        }
361    }
362
363    /// Returns the [TableRouteValue::Physical] recursively.
364    ///
365    /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
366    /// - one of the logical tables corresponding to the physical table does not exist.
367    ///
368    /// **Notes**: it may return a subset of `logical_or_physical_table_ids`.
369    pub async fn batch_get_physical_table_routes(
370        &self,
371        logical_or_physical_table_ids: &[TableId],
372    ) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
373        let table_routes = self
374            .storage
375            .batch_get(logical_or_physical_table_ids)
376            .await?;
377        // Returns a subset of `logical_or_physical_table_ids`.
378        let table_routes = table_routes
379            .into_iter()
380            .zip(logical_or_physical_table_ids)
381            .filter_map(|(route, id)| route.map(|route| (*id, route)))
382            .collect::<HashMap<_, _>>();
383
384        let mut physical_table_routes = HashMap::with_capacity(table_routes.len());
385        let mut logical_table_ids = HashMap::with_capacity(table_routes.len());
386
387        for (table_id, table_route) in table_routes {
388            match table_route {
389                TableRouteValue::Physical(x) => {
390                    physical_table_routes.insert(table_id, x);
391                }
392                TableRouteValue::Logical(x) => {
393                    logical_table_ids.insert(table_id, x.physical_table_id());
394                }
395            }
396        }
397
398        if logical_table_ids.is_empty() {
399            return Ok(physical_table_routes);
400        }
401
402        // Finds the logical tables corresponding to the physical tables.
403        let physical_table_ids = logical_table_ids
404            .values()
405            .cloned()
406            .collect::<HashSet<_>>()
407            .into_iter()
408            .collect::<Vec<_>>();
409        let table_routes = self
410            .table_route_storage()
411            .batch_get(&physical_table_ids)
412            .await?;
413        let table_routes = table_routes
414            .into_iter()
415            .zip(physical_table_ids)
416            .filter_map(|(route, id)| route.map(|route| (id, route)))
417            .collect::<HashMap<_, _>>();
418
419        for (logical_table_id, physical_table_id) in logical_table_ids {
420            let table_route =
421                table_routes
422                    .get(&physical_table_id)
423                    .context(TableRouteNotFoundSnafu {
424                        table_id: physical_table_id,
425                    })?;
426            match table_route {
427                TableRouteValue::Physical(x) => {
428                    physical_table_routes.insert(logical_table_id, x.clone());
429                }
430                TableRouteValue::Logical(x) => {
431                    // Never get here, because we use a physical table id cannot obtain a logical table.
432                    MetadataCorruptionSnafu {
433                        err_msg: format!(
434                            "logical table {} {:?} cannot be resolved to a physical table.",
435                            logical_table_id, x
436                        ),
437                    }
438                    .fail()?;
439                }
440            }
441        }
442
443        Ok(physical_table_routes)
444    }
445
446    /// Returns [`RegionDistribution`] of the table(`table_id`).
447    pub async fn get_region_distribution(
448        &self,
449        table_id: TableId,
450    ) -> Result<Option<RegionDistribution>> {
451        self.storage
452            .get(table_id)
453            .await?
454            .map(|table_route| Ok(region_distribution(table_route.region_routes()?)))
455            .transpose()
456    }
457
458    /// Sets the staging state for a specific region.
459    ///
460    /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
461    /// - the table does not exist
462    /// - the region is not found in the table
463    pub async fn set_region_staging_state(
464        &self,
465        region_id: store_api::storage::RegionId,
466        staging: bool,
467    ) -> Result<()> {
468        let table_id = region_id.table_id();
469
470        // Get current table route with raw bytes for CAS operation
471        let current_table_route = self
472            .storage
473            .get_with_raw_bytes(table_id)
474            .await?
475            .context(TableRouteNotFoundSnafu { table_id })?;
476
477        // Clone the current route value and update the specific region
478        let new_table_route = current_table_route.inner.clone();
479
480        // Only physical tables have region routes
481        ensure!(
482            new_table_route.is_physical(),
483            UnexpectedLogicalRouteTableSnafu {
484                err_msg: format!("Cannot set staging state for logical table {table_id}"),
485            }
486        );
487
488        let region_routes = new_table_route.region_routes()?.clone();
489        let mut updated_routes = region_routes.clone();
490
491        // Find and update the specific region
492        // TODO(ruihang): maybe update them in one transaction
493        let mut region_found = false;
494        for route in &mut updated_routes {
495            if route.region.id == region_id {
496                if staging {
497                    route.set_leader_staging();
498                } else {
499                    route.clear_leader_staging();
500                }
501                region_found = true;
502                break;
503            }
504        }
505
506        ensure!(region_found, RegionNotFoundSnafu { region_id });
507
508        // Create new table route with updated region routes
509        let updated_table_route = new_table_route.update(updated_routes)?;
510
511        // Execute atomic update
512        let (txn, _) =
513            self.storage
514                .build_update_txn(table_id, &current_table_route, &updated_table_route)?;
515
516        let result = self.storage.kv_backend.txn(txn).await?;
517
518        ensure!(
519            result.succeeded,
520            MetadataCorruptionSnafu {
521                err_msg: format!(
522                    "Failed to update staging state for region {}: CAS operation failed",
523                    region_id
524                ),
525            }
526        );
527
528        Ok(())
529    }
530
531    /// Checks if a specific region is in staging state.
532    ///
533    /// Returns false if the table/region doesn't exist.
534    pub async fn is_region_staging(&self, region_id: store_api::storage::RegionId) -> Result<bool> {
535        let table_id = region_id.table_id();
536
537        let table_route = self.storage.get(table_id).await?;
538
539        match table_route {
540            Some(route) if route.is_physical() => {
541                let region_routes = route.region_routes()?;
542                for route in region_routes {
543                    if route.region.id == region_id {
544                        return Ok(route.is_leader_staging());
545                    }
546                }
547                Ok(false)
548            }
549            _ => Ok(false),
550        }
551    }
552
553    /// Returns low-level APIs.
554    pub fn table_route_storage(&self) -> &TableRouteStorage {
555        &self.storage
556    }
557}
558
/// Low-level operations of [TableRouteValue].
pub struct TableRouteStorage {
    /// Backend from which the table route values and the node addresses are read.
    kv_backend: KvBackendRef,
}
563
/// Return type of the decoders produced by [`TableRouteStorage::build_create_txn`] and
/// [`TableRouteStorage::build_update_txn`]: an optional [`TableRouteValue`] kept together
/// with its raw bytes, or an error.
pub type TableRouteValueDecodeResult = Result<Option<DeserializedValueWithBytes<TableRouteValue>>>;
565
566impl TableRouteStorage {
567    pub fn new(kv_backend: KvBackendRef) -> Self {
568        Self { kv_backend }
569    }
570
571    /// Builds a create table route transaction,
572    /// it expected the `__table_route/{table_id}` wasn't occupied.
573    pub fn build_create_txn(
574        &self,
575        table_id: TableId,
576        table_route_value: &TableRouteValue,
577    ) -> Result<(
578        Txn,
579        impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult,
580    )> {
581        let key = TableRouteKey::new(table_id);
582        let raw_key = key.to_bytes();
583
584        let txn = Txn::put_if_not_exists(raw_key.clone(), table_route_value.try_as_raw_value()?);
585
586        Ok((
587            txn,
588            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
589        ))
590    }
591
592    // TODO(LFC): restore its original visibility after some test utility codes are refined
593    /// Builds a update table route transaction,
594    /// it expected the remote value equals the `current_table_route_value`.
595    /// It retrieves the latest value if the comparing failed.
596    pub fn build_update_txn(
597        &self,
598        table_id: TableId,
599        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
600        new_table_route_value: &TableRouteValue,
601    ) -> Result<(
602        Txn,
603        impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult,
604    )> {
605        let key = TableRouteKey::new(table_id);
606        let raw_key = key.to_bytes();
607        let raw_value = current_table_route_value.get_raw_bytes();
608        let new_raw_value: Vec<u8> = new_table_route_value.try_as_raw_value()?;
609
610        let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
611
612        Ok((
613            txn,
614            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
615        ))
616    }
617
618    /// Returns the [`TableRouteValue`].
619    pub async fn get(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
620        let mut table_route = self.get_inner(table_id).await?;
621        if let Some(table_route) = &mut table_route {
622            self.remap_route_address(table_route).await?;
623        };
624
625        Ok(table_route)
626    }
627
628    async fn get_inner(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
629        let key = TableRouteKey::new(table_id);
630        self.kv_backend
631            .get(&key.to_bytes())
632            .await?
633            .map(|kv| TableRouteValue::try_from_raw_value(&kv.value))
634            .transpose()
635    }
636
637    /// Returns the [`TableRouteValue`] wrapped with [`DeserializedValueWithBytes`].
638    pub async fn get_with_raw_bytes(
639        &self,
640        table_id: TableId,
641    ) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
642        let mut table_route = self.get_with_raw_bytes_inner(table_id).await?;
643        if let Some(table_route) = &mut table_route {
644            self.remap_route_address(table_route).await?;
645        };
646
647        Ok(table_route)
648    }
649
650    async fn get_with_raw_bytes_inner(
651        &self,
652        table_id: TableId,
653    ) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
654        let key = TableRouteKey::new(table_id);
655        self.kv_backend
656            .get(&key.to_bytes())
657            .await?
658            .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
659            .transpose()
660    }
661
662    /// Returns batch of [`TableRouteValue`] that respects the order of `table_ids`.
663    pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
664        let mut table_routes = self.batch_get_inner(table_ids).await?;
665        self.remap_routes_addresses(&mut table_routes).await?;
666
667        Ok(table_routes)
668    }
669
670    async fn batch_get_inner(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
671        let keys = table_ids
672            .iter()
673            .map(|id| TableRouteKey::new(*id).to_bytes())
674            .collect::<Vec<_>>();
675        let resp = self
676            .kv_backend
677            .batch_get(BatchGetRequest { keys: keys.clone() })
678            .await?;
679
680        let kvs = resp
681            .kvs
682            .into_iter()
683            .map(|kv| (kv.key, kv.value))
684            .collect::<HashMap<_, _>>();
685        keys.into_iter()
686            .map(|key| {
687                if let Some(value) = kvs.get(&key) {
688                    Ok(Some(TableRouteValue::try_from_raw_value(value)?))
689                } else {
690                    Ok(None)
691                }
692            })
693            .collect()
694    }
695
696    async fn remap_routes_addresses(
697        &self,
698        table_routes: &mut [Option<TableRouteValue>],
699    ) -> Result<()> {
700        let keys = table_routes
701            .iter()
702            .flat_map(|table_route| {
703                table_route
704                    .as_ref()
705                    .map(extract_address_keys)
706                    .unwrap_or_default()
707            })
708            .collect::<HashSet<_>>()
709            .into_iter()
710            .collect();
711        let node_addrs = self.get_node_addresses(keys).await?;
712        for table_route in table_routes.iter_mut().flatten() {
713            set_addresses(&node_addrs, table_route)?;
714        }
715
716        Ok(())
717    }
718
719    async fn remap_route_address(&self, table_route: &mut TableRouteValue) -> Result<()> {
720        let keys = extract_address_keys(table_route).into_iter().collect();
721        let node_addrs = self.get_node_addresses(keys).await?;
722        set_addresses(&node_addrs, table_route)?;
723
724        Ok(())
725    }
726
727    async fn get_node_addresses(
728        &self,
729        keys: Vec<Vec<u8>>,
730    ) -> Result<HashMap<u64, NodeAddressValue>> {
731        if keys.is_empty() {
732            return Ok(HashMap::default());
733        }
734
735        self.kv_backend
736            .batch_get(BatchGetRequest { keys })
737            .await?
738            .kvs
739            .into_iter()
740            .map(|kv| {
741                let node_id = NodeAddressKey::from_bytes(&kv.key)?.node_id;
742                let node_addr = NodeAddressValue::try_from_raw_value(&kv.value)?;
743                Ok((node_id, node_addr))
744            })
745            .collect()
746    }
747}
748
749fn set_addresses(
750    node_addrs: &HashMap<u64, NodeAddressValue>,
751    table_route: &mut TableRouteValue,
752) -> Result<()> {
753    let TableRouteValue::Physical(physical_table_route) = table_route else {
754        return Ok(());
755    };
756
757    for region_route in &mut physical_table_route.region_routes {
758        if let Some(leader) = &mut region_route.leader_peer {
759            if let Some(node_addr) = node_addrs.get(&leader.id) {
760                leader.addr = node_addr.peer.addr.clone();
761            }
762        }
763        for follower in &mut region_route.follower_peers {
764            if let Some(node_addr) = node_addrs.get(&follower.id) {
765                follower.addr = node_addr.peer.addr.clone();
766            }
767        }
768    }
769
770    Ok(())
771}
772
773fn extract_address_keys(table_route: &TableRouteValue) -> HashSet<Vec<u8>> {
774    let TableRouteValue::Physical(physical_table_route) = table_route else {
775        return HashSet::default();
776    };
777
778    physical_table_route
779        .region_routes
780        .iter()
781        .flat_map(|region_route| {
782            region_route
783                .follower_peers
784                .iter()
785                .map(|peer| NodeAddressKey::with_datanode(peer.id).to_bytes())
786                .chain(
787                    region_route
788                        .leader_peer
789                        .as_ref()
790                        .map(|leader| NodeAddressKey::with_datanode(leader.id).to_bytes()),
791                )
792        })
793        .collect()
794}
795
796#[cfg(test)]
797mod tests {
798    use std::sync::Arc;
799
800    use super::*;
801    use crate::kv_backend::memory::MemoryKvBackend;
802    use crate::kv_backend::{KvBackend, TxnService};
803    use crate::peer::Peer;
804    use crate::rpc::router::Region;
805    use crate::rpc::store::PutRequest;
806
807    #[test]
808    fn test_table_route_compatibility() {
809        let old_raw_v = r#"{"region_routes":[{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]},{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]}],"version":0}"#;
810        let v = TableRouteValue::try_from_raw_value(old_raw_v.as_bytes()).unwrap();
811
812        let expected_table_route = TableRouteValue::Physical(PhysicalTableRouteValue {
813            region_routes: vec![
814                RegionRoute {
815                    region: Region {
816                        id: RegionId::new(0, 1),
817                        name: "r1".to_string(),
818                        partition: None,
819                        attrs: Default::default(),
820                        partition_expr: Default::default(),
821                    },
822                    leader_peer: Some(Peer {
823                        id: 2,
824                        addr: "a2".to_string(),
825                    }),
826                    follower_peers: vec![],
827                    leader_state: None,
828                    leader_down_since: None,
829                },
830                RegionRoute {
831                    region: Region {
832                        id: RegionId::new(0, 1),
833                        name: "r1".to_string(),
834                        partition: None,
835                        attrs: Default::default(),
836                        partition_expr: Default::default(),
837                    },
838                    leader_peer: Some(Peer {
839                        id: 2,
840                        addr: "a2".to_string(),
841                    }),
842                    follower_peers: vec![],
843                    leader_state: None,
844                    leader_down_since: None,
845                },
846            ],
847            version: 0,
848        });
849
850        assert_eq!(v, expected_table_route);
851    }
852
853    #[test]
854    fn test_key_serialization() {
855        let key = TableRouteKey::new(42);
856        let raw_key = key.to_bytes();
857        assert_eq!(raw_key, b"__table_route/42");
858    }
859
860    #[test]
861    fn test_key_deserialization() {
862        let expected = TableRouteKey::new(42);
863        let key = TableRouteKey::from_bytes(b"__table_route/42").unwrap();
864        assert_eq!(key, expected);
865    }
866
867    #[tokio::test]
868    async fn test_table_route_storage_get_with_raw_bytes_empty() {
869        let kv = Arc::new(MemoryKvBackend::default());
870        let table_route_storage = TableRouteStorage::new(kv);
871        let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
872        assert!(table_route.is_none());
873    }
874
875    #[tokio::test]
876    async fn test_table_route_storage_get_with_raw_bytes() {
877        let kv = Arc::new(MemoryKvBackend::default());
878        let table_route_storage = TableRouteStorage::new(kv.clone());
879        let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
880        assert!(table_route.is_none());
881        let table_route_manager = TableRouteManager::new(kv.clone());
882        let table_route_value = TableRouteValue::Logical(LogicalTableRouteValue {
883            physical_table_id: 1023,
884            region_ids: vec![RegionId::new(1023, 1)],
885        });
886        let (txn, _) = table_route_manager
887            .table_route_storage()
888            .build_create_txn(1024, &table_route_value)
889            .unwrap();
890        let r = kv.txn(txn).await.unwrap();
891        assert!(r.succeeded);
892        let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
893        assert!(table_route.is_some());
894        let got = table_route.unwrap().inner;
895        assert_eq!(got, table_route_value);
896    }
897
898    #[tokio::test]
899    async fn test_table_route_batch_get() {
900        let kv = Arc::new(MemoryKvBackend::default());
901        let table_route_storage = TableRouteStorage::new(kv.clone());
902        let routes = table_route_storage
903            .batch_get(&[1023, 1024, 1025])
904            .await
905            .unwrap();
906
907        assert!(routes.iter().all(Option::is_none));
908        let table_route_manager = TableRouteManager::new(kv.clone());
909        let routes = [
910            (
911                1024,
912                TableRouteValue::Logical(LogicalTableRouteValue {
913                    physical_table_id: 1023,
914                    region_ids: vec![RegionId::new(1023, 1)],
915                }),
916            ),
917            (
918                1025,
919                TableRouteValue::Logical(LogicalTableRouteValue {
920                    physical_table_id: 1023,
921                    region_ids: vec![RegionId::new(1023, 2)],
922                }),
923            ),
924        ];
925        for (table_id, route) in &routes {
926            let (txn, _) = table_route_manager
927                .table_route_storage()
928                .build_create_txn(*table_id, route)
929                .unwrap();
930            let r = kv.txn(txn).await.unwrap();
931            assert!(r.succeeded);
932        }
933
934        let results = table_route_storage
935            .batch_get(&[9999, 1025, 8888, 1024])
936            .await
937            .unwrap();
938        assert!(results[0].is_none());
939        assert_eq!(results[1].as_ref().unwrap(), &routes[1].1);
940        assert!(results[2].is_none());
941        assert_eq!(results[3].as_ref().unwrap(), &routes[0].1);
942    }
943
944    #[tokio::test]
945    async fn remap_route_address_updates_addresses() {
946        let kv = Arc::new(MemoryKvBackend::default());
947        let table_route_storage = TableRouteStorage::new(kv.clone());
948        let mut table_route = TableRouteValue::Physical(PhysicalTableRouteValue {
949            region_routes: vec![RegionRoute {
950                leader_peer: Some(Peer {
951                    id: 1,
952                    ..Default::default()
953                }),
954                follower_peers: vec![Peer {
955                    id: 2,
956                    ..Default::default()
957                }],
958                ..Default::default()
959            }],
960            version: 0,
961        });
962
963        kv.put(PutRequest {
964            key: NodeAddressKey::with_datanode(1).to_bytes(),
965            value: NodeAddressValue {
966                peer: Peer {
967                    addr: "addr1".to_string(),
968                    ..Default::default()
969                },
970            }
971            .try_as_raw_value()
972            .unwrap(),
973            ..Default::default()
974        })
975        .await
976        .unwrap();
977
978        table_route_storage
979            .remap_route_address(&mut table_route)
980            .await
981            .unwrap();
982
983        if let TableRouteValue::Physical(physical_table_route) = table_route {
984            assert_eq!(
985                physical_table_route.region_routes[0]
986                    .leader_peer
987                    .as_ref()
988                    .unwrap()
989                    .addr,
990                "addr1"
991            );
992            assert_eq!(
993                physical_table_route.region_routes[0].follower_peers[0].addr,
994                ""
995            );
996        } else {
997            panic!("Expected PhysicalTableRouteValue");
998        }
999    }
1000}