1use std::collections::HashMap;
16
17use snafu::ensure;
18
19use crate::error::{self, Result};
20use crate::key::txn_helper::TxnOpGetResponseSet;
21use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
22use crate::kv_backend::KvBackendRef;
23use crate::rpc::store::BatchGetRequest;
24
25pub struct TombstoneManager {
29 kv_backend: KvBackendRef,
30 tombstone_prefix: String,
31}
32
/// Default prefix used by [`TombstoneManager::new`] to build tombstone keys.
const TOMBSTONE_PREFIX: &str = "__tombstone/";
34
35impl TombstoneManager {
36 pub fn new(kv_backend: KvBackendRef) -> Self {
38 Self {
39 kv_backend,
40 tombstone_prefix: TOMBSTONE_PREFIX.to_string(),
41 }
42 }
43
44 pub fn new_with_prefix(kv_backend: KvBackendRef, prefix: &str) -> Self {
46 Self {
47 kv_backend,
48 tombstone_prefix: prefix.to_string(),
49 }
50 }
51
52 pub fn to_tombstone(&self, key: &[u8]) -> Vec<u8> {
53 [self.tombstone_prefix.as_bytes(), key].concat()
54 }
55
56 fn build_move_value_txn(
62 &self,
63 src_key: Vec<u8>,
64 value: Vec<u8>,
65 dest_key: Vec<u8>,
66 ) -> (Txn, impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>>) {
67 let txn = Txn::new()
68 .when(vec![Compare::with_value(
69 src_key.clone(),
70 CompareOp::Equal,
71 value.clone(),
72 )])
73 .and_then(vec![
74 TxnOp::Put(dest_key.clone(), value.clone()),
75 TxnOp::Delete(src_key.clone()),
76 ])
77 .or_else(vec![TxnOp::Get(src_key.clone())]);
78
79 (txn, TxnOpGetResponseSet::filter(src_key))
80 }
81
82 async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<usize> {
83 ensure!(
84 keys.len() == dest_keys.len(),
85 error::UnexpectedSnafu {
86 err_msg: "The length of keys does not match the length of dest_keys."
87 }
88 );
89 let lookup_table = keys.iter().zip(dest_keys.iter()).collect::<HashMap<_, _>>();
91
92 let resp = self
93 .kv_backend
94 .batch_get(BatchGetRequest::new().with_keys(keys.to_vec()))
95 .await?;
96 let mut results = resp
97 .kvs
98 .into_iter()
99 .map(|kv| (kv.key, kv.value))
100 .collect::<HashMap<_, _>>();
101
102 const MAX_RETRIES: usize = 8;
103 for _ in 0..MAX_RETRIES {
104 let (txns, (keys, filters)): (Vec<_>, (Vec<_>, Vec<_>)) = results
105 .iter()
106 .map(|(key, value)| {
107 let (txn, filter) = self.build_move_value_txn(
108 key.clone(),
109 value.clone(),
110 lookup_table[&key].clone(),
111 );
112 (txn, (key.clone(), filter))
113 })
114 .unzip();
115 let mut resp = self.kv_backend.txn(Txn::merge_all(txns)).await?;
116 if resp.succeeded {
117 return Ok(keys.len());
118 }
119 let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
120 for (idx, mut filter) in filters.into_iter().enumerate() {
122 if let Some(value) = filter(&mut set) {
123 results.insert(keys[idx].clone(), value);
124 } else {
125 results.remove(&keys[idx]);
126 }
127 }
128 }
129
130 error::MoveValuesSnafu {
131 err_msg: format!(
132 "keys: {:?}",
133 keys.iter().map(|key| String::from_utf8_lossy(key)),
134 ),
135 }
136 .fail()
137 }
138
139 async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<usize> {
143 let chunk_size = self.kv_backend.max_txn_ops() / 2;
144 if keys.len() > chunk_size {
145 let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
146 let dest_keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
147 for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
148 self.move_values_inner(keys, dest_keys).await?;
149 }
150
151 Ok(keys.len())
152 } else {
153 self.move_values_inner(&keys, &dest_keys).await
154 }
155 }
156
157 pub async fn create(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
165 let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
166 .into_iter()
167 .map(|key| {
168 let tombstone_key = self.to_tombstone(&key);
169 (key, tombstone_key)
170 })
171 .unzip();
172
173 self.move_values(keys, dest_keys).await
174 }
175
176 pub async fn restore(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
184 let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
185 .into_iter()
186 .map(|key| {
187 let tombstone_key = self.to_tombstone(&key);
188 (tombstone_key, key)
189 })
190 .unzip();
191
192 self.move_values(keys, dest_keys).await
193 }
194
195 pub async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
199 let operations = keys
200 .iter()
201 .map(|key| TxnOp::Delete(self.to_tombstone(key)))
202 .collect::<Vec<_>>();
203
204 let txn = Txn::new().and_then(operations);
205 let _ = self.kv_backend.txn(txn).await?;
207 Ok(keys.len())
208 }
209}
210
#[cfg(test)]
mod tests {

    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::error::Error;
    use crate::key::tombstone::TombstoneManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::KvBackend;
    use crate::rpc::store::PutRequest;

