1use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::sync::Arc;
18
19use serde::{Deserialize, Serialize};
20use snafu::{OptionExt, ResultExt, ensure};
21use store_api::storage::{RegionId, RegionNumber};
22use table::metadata::TableId;
23
24use crate::error::{
25 InvalidMetadataSnafu, MetadataCorruptionSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu,
26 TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu,
27};
28use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
29use crate::key::txn_helper::TxnOpGetResponseSet;
30use crate::key::{
31 DeserializedValueWithBytes, MetadataKey, MetadataValue, RegionDistribution,
32 TABLE_ROUTE_KEY_PATTERN, TABLE_ROUTE_PREFIX,
33};
34use crate::kv_backend::KvBackendRef;
35use crate::kv_backend::txn::Txn;
36use crate::rpc::router::{RegionRoute, region_distribution};
37use crate::rpc::store::BatchGetRequest;
38
/// Key under which a table's route is stored in the metadata key-value backend.
///
/// Its `Display` form is `TABLE_ROUTE_PREFIX`, a `/`, then the table id.
#[derive(Debug, PartialEq)]
pub struct TableRouteKey {
    /// Id of the table whose route this key addresses.
    pub table_id: TableId,
}
46
47impl TableRouteKey {
48 pub fn new(table_id: TableId) -> Self {
49 Self { table_id }
50 }
51
52 pub fn range_prefix() -> Vec<u8> {
54 format!("{}/", TABLE_ROUTE_PREFIX).into_bytes()
55 }
56}
57
/// Route information stored for a table.
///
/// Serialized with an internal `type` tag whose value is the snake_case variant name.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TableRouteValue {
    /// Region routes of a physical table.
    Physical(PhysicalTableRouteValue),
    /// Reference from a logical table to the physical table it resolves to.
    Logical(LogicalTableRouteValue),
}
64
/// Region routes of a physical table together with a version counter.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)]
pub struct PhysicalTableRouteValue {
    /// Routes of the table's regions.
    pub region_routes: Vec<RegionRoute>,
    /// Set to 0 by `PhysicalTableRouteValue::new` and incremented by one by
    /// `TableRouteValue::update`.
    version: u64,
}
70
/// Route of a logical table: the id of its physical table plus its own region ids.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct LogicalTableRouteValue {
    /// Id of the physical table this logical table resolves to.
    physical_table_id: TableId,
    /// Region ids of the logical table. `TableRouteValue::new` builds them from the logical
    /// table id and the region numbers of the physical regions.
    region_ids: Vec<RegionId>,
}
76
77impl TableRouteValue {
78 pub(crate) fn new(
81 table_id: TableId,
82 physical_table_id: TableId,
83 region_routes: Vec<RegionRoute>,
84 ) -> Self {
85 if table_id == physical_table_id {
86 TableRouteValue::physical(region_routes)
87 } else {
88 let region_routes = region_routes
89 .into_iter()
90 .map(|region| {
91 debug_assert_eq!(region.region.id.table_id(), physical_table_id);
92 RegionId::new(table_id, region.region.id.region_number())
93 })
94 .collect();
95 TableRouteValue::logical(physical_table_id, region_routes)
96 }
97 }
98
99 pub fn physical(region_routes: Vec<RegionRoute>) -> Self {
100 Self::Physical(PhysicalTableRouteValue::new(region_routes))
101 }
102
103 pub fn logical(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
104 Self::Logical(LogicalTableRouteValue::new(physical_table_id, region_ids))
105 }
106
107 pub fn update(&self, region_routes: Vec<RegionRoute>) -> Result<Self> {
109 ensure!(
110 self.is_physical(),
111 UnexpectedLogicalRouteTableSnafu {
112 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
113 }
114 );
115 let version = self.as_physical_table_route_ref().version;
116 Ok(Self::Physical(PhysicalTableRouteValue {
117 region_routes,
118 version: version + 1,
119 }))
120 }
121
122 #[cfg(any(test, feature = "testing"))]
126 pub fn version(&self) -> Result<u64> {
127 ensure!(
128 self.is_physical(),
129 UnexpectedLogicalRouteTableSnafu {
130 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
131 }
132 );
133 Ok(self.as_physical_table_route_ref().version)
134 }
135
136 pub fn region_route(&self, region_id: RegionId) -> Result<Option<RegionRoute>> {
140 ensure!(
141 self.is_physical(),
142 UnexpectedLogicalRouteTableSnafu {
143 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
144 }
145 );
146 Ok(self
147 .as_physical_table_route_ref()
148 .region_routes
149 .iter()
150 .find(|route| route.region.id == region_id)
151 .cloned())
152 }
153
    /// Returns `true` when this value is the [`TableRouteValue::Physical`] variant.
    pub fn is_physical(&self) -> bool {
        matches!(self, TableRouteValue::Physical(_))
    }
158
159 pub fn region_routes(&self) -> Result<&Vec<RegionRoute>> {
161 ensure!(
162 self.is_physical(),
163 UnexpectedLogicalRouteTableSnafu {
164 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
165 }
166 );
167 Ok(&self.as_physical_table_route_ref().region_routes)
168 }
169
170 fn as_physical_table_route_ref(&self) -> &PhysicalTableRouteValue {
175 match self {
176 TableRouteValue::Physical(x) => x,
177 _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
178 }
179 }
180
181 pub fn into_physical_table_route(self) -> PhysicalTableRouteValue {
186 match self {
187 TableRouteValue::Physical(x) => x,
188 _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
189 }
190 }
191
192 pub fn into_logical_table_route(self) -> LogicalTableRouteValue {
197 match self {
198 TableRouteValue::Logical(x) => x,
199 _ => unreachable!("Mistakenly been treated as a Logical TableRoute: {self:?}"),
200 }
201 }
202
203 pub fn region_numbers(&self) -> Vec<RegionNumber> {
204 match self {
205 TableRouteValue::Physical(x) => x
206 .region_routes
207 .iter()
208 .map(|region_route| region_route.region.id.region_number())
209 .collect(),
210 TableRouteValue::Logical(x) => x
211 .region_ids()
212 .iter()
213 .map(|region_id| region_id.region_number())
214 .collect(),
215 }
216 }
217}
218
219impl MetadataValue for TableRouteValue {
220 fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
221 let r = serde_json::from_slice::<TableRouteValue>(raw_value);
222 match r {
223 Err(e) if e.is_data() => Ok(Self::Physical(
225 serde_json::from_slice::<PhysicalTableRouteValue>(raw_value)
226 .context(SerdeJsonSnafu)?,
227 )),
228 Ok(x) => Ok(x),
229 Err(e) => Err(e).context(SerdeJsonSnafu),
230 }
231 }
232
233 fn try_as_raw_value(&self) -> Result<Vec<u8>> {
234 serde_json::to_vec(self).context(SerdeJsonSnafu)
235 }
236}
237
238impl PhysicalTableRouteValue {
239 pub fn new(region_routes: Vec<RegionRoute>) -> Self {
240 Self {
241 region_routes,
242 version: 0,
243 }
244 }
245}
246
247impl LogicalTableRouteValue {
248 pub fn new(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
249 Self {
250 physical_table_id,
251 region_ids,
252 }
253 }
254
255 pub fn physical_table_id(&self) -> TableId {
256 self.physical_table_id
257 }
258
259 pub fn region_ids(&self) -> &Vec<RegionId> {
260 &self.region_ids
261 }
262}
263
264impl MetadataKey<'_, TableRouteKey> for TableRouteKey {
265 fn to_bytes(&self) -> Vec<u8> {
266 self.to_string().into_bytes()
267 }
268
269 fn from_bytes(bytes: &[u8]) -> Result<TableRouteKey> {
270 let key = std::str::from_utf8(bytes).map_err(|e| {
271 InvalidMetadataSnafu {
272 err_msg: format!(
273 "TableRouteKey '{}' is not a valid UTF8 string: {e}",
274 String::from_utf8_lossy(bytes)
275 ),
276 }
277 .build()
278 })?;
279 let captures = TABLE_ROUTE_KEY_PATTERN
280 .captures(key)
281 .context(InvalidMetadataSnafu {
282 err_msg: format!("Invalid TableRouteKey '{key}'"),
283 })?;
284 let table_id = captures[1].parse::<TableId>().unwrap();
286 Ok(TableRouteKey { table_id })
287 }
288}
289
290impl Display for TableRouteKey {
291 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
292 write!(f, "{}/{}", TABLE_ROUTE_PREFIX, self.table_id)
293 }
294}
295
/// Shared, reference-counted handle to a [`TableRouteManager`].
pub type TableRouteManagerRef = Arc<TableRouteManager>;
297
/// Table-route operations (resolving logical tables to physical routes, region distribution,
/// region staging state) built on top of a [`TableRouteStorage`].
pub struct TableRouteManager {
    /// Storage used for every read and write issued by the manager.
    storage: TableRouteStorage,
}
301
302impl TableRouteManager {
303 pub fn new(kv_backend: KvBackendRef) -> Self {
304 Self {
305 storage: TableRouteStorage::new(kv_backend),
306 }
307 }
308
309 pub async fn get_physical_table_id(
314 &self,
315 logical_or_physical_table_id: TableId,
316 ) -> Result<TableId> {
317 let table_route = self
318 .storage
319 .get_inner(logical_or_physical_table_id)
320 .await?
321 .context(TableRouteNotFoundSnafu {
322 table_id: logical_or_physical_table_id,
323 })?;
324
325 match table_route {
326 TableRouteValue::Physical(_) => Ok(logical_or_physical_table_id),
327 TableRouteValue::Logical(x) => Ok(x.physical_table_id()),
328 }
329 }
330
331 pub async fn get_physical_table_route(
337 &self,
338 logical_or_physical_table_id: TableId,
339 ) -> Result<(TableId, PhysicalTableRouteValue)> {
340 let table_route = self
341 .storage
342 .get(logical_or_physical_table_id)
343 .await?
344 .context(TableRouteNotFoundSnafu {
345 table_id: logical_or_physical_table_id,
346 })?;
347
348 match table_route {
349 TableRouteValue::Physical(x) => Ok((logical_or_physical_table_id, x)),
350 TableRouteValue::Logical(x) => {
351 let physical_table_id = x.physical_table_id();
352 let physical_table_route = self.storage.get(physical_table_id).await?.context(
353 TableRouteNotFoundSnafu {
354 table_id: physical_table_id,
355 },
356 )?;
357 let physical_table_route = physical_table_route.into_physical_table_route();
358 Ok((physical_table_id, physical_table_route))
359 }
360 }
361 }
362
363 pub async fn batch_get_physical_table_routes(
370 &self,
371 logical_or_physical_table_ids: &[TableId],
372 ) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
373 let table_routes = self
374 .storage
375 .batch_get(logical_or_physical_table_ids)
376 .await?;
377 let table_routes = table_routes
379 .into_iter()
380 .zip(logical_or_physical_table_ids)
381 .filter_map(|(route, id)| route.map(|route| (*id, route)))
382 .collect::<HashMap<_, _>>();
383
384 let mut physical_table_routes = HashMap::with_capacity(table_routes.len());
385 let mut logical_table_ids = HashMap::with_capacity(table_routes.len());
386
387 for (table_id, table_route) in table_routes {
388 match table_route {
389 TableRouteValue::Physical(x) => {
390 physical_table_routes.insert(table_id, x);
391 }
392 TableRouteValue::Logical(x) => {
393 logical_table_ids.insert(table_id, x.physical_table_id());
394 }
395 }
396 }
397
398 if logical_table_ids.is_empty() {
399 return Ok(physical_table_routes);
400 }
401
402 let physical_table_ids = logical_table_ids
404 .values()
405 .cloned()
406 .collect::<HashSet<_>>()
407 .into_iter()
408 .collect::<Vec<_>>();
409 let table_routes = self
410 .table_route_storage()
411 .batch_get(&physical_table_ids)
412 .await?;
413 let table_routes = table_routes
414 .into_iter()
415 .zip(physical_table_ids)
416 .filter_map(|(route, id)| route.map(|route| (id, route)))
417 .collect::<HashMap<_, _>>();
418
419 for (logical_table_id, physical_table_id) in logical_table_ids {
420 let table_route =
421 table_routes
422 .get(&physical_table_id)
423 .context(TableRouteNotFoundSnafu {
424 table_id: physical_table_id,
425 })?;
426 match table_route {
427 TableRouteValue::Physical(x) => {
428 physical_table_routes.insert(logical_table_id, x.clone());
429 }
430 TableRouteValue::Logical(x) => {
431 MetadataCorruptionSnafu {
433 err_msg: format!(
434 "logical table {} {:?} cannot be resolved to a physical table.",
435 logical_table_id, x
436 ),
437 }
438 .fail()?;
439 }
440 }
441 }
442
443 Ok(physical_table_routes)
444 }
445
446 pub async fn get_region_distribution(
448 &self,
449 table_id: TableId,
450 ) -> Result<Option<RegionDistribution>> {
451 self.storage
452 .get(table_id)
453 .await?
454 .map(|table_route| Ok(region_distribution(table_route.region_routes()?)))
455 .transpose()
456 }
457
458 pub async fn set_region_staging_state(
464 &self,
465 region_id: store_api::storage::RegionId,
466 staging: bool,
467 ) -> Result<()> {
468 let table_id = region_id.table_id();
469
470 let current_table_route = self
472 .storage
473 .get_with_raw_bytes(table_id)
474 .await?
475 .context(TableRouteNotFoundSnafu { table_id })?;
476
477 let new_table_route = current_table_route.inner.clone();
479
480 ensure!(
482 new_table_route.is_physical(),
483 UnexpectedLogicalRouteTableSnafu {
484 err_msg: format!("Cannot set staging state for logical table {table_id}"),
485 }
486 );
487
488 let region_routes = new_table_route.region_routes()?.clone();
489 let mut updated_routes = region_routes.clone();
490
491 let mut region_found = false;
494 for route in &mut updated_routes {
495 if route.region.id == region_id {
496 if staging {
497 route.set_leader_staging();
498 } else {
499 route.clear_leader_staging();
500 }
501 region_found = true;
502 break;
503 }
504 }
505
506 ensure!(region_found, RegionNotFoundSnafu { region_id });
507
508 let updated_table_route = new_table_route.update(updated_routes)?;
510
511 let (txn, _) =
513 self.storage
514 .build_update_txn(table_id, ¤t_table_route, &updated_table_route)?;
515
516 let result = self.storage.kv_backend.txn(txn).await?;
517
518 ensure!(
519 result.succeeded,
520 MetadataCorruptionSnafu {
521 err_msg: format!(
522 "Failed to update staging state for region {}: CAS operation failed",
523 region_id
524 ),
525 }
526 );
527
528 Ok(())
529 }
530
531 pub async fn is_region_staging(&self, region_id: store_api::storage::RegionId) -> Result<bool> {
535 let table_id = region_id.table_id();
536
537 let table_route = self.storage.get(table_id).await?;
538
539 match table_route {
540 Some(route) if route.is_physical() => {
541 let region_routes = route.region_routes()?;
542 for route in region_routes {
543 if route.region.id == region_id {
544 return Ok(route.is_leader_staging());
545 }
546 }
547 Ok(false)
548 }
549 _ => Ok(false),
550 }
551 }
552
    /// Borrows the underlying [`TableRouteStorage`].
    pub fn table_route_storage(&self) -> &TableRouteStorage {
        &self.storage
    }
