1use std::collections::HashMap;
16
17use snafu::ensure;
18
19use crate::error::{self, Result};
20use crate::key::txn_helper::TxnOpGetResponseSet;
21use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
22use crate::kv_backend::KvBackendRef;
23use crate::rpc::store::BatchGetRequest;
24
/// Moves values between keys and their tombstone keys (`__tombstone/` + key) in a kv backend.
///
/// `create` moves values into tombstones, `restore` moves them back, and `delete`
/// removes the tombstones.
pub(crate) struct TombstoneManager {
    /// Backend holding both the original keys and their tombstones.
    kv_backend: KvBackendRef,
}
31
/// Prefix prepended to a key to form its tombstone key.
const TOMBSTONE_PREFIX: &str = "__tombstone/";

/// Returns the tombstone key for `key`: the bytes of [`TOMBSTONE_PREFIX`]
/// followed by the raw bytes of `key`.
fn to_tombstone(key: &[u8]) -> Vec<u8> {
    let prefix = TOMBSTONE_PREFIX.as_bytes();
    let mut tombstone_key = Vec::with_capacity(prefix.len() + key.len());
    tombstone_key.extend_from_slice(prefix);
    tombstone_key.extend_from_slice(key);
    tombstone_key
}
37
38impl TombstoneManager {
39 pub fn new(kv_backend: KvBackendRef) -> Self {
41 Self { kv_backend }
42 }
43
44 fn build_move_value_txn(
50 &self,
51 src_key: Vec<u8>,
52 value: Vec<u8>,
53 dest_key: Vec<u8>,
54 ) -> (Txn, impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>>) {
55 let txn = Txn::new()
56 .when(vec![Compare::with_value(
57 src_key.clone(),
58 CompareOp::Equal,
59 value.clone(),
60 )])
61 .and_then(vec![
62 TxnOp::Put(dest_key.clone(), value.clone()),
63 TxnOp::Delete(src_key.clone()),
64 ])
65 .or_else(vec![TxnOp::Get(src_key.clone())]);
66
67 (txn, TxnOpGetResponseSet::filter(src_key))
68 }
69
70 async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<()> {
71 ensure!(
72 keys.len() == dest_keys.len(),
73 error::UnexpectedSnafu {
74 err_msg: "The length of keys does not match the length of dest_keys."
75 }
76 );
77 let lookup_table = keys.iter().zip(dest_keys.iter()).collect::<HashMap<_, _>>();
79
80 let resp = self
81 .kv_backend
82 .batch_get(BatchGetRequest::new().with_keys(keys.to_vec()))
83 .await?;
84 let mut results = resp
85 .kvs
86 .into_iter()
87 .map(|kv| (kv.key, kv.value))
88 .collect::<HashMap<_, _>>();
89
90 const MAX_RETRIES: usize = 8;
91 for _ in 0..MAX_RETRIES {
92 let (txns, (keys, filters)): (Vec<_>, (Vec<_>, Vec<_>)) = results
93 .iter()
94 .map(|(key, value)| {
95 let (txn, filter) = self.build_move_value_txn(
96 key.clone(),
97 value.clone(),
98 lookup_table[&key].clone(),
99 );
100 (txn, (key.clone(), filter))
101 })
102 .unzip();
103 let mut resp = self.kv_backend.txn(Txn::merge_all(txns)).await?;
104 if resp.succeeded {
105 return Ok(());
106 }
107 let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
108 for (idx, mut filter) in filters.into_iter().enumerate() {
110 if let Some(value) = filter(&mut set) {
111 results.insert(keys[idx].clone(), value);
112 } else {
113 results.remove(&keys[idx]);
114 }
115 }
116 }
117
118 error::MoveValuesSnafu {
119 err_msg: format!(
120 "keys: {:?}",
121 keys.iter().map(|key| String::from_utf8_lossy(key)),
122 ),
123 }
124 .fail()
125 }
126
127 async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<()> {
129 let chunk_size = self.kv_backend.max_txn_ops() / 2;
130 if keys.len() > chunk_size {
131 let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
132 let dest_keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
133 for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
134 self.move_values_inner(keys, dest_keys).await?;
135 }
136
137 Ok(())
138 } else {
139 self.move_values_inner(&keys, &dest_keys).await
140 }
141 }
142
143 pub(crate) async fn create(&self, keys: Vec<Vec<u8>>) -> Result<()> {
149 let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
150 .into_iter()
151 .map(|key| {
152 let tombstone_key = to_tombstone(&key);
153 (key, tombstone_key)
154 })
155 .unzip();
156
157 self.move_values(keys, dest_keys).await
158 }
159
160 pub(crate) async fn restore(&self, keys: Vec<Vec<u8>>) -> Result<()> {
166 let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
167 .into_iter()
168 .map(|key| {
169 let tombstone_key = to_tombstone(&key);
170 (tombstone_key, key)
171 })
172 .unzip();
173
174 self.move_values(keys, dest_keys).await
175 }
176
177 pub(crate) async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<()> {
179 let operations = keys
180 .iter()
181 .map(|key| TxnOp::Delete(to_tombstone(key)))
182 .collect::<Vec<_>>();
183
184 let txn = Txn::new().and_then(operations);
185 let _ = self.kv_backend.txn(txn).await?;
187 Ok(())
188 }
189}
190
#[cfg(test)]
mod tests {