    /// One expected move: `key` must be gone and `dest_key` must hold `value`.
    #[derive(Debug, Clone)]
    struct MoveValue {
        key: Vec<u8>,
        dest_key: Vec<u8>,
        value: Vec<u8>,
    }

    /// Asserts every source key was removed and its destination holds the value.
    async fn check_moved_values(
        kv_backend: Arc<MemoryKvBackend<Error>>,
        move_values: &[MoveValue],
    ) {
        for MoveValue {
            key,
            dest_key,
            value,
        } in move_values
        {
            assert!(kv_backend.get(key).await.unwrap().is_none());
            assert_eq!(
                &kv_backend.get(dest_key).await.unwrap().unwrap().value,
                value,
            );
        }
    }

    /// Creates an empty in-memory backend with a concrete error type.
    fn new_kv_backend() -> Arc<MemoryKvBackend<Error>> {
        Arc::new(MemoryKvBackend::default())
    }

    /// Writes every `(key, value)` pair of `kvs` into `kv_backend`.
    async fn put_kvs(kv_backend: &MemoryKvBackend<Error>, kvs: &HashMap<Vec<u8>, Vec<u8>>) {
        for (key, value) in kvs {
            kv_backend
                .put(
                    PutRequest::new()
                        .with_key(key.clone())
                        .with_value(value.clone()),
                )
                .await
                .unwrap();
        }
    }

    /// Three sample key/value pairs shared by the `move_values` tests.
    fn sample_kvs() -> HashMap<Vec<u8>, Vec<u8>> {
        HashMap::from([
            (b"bar".to_vec(), b"baz".to_vec()),
            (b"foo".to_vec(), b"hi".to_vec()),
            (b"baz".to_vec(), b"hello".to_vec()),
        ])
    }

    /// Builds the expected moves of `kvs` into their tombstone keys.
    fn tombstone_moves(
        tombstone_manager: &TombstoneManager,
        kvs: &HashMap<Vec<u8>, Vec<u8>>,
    ) -> Vec<MoveValue> {
        kvs.iter()
            .map(|(key, value)| MoveValue {
                key: key.clone(),
                dest_key: tombstone_manager.to_tombstone(key),
                value: value.clone(),
            })
            .collect()
    }

    /// Splits `move_values` into parallel `(keys, dest_keys)` vectors.
    fn split_keys(move_values: &[MoveValue]) -> (Vec<Vec<u8>>, Vec<Vec<u8>>) {
        move_values
            .iter()
            .map(|kv| (kv.key.clone(), kv.dest_key.clone()))
            .unzip()
    }

