1use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::fmt::Display;
17
18use serde::{Deserialize, Serialize};
19use snafu::{OptionExt as _, ResultExt, ensure};
20use store_api::storage::RegionId;
21use table::metadata::TableId;
22
23use crate::error::{InvalidMetadataSnafu, Result, SerdeJsonSnafu};
24use crate::key::txn_helper::TxnOpGetResponseSet;
25use crate::key::{
26 DeserializedValueWithBytes, MetadataKey, MetadataValue, TABLE_REPART_KEY_PATTERN,
27 TABLE_REPART_PREFIX,
28};
29use crate::kv_backend::KvBackendRef;
30use crate::kv_backend::txn::Txn;
31use crate::rpc::store::BatchGetRequest;
32
33#[derive(Debug, PartialEq)]
41pub struct TableRepartKey {
42 pub table_id: TableId,
44}
45
46impl TableRepartKey {
47 pub fn new(table_id: TableId) -> Self {
48 Self { table_id }
49 }
50
51 pub fn range_prefix() -> Vec<u8> {
53 format!("{}/", TABLE_REPART_PREFIX).into_bytes()
54 }
55}
56
57impl MetadataKey<'_, TableRepartKey> for TableRepartKey {
58 fn to_bytes(&self) -> Vec<u8> {
59 self.to_string().into_bytes()
60 }
61
62 fn from_bytes(bytes: &[u8]) -> Result<TableRepartKey> {
63 let key = std::str::from_utf8(bytes).map_err(|e| {
64 InvalidMetadataSnafu {
65 err_msg: format!(
66 "TableRepartKey '{}' is not a valid UTF8 string: {e}",
67 String::from_utf8_lossy(bytes)
68 ),
69 }
70 .build()
71 })?;
72 let captures = TABLE_REPART_KEY_PATTERN
73 .captures(key)
74 .context(InvalidMetadataSnafu {
75 err_msg: format!("Invalid TableRepartKey '{key}'"),
76 })?;
77 let table_id = captures[1].parse::<TableId>().unwrap();
79 Ok(TableRepartKey { table_id })
80 }
81}
82
83impl Display for TableRepartKey {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 write!(f, "{}/{}", TABLE_REPART_PREFIX, self.table_id)
86 }
87}
88
89#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)]
90pub struct TableRepartValue {
91 pub src_to_dst: BTreeMap<RegionId, BTreeSet<RegionId>>,
98}
99
100impl TableRepartValue {
101 pub fn new() -> Self {
103 Default::default()
104 }
105 pub fn update_mappings(&mut self, src: RegionId, dst: &[RegionId]) {
109 if dst.is_empty() {
110 return;
111 }
112 self.src_to_dst.entry(src).or_default().extend(dst);
113 }
114
115 pub fn remove_mappings(&mut self, src: RegionId, dsts: &[RegionId]) {
117 if let Some(dst_set) = self.src_to_dst.get_mut(&src) {
118 for dst in dsts {
119 dst_set.remove(dst);
120 }
121 if dst_set.is_empty() {
122 self.src_to_dst.remove(&src);
123 }
124 }
125 }
126}
127
128impl MetadataValue for TableRepartValue {
129 fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
130 serde_json::from_slice::<TableRepartValue>(raw_value).context(SerdeJsonSnafu)
131 }
132
133 fn try_as_raw_value(&self) -> Result<Vec<u8>> {
134 serde_json::to_vec(self).context(SerdeJsonSnafu)
135 }
136}
137
138pub type TableRepartValueDecodeResult =
139 Result<Option<DeserializedValueWithBytes<TableRepartValue>>>;
140
141pub struct TableRepartManager {
142 kv_backend: KvBackendRef,
143}
144
145impl TableRepartManager {
146 pub fn new(kv_backend: KvBackendRef) -> Self {
147 Self { kv_backend }
148 }
149
150 pub fn build_create_txn(
153 &self,
154 table_id: TableId,
155 table_repart_value: &TableRepartValue,
156 ) -> Result<(
157 Txn,
158 impl FnOnce(&mut TxnOpGetResponseSet) -> TableRepartValueDecodeResult + use<>,
159 )> {
160 let key = TableRepartKey::new(table_id);
161 let raw_key = key.to_bytes();
162
163 let txn = Txn::put_if_not_exists(raw_key.clone(), table_repart_value.try_as_raw_value()?);
164
165 Ok((
166 txn,
167 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
168 ))
169 }
170
171 pub fn build_update_txn(
175 &self,
176 table_id: TableId,
177 current_table_repart_value: &DeserializedValueWithBytes<TableRepartValue>,
178 new_table_repart_value: &TableRepartValue,
179 ) -> Result<(
180 Txn,
181 impl FnOnce(&mut TxnOpGetResponseSet) -> TableRepartValueDecodeResult + use<>,
182 )> {
183 let key = TableRepartKey::new(table_id);
184 let raw_key = key.to_bytes();
185 let raw_value = current_table_repart_value.get_raw_bytes();
186 let new_raw_value: Vec<u8> = new_table_repart_value.try_as_raw_value()?;
187
188 let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
189
190 Ok((
191 txn,
192 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
193 ))
194 }
195
196 pub async fn get(&self, table_id: TableId) -> Result<Option<TableRepartValue>> {
198 self.get_inner(table_id).await
199 }
200
201 async fn get_inner(&self, table_id: TableId) -> Result<Option<TableRepartValue>> {
202 let key = TableRepartKey::new(table_id);
203 self.kv_backend
204 .get(&key.to_bytes())
205 .await?
206 .map(|kv| TableRepartValue::try_from_raw_value(&kv.value))
207 .transpose()
208 }
209
210 pub async fn get_with_raw_bytes(
212 &self,
213 table_id: TableId,
214 ) -> Result<Option<DeserializedValueWithBytes<TableRepartValue>>> {
215 self.get_with_raw_bytes_inner(table_id).await
216 }
217
218 async fn get_with_raw_bytes_inner(
219 &self,
220 table_id: TableId,
221 ) -> Result<Option<DeserializedValueWithBytes<TableRepartValue>>> {
222 let key = TableRepartKey::new(table_id);
223 self.kv_backend
224 .get(&key.to_bytes())
225 .await?
226 .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
227 .transpose()
228 }
229
230 pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRepartValue>>> {
232 let raw_table_reparts = self.batch_get_inner(table_ids).await?;
233
234 Ok(raw_table_reparts
235 .into_iter()
236 .map(|v| v.map(|x| x.inner))
237 .collect())
238 }
239
240 pub async fn batch_get_with_raw_bytes(
242 &self,
243 table_ids: &[TableId],
244 ) -> Result<Vec<Option<DeserializedValueWithBytes<TableRepartValue>>>> {
245 self.batch_get_inner(table_ids).await
246 }
247
248 async fn batch_get_inner(
249 &self,
250 table_ids: &[TableId],
251 ) -> Result<Vec<Option<DeserializedValueWithBytes<TableRepartValue>>>> {
252 let keys = table_ids
253 .iter()
254 .map(|id| TableRepartKey::new(*id).to_bytes())
255 .collect::<Vec<_>>();
256 let resp = self
257 .kv_backend
258 .batch_get(BatchGetRequest { keys: keys.clone() })
259 .await?;
260
261 let kvs = resp
262 .kvs
263 .into_iter()
264 .map(|kv| (kv.key, kv.value))
265 .collect::<HashMap<_, _>>();
266 keys.into_iter()
267 .map(|key| {
268 if let Some(value) = kvs.get(&key) {
269 Ok(Some(DeserializedValueWithBytes::from_inner_slice(value)?))
270 } else {
271 Ok(None)
272 }
273 })
274 .collect()
275 }
276
277 pub async fn update_mappings(&self, src: RegionId, dst: &[RegionId]) -> Result<()> {
280 let table_id = src.table_id();
281
282 let current_table_repart = self
284 .get_with_raw_bytes(table_id)
285 .await?
286 .context(crate::error::TableRepartNotFoundSnafu { table_id })?;
287
288 let mut new_table_repart_value = current_table_repart.inner.clone();
290 new_table_repart_value.update_mappings(src, dst);
291
292 let (txn, _) =
294 self.build_update_txn(table_id, ¤t_table_repart, &new_table_repart_value)?;
295
296 let result = self.kv_backend.txn(txn).await?;
297
298 ensure!(
299 result.succeeded,
300 crate::error::MetadataCorruptionSnafu {
301 err_msg: format!(
302 "Failed to update mappings for table {}: CAS operation failed",
303 table_id
304 ),
305 }
306 );
307
308 Ok(())
309 }
310
311 pub async fn remove_mappings(&self, src: RegionId, dsts: &[RegionId]) -> Result<()> {
314 let table_id = src.table_id();
315
316 let current_table_repart = self
318 .get_with_raw_bytes(table_id)
319 .await?
320 .context(crate::error::TableRepartNotFoundSnafu { table_id })?;
321
322 let mut new_table_repart_value = current_table_repart.inner.clone();
324 new_table_repart_value.remove_mappings(src, dsts);
325
326 let (txn, _) =
328 self.build_update_txn(table_id, ¤t_table_repart, &new_table_repart_value)?;
329
330 let result = self.kv_backend.txn(txn).await?;
331
332 ensure!(
333 result.succeeded,
334 crate::error::MetadataCorruptionSnafu {
335 err_msg: format!(
336 "Failed to remove mappings for table {}: CAS operation failed",
337 table_id
338 ),
339 }
340 );
341
342 Ok(())
343 }
344
345 pub async fn get_dst_regions(
347 &self,
348 src_region: RegionId,
349 ) -> Result<Option<BTreeSet<RegionId>>> {
350 let table_id = src_region.table_id();
351 let table_repart = self.get(table_id).await?;
352 Ok(table_repart.and_then(|repart| repart.src_to_dst.get(&src_region).cloned()))
353 }
354}
355
356#[cfg(test)]
357mod tests {
358 use std::collections::BTreeMap;
359 use std::sync::Arc;
360
361 use super::*;
362 use crate::kv_backend::TxnService;
363 use crate::kv_backend::memory::MemoryKvBackend;
364
365 #[test]
366 fn test_table_repart_key_serialization() {
367 let key = TableRepartKey::new(42);
368 let raw_key = key.to_bytes();
369 assert_eq!(raw_key, b"__table_repart/42");
370 }
371
372 #[test]
373 fn test_table_repart_key_deserialization() {
374 let expected = TableRepartKey::new(42);
375 let key = TableRepartKey::from_bytes(b"__table_repart/42").unwrap();
376 assert_eq!(key, expected);
377 }
378
379 #[test]
380 fn test_table_repart_key_deserialization_invalid_utf8() {
381 let result = TableRepartKey::from_bytes(b"__table_repart/\xff");
382 assert!(result.is_err());
383 assert!(
384 result
385 .unwrap_err()
386 .to_string()
387 .contains("not a valid UTF8 string")
388 );
389 }
390
391 #[test]
392 fn test_table_repart_key_deserialization_invalid_format() {
393 let result = TableRepartKey::from_bytes(b"invalid_key_format");
394 assert!(result.is_err());
395 assert!(
396 result
397 .unwrap_err()
398 .to_string()
399 .contains("Invalid TableRepartKey")
400 );
401 }
402
403 #[test]
404 fn test_table_repart_value_serialization_deserialization() {
405 let mut src_to_dst = BTreeMap::new();
406 let src_region = RegionId::new(1, 1);
407 let dst_regions = vec![RegionId::new(1, 2), RegionId::new(1, 3)];
408 src_to_dst.insert(src_region, dst_regions.into_iter().collect());
409
410 let value = TableRepartValue { src_to_dst };
411 let serialized = value.try_as_raw_value().unwrap();
412 let deserialized = TableRepartValue::try_from_raw_value(&serialized).unwrap();
413
414 assert_eq!(value, deserialized);
415 }
416
417 #[test]
418 fn test_table_repart_value_update_mappings_new_src() {
419 let mut value = TableRepartValue {
420 src_to_dst: BTreeMap::new(),
421 };
422
423 let src = RegionId::new(1, 1);
424 let dst = vec![RegionId::new(1, 2), RegionId::new(1, 3)];
425
426 value.update_mappings(src, &dst);
427
428 assert_eq!(value.src_to_dst.len(), 1);
429 assert!(value.src_to_dst.contains_key(&src));
430 assert_eq!(value.src_to_dst.get(&src).unwrap().len(), 2);
431 assert!(
432 value
433 .src_to_dst
434 .get(&src)
435 .unwrap()
436 .contains(&RegionId::new(1, 2))
437 );
438 assert!(
439 value
440 .src_to_dst
441 .get(&src)
442 .unwrap()
443 .contains(&RegionId::new(1, 3))
444 );
445 }
446
447 #[test]
448 fn test_table_repart_value_update_mappings_existing_src() {
449 let mut value = TableRepartValue {
450 src_to_dst: BTreeMap::new(),
451 };
452
453 let src = RegionId::new(1, 1);
454 let initial_dst = vec![RegionId::new(1, 2)];
455 let additional_dst = vec![RegionId::new(1, 3), RegionId::new(1, 4)];
456
457 value.update_mappings(src, &initial_dst);
459 value.update_mappings(src, &additional_dst);
461
462 assert_eq!(value.src_to_dst.len(), 1);
463 assert_eq!(value.src_to_dst.get(&src).unwrap().len(), 3);
464 assert!(
465 value
466 .src_to_dst
467 .get(&src)
468 .unwrap()
469 .contains(&RegionId::new(1, 2))
470 );
471 assert!(
472 value
473 .src_to_dst
474 .get(&src)
475 .unwrap()
476 .contains(&RegionId::new(1, 3))
477 );
478 assert!(
479 value
480 .src_to_dst
481 .get(&src)
482 .unwrap()
483 .contains(&RegionId::new(1, 4))
484 );
485 }
486
487 #[test]
488 fn test_table_repart_value_remove_mappings_existing() {
489 let mut value = TableRepartValue {
490 src_to_dst: BTreeMap::new(),
491 };
492
493 let src = RegionId::new(1, 1);
494 let dst_regions = vec![
495 RegionId::new(1, 2),
496 RegionId::new(1, 3),
497 RegionId::new(1, 4),
498 ];
499 value.update_mappings(src, &dst_regions);
500
501 let to_remove = vec![RegionId::new(1, 2), RegionId::new(1, 3)];
503 value.remove_mappings(src, &to_remove);
504
505 assert_eq!(value.src_to_dst.len(), 1);
506 assert_eq!(value.src_to_dst.get(&src).unwrap().len(), 1);
507 assert!(
508 value
509 .src_to_dst
510 .get(&src)
511 .unwrap()
512 .contains(&RegionId::new(1, 4))
513 );
514 }
515
516 #[test]
517 fn test_table_repart_value_remove_mappings_all() {
518 let mut value = TableRepartValue {
519 src_to_dst: BTreeMap::new(),
520 };
521
522 let src = RegionId::new(1, 1);
523 let dst_regions = vec![RegionId::new(1, 2), RegionId::new(1, 3)];
524 value.update_mappings(src, &dst_regions);
525
526 value.remove_mappings(src, &dst_regions);
528
529 assert_eq!(value.src_to_dst.len(), 0);
530 }
531
532 #[test]
533 fn test_table_repart_value_remove_mappings_nonexistent() {
534 let mut value = TableRepartValue {
535 src_to_dst: BTreeMap::new(),
536 };
537
538 let src = RegionId::new(1, 1);
539 let dst_regions = vec![RegionId::new(1, 2)];
540 value.update_mappings(src, &dst_regions);
541
542 let nonexistent_dst = vec![RegionId::new(1, 3), RegionId::new(1, 4)];
544 value.remove_mappings(src, &nonexistent_dst);
545
546 assert_eq!(value.src_to_dst.len(), 1);
548 assert_eq!(value.src_to_dst.get(&src).unwrap().len(), 1);
549 assert!(
550 value
551 .src_to_dst
552 .get(&src)
553 .unwrap()
554 .contains(&RegionId::new(1, 2))
555 );
556 }
557
558 #[test]
559 fn test_table_repart_value_remove_mappings_nonexistent_src() {
560 let mut value = TableRepartValue {
561 src_to_dst: BTreeMap::new(),
562 };
563
564 let src = RegionId::new(1, 1);
565 let dst_regions = vec![RegionId::new(1, 2)];
566
567 value.remove_mappings(src, &dst_regions);
569
570 assert_eq!(value.src_to_dst.len(), 0);
572 }
573
574 #[tokio::test]
575 async fn test_table_repart_manager_get_empty() {
576 let kv = Arc::new(MemoryKvBackend::default());
577 let manager = TableRepartManager::new(kv);
578 let result = manager.get(1024).await.unwrap();
579 assert!(result.is_none());
580 }
581
582 #[tokio::test]
583 async fn test_table_repart_manager_get_with_raw_bytes_empty() {
584 let kv = Arc::new(MemoryKvBackend::default());
585 let manager = TableRepartManager::new(kv);
586 let result = manager.get_with_raw_bytes(1024).await.unwrap();
587 assert!(result.is_none());
588 }
589
590 #[tokio::test]
591 async fn test_table_repart_manager_create_and_get() {
592 let kv = Arc::new(MemoryKvBackend::default());
593 let manager = TableRepartManager::new(kv.clone());
594
595 let mut src_to_dst = BTreeMap::new();
596 let src_region = RegionId::new(1, 1);
597 let dst_regions = vec![RegionId::new(1, 2), RegionId::new(1, 3)];
598 src_to_dst.insert(src_region, dst_regions.into_iter().collect());
599
600 let value = TableRepartValue { src_to_dst };
601
602 let (txn, _) = manager.build_create_txn(1024, &value).unwrap();
604 let result = kv.txn(txn).await.unwrap();
605 assert!(result.succeeded);
606
607 let retrieved = manager.get(1024).await.unwrap().unwrap();
609 assert_eq!(retrieved, value);
610 }
611
612 #[tokio::test]
613 async fn test_table_repart_manager_update_txn() {
614 let kv = Arc::new(MemoryKvBackend::default());
615 let manager = TableRepartManager::new(kv.clone());
616
617 let initial_value = TableRepartValue {
618 src_to_dst: BTreeMap::new(),
619 };
620
621 let (create_txn, _) = manager.build_create_txn(1024, &initial_value).unwrap();
623 let result = kv.txn(create_txn).await.unwrap();
624 assert!(result.succeeded);
625
626 let current_value = manager.get_with_raw_bytes(1024).await.unwrap().unwrap();
628
629 let mut updated_src_to_dst = BTreeMap::new();
631 let src_region = RegionId::new(1, 1);
632 let dst_regions = vec![RegionId::new(1, 2)];
633 updated_src_to_dst.insert(src_region, dst_regions.into_iter().collect());
634 let updated_value = TableRepartValue {
635 src_to_dst: updated_src_to_dst,
636 };
637
638 let (update_txn, _) = manager
640 .build_update_txn(1024, ¤t_value, &updated_value)
641 .unwrap();
642 let result = kv.txn(update_txn).await.unwrap();
643 assert!(result.succeeded);
644
645 let retrieved = manager.get(1024).await.unwrap().unwrap();
647 assert_eq!(retrieved, updated_value);
648 }
649
650 #[tokio::test]
651 async fn test_table_repart_manager_batch_get() {
652 let kv = Arc::new(MemoryKvBackend::default());
653 let manager = TableRepartManager::new(kv.clone());
654
655 let table_reparts = vec![
657 (
658 1024,
659 TableRepartValue {
660 src_to_dst: {
661 let mut map = BTreeMap::new();
662 map.insert(
663 RegionId::new(1, 1),
664 vec![RegionId::new(1, 2)].into_iter().collect(),
665 );
666 map
667 },
668 },
669 ),
670 (
671 1025,
672 TableRepartValue {
673 src_to_dst: {
674 let mut map = BTreeMap::new();
675 map.insert(
676 RegionId::new(2, 1),
677 vec![RegionId::new(2, 2), RegionId::new(2, 3)]
678 .into_iter()
679 .collect(),
680 );
681 map
682 },
683 },
684 ),
685 ];
686
687 for (table_id, value) in &table_reparts {
688 let (txn, _) = manager.build_create_txn(*table_id, value).unwrap();
689 let result = kv.txn(txn).await.unwrap();
690 assert!(result.succeeded);
691 }
692
693 let results = manager.batch_get(&[1024, 1025, 1026]).await.unwrap();
695 assert_eq!(results.len(), 3);
696 assert_eq!(results[0].as_ref().unwrap(), &table_reparts[0].1);
697 assert_eq!(results[1].as_ref().unwrap(), &table_reparts[1].1);
698 assert!(results[2].is_none());
699 }
700
701 #[tokio::test]
702 async fn test_table_repart_manager_update_mappings() {
703 let kv = Arc::new(MemoryKvBackend::default());
704 let manager = TableRepartManager::new(kv.clone());
705
706 let initial_value = TableRepartValue {
708 src_to_dst: BTreeMap::new(),
709 };
710 let (txn, _) = manager.build_create_txn(1024, &initial_value).unwrap();
711 let result = kv.txn(txn).await.unwrap();
712 assert!(result.succeeded);
713
714 let src = RegionId::new(1024, 1);
716 let dst = vec![RegionId::new(1024, 2), RegionId::new(1024, 3)];
717 manager.update_mappings(src, &dst).await.unwrap();
718
719 let retrieved = manager.get(1024).await.unwrap().unwrap();
721 assert_eq!(retrieved.src_to_dst.len(), 1);
722 assert!(retrieved.src_to_dst.contains_key(&src));
723 assert_eq!(retrieved.src_to_dst.get(&src).unwrap().len(), 2);
724 }
725
726 #[tokio::test]
727 async fn test_table_repart_manager_remove_mappings() {
728 let kv = Arc::new(MemoryKvBackend::default());
729 let manager = TableRepartManager::new(kv.clone());
730
731 let mut initial_src_to_dst = BTreeMap::new();
733 let src = RegionId::new(1024, 1);
734 let dst_regions = vec![
735 RegionId::new(1024, 2),
736 RegionId::new(1024, 3),
737 RegionId::new(1024, 4),
738 ];
739 initial_src_to_dst.insert(src, dst_regions.into_iter().collect());
740
741 let initial_value = TableRepartValue {
742 src_to_dst: initial_src_to_dst,
743 };
744 let (txn, _) = manager.build_create_txn(1024, &initial_value).unwrap();
745 let result = kv.txn(txn).await.unwrap();
746 assert!(result.succeeded);
747
748 let to_remove = vec![RegionId::new(1024, 2), RegionId::new(1024, 3)];
750 manager.remove_mappings(src, &to_remove).await.unwrap();
751
752 let retrieved = manager.get(1024).await.unwrap().unwrap();
754 assert_eq!(retrieved.src_to_dst.len(), 1);
755 assert_eq!(retrieved.src_to_dst.get(&src).unwrap().len(), 1);
756 assert!(
757 retrieved
758 .src_to_dst
759 .get(&src)
760 .unwrap()
761 .contains(&RegionId::new(1024, 4))
762 );
763 }
764
765 #[tokio::test]
766 async fn test_table_repart_manager_get_dst_regions() {
767 let kv = Arc::new(MemoryKvBackend::default());
768 let manager = TableRepartManager::new(kv.clone());
769
770 let mut initial_src_to_dst = BTreeMap::new();
772 let src = RegionId::new(1024, 1);
773 let dst_regions = vec![RegionId::new(1024, 2), RegionId::new(1024, 3)];
774 initial_src_to_dst.insert(src, dst_regions.into_iter().collect());
775
776 let initial_value = TableRepartValue {
777 src_to_dst: initial_src_to_dst,
778 };
779 let (txn, _) = manager.build_create_txn(1024, &initial_value).unwrap();
780 let result = kv.txn(txn).await.unwrap();
781 assert!(result.succeeded);
782
783 let dst_regions = manager.get_dst_regions(src).await.unwrap();
785 assert!(dst_regions.is_some());
786 let dst_set = dst_regions.unwrap();
787 assert_eq!(dst_set.len(), 2);
788 assert!(dst_set.contains(&RegionId::new(1024, 2)));
789 assert!(dst_set.contains(&RegionId::new(1024, 3)));
790
791 let nonexistent_src = RegionId::new(1024, 99);
793 let result = manager.get_dst_regions(nonexistent_src).await.unwrap();
794 assert!(result.is_none());
795 }
796
797 #[tokio::test]
798 async fn test_table_repart_manager_operations_on_nonexistent_table() {
799 let kv = Arc::new(MemoryKvBackend::default());
800 let manager = TableRepartManager::new(kv);
801
802 let src = RegionId::new(1024, 1);
803 let dst = vec![RegionId::new(1024, 2)];
804
805 let result = manager.update_mappings(src, &dst).await;
807 assert!(result.is_err());
808 let err_msg = result.unwrap_err().to_string();
809 assert!(
810 err_msg.contains("Failed to find table repartition metadata for table id 1024"),
811 "{err_msg}"
812 );
813
814 let result = manager.remove_mappings(src, &dst).await;
816 assert!(result.is_err());
817 let err_msg = result.unwrap_err().to_string();
818 assert!(
819 err_msg.contains("Failed to find table repartition metadata for table id 1024"),
820 "{err_msg}"
821 );
822 }
823
824 #[tokio::test]
825 async fn test_table_repart_manager_batch_get_with_raw_bytes() {
826 let kv = Arc::new(MemoryKvBackend::default());
827 let manager = TableRepartManager::new(kv.clone());
828
829 let value = TableRepartValue {
831 src_to_dst: {
832 let mut map = BTreeMap::new();
833 map.insert(
834 RegionId::new(1, 1),
835 vec![RegionId::new(1, 2)].into_iter().collect(),
836 );
837 map
838 },
839 };
840 let (txn, _) = manager.build_create_txn(1024, &value).unwrap();
841 let result = kv.txn(txn).await.unwrap();
842 assert!(result.succeeded);
843
844 let results = manager
846 .batch_get_with_raw_bytes(&[1024, 1025])
847 .await
848 .unwrap();
849 assert_eq!(results.len(), 2);
850 assert!(results[0].is_some());
851 assert!(results[1].is_none());
852
853 let retrieved = &results[0].as_ref().unwrap().inner;
854 assert_eq!(retrieved, &value);
855 }
856}