    use std::collections::HashMap;
    use std::sync::Arc;

    use super::to_tombstone;
    use crate::error::Error;
    use crate::key::tombstone::TombstoneManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::KvBackend;
    use crate::rpc::store::PutRequest;

    /// Expected outcome of one move: afterwards `key` is absent and `dest_key` holds `value`.
    #[derive(Debug, Clone)]
    struct MoveValue {
        key: Vec<u8>,
        dest_key: Vec<u8>,
        value: Vec<u8>,
    }

    /// Asserts that every entry of `move_values` has been moved.
    async fn check_moved_values(
        kv_backend: Arc<MemoryKvBackend<Error>>,
        move_values: &[MoveValue],
    ) {
        for MoveValue {
            key,
            dest_key,
            value,
        } in move_values
        {
            assert!(kv_backend.get(key).await.unwrap().is_none());
            assert_eq!(
                &kv_backend.get(dest_key).await.unwrap().unwrap().value,
                value,
            );
        }
    }

    #[tokio::test]
    async fn test_create_tombstone() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        kv_backend
            .put(PutRequest::new().with_key("bar").with_value("baz"))
            .await
            .unwrap();
        kv_backend
            .put(PutRequest::new().with_key("foo").with_value("hi"))
            .await
            .unwrap();
        tombstone_manager
            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        assert!(!kv_backend.exists(b"bar").await.unwrap());
        assert!(!kv_backend.exists(b"foo").await.unwrap());
        assert_eq!(
            kv_backend
                .get(&to_tombstone(b"bar"))
                .await
                .unwrap()
                .unwrap()
                .value,
            b"baz"
        );
        assert_eq!(
            kv_backend
                .get(&to_tombstone(b"foo"))
                .await
                .unwrap()
                .unwrap()
                .value,
            b"hi"
        );
        assert_eq!(kv_backend.len(), 2);
    }

    #[tokio::test]
    async fn test_create_tombstone_with_non_exist_values() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());

        kv_backend
            .put(PutRequest::new().with_key("bar").with_value("baz"))
            .await
            .unwrap();
        kv_backend
            .put(PutRequest::new().with_key("foo").with_value("hi"))
            .await
            .unwrap();

        // "baz" does not exist and must simply be skipped.
        tombstone_manager
            .create(vec![b"bar".to_vec(), b"baz".to_vec()])
            .await
            .unwrap();
        check_moved_values(
            kv_backend.clone(),
            &[MoveValue {
                key: b"bar".to_vec(),
                dest_key: to_tombstone(b"bar"),
                value: b"baz".to_vec(),
            }],
        )
        .await;
        // Untouched keys stay put and no tombstone is created for the missing key.
        assert!(kv_backend.exists(b"foo").await.unwrap());
        assert!(!kv_backend.exists(&to_tombstone(b"baz")).await.unwrap());
        assert_eq!(kv_backend.len(), 2);
    }

    #[tokio::test]
    async fn test_restore_tombstone() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        kv_backend
            .put(PutRequest::new().with_key("bar").with_value("baz"))
            .await
            .unwrap();
        kv_backend
            .put(PutRequest::new().with_key("foo").with_value("hi"))
            .await
            .unwrap();
        let expected_kvs = kv_backend.dump();
        tombstone_manager
            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        tombstone_manager
            .restore(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        assert_eq!(expected_kvs, kv_backend.dump());
    }

    #[tokio::test]
    async fn test_delete_tombstone() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        kv_backend
            .put(PutRequest::new().with_key("bar").with_value("baz"))
            .await
            .unwrap();
        kv_backend
            .put(PutRequest::new().with_key("foo").with_value("hi"))
            .await
            .unwrap();
        tombstone_manager
            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        tombstone_manager
            .delete(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        assert!(kv_backend.is_empty());
    }

    #[tokio::test]
    async fn test_empty_keys() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        tombstone_manager.create(vec![]).await.unwrap();
        tombstone_manager.restore(vec![]).await.unwrap();
        tombstone_manager.delete(vec![]).await.unwrap();
        assert!(kv_backend.is_empty());
    }

    #[tokio::test]
    async fn test_move_values_length_mismatch() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        kv_backend
            .put(PutRequest::new().with_key("foo").with_value("hi"))
            .await
            .unwrap();
        let result = tombstone_manager
            .move_values(vec![b"foo".to_vec()], vec![])
            .await;
        assert!(result.is_err());
        // Nothing may be moved when the input is rejected.
        assert!(kv_backend.exists(b"foo").await.unwrap());
        assert_eq!(kv_backend.len(), 1);
    }

    #[tokio::test]
    async fn test_move_values() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        let kvs = HashMap::from([
            (b"bar".to_vec(), b"baz".to_vec()),
            (b"foo".to_vec(), b"hi".to_vec()),
            (b"baz".to_vec(), b"hello".to_vec()),
        ]);
        for (key, value) in &kvs {
            kv_backend
                .put(
                    PutRequest::new()
                        .with_key(key.clone())
                        .with_value(value.clone()),
                )
                .await
                .unwrap();
        }
        let move_values = kvs
            .iter()
            .map(|(key, value)| MoveValue {
                key: key.clone(),
                dest_key: to_tombstone(key),
                value: value.clone(),
            })
            .collect::<Vec<_>>();
        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
            .clone()
            .into_iter()
            .map(|kv| (kv.key, kv.dest_key))
            .unzip();
        tombstone_manager
            .move_values(keys.clone(), dest_keys.clone())
            .await
            .unwrap();
        check_moved_values(kv_backend.clone(), &move_values).await;
        // Moving again is a no-op: the source keys no longer exist.
        tombstone_manager
            .move_values(keys.clone(), dest_keys.clone())
            .await
            .unwrap();
        check_moved_values(kv_backend.clone(), &move_values).await;
    }

    #[tokio::test]
    async fn test_move_values_with_non_exists_values() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        let kvs = HashMap::from([
            (b"bar".to_vec(), b"baz".to_vec()),
            (b"foo".to_vec(), b"hi".to_vec()),
            (b"baz".to_vec(), b"hello".to_vec()),
        ]);
        for (key, value) in &kvs {
            kv_backend
                .put(
                    PutRequest::new()
                        .with_key(key.clone())
                        .with_value(value.clone()),
                )
                .await
                .unwrap();
        }
        let move_values = kvs
            .iter()
            .map(|(key, value)| MoveValue {
                key: key.clone(),
                dest_key: to_tombstone(key),
                value: value.clone(),
            })
            .collect::<Vec<_>>();
        let (mut keys, mut dest_keys): (Vec<_>, Vec<_>) = move_values
            .clone()
            .into_iter()
            .map(|kv| (kv.key, kv.dest_key))
            .unzip();
        keys.push(b"non-exists".to_vec());
        dest_keys.push(b"hi/non-exists".to_vec());
        tombstone_manager
            .move_values(keys.clone(), dest_keys.clone())
            .await
            .unwrap();
        check_moved_values(kv_backend.clone(), &move_values).await;
        // Moving again is a no-op: the source keys no longer exist.
        tombstone_manager
            .move_values(keys.clone(), dest_keys.clone())
            .await
            .unwrap();
        check_moved_values(kv_backend.clone(), &move_values).await;
    }

    #[tokio::test]
    async fn test_move_values_changed() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        let mut kvs = HashMap::from([
            (b"bar".to_vec(), b"baz".to_vec()),
            (b"foo".to_vec(), b"hi".to_vec()),
            (b"baz".to_vec(), b"hello".to_vec()),
        ]);
        for (key, value) in &kvs {
            kv_backend
                .put(
                    PutRequest::new()
                        .with_key(key.clone())
                        .with_value(value.clone()),
                )
                .await
                .unwrap();
        }

        kv_backend
            .put(PutRequest::new().with_key("baz").with_value("changed"))
            .await
            .unwrap();
        // The move must carry the value that is current at move time.
        kvs.insert(b"baz".to_vec(), b"changed".to_vec());

        let move_values = kvs
            .iter()
            .map(|(key, value)| MoveValue {
                key: key.clone(),
                dest_key: to_tombstone(key),
                value: value.clone(),
            })
            .collect::<Vec<_>>();
        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
            .clone()
            .into_iter()
            .map(|kv| (kv.key, kv.dest_key))
            .unzip();
        tombstone_manager
            .move_values(keys, dest_keys)
            .await
            .unwrap();
        check_moved_values(kv_backend.clone(), &move_values).await;
    }

    #[tokio::test]
    async fn test_move_values_overwrite_dest_values() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        let kvs = HashMap::from([
            (b"bar".to_vec(), b"baz".to_vec()),
            (b"foo".to_vec(), b"hi".to_vec()),
            (b"baz".to_vec(), b"hello".to_vec()),
        ]);
        for (key, value) in &kvs {
            kv_backend
                .put(
                    PutRequest::new()
                        .with_key(key.clone())
                        .with_value(value.clone()),
                )
                .await
                .unwrap();
        }

        let move_values = kvs
            .iter()
            .map(|(key, value)| MoveValue {
                key: key.clone(),
                dest_key: to_tombstone(key),
                value: value.clone(),
            })
            .collect::<Vec<_>>();
        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
            .clone()
            .into_iter()
            .map(|kv| (kv.key, kv.dest_key))
            .unzip();
        tombstone_manager
            .move_values(keys, dest_keys)
            .await
            .unwrap();
        check_moved_values(kv_backend.clone(), &move_values).await;

        // Re-create the source keys with new values; moving them must overwrite the
        // existing tombstones.
        let kvs = HashMap::from([
            (b"bar".to_vec(), b"new baz".to_vec()),
            (b"foo".to_vec(), b"new hi".to_vec()),
            (b"baz".to_vec(), b"new baz".to_vec()),
        ]);
        for (key, value) in &kvs {
            kv_backend
                .put(
                    PutRequest::new()
                        .with_key(key.clone())
                        .with_value(value.clone()),
                )
                .await
                .unwrap();
        }
        let move_values = kvs
            .iter()
            .map(|(key, value)| MoveValue {
                key: key.clone(),
                dest_key: to_tombstone(key),
                value: value.clone(),
            })
            .collect::<Vec<_>>();
        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
            .clone()
            .into_iter()
            .map(|kv| (kv.key, kv.dest_key))
            .unzip();
        tombstone_manager
            .move_values(keys, dest_keys)
            .await
            .unwrap();
        check_moved_values(kv_backend.clone(), &move_values).await;
    }
}