1use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::fmt::Display;
17
18use common_telemetry::debug;
19use futures::TryStreamExt;
20use serde::{Deserialize, Serialize};
21use snafu::{OptionExt as _, ResultExt, ensure};
22use store_api::storage::RegionId;
23use table::metadata::TableId;
24
25use crate::error::{InvalidMetadataSnafu, Result, SerdeJsonSnafu};
26use crate::key::txn_helper::TxnOpGetResponseSet;
27use crate::key::{
28 DeserializedValueWithBytes, MetadataKey, MetadataValue, TABLE_REPART_KEY_PATTERN,
29 TABLE_REPART_PREFIX,
30};
31use crate::kv_backend::KvBackendRef;
32use crate::kv_backend::txn::Txn;
33use crate::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
34use crate::rpc::KeyValue;
35use crate::rpc::store::{BatchGetRequest, RangeRequest};
36
/// Key addressing the repartition metadata of a single table.
///
/// Its textual form (see the `Display` impl) is
/// `{TABLE_REPART_PREFIX}/{table_id}`; `to_bytes` is the UTF-8 encoding of
/// that text.
#[derive(Debug, PartialEq)]
pub struct TableRepartKey {
    /// Identifier of the table the repartition record belongs to.
    pub table_id: TableId,
}
49
50impl TableRepartKey {
51 pub fn new(table_id: TableId) -> Self {
52 Self { table_id }
53 }
54
55 pub fn range_prefix() -> Vec<u8> {
57 format!("{}/", TABLE_REPART_PREFIX).into_bytes()
58 }
59}
60
61impl MetadataKey<'_, TableRepartKey> for TableRepartKey {
62 fn to_bytes(&self) -> Vec<u8> {
63 self.to_string().into_bytes()
64 }
65
66 fn from_bytes(bytes: &[u8]) -> Result<TableRepartKey> {
67 let key = std::str::from_utf8(bytes).map_err(|e| {
68 InvalidMetadataSnafu {
69 err_msg: format!(
70 "TableRepartKey '{}' is not a valid UTF8 string: {e}",
71 String::from_utf8_lossy(bytes)
72 ),
73 }
74 .build()
75 })?;
76 let captures = TABLE_REPART_KEY_PATTERN
77 .captures(key)
78 .context(InvalidMetadataSnafu {
79 err_msg: format!("Invalid TableRepartKey '{key}'"),
80 })?;
81 let table_id = captures[1].parse::<TableId>().unwrap();
83 Ok(TableRepartKey { table_id })
84 }
85}
86
87impl Display for TableRepartKey {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 write!(f, "{}/{}", TABLE_REPART_PREFIX, self.table_id)
90 }
91}
92
/// Repartition record of a table: which destination regions each source
/// region maps to.
///
/// The record is stored as JSON (see the `MetadataValue` impl).
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)]
pub struct TableRepartValue {
    /// Source region -> set of destination regions.
    pub src_to_dst: BTreeMap<RegionId, BTreeSet<RegionId>>,
}
103
104impl TableRepartValue {
105 pub fn new() -> Self {
107 Default::default()
108 }
109 pub fn update_mappings(&mut self, src: RegionId, dst: &[RegionId]) {
113 if dst.is_empty() {
114 return;
115 }
116 self.src_to_dst.entry(src).or_default().extend(dst);
117 }
118
119 pub fn remove_mappings(&mut self, src: RegionId, dsts: &[RegionId]) {
121 if let Some(dst_set) = self.src_to_dst.get_mut(&src) {
122 for dst in dsts {
123 dst_set.remove(dst);
124 }
125 if dst_set.is_empty() {
126 self.src_to_dst.remove(&src);
127 }
128 }
129 }
130}
131
132impl MetadataValue for TableRepartValue {
133 fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
134 serde_json::from_slice::<TableRepartValue>(raw_value).context(SerdeJsonSnafu)
135 }
136
137 fn try_as_raw_value(&self) -> Result<Vec<u8>> {
138 serde_json::to_vec(self).context(SerdeJsonSnafu)
139 }
140}
141
/// Outcome of decoding an optional [`TableRepartValue`] together with its raw
/// bytes; this is the return type of the decoder closures handed out by
/// `TableRepartManager::build_create_txn` and
/// `TableRepartManager::build_update_txn`.
pub type TableRepartValueDecodeResult =
    Result<Option<DeserializedValueWithBytes<TableRepartValue>>>;
