1use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::sync::Arc;
18
19use serde::{Deserialize, Deserializer, Serialize};
20use snafu::{OptionExt, ResultExt, ensure};
21use store_api::storage::{RegionId, RegionNumber};
22use table::metadata::TableId;
23
24use crate::error::{
25 InvalidMetadataSnafu, MetadataCorruptionSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu,
26 TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu,
27};
28use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
29use crate::key::txn_helper::TxnOpGetResponseSet;
30use crate::key::{
31 DeserializedValueWithBytes, MetadataKey, MetadataValue, RegionDistribution,
32 TABLE_ROUTE_KEY_PATTERN, TABLE_ROUTE_PREFIX,
33};
34use crate::kv_backend::KvBackendRef;
35use crate::kv_backend::txn::Txn;
36use crate::rpc::router::{RegionRoute, region_distribution};
37use crate::rpc::store::BatchGetRequest;
38
39#[derive(Debug, PartialEq)]
43pub struct TableRouteKey {
44 pub table_id: TableId,
45}
46
47impl TableRouteKey {
48 pub fn new(table_id: TableId) -> Self {
49 Self { table_id }
50 }
51
52 pub fn range_prefix() -> Vec<u8> {
54 format!("{}/", TABLE_ROUTE_PREFIX).into_bytes()
55 }
56}
57
58#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
59#[serde(tag = "type", rename_all = "snake_case")]
60pub enum TableRouteValue {
61 Physical(PhysicalTableRouteValue),
62 Logical(LogicalTableRouteValue),
63}
64
65#[derive(Debug, PartialEq, Serialize, Clone, Default)]
66pub struct PhysicalTableRouteValue {
67 pub region_routes: Vec<RegionRoute>,
69 pub max_region_number: RegionNumber,
73 version: u64,
75}
76
77impl<'de> Deserialize<'de> for PhysicalTableRouteValue {
78 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
79 where
80 D: Deserializer<'de>,
81 {
82 #[derive(Deserialize)]
83 struct Helper {
84 region_routes: Vec<RegionRoute>,
85 #[serde(default)]
86 max_region_number: Option<RegionNumber>,
87 version: u64,
88 }
89
90 let mut helper = Helper::deserialize(deserializer)?;
91 if helper.max_region_number.is_none() {
93 let max_region = helper
94 .region_routes
95 .iter()
96 .map(|r| r.region.id.region_number())
97 .max()
98 .unwrap_or_default();
99 helper.max_region_number = Some(max_region);
100 }
101
102 Ok(PhysicalTableRouteValue {
103 region_routes: helper.region_routes,
104 max_region_number: helper.max_region_number.unwrap_or_default(),
105 version: helper.version,
106 })
107 }
108}
109
110#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
111pub struct LogicalTableRouteValue {
112 physical_table_id: TableId,
113}
114
115impl TableRouteValue {
116 pub(crate) fn new(
119 table_id: TableId,
120 physical_table_id: TableId,
121 region_routes: Vec<RegionRoute>,
122 ) -> Self {
123 if table_id == physical_table_id {
124 TableRouteValue::physical(region_routes)
125 } else {
126 TableRouteValue::logical(physical_table_id)
127 }
128 }
129
130 pub fn physical(region_routes: Vec<RegionRoute>) -> Self {
131 Self::Physical(PhysicalTableRouteValue::new(region_routes))
132 }
133
134 pub fn logical(physical_table_id: TableId) -> Self {
135 Self::Logical(LogicalTableRouteValue::new(physical_table_id))
136 }
137
138 pub fn update(&self, region_routes: Vec<RegionRoute>) -> Result<Self> {
140 ensure!(
141 self.is_physical(),
142 UnexpectedLogicalRouteTableSnafu {
143 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
144 }
145 );
146 let physical_table_route = self.as_physical_table_route_ref();
147 let original_max_region_number = physical_table_route.max_region_number;
148 let new_max_region_number = region_routes
149 .iter()
150 .map(|r| r.region.id.region_number())
151 .max()
152 .unwrap_or_default();
153 let version = physical_table_route.version;
154 Ok(Self::Physical(PhysicalTableRouteValue {
155 region_routes,
156 max_region_number: original_max_region_number.max(new_max_region_number),
159 version: version + 1,
160 }))
161 }
162
163 #[cfg(any(test, feature = "testing"))]
167 pub fn version(&self) -> Result<u64> {
168 ensure!(
169 self.is_physical(),
170 UnexpectedLogicalRouteTableSnafu {
171 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
172 }
173 );
174 Ok(self.as_physical_table_route_ref().version)
175 }
176
177 pub fn region_route(&self, region_id: RegionId) -> Result<Option<RegionRoute>> {
181 ensure!(
182 self.is_physical(),
183 UnexpectedLogicalRouteTableSnafu {
184 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
185 }
186 );
187 Ok(self
188 .as_physical_table_route_ref()
189 .region_routes
190 .iter()
191 .find(|route| route.region.id == region_id)
192 .cloned())
193 }
194
195 pub fn is_physical(&self) -> bool {
197 matches!(self, TableRouteValue::Physical(_))
198 }
199
200 pub fn region_routes(&self) -> Result<&Vec<RegionRoute>> {
202 ensure!(
203 self.is_physical(),
204 UnexpectedLogicalRouteTableSnafu {
205 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
206 }
207 );
208 Ok(&self.as_physical_table_route_ref().region_routes)
209 }
210
211 pub fn max_region_number(&self) -> Result<RegionNumber> {
216 ensure!(
217 self.is_physical(),
218 UnexpectedLogicalRouteTableSnafu {
219 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
220 }
221 );
222 Ok(self.as_physical_table_route_ref().max_region_number)
223 }
224
225 fn as_physical_table_route_ref(&self) -> &PhysicalTableRouteValue {
230 match self {
231 TableRouteValue::Physical(x) => x,
232 _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
233 }
234 }
235
236 pub fn into_physical_table_route(self) -> PhysicalTableRouteValue {
241 match self {
242 TableRouteValue::Physical(x) => x,
243 _ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
244 }
245 }
246
247 pub fn into_logical_table_route(self) -> LogicalTableRouteValue {
252 match self {
253 TableRouteValue::Logical(x) => x,
254 _ => unreachable!("Mistakenly been treated as a Logical TableRoute: {self:?}"),
255 }
256 }
257
258 pub fn region_numbers(&self) -> Vec<RegionNumber> {
259 match self {
260 TableRouteValue::Physical(x) => x
261 .region_routes
262 .iter()
263 .map(|region_route| region_route.region.id.region_number())
264 .collect(),
265 TableRouteValue::Logical(_) => {
266 vec![]
267 }
268 }
269 }
270}
271
272impl MetadataValue for TableRouteValue {
273 fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
274 let r = serde_json::from_slice::<TableRouteValue>(raw_value);
275 match r {
276 Err(e) if e.is_data() => Ok(Self::Physical(
278 serde_json::from_slice::<PhysicalTableRouteValue>(raw_value)
279 .context(SerdeJsonSnafu)?,
280 )),
281 Ok(x) => Ok(x),
282 Err(e) => Err(e).context(SerdeJsonSnafu),
283 }
284 }
285
286 fn try_as_raw_value(&self) -> Result<Vec<u8>> {
287 serde_json::to_vec(self).context(SerdeJsonSnafu)
288 }
289}
290
291impl PhysicalTableRouteValue {
292 pub fn version(&self) -> u64 {
294 self.version
295 }
296 pub fn new(region_routes: Vec<RegionRoute>) -> Self {
297 let max_region_number = region_routes
298 .iter()
299 .map(|r| r.region.id.region_number())
300 .max()
301 .unwrap_or_default();
302 Self {
303 region_routes,
304 max_region_number,
305 version: 0,
306 }
307 }
308}
309
310impl LogicalTableRouteValue {
311 pub fn new(physical_table_id: TableId) -> Self {
312 Self { physical_table_id }
313 }
314
315 pub fn physical_table_id(&self) -> TableId {
316 self.physical_table_id
317 }
318}
319
320impl MetadataKey<'_, TableRouteKey> for TableRouteKey {
321 fn to_bytes(&self) -> Vec<u8> {
322 self.to_string().into_bytes()
323 }
324
325 fn from_bytes(bytes: &[u8]) -> Result<TableRouteKey> {
326 let key = std::str::from_utf8(bytes).map_err(|e| {
327 InvalidMetadataSnafu {
328 err_msg: format!(
329 "TableRouteKey '{}' is not a valid UTF8 string: {e}",
330 String::from_utf8_lossy(bytes)
331 ),
332 }
333 .build()
334 })?;
335 let captures = TABLE_ROUTE_KEY_PATTERN
336 .captures(key)
337 .context(InvalidMetadataSnafu {
338 err_msg: format!("Invalid TableRouteKey '{key}'"),
339 })?;
340 let table_id = captures[1].parse::<TableId>().unwrap();
342 Ok(TableRouteKey { table_id })
343 }
344}
345
346impl Display for TableRouteKey {
347 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
348 write!(f, "{}/{}", TABLE_ROUTE_PREFIX, self.table_id)
349 }
350}
351
352pub type TableRouteManagerRef = Arc<TableRouteManager>;
353
354pub struct TableRouteManager {
355 storage: TableRouteStorage,
356}
357
358impl TableRouteManager {
359 pub fn new(kv_backend: KvBackendRef) -> Self {
360 Self {
361 storage: TableRouteStorage::new(kv_backend),
362 }
363 }
364
365 pub async fn get_physical_table_id(
370 &self,
371 logical_or_physical_table_id: TableId,
372 ) -> Result<TableId> {
373 let table_route = self
374 .storage
375 .get_inner(logical_or_physical_table_id)
376 .await?
377 .context(TableRouteNotFoundSnafu {
378 table_id: logical_or_physical_table_id,
379 })?;
380
381 match table_route {
382 TableRouteValue::Physical(_) => Ok(logical_or_physical_table_id),
383 TableRouteValue::Logical(x) => Ok(x.physical_table_id()),
384 }
385 }
386
387 pub async fn get_physical_table_route(
393 &self,
394 logical_or_physical_table_id: TableId,
395 ) -> Result<(TableId, PhysicalTableRouteValue)> {
396 let table_route = self
397 .storage
398 .get(logical_or_physical_table_id)
399 .await?
400 .context(TableRouteNotFoundSnafu {
401 table_id: logical_or_physical_table_id,
402 })?;
403
404 match table_route {
405 TableRouteValue::Physical(x) => Ok((logical_or_physical_table_id, x)),
406 TableRouteValue::Logical(x) => {
407 let physical_table_id = x.physical_table_id();
408 let physical_table_route = self.storage.get(physical_table_id).await?.context(
409 TableRouteNotFoundSnafu {
410 table_id: physical_table_id,
411 },
412 )?;
413 let physical_table_route = physical_table_route.into_physical_table_route();
414 Ok((physical_table_id, physical_table_route))
415 }
416 }
417 }
418
419 pub async fn batch_get_physical_table_routes(
426 &self,
427 logical_or_physical_table_ids: &[TableId],
428 ) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
429 let table_routes = self
430 .storage
431 .batch_get(logical_or_physical_table_ids)
432 .await?;
433 let table_routes = table_routes
435 .into_iter()
436 .zip(logical_or_physical_table_ids)
437 .filter_map(|(route, id)| route.map(|route| (*id, route)))
438 .collect::<HashMap<_, _>>();
439
440 let mut physical_table_routes = HashMap::with_capacity(table_routes.len());
441 let mut logical_table_ids = HashMap::with_capacity(table_routes.len());
442
443 for (table_id, table_route) in table_routes {
444 match table_route {
445 TableRouteValue::Physical(x) => {
446 physical_table_routes.insert(table_id, x);
447 }
448 TableRouteValue::Logical(x) => {
449 logical_table_ids.insert(table_id, x.physical_table_id());
450 }
451 }
452 }
453
454 if logical_table_ids.is_empty() {
455 return Ok(physical_table_routes);
456 }
457
458 let physical_table_ids = logical_table_ids
460 .values()
461 .cloned()
462 .collect::<HashSet<_>>()
463 .into_iter()
464 .collect::<Vec<_>>();
465 let table_routes = self
466 .table_route_storage()
467 .batch_get(&physical_table_ids)
468 .await?;
469 let table_routes = table_routes
470 .into_iter()
471 .zip(physical_table_ids)
472 .filter_map(|(route, id)| route.map(|route| (id, route)))
473 .collect::<HashMap<_, _>>();
474
475 for (logical_table_id, physical_table_id) in logical_table_ids {
476 let table_route =
477 table_routes
478 .get(&physical_table_id)
479 .context(TableRouteNotFoundSnafu {
480 table_id: physical_table_id,
481 })?;
482 match table_route {
483 TableRouteValue::Physical(x) => {
484 physical_table_routes.insert(logical_table_id, x.clone());
485 }
486 TableRouteValue::Logical(x) => {
487 MetadataCorruptionSnafu {
489 err_msg: format!(
490 "logical table {} {:?} cannot be resolved to a physical table.",
491 logical_table_id, x
492 ),
493 }
494 .fail()?;
495 }
496 }
497 }
498
499 Ok(physical_table_routes)
500 }
501
502 pub async fn get_region_distribution(
504 &self,
505 table_id: TableId,
506 ) -> Result<Option<RegionDistribution>> {
507 self.storage
508 .get(table_id)
509 .await?
510 .map(|table_route| Ok(region_distribution(table_route.region_routes()?)))
511 .transpose()
512 }
513
514 pub async fn set_region_staging_state(
520 &self,
521 region_id: store_api::storage::RegionId,
522 staging: bool,
523 ) -> Result<()> {
524 let table_id = region_id.table_id();
525
526 let current_table_route = self
528 .storage
529 .get_with_raw_bytes(table_id)
530 .await?
531 .context(TableRouteNotFoundSnafu { table_id })?;
532
533 let new_table_route = current_table_route.inner.clone();
535
536 ensure!(
538 new_table_route.is_physical(),
539 UnexpectedLogicalRouteTableSnafu {
540 err_msg: format!("Cannot set staging state for logical table {table_id}"),
541 }
542 );
543
544 let region_routes = new_table_route.region_routes()?.clone();
545 let mut updated_routes = region_routes.clone();
546
547 let mut region_found = false;
550 for route in &mut updated_routes {
551 if route.region.id == region_id {
552 if staging {
553 route.set_leader_staging();
554 } else {
555 route.clear_leader_staging();
556 }
557 region_found = true;
558 break;
559 }
560 }
561
562 ensure!(region_found, RegionNotFoundSnafu { region_id });
563
564 let updated_table_route = new_table_route.update(updated_routes)?;
566
567 let (txn, _) =
569 self.storage
570 .build_update_txn(table_id, ¤t_table_route, &updated_table_route)?;
571
572 let result = self.storage.kv_backend.txn(txn).await?;
573
574 ensure!(
575 result.succeeded,
576 MetadataCorruptionSnafu {
577 err_msg: format!(
578 "Failed to update staging state for region {}: CAS operation failed",
579 region_id
580 ),
581 }
582 );
583
584 Ok(())
585 }
586
587 pub async fn is_region_staging(&self, region_id: store_api::storage::RegionId) -> Result<bool> {
591 let table_id = region_id.table_id();
592
593 let table_route = self.storage.get(table_id).await?;
594
595 match table_route {
596 Some(route) if route.is_physical() => {
597 let region_routes = route.region_routes()?;
598 for route in region_routes {
599 if route.region.id == region_id {
600 return Ok(route.is_leader_staging());
601 }
602 }
603 Ok(false)
604 }
605 _ => Ok(false),
606 }
607 }
608
609 pub fn table_route_storage(&self) -> &TableRouteStorage {
611 &self.storage
612 }
613}
614
615pub struct TableRouteStorage {
617 kv_backend: KvBackendRef,
618}
619
620pub type TableRouteValueDecodeResult = Result<Option<DeserializedValueWithBytes<TableRouteValue>>>;
621
622impl TableRouteStorage {
623 pub fn new(kv_backend: KvBackendRef) -> Self {
624 Self { kv_backend }
625 }
626
627 pub fn build_create_txn(
630 &self,
631 table_id: TableId,
632 table_route_value: &TableRouteValue,
633 ) -> Result<(
634 Txn,
635 impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult + use<>,
636 )> {
637 let key = TableRouteKey::new(table_id);
638 let raw_key = key.to_bytes();
639
640 let txn = Txn::put_if_not_exists(raw_key.clone(), table_route_value.try_as_raw_value()?);
641
642 Ok((
643 txn,
644 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
645 ))
646 }
647
648 pub fn build_update_txn(
653 &self,
654 table_id: TableId,
655 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
656 new_table_route_value: &TableRouteValue,
657 ) -> Result<(
658 Txn,
659 impl FnOnce(&mut TxnOpGetResponseSet) -> TableRouteValueDecodeResult + use<>,
660 )> {
661 let key = TableRouteKey::new(table_id);
662 let raw_key = key.to_bytes();
663 let raw_value = current_table_route_value.get_raw_bytes();
664 let new_raw_value: Vec<u8> = new_table_route_value.try_as_raw_value()?;
665
666 let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
667
668 Ok((
669 txn,
670 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
671 ))
672 }
673
674 pub async fn get(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
676 let mut table_route = self.get_inner(table_id).await?;
677 if let Some(table_route) = &mut table_route {
678 self.remap_route_address(table_route).await?;
679 };
680
681 Ok(table_route)
682 }
683
684 async fn get_inner(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
685 let key = TableRouteKey::new(table_id);
686 self.kv_backend
687 .get(&key.to_bytes())
688 .await?
689 .map(|kv| TableRouteValue::try_from_raw_value(&kv.value))
690 .transpose()
691 }
692
693 pub async fn get_with_raw_bytes(
695 &self,
696 table_id: TableId,
697 ) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
698 let mut table_route = self.get_with_raw_bytes_inner(table_id).await?;
699 if let Some(table_route) = &mut table_route {
700 self.remap_route_address(table_route).await?;
701 };
702
703 Ok(table_route)
704 }
705
706 async fn get_with_raw_bytes_inner(
707 &self,
708 table_id: TableId,
709 ) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
710 let key = TableRouteKey::new(table_id);
711 self.kv_backend
712 .get(&key.to_bytes())
713 .await?
714 .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
715 .transpose()
716 }
717
718 pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
720 let raw_table_routes = self.batch_get_inner(table_ids).await?;
721
722 Ok(raw_table_routes
723 .into_iter()
724 .map(|v| v.map(|x| x.inner))
725 .collect())
726 }
727
728 pub async fn batch_get_with_raw_bytes(
733 &self,
734 table_ids: &[TableId],
735 ) -> Result<Vec<Option<DeserializedValueWithBytes<TableRouteValue>>>> {
736 let mut raw_table_routes = self.batch_get_inner(table_ids).await?;
737 self.remap_routes_addresses(&mut raw_table_routes).await?;
738
739 Ok(raw_table_routes)
740 }
741
742 async fn batch_get_inner(
743 &self,
744 table_ids: &[TableId],
745 ) -> Result<Vec<Option<DeserializedValueWithBytes<TableRouteValue>>>> {
746 let keys = table_ids
747 .iter()
748 .map(|id| TableRouteKey::new(*id).to_bytes())
749 .collect::<Vec<_>>();
750 let resp = self
751 .kv_backend
752 .batch_get(BatchGetRequest { keys: keys.clone() })
753 .await?;
754
755 let kvs = resp
756 .kvs
757 .into_iter()
758 .map(|kv| (kv.key, kv.value))
759 .collect::<HashMap<_, _>>();
760 keys.into_iter()
761 .map(|key| {
762 if let Some(value) = kvs.get(&key) {
763 Ok(Some(DeserializedValueWithBytes::from_inner_slice(value)?))
764 } else {
765 Ok(None)
766 }
767 })
768 .collect()
769 }
770
771 async fn remap_routes_addresses(
772 &self,
773 table_routes: &mut [Option<DeserializedValueWithBytes<TableRouteValue>>],
774 ) -> Result<()> {
775 let keys = table_routes
776 .iter()
777 .flat_map(|table_route| {
778 table_route
779 .as_ref()
780 .map(|x| extract_address_keys(&x.inner))
781 .unwrap_or_default()
782 })
783 .collect::<HashSet<_>>()
784 .into_iter()
785 .collect();
786 let node_addrs = self.get_node_addresses(keys).await?;
787 for table_route in table_routes.iter_mut().flatten() {
788 set_addresses(&node_addrs, table_route)?;
789 }
790
791 Ok(())
792 }
793
794 async fn remap_route_address(&self, table_route: &mut TableRouteValue) -> Result<()> {
795 let keys = extract_address_keys(table_route).into_iter().collect();
796 let node_addrs = self.get_node_addresses(keys).await?;
797 set_addresses(&node_addrs, table_route)?;
798
799 Ok(())
800 }
801
802 async fn get_node_addresses(
803 &self,
804 keys: Vec<Vec<u8>>,
805 ) -> Result<HashMap<u64, NodeAddressValue>> {
806 if keys.is_empty() {
807 return Ok(HashMap::default());
808 }
809
810 self.kv_backend
811 .batch_get(BatchGetRequest { keys })
812 .await?
813 .kvs
814 .into_iter()
815 .map(|kv| {
816 let node_id = NodeAddressKey::from_bytes(&kv.key)?.node_id;
817 let node_addr = NodeAddressValue::try_from_raw_value(&kv.value)?;
818 Ok((node_id, node_addr))
819 })
820 .collect()
821 }
822}
823
824fn set_addresses(
825 node_addrs: &HashMap<u64, NodeAddressValue>,
826 table_route: &mut TableRouteValue,
827) -> Result<()> {
828 let TableRouteValue::Physical(physical_table_route) = table_route else {
829 return Ok(());
830 };
831
832 for region_route in &mut physical_table_route.region_routes {
833 if let Some(leader) = &mut region_route.leader_peer
834 && let Some(node_addr) = node_addrs.get(&leader.id)
835 {
836 leader.addr = node_addr.peer.addr.clone();
837 }
838 for follower in &mut region_route.follower_peers {
839 if let Some(node_addr) = node_addrs.get(&follower.id) {
840 follower.addr = node_addr.peer.addr.clone();
841 }
842 }
843 }
844
845 Ok(())
846}
847
848fn extract_address_keys(table_route: &TableRouteValue) -> HashSet<Vec<u8>> {
849 let TableRouteValue::Physical(physical_table_route) = table_route else {
850 return HashSet::default();
851 };
852
853 physical_table_route
854 .region_routes
855 .iter()
856 .flat_map(|region_route| {
857 region_route
858 .follower_peers
859 .iter()
860 .map(|peer| NodeAddressKey::with_datanode(peer.id).to_bytes())
861 .chain(
862 region_route
863 .leader_peer
864 .as_ref()
865 .map(|leader| NodeAddressKey::with_datanode(leader.id).to_bytes()),
866 )
867 })
868 .collect()
869}
870
871#[cfg(test)]
872mod tests {
873 use std::sync::Arc;
874
875 use super::*;
876 use crate::kv_backend::memory::MemoryKvBackend;
877 use crate::kv_backend::{KvBackend, TxnService};
878 use crate::peer::Peer;
879 use crate::rpc::router::Region;
880 use crate::rpc::store::PutRequest;
881
882 #[test]
883 fn test_update_table_route_max_region_number() {
884 let table_route = PhysicalTableRouteValue::new(vec![
885 RegionRoute {
886 region: Region {
887 id: RegionId::new(0, 1),
888 ..Default::default()
889 },
890 ..Default::default()
891 },
892 RegionRoute {
893 region: Region {
894 id: RegionId::new(0, 2),
895 ..Default::default()
896 },
897 ..Default::default()
898 },
899 ]);
900 assert_eq!(table_route.max_region_number, 2);
901
902 let new_table_route = TableRouteValue::Physical(table_route)
904 .update(vec![RegionRoute {
905 region: Region {
906 id: RegionId::new(0, 1),
907 ..Default::default()
908 },
909 ..Default::default()
910 }])
911 .unwrap();
912 assert_eq!(
913 new_table_route
914 .as_physical_table_route_ref()
915 .max_region_number,
916 2
917 );
918
919 let new_table_route = new_table_route
921 .update(vec![RegionRoute {
922 region: Region {
923 id: RegionId::new(0, 3),
924 ..Default::default()
925 },
926 ..Default::default()
927 }])
928 .unwrap()
929 .into_physical_table_route();
930 assert_eq!(new_table_route.max_region_number, 3);
931 }
932
933 #[test]
934 fn test_table_route_compatibility() {
935 let old_raw_v = r#"{"region_routes":[{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]},{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]}],"version":0}"#;
936 let v = TableRouteValue::try_from_raw_value(old_raw_v.as_bytes()).unwrap();
937
938 let expected_table_route = TableRouteValue::Physical(PhysicalTableRouteValue {
939 region_routes: vec![
940 RegionRoute {
941 region: Region {
942 id: RegionId::new(0, 1),
943 name: "r1".to_string(),
944 partition: None,
945 attrs: Default::default(),
946 partition_expr: Default::default(),
947 },
948 leader_peer: Some(Peer {
949 id: 2,
950 addr: "a2".to_string(),
951 }),
952 follower_peers: vec![],
953 leader_state: None,
954 leader_down_since: None,
955 },
956 RegionRoute {
957 region: Region {
958 id: RegionId::new(0, 1),
959 name: "r1".to_string(),
960 partition: None,
961 attrs: Default::default(),
962 partition_expr: Default::default(),
963 },
964 leader_peer: Some(Peer {
965 id: 2,
966 addr: "a2".to_string(),
967 }),
968 follower_peers: vec![],
969 leader_state: None,
970 leader_down_since: None,
971 },
972 ],
973 max_region_number: 1,
974 version: 0,
975 });
976
977 assert_eq!(v, expected_table_route);
978 }
979
980 #[test]
981 fn test_key_serialization() {
982 let key = TableRouteKey::new(42);
983 let raw_key = key.to_bytes();
984 assert_eq!(raw_key, b"__table_route/42");
985 }
986
987 #[test]
988 fn test_key_deserialization() {
989 let expected = TableRouteKey::new(42);
990 let key = TableRouteKey::from_bytes(b"__table_route/42").unwrap();
991 assert_eq!(key, expected);
992 }
993
994 #[tokio::test]
995 async fn test_table_route_storage_get_with_raw_bytes_empty() {
996 let kv = Arc::new(MemoryKvBackend::default());
997 let table_route_storage = TableRouteStorage::new(kv);
998 let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
999 assert!(table_route.is_none());
1000 }
1001
1002 #[tokio::test]
1003 async fn test_table_route_storage_get_with_raw_bytes() {
1004 let kv = Arc::new(MemoryKvBackend::default());
1005 let table_route_storage = TableRouteStorage::new(kv.clone());
1006 let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
1007 assert!(table_route.is_none());
1008 let table_route_manager = TableRouteManager::new(kv.clone());
1009 let table_route_value = TableRouteValue::Logical(LogicalTableRouteValue {
1010 physical_table_id: 1023,
1011 });
1012 let (txn, _) = table_route_manager
1013 .table_route_storage()
1014 .build_create_txn(1024, &table_route_value)
1015 .unwrap();
1016 let r = kv.txn(txn).await.unwrap();
1017 assert!(r.succeeded);
1018 let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap();
1019 assert!(table_route.is_some());
1020 let got = table_route.unwrap().inner;
1021 assert_eq!(got, table_route_value);
1022 }
1023
1024 #[tokio::test]
1025 async fn test_table_route_batch_get() {
1026 let kv = Arc::new(MemoryKvBackend::default());
1027 let table_route_storage = TableRouteStorage::new(kv.clone());
1028 let routes = table_route_storage
1029 .batch_get(&[1023, 1024, 1025])
1030 .await
1031 .unwrap();
1032
1033 assert!(routes.iter().all(Option::is_none));
1034 let table_route_manager = TableRouteManager::new(kv.clone());
1035 let routes = [
1036 (
1037 1024,
1038 TableRouteValue::Logical(LogicalTableRouteValue {
1039 physical_table_id: 1023,
1040 }),
1041 ),
1042 (
1043 1025,
1044 TableRouteValue::Logical(LogicalTableRouteValue {
1045 physical_table_id: 1023,
1046 }),
1047 ),
1048 ];
1049 for (table_id, route) in &routes {
1050 let (txn, _) = table_route_manager
1051 .table_route_storage()
1052 .build_create_txn(*table_id, route)
1053 .unwrap();
1054 let r = kv.txn(txn).await.unwrap();
1055 assert!(r.succeeded);
1056 }
1057
1058 let results = table_route_storage
1059 .batch_get(&[9999, 1025, 8888, 1024])
1060 .await
1061 .unwrap();
1062 assert!(results[0].is_none());
1063 assert_eq!(results[1].as_ref().unwrap(), &routes[1].1);
1064 assert!(results[2].is_none());
1065 assert_eq!(results[3].as_ref().unwrap(), &routes[0].1);
1066 }
1067
1068 #[tokio::test]
1069 async fn remap_route_address_updates_addresses() {
1070 let kv = Arc::new(MemoryKvBackend::default());
1071 let table_route_storage = TableRouteStorage::new(kv.clone());
1072 let mut table_route = TableRouteValue::Physical(PhysicalTableRouteValue {
1073 region_routes: vec![RegionRoute {
1074 leader_peer: Some(Peer {
1075 id: 1,
1076 ..Default::default()
1077 }),
1078 follower_peers: vec![Peer {
1079 id: 2,
1080 ..Default::default()
1081 }],
1082 ..Default::default()
1083 }],
1084 max_region_number: 0,
1085 version: 0,
1086 });
1087
1088 kv.put(PutRequest {
1089 key: NodeAddressKey::with_datanode(1).to_bytes(),
1090 value: NodeAddressValue {
1091 peer: Peer {
1092 addr: "addr1".to_string(),
1093 ..Default::default()
1094 },
1095 }
1096 .try_as_raw_value()
1097 .unwrap(),
1098 ..Default::default()
1099 })
1100 .await
1101 .unwrap();
1102
1103 table_route_storage
1104 .remap_route_address(&mut table_route)
1105 .await
1106 .unwrap();
1107
1108 if let TableRouteValue::Physical(physical_table_route) = table_route {
1109 assert_eq!(
1110 physical_table_route.region_routes[0]
1111 .leader_peer
1112 .as_ref()
1113 .unwrap()
1114 .addr,
1115 "addr1"
1116 );
1117 assert_eq!(
1118 physical_table_route.region_routes[0].follower_peers[0].addr,
1119 ""
1120 );
1121 } else {
1122 panic!("Expected PhysicalTableRouteValue");
1123 }
1124 }
1125}