1use std::collections::HashMap;
16
17use common_telemetry::debug;
18use snafu::ensure;
19
20use crate::error::{self, Result};
21use crate::key::txn_helper::TxnOpGetResponseSet;
22use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
23use crate::kv_backend::KvBackendRef;
24use crate::rpc::store::{BatchDeleteRequest, BatchGetRequest};
25
/// [`TombstoneManager`] moves key-value pairs between their original keys and
/// prefixed "tombstone" keys in a key-value backend, and can delete the
/// tombstone copies.
pub struct TombstoneManager {
    /// Backend on which all reads, transactions and deletes are executed.
    kv_backend: KvBackendRef,
    /// Prefix prepended to an original key to form its tombstone key.
    tombstone_prefix: String,
    /// Test-only override of the backend's per-transaction operation limit.
    #[cfg(test)]
    max_txn_ops: Option<usize>,
}
36
/// Default key prefix used by [`TombstoneManager::new`] to build tombstone keys.
const TOMBSTONE_PREFIX: &str = "__tombstone/";
38
39impl TombstoneManager {
40 pub fn new(kv_backend: KvBackendRef) -> Self {
42 Self::new_with_prefix(kv_backend, TOMBSTONE_PREFIX)
43 }
44
45 pub fn new_with_prefix(kv_backend: KvBackendRef, prefix: &str) -> Self {
47 Self {
48 kv_backend,
49 tombstone_prefix: prefix.to_string(),
50 #[cfg(test)]
51 max_txn_ops: None,
52 }
53 }
54
55 pub fn to_tombstone(&self, key: &[u8]) -> Vec<u8> {
56 [self.tombstone_prefix.as_bytes(), key].concat()
57 }
58
59 #[cfg(test)]
60 pub fn set_max_txn_ops(&mut self, max_txn_ops: usize) {
61 self.max_txn_ops = Some(max_txn_ops);
62 }
63
64 fn build_move_value_txn(
70 &self,
71 src_key: Vec<u8>,
72 value: Vec<u8>,
73 dest_key: Vec<u8>,
74 ) -> (Txn, impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>>) {
75 let txn = Txn::new()
76 .when(vec![Compare::with_value(
77 src_key.clone(),
78 CompareOp::Equal,
79 value.clone(),
80 )])
81 .and_then(vec![
82 TxnOp::Put(dest_key.clone(), value.clone()),
83 TxnOp::Delete(src_key.clone()),
84 ])
85 .or_else(vec![TxnOp::Get(src_key.clone())]);
86
87 (txn, TxnOpGetResponseSet::filter(src_key))
88 }
89
90 async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<usize> {
91 ensure!(
92 keys.len() == dest_keys.len(),
93 error::UnexpectedSnafu {
94 err_msg: format!(
95 "The length of keys({}) does not match the length of dest_keys({}).",
96 keys.len(),
97 dest_keys.len()
98 ),
99 }
100 );
101 let lookup_table = keys.iter().zip(dest_keys.iter()).collect::<HashMap<_, _>>();
103
104 let resp = self
105 .kv_backend
106 .batch_get(BatchGetRequest::new().with_keys(keys.to_vec()))
107 .await?;
108 let mut results = resp
109 .kvs
110 .into_iter()
111 .map(|kv| (kv.key, kv.value))
112 .collect::<HashMap<_, _>>();
113
114 const MAX_RETRIES: usize = 8;
115 for _ in 0..MAX_RETRIES {
116 let (txns, (keys, filters)): (Vec<_>, (Vec<_>, Vec<_>)) = results
117 .iter()
118 .map(|(key, value)| {
119 let (txn, filter) = self.build_move_value_txn(
120 key.clone(),
121 value.clone(),
122 lookup_table[&key].clone(),
123 );
124 (txn, (key.clone(), filter))
125 })
126 .unzip();
127 let mut resp = self.kv_backend.txn(Txn::merge_all(txns)).await?;
128 if resp.succeeded {
129 return Ok(keys.len());
130 }
131 let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
132 for (idx, mut filter) in filters.into_iter().enumerate() {
134 if let Some(value) = filter(&mut set) {
135 results.insert(keys[idx].clone(), value);
136 } else {
137 results.remove(&keys[idx]);
138 }
139 }
140 }
141
142 error::MoveValuesSnafu {
143 err_msg: format!(
144 "keys: {:?}",
145 keys.iter().map(|key| String::from_utf8_lossy(key)),
146 ),
147 }
148 .fail()
149 }
150
151 fn max_txn_ops(&self) -> usize {
152 #[cfg(test)]
153 if let Some(max_txn_ops) = self.max_txn_ops {
154 return max_txn_ops;
155 }
156 self.kv_backend.max_txn_ops()
157 }
158
159 async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<usize> {
163 ensure!(
164 keys.len() == dest_keys.len(),
165 error::UnexpectedSnafu {
166 err_msg: format!(
167 "The length of keys({}) does not match the length of dest_keys({}).",
168 keys.len(),
169 dest_keys.len()
170 ),
171 }
172 );
173 if keys.is_empty() {
174 return Ok(0);
175 }
176 let chunk_size = self.max_txn_ops() / 2;
177 if keys.len() > chunk_size {
178 debug!(
179 "Moving values with multiple chunks, keys len: {}, chunk_size: {}",
180 keys.len(),
181 chunk_size
182 );
183 let mut moved_keys = 0;
184 let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
185 let dest_keys_chunks = dest_keys.chunks(chunk_size).collect::<Vec<_>>();
186 for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
187 moved_keys += self.move_values_inner(keys, dest_keys).await?;
188 }
189 Ok(moved_keys)
190 } else {
191 self.move_values_inner(&keys, &dest_keys).await
192 }
193 }
194
195 pub async fn create(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
203 let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
204 .into_iter()
205 .map(|key| {
206 let tombstone_key = self.to_tombstone(&key);
207 (key, tombstone_key)
208 })
209 .unzip();
210
211 self.move_values(keys, dest_keys).await
212 }
213
214 pub async fn restore(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
222 let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
223 .into_iter()
224 .map(|key| {
225 let tombstone_key = self.to_tombstone(&key);
226 (tombstone_key, key)
227 })
228 .unzip();
229
230 self.move_values(keys, dest_keys).await
231 }
232
233 pub async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
237 let keys = keys
238 .iter()
239 .map(|key| self.to_tombstone(key))
240 .collect::<Vec<_>>();
241
242 let num_keys = keys.len();
243 let _ = self
244 .kv_backend
245 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
246 .await?;
247
248 Ok(num_keys)
249 }
250}
251
252#[cfg(test)]
253mod tests {
254
255 use std::collections::HashMap;
256 use std::sync::Arc;
257
258 use crate::error::Error;
259 use crate::key::tombstone::TombstoneManager;
260 use crate::kv_backend::memory::MemoryKvBackend;
261 use crate::kv_backend::KvBackend;
262 use crate::rpc::store::PutRequest;
263
    /// Describes one expected move: `value` should no longer be at `key` and
    /// should be found at `dest_key`.
    #[derive(Debug, Clone)]
    struct MoveValue {
        /// Source key the value is moved away from.
        key: Vec<u8>,
        /// Destination key the value is moved to.
        dest_key: Vec<u8>,
        /// Value expected at `dest_key` after the move.
        value: Vec<u8>,
    }
270
271 async fn check_moved_values(
272 kv_backend: Arc<MemoryKvBackend<Error>>,
273 move_values: &[MoveValue],
274 ) {
275 for MoveValue {
276 key,
277 dest_key,
278 value,
279 } in move_values
280 {
281 assert!(kv_backend.get(key).await.unwrap().is_none());
282 assert_eq!(
283 &kv_backend.get(dest_key).await.unwrap().unwrap().value,
284 value,
285 );
286 }
287 }
288
289 #[tokio::test]
290 async fn test_create_tombstone() {
291 let kv_backend = Arc::new(MemoryKvBackend::default());
292 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
293 kv_backend
294 .put(PutRequest::new().with_key("bar").with_value("baz"))
295 .await
296 .unwrap();
297 kv_backend
298 .put(PutRequest::new().with_key("foo").with_value("hi"))
299 .await
300 .unwrap();
301 tombstone_manager
302 .create(vec![b"bar".to_vec(), b"foo".to_vec()])
303 .await
304 .unwrap();
305 assert!(!kv_backend.exists(b"bar").await.unwrap());
306 assert!(!kv_backend.exists(b"foo").await.unwrap());
307 assert_eq!(
308 kv_backend
309 .get(&tombstone_manager.to_tombstone(b"bar"))
310 .await
311 .unwrap()
312 .unwrap()
313 .value,
314 b"baz"
315 );
316 assert_eq!(
317 kv_backend
318 .get(&tombstone_manager.to_tombstone(b"foo"))
319 .await
320 .unwrap()
321 .unwrap()
322 .value,
323 b"hi"
324 );
325 assert_eq!(kv_backend.len(), 2);
326 }
327
328 #[tokio::test]
329 async fn test_create_tombstone_with_non_exist_values() {
330 let kv_backend = Arc::new(MemoryKvBackend::default());
331 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
332
333 kv_backend
334 .put(PutRequest::new().with_key("bar").with_value("baz"))
335 .await
336 .unwrap();
337 kv_backend
338 .put(PutRequest::new().with_key("foo").with_value("hi"))
339 .await
340 .unwrap();
341
342 tombstone_manager
343 .create(vec![b"bar".to_vec(), b"baz".to_vec()])
344 .await
345 .unwrap();
346 check_moved_values(
347 kv_backend.clone(),
348 &[MoveValue {
349 key: b"bar".to_vec(),
350 dest_key: tombstone_manager.to_tombstone(b"bar"),
351 value: b"baz".to_vec(),
352 }],
353 )
354 .await;
355 }
356
357 #[tokio::test]
358 async fn test_restore_tombstone() {
359 let kv_backend = Arc::new(MemoryKvBackend::default());
360 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
361 kv_backend
362 .put(PutRequest::new().with_key("bar").with_value("baz"))
363 .await
364 .unwrap();
365 kv_backend
366 .put(PutRequest::new().with_key("foo").with_value("hi"))
367 .await
368 .unwrap();
369 let expected_kvs = kv_backend.dump();
370 tombstone_manager
371 .create(vec![b"bar".to_vec(), b"foo".to_vec()])
372 .await
373 .unwrap();
374 tombstone_manager
375 .restore(vec![b"bar".to_vec(), b"foo".to_vec()])
376 .await
377 .unwrap();
378 assert_eq!(expected_kvs, kv_backend.dump());
379 }
380
381 #[tokio::test]
382 async fn test_delete_tombstone() {
383 let kv_backend = Arc::new(MemoryKvBackend::default());
384 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
385 kv_backend
386 .put(PutRequest::new().with_key("bar").with_value("baz"))
387 .await
388 .unwrap();
389 kv_backend
390 .put(PutRequest::new().with_key("foo").with_value("hi"))
391 .await
392 .unwrap();
393 tombstone_manager
394 .create(vec![b"bar".to_vec(), b"foo".to_vec()])
395 .await
396 .unwrap();
397 tombstone_manager
398 .delete(vec![b"bar".to_vec(), b"foo".to_vec()])
399 .await
400 .unwrap();
401 assert!(kv_backend.is_empty());
402 }
403
404 #[tokio::test]
405 async fn test_move_values() {
406 let kv_backend = Arc::new(MemoryKvBackend::default());
407 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
408 let kvs = HashMap::from([
409 (b"bar".to_vec(), b"baz".to_vec()),
410 (b"foo".to_vec(), b"hi".to_vec()),
411 (b"baz".to_vec(), b"hello".to_vec()),
412 ]);
413 for (key, value) in &kvs {
414 kv_backend
415 .put(
416 PutRequest::new()
417 .with_key(key.clone())
418 .with_value(value.clone()),
419 )
420 .await
421 .unwrap();
422 }
423 let move_values = kvs
424 .iter()
425 .map(|(key, value)| MoveValue {
426 key: key.clone(),
427 dest_key: tombstone_manager.to_tombstone(key),
428 value: value.clone(),
429 })
430 .collect::<Vec<_>>();
431 let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
432 .clone()
433 .into_iter()
434 .map(|kv| (kv.key, kv.dest_key))
435 .unzip();
436 let moved_keys = tombstone_manager
437 .move_values(keys.clone(), dest_keys.clone())
438 .await
439 .unwrap();
440 assert_eq!(kvs.len(), moved_keys);
441 check_moved_values(kv_backend.clone(), &move_values).await;
442 let moved_keys = tombstone_manager
444 .move_values(keys.clone(), dest_keys.clone())
445 .await
446 .unwrap();
447 assert_eq!(0, moved_keys);
448 check_moved_values(kv_backend.clone(), &move_values).await;
449 }
450
451 #[tokio::test]
452 async fn test_move_values_with_max_txn_ops() {
453 common_telemetry::init_default_ut_logging();
454 let kv_backend = Arc::new(MemoryKvBackend::default());
455 let mut tombstone_manager = TombstoneManager::new(kv_backend.clone());
456 tombstone_manager.set_max_txn_ops(4);
457 let kvs = HashMap::from([
458 (b"bar".to_vec(), b"baz".to_vec()),
459 (b"foo".to_vec(), b"hi".to_vec()),
460 (b"baz".to_vec(), b"hello".to_vec()),
461 (b"qux".to_vec(), b"world".to_vec()),
462 (b"quux".to_vec(), b"world".to_vec()),
463 (b"quuux".to_vec(), b"world".to_vec()),
464 (b"quuuux".to_vec(), b"world".to_vec()),
465 (b"quuuuux".to_vec(), b"world".to_vec()),
466 (b"quuuuuux".to_vec(), b"world".to_vec()),
467 ]);
468 for (key, value) in &kvs {
469 kv_backend
470 .put(
471 PutRequest::new()
472 .with_key(key.clone())
473 .with_value(value.clone()),
474 )
475 .await
476 .unwrap();
477 }
478 let move_values = kvs
479 .iter()
480 .map(|(key, value)| MoveValue {
481 key: key.clone(),
482 dest_key: tombstone_manager.to_tombstone(key),
483 value: value.clone(),
484 })
485 .collect::<Vec<_>>();
486 let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
487 .clone()
488 .into_iter()
489 .map(|kv| (kv.key, kv.dest_key))
490 .unzip();
491 let moved_keys = tombstone_manager
492 .move_values(keys.clone(), dest_keys.clone())
493 .await
494 .unwrap();
495 assert_eq!(kvs.len(), moved_keys);
496 check_moved_values(kv_backend.clone(), &move_values).await;
497 let moved_keys = tombstone_manager
499 .move_values(keys.clone(), dest_keys.clone())
500 .await
501 .unwrap();
502 assert_eq!(0, moved_keys);
503 check_moved_values(kv_backend.clone(), &move_values).await;
504 }
505
506 #[tokio::test]
507 async fn test_move_values_with_non_exists_values() {
508 let kv_backend = Arc::new(MemoryKvBackend::default());
509 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
510 let kvs = HashMap::from([
511 (b"bar".to_vec(), b"baz".to_vec()),
512 (b"foo".to_vec(), b"hi".to_vec()),
513 (b"baz".to_vec(), b"hello".to_vec()),
514 ]);
515 for (key, value) in &kvs {
516 kv_backend
517 .put(
518 PutRequest::new()
519 .with_key(key.clone())
520 .with_value(value.clone()),
521 )
522 .await
523 .unwrap();
524 }
525 let move_values = kvs
526 .iter()
527 .map(|(key, value)| MoveValue {
528 key: key.clone(),
529 dest_key: tombstone_manager.to_tombstone(key),
530 value: value.clone(),
531 })
532 .collect::<Vec<_>>();
533 let (mut keys, mut dest_keys): (Vec<_>, Vec<_>) = move_values
534 .clone()
535 .into_iter()
536 .map(|kv| (kv.key, kv.dest_key))
537 .unzip();
538 keys.push(b"non-exists".to_vec());
539 dest_keys.push(b"hi/non-exists".to_vec());
540 let moved_keys = tombstone_manager
541 .move_values(keys.clone(), dest_keys.clone())
542 .await
543 .unwrap();
544 check_moved_values(kv_backend.clone(), &move_values).await;
545 assert_eq!(3, moved_keys);
546 let moved_keys = tombstone_manager
548 .move_values(keys.clone(), dest_keys.clone())
549 .await
550 .unwrap();
551 check_moved_values(kv_backend.clone(), &move_values).await;
552 assert_eq!(0, moved_keys);
553 }
554
555 #[tokio::test]
556 async fn test_move_values_changed() {
557 let kv_backend = Arc::new(MemoryKvBackend::default());
558 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
559 let kvs = HashMap::from([
560 (b"bar".to_vec(), b"baz".to_vec()),
561 (b"foo".to_vec(), b"hi".to_vec()),
562 (b"baz".to_vec(), b"hello".to_vec()),
563 ]);
564 for (key, value) in &kvs {
565 kv_backend
566 .put(
567 PutRequest::new()
568 .with_key(key.clone())
569 .with_value(value.clone()),
570 )
571 .await
572 .unwrap();
573 }
574
575 kv_backend
576 .put(PutRequest::new().with_key("baz").with_value("changed"))
577 .await
578 .unwrap();
579
580 let move_values = kvs
581 .iter()
582 .map(|(key, value)| MoveValue {
583 key: key.clone(),
584 dest_key: tombstone_manager.to_tombstone(key),
585 value: value.clone(),
586 })
587 .collect::<Vec<_>>();
588 let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
589 .clone()
590 .into_iter()
591 .map(|kv| (kv.key, kv.dest_key))
592 .unzip();
593 let moved_keys = tombstone_manager
594 .move_values(keys, dest_keys)
595 .await
596 .unwrap();
597 assert_eq!(kvs.len(), moved_keys);
598 }
599
600 #[tokio::test]
601 async fn test_move_values_overwrite_dest_values() {
602 let kv_backend = Arc::new(MemoryKvBackend::default());
603 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
604 let kvs = HashMap::from([
605 (b"bar".to_vec(), b"baz".to_vec()),
606 (b"foo".to_vec(), b"hi".to_vec()),
607 (b"baz".to_vec(), b"hello".to_vec()),
608 ]);
609 for (key, value) in &kvs {
610 kv_backend
611 .put(
612 PutRequest::new()
613 .with_key(key.clone())
614 .with_value(value.clone()),
615 )
616 .await
617 .unwrap();
618 }
619
620 let move_values = kvs
622 .iter()
623 .map(|(key, value)| MoveValue {
624 key: key.clone(),
625 dest_key: tombstone_manager.to_tombstone(key),
626 value: value.clone(),
627 })
628 .collect::<Vec<_>>();
629 let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
630 .clone()
631 .into_iter()
632 .map(|kv| (kv.key, kv.dest_key))
633 .unzip();
634 tombstone_manager
635 .move_values(keys, dest_keys)
636 .await
637 .unwrap();
638 check_moved_values(kv_backend.clone(), &move_values).await;
639
640 let kvs = HashMap::from([
642 (b"bar".to_vec(), b"new baz".to_vec()),
643 (b"foo".to_vec(), b"new hi".to_vec()),
644 (b"baz".to_vec(), b"new baz".to_vec()),
645 ]);
646 for (key, value) in &kvs {
647 kv_backend
648 .put(
649 PutRequest::new()
650 .with_key(key.clone())
651 .with_value(value.clone()),
652 )
653 .await
654 .unwrap();
655 }
656 let move_values = kvs
657 .iter()
658 .map(|(key, value)| MoveValue {
659 key: key.clone(),
660 dest_key: tombstone_manager.to_tombstone(key),
661 value: value.clone(),
662 })
663 .collect::<Vec<_>>();
664 let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
665 .clone()
666 .into_iter()
667 .map(|kv| (kv.key, kv.dest_key))
668 .unzip();
669 tombstone_manager
670 .move_values(keys, dest_keys)
671 .await
672 .unwrap();
673 check_moved_values(kv_backend.clone(), &move_values).await;
674 }
675
676 #[tokio::test]
677 async fn test_move_values_with_different_lengths() {
678 let kv_backend = Arc::new(MemoryKvBackend::default());
679 let tombstone_manager = TombstoneManager::new(kv_backend.clone());
680
681 let keys = vec![b"bar".to_vec(), b"foo".to_vec()];
682 let dest_keys = vec![b"bar".to_vec(), b"foo".to_vec(), b"baz".to_vec()];
683
684 let err = tombstone_manager
685 .move_values(keys, dest_keys)
686 .await
687 .unwrap_err();
688 assert!(err
689 .to_string()
690 .contains("The length of keys(2) does not match the length of dest_keys(3)."),);
691
692 let moved_keys = tombstone_manager.move_values(vec![], vec![]).await.unwrap();
693 assert_eq!(0, moved_keys);
694 }
695}