557}
558
/// Reads table routes from a key-value backend and builds the transactions that create or
/// update them.
pub struct TableRouteStorage {
    /// Backend holding the table route entries (and the node address entries used for
    /// remapping peer addresses).
    kv_backend: KvBackendRef,
}
563
/// Return type of the decoder closures handed out by `build_create_txn` / `build_update_txn`:
/// an optional decoded [`TableRouteValue`] together with its raw bytes, or an error.
pub type TableRouteValueDecodeResult = Result<Option<DeserializedValueWithBytes<TableRouteValue>>>;
565
566impl TableRouteStorage {
    /// Creates a storage backed by `kv_backend`.
    pub fn new(kv_backend: KvBackendRef) -> Self {
        Self { kv_backend }
    }
570
571 pub fn build_create_txn(
574 &self,
575 table_id: TableId,
576 table_route_value: &TableRouteValue,
577 ) -> Result<(
578 Txn,
579 impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult + use<>,
580 )> {
581 let key = TableRouteKey::new(table_id);
582 let raw_key = key.to_bytes();
583
584 let txn = Txn::put_if_not_exists(raw_key.clone(), table_route_value.try_as_raw_value()?);
585
586 Ok((
587 txn,
588 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
589 ))
590 }
591
592 pub fn build_update_txn(
597 &self,
598 table_id: TableId,
599 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
600 new_table_route_value: &TableRouteValue,
601 ) -> Result<(
602 Txn,
603 impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult + use<>,
604 )> {
605 let key = TableRouteKey::new(table_id);
606 let raw_key = key.to_bytes();
607 let raw_value = current_table_route_value.get_raw_bytes();
608 let new_raw_value: Vec<u8> = new_table_route_value.try_as_raw_value()?;
609
610 let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
611
612 Ok((
613 txn,
614 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
615 ))
616 }
617
618 pub async fn get(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
620 let mut table_route = self.get_inner(table_id).await?;
621 if let Some(table_route) = &mut table_route {
622 self.remap_route_address(table_route).await?;
623 };
624
625 Ok(table_route)
626 }
627
628 async fn get_inner(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
629 let key = TableRouteKey::new(table_id);
630 self.kv_backend
631 .get(&key.to_bytes())
632 .await?
633 .map(|kv| TableRouteValue::try_from_raw_value(&kv.value))
634 .transpose()
635 }
636
637 pub async fn get_with_raw_bytes(
639 &self,
640 table_id: TableId,
641 ) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
642 let mut table_route = self.get_with_raw_bytes_inner(table_id).await?;
643 if let Some(table_route) = &mut table_route {
644 self.remap_route_address(table_route).await?;
645 };
646
647 Ok(table_route)
648 }
649
650 async fn get_with_raw_bytes_inner(
651 &self,
652 table_id: TableId,
653 ) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
654 let key = TableRouteKey::new(table_id);
655 self.kv_backend
656 .get(&key.to_bytes())
657 .await?
658 .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
659 .transpose()
660 }
661
662 pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
664 let raw_table_routes = self.batch_get_inner(table_ids).await?;
665
666 Ok(raw_table_routes
667 .into_iter()
668 .map(|v| v.map(|x| x.inner))
669 .collect())
670 }
671
    /// Returns the routes stored for `table_ids` together with their raw bytes, in the same
    /// order as `table_ids`; ids with no stored route yield `None`.
    ///
    /// Peer addresses in the decoded values are remapped from the node address entries in the
    /// backend.
    pub async fn batch_get_with_raw_bytes(
        &self,
        table_ids: &[TableId],
    ) -> Result<Vec<Option<DeserializedValueWithBytes<TableRouteValue>>>> {
        let mut raw_table_routes = self.batch_get_inner(table_ids).await?;
        self.remap_routes_addresses(&mut raw_table_routes).await?;

        Ok(raw_table_routes)
    }
