1use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::sync::Arc;
18
19use serde::{Deserialize, Serialize};
20use snafu::{ensure, OptionExt, ResultExt};
21use store_api::storage::{RegionId, RegionNumber};
22use table::metadata::TableId;
23
24use crate::error::{
25 InvalidMetadataSnafu, MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu,
26 UnexpectedLogicalRouteTableSnafu,
27};
28use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
29use crate::key::txn_helper::TxnOpGetResponseSet;
30use crate::key::{
31 DeserializedValueWithBytes, MetadataKey, MetadataValue, RegionDistribution,
32 TABLE_ROUTE_KEY_PATTERN, TABLE_ROUTE_PREFIX,
33};
34use crate::kv_backend::txn::Txn;
35use crate::kv_backend::KvBackendRef;
36use crate::rpc::router::{region_distribution, RegionRoute};
37use crate::rpc::store::BatchGetRequest;
38
39#[derive(Debug, PartialEq)]
43pub struct TableRouteKey {
44 pub table_id: TableId,
45}
46
47impl TableRouteKey {
48 pub fn new(table_id: TableId) -> Self {
49 Self { table_id }
50 }
51
52 pub fn range_prefix() -> Vec<u8> {
54 format!("{}/", TABLE_ROUTE_PREFIX).into_bytes()
55 }
56}
57
58#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
59#[serde(tag = "type", rename_all = "snake_case")]
60pub enum TableRouteValue {
61 Physical(PhysicalTableRouteValue),
62 Logical(LogicalTableRouteValue),
63}
64
65#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)]
66pub struct PhysicalTableRouteValue {
67 pub region_routes: Vec<RegionRoute>,
68 version: u64,
69}
70
71#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
72pub struct LogicalTableRouteValue {
73 physical_table_id: TableId,
74 region_ids: Vec<RegionId>,
75}
76
77impl TableRouteValue {
78 pub(crate) fn new(
81 table_id: TableId,
82 physical_table_id: TableId,
83 region_routes: Vec<RegionRoute>,
84 ) -> Self {
85 if table_id == physical_table_id {
86 TableRouteValue::physical(region_routes)
87 } else {
88 let region_routes = region_routes
89 .into_iter()
90 .map(|region| {
91 debug_assert_eq!(region.region.id.table_id(), physical_table_id);
92 RegionId::new(table_id, region.region.id.region_number())
93 })
94 .collect();
95 TableRouteValue::logical(physical_table_id, region_routes)
96 }
97 }
98
99 pub fn physical(region_routes: Vec<RegionRoute>) -> Self {
100 Self::Physical(PhysicalTableRouteValue::new(region_routes))
101 }
102
103 pub fn logical(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
104 Self::Logical(LogicalTableRouteValue::new(physical_table_id, region_ids))
105 }
106
107 pub fn update(&self, region_routes: Vec<RegionRoute>) -> Result<Self> {
109 ensure!(
110 self.is_physical(),
111 UnexpectedLogicalRouteTableSnafu {
112 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
113 }
114 );
115 let version = self.as_physical_table_route_ref().version;
116 Ok(Self::Physical(PhysicalTableRouteValue {
117 region_routes,
118 version: version + 1,
119 }))
120 }
121
122 #[cfg(any(test, feature = "testing"))]
126 pub fn version(&self) -> Result<u64> {
127 ensure!(
128 self.is_physical(),
129 UnexpectedLogicalRouteTableSnafu {
130 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
131 }
132 );
133 Ok(self.as_physical_table_route_ref().version)
134 }
135
136 pub fn region_route(&self, region_id: RegionId) -> Result<Option<RegionRoute>> {
140 ensure!(
141 self.is_physical(),
142 UnexpectedLogicalRouteTableSnafu {
143 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
144 }
145 );
146 Ok(self
147 .as_physical_table_route_ref()
148 .region_routes
149 .iter()
150 .find(|route| route.region.id == region_id)
151 .cloned())
152 }
153
154 pub fn is_physical(&self) -> bool {
156 matches!(self, TableRouteValue::Physical(_))
157 }
158
159 pub fn region_routes(&self) -> Result<&Vec<RegionRoute>> {
161 ensure!(
162 self.is_physical(),
163 UnexpectedLogicalRouteTableSnafu {
164 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
165 }
166 );
167 Ok(&self.as_physical_table_route_ref().region_routes)
168 }
169
170 fn as_physical_table_route_ref(&self) -> &PhysicalTableRouteValue {
175 match self {
176 TableRouteValue::Physical(x) => x,
177 _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
178 }
179 }
180
181 pub fn into_physical_table_route(self) -> PhysicalTableRouteValue {
186 match self {
187 TableRouteValue::Physical(x) => x,
188 _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
189 }
190 }
191
192 pub fn region_numbers(&self) -> Vec<RegionNumber> {
193 match self {
194 TableRouteValue::Physical(x) => x
195 .region_routes
196 .iter()
197 .map(|region_route| region_route.region.id.region_number())
198 .collect(),
199 TableRouteValue::Logical(x) => x
200 .region_ids()
201 .iter()
202 .map(|region_id| region_id.region_number())
203 .collect(),
204 }
205 }
206}
207
208impl MetadataValue for TableRouteValue {
209 fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
210 let r = serde_json::from_slice::<TableRouteValue>(raw_value);
211 match r {
212 Err(e) if e.is_data() => Ok(Self::Physical(
214 serde_json::from_slice::<PhysicalTableRouteValue>(raw_value)
215 .context(SerdeJsonSnafu)?,
216 )),
217 Ok(x) => Ok(x),
218 Err(e) => Err(e).context(SerdeJsonSnafu),
219 }
220 }
221
222 fn try_as_raw_value(&self) -> Result<Vec<u8>> {
223 serde_json::to_vec(self).context(SerdeJsonSnafu)
224 }
225}
226
227impl PhysicalTableRouteValue {
228 pub fn new(region_routes: Vec<RegionRoute>) -> Self {
229 Self {
230 region_routes,
231 version: 0,
232 }
233 }
234}
235
236impl LogicalTableRouteValue {
237 pub fn new(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
238 Self {
239 physical_table_id,
240 region_ids,
241 }
242 }
243
244 pub fn physical_table_id(&self) -> TableId {
245 self.physical_table_id
246 }
247
248 pub fn region_ids(&self) -> &Vec<RegionId> {
249 &self.region_ids
250 }
251}
252
253impl MetadataKey<'_, TableRouteKey> for TableRouteKey {
254 fn to_bytes(&self) -> Vec<u8> {
255 self.to_string().into_bytes()
256 }
257
258 fn from_bytes(bytes: &[u8]) -> Result<TableRouteKey> {
259 let key = std::str::from_utf8(bytes).map_err(|e| {
260 InvalidMetadataSnafu {
261 err_msg: format!(
262 "TableRouteKey '{}' is not a valid UTF8 string: {e}",
263 String::from_utf8_lossy(bytes)
264 ),
265 }
266 .build()
267 })?;
268 let captures = TABLE_ROUTE_KEY_PATTERN
269 .captures(key)
270 .context(InvalidMetadataSnafu {
271 err_msg: format!("Invalid TableRouteKey '{key}'"),
272 })?;
273 let table_id = captures[1].parse::<TableId>().unwrap();
275 Ok(TableRouteKey { table_id })
276 }
277}
278
279impl Display for TableRouteKey {
280 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
281 write!(f, "{}/{}", TABLE_ROUTE_PREFIX, self.table_id)
282 }
283}
284
285pub type TableRouteManagerRef = Arc<TableRouteManager>;
286
287pub struct TableRouteManager {
288 storage: TableRouteStorage,
289}
290
291impl TableRouteManager {
292 pub fn new(kv_backend: KvBackendRef) -> Self {
293 Self {
294 storage: TableRouteStorage::new(kv_backend),
295 }
296 }
297
298 pub async fn get_physical_table_id(
303 &self,
304 logical_or_physical_table_id: TableId,
305 ) -> Result<TableId> {
306 let table_route = self
307 .storage
308 .get_inner(logical_or_physical_table_id)
309 .await?
310 .context(TableRouteNotFoundSnafu {
311 table_id: logical_or_physical_table_id,
312 })?;
313
314 match table_route {
315 TableRouteValue::Physical(_) => Ok(logical_or_physical_table_id),
316 TableRouteValue::Logical(x) => Ok(x.physical_table_id()),
317 }
318 }
319
320 pub async fn get_physical_table_route(
326 &self,
327 logical_or_physical_table_id: TableId,
328 ) -> Result<(TableId, PhysicalTableRouteValue)> {
329 let table_route = self
330 .storage
331 .get(logical_or_physical_table_id)
332 .await?
333 .context(TableRouteNotFoundSnafu {
334 table_id: logical_or_physical_table_id,
335 })?;
336
337 match table_route {
338 TableRouteValue::Physical(x) => Ok((logical_or_physical_table_id, x)),
339 TableRouteValue::Logical(x) => {
340 let physical_table_id = x.physical_table_id();
341 let physical_table_route = self.storage.get(physical_table_id).await?.context(
342 TableRouteNotFoundSnafu {
343 table_id: physical_table_id,
344 },
345 )?;
346 let physical_table_route = physical_table_route.into_physical_table_route();
347 Ok((physical_table_id, physical_table_route))
348 }
349 }
350 }
351
352 pub async fn batch_get_physical_table_routes(
359 &self,
360 logical_or_physical_table_ids: &[TableId],
361 ) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
362 let table_routes = self
363 .storage
364 .batch_get(logical_or_physical_table_ids)
365 .await?;
366 let table_routes = table_routes
368 .into_iter()
369 .zip(logical_or_physical_table_ids)
370 .filter_map(|(route, id)| route.map(|route| (*id, route)))
371 .collect::<HashMap<_, _>>();
372
373 let mut physical_table_routes = HashMap::with_capacity(table_routes.len());
374 let mut logical_table_ids = HashMap::with_capacity(table_routes.len());
375
376 for (table_id, table_route) in table_routes {
377 match table_route {
378 TableRouteValue::Physical(x) => {
379 physical_table_routes.insert(table_id, x);
380 }
381 TableRouteValue::Logical(x) => {
382 logical_table_ids.insert(table_id, x.physical_table_id());
383 }
384 }
385 }
386
387 if logical_table_ids.is_empty() {
388 return Ok(physical_table_routes);
389 }
390
391 let physical_table_ids = logical_table_ids
393 .values()
394 .cloned()
395 .collect::<HashSet<_>>()
396 .into_iter()
397 .collect::<Vec<_>>();
398 let table_routes = self
399 .table_route_storage()
400 .batch_get(&physical_table_ids)
401 .await?;
402 let table_routes = table_routes
403 .into_iter()
404 .zip(physical_table_ids)
405 .filter_map(|(route, id)| route.map(|route| (id, route)))
406 .collect::<HashMap<_, _>>();
407
408 for (logical_table_id, physical_table_id) in logical_table_ids {
409 let table_route =
410 table_routes
411 .get(&physical_table_id)
412 .context(TableRouteNotFoundSnafu {
413 table_id: physical_table_id,
414 })?;
415 match table_route {
416 TableRouteValue::Physical(x) => {
417 physical_table_routes.insert(logical_table_id, x.clone());
418 }
419 TableRouteValue::Logical(x) => {
420 MetadataCorruptionSnafu {
422 err_msg: format!(
423 "logical table {} {:?} cannot be resolved to a physical table.",
424 logical_table_id, x
425 ),
426 }
427 .fail()?;
428 }
429 }
430 }
431
432 Ok(physical_table_routes)
433 }
434
435 pub async fn get_region_distribution(
437 &self,
438 table_id: TableId,
439 ) -> Result<Option<RegionDistribution>> {
440 self.storage
441 .get(table_id)
442 .await?
443 .map(|table_route| Ok(region_distribution(table_route.region_routes()?)))
444 .transpose()
445 }
446
447 pub fn table_route_storage(&self) -> &TableRouteStorage {
449 &self.storage
450 }
451}
452
453pub struct TableRouteStorage {
455 kv_backend: KvBackendRef,
456}
457
458pub type TableRouteValueDecodeResult = Result<Option<DeserializedValueWithBytes<TableRouteValue>>>;
459
460impl TableRouteStorage {
461 pub fn new(kv_backend: KvBackendRef) -> Self {
462 Self { kv_backend }
463 }
464
465 pub fn build_create_txn(
468 &self,
469 table_id: TableId,
470 table_route_value: &TableRouteValue,
471 ) -> Result<(
472 Txn,
473 impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult,
474 )> {
475 let key = TableRouteKey::new(table_id);
476 let raw_key = key.to_bytes();
477
478 let txn = Txn::put_if_not_exists(raw_key.clone(), table_route_value.try_as_raw_value()?);
479
480 Ok((
481 txn,
482 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
483 ))
484 }
485
486 pub fn build_update_txn(
491 &self,
492 table_id: TableId,
493 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
494 new_table_route_value: &TableRouteValue,
495 ) -> Result<(
496 Txn,
497 impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult,
498 )> {
499 let key = TableRouteKey::new(table_id);
500 let raw_key = key.to_bytes();
501 let raw_value = current_table_route_value.get_raw_bytes();
502 let new_raw_value: Vec<u8> = new_table_route_value.try_as_raw_value()?;
503
504 let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
505
506 Ok((
507 txn,
508 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
509 ))
510 }
511
512 pub async fn get(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
514 let mut table_route = self.get_inner(table_id).await?;
515 if let Some(table_route) = &mut table_route {
516 self.remap_route_address(table_route).await?;
517 };
518
519 Ok(table_route)
520 }
521
522 async fn get_inner(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
523 let key = TableRouteKey::new(table_id);
524 self.kv_backend
525 .get(&key.to_bytes())
526 .await?
527 .map(|kv| TableRouteValue::try_from_raw_value(&kv.value))
528 .transpose()
529 }
530
531 pub async fn get_with_raw_bytes(
533 &self,
534 table_id: TableId,
535 ) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
536 let mut table_route = self.get_with_raw_bytes_inner(table_id).await?;
537 if let Some(table_route) = &mut table_route {
538 self.remap_route_address(table_route).await?;
539 };
540
541 Ok(table_route)
542 }
543
544 async fn get_with_raw_bytes_inner(
545 &self,
546 table_id: TableId,
547 ) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
548 let key = TableRouteKey::new(table_id);
549 self.kv_backend
550 .get(&key.to_bytes())
551 .await?
552 .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
553 .transpose()
554 }
555
556 pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
558 let mut table_routes = self.batch_get_inner(table_ids).await?;
559 self.remap_routes_addresses(&mut table_routes).await?;
560
561 Ok(table_routes)
562 }
563
564 async fn batch_get_inner(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
565 let keys = table_ids
566 .iter()
567 .map(|id| TableRouteKey::new(*id).to_bytes())
568 .collect::<Vec<_>>();
569 let resp = self
570 .kv_backend
571 .batch_get(BatchGetRequest { keys: keys.clone() })
572 .await?;
573
574 let kvs = resp
575 .kvs
576 .into_iter()
577 .map(|kv| (kv.key, kv.value))
578 .collect::<HashMap<_, _>>();
579 keys.into_iter()
580 .map(|key| {
581 if let Some(value) = kvs.get(&key) {
582 Ok(Some(TableRouteValue::try_from_raw_value(value)?))
583 } else {
584 Ok(None)
585 }
586 })
587 .collect()
588 }
589
590 async fn remap_routes_addresses(
591 &self,
592 table_routes: &mut [Option<TableRouteValue>],
593 ) -> Result<()> {
594 let keys = table_routes
595 .iter()
596 .flat_map(|table_route| {
597 table_route
598 .as_ref()
599 .map(extract_address_keys)
600 .unwrap_or_default()
601 })
602 .collect::<HashSet<_>>()
603 .into_iter()
604 .collect();
605 let node_addrs = self.get_node_addresses(keys).await?;
606 for table_route in table_routes.iter_mut().flatten() {
607 set_addresses(&node_addrs, table_route)?;
608 }
609
610 Ok(())
611 }
612
613 async fn remap_route_address(&self, table_route: &mut TableRouteValue) -> Result<()> {
614 let keys = extract_address_keys(table_route).into_iter().collect();
615 let node_addrs = self.get_node_addresses(keys).await?;
616 set_addresses(&node_addrs, table_route)?;
617
618 Ok(())
619 }
620
621 async fn get_node_addresses(
622 &self,
623 keys: Vec<Vec<u8>>,
624 ) -> Result<HashMap<u64, NodeAddressValue>> {
625 if keys.is_empty() {
626 return Ok(HashMap::default());
627 }
628
629 self.kv_backend
630 .batch_get(BatchGetRequest { keys })
631 .await?
632 .kvs
633 .into_iter()
634 .map(|kv| {
635 let node_id = NodeAddressKey::from_bytes(&kv.key)?.node_id;
636 let node_addr = NodeAddressValue::try_from_raw_value(&kv.value)?;
637 Ok((node_id, node_addr))
638 })
639 .collect()
640 }
641}
642
643fn set_addresses(
644 node_addrs: &HashMap<u64, NodeAddressValue>,
645 table_route: &mut TableRouteValue,
646) -> Result<()> {
647 let TableRouteValue::Physical(physical_table_route) = table_route else {
648 return Ok(());
649 };
650
651 for region_route in &mut physical_table_route.region_routes {
652 if let Some(leader) = &mut region_route.leader_peer {
653 if let Some(node_addr) = node_addrs.get(&leader.id) {
654 leader.addr = node_addr.peer.addr.clone();
655 }
656 }
657 for follower in &mut region_route.follower_peers {
658 if let Some(node_addr) = node_addrs.get(&follower.id) {
659 follower.addr = node_addr.peer.addr.clone();
660 }
661 }
662 }
663
664 Ok(())
665}
666
667fn extract_address_keys(table_route: &TableRouteValue) -> HashSet<Vec<u8>> {
668 let TableRouteValue::Physical(physical_table_route) = table_route else {
669 return HashSet::default();
670 };
671
672 physical_table_route
673 .region_routes
674 .iter()
675 .flat_map(|region_route| {
676 region_route
677 .follower_peers
678 .iter()
679 .map(|peer| NodeAddressKey::with_datanode(peer.id).to_bytes())
680 .chain(
681 region_route
682 .leader_peer
683 .as_ref()
684 .map(|leader| NodeAddressKey::with_datanode(leader.id).to_bytes()),
685 )
686 })
687 .collect()
688}
689
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::{KvBackend, TxnService};
    use crate::peer::Peer;
    use crate::rpc::router::Region;
    use crate::rpc::store::PutRequest;

    /// A value serialized without the `type` tag still decodes, as a
    /// physical table route.
    #[test]
    fn test_table_route_compatibility() {
        let old_raw_v = r#"{"region_routes":[{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]},{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]}],"version":0}"#;
        let decoded = TableRouteValue::try_from_raw_value(old_raw_v.as_bytes()).unwrap();

        // The payload contains the same region route twice.
        let region_route = RegionRoute {
            region: Region {
                id: RegionId::new(0, 1),
                name: "r1".to_string(),
                partition: None,
                attrs: Default::default(),
            },
            leader_peer: Some(Peer {
                id: 2,
                addr: "a2".to_string(),
            }),
            follower_peers: vec![],
            leader_state: None,
            leader_down_since: None,
        };
        let expected_table_route = TableRouteValue::Physical(PhysicalTableRouteValue {
            region_routes: vec![region_route.clone(), region_route],
            version: 0,
        });

        assert_eq!(decoded, expected_table_route);
    }

    /// The key encodes to its `prefix/table_id` byte form.
    #[test]
    fn test_key_serialization() {
        let raw_key = TableRouteKey::new(42).to_bytes();
        assert_eq!(raw_key, b"__table_route/42");
    }

    /// The byte form decodes back to the key.
    #[test]
    fn test_key_deserialization() {
        let decoded = TableRouteKey::from_bytes(b"__table_route/42").unwrap();
        assert_eq!(decoded, TableRouteKey::new(42));
    }

    /// Reading a route that was never stored yields `None`.
    #[tokio::test]
    async fn test_table_route_storage_get_with_raw_bytes_empty() {
        let kv = Arc::new(MemoryKvBackend::default());
        let storage = TableRouteStorage::new(kv);

        let found = storage.get_with_raw_bytes(1024).await.unwrap();
        assert!(found.is_none());
    }

    /// A route created through a create transaction can be read back.
    #[tokio::test]
    async fn test_table_route_storage_get_with_raw_bytes() {
        let kv = Arc::new(MemoryKvBackend::default());
        let storage = TableRouteStorage::new(kv.clone());

        // Nothing is stored yet.
        let before = storage.get_with_raw_bytes(1024).await.unwrap();
        assert!(before.is_none());

        let manager = TableRouteManager::new(kv.clone());
        let stored_value = TableRouteValue::Logical(LogicalTableRouteValue {
            physical_table_id: 1023,
            region_ids: vec![RegionId::new(1023, 1)],
        });
        let (txn, _) = manager
            .table_route_storage()
            .build_create_txn(1024, &stored_value)
            .unwrap();
        let response = kv.txn(txn).await.unwrap();
        assert!(response.succeeded);

        let after = storage.get_with_raw_bytes(1024).await.unwrap();
        assert!(after.is_some());
        let got = after.unwrap().inner;
        assert_eq!(got, stored_value);
    }

    /// `batch_get` returns results aligned with the requested ids, with
    /// `None` for ids that have no route.
    #[tokio::test]
    async fn test_table_route_batch_get() {
        let kv = Arc::new(MemoryKvBackend::default());
        let storage = TableRouteStorage::new(kv.clone());

        // Nothing is stored yet, so every entry is `None`.
        let empty = storage.batch_get(&[1023, 1024, 1025]).await.unwrap();
        assert!(empty.iter().all(Option::is_none));

        let manager = TableRouteManager::new(kv.clone());
        let routes = [
            (
                1024,
                TableRouteValue::Logical(LogicalTableRouteValue {
                    physical_table_id: 1023,
                    region_ids: vec![RegionId::new(1023, 1)],
                }),
            ),
            (
                1025,
                TableRouteValue::Logical(LogicalTableRouteValue {
                    physical_table_id: 1023,
                    region_ids: vec![RegionId::new(1023, 2)],
                }),
            ),
        ];
        for (table_id, route) in &routes {
            let (txn, _) = manager
                .table_route_storage()
                .build_create_txn(*table_id, route)
                .unwrap();
            let response = kv.txn(txn).await.unwrap();
            assert!(response.succeeded);
        }

        // Mix unknown ids with the stored ones, in a different order.
        let results = storage
            .batch_get(&[9999, 1025, 8888, 1024])
            .await
            .unwrap();
        assert!(results[0].is_none());
        assert_eq!(results[1].as_ref().unwrap(), &routes[1].1);
        assert!(results[2].is_none());
        assert_eq!(results[3].as_ref().unwrap(), &routes[0].1);
    }

    /// Only peers with a stored node address get their address replaced.
    #[tokio::test]
    async fn remap_route_address_updates_addresses() {
        let kv = Arc::new(MemoryKvBackend::default());
        let storage = TableRouteStorage::new(kv.clone());
        let mut table_route = TableRouteValue::Physical(PhysicalTableRouteValue {
            region_routes: vec![RegionRoute {
                leader_peer: Some(Peer {
                    id: 1,
                    ..Default::default()
                }),
                follower_peers: vec![Peer {
                    id: 2,
                    ..Default::default()
                }],
                ..Default::default()
            }],
            version: 0,
        });

        // Store an address for node 1 only.
        let node_addr = NodeAddressValue {
            peer: Peer {
                addr: "addr1".to_string(),
                ..Default::default()
            },
        };
        kv.put(PutRequest {
            key: NodeAddressKey::with_datanode(1).to_bytes(),
            value: node_addr.try_as_raw_value().unwrap(),
            ..Default::default()
        })
        .await
        .unwrap();

        storage
            .remap_route_address(&mut table_route)
            .await
            .unwrap();

        let TableRouteValue::Physical(physical_table_route) = table_route else {
            panic!("Expected PhysicalTableRouteValue");
        };
        let region_route = &physical_table_route.region_routes[0];
        assert_eq!(region_route.leader_peer.as_ref().unwrap().addr, "addr1");
        // Node 2 has no stored address, so the follower keeps its default.
        assert_eq!(region_route.follower_peers[0].addr, "");
    }
}