1use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::sync::Arc;
18
19use serde::{Deserialize, Serialize};
20use snafu::{ensure, OptionExt, ResultExt};
21use store_api::storage::{RegionId, RegionNumber};
22use table::metadata::TableId;
23
24use crate::error::{
25 InvalidMetadataSnafu, MetadataCorruptionSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu,
26 TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu,
27};
28use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
29use crate::key::txn_helper::TxnOpGetResponseSet;
30use crate::key::{
31 DeserializedValueWithBytes, MetadataKey, MetadataValue, RegionDistribution,
32 TABLE_ROUTE_KEY_PATTERN, TABLE_ROUTE_PREFIX,
33};
34use crate::kv_backend::txn::Txn;
35use crate::kv_backend::KvBackendRef;
36use crate::rpc::router::{region_distribution, RegionRoute};
37use crate::rpc::store::BatchGetRequest;
38
39#[derive(Debug, PartialEq)]
43pub struct TableRouteKey {
44 pub table_id: TableId,
45}
46
47impl TableRouteKey {
48 pub fn new(table_id: TableId) -> Self {
49 Self { table_id }
50 }
51
52 pub fn range_prefix() -> Vec<u8> {
54 format!("{}/", TABLE_ROUTE_PREFIX).into_bytes()
55 }
56}
57
/// Route information stored for a table: either a physical or a logical route.
///
/// Serialized by serde as an internally tagged value: the variant name is
/// written in snake_case under the `"type"` field.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TableRouteValue {
    /// Route of a physical table.
    Physical(PhysicalTableRouteValue),
    /// Route of a logical table.
    Logical(LogicalTableRouteValue),
}
64
/// Route of a physical table: its region routes plus a version counter.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)]
pub struct PhysicalTableRouteValue {
    /// Routes of the regions of the table.
    pub region_routes: Vec<RegionRoute>,
    /// Set to 0 by `PhysicalTableRouteValue::new`; `TableRouteValue::update`
    /// produces a new value with this counter incremented by one.
    version: u64,
}
70
/// Route of a logical table: the id of a physical table plus a list of region ids.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct LogicalTableRouteValue {
    /// Id of the physical table this logical route refers to.
    physical_table_id: TableId,
    /// Region ids of the logical table. `TableRouteValue::new` builds them from
    /// the logical table id and the region numbers of the given region routes.
    region_ids: Vec<RegionId>,
}
76
77impl TableRouteValue {
78 pub(crate) fn new(
81 table_id: TableId,
82 physical_table_id: TableId,
83 region_routes: Vec<RegionRoute>,
84 ) -> Self {
85 if table_id == physical_table_id {
86 TableRouteValue::physical(region_routes)
87 } else {
88 let region_routes = region_routes
89 .into_iter()
90 .map(|region| {
91 debug_assert_eq!(region.region.id.table_id(), physical_table_id);
92 RegionId::new(table_id, region.region.id.region_number())
93 })
94 .collect();
95 TableRouteValue::logical(physical_table_id, region_routes)
96 }
97 }
98
99 pub fn physical(region_routes: Vec<RegionRoute>) -> Self {
100 Self::Physical(PhysicalTableRouteValue::new(region_routes))
101 }
102
103 pub fn logical(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
104 Self::Logical(LogicalTableRouteValue::new(physical_table_id, region_ids))
105 }
106
107 pub fn update(&self, region_routes: Vec<RegionRoute>) -> Result<Self> {
109 ensure!(
110 self.is_physical(),
111 UnexpectedLogicalRouteTableSnafu {
112 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
113 }
114 );
115 let version = self.as_physical_table_route_ref().version;
116 Ok(Self::Physical(PhysicalTableRouteValue {
117 region_routes,
118 version: version + 1,
119 }))
120 }
121
122 #[cfg(any(test, feature = "testing"))]
126 pub fn version(&self) -> Result<u64> {
127 ensure!(
128 self.is_physical(),
129 UnexpectedLogicalRouteTableSnafu {
130 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
131 }
132 );
133 Ok(self.as_physical_table_route_ref().version)
134 }
135
136 pub fn region_route(&self, region_id: RegionId) -> Result<Option<RegionRoute>> {
140 ensure!(
141 self.is_physical(),
142 UnexpectedLogicalRouteTableSnafu {
143 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
144 }
145 );
146 Ok(self
147 .as_physical_table_route_ref()
148 .region_routes
149 .iter()
150 .find(|route| route.region.id == region_id)
151 .cloned())
152 }
153
154 pub fn is_physical(&self) -> bool {
156 matches!(self, TableRouteValue::Physical(_))
157 }
158
159 pub fn region_routes(&self) -> Result<&Vec<RegionRoute>> {
161 ensure!(
162 self.is_physical(),
163 UnexpectedLogicalRouteTableSnafu {
164 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
165 }
166 );
167 Ok(&self.as_physical_table_route_ref().region_routes)
168 }
169
170 fn as_physical_table_route_ref(&self) -> &PhysicalTableRouteValue {
175 match self {
176 TableRouteValue::Physical(x) => x,
177 _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
178 }
179 }
180
181 pub fn into_physical_table_route(self) -> PhysicalTableRouteValue {
186 match self {
187 TableRouteValue::Physical(x) => x,
188 _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
189 }
190 }
191
192 pub fn into_logical_table_route(self) -> LogicalTableRouteValue {
197 match self {
198 TableRouteValue::Logical(x) => x,
199 _ => unreachable!("Mistakenly been treated as a Logical TableRoute: {self:?}"),
200 }
201 }
202
203 pub fn region_numbers(&self) -> Vec<RegionNumber> {
204 match self {
205 TableRouteValue::Physical(x) => x
206 .region_routes
207 .iter()
208 .map(|region_route| region_route.region.id.region_number())
209 .collect(),
210 TableRouteValue::Logical(x) => x
211 .region_ids()
212 .iter()
213 .map(|region_id| region_id.region_number())
214 .collect(),
215 }
216 }
217}
218
219impl MetadataValue for TableRouteValue {
220 fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
221 let r = serde_json::from_slice::<TableRouteValue>(raw_value);
222 match r {
223 Err(e) if e.is_data() => Ok(Self::Physical(
225 serde_json::from_slice::<PhysicalTableRouteValue>(raw_value)
226 .context(SerdeJsonSnafu)?,
227 )),
228 Ok(x) => Ok(x),
229 Err(e) => Err(e).context(SerdeJsonSnafu),
230 }
231 }
232
233 fn try_as_raw_value(&self) -> Result<Vec<u8>> {
234 serde_json::to_vec(self).context(SerdeJsonSnafu)
235 }
236}
237
238impl PhysicalTableRouteValue {
239 pub fn new(region_routes: Vec<RegionRoute>) -> Self {
240 Self {
241 region_routes,
242 version: 0,
243 }
244 }
245}
246
247impl LogicalTableRouteValue {
248 pub fn new(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
249 Self {
250 physical_table_id,
251 region_ids,
252 }
253 }
254
255 pub fn physical_table_id(&self) -> TableId {
256 self.physical_table_id
257 }
258
259 pub fn region_ids(&self) -> &Vec<RegionId> {
260 &self.region_ids
261 }
262}
263
264impl MetadataKey<'_, TableRouteKey> for TableRouteKey {
265 fn to_bytes(&self) -> Vec<u8> {
266 self.to_string().into_bytes()
267 }
268
269 fn from_bytes(bytes: &[u8]) -> Result<TableRouteKey> {
270 let key = std::str::from_utf8(bytes).map_err(|e| {
271 InvalidMetadataSnafu {
272 err_msg: format!(
273 "TableRouteKey '{}' is not a valid UTF8 string: {e}",
274 String::from_utf8_lossy(bytes)
275 ),
276 }
277 .build()
278 })?;
279 let captures = TABLE_ROUTE_KEY_PATTERN
280 .captures(key)
281 .context(InvalidMetadataSnafu {
282 err_msg: format!("Invalid TableRouteKey '{key}'"),
283 })?;
284 let table_id = captures[1].parse::<TableId>().unwrap();
286 Ok(TableRouteKey { table_id })
287 }
288}
289
290impl Display for TableRouteKey {
291 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
292 write!(f, "{}/{}", TABLE_ROUTE_PREFIX, self.table_id)
293 }
294}
295
/// Shared, reference-counted handle to a [`TableRouteManager`].
pub type TableRouteManagerRef = Arc<TableRouteManager>;
297
298pub struct TableRouteManager {
299 storage: TableRouteStorage,
300}
301
302impl TableRouteManager {
303 pub fn new(kv_backend: KvBackendRef) -> Self {
304 Self {
305 storage: TableRouteStorage::new(kv_backend),
306 }
307 }
308
309 pub async fn get_physical_table_id(
314 &self,
315 logical_or_physical_table_id: TableId,
316 ) -> Result<TableId> {
317 let table_route = self
318 .storage
319 .get_inner(logical_or_physical_table_id)
320 .await?
321 .context(TableRouteNotFoundSnafu {
322 table_id: logical_or_physical_table_id,
323 })?;
324
325 match table_route {
326 TableRouteValue::Physical(_) => Ok(logical_or_physical_table_id),
327 TableRouteValue::Logical(x) => Ok(x.physical_table_id()),
328 }
329 }
330
331 pub async fn get_physical_table_route(
337 &self,
338 logical_or_physical_table_id: TableId,
339 ) -> Result<(TableId, PhysicalTableRouteValue)> {
340 let table_route = self
341 .storage
342 .get(logical_or_physical_table_id)
343 .await?
344 .context(TableRouteNotFoundSnafu {
345 table_id: logical_or_physical_table_id,
346 })?;
347
348 match table_route {
349 TableRouteValue::Physical(x) => Ok((logical_or_physical_table_id, x)),
350 TableRouteValue::Logical(x) => {
351 let physical_table_id = x.physical_table_id();
352 let physical_table_route = self.storage.get(physical_table_id).await?.context(
353 TableRouteNotFoundSnafu {
354 table_id: physical_table_id,
355 },
356 )?;
357 let physical_table_route = physical_table_route.into_physical_table_route();
358 Ok((physical_table_id, physical_table_route))
359 }
360 }
361 }
362
363 pub async fn batch_get_physical_table_routes(
370 &self,
371 logical_or_physical_table_ids: &[TableId],
372 ) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
373 let table_routes = self
374 .storage
375 .batch_get(logical_or_physical_table_ids)
376 .await?;
377 let table_routes = table_routes
379 .into_iter()
380 .zip(logical_or_physical_table_ids)
381 .filter_map(|(route, id)| route.map(|route| (*id, route)))
382 .collect::<HashMap<_, _>>();
383
384 let mut physical_table_routes = HashMap::with_capacity(table_routes.len());
385 let mut logical_table_ids = HashMap::with_capacity(table_routes.len());
386
387 for (table_id, table_route) in table_routes {
388 match table_route {
389 TableRouteValue::Physical(x) => {
390 physical_table_routes.insert(table_id, x);
391 }
392 TableRouteValue::Logical(x) => {
393 logical_table_ids.insert(table_id, x.physical_table_id());
394 }
395 }
396 }
397
398 if logical_table_ids.is_empty() {
399 return Ok(physical_table_routes);
400 }
401
402 let physical_table_ids = logical_table_ids
404 .values()
405 .cloned()
406 .collect::<HashSet<_>>()
407 .into_iter()
408 .collect::<Vec<_>>();
409 let table_routes = self
410 .table_route_storage()
411 .batch_get(&physical_table_ids)
412 .await?;
413 let table_routes = table_routes
414 .into_iter()
415 .zip(physical_table_ids)
416 .filter_map(|(route, id)| route.map(|route| (id, route)))
417 .collect::<HashMap<_, _>>();
418
419 for (logical_table_id, physical_table_id) in logical_table_ids {
420 let table_route =
421 table_routes
422 .get(&physical_table_id)
423 .context(TableRouteNotFoundSnafu {
424 table_id: physical_table_id,
425 })?;
426 match table_route {
427 TableRouteValue::Physical(x) => {
428 physical_table_routes.insert(logical_table_id, x.clone());
429 }
430 TableRouteValue::Logical(x) => {
431 MetadataCorruptionSnafu {
433 err_msg: format!(
434 "logical table {} {:?} cannot be resolved to a physical table.",
435 logical_table_id, x
436 ),
437 }
438 .fail()?;
439 }
440 }
441 }
442
443 Ok(physical_table_routes)
444 }
445
446 pub async fn get_region_distribution(
448 &self,
449 table_id: TableId,
450 ) -> Result<Option<RegionDistribution>> {
451 self.storage
452 .get(table_id)
453 .await?
454 .map(|table_route| Ok(region_distribution(table_route.region_routes()?)))
455 .transpose()
456 }
457
458 pub async fn set_region_staging_state(
464 &self,
465 region_id: store_api::storage::RegionId,
466 staging: bool,
467 ) -> Result<()> {
468 let table_id = region_id.table_id();
469
470 let current_table_route = self
472 .storage
473 .get_with_raw_bytes(table_id)
474 .await?
475 .context(TableRouteNotFoundSnafu { table_id })?;
476
477 let new_table_route = current_table_route.inner.clone();
479
480 ensure!(
482 new_table_route.is_physical(),
483 UnexpectedLogicalRouteTableSnafu {
484 err_msg: format!("Cannot set staging state for logical table {table_id}"),
485 }
486 );
487
488 let region_routes = new_table_route.region_routes()?.clone();
489 let mut updated_routes = region_routes.clone();
490
491 let mut region_found = false;
494 for route in &mut updated_routes {
495 if route.region.id == region_id {
496 if staging {
497 route.set_leader_staging();
498 } else {
499 route.clear_leader_staging();
500 }
501 region_found = true;
502 break;
503 }
504 }
505
506 ensure!(region_found, RegionNotFoundSnafu { region_id });
507
508 let updated_table_route = new_table_route.update(updated_routes)?;
510
511 let (txn, _) =
513 self.storage
514 .build_update_txn(table_id, ¤t_table_route, &updated_table_route)?;
515
516 let result = self.storage.kv_backend.txn(txn).await?;
517
518 ensure!(
519 result.succeeded,
520 MetadataCorruptionSnafu {
521 err_msg: format!(
522 "Failed to update staging state for region {}: CAS operation failed",
523 region_id
524 ),
525 }
526 );
527
528 Ok(())
529 }
530
531 pub async fn is_region_staging(&self, region_id: store_api::storage::RegionId) -> Result<bool> {
535 let table_id = region_id.table_id();
536
537 let table_route = self.storage.get(table_id).await?;
538
539 match table_route {
540 Some(route) if route.is_physical() => {
541 let region_routes = route.region_routes()?;
542 for route in region_routes {
543 if route.region.id == region_id {
544 return Ok(route.is_leader_staging());
545 }
546 }
547 Ok(false)
548 }
549 _ => Ok(false),
550 }
551 }
552
553 pub fn table_route_storage(&self) -> &TableRouteStorage {
555 &self.storage
556 }
557}
558
559pub struct TableRouteStorage {
561 kv_backend: KvBackendRef,
562}
563
564pub type TableRouteValueDecodeResult = Result<Option<DeserializedValueWithBytes<TableRouteValue>>>;
565
566impl TableRouteStorage {
567 pub fn new(kv_backend: KvBackendRef) -> Self {
568 Self { kv_backend }
569 }
570
571 pub fn build_create_txn(
574 &self,
575 table_id: TableId,
576 table_route_value: &TableRouteValue,
577 ) -> Result<(
578 Txn,
579 impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult,
580 )> {
581 let key = TableRouteKey::new(table_id);
582 let raw_key = key.to_bytes();
583
584 let txn = Txn::put_if_not_exists(raw_key.clone(), table_route_value.try_as_raw_value()?);
585
586 Ok((
587 txn,
588 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
589 ))
590 }
591
592 pub fn build_update_txn(
597 &self,
598 table_id: TableId,
599 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
600 new_table_route_value: &TableRouteValue,
601 ) -> Result<(
602 Txn,
603 impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult,
604 )> {
605 let key = TableRouteKey::new(table_id);
606 let raw_key = key.to_bytes();
607 let raw_value = current_table_route_value.get_raw_bytes();
608 let new_raw_value: Vec<u8> = new_table_route_value.try_as_raw_value()?;
609
610 let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
611
612 Ok((
613 txn,
614 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
615 ))
616 }
617
618 pub async fn get(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
620 let mut table_route = self.get_inner(table_id).await?;
621 if let Some(table_route) = &mut table_route {
622 self.remap_route_address(table_route).await?;
623 };
624
625 Ok(table_route)
626 }
627
628 async fn get_inner(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
629 let key = TableRouteKey::new(table_id);
630 self.kv_backend
631 .get(&key.to_bytes())
632 .await?
633 .map(|kv| TableRouteValue::try_from_raw_value(&kv.value))
634 .transpose()
635 }
636
637 pub async fn get_with_raw_bytes(
639 &self,
640 table_id: TableId,
641 ) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
642 let mut table_route = self.get_with_raw_bytes_inner(table_id).await?;
643 if let Some(table_route) = &mut table_route {
644 self.remap_route_address(table_route).await?;
645 };
646
647 Ok(table_route)
648 }
649
650 async fn get_with_raw_bytes_inner(
651 &self,
652 table_id: TableId,
653 ) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
654 let key = TableRouteKey::new(table_id);
655 self.kv_backend
656 .get(&key.to_bytes())
657 .await?
658 .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
659 .transpose()
660 }
661
662 pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
664 let mut table_routes = self.batch_get_inner(table_ids).await?;
665 self.remap_routes_addresses(&mut table_routes).await?;
666
667 Ok(table_routes)
668 }
669
670 async fn batch_get_inner(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
671 let keys = table_ids
672 .iter()
673 .map(|id| TableRouteKey::new(*id).to_bytes())
674 .collect::<Vec<_>>();
675 let resp = self
676 .kv_backend
677 .batch_get(BatchGetRequest { keys: keys.clone() })
678 .await?;
679
680 let kvs = resp
681 .kvs
682 .into_iter()
683 .map(|kv| (kv.key, kv.value))
684 .collect::<HashMap<_, _>>();
685 keys.into_iter()
686 .map(|key| {
687 if let Some(value) = kvs.get(&key) {
688 Ok(Some(TableRouteValue::try_from_raw_value(value)?))
689 } else {
690 Ok(None)
691 }
692 })
693 .collect()
694 }
695
696 async fn remap_routes_addresses(
697 &self,
698 table_routes: &mut [Option<TableRouteValue>],
699 ) -> Result<()> {
700 let keys = table_routes
701 .iter()
702 .flat_map(|table_route| {
703 table_route
704 .as_ref()
705 .map(extract_address_keys)
706 .unwrap_or_default()
707 })
708 .collect::<HashSet<_>>()
709 .into_iter()
710 .collect();
711 let node_addrs = self.get_node_addresses(keys).await?;
712 for table_route in table_routes.iter_mut().flatten() {
713 set_addresses(&node_addrs, table_route)?;
714 }
715
716 Ok(())
717 }
718
719 async fn remap_route_address(&self, table_route: &mut TableRouteValue) -> Result<()> {
720 let keys = extract_address_keys(table_route).into_iter().collect();
721 let node_addrs = self.get_node_addresses(keys).await?;
722 set_addresses(&node_addrs, table_route)?;
723
724 Ok(())
725 }
726
727 async fn get_node_addresses(
728 &self,
729 keys: Vec<Vec<u8>>,
730 ) -> Result<HashMap<u64, NodeAddressValue>> {
731 if keys.is_empty() {
732 return Ok(HashMap::default());
733 }
734
735 self.kv_backend
736 .batch_get(BatchGetRequest { keys })
737 .await?
738 .kvs
739 .into_iter()
740 .map(|kv| {
741 let node_id = NodeAddressKey::from_bytes(&kv.key)?.node_id;
742 let node_addr = NodeAddressValue::try_from_raw_value(&kv.value)?;
743 Ok((node_id, node_addr))
744 })
745 .collect()
746 }
747}
748
749fn set_addresses(
750 node_addrs: &HashMap<u64, NodeAddressValue>,
751 table_route: &mut TableRouteValue,
752) -> Result<()> {
753 let TableRouteValue::Physical(physical_table_route) = table_route else {
754 return Ok(());
755 };
756
757 for region_route in &mut physical_table_route.region_routes {
758 if let Some(leader) = &mut region_route.leader_peer {
759 if let Some(node_addr) = node_addrs.get(&leader.id) {
760 leader.addr = node_addr.peer.addr.clone();
761 }
762 }
763 for follower in &mut region_route.follower_peers {
764 if let Some(node_addr) = node_addrs.get(&follower.id) {
765 follower.addr = node_addr.peer.addr.clone();
766 }
767 }
768 }
769
770 Ok(())
771}
772
773fn extract_address_keys(table_route: &TableRouteValue) -> HashSet<Vec<u8>> {
774 let TableRouteValue::Physical(physical_table_route) = table_route else {
775 return HashSet::default();
776 };
777
778 physical_table_route
779 .region_routes
780 .iter()
781 .flat_map(|region_route| {
782 region_route
783 .follower_peers
784 .iter()
785 .map(|peer| NodeAddressKey::with_datanode(peer.id).to_bytes())
786 .chain(
787 region_route
788 .leader_peer
789 .as_ref()
790 .map(|leader| NodeAddressKey::with_datanode(leader.id).to_bytes()),
791 )
792 })
793 .collect()
794}
795
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::{KvBackend, TxnService};
    use crate::peer::Peer;
    use crate::rpc::router::Region;
    use crate::rpc::store::PutRequest;

    /// A raw value without the `type` tag decodes as a physical route.
    #[test]
    fn test_table_route_compatibility() {
        let old_raw_v = r#"{"region_routes":[{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]},{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]}],"version":0}"#;
        let decoded = TableRouteValue::try_from_raw_value(old_raw_v.as_bytes()).unwrap();

        // The raw value contains the same region route twice.
        let region_route = RegionRoute {
            region: Region {
                id: RegionId::new(0, 1),
                name: "r1".to_string(),
                partition: None,
                attrs: Default::default(),
                partition_expr: Default::default(),
            },
            leader_peer: Some(Peer {
                id: 2,
                addr: "a2".to_string(),
            }),
            follower_peers: vec![],
            leader_state: None,
            leader_down_since: None,
        };
        let expected_table_route = TableRouteValue::Physical(PhysicalTableRouteValue {
            region_routes: vec![region_route.clone(), region_route],
            version: 0,
        });

        assert_eq!(decoded, expected_table_route);
    }

    #[test]
    fn test_key_serialization() {
        let raw_key = TableRouteKey::new(42).to_bytes();
        assert_eq!(raw_key, b"__table_route/42");
    }

    #[test]
    fn test_key_deserialization() {
        let parsed = TableRouteKey::from_bytes(b"__table_route/42").unwrap();
        assert_eq!(parsed, TableRouteKey::new(42));
    }

    #[tokio::test]
    async fn test_table_route_storage_get_with_raw_bytes_empty() {
        let kv = Arc::new(MemoryKvBackend::default());
        let storage = TableRouteStorage::new(kv);

        let table_route = storage.get_with_raw_bytes(1024).await.unwrap();
        assert!(table_route.is_none());
    }

    #[tokio::test]
    async fn test_table_route_storage_get_with_raw_bytes() {
        let kv = Arc::new(MemoryKvBackend::default());
        let storage = TableRouteStorage::new(kv.clone());

        // Nothing has been stored yet.
        let missing = storage.get_with_raw_bytes(1024).await.unwrap();
        assert!(missing.is_none());

        let manager = TableRouteManager::new(kv.clone());
        let table_route_value = TableRouteValue::Logical(LogicalTableRouteValue {
            physical_table_id: 1023,
            region_ids: vec![RegionId::new(1023, 1)],
        });
        let (txn, _) = manager
            .table_route_storage()
            .build_create_txn(1024, &table_route_value)
            .unwrap();
        let resp = kv.txn(txn).await.unwrap();
        assert!(resp.succeeded);

        let stored = storage.get_with_raw_bytes(1024).await.unwrap();
        assert!(stored.is_some());
        let got = stored.unwrap().inner;
        assert_eq!(got, table_route_value);
    }

    #[tokio::test]
    async fn test_table_route_batch_get() {
        let kv = Arc::new(MemoryKvBackend::default());
        let storage = TableRouteStorage::new(kv.clone());

        // An empty backend yields `None` for every requested id.
        let empty = storage.batch_get(&[1023, 1024, 1025]).await.unwrap();
        assert!(empty.iter().all(Option::is_none));

        let manager = TableRouteManager::new(kv.clone());
        let routes = [
            (
                1024,
                TableRouteValue::Logical(LogicalTableRouteValue {
                    physical_table_id: 1023,
                    region_ids: vec![RegionId::new(1023, 1)],
                }),
            ),
            (
                1025,
                TableRouteValue::Logical(LogicalTableRouteValue {
                    physical_table_id: 1023,
                    region_ids: vec![RegionId::new(1023, 2)],
                }),
            ),
        ];
        for (table_id, route) in &routes {
            let (txn, _) = manager
                .table_route_storage()
                .build_create_txn(*table_id, route)
                .unwrap();
            let resp = kv.txn(txn).await.unwrap();
            assert!(resp.succeeded);
        }

        // Results follow the request order, with `None` for unknown ids.
        let results = storage
            .batch_get(&[9999, 1025, 8888, 1024])
            .await
            .unwrap();
        assert!(results[0].is_none());
        assert_eq!(results[1].as_ref().unwrap(), &routes[1].1);
        assert!(results[2].is_none());
        assert_eq!(results[3].as_ref().unwrap(), &routes[0].1);
    }

    #[tokio::test]
    async fn remap_route_address_updates_addresses() {
        let kv = Arc::new(MemoryKvBackend::default());
        let storage = TableRouteStorage::new(kv.clone());
        let mut table_route = TableRouteValue::Physical(PhysicalTableRouteValue {
            region_routes: vec![RegionRoute {
                leader_peer: Some(Peer {
                    id: 1,
                    ..Default::default()
                }),
                follower_peers: vec![Peer {
                    id: 2,
                    ..Default::default()
                }],
                ..Default::default()
            }],
            version: 0,
        });

        // Only node 1 has a stored address; node 2 has none.
        let node_addr = NodeAddressValue {
            peer: Peer {
                addr: "addr1".to_string(),
                ..Default::default()
            },
        };
        kv.put(PutRequest {
            key: NodeAddressKey::with_datanode(1).to_bytes(),
            value: node_addr.try_as_raw_value().unwrap(),
            ..Default::default()
        })
        .await
        .unwrap();

        storage.remap_route_address(&mut table_route).await.unwrap();

        let TableRouteValue::Physical(physical_table_route) = table_route else {
            panic!("Expected PhysicalTableRouteValue");
        };
        let region_route = &physical_table_route.region_routes[0];
        assert_eq!(region_route.leader_peer.as_ref().unwrap().addr, "addr1");
        // The follower keeps its original (empty) address.
        assert_eq!(region_route.follower_peers[0].addr, "");
    }
}