144
145pub fn table_repart_decoder(kv: KeyValue) -> Result<(TableRepartKey, TableRepartValue)> {
147 let key = TableRepartKey::from_bytes(&kv.key)?;
148 let value = TableRepartValue::try_from_raw_value(&kv.value)?;
149 Ok((key, value))
150}
151
/// Reads and writes [`TableRepartValue`] records through a key-value backend.
pub struct TableRepartManager {
    /// Backend used for every read and transaction issued by the manager.
    kv_backend: KvBackendRef,
}
155
156impl TableRepartManager {
    /// Creates a manager operating on `kv_backend`.
    pub fn new(kv_backend: KvBackendRef) -> Self {
        Self { kv_backend }
    }
160
161 pub async fn table_reparts(&self) -> Result<Vec<(TableId, TableRepartValue)>> {
163 let prefix = TableRepartKey::range_prefix();
164 let req = RangeRequest::new().with_prefix(prefix);
165 let stream = PaginationStream::new(
166 self.kv_backend.clone(),
167 req,
168 DEFAULT_PAGE_SIZE,
169 table_repart_decoder,
170 )
171 .into_stream();
172
173 let res = stream.try_collect::<Vec<_>>().await?;
174 Ok(res
175 .into_iter()
176 .map(|(key, value)| (key.table_id, value))
177 .collect())
178 }
179
180 pub fn build_create_txn(
183 &self,
184 table_id: TableId,
185 table_repart_value: &TableRepartValue,
186 ) -> Result<(
187 Txn,
188 impl FnOnce(&mut TxnOpGetResponseSet) -> TableRepartValueDecodeResult + use<>,
189 )> {
190 let key = TableRepartKey::new(table_id);
191 let raw_key = key.to_bytes();
192
193 let txn = Txn::put_if_not_exists(raw_key.clone(), table_repart_value.try_as_raw_value()?);
194
195 Ok((
196 txn,
197 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
198 ))
199 }
200
201 pub fn build_update_txn(
205 &self,
206 table_id: TableId,
207 current_table_repart_value: &DeserializedValueWithBytes<TableRepartValue>,
208 new_table_repart_value: &TableRepartValue,
209 ) -> Result<(
210 Txn,
211 impl FnOnce(&mut TxnOpGetResponseSet) -> TableRepartValueDecodeResult + use<>,
212 )> {
213 let key = TableRepartKey::new(table_id);
214 let raw_key = key.to_bytes();
215 let raw_value = current_table_repart_value.get_raw_bytes();
216 let new_raw_value: Vec<u8> = new_table_repart_value.try_as_raw_value()?;
217
218 let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
219
220 Ok((
221 txn,
222 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
223 ))
224 }
225
    /// Fetches the repartition record of `table_id`.
    ///
    /// Returns `Ok(None)` when the backend holds no entry under the table's key.
    pub async fn get(&self, table_id: TableId) -> Result<Option<TableRepartValue>> {
        self.get_inner(table_id).await
    }
230
231 async fn get_inner(&self, table_id: TableId) -> Result<Option<TableRepartValue>> {
232 let key = TableRepartKey::new(table_id);
233 self.kv_backend
234 .get(&key.to_bytes())
235 .await?
236 .map(|kv| TableRepartValue::try_from_raw_value(&kv.value))
237 .transpose()
238 }
239
    /// Fetches the repartition record of `table_id` wrapped in a
    /// `DeserializedValueWithBytes`, the form `build_update_txn` takes as the
    /// current value.
    ///
    /// Returns `Ok(None)` when the backend holds no entry under the table's key.
    pub async fn get_with_raw_bytes(
        &self,
        table_id: TableId,
    ) -> Result<Option<DeserializedValueWithBytes<TableRepartValue>>> {
        self.get_with_raw_bytes_inner(table_id).await
    }
247
248 async fn get_with_raw_bytes_inner(
249 &self,
250 table_id: TableId,
251 ) -> Result<Option<DeserializedValueWithBytes<TableRepartValue>>> {
252 let key = TableRepartKey::new(table_id);
253 self.kv_backend
254 .get(&key.to_bytes())
255 .await?
256 .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
257 .transpose()
258 }
259
260 pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRepartValue>>> {
262 let raw_table_reparts = self.batch_get_inner(table_ids).await?;
263
264 Ok(raw_table_reparts
265 .into_iter()
266 .map(|v| v.map(|x| x.inner))
267 .collect())
268 }
269
    /// Fetches the repartition records of several tables at once, each wrapped
    /// in a `DeserializedValueWithBytes`.
    ///
    /// The result has one slot per entry of `table_ids`, in the same order;
    /// a slot is `None` when no record exists for that table.
    pub async fn batch_get_with_raw_bytes(
        &self,
        table_ids: &[TableId],
    ) -> Result<Vec<Option<DeserializedValueWithBytes<TableRepartValue>>>> {
        self.batch_get_inner(table_ids).await
    }
277
278 async fn batch_get_inner(
279 &self,
280 table_ids: &[TableId],
281 ) -> Result<Vec<Option<DeserializedValueWithBytes<TableRepartValue>>>> {
282 let keys = table_ids
283 .iter()
284 .map(|id| TableRepartKey::new(*id).to_bytes())
285 .collect::<Vec<_>>();
286 let resp = self
287 .kv_backend
288 .batch_get(BatchGetRequest { keys: keys.clone() })
289 .await?;
290
291 let kvs = resp
292 .kvs
293 .into_iter()
294 .map(|kv| (kv.key, kv.value))
295 .collect::<HashMap<_, _>>();
296 keys.into_iter()
297 .map(|key| {
298 if let Some(value) = kvs.get(&key) {
299 Ok(Some(DeserializedValueWithBytes::from_inner_slice(value)?))
300 } else {
301 Ok(None)
302 }
303 })
304 .collect()
305 }
306
307 pub async fn update_mappings(
310 &self,
311 table_id: TableId,
312 region_mapping: &HashMap<RegionId, Vec<RegionId>>,
313 ) -> Result<()> {
314 let current = self.get_with_raw_bytes(table_id).await?;
315 let mut new_value = current
316 .as_ref()
317 .map(|c| c.inner.clone())
318 .unwrap_or_else(TableRepartValue::new);
319
320 for (src, dsts) in region_mapping.iter() {
321 new_value.update_mappings(*src, dsts);
322 }
323
324 self.upsert_value(table_id, current, &new_value).await
325 }
326
327 pub async fn remove_mappings(
330 &self,
331 table_id: TableId,
332 region_mapping: &HashMap<RegionId, Vec<RegionId>>,
333 ) -> Result<()> {
334 let current = self
335 .get_with_raw_bytes(table_id)
336 .await?
337 .context(crate::error::TableRepartNotFoundSnafu { table_id })?;
338
339 let mut new_value = current.inner.clone();
340 for (src, dsts) in region_mapping.iter() {
341 new_value.remove_mappings(*src, dsts);
342 }
343
344 self.upsert_value(table_id, Some(current), &new_value).await
345 }
346
347 pub async fn upsert_value(
350 &self,
351 table_id: TableId,
352 current: Option<DeserializedValueWithBytes<TableRepartValue>>,
353 new_value: &TableRepartValue,
354 ) -> Result<()> {
355 if new_value.src_to_dst.is_empty() && current.is_none() {
356 return Ok(());
358 }
359
360 if let Some(current) = ¤t {
361 let (txn, _) = self.build_update_txn(table_id, current, new_value)?;
362 let result = self.kv_backend.txn(txn).await?;
363
364 ensure!(
365 result.succeeded,
366 crate::error::MetadataCorruptionSnafu {
367 err_msg: format!(
368 "Failed to update repartition mappings for table {}: CAS operation failed",
369 table_id
370 ),
371 }
372 );
373 } else {
374 let (txn, _) = self.build_create_txn(table_id, new_value)?;
375 let result = self.kv_backend.txn(txn).await?;
376
377 ensure!(
378 result.succeeded,
379 crate::error::MetadataCorruptionSnafu {
380 err_msg: format!(
381 "Failed to create repartition mappings for table {}: CAS operation failed",
382 table_id
383 ),
384 }
385 );
386 }
387
388 debug!(
389 "Upserted repartition value for table {}: current: {:?}, new: {:?}",
390 table_id, current, new_value
391 );
392 Ok(())
393 }
394
395 pub async fn get_dst_regions(
397 &self,
398 src_region: RegionId,
399 ) -> Result<Option<BTreeSet<RegionId>>> {
400 let table_id = src_region.table_id();
401 let table_repart = self.get(table_id).await?;
402 Ok(table_repart.and_then(|repart| repart.src_to_dst.get(&src_region).cloned()))
403 }
404}
405
406#[cfg(test)]
407mod tests {
408 use std::collections::BTreeMap;
409 use std::sync::Arc;
410
411 use super::*;
412 use crate::kv_backend::TxnService;
413 use crate::kv_backend::memory::MemoryKvBackend;
414
415 #[test]
416 fn test_table_repart_key_serialization() {
417 let key = TableRepartKey::new(42);
418 let raw_key = key.to_bytes();
419 assert_eq!(raw_key, b"__table_repart/42");
420 }
421
422 #[test]
423 fn test_table_repart_key_deserialization() {
424 let expected = TableRepartKey::new(42);
425 let key = TableRepartKey::from_bytes(b"__table_repart/42").unwrap();
426 assert_eq!(key, expected);
427 }
428
429 #[test]
430 fn test_table_repart_key_deserialization_invalid_utf8() {
431 let result = TableRepartKey::from_bytes(b"__table_repart/\xff");
432 assert!(result.is_err());
433 assert!(
434 result
435 .unwrap_err()
436 .to_string()
437 .contains("not a valid UTF8 string")
438 );
439 }
440
441 #[test]
442 fn test_table_repart_key_deserialization_invalid_format() {
443 let result = TableRepartKey::from_bytes(b"invalid_key_format");
444 assert!(result.is_err());
445 assert!(
446 result
447 .unwrap_err()
448 .to_string()
449 .contains("Invalid TableRepartKey")
450 );
451 }
452
453 #[test]
454 fn test_table_repart_value_serialization_deserialization() {
455 let mut src_to_dst = BTreeMap::new();
456 let src_region = RegionId::new(1, 1);
457 let dst_regions = vec![RegionId::new(1, 2), RegionId::new(1, 3)];
458 src_to_dst.insert(src_region, dst_regions.into_iter().collect());
459
460 let value = TableRepartValue { src_to_dst };
461 let serialized = value.try_as_raw_value().unwrap();
462 let deserialized = TableRepartValue::try_from_raw_value(&serialized).unwrap();
463
464 assert_eq!(value, deserialized);
465 }
466
467 #[test]
468 fn test_table_repart_value_update_mappings_new_src() {
469 let mut value = TableRepartValue {
470 src_to_dst: BTreeMap::new(),
471 };
472
473 let src = RegionId::new(1, 1);
474 let dst = vec![RegionId::new(1, 2), RegionId::new(1, 3)];
475
476 value.update_mappings(src, &dst);
477
478 assert_eq!(value.src_to_dst.len(), 1);
479 assert!(value.src_to_dst.contains_key(&src));
480 assert_eq!(value.src_to_dst.get(&src).unwrap().len(), 2);
481 assert!(
482 value
483 .src_to_dst
484 .get(&src)
485 .unwrap()
486 .contains(&RegionId::new(1, 2))
487 );
488 assert!(
489 value
490 .src_to_dst
491 .get(&src)
492 .unwrap()
493 .contains(&RegionId::new(1, 3))
494 );
495 }
496
497 #[test]
498 fn test_table_repart_value_update_mappings_existing_src() {
499 let mut value = TableRepartValue {
500 src_to_dst: BTreeMap::new(),
501 };
502
503 let src = RegionId::new(1, 1);
504 let initial_dst = vec![RegionId::new(1, 2)];
505 let additional_dst = vec![RegionId::new(1, 3), RegionId::new(1, 4)];
506
507 value.update_mappings(src, &initial_dst);
509 value.update_mappings(src, &additional_dst);
511
512 assert_eq!(value.src_to_dst.len(), 1);
513 assert_eq!(value.src_to_dst.get(&src).unwrap().len(), 3);
514 assert!(
515 value
516 .src_to_dst
517 .get(&src)
518 .unwrap()
519 .contains(&RegionId::new(1, 2))
520 );
521 assert!(
522 value
523 .src_to_dst
524 .get(&src)
525 .unwrap()
526 .contains(&RegionId::new(1, 3))
527 );
528 assert!(
529 value
530 .src_to_dst
531 .get(&src)
532 .unwrap()
533 .contains(&RegionId::new(1, 4))
534 );
535 }
536
537 #[test]
538 fn test_table_repart_value_remove_mappings_existing() {
539 let mut value = TableRepartValue {
540 src_to_dst: BTreeMap::new(),
541 };
542
543 let src = RegionId::new(1, 1);
544 let dst_regions = vec![
545 RegionId::new(1, 2),
546 RegionId::new(1, 3),
547 RegionId::new(1, 4),
548 ];
549 value.update_mappings(src, &dst_regions);
550
551 let to_remove = vec![RegionId::new(1, 2), RegionId::new(1, 3)];
553 value.remove_mappings(src, &to_remove);
554
555 assert_eq!(value.src_to_dst.len(), 1);
556 assert_eq!(value.src_to_dst.get(&src).unwrap().len(), 1);
557 assert!(
558 value
559 .src_to_dst
560 .get(&src)
561 .unwrap()
562 .contains(&RegionId::new(1, 4))
563 );
564 }
565
566 #[test]
567 fn test_table_repart_value_remove_mappings_all() {
568 let mut value = TableRepartValue {
569 src_to_dst: BTreeMap::new(),
570 };
571
572 let src = RegionId::new(1, 1);
573 let dst_regions = vec![RegionId::new(1, 2), RegionId::new(1, 3)];
574 value.update_mappings(src, &dst_regions);
575
576 value.remove_mappings(src, &dst_regions);
578
579 assert_eq!(value.src_to_dst.len(), 0);
580 }
581
582 #[test]
583 fn test_table_repart_value_remove_mappings_nonexistent() {
584 let mut value = TableRepartValue {
585 src_to_dst: BTreeMap::new(),
586 };
587
588 let src = RegionId::new(1, 1);
589 let dst_regions = vec![RegionId::new(1, 2)];
590 value.update_mappings(src, &dst_regions);
591
592 let nonexistent_dst = vec![RegionId::new(1, 3), RegionId::new(1, 4)];
594 value.remove_mappings(src, &nonexistent_dst);
595
596 assert_eq!(value.src_to_dst.len(), 1);
598 assert_eq!(value.src_to_dst.get(&src).unwrap().len(), 1);
599 assert!(
600 value
601 .src_to_dst
602 .get(&src)
603 .unwrap()
604 .contains(&RegionId::new(1, 2))
605 );
606 }
607
608 #[test]
609 fn test_table_repart_value_remove_mappings_nonexistent_src() {
610 let mut value = TableRepartValue {
611 src_to_dst: BTreeMap::new(),
612 };
613
614 let src = RegionId::new(1, 1);
615 let dst_regions = vec![RegionId::new(1, 2)];
616
617 value.remove_mappings(src, &dst_regions);
619
620 assert_eq!(value.src_to_dst.len(), 0);
622 }
623
624 #[tokio::test]
625 async fn test_table_repart_manager_get_empty() {
626 let kv = Arc::new(MemoryKvBackend::default());
627 let manager = TableRepartManager::new(kv);
628 let result = manager.get(1024).await.unwrap();
629 assert!(result.is_none());
630 }
631
632 #[tokio::test]
633 async fn test_table_repart_manager_get_with_raw_bytes_empty() {
634 let kv = Arc::new(MemoryKvBackend::default());
635 let manager = TableRepartManager::new(kv);
636 let result = manager.get_with_raw_bytes(1024).await.unwrap();
637 assert!(result.is_none());
638 }
639
640 #[tokio::test]
641 async fn test_table_repart_manager_create_and_get() {
642 let kv = Arc::new(MemoryKvBackend::default());
643 let manager = TableRepartManager::new(kv.clone());
644
645 let mut src_to_dst = BTreeMap::new();
646 let src_region = RegionId::new(1, 1);
647 let dst_regions = vec![RegionId::new(1, 2), RegionId::new(1, 3)];
648 src_to_dst.insert(src_region, dst_regions.into_iter().collect());
649
650 let value = TableRepartValue { src_to_dst };
651
652 let (txn, _) = manager.build_create_txn(1024, &value).unwrap();
654 let result = kv.txn(txn).await.unwrap();
655 assert!(result.succeeded);
656
657 let retrieved = manager.get(1024).await.unwrap().unwrap();
659 assert_eq!(retrieved, value);
660 }
661
662 #[tokio::test]
663 async fn test_table_repart_manager_update_txn() {
664 let kv = Arc::new(MemoryKvBackend::default());
665 let manager = TableRepartManager::new(kv.clone());
666
667 let initial_value = TableRepartValue {
668 src_to_dst: BTreeMap::new(),
669 };
670
671 let (create_txn, _) = manager.build_create_txn(1024, &initial_value).unwrap();
673 let result = kv.txn(create_txn).await.unwrap();
674 assert!(result.succeeded);
675
676 let current_value = manager.get_with_raw_bytes(1024).await.unwrap().unwrap();
678
679 let mut updated_src_to_dst = BTreeMap::new();
681 let src_region = RegionId::new(1, 1);
682 let dst_regions = vec![RegionId::new(1, 2)];
683 updated_src_to_dst.insert(src_region, dst_regions.into_iter().collect());
684 let updated_value = TableRepartValue {
685 src_to_dst: updated_src_to_dst,
686 };
687
688 let (update_txn, _) = manager
690 .build_update_txn(1024, ¤t_value, &updated_value)
691 .unwrap();
692 let result = kv.txn(update_txn).await.unwrap();
693 assert!(result.succeeded);
694
695 let retrieved = manager.get(1024).await.unwrap().unwrap();
697 assert_eq!(retrieved, updated_value);
698 }
699
700 #[tokio::test]
701 async fn test_table_repart_manager_batch_get() {
702 let kv = Arc::new(MemoryKvBackend::default());
703 let manager = TableRepartManager::new(kv.clone());
704
705 let table_reparts = vec![
707 (
708 1024,
709 TableRepartValue {
710 src_to_dst: {
711 let mut map = BTreeMap::new();
712 map.insert(
713 RegionId::new(1, 1),
714 vec![RegionId::new(1, 2)].into_iter().collect(),
715 );
716 map
717 },
718 },
719 ),
720 (
721 1025,
722 TableRepartValue {
723 src_to_dst: {
724 let mut map = BTreeMap::new();
725 map.insert(
726 RegionId::new(2, 1),
727 vec![RegionId::new(2, 2), RegionId::new(2, 3)]
728 .into_iter()
729 .collect(),
730 );
731 map
732 },
733 },
734 ),
735 ];
736
737 for (table_id, value) in &table_reparts {
738 let (txn, _) = manager.build_create_txn(*table_id, value).unwrap();
739 let result = kv.txn(txn).await.unwrap();
740 assert!(result.succeeded);
741 }
742
743 let results = manager.batch_get(&[1024, 1025, 1026]).await.unwrap();
745 assert_eq!(results.len(), 3);
746 assert_eq!(results[0].as_ref().unwrap(), &table_reparts[0].1);
747 assert_eq!(results[1].as_ref().unwrap(), &table_reparts[1].1);
748 assert!(results[2].is_none());
749 }
750
751 #[tokio::test]
752 async fn test_table_repart_manager_update_mappings() {
753 let kv = Arc::new(MemoryKvBackend::default());
754 let manager = TableRepartManager::new(kv.clone());
755
756 let initial_value = TableRepartValue {
758 src_to_dst: BTreeMap::new(),
759 };
760 let (txn, _) = manager.build_create_txn(1024, &initial_value).unwrap();
761 let result = kv.txn(txn).await.unwrap();
762 assert!(result.succeeded);
763
764 let src = RegionId::new(1024, 1);
766 let dst = vec![RegionId::new(1024, 2), RegionId::new(1024, 3)];
767 let region_mapping = HashMap::from([(src, dst)]);
768 manager
769 .update_mappings(1024, ®ion_mapping)
770 .await
771 .unwrap();
772
773 let retrieved = manager.get(1024).await.unwrap().unwrap();
775 assert_eq!(retrieved.src_to_dst.len(), 1);
776 assert!(retrieved.src_to_dst.contains_key(&src));
777 assert_eq!(retrieved.src_to_dst.get(&src).unwrap().len(), 2);
778 }
779
780 #[tokio::test]
781 async fn test_table_repart_manager_remove_mappings() {
782 let kv = Arc::new(MemoryKvBackend::default());
783 let manager = TableRepartManager::new(kv.clone());
784
785 let mut initial_src_to_dst = BTreeMap::new();
787 let src = RegionId::new(1024, 1);
788 let dst_regions = vec![
789 RegionId::new(1024, 2),
790 RegionId::new(1024, 3),
791 RegionId::new(1024, 4),
792 ];
793 initial_src_to_dst.insert(src, dst_regions.into_iter().collect());
794
795 let initial_value = TableRepartValue {
796 src_to_dst: initial_src_to_dst,
797 };
798 let (txn, _) = manager.build_create_txn(1024, &initial_value).unwrap();
799 let result = kv.txn(txn).await.unwrap();
800 assert!(result.succeeded);
801
802 let to_remove = vec![RegionId::new(1024, 2), RegionId::new(1024, 3)];
804 let region_mapping = HashMap::from([(src, to_remove)]);
805 manager
806 .remove_mappings(1024, ®ion_mapping)
807 .await
808 .unwrap();
809
810 let retrieved = manager.get(1024).await.unwrap().unwrap();
812 assert_eq!(retrieved.src_to_dst.len(), 1);
813 assert_eq!(retrieved.src_to_dst.get(&src).unwrap().len(), 1);
814 assert!(
815 retrieved
816 .src_to_dst
817 .get(&src)
818 .unwrap()
819 .contains(&RegionId::new(1024, 4))
820 );
821 }
822
823 #[tokio::test]
824 async fn test_table_repart_manager_get_dst_regions() {
825 let kv = Arc::new(MemoryKvBackend::default());
826 let manager = TableRepartManager::new(kv.clone());
827
828 let mut initial_src_to_dst = BTreeMap::new();
830 let src = RegionId::new(1024, 1);
831 let dst_regions = vec![RegionId::new(1024, 2), RegionId::new(1024, 3)];
832 initial_src_to_dst.insert(src, dst_regions.into_iter().collect());
833
834 let initial_value = TableRepartValue {
835 src_to_dst: initial_src_to_dst,
836 };
837 let (txn, _) = manager.build_create_txn(1024, &initial_value).unwrap();
838 let result = kv.txn(txn).await.unwrap();
839 assert!(result.succeeded);
840
841 let dst_regions = manager.get_dst_regions(src).await.unwrap();
843 assert!(dst_regions.is_some());
844 let dst_set = dst_regions.unwrap();
845 assert_eq!(dst_set.len(), 2);
846 assert!(dst_set.contains(&RegionId::new(1024, 2)));
847 assert!(dst_set.contains(&RegionId::new(1024, 3)));
848
849 let nonexistent_src = RegionId::new(1024, 99);
851 let result = manager.get_dst_regions(nonexistent_src).await.unwrap();
852 assert!(result.is_none());
853 }
854
855 #[tokio::test]
856 async fn test_table_repart_manager_operations_on_nonexistent_table() {
857 let kv = Arc::new(MemoryKvBackend::default());
858 let manager = TableRepartManager::new(kv);
859
860 let src = RegionId::new(1024, 1);
861 let dst = vec![RegionId::new(1024, 2)];
862
863 let region_mapping = HashMap::from([(src, dst.clone())]);
865 let result = manager.remove_mappings(1024, ®ion_mapping).await;
866 assert!(result.is_err());
867 let err_msg = result.unwrap_err().to_string();
868 assert!(
869 err_msg.contains("Failed to find table repartition metadata for table id 1024"),
870 "{err_msg}"
871 );
872
873 let region_mapping = HashMap::from([(src, dst)]);
875 manager
876 .update_mappings(1024, ®ion_mapping)
877 .await
878 .unwrap();
879 }
880
881 #[tokio::test]
882 async fn test_table_repart_manager_batch_get_with_raw_bytes() {
883 let kv = Arc::new(MemoryKvBackend::default());
884 let manager = TableRepartManager::new(kv.clone());
885
886 let value = TableRepartValue {
888 src_to_dst: {
889 let mut map = BTreeMap::new();
890 map.insert(
891 RegionId::new(1, 1),
892 vec![RegionId::new(1, 2)].into_iter().collect(),
893 );
894 map
895 },
896 };
897 let (txn, _) = manager.build_create_txn(1024, &value).unwrap();
898 let result = kv.txn(txn).await.unwrap();
899 assert!(result.succeeded);
900
901 let results = manager
903 .batch_get_with_raw_bytes(&[1024, 1025])
904 .await
905 .unwrap();
906 assert_eq!(results.len(), 2);
907 assert!(results[0].is_some());
908 assert!(results[1].is_none());
909
910 let retrieved = &results[0].as_ref().unwrap().inner;
911 assert_eq!(retrieved, &value);
912 }
913}