1use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::sync::Arc;
18
19use serde::{Deserialize, Serialize};
20use snafu::{ensure, OptionExt, ResultExt};
21use store_api::storage::{RegionId, RegionNumber};
22use table::metadata::TableId;
23
24use crate::error::{
25 InvalidMetadataSnafu, MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu,
26 UnexpectedLogicalRouteTableSnafu,
27};
28use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
29use crate::key::txn_helper::TxnOpGetResponseSet;
30use crate::key::{
31 DeserializedValueWithBytes, MetadataKey, MetadataValue, RegionDistribution,
32 TABLE_ROUTE_KEY_PATTERN, TABLE_ROUTE_PREFIX,
33};
34use crate::kv_backend::txn::Txn;
35use crate::kv_backend::KvBackendRef;
36use crate::rpc::router::{region_distribution, RegionRoute};
37use crate::rpc::store::BatchGetRequest;
38
39#[derive(Debug, PartialEq)]
43pub struct TableRouteKey {
44 pub table_id: TableId,
45}
46
47impl TableRouteKey {
48 pub fn new(table_id: TableId) -> Self {
49 Self { table_id }
50 }
51}
52
53#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
54#[serde(tag = "type", rename_all = "snake_case")]
55pub enum TableRouteValue {
56 Physical(PhysicalTableRouteValue),
57 Logical(LogicalTableRouteValue),
58}
59
60#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)]
61pub struct PhysicalTableRouteValue {
62 pub region_routes: Vec<RegionRoute>,
63 version: u64,
64}
65
66#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
67pub struct LogicalTableRouteValue {
68 physical_table_id: TableId,
69 region_ids: Vec<RegionId>,
70}
71
72impl TableRouteValue {
73 pub(crate) fn new(
76 table_id: TableId,
77 physical_table_id: TableId,
78 region_routes: Vec<RegionRoute>,
79 ) -> Self {
80 if table_id == physical_table_id {
81 TableRouteValue::physical(region_routes)
82 } else {
83 let region_routes = region_routes
84 .into_iter()
85 .map(|region| {
86 debug_assert_eq!(region.region.id.table_id(), physical_table_id);
87 RegionId::new(table_id, region.region.id.region_number())
88 })
89 .collect();
90 TableRouteValue::logical(physical_table_id, region_routes)
91 }
92 }
93
94 pub fn physical(region_routes: Vec<RegionRoute>) -> Self {
95 Self::Physical(PhysicalTableRouteValue::new(region_routes))
96 }
97
98 pub fn logical(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
99 Self::Logical(LogicalTableRouteValue::new(physical_table_id, region_ids))
100 }
101
102 pub fn update(&self, region_routes: Vec<RegionRoute>) -> Result<Self> {
104 ensure!(
105 self.is_physical(),
106 UnexpectedLogicalRouteTableSnafu {
107 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
108 }
109 );
110 let version = self.as_physical_table_route_ref().version;
111 Ok(Self::Physical(PhysicalTableRouteValue {
112 region_routes,
113 version: version + 1,
114 }))
115 }
116
117 #[cfg(any(test, feature = "testing"))]
121 pub fn version(&self) -> Result<u64> {
122 ensure!(
123 self.is_physical(),
124 UnexpectedLogicalRouteTableSnafu {
125 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
126 }
127 );
128 Ok(self.as_physical_table_route_ref().version)
129 }
130
131 pub fn region_route(&self, region_id: RegionId) -> Result<Option<RegionRoute>> {
135 ensure!(
136 self.is_physical(),
137 UnexpectedLogicalRouteTableSnafu {
138 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
139 }
140 );
141 Ok(self
142 .as_physical_table_route_ref()
143 .region_routes
144 .iter()
145 .find(|route| route.region.id == region_id)
146 .cloned())
147 }
148
149 pub fn is_physical(&self) -> bool {
151 matches!(self, TableRouteValue::Physical(_))
152 }
153
154 pub fn region_routes(&self) -> Result<&Vec<RegionRoute>> {
156 ensure!(
157 self.is_physical(),
158 UnexpectedLogicalRouteTableSnafu {
159 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
160 }
161 );
162 Ok(&self.as_physical_table_route_ref().region_routes)
163 }
164
165 fn as_physical_table_route_ref(&self) -> &PhysicalTableRouteValue {
170 match self {
171 TableRouteValue::Physical(x) => x,
172 _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
173 }
174 }
175
176 pub fn into_physical_table_route(self) -> PhysicalTableRouteValue {
181 match self {
182 TableRouteValue::Physical(x) => x,
183 _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
184 }
185 }
186
187 pub fn region_numbers(&self) -> Vec<RegionNumber> {
188 match self {
189 TableRouteValue::Physical(x) => x
190 .region_routes
191 .iter()
192 .map(|region_route| region_route.region.id.region_number())
193 .collect(),
194 TableRouteValue::Logical(x) => x
195 .region_ids()
196 .iter()
197 .map(|region_id| region_id.region_number())
198 .collect(),
199 }
200 }
201}
202
203impl MetadataValue for TableRouteValue {
204 fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
205 let r = serde_json::from_slice::<TableRouteValue>(raw_value);
206 match r {
207 Err(e) if e.is_data() => Ok(Self::Physical(
209 serde_json::from_slice::<PhysicalTableRouteValue>(raw_value)
210 .context(SerdeJsonSnafu)?,
211 )),
212 Ok(x) => Ok(x),
213 Err(e) => Err(e).context(SerdeJsonSnafu),
214 }
215 }
216
217 fn try_as_raw_value(&self) -> Result<Vec<u8>> {
218 serde_json::to_vec(self).context(SerdeJsonSnafu)
219 }
220}
221
222impl PhysicalTableRouteValue {
223 pub fn new(region_routes: Vec<RegionRoute>) -> Self {
224 Self {
225 region_routes,
226 version: 0,
227 }
228 }
229}
230
231impl LogicalTableRouteValue {
232 pub fn new(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
233 Self {
234 physical_table_id,
235 region_ids,
236 }
237 }
238
239 pub fn physical_table_id(&self) -> TableId {
240 self.physical_table_id
241 }
242
243 pub fn region_ids(&self) -> &Vec<RegionId> {
244 &self.region_ids
245 }
246}
247
248impl MetadataKey<'_, TableRouteKey> for TableRouteKey {
249 fn to_bytes(&self) -> Vec<u8> {
250 self.to_string().into_bytes()
251 }
252
253 fn from_bytes(bytes: &[u8]) -> Result<TableRouteKey> {
254 let key = std::str::from_utf8(bytes).map_err(|e| {
255 InvalidMetadataSnafu {
256 err_msg: format!(
257 "TableRouteKey '{}' is not a valid UTF8 string: {e}",
258 String::from_utf8_lossy(bytes)
259 ),
260 }
261 .build()
262 })?;
263 let captures = TABLE_ROUTE_KEY_PATTERN
264 .captures(key)
265 .context(InvalidMetadataSnafu {
266 err_msg: format!("Invalid TableRouteKey '{key}'"),
267 })?;
268 let table_id = captures[1].parse::<TableId>().unwrap();
270 Ok(TableRouteKey { table_id })
271 }
272}
273
274impl Display for TableRouteKey {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 write!(f, "{}/{}", TABLE_ROUTE_PREFIX, self.table_id)
277 }
278}
279
280pub type TableRouteManagerRef = Arc<TableRouteManager>;
281
282pub struct TableRouteManager {
283 storage: TableRouteStorage,
284}
285
286impl TableRouteManager {
287 pub fn new(kv_backend: KvBackendRef) -> Self {
288 Self {
289 storage: TableRouteStorage::new(kv_backend),
290 }
291 }
292
293 pub async fn get_physical_table_id(
298 &self,
299 logical_or_physical_table_id: TableId,
300 ) -> Result<TableId> {
301 let table_route = self
302 .storage
303 .get_inner(logical_or_physical_table_id)
304 .await?
305 .context(TableRouteNotFoundSnafu {
306 table_id: logical_or_physical_table_id,
307 })?;
308
309 match table_route {
310 TableRouteValue::Physical(_) => Ok(logical_or_physical_table_id),
311 TableRouteValue::Logical(x) => Ok(x.physical_table_id()),
312 }
313 }
314
315 pub async fn get_physical_table_route(
321 &self,
322 logical_or_physical_table_id: TableId,
323 ) -> Result<(TableId, PhysicalTableRouteValue)> {
324 let table_route = self
325 .storage
326 .get(logical_or_physical_table_id)
327 .await?
328 .context(TableRouteNotFoundSnafu {
329 table_id: logical_or_physical_table_id,
330 })?;
331
332 match table_route {
333 TableRouteValue::Physical(x) => Ok((logical_or_physical_table_id, x)),
334 TableRouteValue::Logical(x) => {
335 let physical_table_id = x.physical_table_id();
336 let physical_table_route = self.storage.get(physical_table_id).await?.context(
337 TableRouteNotFoundSnafu {
338 table_id: physical_table_id,
339 },
340 )?;
341 let physical_table_route = physical_table_route.into_physical_table_route();
342 Ok((physical_table_id, physical_table_route))
343 }
344 }
345 }
346
347 pub async fn batch_get_physical_table_routes(
354 &self,
355 logical_or_physical_table_ids: &[TableId],
356 ) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
357 let table_routes = self
358 .storage
359 .batch_get(logical_or_physical_table_ids)
360 .await?;
361 let table_routes = table_routes
363 .into_iter()
364 .zip(logical_or_physical_table_ids)
365 .filter_map(|(route, id)| route.map(|route| (*id, route)))
366 .collect::<HashMap<_, _>>();
367
368 let mut physical_table_routes = HashMap::with_capacity(table_routes.len());
369 let mut logical_table_ids = HashMap::with_capacity(table_routes.len());
370
371 for (table_id, table_route) in table_routes {
372 match table_route {
373 TableRouteValue::Physical(x) => {
374 physical_table_routes.insert(table_id, x);
375 }
376 TableRouteValue::Logical(x) => {
377 logical_table_ids.insert(table_id, x.physical_table_id());
378 }
379 }
380 }
381
382 if logical_table_ids.is_empty() {
383 return Ok(physical_table_routes);
384 }
385
386 let physical_table_ids = logical_table_ids
388 .values()
389 .cloned()
390 .collect::<HashSet<_>>()
391 .into_iter()
392 .collect::<Vec<_>>();
393 let table_routes = self
394 .table_route_storage()
395 .batch_get(&physical_table_ids)
396 .await?;
397 let table_routes = table_routes
398 .into_iter()
399 .zip(physical_table_ids)
400 .filter_map(|(route, id)| route.map(|route| (id, route)))
401 .collect::<HashMap<_, _>>();
402
403 for (logical_table_id, physical_table_id) in logical_table_ids {
404 let table_route =
405 table_routes
406 .get(&physical_table_id)
407 .context(TableRouteNotFoundSnafu {
408 table_id: physical_table_id,
409 })?;
410 match table_route {
411 TableRouteValue::Physical(x) => {
412 physical_table_routes.insert(logical_table_id, x.clone());
413 }
414 TableRouteValue::Logical(x) => {
415 MetadataCorruptionSnafu {
417 err_msg: format!(
418 "logical table {} {:?} cannot be resolved to a physical table.",
419 logical_table_id, x
420 ),
421 }
422 .fail()?;
423 }
424 }
425 }
426
427 Ok(physical_table_routes)
428 }
429
430 pub async fn get_region_distribution(
432 &self,
433 table_id: TableId,
434 ) -> Result<Option<RegionDistribution>> {
435 self.storage
436 .get(table_id)
437 .await?
438 .map(|table_route| Ok(region_distribution(table_route.region_routes()?)))
439 .transpose()
440 }
441
442 pub fn table_route_storage(&self) -> &TableRouteStorage {
444 &self.storage
445 }
446}
447
448pub struct TableRouteStorage {
450 kv_backend: KvBackendRef,
451}
452
453pub type TableRouteValueDecodeResult = Result<Option<DeserializedValueWithBytes<TableRouteValue>>>;
454
455impl TableRouteStorage {
456 pub fn new(kv_backend: KvBackendRef) -> Self {
457 Self { kv_backend }
458 }
459
460 pub fn build_create_txn(
463 &self,
464 table_id: TableId,
465 table_route_value: &TableRouteValue,
466 ) -> Result<(
467 Txn,
468 impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult,
469 )> {
470 let key = TableRouteKey::new(table_id);
471 let raw_key = key.to_bytes();
472
473 let txn = Txn::put_if_not_exists(raw_key.clone(), table_route_value.try_as_raw_value()?);
474
475 Ok((
476 txn,
477 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
478 ))
479 }
480
481 pub fn build_update_txn(
486 &self,
487 table_id: TableId,
488 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
489 new_table_route_value: &TableRouteValue,
490 ) -> Result<(
491 Txn,
492 impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult,
493 )> {
494 let key = TableRouteKey::new(table_id);
495 let raw_key = key.to_bytes();
496 let raw_value = current_table_route_value.get_raw_bytes();
497 let new_raw_value: Vec<u8> = new_table_route_value.try_as_raw_value()?;
498
499 let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
500
501 Ok((
502 txn,
503 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
504 ))
505 }
506
507 pub async fn get(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
509 let mut table_route = self.get_inner(table_id).await?;
510 if let Some(table_route) = &mut table_route {
511 self.remap_route_address(table_route).await?;
512 };
513
514 Ok(table_route)
515 }
516
517 async fn get_inner(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
518 let key = TableRouteKey::new(table_id);
519 self.kv_backend
520 .get(&key.to_bytes())
521 .await?
522 .map(|kv| TableRouteValue::try_from_raw_value(&kv.value))
523 .transpose()
524 }
525
526 pub async fn get_with_raw_bytes(
528 &self,
529 table_id: TableId,
530 ) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
531 let mut table_route = self.get_with_raw_bytes_inner(table_id).await?;
532 if let Some(table_route) = &mut table_route {
533 self.remap_route_address(table_route).await?;
534 };
535
536 Ok(table_route)
537 }
538
539 async fn get_with_raw_bytes_inner(
540 &self,
541 table_id: TableId,
542 ) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
543 let key = TableRouteKey::new(table_id);
544 self.kv_backend
545 .get(&key.to_bytes())
546 .await?
547 .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
548 .transpose()
549 }
550
551 pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
553 let mut table_routes = self.batch_get_inner(table_ids).await?;
554 self.remap_routes_addresses(&mut table_routes).await?;
555
556 Ok(table_routes)
557 }
558
559 async fn batch_get_inner(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
560 let keys = table_ids
561 .iter()
562 .map(|id| TableRouteKey::new(*id).to_bytes())
563 .collect::<Vec<_>>();
564 let resp = self
565 .kv_backend
566 .batch_get(BatchGetRequest { keys: keys.clone() })
567 .await?;
568
569 let kvs = resp
570 .kvs
571 .into_iter()
572 .map(|kv| (kv.key, kv.value))
573 .collect::<HashMap<_, _>>();
574 keys.into_iter()
575 .map(|key| {
576 if let Some(value) = kvs.get(&key) {
577 Ok(Some(TableRouteValue::try_from_raw_value(value)?))
578 } else {
579 Ok(None)
580 }
581 })
582 .collect()
583 }
584
585 async fn remap_routes_addresses(
586 &self,
587 table_routes: &mut [Option<TableRouteValue>],
588 ) -> Result<()> {
589 let keys = table_routes
590 .iter()
591 .flat_map(|table_route| {
592 table_route
593 .as_ref()
594 .map(extract_address_keys)
595 .unwrap_or_default()
596 })
597 .collect::<HashSet<_>>()
598 .into_iter()
599 .collect();
600 let node_addrs = self.get_node_addresses(keys).await?;
601 for table_route in table_routes.iter_mut().flatten() {
602 set_addresses(&node_addrs, table_route)?;
603 }
604
605 Ok(())
606 }
607
608 async fn remap_route_address(&self, table_route: &mut TableRouteValue) -> Result<()> {
609 let keys = extract_address_keys(table_route).into_iter().collect();
610 let node_addrs = self.get_node_addresses(keys).await?;
611 set_addresses(&node_addrs, table_route)?;
612
613 Ok(())
614 }
615
616 async fn get_node_addresses(
617 &self,
618 keys: Vec<Vec<u8>>,
619 ) -> Result<HashMap<u64, NodeAddressValue>> {
620 if keys.is_empty() {
621 return Ok(HashMap::default());
622 }
623
624 self.kv_backend
625 .batch_get(BatchGetRequest { keys })
626 .await?
627 .kvs
628 .into_iter()
629 .map(|kv| {
630 let node_id = NodeAddressKey::from_bytes(&kv.key)?.node_id;
631 let node_addr = NodeAddressValue::try_from_raw_value(&kv.value)?;
632 Ok((node_id, node_addr))
633 })
634 .collect()
635 }
636}
637
638fn set_addresses(
639 node_addrs: &HashMap<u64, NodeAddressValue>,
640 table_route: &mut TableRouteValue,
641) -> Result<()> {
642 let TableRouteValue::Physical(physical_table_route) = table_route else {
643 return Ok(());
644 };
645
646 for region_route in &mut physical_table_route.region_routes {
647 if let Some(leader) = &mut region_route.leader_peer {
648 if let Some(node_addr) = node_addrs.get(&leader.id) {
649 leader.addr = node_addr.peer.addr.clone();
650 }
651 }
652 for follower in &mut region_route.follower_peers {
653 if let Some(node_addr) = node_addrs.get(&follower.id) {
654 follower.addr = node_addr.peer.addr.clone();
655 }
656 }
657 }
658
659 Ok(())
660}
661
662fn extract_address_keys(table_route: &TableRouteValue) -> HashSet<Vec<u8>> {
663 let TableRouteValue::Physical(physical_table_route) = table_route else {
664 return HashSet::default();
665 };
666
667 physical_table_route
668 .region_routes
669 .iter()
670 .flat_map(|region_route| {
671 region_route
672 .follower_peers
673 .iter()
674 .map(|peer| NodeAddressKey::with_datanode(peer.id).to_bytes())
675 .chain(
676 region_route
677 .leader_peer
678 .as_ref()
679 .map(|leader| NodeAddressKey::with_datanode(leader.id).to_bytes()),
680 )
681 })
682 .collect()
683}
684
685#[cfg(test)]
686mod tests {
687 use std::sync::Arc;
688
689 use super::*;
690 use crate::kv_backend::memory::MemoryKvBackend;
691 use crate::kv_backend::{KvBackend, TxnService};
692 use crate::peer::Peer;
693 use crate::rpc::router::Region;
694 use crate::rpc::store::PutRequest;
695
696 #[test]
697 fn test_table_route_compatibility() {
698 let old_raw_v = r#"{"region_routes":[{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]},{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]}],"version":0}"#;
699 let v = TableRouteValue::try_from_raw_value(old_raw_v.as_bytes()).unwrap();
700
701 let expected_table_route = TableRouteValue::Physical(PhysicalTableRouteValue {
702 region_routes: vec![
703 RegionRoute {
704 region: Region {
705 id: RegionId::new(0, 1),
706 name: "r1".to_string(),
707 partition: None,
708 attrs: Default::default(),
709 },
710 leader_peer: Some(Peer {
711 id: 2,
712 addr: "a2".to_string(),
713 }),
714 follower_peers: vec![],
715 leader_state: None,
716 leader_down_since: None,
717 },
718 RegionRoute {
719 region: Region {
720 id: RegionId::new(0, 1),
721 name: "r1".to_string(),
722 partition: None,
723 attrs: Default::default(),
724 },
725 leader_peer: Some(Peer {
726 id: 2,
727 addr: "a2".to_string(),
728 }),
729 follower_peers: vec![],
730 leader_state: None,
731 leader_down_since: None,
732 },
733 ],
734 version: 0,
735 });
736
737 assert_eq!(v, expected_table_route);
738 }
739
740 #[test]
741 fn test_key_serialization() {
742 let key = TableRouteKey::new(42);
743 let raw_key = key.to_bytes();
744 assert_eq!(raw_key, b"__table_route/42");
745 }
746
747 #[test]
748 fn test_key_deserialization() {
749 let expected = TableRouteKey::new(42);
750 let key = TableRouteKey::from_bytes(b"__table_route/42").unwrap();
751 assert_eq!(key, expected);
752 }
753
754 #[tokio::test]
755 async fn test_table_route_storage_get_with_raw_bytes_empty() {
756 let kv = Arc::new(MemoryKvBackend::default());
757 let table_route_storage = TableRouteStorage::new(kv);
758 let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
759 assert!(table_route.is_none());
760 }
761
762 #[tokio::test]
763 async fn test_table_route_storage_get_with_raw_bytes() {
764 let kv = Arc::new(MemoryKvBackend::default());
765 let table_route_storage = TableRouteStorage::new(kv.clone());
766 let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
767 assert!(table_route.is_none());
768 let table_route_manager = TableRouteManager::new(kv.clone());
769 let table_route_value = TableRouteValue::Logical(LogicalTableRouteValue {
770 physical_table_id: 1023,
771 region_ids: vec![RegionId::new(1023, 1)],
772 });
773 let (txn, _) = table_route_manager
774 .table_route_storage()
775 .build_create_txn(1024, &table_route_value)
776 .unwrap();
777 let r = kv.txn(txn).await.unwrap();
778 assert!(r.succeeded);
779 let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
780 assert!(table_route.is_some());
781 let got = table_route.unwrap().inner;
782 assert_eq!(got, table_route_value);
783 }
784
785 #[tokio::test]
786 async fn test_table_route_batch_get() {
787 let kv = Arc::new(MemoryKvBackend::default());
788 let table_route_storage = TableRouteStorage::new(kv.clone());
789 let routes = table_route_storage
790 .batch_get(&[1023, 1024, 1025])
791 .await
792 .unwrap();
793
794 assert!(routes.iter().all(Option::is_none));
795 let table_route_manager = TableRouteManager::new(kv.clone());
796 let routes = [
797 (
798 1024,
799 TableRouteValue::Logical(LogicalTableRouteValue {
800 physical_table_id: 1023,
801 region_ids: vec![RegionId::new(1023, 1)],
802 }),
803 ),
804 (
805 1025,
806 TableRouteValue::Logical(LogicalTableRouteValue {
807 physical_table_id: 1023,
808 region_ids: vec![RegionId::new(1023, 2)],
809 }),
810 ),
811 ];
812 for (table_id, route) in &routes {
813 let (txn, _) = table_route_manager
814 .table_route_storage()
815 .build_create_txn(*table_id, route)
816 .unwrap();
817 let r = kv.txn(txn).await.unwrap();
818 assert!(r.succeeded);
819 }
820
821 let results = table_route_storage
822 .batch_get(&[9999, 1025, 8888, 1024])
823 .await
824 .unwrap();
825 assert!(results[0].is_none());
826 assert_eq!(results[1].as_ref().unwrap(), &routes[1].1);
827 assert!(results[2].is_none());
828 assert_eq!(results[3].as_ref().unwrap(), &routes[0].1);
829 }
830
831 #[tokio::test]
832 async fn remap_route_address_updates_addresses() {
833 let kv = Arc::new(MemoryKvBackend::default());
834 let table_route_storage = TableRouteStorage::new(kv.clone());
835 let mut table_route = TableRouteValue::Physical(PhysicalTableRouteValue {
836 region_routes: vec![RegionRoute {
837 leader_peer: Some(Peer {
838 id: 1,
839 ..Default::default()
840 }),
841 follower_peers: vec![Peer {
842 id: 2,
843 ..Default::default()
844 }],
845 ..Default::default()
846 }],
847 version: 0,
848 });
849
850 kv.put(PutRequest {
851 key: NodeAddressKey::with_datanode(1).to_bytes(),
852 value: NodeAddressValue {
853 peer: Peer {
854 addr: "addr1".to_string(),
855 ..Default::default()
856 },
857 }
858 .try_as_raw_value()
859 .unwrap(),
860 ..Default::default()
861 })
862 .await
863 .unwrap();
864
865 table_route_storage
866 .remap_route_address(&mut table_route)
867 .await
868 .unwrap();
869
870 if let TableRouteValue::Physical(physical_table_route) = table_route {
871 assert_eq!(
872 physical_table_route.region_routes[0]
873 .leader_peer
874 .as_ref()
875 .unwrap()
876 .addr,
877 "addr1"
878 );
879 assert_eq!(
880 physical_table_route.region_routes[0].follower_peers[0].addr,
881 ""
882 );
883 } else {
884 panic!("Expected PhysicalTableRouteValue");
885 }
886 }
887}