1use std::collections::HashMap;
16
17use common_telemetry::debug;
18use futures_util::stream::BoxStream;
19use snafu::ensure;
20
21use crate::error::{self, Result};
22use crate::key::TABLE_NAME_KEY_PREFIX;
23use crate::key::txn_helper::TxnOpGetResponseSet;
24use crate::kv_backend::KvBackendRef;
25use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
26use crate::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
27use crate::rpc::KeyValue;
28use crate::rpc::store::{BatchDeleteRequest, BatchGetRequest, RangeRequest};
29
/// Manages "tombstones": values that have been moved under a dedicated key
/// prefix instead of being removed outright.
///
/// Values are moved into the tombstone key space by [`TombstoneManager::create`],
/// moved back by [`TombstoneManager::restore`] and removed from it by
/// [`TombstoneManager::delete`].
pub struct TombstoneManager {
    /// Backend that stores both the original keys and the tombstone keys.
    kv_backend: KvBackendRef,
    /// Prefix prepended to a key to form its tombstone key.
    tombstone_prefix: String,
    /// Test-only override for the backend's per-transaction operation limit.
    #[cfg(test)]
    max_txn_ops: Option<usize>,
}
60
/// Default prefix prepended to a key to form its tombstone key.
const TOMBSTONE_PREFIX: &str = "__tombstone/";
62
63impl TombstoneManager {
64 pub fn new(kv_backend: KvBackendRef) -> Self {
66 Self::new_with_prefix(kv_backend, TOMBSTONE_PREFIX)
67 }
68
69 pub fn new_with_prefix(kv_backend: KvBackendRef, prefix: &str) -> Self {
71 Self {
72 kv_backend,
73 tombstone_prefix: prefix.to_string(),
74 #[cfg(test)]
75 max_txn_ops: None,
76 }
77 }
78
79 pub fn to_tombstone(&self, key: &[u8]) -> Vec<u8> {
80 [self.tombstone_prefix.as_bytes(), key].concat()
81 }
82
83 pub fn strip_tombstone_prefix<'a>(&self, tombstone_key: &'a [u8]) -> Result<&'a [u8]> {
85 ensure!(
86 tombstone_key.starts_with(self.tombstone_prefix.as_bytes()),
87 error::UnexpectedSnafu {
88 err_msg: format!(
89 "The key '{}' does not start with tombstone prefix '{}'.",
90 String::from_utf8_lossy(tombstone_key),
91 self.tombstone_prefix
92 ),
93 }
94 );
95
96 Ok(&tombstone_key[self.tombstone_prefix.len()..])
97 }
98
99 pub async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
101 let tombstone_key = self.to_tombstone(key);
102 self.kv_backend.get(&tombstone_key).await
103 }
104
105 pub async fn batch_get(&self, keys: &[Vec<u8>]) -> Result<HashMap<Vec<u8>, KeyValue>> {
107 let tombstone_keys = keys
108 .iter()
109 .map(|key| self.to_tombstone(key))
110 .collect::<Vec<_>>();
111 let resp = self
112 .kv_backend
113 .batch_get(BatchGetRequest::new().with_keys(tombstone_keys))
114 .await?;
115
116 resp.kvs
117 .into_iter()
118 .map(|kv| Ok((self.strip_tombstone_prefix(&kv.key)?.to_vec(), kv)))
119 .collect::<Result<HashMap<_, _>>>()
120 }
121
122 pub fn tombstones(&self) -> BoxStream<'static, Result<KeyValue>> {
124 self.scan_prefix(self.tombstone_prefix.as_bytes().to_vec())
125 }
126
127 pub fn tombstoned_table_names(&self) -> BoxStream<'static, Result<KeyValue>> {
129 self.scan_prefix(
130 format!("{}{}/", self.tombstone_prefix, TABLE_NAME_KEY_PREFIX).into_bytes(),
131 )
132 }
133
134 fn scan_prefix(&self, prefix: Vec<u8>) -> BoxStream<'static, Result<KeyValue>> {
136 let req = RangeRequest::new().with_prefix(prefix);
137 let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, Ok)
138 .into_stream();
139
140 Box::pin(stream)
141 }
142
143 #[cfg(test)]
144 pub fn set_max_txn_ops(&mut self, max_txn_ops: usize) {
145 self.max_txn_ops = Some(max_txn_ops);
146 }
147
148 fn build_move_value_txn(
154 &self,
155 src_key: Vec<u8>,
156 value: Vec<u8>,
157 dest_key: Vec<u8>,
158 ) -> (Txn, impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>>) {
159 let txn = Txn::new()
160 .when(vec![Compare::with_value(
161 src_key.clone(),
162 CompareOp::Equal,
163 value.clone(),
164 )])
165 .and_then(vec![
166 TxnOp::Put(dest_key.clone(), value.clone()),
167 TxnOp::Delete(src_key.clone()),
168 ])
169 .or_else(vec![TxnOp::Get(src_key.clone())]);
170
171 (txn, TxnOpGetResponseSet::filter(src_key))
172 }
173
174 async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<usize> {
175 ensure!(
176 keys.len() == dest_keys.len(),
177 error::UnexpectedSnafu {
178 err_msg: format!(
179 "The length of keys({}) does not match the length of dest_keys({}).",
180 keys.len(),
181 dest_keys.len()
182 ),
183 }
184 );
185 let lookup_table = keys.iter().zip(dest_keys.iter()).collect::<HashMap<_, _>>();
187
188 let resp = self
189 .kv_backend
190 .batch_get(BatchGetRequest::new().with_keys(keys.to_vec()))
191 .await?;
192 let mut results = resp
193 .kvs
194 .into_iter()
195 .map(|kv| (kv.key, kv.value))
196 .collect::<HashMap<_, _>>();
197
198 const MAX_RETRIES: usize = 8;
199 for _ in 0..MAX_RETRIES {
200 let (txns, (keys, filters)): (Vec<_>, (Vec<_>, Vec<_>)) = results
201 .iter()
202 .map(|(key, value)| {
203 let (txn, filter) = self.build_move_value_txn(
204 key.clone(),
205 value.clone(),
206 lookup_table[&key].clone(),
207 );
208 (txn, (key.clone(), filter))
209 })
210 .unzip();
211 let mut resp = self.kv_backend.txn(Txn::merge_all(txns)).await?;
212 if resp.succeeded {
213 return Ok(keys.len());
214 }
215 let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
216 for (idx, mut filter) in filters.into_iter().enumerate() {
218 if let Some(value) = filter(&mut set) {
219 results.insert(keys[idx].clone(), value);
220 } else {
221 results.remove(&keys[idx]);
222 }
223 }
224 }
225
226 error::MoveValuesSnafu {
227 err_msg: format!(
228 "keys: {:?}",
229 keys.iter().map(|key| String::from_utf8_lossy(key)),
230 ),
231 }
232 .fail()
233 }
234
235 fn max_txn_ops(&self) -> usize {
236 #[cfg(test)]
237 if let Some(max_txn_ops) = self.max_txn_ops {
238 return max_txn_ops;
239 }
240 self.kv_backend.max_txn_ops()
241 }
242
243 async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<usize> {
247 ensure!(
248 keys.len() == dest_keys.len(),
249 error::UnexpectedSnafu {
250 err_msg: format!(
251 "The length of keys({}) does not match the length of dest_keys({}).",
252 keys.len(),
253 dest_keys.len()
254 ),
255 }
256 );
257 if keys.is_empty() {
258 return Ok(0);
259 }
260 let chunk_size = self.max_txn_ops() / 2;
261 if keys.len() > chunk_size {
262 debug!(
263 "Moving values with multiple chunks, keys len: {}, chunk_size: {}",
264 keys.len(),
265 chunk_size
266 );
267 let mut moved_keys = 0;
268 let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
269 let dest_keys_chunks = dest_keys.chunks(chunk_size).collect::<Vec<_>>();
270 for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
271 moved_keys += self.move_values_inner(keys, dest_keys).await?;
272 }
273 Ok(moved_keys)
274 } else {
275 self.move_values_inner(&keys, &dest_keys).await
276 }
277 }
278
279 pub async fn create(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
287 let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
288 .into_iter()
289 .map(|key| {
290 let tombstone_key = self.to_tombstone(&key);
291 (key, tombstone_key)
292 })
293 .unzip();
294
295 self.move_values(keys, dest_keys).await
296 }
297
298 pub async fn restore(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
306 let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
307 .into_iter()
308 .map(|key| {
309 let tombstone_key = self.to_tombstone(&key);
310 (tombstone_key, key)
311 })
312 .unzip();
313
314 self.move_values(keys, dest_keys).await
315 }
316
317 pub async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
321 let keys = keys
322 .iter()
323 .map(|key| self.to_tombstone(key))
324 .collect::<Vec<_>>();
325
326 let num_keys = keys.len();
327 let _ = self
328 .kv_backend
329 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
330 .await?;
331
332 Ok(num_keys)
333 }
334}
335
336#[cfg(test)]
337mod tests {
338
339 use std::collections::HashMap;
340 use std::sync::Arc;
341
342 use crate::error::Error;
343 use crate::key::tombstone::TombstoneManager;
344 use crate::kv_backend::KvBackend;
345 use crate::kv_backend::memory::MemoryKvBackend;
346 use crate::rpc::store::PutRequest;
347
    /// Describes one expected move: `value` leaves `key` and ends up at `dest_key`.
    #[derive(Debug, Clone)]
    struct MoveValue {
        /// Source key, expected to be absent after the move.
        key: Vec<u8>,
        /// Destination key, expected to hold `value` after the move.
        dest_key: Vec<u8>,
        /// Value expected at `dest_key`.
        value: Vec<u8>,
    }
354
355 async fn check_moved_values(
356 kv_backend: Arc<MemoryKvBackend<Error>>,
357 move_values: &[MoveValue],
358 ) {
359 for MoveValue {
360 key,
361 dest_key,
362 value,
363 } in move_values
364 {
365 assert!(kv_backend.get(key).await.unwrap().is_none());
366 assert_eq!(
367 &kv_backend.get(dest_key).await.unwrap().unwrap().value,
368 value,
369 );
370 }
371 }
372
373 #[tokio::test]
374 async fn test_create_tombstone() {
375 let kv_backend = Arc::new(MemoryKvBackend::default());
376 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
377 kv_backend
378 .put(PutRequest::new().with_key("bar").with_value("baz"))
379 .await
380 .unwrap();
381 kv_backend
382 .put(PutRequest::new().with_key("foo").with_value("hi"))
383 .await
384 .unwrap();
385 tombstone_manager
386 .create(vec![b"bar".to_vec(), b"foo".to_vec()])
387 .await
388 .unwrap();
389 assert!(!kv_backend.exists(b"bar").await.unwrap());
390 assert!(!kv_backend.exists(b"foo").await.unwrap());
391 assert_eq!(
392 kv_backend
393 .get(&tombstone_manager.to_tombstone(b"bar"))
394 .await
395 .unwrap()
396 .unwrap()
397 .value,
398 b"baz"
399 );
400 assert_eq!(
401 kv_backend
402 .get(&tombstone_manager.to_tombstone(b"foo"))
403 .await
404 .unwrap()
405 .unwrap()
406 .value,
407 b"hi"
408 );
409 assert_eq!(kv_backend.len(), 2);
410 }
411
412 #[tokio::test]
413 async fn test_create_tombstone_with_non_exist_values() {
414 let kv_backend = Arc::new(MemoryKvBackend::default());
415 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
416
417 kv_backend
418 .put(PutRequest::new().with_key("bar").with_value("baz"))
419 .await
420 .unwrap();
421 kv_backend
422 .put(PutRequest::new().with_key("foo").with_value("hi"))
423 .await
424 .unwrap();
425
426 tombstone_manager
427 .create(vec![b"bar".to_vec(), b"baz".to_vec()])
428 .await
429 .unwrap();
430 check_moved_values(
431 kv_backend.clone(),
432 &[MoveValue {
433 key: b"bar".to_vec(),
434 dest_key: tombstone_manager.to_tombstone(b"bar"),
435 value: b"baz".to_vec(),
436 }],
437 )
438 .await;
439 }
440
441 #[tokio::test]
442 async fn test_restore_tombstone() {
443 let kv_backend = Arc::new(MemoryKvBackend::default());
444 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
445 kv_backend
446 .put(PutRequest::new().with_key("bar").with_value("baz"))
447 .await
448 .unwrap();
449 kv_backend
450 .put(PutRequest::new().with_key("foo").with_value("hi"))
451 .await
452 .unwrap();
453 let expected_kvs = kv_backend.dump();
454 tombstone_manager
455 .create(vec![b"bar".to_vec(), b"foo".to_vec()])
456 .await
457 .unwrap();
458 tombstone_manager
459 .restore(vec![b"bar".to_vec(), b"foo".to_vec()])
460 .await
461 .unwrap();
462 assert_eq!(expected_kvs, kv_backend.dump());
463 }
464
465 #[tokio::test]
466 async fn test_delete_tombstone() {
467 let kv_backend = Arc::new(MemoryKvBackend::default());
468 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
469 kv_backend
470 .put(PutRequest::new().with_key("bar").with_value("baz"))
471 .await
472 .unwrap();
473 kv_backend
474 .put(PutRequest::new().with_key("foo").with_value("hi"))
475 .await
476 .unwrap();
477 tombstone_manager
478 .create(vec![b"bar".to_vec(), b"foo".to_vec()])
479 .await
480 .unwrap();
481 tombstone_manager
482 .delete(vec![b"bar".to_vec(), b"foo".to_vec()])
483 .await
484 .unwrap();
485 assert!(kv_backend.is_empty());
486 }
487
488 #[tokio::test]
489 async fn test_batch_get_tombstones() {
490 let kv_backend = Arc::new(MemoryKvBackend::default());
491 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
492 kv_backend
493 .put(PutRequest::new().with_key("bar").with_value("baz"))
494 .await
495 .unwrap();
496 kv_backend
497 .put(PutRequest::new().with_key("foo").with_value("hi"))
498 .await
499 .unwrap();
500
501 tombstone_manager
502 .create(vec![b"bar".to_vec(), b"foo".to_vec()])
503 .await
504 .unwrap();
505
506 let kvs = tombstone_manager
507 .batch_get(&[b"bar".to_vec(), b"foo".to_vec(), b"missing".to_vec()])
508 .await
509 .unwrap();
510
511 assert_eq!(kvs.len(), 2);
512 assert_eq!(kvs.get(b"bar".as_slice()).unwrap().value, b"baz");
513 assert_eq!(kvs.get(b"foo".as_slice()).unwrap().value, b"hi");
514 assert!(!kvs.contains_key(b"missing".as_slice()));
515 }
516
517 #[tokio::test]
518 async fn test_move_values() {
519 let kv_backend = Arc::new(MemoryKvBackend::default());
520 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
521 let kvs = HashMap::from([
522 (b"bar".to_vec(), b"baz".to_vec()),
523 (b"foo".to_vec(), b"hi".to_vec()),
524 (b"baz".to_vec(), b"hello".to_vec()),
525 ]);
526 for (key, value) in &kvs {
527 kv_backend
528 .put(
529 PutRequest::new()
530 .with_key(key.clone())
531 .with_value(value.clone()),
532 )
533 .await
534 .unwrap();
535 }
536 let move_values = kvs
537 .iter()
538 .map(|(key, value)| MoveValue {
539 key: key.clone(),
540 dest_key: tombstone_manager.to_tombstone(key),
541 value: value.clone(),
542 })
543 .collect::<Vec<_>>();
544 let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
545 .clone()
546 .into_iter()
547 .map(|kv| (kv.key, kv.dest_key))
548 .unzip();
549 let moved_keys = tombstone_manager
550 .move_values(keys.clone(), dest_keys.clone())
551 .await
552 .unwrap();
553 assert_eq!(kvs.len(), moved_keys);
554 check_moved_values(kv_backend.clone(), &move_values).await;
555 let moved_keys = tombstone_manager
557 .move_values(keys.clone(), dest_keys.clone())
558 .await
559 .unwrap();
560 assert_eq!(0, moved_keys);
561 check_moved_values(kv_backend.clone(), &move_values).await;
562 }
563
564 #[tokio::test]
565 async fn test_move_values_with_max_txn_ops() {
566 common_telemetry::init_default_ut_logging();
567 let kv_backend = Arc::new(MemoryKvBackend::default());
568 let mut tombstone_manager = TombstoneManager::new(kv_backend.clone());
569 tombstone_manager.set_max_txn_ops(4);
570 let kvs = HashMap::from([
571 (b"bar".to_vec(), b"baz".to_vec()),
572 (b"foo".to_vec(), b"hi".to_vec()),
573 (b"baz".to_vec(), b"hello".to_vec()),
574 (b"qux".to_vec(), b"world".to_vec()),
575 (b"quux".to_vec(), b"world".to_vec()),
576 (b"quuux".to_vec(), b"world".to_vec()),
577 (b"quuuux".to_vec(), b"world".to_vec()),
578 (b"quuuuux".to_vec(), b"world".to_vec()),
579 (b"quuuuuux".to_vec(), b"world".to_vec()),
580 ]);
581 for (key, value) in &kvs {
582 kv_backend
583 .put(
584 PutRequest::new()
585 .with_key(key.clone())
586 .with_value(value.clone()),
587 )
588 .await
589 .unwrap();
590 }
591 let move_values = kvs
592 .iter()
593 .map(|(key, value)| MoveValue {
594 key: key.clone(),
595 dest_key: tombstone_manager.to_tombstone(key),
596 value: value.clone(),
597 })
598 .collect::<Vec<_>>();
599 let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
600 .clone()
601 .into_iter()
602 .map(|kv| (kv.key, kv.dest_key))
603 .unzip();
604 let moved_keys = tombstone_manager
605 .move_values(keys.clone(), dest_keys.clone())
606 .await
607 .unwrap();
608 assert_eq!(kvs.len(), moved_keys);
609 check_moved_values(kv_backend.clone(), &move_values).await;
610 let moved_keys = tombstone_manager
612 .move_values(keys.clone(), dest_keys.clone())
613 .await
614 .unwrap();
615 assert_eq!(0, moved_keys);
616 check_moved_values(kv_backend.clone(), &move_values).await;
617 }
618
619 #[tokio::test]
620 async fn test_move_values_with_non_exists_values() {
621 let kv_backend = Arc::new(MemoryKvBackend::default());
622 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
623 let kvs = HashMap::from([
624 (b"bar".to_vec(), b"baz".to_vec()),
625 (b"foo".to_vec(), b"hi".to_vec()),
626 (b"baz".to_vec(), b"hello".to_vec()),
627 ]);
628 for (key, value) in &kvs {
629 kv_backend
630 .put(
631 PutRequest::new()
632 .with_key(key.clone())
633 .with_value(value.clone()),
634 )
635 .await
636 .unwrap();
637 }
638 let move_values = kvs
639 .iter()
640 .map(|(key, value)| MoveValue {
641 key: key.clone(),
642 dest_key: tombstone_manager.to_tombstone(key),
643 value: value.clone(),
644 })
645 .collect::<Vec<_>>();
646 let (mut keys, mut dest_keys): (Vec<_>, Vec<_>) = move_values
647 .clone()
648 .into_iter()
649 .map(|kv| (kv.key, kv.dest_key))
650 .unzip();
651 keys.push(b"non-exists".to_vec());
652 dest_keys.push(b"hi/non-exists".to_vec());
653 let moved_keys = tombstone_manager
654 .move_values(keys.clone(), dest_keys.clone())
655 .await
656 .unwrap();
657 check_moved_values(kv_backend.clone(), &move_values).await;
658 assert_eq!(3, moved_keys);
659 let moved_keys = tombstone_manager
661 .move_values(keys.clone(), dest_keys.clone())
662 .await
663 .unwrap();
664 check_moved_values(kv_backend.clone(), &move_values).await;
665 assert_eq!(0, moved_keys);
666 }
667
668 #[tokio::test]
669 async fn test_move_values_changed() {
670 let kv_backend = Arc::new(MemoryKvBackend::default());
671 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
672 let kvs = HashMap::from([
673 (b"bar".to_vec(), b"baz".to_vec()),
674 (b"foo".to_vec(), b"hi".to_vec()),
675 (b"baz".to_vec(), b"hello".to_vec()),
676 ]);
677 for (key, value) in &kvs {
678 kv_backend
679 .put(
680 PutRequest::new()
681 .with_key(key.clone())
682 .with_value(value.clone()),
683 )
684 .await
685 .unwrap();
686 }
687
688 kv_backend
689 .put(PutRequest::new().with_key("baz").with_value("changed"))
690 .await
691 .unwrap();
692
693 let move_values = kvs
694 .iter()
695 .map(|(key, value)| MoveValue {
696 key: key.clone(),
697 dest_key: tombstone_manager.to_tombstone(key),
698 value: value.clone(),
699 })
700 .collect::<Vec<_>>();
701 let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
702 .clone()
703 .into_iter()
704 .map(|kv| (kv.key, kv.dest_key))
705 .unzip();
706 let moved_keys = tombstone_manager
707 .move_values(keys, dest_keys)
708 .await
709 .unwrap();
710 assert_eq!(kvs.len(), moved_keys);
711 }
712
713 #[tokio::test]
714 async fn test_move_values_overwrite_dest_values() {
715 let kv_backend = Arc::new(MemoryKvBackend::default());
716 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
717 let kvs = HashMap::from([
718 (b"bar".to_vec(), b"baz".to_vec()),
719 (b"foo".to_vec(), b"hi".to_vec()),
720 (b"baz".to_vec(), b"hello".to_vec()),
721 ]);
722 for (key, value) in &kvs {
723 kv_backend
724 .put(
725 PutRequest::new()
726 .with_key(key.clone())
727 .with_value(value.clone()),
728 )
729 .await
730 .unwrap();
731 }
732
733 let move_values = kvs
735 .iter()
736 .map(|(key, value)| MoveValue {
737 key: key.clone(),
738 dest_key: tombstone_manager.to_tombstone(key),
739 value: value.clone(),
740 })
741 .collect::<Vec<_>>();
742 let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
743 .clone()
744 .into_iter()
745 .map(|kv| (kv.key, kv.dest_key))
746 .unzip();
747 tombstone_manager
748 .move_values(keys, dest_keys)
749 .await
750 .unwrap();
751 check_moved_values(kv_backend.clone(), &move_values).await;
752
753 let kvs = HashMap::from([
755 (b"bar".to_vec(), b"new baz".to_vec()),
756 (b"foo".to_vec(), b"new hi".to_vec()),
757 (b"baz".to_vec(), b"new baz".to_vec()),
758 ]);
759 for (key, value) in &kvs {
760 kv_backend
761 .put(
762 PutRequest::new()
763 .with_key(key.clone())
764 .with_value(value.clone()),
765 )
766 .await
767 .unwrap();
768 }
769 let move_values = kvs
770 .iter()
771 .map(|(key, value)| MoveValue {
772 key: key.clone(),
773 dest_key: tombstone_manager.to_tombstone(key),
774 value: value.clone(),
775 })
776 .collect::<Vec<_>>();
777 let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
778 .clone()
779 .into_iter()
780 .map(|kv| (kv.key, kv.dest_key))
781 .unzip();
782 tombstone_manager
783 .move_values(keys, dest_keys)
784 .await
785 .unwrap();
786 check_moved_values(kv_backend.clone(), &move_values).await;
787 }
788
789 #[tokio::test]
790 async fn test_move_values_with_different_lengths() {
791 let kv_backend = Arc::new(MemoryKvBackend::default());
792 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
793
794 let keys = vec![b"bar".to_vec(), b"foo".to_vec()];
795 let dest_keys = vec![b"bar".to_vec(), b"foo".to_vec(), b"baz".to_vec()];
796
797 let err = tombstone_manager
798 .move_values(keys, dest_keys)
799 .await
800 .unwrap_err();
801 assert!(
802 err.to_string()
803 .contains("The length of keys(2) does not match the length of dest_keys(3)."),
804 );
805
806 let moved_keys = tombstone_manager.move_values(vec![], vec![]).await.unwrap();
807 assert_eq!(0, moved_keys);
808 }
809}