common_meta/key/
table_repart.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::fmt::Display;
17
18use common_telemetry::debug;
19use futures::TryStreamExt;
20use serde::{Deserialize, Serialize};
21use snafu::{OptionExt as _, ResultExt, ensure};
22use store_api::storage::RegionId;
23use table::metadata::TableId;
24
25use crate::error::{InvalidMetadataSnafu, Result, SerdeJsonSnafu};
26use crate::key::txn_helper::TxnOpGetResponseSet;
27use crate::key::{
28    DeserializedValueWithBytes, MetadataKey, MetadataValue, TABLE_REPART_KEY_PATTERN,
29    TABLE_REPART_PREFIX,
30};
31use crate::kv_backend::KvBackendRef;
32use crate::kv_backend::txn::Txn;
33use crate::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
34use crate::rpc::KeyValue;
35use crate::rpc::store::{BatchGetRequest, RangeRequest};
36
37/// The key stores table repartition metadata.
38/// Specifically, it records the relation between source and destination regions after a repartition operation is completed.
39/// This is distinct from the initial partitioning scheme of the table.
40/// For example, after repartition, a destination region may still hold files from a source region; this mapping should be updated once repartition is done.
41/// The GC scheduler uses this information to clean up those files (and removes this mapping if all files from the source region are cleaned).
42///
43/// The layout: `__table_repart/{table_id}`.
44#[derive(Debug, PartialEq)]
45pub struct TableRepartKey {
46    /// The unique identifier of the table whose re-partition information is stored in this key.
47    pub table_id: TableId,
48}
49
50impl TableRepartKey {
51    pub fn new(table_id: TableId) -> Self {
52        Self { table_id }
53    }
54
55    /// Returns the range prefix of the table repartition key.
56    pub fn range_prefix() -> Vec<u8> {
57        format!("{}/", TABLE_REPART_PREFIX).into_bytes()
58    }
59}
60
61impl MetadataKey<'_, TableRepartKey> for TableRepartKey {
62    fn to_bytes(&self) -> Vec<u8> {
63        self.to_string().into_bytes()
64    }
65
66    fn from_bytes(bytes: &[u8]) -> Result<TableRepartKey> {
67        let key = std::str::from_utf8(bytes).map_err(|e| {
68            InvalidMetadataSnafu {
69                err_msg: format!(
70                    "TableRepartKey '{}' is not a valid UTF8 string: {e}",
71                    String::from_utf8_lossy(bytes)
72                ),
73            }
74            .build()
75        })?;
76        let captures = TABLE_REPART_KEY_PATTERN
77            .captures(key)
78            .context(InvalidMetadataSnafu {
79                err_msg: format!("Invalid TableRepartKey '{key}'"),
80            })?;
81        // Safety: pass the regex check above
82        let table_id = captures[1].parse::<TableId>().unwrap();
83        Ok(TableRepartKey { table_id })
84    }
85}
86
87impl Display for TableRepartKey {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        write!(f, "{}/{}", TABLE_REPART_PREFIX, self.table_id)
90    }
91}
92
93#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)]
94pub struct TableRepartValue {
95    /// A mapping from source region IDs to sets of destination region IDs after repartition.
96    ///
97    /// Each key in the map is a `RegionId` representing a source region that has been repartitioned.
98    /// The corresponding value is a `BTreeSet<RegionId>` containing the IDs of destination regions
99    /// that currently hold files originally from the source region. This mapping is updated after
100    /// repartition and is used by the GC scheduler to track and clean up files that have been moved.
101    pub src_to_dst: BTreeMap<RegionId, BTreeSet<RegionId>>,
102}
103
104impl TableRepartValue {
105    /// Creates a new TableRepartValue with an empty src_to_dst map.
106    pub fn new() -> Self {
107        Default::default()
108    }
109    /// Update mapping from src region to dst regions. Should be called once repartition is done.
110    ///
111    /// If `dst` is empty, this method does nothing.
112    pub fn update_mappings(&mut self, src: RegionId, dst: &[RegionId]) {
113        if dst.is_empty() {
114            return;
115        }
116        self.src_to_dst.entry(src).or_default().extend(dst);
117    }
118
119    /// Remove mappings from src region to dst regions. Should be called once files from src region are cleaned up in dst regions.
120    pub fn remove_mappings(&mut self, src: RegionId, dsts: &[RegionId]) {
121        if let Some(dst_set) = self.src_to_dst.get_mut(&src) {
122            for dst in dsts {
123                dst_set.remove(dst);
124            }
125            if dst_set.is_empty() {
126                self.src_to_dst.remove(&src);
127            }
128        }
129    }
130}
131
132impl MetadataValue for TableRepartValue {
133    fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
134        serde_json::from_slice::<TableRepartValue>(raw_value).context(SerdeJsonSnafu)
135    }
136
137    fn try_as_raw_value(&self) -> Result<Vec<u8>> {
138        serde_json::to_vec(self).context(SerdeJsonSnafu)
139    }
140}
141
142pub type TableRepartValueDecodeResult =
143    Result<Option<DeserializedValueWithBytes<TableRepartValue>>>;
144
145/// Decodes `KeyValue` to [TableRepartKey] and [TableRepartValue].
146pub fn table_repart_decoder(kv: KeyValue) -> Result<(TableRepartKey, TableRepartValue)> {
147    let key = TableRepartKey::from_bytes(&kv.key)?;
148    let value = TableRepartValue::try_from_raw_value(&kv.value)?;
149    Ok((key, value))
150}
151
152pub struct TableRepartManager {
153    kv_backend: KvBackendRef,
154}
155
156impl TableRepartManager {
157    pub fn new(kv_backend: KvBackendRef) -> Self {
158        Self { kv_backend }
159    }
160
161    /// Returns all table repartition entries.
162    pub async fn table_reparts(&self) -> Result<Vec<(TableId, TableRepartValue)>> {
163        let prefix = TableRepartKey::range_prefix();
164        let req = RangeRequest::new().with_prefix(prefix);
165        let stream = PaginationStream::new(
166            self.kv_backend.clone(),
167            req,
168            DEFAULT_PAGE_SIZE,
169            table_repart_decoder,
170        )
171        .into_stream();
172
173        let res = stream.try_collect::<Vec<_>>().await?;
174        Ok(res
175            .into_iter()
176            .map(|(key, value)| (key.table_id, value))
177            .collect())
178    }
179
180    /// Builds a create table repart transaction,
181    /// it expected the `__table_repart/{table_id}` wasn't occupied.
182    pub fn build_create_txn(
183        &self,
184        table_id: TableId,
185        table_repart_value: &TableRepartValue,
186    ) -> Result<(
187        Txn,
188        impl FnOnce(&mut TxnOpGetResponseSet) -> TableRepartValueDecodeResult + use<>,
189    )> {
190        let key = TableRepartKey::new(table_id);
191        let raw_key = key.to_bytes();
192
193        let txn = Txn::put_if_not_exists(raw_key.clone(), table_repart_value.try_as_raw_value()?);
194
195        Ok((
196            txn,
197            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
198        ))
199    }
200
201    /// Builds a update table repart transaction,
202    /// it expected the remote value equals the `current_table_repart_value`.
203    /// It retrieves the latest value if the comparing failed.
204    pub fn build_update_txn(
205        &self,
206        table_id: TableId,
207        current_table_repart_value: &DeserializedValueWithBytes<TableRepartValue>,
208        new_table_repart_value: &TableRepartValue,
209    ) -> Result<(
210        Txn,
211        impl FnOnce(&mut TxnOpGetResponseSet) -> TableRepartValueDecodeResult + use<>,
212    )> {
213        let key = TableRepartKey::new(table_id);
214        let raw_key = key.to_bytes();
215        let raw_value = current_table_repart_value.get_raw_bytes();
216        let new_raw_value: Vec<u8> = new_table_repart_value.try_as_raw_value()?;
217
218        let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
219
220        Ok((
221            txn,
222            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
223        ))
224    }
225
226    /// Returns the [`TableRepartValue`].
227    pub async fn get(&self, table_id: TableId) -> Result<Option<TableRepartValue>> {
228        self.get_inner(table_id).await
229    }
230
231    async fn get_inner(&self, table_id: TableId) -> Result<Option<TableRepartValue>> {
232        let key = TableRepartKey::new(table_id);
233        self.kv_backend
234            .get(&key.to_bytes())
235            .await?
236            .map(|kv| TableRepartValue::try_from_raw_value(&kv.value))
237            .transpose()
238    }
239
240    /// Returns the [`TableRepartValue`] wrapped with [`DeserializedValueWithBytes`].
241    pub async fn get_with_raw_bytes(
242        &self,
243        table_id: TableId,
244    ) -> Result<Option<DeserializedValueWithBytes<TableRepartValue>>> {
245        self.get_with_raw_bytes_inner(table_id).await
246    }
247
248    async fn get_with_raw_bytes_inner(
249        &self,
250        table_id: TableId,
251    ) -> Result<Option<DeserializedValueWithBytes<TableRepartValue>>> {
252        let key = TableRepartKey::new(table_id);
253        self.kv_backend
254            .get(&key.to_bytes())
255            .await?
256            .map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
257            .transpose()
258    }
259
260    /// Returns batch of [`TableRepartValue`] that respects the order of `table_ids`.
261    pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRepartValue>>> {
262        let raw_table_reparts = self.batch_get_inner(table_ids).await?;
263
264        Ok(raw_table_reparts
265            .into_iter()
266            .map(|v| v.map(|x| x.inner))
267            .collect())
268    }
269
270    /// Returns batch of [`TableRepartValue`] wrapped with [`DeserializedValueWithBytes`].
271    pub async fn batch_get_with_raw_bytes(
272        &self,
273        table_ids: &[TableId],
274    ) -> Result<Vec<Option<DeserializedValueWithBytes<TableRepartValue>>>> {
275        self.batch_get_inner(table_ids).await
276    }
277
278    async fn batch_get_inner(
279        &self,
280        table_ids: &[TableId],
281    ) -> Result<Vec<Option<DeserializedValueWithBytes<TableRepartValue>>>> {
282        let keys = table_ids
283            .iter()
284            .map(|id| TableRepartKey::new(*id).to_bytes())
285            .collect::<Vec<_>>();
286        let resp = self
287            .kv_backend
288            .batch_get(BatchGetRequest { keys: keys.clone() })
289            .await?;
290
291        let kvs = resp
292            .kvs
293            .into_iter()
294            .map(|kv| (kv.key, kv.value))
295            .collect::<HashMap<_, _>>();
296        keys.into_iter()
297            .map(|key| {
298                if let Some(value) = kvs.get(&key) {
299                    Ok(Some(DeserializedValueWithBytes::from_inner_slice(value)?))
300                } else {
301                    Ok(None)
302                }
303            })
304            .collect()
305    }
306
307    /// Updates mappings from src region to dst regions.
308    /// Should be called once repartition is done.
309    pub async fn update_mappings(
310        &self,
311        table_id: TableId,
312        region_mapping: &HashMap<RegionId, Vec<RegionId>>,
313    ) -> Result<()> {
314        let current = self.get_with_raw_bytes(table_id).await?;
315        let mut new_value = current
316            .as_ref()
317            .map(|c| c.inner.clone())
318            .unwrap_or_else(TableRepartValue::new);
319
320        for (src, dsts) in region_mapping.iter() {
321            new_value.update_mappings(*src, dsts);
322        }
323
324        self.upsert_value(table_id, current, &new_value).await
325    }
326
327    /// Removes mappings from src region to dst regions.
328    /// Should be called once files from src region are cleaned up in dst regions.
329    pub async fn remove_mappings(
330        &self,
331        table_id: TableId,
332        region_mapping: &HashMap<RegionId, Vec<RegionId>>,
333    ) -> Result<()> {
334        let current = self
335            .get_with_raw_bytes(table_id)
336            .await?
337            .context(crate::error::TableRepartNotFoundSnafu { table_id })?;
338
339        let mut new_value = current.inner.clone();
340        for (src, dsts) in region_mapping.iter() {
341            new_value.remove_mappings(*src, dsts);
342        }
343
344        self.upsert_value(table_id, Some(current), &new_value).await
345    }
346
347    /// Upserts the full repartition value with CAS semantics using the caller-provided snapshot.
348    /// If the value is empty and no existing repartition entry is present, it no-ops.
349    pub async fn upsert_value(
350        &self,
351        table_id: TableId,
352        current: Option<DeserializedValueWithBytes<TableRepartValue>>,
353        new_value: &TableRepartValue,
354    ) -> Result<()> {
355        if new_value.src_to_dst.is_empty() && current.is_none() {
356            // Nothing to persist and caller confirmed no existing entry.
357            return Ok(());
358        }
359
360        if let Some(current) = &current {
361            let (txn, _) = self.build_update_txn(table_id, current, new_value)?;
362            let result = self.kv_backend.txn(txn).await?;
363
364            ensure!(
365                result.succeeded,
366                crate::error::MetadataCorruptionSnafu {
367                    err_msg: format!(
368                        "Failed to update repartition mappings for table {}: CAS operation failed",
369                        table_id
370                    ),
371                }
372            );
373        } else {
374            let (txn, _) = self.build_create_txn(table_id, new_value)?;
375            let result = self.kv_backend.txn(txn).await?;
376
377            ensure!(
378                result.succeeded,
379                crate::error::MetadataCorruptionSnafu {
380                    err_msg: format!(
381                        "Failed to create repartition mappings for table {}: CAS operation failed",
382                        table_id
383                    ),
384                }
385            );
386        }
387
388        debug!(
389            "Upserted repartition value for table {}: current: {:?}, new: {:?}",
390            table_id, current, new_value
391        );
392        Ok(())
393    }
394
395    /// Returns the destination regions for a given source region.
396    pub async fn get_dst_regions(
397        &self,
398        src_region: RegionId,
399    ) -> Result<Option<BTreeSet<RegionId>>> {
400        let table_id = src_region.table_id();
401        let table_repart = self.get(table_id).await?;
402        Ok(table_repart.and_then(|repart| repart.src_to_dst.get(&src_region).cloned()))
403    }
404}
405
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::sync::Arc;

    use super::*;
    use crate::kv_backend::TxnService;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Builds a value that maps `src` to exactly `dsts`.
    fn repart_value(src: RegionId, dsts: &[RegionId]) -> TableRepartValue {
        TableRepartValue {
            src_to_dst: BTreeMap::from([(src, set_of(dsts))]),
        }
    }

    /// Collects `dsts` into the set type used by `TableRepartValue::src_to_dst`.
    fn set_of(dsts: &[RegionId]) -> BTreeSet<RegionId> {
        dsts.iter().copied().collect()
    }

    /// Persists `$value` for `$table_id` through a create transaction executed on `$kv`
    /// and asserts that the transaction succeeded.
    macro_rules! create_repart {
        ($kv:expr, $manager:expr, $table_id:expr, $value:expr) => {{
            let (txn, _) = $manager.build_create_txn($table_id, $value).unwrap();
            let result = $kv.txn(txn).await.unwrap();
            assert!(result.succeeded);
        }};
    }

    #[test]
    fn test_table_repart_key_serialization() {
        let raw_key = TableRepartKey::new(42).to_bytes();
        assert_eq!(raw_key, b"__table_repart/42");
    }

    #[test]
    fn test_table_repart_key_deserialization() {
        let parsed = TableRepartKey::from_bytes(b"__table_repart/42").unwrap();
        assert_eq!(parsed, TableRepartKey::new(42));
    }

    #[test]
    fn test_table_repart_key_deserialization_invalid_utf8() {
        let err = TableRepartKey::from_bytes(b"__table_repart/\xff").unwrap_err();
        assert!(err.to_string().contains("not a valid UTF8 string"));
    }

    #[test]
    fn test_table_repart_key_deserialization_invalid_format() {
        let err = TableRepartKey::from_bytes(b"invalid_key_format").unwrap_err();
        assert!(err.to_string().contains("Invalid TableRepartKey"));
    }

    #[test]
    fn test_table_repart_value_serialization_deserialization() {
        let value = repart_value(
            RegionId::new(1, 1),
            &[RegionId::new(1, 2), RegionId::new(1, 3)],
        );

        let raw = value.try_as_raw_value().unwrap();
        let roundtripped = TableRepartValue::try_from_raw_value(&raw).unwrap();

        assert_eq!(value, roundtripped);
    }

    #[test]
    fn test_table_repart_value_update_mappings_new_src() {
        let src = RegionId::new(1, 1);
        let dsts = [RegionId::new(1, 2), RegionId::new(1, 3)];

        let mut value = TableRepartValue::default();
        value.update_mappings(src, &dsts);

        assert_eq!(value.src_to_dst.len(), 1);
        assert_eq!(value.src_to_dst.get(&src), Some(&set_of(&dsts)));
    }

    #[test]
    fn test_table_repart_value_update_mappings_existing_src() {
        let src = RegionId::new(1, 1);

        let mut value = TableRepartValue::default();
        // Initial mapping, then additional destinations for the same source.
        value.update_mappings(src, &[RegionId::new(1, 2)]);
        value.update_mappings(src, &[RegionId::new(1, 3), RegionId::new(1, 4)]);

        let expected = set_of(&[
            RegionId::new(1, 2),
            RegionId::new(1, 3),
            RegionId::new(1, 4),
        ]);
        assert_eq!(value.src_to_dst.len(), 1);
        assert_eq!(value.src_to_dst.get(&src), Some(&expected));
    }

    #[test]
    fn test_table_repart_value_remove_mappings_existing() {
        let src = RegionId::new(1, 1);
        let mut value = TableRepartValue::default();
        value.update_mappings(
            src,
            &[
                RegionId::new(1, 2),
                RegionId::new(1, 3),
                RegionId::new(1, 4),
            ],
        );

        // Only a subset is removed, so the source keeps its last destination.
        value.remove_mappings(src, &[RegionId::new(1, 2), RegionId::new(1, 3)]);

        assert_eq!(value.src_to_dst.len(), 1);
        assert_eq!(
            value.src_to_dst.get(&src),
            Some(&set_of(&[RegionId::new(1, 4)]))
        );
    }

    #[test]
    fn test_table_repart_value_remove_mappings_all() {
        let src = RegionId::new(1, 1);
        let dsts = [RegionId::new(1, 2), RegionId::new(1, 3)];
        let mut value = TableRepartValue::default();
        value.update_mappings(src, &dsts);

        // Removing every destination drops the source entry as well.
        value.remove_mappings(src, &dsts);

        assert_eq!(value.src_to_dst.len(), 0);
    }

    #[test]
    fn test_table_repart_value_remove_mappings_nonexistent() {
        let src = RegionId::new(1, 1);
        let mut value = TableRepartValue::default();
        value.update_mappings(src, &[RegionId::new(1, 2)]);

        // Destinations that were never recorded are ignored.
        value.remove_mappings(src, &[RegionId::new(1, 3), RegionId::new(1, 4)]);

        assert_eq!(value.src_to_dst.len(), 1);
        assert_eq!(
            value.src_to_dst.get(&src),
            Some(&set_of(&[RegionId::new(1, 2)]))
        );
    }

    #[test]
    fn test_table_repart_value_remove_mappings_nonexistent_src() {
        let mut value = TableRepartValue::default();

        // A source without any entry is ignored.
        value.remove_mappings(RegionId::new(1, 1), &[RegionId::new(1, 2)]);

        assert_eq!(value.src_to_dst.len(), 0);
    }

    #[tokio::test]
    async fn test_table_repart_manager_get_empty() {
        let kv = Arc::new(MemoryKvBackend::default());
        let manager = TableRepartManager::new(kv);

        assert!(manager.get(1024).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_table_repart_manager_get_with_raw_bytes_empty() {
        let kv = Arc::new(MemoryKvBackend::default());
        let manager = TableRepartManager::new(kv);

        assert!(manager.get_with_raw_bytes(1024).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_table_repart_manager_create_and_get() {
        let kv = Arc::new(MemoryKvBackend::default());
        let manager = TableRepartManager::new(kv.clone());

        let value = repart_value(
            RegionId::new(1, 1),
            &[RegionId::new(1, 2), RegionId::new(1, 3)],
        );
        create_repart!(kv, manager, 1024, &value);

        let retrieved = manager.get(1024).await.unwrap().unwrap();
        assert_eq!(retrieved, value);
    }

    #[tokio::test]
    async fn test_table_repart_manager_update_txn() {
        let kv = Arc::new(MemoryKvBackend::default());
        let manager = TableRepartManager::new(kv.clone());

        // Start from an empty entry and snapshot it together with its raw bytes.
        create_repart!(kv, manager, 1024, &TableRepartValue::default());
        let current_value = manager.get_with_raw_bytes(1024).await.unwrap().unwrap();

        let updated_value = repart_value(RegionId::new(1, 1), &[RegionId::new(1, 2)]);
        let (update_txn, _) = manager
            .build_update_txn(1024, &current_value, &updated_value)
            .unwrap();
        let result = kv.txn(update_txn).await.unwrap();
        assert!(result.succeeded);

        let retrieved = manager.get(1024).await.unwrap().unwrap();
        assert_eq!(retrieved, updated_value);
    }

    #[tokio::test]
    async fn test_table_repart_manager_batch_get() {
        let kv = Arc::new(MemoryKvBackend::default());
        let manager = TableRepartManager::new(kv.clone());

        let table_reparts = [
            (
                1024,
                repart_value(RegionId::new(1, 1), &[RegionId::new(1, 2)]),
            ),
            (
                1025,
                repart_value(
                    RegionId::new(2, 1),
                    &[RegionId::new(2, 2), RegionId::new(2, 3)],
                ),
            ),
        ];
        for (table_id, value) in &table_reparts {
            create_repart!(kv, manager, *table_id, value);
        }

        // The third id has no entry and must come back as `None` in its position.
        let results = manager.batch_get(&[1024, 1025, 1026]).await.unwrap();
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].as_ref().unwrap(), &table_reparts[0].1);
        assert_eq!(results[1].as_ref().unwrap(), &table_reparts[1].1);
        assert!(results[2].is_none());
    }

    #[tokio::test]
    async fn test_table_repart_manager_update_mappings() {
        let kv = Arc::new(MemoryKvBackend::default());
        let manager = TableRepartManager::new(kv.clone());

        create_repart!(kv, manager, 1024, &TableRepartValue::default());

        let src = RegionId::new(1024, 1);
        let region_mapping =
            HashMap::from([(src, vec![RegionId::new(1024, 2), RegionId::new(1024, 3)])]);
        manager
            .update_mappings(1024, &region_mapping)
            .await
            .unwrap();

        let retrieved = manager.get(1024).await.unwrap().unwrap();
        assert_eq!(retrieved.src_to_dst.len(), 1);
        assert!(retrieved.src_to_dst.contains_key(&src));
        assert_eq!(retrieved.src_to_dst.get(&src).unwrap().len(), 2);
    }

    #[tokio::test]
    async fn test_table_repart_manager_remove_mappings() {
        let kv = Arc::new(MemoryKvBackend::default());
        let manager = TableRepartManager::new(kv.clone());

        let src = RegionId::new(1024, 1);
        let initial_value = repart_value(
            src,
            &[
                RegionId::new(1024, 2),
                RegionId::new(1024, 3),
                RegionId::new(1024, 4),
            ],
        );
        create_repart!(kv, manager, 1024, &initial_value);

        let region_mapping =
            HashMap::from([(src, vec![RegionId::new(1024, 2), RegionId::new(1024, 3)])]);
        manager
            .remove_mappings(1024, &region_mapping)
            .await
            .unwrap();

        let retrieved = manager.get(1024).await.unwrap().unwrap();
        assert_eq!(retrieved.src_to_dst.len(), 1);
        assert_eq!(
            retrieved.src_to_dst.get(&src),
            Some(&set_of(&[RegionId::new(1024, 4)]))
        );
    }

    #[tokio::test]
    async fn test_table_repart_manager_get_dst_regions() {
        let kv = Arc::new(MemoryKvBackend::default());
        let manager = TableRepartManager::new(kv.clone());

        let src = RegionId::new(1024, 1);
        let dsts = [RegionId::new(1024, 2), RegionId::new(1024, 3)];
        create_repart!(kv, manager, 1024, &repart_value(src, &dsts));

        let dst_regions = manager.get_dst_regions(src).await.unwrap();
        assert_eq!(dst_regions, Some(set_of(&dsts)));

        // A source region without a mapping yields `None`.
        let nonexistent_src = RegionId::new(1024, 99);
        let result = manager.get_dst_regions(nonexistent_src).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_table_repart_manager_operations_on_nonexistent_table() {
        let kv = Arc::new(MemoryKvBackend::default());
        let manager = TableRepartManager::new(kv);

        let src = RegionId::new(1024, 1);
        let region_mapping = HashMap::from([(src, vec![RegionId::new(1024, 2)])]);

        // Removing mappings requires an existing entry.
        let result = manager.remove_mappings(1024, &region_mapping).await;
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("Failed to find table repartition metadata for table id 1024"),
            "{err_msg}"
        );

        // Updating mappings on a table without an entry is allowed.
        manager
            .update_mappings(1024, &region_mapping)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_table_repart_manager_batch_get_with_raw_bytes() {
        let kv = Arc::new(MemoryKvBackend::default());
        let manager = TableRepartManager::new(kv.clone());

        let value = repart_value(RegionId::new(1, 1), &[RegionId::new(1, 2)]);
        create_repart!(kv, manager, 1024, &value);

        let results = manager
            .batch_get_with_raw_bytes(&[1024, 1025])
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
        assert!(results[0].is_some());
        assert!(results[1].is_none());

        let retrieved = &results[0].as_ref().unwrap().inner;
        assert_eq!(retrieved, &value);
    }
}
913}