common_meta/key/table_repart.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::fmt::Display;
17
18use serde::{Deserialize, Serialize};
19use snafu::{OptionExt as _, ResultExt, ensure};
20use store_api::storage::RegionId;
21use table::metadata::TableId;
22
23use crate::error::{InvalidMetadataSnafu, Result, SerdeJsonSnafu};
24use crate::key::txn_helper::TxnOpGetResponseSet;
25use crate::key::{
26    DeserializedValueWithBytes, MetadataKey, MetadataValue, TABLE_REPART_KEY_PATTERN,
27    TABLE_REPART_PREFIX,
28};
29use crate::kv_backend::KvBackendRef;
30use crate::kv_backend::txn::Txn;
31use crate::rpc::store::BatchGetRequest;
32
33/// The key stores table repartition metadata.
34/// Specifically, it records the relation between source and destination regions after a repartition operation is completed.
35/// This is distinct from the initial partitioning scheme of the table.
36/// For example, after repartition, a destination region may still hold files from a source region; this mapping should be updated once repartition is done.
37/// The GC scheduler uses this information to clean up those files (and removes this mapping if all files from the source region are cleaned).
38///
39/// The layout: `__table_repart/{table_id}`.
40#[derive(Debug, PartialEq)]
41pub struct TableRepartKey {
42    /// The unique identifier of the table whose re-partition information is stored in this key.
43    pub table_id: TableId,
44}
45
46impl TableRepartKey {
47    pub fn new(table_id: TableId) -> Self {
48        Self { table_id }
49    }
50
51    /// Returns the range prefix of the table repartition key.
52    pub fn range_prefix() -> Vec<u8> {
53        format!("{}/", TABLE_REPART_PREFIX).into_bytes()
54    }
55}
56
57impl MetadataKey<'_, TableRepartKey> for TableRepartKey {
58    fn to_bytes(&self) -> Vec<u8> {
59        self.to_string().into_bytes()
60    }
61
62    fn from_bytes(bytes: &[u8]) -> Result<TableRepartKey> {
63        let key = std::str::from_utf8(bytes).map_err(|e| {
64            InvalidMetadataSnafu {
65                err_msg: format!(
66                    "TableRepartKey '{}' is not a valid UTF8 string: {e}",
67                    String::from_utf8_lossy(bytes)
68                ),
69            }
70            .build()
71        })?;
72        let captures = TABLE_REPART_KEY_PATTERN
73            .captures(key)
74            .context(InvalidMetadataSnafu {
75                err_msg: format!("Invalid TableRepartKey '{key}'"),
76            })?;
77        // Safety: pass the regex check above
78        let table_id = captures[1].parse::<TableId>().unwrap();
79        Ok(TableRepartKey { table_id })
80    }
81}
82
83impl Display for TableRepartKey {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        write!(f, "{}/{}", TABLE_REPART_PREFIX, self.table_id)
86    }
87}
88
89#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)]
90pub struct TableRepartValue {
91    /// A mapping from source region IDs to sets of destination region IDs after repartition.
92    ///
93    /// Each key in the map is a `RegionId` representing a source region that has been repartitioned.
94    /// The corresponding value is a `BTreeSet<RegionId>` containing the IDs of destination regions
95    /// that currently hold files originally from the source region. This mapping is updated after
96    /// repartition and is used by the GC scheduler to track and clean up files that have been moved.
97    pub src_to_dst: BTreeMap<RegionId, BTreeSet<RegionId>>,
98}
99
100impl TableRepartValue {
101    /// Creates a new TableRepartValue with an empty src_to_dst map.
102    pub fn new() -> Self {
103        Default::default()
104    }
105    /// Update mapping from src region to dst regions. Should be called once repartition is done.
106    ///
107    /// If `dst` is empty, this method does nothing.
108    pub fn update_mappings(&mut self, src: RegionId, dst: &[RegionId]) {
109        if dst.is_empty() {
110            return;
111        }
112        self.src_to_dst.entry(src).or_default().extend(dst);
113    }
114
115    /// Remove mappings from src region to dst regions. Should be called once files from src region are cleaned up in dst regions.
116    pub fn remove_mappings(&mut self, src: RegionId, dsts: &[RegionId]) {
117        if let Some(dst_set) = self.src_to_dst.get_mut(&src) {
118            for dst in dsts {
119                dst_set.remove(dst);
120            }
121            if dst_set.is_empty() {
122                self.src_to_dst.remove(&src);
123            }
124        }
125    }
126}
127
128impl MetadataValue for TableRepartValue {
129    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
130        serde_json::from_slice::<TableRepartValue>(raw_value).context(SerdeJsonSnafu)
131    }
132
133    fn try_as_raw_value(&self) -> Result<Vec<u8>> {
134        serde_json::to_vec(self).context(SerdeJsonSnafu)
135    }
136}
137
138pub type TableRepartValueDecodeResult =
139    Result<Option<DeserializedValueWithBytes<TableRepartValue>>>;
140
141pub struct TableRepartManager {
142    kv_backend: KvBackendRef,
143}
144
145impl TableRepartManager {
146    pub fn new(kv_backend: KvBackendRef) -> Self {
147        Self { kv_backend }
148    }
149
150    /// Builds a create table repart transaction,
151    /// it expected the `__table_repart/{table_id}` wasn't occupied.
152    pub fn build_create_txn(
153        &self,
154        table_id: TableId,
155        table_repart_value: &TableRepartValue,
156    ) -> Result<(
157        Txn,
158        impl FnOnce(&mut TxnOpGetResponseSet) -> TableRepartValueDecodeResult + use<>,
159    )> {
160        let key = TableRepartKey::new(table_id);
161        let raw_key = key.to_bytes();
162
163        let txn = Txn::put_if_not_exists(raw_key.clone(), table_repart_value.try_as_raw_value()?);
164
165        Ok((
166            txn,
167            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
168        ))
169    }
170
171    /// Builds a update table repart transaction,
172    /// it expected the remote value equals the `current_table_repart_value`.
173    /// It retrieves the latest value if the comparing failed.
174    pub fn build_update_txn(
175        &self,
176        table_id: TableId,
177        current_table_repart_value: &DeserializedValueWithBytes<TableRepartValue>,
178        new_table_repart_value: &TableRepartValue,
179    ) -> Result<(
180        Txn,
181        impl FnOnce(&mut TxnOpGetResponseSet) -> TableRepartValueDecodeResult + use<>,
182    )> {
183        let key = TableRepartKey::new(table_id);
184        let raw_key = key.to_bytes();
185        let raw_value = current_table_repart_value.get_raw_bytes();
186        let new_raw_value: Vec<u8> = new_table_repart_value.try_as_raw_value()?;
187
188        let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
189
190        Ok((
191            txn,
192            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
193        ))
194    }
195
196    /// Returns the [`TableRepartValue`].
197    pub async fn get(&self, table_id: TableId) -> Result<Option<TableRepartValue>> {
198        self.get_inner(table_id).await
199    }
200
201    async fn get_inner(&self, table_id: TableId) -> Result<Option<TableRepartValue>> {
202        let key = TableRepartKey::new(table_id);
203        self.kv_backend
204            .get(&key.to_bytes())
205            .await?
206            .map(|kv| TableRepartValue::try_from_raw_value(&kv.value))
207            .transpose()
208    }
209
210    /// Returns the [`TableRepartValue`] wrapped with [`DeserializedValueWithBytes`].
211    pub async fn get_with_raw_bytes(
212        &self,
213        table_id: TableId,
214    ) -> Result<Option<DeserializedValueWithBytes<TableRepartValue>>> {
215        self.get_with_raw_bytes_inner(table_id).await
216    }
217
218    async fn get_with_raw_bytes_inner(
219        &self,
220        table_id: TableId,
221    ) -> Result<Option<DeserializedValueWithBytes<TableRepartValue>>> {
222        let key = TableRepartKey::new(table_id);
223        self.kv_backend
224            .get(&key.to_bytes())
225            .await?
226            .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
227            .transpose()
228    }
229
230    /// Returns batch of [`TableRepartValue`] that respects the order of `table_ids`.
231    pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRepartValue>>> {
232        let raw_table_reparts = self.batch_get_inner(table_ids).await?;
233
234        Ok(raw_table_reparts
235            .into_iter()
236            .map(|v| v.map(|x| x.inner))
237            .collect())
238    }
239
240    /// Returns batch of [`TableRepartValue`] wrapped with [`DeserializedValueWithBytes`].
241    pub async fn batch_get_with_raw_bytes(
242        &self,
243        table_ids: &[TableId],
244    ) -> Result<Vec<Option<DeserializedValueWithBytes<TableRepartValue>>>> {
245        self.batch_get_inner(table_ids).await
246    }
247
248    async fn batch_get_inner(
249        &self,
250        table_ids: &[TableId],
251    ) -> Result<Vec<Option<DeserializedValueWithBytes<TableRepartValue>>>> {
252        let keys = table_ids
253            .iter()
254            .map(|id| TableRepartKey::new(*id).to_bytes())
255            .collect::<Vec<_>>();
256        let resp = self
257            .kv_backend
258            .batch_get(BatchGetRequest { keys: keys.clone() })
259            .await?;
260
261        let kvs = resp
262            .kvs
263            .into_iter()
264            .map(|kv| (kv.key, kv.value))
265            .collect::<HashMap<_, _>>();
266        keys.into_iter()
267            .map(|key| {
268                if let Some(value) = kvs.get(&key) {
269                    Ok(Some(DeserializedValueWithBytes::from_inner_slice(value)?))
270                } else {
271                    Ok(None)
272                }
273            })
274            .collect()
275    }
276
277    /// Updates mappings from src region to dst regions.
278    /// Should be called once repartition is done.
279    pub async fn update_mappings(&self, src: RegionId, dst: &[RegionId]) -> Result<()> {
280        let table_id = src.table_id();
281
282        // Get current table repart with raw bytes for CAS operation
283        let current_table_repart = self
284            .get_with_raw_bytes(table_id)
285            .await?
286            .context(crate::error::TableRepartNotFoundSnafu { table_id })?;
287
288        // Clone the current repart value and update mappings
289        let mut new_table_repart_value = current_table_repart.inner.clone();
290        new_table_repart_value.update_mappings(src, dst);
291
292        // Execute atomic update
293        let (txn, _) =
294            self.build_update_txn(table_id, &current_table_repart, &new_table_repart_value)?;
295
296        let result = self.kv_backend.txn(txn).await?;
297
298        ensure!(
299            result.succeeded,
300            crate::error::MetadataCorruptionSnafu {
301                err_msg: format!(
302                    "Failed to update mappings for table {}: CAS operation failed",
303                    table_id
304                ),
305            }
306        );
307
308        Ok(())
309    }
310
311    /// Removes mappings from src region to dst regions.
312    /// Should be called once files from src region are cleaned up in dst regions.
313    pub async fn remove_mappings(&self, src: RegionId, dsts: &[RegionId]) -> Result<()> {
314        let table_id = src.table_id();
315
316        // Get current table repart with raw bytes for CAS operation
317        let current_table_repart = self
318            .get_with_raw_bytes(table_id)
319            .await?
320            .context(crate::error::TableRepartNotFoundSnafu { table_id })?;
321
322        // Clone the current repart value and remove mappings
323        let mut new_table_repart_value = current_table_repart.inner.clone();
324        new_table_repart_value.remove_mappings(src, dsts);
325
326        // Execute atomic update
327        let (txn, _) =
328            self.build_update_txn(table_id, &current_table_repart, &new_table_repart_value)?;
329
330        let result = self.kv_backend.txn(txn).await?;
331
332        ensure!(
333            result.succeeded,
334            crate::error::MetadataCorruptionSnafu {
335                err_msg: format!(
336                    "Failed to remove mappings for table {}: CAS operation failed",
337                    table_id
338                ),
339            }
340        );
341
342        Ok(())
343    }
344
345    /// Returns the destination regions for a given source region.
346    pub async fn get_dst_regions(
347        &self,
348        src_region: RegionId,
349    ) -> Result<Option<BTreeSet<RegionId>>> {
350        let table_id = src_region.table_id();
351        let table_repart = self.get(table_id).await?;
352        Ok(table_repart.and_then(|repart| repart.src_to_dst.get(&src_region).cloned()))
353    }
354}
355
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::sync::Arc;

    use super::*;
    use crate::kv_backend::TxnService;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Returns `{ RegionId::new(table_id, n) | n in region_numbers }`.
    fn regions(table_id: TableId, region_numbers: &[u32]) -> BTreeSet<RegionId> {
        region_numbers
            .iter()
            .map(|&region_number| RegionId::new(table_id, region_number))
            .collect()
    }

    /// Returns a value holding the single mapping `src -> dsts`.
    fn single_mapping(src: RegionId, dsts: BTreeSet<RegionId>) -> TableRepartValue {
        TableRepartValue {
            src_to_dst: BTreeMap::from([(src, dsts)]),
        }
    }

    /// Returns a manager backed by a fresh in-memory kv backend.
    fn new_manager() -> TableRepartManager {
        let kv = Arc::new(MemoryKvBackend::default());
        TableRepartManager::new(kv)
    }

    /// Creates the repartition metadata of `table_id` and asserts the transaction succeeded.
    async fn create_repart(
        manager: &TableRepartManager,
        table_id: TableId,
        value: &TableRepartValue,
    ) {
        let (txn, _) = manager.build_create_txn(table_id, value).unwrap();
        let result = manager.kv_backend.txn(txn).await.unwrap();
        assert!(result.succeeded);
    }

    #[test]
    fn test_table_repart_key_serialization() {
        let key = TableRepartKey::new(42);
        assert_eq!(key.to_bytes(), b"__table_repart/42");
        assert_eq!(key.to_string(), "__table_repart/42");
    }

    #[test]
    fn test_table_repart_key_deserialization() {
        let key = TableRepartKey::from_bytes(b"__table_repart/42").unwrap();
        assert_eq!(key, TableRepartKey::new(42));
    }

    #[test]
    fn test_table_repart_key_range_prefix() {
        let prefix = TableRepartKey::range_prefix();
        assert_eq!(prefix, b"__table_repart/");
        assert!(TableRepartKey::new(42).to_bytes().starts_with(&prefix));
    }

    #[test]
    fn test_table_repart_key_deserialization_invalid_utf8() {
        let result = TableRepartKey::from_bytes(b"__table_repart/\xff");
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("not a valid UTF8 string")
        );
    }

    #[test]
    fn test_table_repart_key_deserialization_invalid_format() {
        let result = TableRepartKey::from_bytes(b"invalid_key_format");
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Invalid TableRepartKey")
        );
    }

    #[test]
    fn test_table_repart_value_serialization_deserialization() {
        let value = single_mapping(RegionId::new(1, 1), regions(1, &[2, 3]));
        let serialized = value.try_as_raw_value().unwrap();
        let deserialized = TableRepartValue::try_from_raw_value(&serialized).unwrap();
        assert_eq!(value, deserialized);
    }

    #[test]
    fn test_table_repart_value_update_mappings_new_src() {
        let mut value = TableRepartValue::new();
        let src = RegionId::new(1, 1);

        value.update_mappings(src, &[RegionId::new(1, 2), RegionId::new(1, 3)]);

        assert_eq!(value, single_mapping(src, regions(1, &[2, 3])));
    }

    #[test]
    fn test_table_repart_value_update_mappings_existing_src() {
        let mut value = TableRepartValue::new();
        let src = RegionId::new(1, 1);

        // Initial mapping, then additional destinations are merged in.
        value.update_mappings(src, &[RegionId::new(1, 2)]);
        value.update_mappings(src, &[RegionId::new(1, 3), RegionId::new(1, 4)]);

        assert_eq!(value, single_mapping(src, regions(1, &[2, 3, 4])));
    }

    #[test]
    fn test_table_repart_value_update_mappings_empty_dst() {
        let mut value = TableRepartValue::new();

        value.update_mappings(RegionId::new(1, 1), &[]);

        assert!(value.src_to_dst.is_empty());
    }

    #[test]
    fn test_table_repart_value_remove_mappings_existing() {
        let src = RegionId::new(1, 1);
        let mut value = single_mapping(src, regions(1, &[2, 3, 4]));

        value.remove_mappings(src, &[RegionId::new(1, 2), RegionId::new(1, 3)]);

        assert_eq!(value, single_mapping(src, regions(1, &[4])));
    }

    #[test]
    fn test_table_repart_value_remove_mappings_all() {
        let src = RegionId::new(1, 1);
        let mut value = single_mapping(src, regions(1, &[2, 3]));

        value.remove_mappings(src, &[RegionId::new(1, 2), RegionId::new(1, 3)]);

        // The source disappears once it has no destination left.
        assert!(value.src_to_dst.is_empty());
    }

    #[test]
    fn test_table_repart_value_remove_mappings_nonexistent() {
        let src = RegionId::new(1, 1);
        let mut value = single_mapping(src, regions(1, &[2]));

        value.remove_mappings(src, &[RegionId::new(1, 3), RegionId::new(1, 4)]);

        // Should remain unchanged.
        assert_eq!(value, single_mapping(src, regions(1, &[2])));
    }

    #[test]
    fn test_table_repart_value_remove_mappings_nonexistent_src() {
        let mut value = TableRepartValue::new();

        value.remove_mappings(RegionId::new(1, 1), &[RegionId::new(1, 2)]);

        // Should remain empty.
        assert!(value.src_to_dst.is_empty());
    }

    #[tokio::test]
    async fn test_table_repart_manager_get_empty() {
        let manager = new_manager();
        assert!(manager.get(1024).await.unwrap().is_none());
        assert!(manager.get_with_raw_bytes(1024).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_table_repart_manager_create_and_get() {
        let manager = new_manager();
        let value = single_mapping(RegionId::new(1, 1), regions(1, &[2, 3]));

        create_repart(&manager, 1024, &value).await;

        assert_eq!(manager.get(1024).await.unwrap().unwrap(), value);
    }

    #[tokio::test]
    async fn test_table_repart_manager_create_existing() {
        let manager = new_manager();
        let value = single_mapping(RegionId::new(1024, 1), regions(1024, &[2]));
        create_repart(&manager, 1024, &value).await;

        // The create transaction must not overwrite an existing value.
        let (txn, _) = manager
            .build_create_txn(1024, &TableRepartValue::new())
            .unwrap();
        let result = manager.kv_backend.txn(txn).await.unwrap();
        assert!(!result.succeeded);
        assert_eq!(manager.get(1024).await.unwrap().unwrap(), value);
    }

    #[tokio::test]
    async fn test_table_repart_manager_update_txn() {
        let manager = new_manager();
        create_repart(&manager, 1024, &TableRepartValue::new()).await;

        let current = manager.get_with_raw_bytes(1024).await.unwrap().unwrap();
        let updated = single_mapping(RegionId::new(1, 1), regions(1, &[2]));
        let (txn, _) = manager
            .build_update_txn(1024, &current, &updated)
            .unwrap();
        assert!(manager.kv_backend.txn(txn).await.unwrap().succeeded);

        assert_eq!(manager.get(1024).await.unwrap().unwrap(), updated);
    }

    #[tokio::test]
    async fn test_table_repart_manager_update_txn_with_stale_value() {
        let manager = new_manager();
        create_repart(&manager, 1024, &TableRepartValue::new()).await;
        let stale = manager.get_with_raw_bytes(1024).await.unwrap().unwrap();

        // Another writer changes the value after `stale` was read.
        let src = RegionId::new(1024, 1);
        manager
            .update_mappings(src, &[RegionId::new(1024, 2)])
            .await
            .unwrap();

        // A compare-and-put against the stale raw bytes must be rejected.
        let overwrite = single_mapping(src, regions(1024, &[3]));
        let (txn, _) = manager
            .build_update_txn(1024, &stale, &overwrite)
            .unwrap();
        assert!(!manager.kv_backend.txn(txn).await.unwrap().succeeded);
        assert_eq!(
            manager.get(1024).await.unwrap().unwrap(),
            single_mapping(src, regions(1024, &[2]))
        );
    }

    #[tokio::test]
    async fn test_table_repart_manager_batch_get() {
        let manager = new_manager();
        let reparts = [
            (1024, single_mapping(RegionId::new(1, 1), regions(1, &[2]))),
            (1025, single_mapping(RegionId::new(2, 1), regions(2, &[2, 3]))),
        ];
        for (table_id, value) in &reparts {
            create_repart(&manager, *table_id, value).await;
        }

        let results = manager.batch_get(&[1024, 1025, 1026]).await.unwrap();

        assert_eq!(
            results,
            vec![Some(reparts[0].1.clone()), Some(reparts[1].1.clone()), None]
        );
    }

    #[tokio::test]
    async fn test_table_repart_manager_update_mappings() {
        let manager = new_manager();
        create_repart(&manager, 1024, &TableRepartValue::new()).await;

        let src = RegionId::new(1024, 1);
        manager
            .update_mappings(src, &[RegionId::new(1024, 2), RegionId::new(1024, 3)])
            .await
            .unwrap();

        assert_eq!(
            manager.get(1024).await.unwrap().unwrap(),
            single_mapping(src, regions(1024, &[2, 3]))
        );
    }

    #[tokio::test]
    async fn test_table_repart_manager_update_mappings_empty_dst() {
        let manager = new_manager();
        let src = RegionId::new(1024, 1);
        let value = single_mapping(src, regions(1024, &[2]));
        create_repart(&manager, 1024, &value).await;

        manager.update_mappings(src, &[]).await.unwrap();

        assert_eq!(manager.get(1024).await.unwrap().unwrap(), value);
    }

    #[tokio::test]
    async fn test_table_repart_manager_remove_mappings() {
        let manager = new_manager();
        let src = RegionId::new(1024, 1);
        create_repart(&manager, 1024, &single_mapping(src, regions(1024, &[2, 3, 4]))).await;

        manager
            .remove_mappings(src, &[RegionId::new(1024, 2), RegionId::new(1024, 3)])
            .await
            .unwrap();

        assert_eq!(
            manager.get(1024).await.unwrap().unwrap(),
            single_mapping(src, regions(1024, &[4]))
        );
    }

    #[tokio::test]
    async fn test_table_repart_manager_remove_all_mappings() {
        let manager = new_manager();
        let src = RegionId::new(1024, 1);
        create_repart(&manager, 1024, &single_mapping(src, regions(1024, &[2, 3]))).await;

        manager
            .remove_mappings(src, &[RegionId::new(1024, 2), RegionId::new(1024, 3)])
            .await
            .unwrap();

        // The key is kept, but it no longer records any mapping.
        let retrieved = manager.get(1024).await.unwrap().unwrap();
        assert!(retrieved.src_to_dst.is_empty());
    }

    #[tokio::test]
    async fn test_table_repart_manager_get_dst_regions() {
        let manager = new_manager();
        let src = RegionId::new(1024, 1);
        create_repart(&manager, 1024, &single_mapping(src, regions(1024, &[2, 3]))).await;

        assert_eq!(
            manager.get_dst_regions(src).await.unwrap(),
            Some(regions(1024, &[2, 3]))
        );

        // Non-existent source region of an existing table.
        let nonexistent_src = RegionId::new(1024, 99);
        assert!(
            manager
                .get_dst_regions(nonexistent_src)
                .await
                .unwrap()
                .is_none()
        );

        // Table without any repartition metadata.
        let unknown_table_src = RegionId::new(2048, 1);
        assert!(
            manager
                .get_dst_regions(unknown_table_src)
                .await
                .unwrap()
                .is_none()
        );
    }

    #[tokio::test]
    async fn test_table_repart_manager_operations_on_nonexistent_table() {
        let manager = new_manager();
        let src = RegionId::new(1024, 1);
        let dst = vec![RegionId::new(1024, 2)];

        // Try to update mappings on non-existent table.
        let result = manager.update_mappings(src, &dst).await;
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("Failed to find table repartition metadata for table id 1024"),
            "{err_msg}"
        );

        // Try to remove mappings on non-existent table.
        let result = manager.remove_mappings(src, &dst).await;
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("Failed to find table repartition metadata for table id 1024"),
            "{err_msg}"
        );
    }

    #[tokio::test]
    async fn test_table_repart_manager_batch_get_with_raw_bytes() {
        let manager = new_manager();
        let value = single_mapping(RegionId::new(1, 1), regions(1, &[2]));
        create_repart(&manager, 1024, &value).await;

        let results = manager
            .batch_get_with_raw_bytes(&[1024, 1025])
            .await
            .unwrap();

        assert_eq!(results.len(), 2);
        assert!(results[1].is_none());
        let retrieved = &results[0].as_ref().unwrap().inner;
        assert_eq!(retrieved, &value);
    }
}