685
686 async fn batch_get_inner(
687 &self,
688 table_ids: &[TableId],
689 ) -> Result<Vec<Option<DeserializedValueWithBytes<TableRouteValue>>>> {
690 let keys = table_ids
691 .iter()
692 .map(|id| TableRouteKey::new(*id).to_bytes())
693 .collect::<Vec<_>>();
694 let resp = self
695 .kv_backend
696 .batch_get(BatchGetRequest { keys: keys.clone() })
697 .await?;
698
699 let kvs = resp
700 .kvs
701 .into_iter()
702 .map(|kv| (kv.key, kv.value))
703 .collect::<HashMap<_, _>>();
704 keys.into_iter()
705 .map(|key| {
706 if let Some(value) = kvs.get(&key) {
707 Ok(Some(DeserializedValueWithBytes::from_inner_slice(value)?))
708 } else {
709 Ok(None)
710 }
711 })
712 .collect()
713 }
714
715 async fn remap_routes_addresses(
716 &self,
717 table_routes: &mut [Option<DeserializedValueWithBytes<TableRouteValue>>],
718 ) -> Result<()> {
719 let keys = table_routes
720 .iter()
721 .flat_map(|table_route| {
722 table_route
723 .as_ref()
724 .map(|x| extract_address_keys(&x.inner))
725 .unwrap_or_default()
726 })
727 .collect::<HashSet<_>>()
728 .into_iter()
729 .collect();
730 let node_addrs = self.get_node_addresses(keys).await?;
731 for table_route in table_routes.iter_mut().flatten() {
732 set_addresses(&node_addrs, table_route)?;
733 }
734
735 Ok(())
736 }
737
738 async fn remap_route_address(&self, table_route: &mut TableRouteValue) -> Result<()> {
739 let keys = extract_address_keys(table_route).into_iter().collect();
740 let node_addrs = self.get_node_addresses(keys).await?;
741 set_addresses(&node_addrs, table_route)?;
742
743 Ok(())
744 }
745
746 async fn get_node_addresses(
747 &self,
748 keys: Vec<Vec<u8>>,
749 ) -> Result<HashMap<u64, NodeAddressValue>> {
750 if keys.is_empty() {
751 return Ok(HashMap::default());
752 }
753
754 self.kv_backend
755 .batch_get(BatchGetRequest { keys })
756 .await?
757 .kvs
758 .into_iter()
759 .map(|kv| {
760 let node_id = NodeAddressKey::from_bytes(&kv.key)?.node_id;
761 let node_addr = NodeAddressValue::try_from_raw_value(&kv.value)?;
762 Ok((node_id, node_addr))
763 })
764 .collect()
765 }
766}
767
768fn set_addresses(
769 node_addrs: &HashMap<u64, NodeAddressValue>,
770 table_route: &mut TableRouteValue,
771) -> Result<()> {
772 let TableRouteValue::Physical(physical_table_route) = table_route else {
773 return Ok(());
774 };
775
776 for region_route in &mut physical_table_route.region_routes {
777 if let Some(leader) = &mut region_route.leader_peer
778 && let Some(node_addr) = node_addrs.get(&leader.id)
779 {
780 leader.addr = node_addr.peer.addr.clone();
781 }
782 for follower in &mut region_route.follower_peers {
783 if let Some(node_addr) = node_addrs.get(&follower.id) {
784 follower.addr = node_addr.peer.addr.clone();
785 }
786 }
787 }
788
789 Ok(())
790}
791
792fn extract_address_keys(table_route: &TableRouteValue) -> HashSet<Vec<u8>> {
793 let TableRouteValue::Physical(physical_table_route) = table_route else {
794 return HashSet::default();
795 };
796
797 physical_table_route
798 .region_routes
799 .iter()
800 .flat_map(|region_route| {
801 region_route
802 .follower_peers
803 .iter()
804 .map(|peer| NodeAddressKey::with_datanode(peer.id).to_bytes())
805 .chain(
806 region_route
807 .leader_peer
808 .as_ref()
809 .map(|leader| NodeAddressKey::with_datanode(leader.id).to_bytes()),
810 )
811 })
812 .collect()
813}
814
815#[cfg(test)]
816mod tests {
817 use std::sync::Arc;
818
819 use super::*;
820 use crate::kv_backend::memory::MemoryKvBackend;
821 use crate::kv_backend::{KvBackend, TxnService};
822 use crate::peer::Peer;
823 use crate::rpc::router::Region;
824 use crate::rpc::store::PutRequest;
825
    /// A payload without the `type` tag (a bare physical route) must still decode, and must
    /// decode as a `Physical` route.
    #[test]
    fn test_table_route_compatibility() {
        // Untagged JSON: only `region_routes` and `version`, no `type` field.
        let old_raw_v = r#"{"region_routes":[{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]},{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]}],"version":0}"#;
        let v = TableRouteValue::try_from_raw_value(old_raw_v.as_bytes()).unwrap();

        let expected_table_route = TableRouteValue::Physical(PhysicalTableRouteValue {
            region_routes: vec![
                RegionRoute {
                    region: Region {
                        id: RegionId::new(0, 1),
                        name: "r1".to_string(),
                        partition: None,
                        attrs: Default::default(),
                        partition_expr: Default::default(),
                    },
                    leader_peer: Some(Peer {
                        id: 2,
                        addr: "a2".to_string(),
                    }),
                    follower_peers: vec![],
                    leader_state: None,
                    leader_down_since: None,
                },
                RegionRoute {
                    region: Region {
                        id: RegionId::new(0, 1),
                        name: "r1".to_string(),
                        partition: None,
                        attrs: Default::default(),
                        partition_expr: Default::default(),
                    },
                    leader_peer: Some(Peer {
                        id: 2,
                        addr: "a2".to_string(),
                    }),
                    follower_peers: vec![],
                    leader_state: None,
                    leader_down_since: None,
                },
            ],
            version: 0,
        });

        assert_eq!(v, expected_table_route);
    }
871
872 #[test]
873 fn test_key_serialization() {
874 let key = TableRouteKey::new(42);
875 let raw_key = key.to_bytes();
876 assert_eq!(raw_key, b"__table_route/42");
877 }
878
879 #[test]
880 fn test_key_deserialization() {
881 let expected = TableRouteKey::new(42);
882 let key = TableRouteKey::from_bytes(b"__table_route/42").unwrap();
883 assert_eq!(key, expected);
884 }
885
886 #[tokio::test]
887 async fn test_table_route_storage_get_with_raw_bytes_empty() {
888 let kv = Arc::new(MemoryKvBackend::default());
889 let table_route_storage = TableRouteStorage::new(kv);
890 let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
891 assert!(table_route.is_none());
892 }
893
894 #[tokio::test]
895 async fn test_table_route_storage_get_with_raw_bytes() {
896 let kv = Arc::new(MemoryKvBackend::default());
897 let table_route_storage = TableRouteStorage::new(kv.clone());
898 let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
899 assert!(table_route.is_none());
900 let table_route_manager = TableRouteManager::new(kv.clone());
901 let table_route_value = TableRouteValue::Logical(LogicalTableRouteValue {
902 physical_table_id: 1023,
903 region_ids: vec![RegionId::new(1023, 1)],
904 });
905 let (txn, _) = table_route_manager
906 .table_route_storage()
907 .build_create_txn(1024, &table_route_value)
908 .unwrap();
909 let r = kv.txn(txn).await.unwrap();
910 assert!(r.succeeded);
911 let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
912 assert!(table_route.is_some());
913 let got = table_route.unwrap().inner;
914 assert_eq!(got, table_route_value);
915 }
916
    /// `batch_get` returns one entry per requested id, in request order, with `None` for ids
    /// that have no stored route.
    #[tokio::test]
    async fn test_table_route_batch_get() {
        let kv = Arc::new(MemoryKvBackend::default());
        let table_route_storage = TableRouteStorage::new(kv.clone());
        // Empty backend: every requested id comes back as `None`.
        let routes = table_route_storage
            .batch_get(&[1023, 1024, 1025])
            .await
            .unwrap();

        assert!(routes.iter().all(Option::is_none));
        let table_route_manager = TableRouteManager::new(kv.clone());
        let routes = [
            (
                1024,
                TableRouteValue::Logical(LogicalTableRouteValue {
                    physical_table_id: 1023,
                    region_ids: vec![RegionId::new(1023, 1)],
                }),
            ),
            (
                1025,
                TableRouteValue::Logical(LogicalTableRouteValue {
                    physical_table_id: 1023,
                    region_ids: vec![RegionId::new(1023, 2)],
                }),
            ),
        ];
        for (table_id, route) in &routes {
            let (txn, _) = table_route_manager
                .table_route_storage()
                .build_create_txn(*table_id, route)
                .unwrap();
            let r = kv.txn(txn).await.unwrap();
            assert!(r.succeeded);
        }

        // Request order differs from insertion order and includes ids that were never stored.
        let results = table_route_storage
            .batch_get(&[9999, 1025, 8888, 1024])
            .await
            .unwrap();
        assert!(results[0].is_none());
        assert_eq!(results[1].as_ref().unwrap(), &routes[1].1);
        assert!(results[2].is_none());
        assert_eq!(results[3].as_ref().unwrap(), &routes[0].1);
    }
962
    /// `remap_route_address` overwrites the address of a peer that has a stored node address
    /// and leaves a peer without one untouched.
    #[tokio::test]
    async fn remap_route_address_updates_addresses() {
        let kv = Arc::new(MemoryKvBackend::default());
        let table_route_storage = TableRouteStorage::new(kv.clone());
        let mut table_route = TableRouteValue::Physical(PhysicalTableRouteValue {
            region_routes: vec![RegionRoute {
                leader_peer: Some(Peer {
                    id: 1,
                    ..Default::default()
                }),
                follower_peers: vec![Peer {
                    id: 2,
                    ..Default::default()
                }],
                ..Default::default()
            }],
            version: 0,
        });

        // Only node 1 (the leader) gets an address entry; node 2 (the follower) has none.
        kv.put(PutRequest {
            key: NodeAddressKey::with_datanode(1).to_bytes(),
            value: NodeAddressValue {
                peer: Peer {
                    addr: "addr1".to_string(),
                    ..Default::default()
                },
            }
            .try_as_raw_value()
            .unwrap(),
            ..Default::default()
        })
        .await
        .unwrap();

        table_route_storage
            .remap_route_address(&mut table_route)
            .await
            .unwrap();

        if let TableRouteValue::Physical(physical_table_route) = table_route {
            assert_eq!(
                physical_table_route.region_routes[0]
                    .leader_peer
                    .as_ref()
                    .unwrap()
                    .addr,
                "addr1"
            );
            // The follower keeps the address it was constructed with.
            assert_eq!(
                physical_table_route.region_routes[0].follower_peers[0].addr,
                ""
            );
        } else {
            panic!("Expected PhysicalTableRouteValue");
        }
    }
1019}