Skip to main content

common_meta/key/
tombstone.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use common_telemetry::debug;
18use futures_util::stream::BoxStream;
19use snafu::ensure;
20
21use crate::error::{self, Result};
22use crate::key::TABLE_NAME_KEY_PREFIX;
23use crate::key::txn_helper::TxnOpGetResponseSet;
24use crate::kv_backend::KvBackendRef;
25use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
26use crate::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
27use crate::rpc::KeyValue;
28use crate::rpc::store::{BatchDeleteRequest, BatchGetRequest, RangeRequest};
29
/// [TombstoneManager] provides the ability to:
/// - logically delete values
/// - restore the deleted values
///
/// The tombstone mechanism is primarily used for table metadata deletion.
/// When a table is logically deleted, its associated metadata keys are moved
/// to a tombstone area by prepending a prefix (default is `__tombstone/`).
///
/// All involved keys in the tombstone mechanism:
/// - `__table_name/{catalog}/{schema}/{table_name}`: Maps table name to table ID.
/// - `__table_info/{table_id}`: Stores table information/metadata.
/// - `__table_route/{table_id}`: Stores table routing information.
/// - `__table_repart/{table_id}`: Stores table repartition information.
/// - `__dn_table/{datanode_id}/{table_id}`: Maps datanodes to table regions.
/// - `__topic_region/{topic_name}/{region_id}`: Maps regions to WAL topics.
///
/// These keys are moved to:
/// - `__tombstone/__table_name/...`
/// - `__tombstone/__table_info/...`
/// - `__tombstone/__table_route/...`
/// - `__tombstone/__table_repart/...`
/// - `__tombstone/__dn_table/...`
/// - `__tombstone/__topic_region/...`
pub struct TombstoneManager {
    // Backend that holds both the original keys and their tombstoned copies.
    kv_backend: KvBackendRef,
    // Prefix prepended to an original key to build its tombstone key.
    tombstone_prefix: String,
    // Only used for testing: when set, replaces the transaction operation limit
    // reported by the backend.
    #[cfg(test)]
    max_txn_ops: Option<usize>,
}
60
/// Tombstone prefix used by `TombstoneManager::new` when no custom prefix is given.
const TOMBSTONE_PREFIX: &str = "__tombstone/";
62
63impl TombstoneManager {
64    /// Returns [TombstoneManager].
65    pub fn new(kv_backend: KvBackendRef) -> Self {
66        Self::new_with_prefix(kv_backend, TOMBSTONE_PREFIX)
67    }
68
69    /// Returns [TombstoneManager] with a custom tombstone prefix.
70    pub fn new_with_prefix(kv_backend: KvBackendRef, prefix: &str) -> Self {
71        Self {
72            kv_backend,
73            tombstone_prefix: prefix.to_string(),
74            #[cfg(test)]
75            max_txn_ops: None,
76        }
77    }
78
79    pub fn to_tombstone(&self, key: &[u8]) -> Vec<u8> {
80        [self.tombstone_prefix.as_bytes(), key].concat()
81    }
82
83    /// Removes the tombstone prefix from a tombstoned key.
84    pub fn strip_tombstone_prefix<'a>(&self, tombstone_key: &'a [u8]) -> Result<&'a [u8]> {
85        ensure!(
86            tombstone_key.starts_with(self.tombstone_prefix.as_bytes()),
87            error::UnexpectedSnafu {
88                err_msg: format!(
89                    "The key '{}' does not start with tombstone prefix '{}'.",
90                    String::from_utf8_lossy(tombstone_key),
91                    self.tombstone_prefix
92                ),
93            }
94        );
95
96        Ok(&tombstone_key[self.tombstone_prefix.len()..])
97    }
98
99    /// Gets a single tombstoned value by its original key.
100    pub async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
101        let tombstone_key = self.to_tombstone(key);
102        self.kv_backend.get(&tombstone_key).await
103    }
104
105    /// Gets tombstoned values by their original keys.
106    pub async fn batch_get(&self, keys: &[Vec<u8>]) -> Result<HashMap<Vec<u8>, KeyValue>> {
107        let tombstone_keys = keys
108            .iter()
109            .map(|key| self.to_tombstone(key))
110            .collect::<Vec<_>>();
111        let resp = self
112            .kv_backend
113            .batch_get(BatchGetRequest::new().with_keys(tombstone_keys))
114            .await?;
115
116        resp.kvs
117            .into_iter()
118            .map(|kv| Ok((self.strip_tombstone_prefix(&kv.key)?.to_vec(), kv)))
119            .collect::<Result<HashMap<_, _>>>()
120    }
121
122    /// Streams all tombstoned key-value pairs.
123    pub fn tombstones(&self) -> BoxStream<'static, Result<KeyValue>> {
124        self.scan_prefix(self.tombstone_prefix.as_bytes().to_vec())
125    }
126
127    /// Streams tombstoned table-name entries only.
128    pub fn tombstoned_table_names(&self) -> BoxStream<'static, Result<KeyValue>> {
129        self.scan_prefix(
130            format!("{}{}/", self.tombstone_prefix, TABLE_NAME_KEY_PREFIX).into_bytes(),
131        )
132    }
133
134    /// Streams tombstoned entries under the provided prefix.
135    fn scan_prefix(&self, prefix: Vec<u8>) -> BoxStream<'static, Result<KeyValue>> {
136        let req = RangeRequest::new().with_prefix(prefix);
137        let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, Ok)
138            .into_stream();
139
140        Box::pin(stream)
141    }
142
143    #[cfg(test)]
144    pub fn set_max_txn_ops(&mut self, max_txn_ops: usize) {
145        self.max_txn_ops = Some(max_txn_ops);
146    }
147
148    /// Moves value to `dest_key`.
149    ///
150    /// Puts `value` to `dest_key` if the value of `src_key` equals `value`.
151    ///
152    /// Otherwise retrieves the value of `src_key`.
153    fn build_move_value_txn(
154        &self,
155        src_key: Vec<u8>,
156        value: Vec<u8>,
157        dest_key: Vec<u8>,
158    ) -> (Txn, impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>>) {
159        let txn = Txn::new()
160            .when(vec![Compare::with_value(
161                src_key.clone(),
162                CompareOp::Equal,
163                value.clone(),
164            )])
165            .and_then(vec![
166                TxnOp::Put(dest_key.clone(), value.clone()),
167                TxnOp::Delete(src_key.clone()),
168            ])
169            .or_else(vec![TxnOp::Get(src_key.clone())]);
170
171        (txn, TxnOpGetResponseSet::filter(src_key))
172    }
173
174    async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<usize> {
175        ensure!(
176            keys.len() == dest_keys.len(),
177            error::UnexpectedSnafu {
178                err_msg: format!(
179                    "The length of keys({}) does not match the length of dest_keys({}).",
180                    keys.len(),
181                    dest_keys.len()
182                ),
183            }
184        );
185        // The key -> dest key mapping.
186        let lookup_table = keys.iter().zip(dest_keys.iter()).collect::<HashMap<_, _>>();
187
188        let resp = self
189            .kv_backend
190            .batch_get(BatchGetRequest::new().with_keys(keys.to_vec()))
191            .await?;
192        let mut results = resp
193            .kvs
194            .into_iter()
195            .map(|kv| (kv.key, kv.value))
196            .collect::<HashMap<_, _>>();
197
198        const MAX_RETRIES: usize = 8;
199        for _ in 0..MAX_RETRIES {
200            let (txns, (keys, filters)): (Vec<_>, (Vec<_>, Vec<_>)) = results
201                .iter()
202                .map(|(key, value)| {
203                    let (txn, filter) = self.build_move_value_txn(
204                        key.clone(),
205                        value.clone(),
206                        lookup_table[&key].clone(),
207                    );
208                    (txn, (key.clone(), filter))
209                })
210                .unzip();
211            let mut resp = self.kv_backend.txn(Txn::merge_all(txns)).await?;
212            if resp.succeeded {
213                return Ok(keys.len());
214            }
215            let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
216            // Updates results.
217            for (idx, mut filter) in filters.into_iter().enumerate() {
218                if let Some(value) = filter(&mut set) {
219                    results.insert(keys[idx].clone(), value);
220                } else {
221                    results.remove(&keys[idx]);
222                }
223            }
224        }
225
226        error::MoveValuesSnafu {
227            err_msg: format!(
228                "keys: {:?}",
229                keys.iter().map(|key| String::from_utf8_lossy(key)),
230            ),
231        }
232        .fail()
233    }
234
235    fn max_txn_ops(&self) -> usize {
236        #[cfg(test)]
237        if let Some(max_txn_ops) = self.max_txn_ops {
238            return max_txn_ops;
239        }
240        self.kv_backend.max_txn_ops()
241    }
242
243    /// Moves values to `dest_key`.
244    ///
245    /// Returns the number of keys that were moved.
246    async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<usize> {
247        ensure!(
248            keys.len() == dest_keys.len(),
249            error::UnexpectedSnafu {
250                err_msg: format!(
251                    "The length of keys({}) does not match the length of dest_keys({}).",
252                    keys.len(),
253                    dest_keys.len()
254                ),
255            }
256        );
257        if keys.is_empty() {
258            return Ok(0);
259        }
260        let chunk_size = self.max_txn_ops() / 2;
261        if keys.len() > chunk_size {
262            debug!(
263                "Moving values with multiple chunks, keys len: {}, chunk_size: {}",
264                keys.len(),
265                chunk_size
266            );
267            let mut moved_keys = 0;
268            let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
269            let dest_keys_chunks = dest_keys.chunks(chunk_size).collect::<Vec<_>>();
270            for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
271                moved_keys += self.move_values_inner(keys, dest_keys).await?;
272            }
273            Ok(moved_keys)
274        } else {
275            self.move_values_inner(&keys, &dest_keys).await
276        }
277    }
278
279    /// Creates tombstones for keys.
280    ///
281    /// Preforms to:
282    /// - deletes origin values.
283    /// - stores tombstone values.
284    ///
285    /// Returns the number of keys that were moved.
286    pub async fn create(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
287        let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
288            .into_iter()
289            .map(|key| {
290                let tombstone_key = self.to_tombstone(&key);
291                (key, tombstone_key)
292            })
293            .unzip();
294
295        self.move_values(keys, dest_keys).await
296    }
297
298    /// Restores tombstones for keys.
299    ///
300    /// Preforms to:
301    /// - restore origin value.
302    /// - deletes tombstone values.
303    ///
304    /// Returns the number of keys that were restored.
305    pub async fn restore(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
306        let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
307            .into_iter()
308            .map(|key| {
309                let tombstone_key = self.to_tombstone(&key);
310                (tombstone_key, key)
311            })
312            .unzip();
313
314        self.move_values(keys, dest_keys).await
315    }
316
317    /// Deletes tombstones values for the specified `keys`.
318    ///
319    /// Returns the number of keys that were deleted.
320    pub async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
321        let keys = keys
322            .iter()
323            .map(|key| self.to_tombstone(key))
324            .collect::<Vec<_>>();
325
326        let num_keys = keys.len();
327        let _ = self
328            .kv_backend
329            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
330            .await?;
331
332        Ok(num_keys)
333    }
334}
335
336#[cfg(test)]
337mod tests {
338
339    use std::collections::HashMap;
340    use std::sync::Arc;
341
342    use crate::error::Error;
343    use crate::key::tombstone::TombstoneManager;
344    use crate::kv_backend::KvBackend;
345    use crate::kv_backend::memory::MemoryKvBackend;
346    use crate::rpc::store::PutRequest;
347
    /// Describes one expected move, as checked by `check_moved_values`.
    #[derive(Debug, Clone)]
    struct MoveValue {
        // Source key; expected to be absent after the move.
        key: Vec<u8>,
        // Destination key; expected to hold `value` after the move.
        dest_key: Vec<u8>,
        // Value expected under `dest_key`.
        value: Vec<u8>,
    }
354
355    async fn check_moved_values(
356        kv_backend: Arc<MemoryKvBackend<Error>>,
357        move_values: &[MoveValue],
358    ) {
359        for MoveValue {
360            key,
361            dest_key,
362            value,
363        } in move_values
364        {
365            assert!(kv_backend.get(key).await.unwrap().is_none());
366            assert_eq!(
367                &kv_backend.get(dest_key).await.unwrap().unwrap().value,
368                value,
369            );
370        }
371    }
372
373    #[tokio::test]
374    async fn test_create_tombstone() {
375        let kv_backend = Arc::new(MemoryKvBackend::default());
376        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
377        kv_backend
378            .put(PutRequest::new().with_key("bar").with_value("baz"))
379            .await
380            .unwrap();
381        kv_backend
382            .put(PutRequest::new().with_key("foo").with_value("hi"))
383            .await
384            .unwrap();
385        tombstone_manager
386            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
387            .await
388            .unwrap();
389        assert!(!kv_backend.exists(b"bar").await.unwrap());
390        assert!(!kv_backend.exists(b"foo").await.unwrap());
391        assert_eq!(
392            kv_backend
393                .get(&tombstone_manager.to_tombstone(b"bar"))
394                .await
395                .unwrap()
396                .unwrap()
397                .value,
398            b"baz"
399        );
400        assert_eq!(
401            kv_backend
402                .get(&tombstone_manager.to_tombstone(b"foo"))
403                .await
404                .unwrap()
405                .unwrap()
406                .value,
407            b"hi"
408        );
409        assert_eq!(kv_backend.len(), 2);
410    }
411
412    #[tokio::test]
413    async fn test_create_tombstone_with_non_exist_values() {
414        let kv_backend = Arc::new(MemoryKvBackend::default());
415        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
416
417        kv_backend
418            .put(PutRequest::new().with_key("bar").with_value("baz"))
419            .await
420            .unwrap();
421        kv_backend
422            .put(PutRequest::new().with_key("foo").with_value("hi"))
423            .await
424            .unwrap();
425
426        tombstone_manager
427            .create(vec![b"bar".to_vec(), b"baz".to_vec()])
428            .await
429            .unwrap();
430        check_moved_values(
431            kv_backend.clone(),
432            &[MoveValue {
433                key: b"bar".to_vec(),
434                dest_key: tombstone_manager.to_tombstone(b"bar"),
435                value: b"baz".to_vec(),
436            }],
437        )
438        .await;
439    }
440
441    #[tokio::test]
442    async fn test_restore_tombstone() {
443        let kv_backend = Arc::new(MemoryKvBackend::default());
444        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
445        kv_backend
446            .put(PutRequest::new().with_key("bar").with_value("baz"))
447            .await
448            .unwrap();
449        kv_backend
450            .put(PutRequest::new().with_key("foo").with_value("hi"))
451            .await
452            .unwrap();
453        let expected_kvs = kv_backend.dump();
454        tombstone_manager
455            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
456            .await
457            .unwrap();
458        tombstone_manager
459            .restore(vec![b"bar".to_vec(), b"foo".to_vec()])
460            .await
461            .unwrap();
462        assert_eq!(expected_kvs, kv_backend.dump());
463    }
464
465    #[tokio::test]
466    async fn test_delete_tombstone() {
467        let kv_backend = Arc::new(MemoryKvBackend::default());
468        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
469        kv_backend
470            .put(PutRequest::new().with_key("bar").with_value("baz"))
471            .await
472            .unwrap();
473        kv_backend
474            .put(PutRequest::new().with_key("foo").with_value("hi"))
475            .await
476            .unwrap();
477        tombstone_manager
478            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
479            .await
480            .unwrap();
481        tombstone_manager
482            .delete(vec![b"bar".to_vec(), b"foo".to_vec()])
483            .await
484            .unwrap();
485        assert!(kv_backend.is_empty());
486    }
487
488    #[tokio::test]
489    async fn test_batch_get_tombstones() {
490        let kv_backend = Arc::new(MemoryKvBackend::default());
491        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
492        kv_backend
493            .put(PutRequest::new().with_key("bar").with_value("baz"))
494            .await
495            .unwrap();
496        kv_backend
497            .put(PutRequest::new().with_key("foo").with_value("hi"))
498            .await
499            .unwrap();
500
501        tombstone_manager
502            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
503            .await
504            .unwrap();
505
506        let kvs = tombstone_manager
507            .batch_get(&[b"bar".to_vec(), b"foo".to_vec(), b"missing".to_vec()])
508            .await
509            .unwrap();
510
511        assert_eq!(kvs.len(), 2);
512        assert_eq!(kvs.get(b"bar".as_slice()).unwrap().value, b"baz");
513        assert_eq!(kvs.get(b"foo".as_slice()).unwrap().value, b"hi");
514        assert!(!kvs.contains_key(b"missing".as_slice()));
515    }
516
517    #[tokio::test]
518    async fn test_move_values() {
519        let kv_backend = Arc::new(MemoryKvBackend::default());
520        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
521        let kvs = HashMap::from([
522            (b"bar".to_vec(), b"baz".to_vec()),
523            (b"foo".to_vec(), b"hi".to_vec()),
524            (b"baz".to_vec(), b"hello".to_vec()),
525        ]);
526        for (key, value) in &kvs {
527            kv_backend
528                .put(
529                    PutRequest::new()
530                        .with_key(key.clone())
531                        .with_value(value.clone()),
532                )
533                .await
534                .unwrap();
535        }
536        let move_values = kvs
537            .iter()
538            .map(|(key, value)| MoveValue {
539                key: key.clone(),
540                dest_key: tombstone_manager.to_tombstone(key),
541                value: value.clone(),
542            })
543            .collect::<Vec<_>>();
544        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
545            .clone()
546            .into_iter()
547            .map(|kv| (kv.key, kv.dest_key))
548            .unzip();
549        let moved_keys = tombstone_manager
550            .move_values(keys.clone(), dest_keys.clone())
551            .await
552            .unwrap();
553        assert_eq!(kvs.len(), moved_keys);
554        check_moved_values(kv_backend.clone(), &move_values).await;
555        // Moves again
556        let moved_keys = tombstone_manager
557            .move_values(keys.clone(), dest_keys.clone())
558            .await
559            .unwrap();
560        assert_eq!(0, moved_keys);
561        check_moved_values(kv_backend.clone(), &move_values).await;
562    }
563
564    #[tokio::test]
565    async fn test_move_values_with_max_txn_ops() {
566        common_telemetry::init_default_ut_logging();
567        let kv_backend = Arc::new(MemoryKvBackend::default());
568        let mut tombstone_manager = TombstoneManager::new(kv_backend.clone());
569        tombstone_manager.set_max_txn_ops(4);
570        let kvs = HashMap::from([
571            (b"bar".to_vec(), b"baz".to_vec()),
572            (b"foo".to_vec(), b"hi".to_vec()),
573            (b"baz".to_vec(), b"hello".to_vec()),
574            (b"qux".to_vec(), b"world".to_vec()),
575            (b"quux".to_vec(), b"world".to_vec()),
576            (b"quuux".to_vec(), b"world".to_vec()),
577            (b"quuuux".to_vec(), b"world".to_vec()),
578            (b"quuuuux".to_vec(), b"world".to_vec()),
579            (b"quuuuuux".to_vec(), b"world".to_vec()),
580        ]);
581        for (key, value) in &kvs {
582            kv_backend
583                .put(
584                    PutRequest::new()
585                        .with_key(key.clone())
586                        .with_value(value.clone()),
587                )
588                .await
589                .unwrap();
590        }
591        let move_values = kvs
592            .iter()
593            .map(|(key, value)| MoveValue {
594                key: key.clone(),
595                dest_key: tombstone_manager.to_tombstone(key),
596                value: value.clone(),
597            })
598            .collect::<Vec<_>>();
599        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
600            .clone()
601            .into_iter()
602            .map(|kv| (kv.key, kv.dest_key))
603            .unzip();
604        let moved_keys = tombstone_manager
605            .move_values(keys.clone(), dest_keys.clone())
606            .await
607            .unwrap();
608        assert_eq!(kvs.len(), moved_keys);
609        check_moved_values(kv_backend.clone(), &move_values).await;
610        // Moves again
611        let moved_keys = tombstone_manager
612            .move_values(keys.clone(), dest_keys.clone())
613            .await
614            .unwrap();
615        assert_eq!(0, moved_keys);
616        check_moved_values(kv_backend.clone(), &move_values).await;
617    }
618
619    #[tokio::test]
620    async fn test_move_values_with_non_exists_values() {
621        let kv_backend = Arc::new(MemoryKvBackend::default());
622        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
623        let kvs = HashMap::from([
624            (b"bar".to_vec(), b"baz".to_vec()),
625            (b"foo".to_vec(), b"hi".to_vec()),
626            (b"baz".to_vec(), b"hello".to_vec()),
627        ]);
628        for (key, value) in &kvs {
629            kv_backend
630                .put(
631                    PutRequest::new()
632                        .with_key(key.clone())
633                        .with_value(value.clone()),
634                )
635                .await
636                .unwrap();
637        }
638        let move_values = kvs
639            .iter()
640            .map(|(key, value)| MoveValue {
641                key: key.clone(),
642                dest_key: tombstone_manager.to_tombstone(key),
643                value: value.clone(),
644            })
645            .collect::<Vec<_>>();
646        let (mut keys, mut dest_keys): (Vec<_>, Vec<_>) = move_values
647            .clone()
648            .into_iter()
649            .map(|kv| (kv.key, kv.dest_key))
650            .unzip();
651        keys.push(b"non-exists".to_vec());
652        dest_keys.push(b"hi/non-exists".to_vec());
653        let moved_keys = tombstone_manager
654            .move_values(keys.clone(), dest_keys.clone())
655            .await
656            .unwrap();
657        check_moved_values(kv_backend.clone(), &move_values).await;
658        assert_eq!(3, moved_keys);
659        // Moves again
660        let moved_keys = tombstone_manager
661            .move_values(keys.clone(), dest_keys.clone())
662            .await
663            .unwrap();
664        check_moved_values(kv_backend.clone(), &move_values).await;
665        assert_eq!(0, moved_keys);
666    }
667
668    #[tokio::test]
669    async fn test_move_values_changed() {
670        let kv_backend = Arc::new(MemoryKvBackend::default());
671        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
672        let kvs = HashMap::from([
673            (b"bar".to_vec(), b"baz".to_vec()),
674            (b"foo".to_vec(), b"hi".to_vec()),
675            (b"baz".to_vec(), b"hello".to_vec()),
676        ]);
677        for (key, value) in &kvs {
678            kv_backend
679                .put(
680                    PutRequest::new()
681                        .with_key(key.clone())
682                        .with_value(value.clone()),
683                )
684                .await
685                .unwrap();
686        }
687
688        kv_backend
689            .put(PutRequest::new().with_key("baz").with_value("changed"))
690            .await
691            .unwrap();
692
693        let move_values = kvs
694            .iter()
695            .map(|(key, value)| MoveValue {
696                key: key.clone(),
697                dest_key: tombstone_manager.to_tombstone(key),
698                value: value.clone(),
699            })
700            .collect::<Vec<_>>();
701        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
702            .clone()
703            .into_iter()
704            .map(|kv| (kv.key, kv.dest_key))
705            .unzip();
706        let moved_keys = tombstone_manager
707            .move_values(keys, dest_keys)
708            .await
709            .unwrap();
710        assert_eq!(kvs.len(), moved_keys);
711    }
712
713    #[tokio::test]
714    async fn test_move_values_overwrite_dest_values() {
715        let kv_backend = Arc::new(MemoryKvBackend::default());
716        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
717        let kvs = HashMap::from([
718            (b"bar".to_vec(), b"baz".to_vec()),
719            (b"foo".to_vec(), b"hi".to_vec()),
720            (b"baz".to_vec(), b"hello".to_vec()),
721        ]);
722        for (key, value) in &kvs {
723            kv_backend
724                .put(
725                    PutRequest::new()
726                        .with_key(key.clone())
727                        .with_value(value.clone()),
728                )
729                .await
730                .unwrap();
731        }
732
733        // Prepares
734        let move_values = kvs
735            .iter()
736            .map(|(key, value)| MoveValue {
737                key: key.clone(),
738                dest_key: tombstone_manager.to_tombstone(key),
739                value: value.clone(),
740            })
741            .collect::<Vec<_>>();
742        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
743            .clone()
744            .into_iter()
745            .map(|kv| (kv.key, kv.dest_key))
746            .unzip();
747        tombstone_manager
748            .move_values(keys, dest_keys)
749            .await
750            .unwrap();
751        check_moved_values(kv_backend.clone(), &move_values).await;
752
753        // Overwrites existing dest keys.
754        let kvs = HashMap::from([
755            (b"bar".to_vec(), b"new baz".to_vec()),
756            (b"foo".to_vec(), b"new hi".to_vec()),
757            (b"baz".to_vec(), b"new baz".to_vec()),
758        ]);
759        for (key, value) in &kvs {
760            kv_backend
761                .put(
762                    PutRequest::new()
763                        .with_key(key.clone())
764                        .with_value(value.clone()),
765                )
766                .await
767                .unwrap();
768        }
769        let move_values = kvs
770            .iter()
771            .map(|(key, value)| MoveValue {
772                key: key.clone(),
773                dest_key: tombstone_manager.to_tombstone(key),
774                value: value.clone(),
775            })
776            .collect::<Vec<_>>();
777        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
778            .clone()
779            .into_iter()
780            .map(|kv| (kv.key, kv.dest_key))
781            .unzip();
782        tombstone_manager
783            .move_values(keys, dest_keys)
784            .await
785            .unwrap();
786        check_moved_values(kv_backend.clone(), &move_values).await;
787    }
788
789    #[tokio::test]
790    async fn test_move_values_with_different_lengths() {
791        let kv_backend = Arc::new(MemoryKvBackend::default());
792        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
793
794        let keys = vec![b"bar".to_vec(), b"foo".to_vec()];
795        let dest_keys = vec![b"bar".to_vec(), b"foo".to_vec(), b"baz".to_vec()];
796
797        let err = tombstone_manager
798            .move_values(keys, dest_keys)
799            .await
800            .unwrap_err();
801        assert!(
802            err.to_string()
803                .contains("The length of keys(2) does not match the length of dest_keys(3)."),
804        );
805
806        let moved_keys = tombstone_manager.move_values(vec![], vec![]).await.unwrap();
807        assert_eq!(0, moved_keys);
808    }
809}