    #[tokio::test]
    async fn test_create_tombstone() {
        let kv_backend = new_kv_backend();
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        let kvs = HashMap::from([
            (b"bar".to_vec(), b"baz".to_vec()),
            (b"foo".to_vec(), b"hi".to_vec()),
        ]);
        put_kvs(&kv_backend, &kvs).await;

        let moved = tombstone_manager
            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        assert_eq!(moved, 2);
        assert!(!kv_backend.exists(b"bar").await.unwrap());
        assert!(!kv_backend.exists(b"foo").await.unwrap());
        check_moved_values(kv_backend.clone(), &tombstone_moves(&tombstone_manager, &kvs)).await;
        assert_eq!(kv_backend.len(), 2);
    }

    #[tokio::test]
    async fn test_create_tombstone_empty_keys() {
        let kv_backend = new_kv_backend();
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        put_kvs(&kv_backend, &sample_kvs()).await;

        let moved = tombstone_manager.create(vec![]).await.unwrap();
        assert_eq!(moved, 0);
        assert_eq!(kv_backend.len(), 3);
    }

    #[tokio::test]
    async fn test_create_tombstone_with_non_exist_values() {
        let kv_backend = new_kv_backend();
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        put_kvs(
            &kv_backend,
            &HashMap::from([
                (b"bar".to_vec(), b"baz".to_vec()),
                (b"foo".to_vec(), b"hi".to_vec()),
            ]),
        )
        .await;

        // "baz" does not exist and must be skipped.
        let moved = tombstone_manager
            .create(vec![b"bar".to_vec(), b"baz".to_vec()])
            .await
            .unwrap();
        assert_eq!(moved, 1);
        check_moved_values(
            kv_backend.clone(),
            &[MoveValue {
                key: b"bar".to_vec(),
                dest_key: tombstone_manager.to_tombstone(b"bar"),
                value: b"baz".to_vec(),
            }],
        )
        .await;
        // No tombstone is created for the missing key, and unrequested keys stay put.
        assert!(!kv_backend
            .exists(&tombstone_manager.to_tombstone(b"baz"))
            .await
            .unwrap());
        assert_eq!(kv_backend.get(b"foo").await.unwrap().unwrap().value, b"hi");
    }

    #[tokio::test]
    async fn test_restore_tombstone() {
        let kv_backend = new_kv_backend();
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        put_kvs(
            &kv_backend,
            &HashMap::from([
                (b"bar".to_vec(), b"baz".to_vec()),
                (b"foo".to_vec(), b"hi".to_vec()),
            ]),
        )
        .await;
        let expected_kvs = kv_backend.dump();

        let created = tombstone_manager
            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        assert_eq!(created, 2);
        let restored = tombstone_manager
            .restore(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        assert_eq!(restored, 2);
        assert_eq!(expected_kvs, kv_backend.dump());
    }

    #[tokio::test]
    async fn test_delete_tombstone() {
        let kv_backend = new_kv_backend();
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        put_kvs(
            &kv_backend,
            &HashMap::from([
                (b"bar".to_vec(), b"baz".to_vec()),
                (b"foo".to_vec(), b"hi".to_vec()),
            ]),
        )
        .await;

        tombstone_manager
            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        let deleted = tombstone_manager
            .delete(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        assert_eq!(deleted, 2);
        assert!(kv_backend.is_empty());
    }

    #[tokio::test]
    async fn test_move_values() {
        let kv_backend = new_kv_backend();
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        let kvs = sample_kvs();
        put_kvs(&kv_backend, &kvs).await;
        let move_values = tombstone_moves(&tombstone_manager, &kvs);
        let (keys, dest_keys) = split_keys(&move_values);

        let moved = tombstone_manager
            .move_values(keys.clone(), dest_keys.clone())
            .await
            .unwrap();
        assert_eq!(moved, 3);
        check_moved_values(kv_backend.clone(), &move_values).await;

        // Moving again is a no-op: the source keys are gone.
        let moved = tombstone_manager
            .move_values(keys.clone(), dest_keys.clone())
            .await
            .unwrap();
        assert_eq!(moved, 0);
        check_moved_values(kv_backend.clone(), &move_values).await;
    }

    #[tokio::test]
    async fn test_move_values_with_non_exists_values() {
        let kv_backend = new_kv_backend();
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        let kvs = sample_kvs();
        put_kvs(&kv_backend, &kvs).await;
        let move_values = tombstone_moves(&tombstone_manager, &kvs);
        let (mut keys, mut dest_keys) = split_keys(&move_values);
        keys.push(b"non-exists".to_vec());
        dest_keys.push(b"hi/non-exists".to_vec());

        let moved = tombstone_manager
            .move_values(keys.clone(), dest_keys.clone())
            .await
            .unwrap();
        assert_eq!(moved, 3);
        check_moved_values(kv_backend.clone(), &move_values).await;

        let moved = tombstone_manager
            .move_values(keys.clone(), dest_keys.clone())
            .await
            .unwrap();
        assert_eq!(moved, 0);
        check_moved_values(kv_backend.clone(), &move_values).await;
        assert!(!kv_backend.exists(b"hi/non-exists").await.unwrap());
    }

    #[tokio::test]
    async fn test_move_values_changed() {
        let kv_backend = new_kv_backend();
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        let mut kvs = sample_kvs();
        put_kvs(&kv_backend, &kvs).await;

        kv_backend
            .put(PutRequest::new().with_key("baz").with_value("changed"))
            .await
            .unwrap();
        // The move must carry the latest value of "baz", not the initial one.
        kvs.insert(b"baz".to_vec(), b"changed".to_vec());

        let move_values = tombstone_moves(&tombstone_manager, &kvs);
        let (keys, dest_keys) = split_keys(&move_values);
        let moved = tombstone_manager
            .move_values(keys, dest_keys)
            .await
            .unwrap();
        assert_eq!(moved, 3);
        check_moved_values(kv_backend.clone(), &move_values).await;
    }

    #[tokio::test]
    async fn test_move_values_overwrite_dest_values() {
        let kv_backend = new_kv_backend();
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        let kvs = sample_kvs();
        put_kvs(&kv_backend, &kvs).await;

        let move_values = tombstone_moves(&tombstone_manager, &kvs);
        let (keys, dest_keys) = split_keys(&move_values);
        let moved = tombstone_manager
            .move_values(keys, dest_keys)
            .await
            .unwrap();
        assert_eq!(moved, 3);
        check_moved_values(kv_backend.clone(), &move_values).await;

        // Re-create the source keys with new values; moving again must overwrite
        // the existing destination values.
        let kvs = HashMap::from([
            (b"bar".to_vec(), b"new baz".to_vec()),
            (b"foo".to_vec(), b"new hi".to_vec()),
            (b"baz".to_vec(), b"new baz".to_vec()),
        ]);
        put_kvs(&kv_backend, &kvs).await;
        let move_values = tombstone_moves(&tombstone_manager, &kvs);
        let (keys, dest_keys) = split_keys(&move_values);
        let moved = tombstone_manager
            .move_values(keys, dest_keys)
            .await
            .unwrap();
        assert_eq!(moved, 3);
        check_moved_values(kv_backend.clone(), &move_values).await;
    }

    #[tokio::test]
    async fn test_move_values_length_mismatch() {
        let kv_backend = new_kv_backend();
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        put_kvs(&kv_backend, &sample_kvs()).await;

        let result = tombstone_manager
            .move_values(vec![b"bar".to_vec(), b"foo".to_vec()], vec![b"x".to_vec()])
            .await;
        assert!(result.is_err());
        // Nothing must have been moved.
        assert_eq!(kv_backend.len(), 3);
        assert!(!kv_backend.exists(b"x").await.unwrap());
    }
}