common_meta/key/
tombstone.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use common_telemetry::debug;
18use snafu::ensure;
19
20use crate::error::{self, Result};
21use crate::key::txn_helper::TxnOpGetResponseSet;
22use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
23use crate::kv_backend::KvBackendRef;
24use crate::rpc::store::{BatchDeleteRequest, BatchGetRequest};
25
26/// [TombstoneManager] provides the ability to:
27/// - logically delete values
28/// - restore the deleted values
29pub struct TombstoneManager {
30    kv_backend: KvBackendRef,
31    tombstone_prefix: String,
32    // Only used for testing.
33    #[cfg(test)]
34    max_txn_ops: Option<usize>,
35}
36
37const TOMBSTONE_PREFIX: &str = "__tombstone/";
38
39impl TombstoneManager {
40    /// Returns [TombstoneManager].
41    pub fn new(kv_backend: KvBackendRef) -> Self {
42        Self::new_with_prefix(kv_backend, TOMBSTONE_PREFIX)
43    }
44
45    /// Returns [TombstoneManager] with a custom tombstone prefix.
46    pub fn new_with_prefix(kv_backend: KvBackendRef, prefix: &str) -> Self {
47        Self {
48            kv_backend,
49            tombstone_prefix: prefix.to_string(),
50            #[cfg(test)]
51            max_txn_ops: None,
52        }
53    }
54
55    pub fn to_tombstone(&self, key: &[u8]) -> Vec<u8> {
56        [self.tombstone_prefix.as_bytes(), key].concat()
57    }
58
59    #[cfg(test)]
60    pub fn set_max_txn_ops(&mut self, max_txn_ops: usize) {
61        self.max_txn_ops = Some(max_txn_ops);
62    }
63
64    /// Moves value to `dest_key`.
65    ///
66    /// Puts `value` to `dest_key` if the value of `src_key` equals `value`.
67    ///
68    /// Otherwise retrieves the value of `src_key`.
69    fn build_move_value_txn(
70        &self,
71        src_key: Vec<u8>,
72        value: Vec<u8>,
73        dest_key: Vec<u8>,
74    ) -> (Txn, impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>>) {
75        let txn = Txn::new()
76            .when(vec![Compare::with_value(
77                src_key.clone(),
78                CompareOp::Equal,
79                value.clone(),
80            )])
81            .and_then(vec![
82                TxnOp::Put(dest_key.clone(), value.clone()),
83                TxnOp::Delete(src_key.clone()),
84            ])
85            .or_else(vec![TxnOp::Get(src_key.clone())]);
86
87        (txn, TxnOpGetResponseSet::filter(src_key))
88    }
89
90    async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<usize> {
91        ensure!(
92            keys.len() == dest_keys.len(),
93            error::UnexpectedSnafu {
94                err_msg: format!(
95                    "The length of keys({}) does not match the length of dest_keys({}).",
96                    keys.len(),
97                    dest_keys.len()
98                ),
99            }
100        );
101        // The key -> dest key mapping.
102        let lookup_table = keys.iter().zip(dest_keys.iter()).collect::<HashMap<_, _>>();
103
104        let resp = self
105            .kv_backend
106            .batch_get(BatchGetRequest::new().with_keys(keys.to_vec()))
107            .await?;
108        let mut results = resp
109            .kvs
110            .into_iter()
111            .map(|kv| (kv.key, kv.value))
112            .collect::<HashMap<_, _>>();
113
114        const MAX_RETRIES: usize = 8;
115        for _ in 0..MAX_RETRIES {
116            let (txns, (keys, filters)): (Vec<_>, (Vec<_>, Vec<_>)) = results
117                .iter()
118                .map(|(key, value)| {
119                    let (txn, filter) = self.build_move_value_txn(
120                        key.clone(),
121                        value.clone(),
122                        lookup_table[&key].clone(),
123                    );
124                    (txn, (key.clone(), filter))
125                })
126                .unzip();
127            let mut resp = self.kv_backend.txn(Txn::merge_all(txns)).await?;
128            if resp.succeeded {
129                return Ok(keys.len());
130            }
131            let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
132            // Updates results.
133            for (idx, mut filter) in filters.into_iter().enumerate() {
134                if let Some(value) = filter(&mut set) {
135                    results.insert(keys[idx].clone(), value);
136                } else {
137                    results.remove(&keys[idx]);
138                }
139            }
140        }
141
142        error::MoveValuesSnafu {
143            err_msg: format!(
144                "keys: {:?}",
145                keys.iter().map(|key| String::from_utf8_lossy(key)),
146            ),
147        }
148        .fail()
149    }
150
151    fn max_txn_ops(&self) -> usize {
152        #[cfg(test)]
153        if let Some(max_txn_ops) = self.max_txn_ops {
154            return max_txn_ops;
155        }
156        self.kv_backend.max_txn_ops()
157    }
158
159    /// Moves values to `dest_key`.
160    ///
161    /// Returns the number of keys that were moved.
162    async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<usize> {
163        ensure!(
164            keys.len() == dest_keys.len(),
165            error::UnexpectedSnafu {
166                err_msg: format!(
167                    "The length of keys({}) does not match the length of dest_keys({}).",
168                    keys.len(),
169                    dest_keys.len()
170                ),
171            }
172        );
173        if keys.is_empty() {
174            return Ok(0);
175        }
176        let chunk_size = self.max_txn_ops() / 2;
177        if keys.len() > chunk_size {
178            debug!(
179                "Moving values with multiple chunks, keys len: {}, chunk_size: {}",
180                keys.len(),
181                chunk_size
182            );
183            let mut moved_keys = 0;
184            let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
185            let dest_keys_chunks = dest_keys.chunks(chunk_size).collect::<Vec<_>>();
186            for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
187                moved_keys += self.move_values_inner(keys, dest_keys).await?;
188            }
189            Ok(moved_keys)
190        } else {
191            self.move_values_inner(&keys, &dest_keys).await
192        }
193    }
194
195    /// Creates tombstones for keys.
196    ///
197    /// Preforms to:
198    /// - deletes origin values.
199    /// - stores tombstone values.
200    ///
201    /// Returns the number of keys that were moved.
202    pub async fn create(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
203        let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
204            .into_iter()
205            .map(|key| {
206                let tombstone_key = self.to_tombstone(&key);
207                (key, tombstone_key)
208            })
209            .unzip();
210
211        self.move_values(keys, dest_keys).await
212    }
213
214    /// Restores tombstones for keys.
215    ///
216    /// Preforms to:
217    /// - restore origin value.
218    /// - deletes tombstone values.
219    ///
220    /// Returns the number of keys that were restored.
221    pub async fn restore(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
222        let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
223            .into_iter()
224            .map(|key| {
225                let tombstone_key = self.to_tombstone(&key);
226                (tombstone_key, key)
227            })
228            .unzip();
229
230        self.move_values(keys, dest_keys).await
231    }
232
233    /// Deletes tombstones values for the specified `keys`.
234    ///
235    /// Returns the number of keys that were deleted.
236    pub async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
237        let keys = keys
238            .iter()
239            .map(|key| self.to_tombstone(key))
240            .collect::<Vec<_>>();
241
242        let num_keys = keys.len();
243        let _ = self
244            .kv_backend
245            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
246            .await?;
247
248        Ok(num_keys)
249    }
250}
251
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::error::Error;
    use crate::key::tombstone::TombstoneManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::KvBackend;
    use crate::rpc::store::PutRequest;

    /// An expected move of `value` from `key` to `dest_key`.
    #[derive(Debug, Clone)]
    struct MoveValue {
        key: Vec<u8>,
        dest_key: Vec<u8>,
        value: Vec<u8>,
    }

    /// Asserts that every source `key` is gone and its `dest_key` holds `value`.
    async fn check_moved_values(
        kv_backend: Arc<MemoryKvBackend<Error>>,
        move_values: &[MoveValue],
    ) {
        for MoveValue {
            key,
            dest_key,
            value,
        } in move_values
        {
            assert!(kv_backend.get(key).await.unwrap().is_none());
            assert_eq!(
                &kv_backend.get(dest_key).await.unwrap().unwrap().value,
                value,
            );
        }
    }

    /// Writes every `(key, value)` pair of `kvs` into `kv_backend`.
    async fn put_kvs(
        kv_backend: &Arc<MemoryKvBackend<Error>>,
        kvs: &HashMap<Vec<u8>, Vec<u8>>,
    ) {
        for (key, value) in kvs {
            kv_backend
                .put(
                    PutRequest::new()
                        .with_key(key.clone())
                        .with_value(value.clone()),
                )
                .await
                .unwrap();
        }
    }

    /// Builds the expected moves of `kvs` into their tombstone keys.
    fn to_move_values(
        tombstone_manager: &TombstoneManager,
        kvs: &HashMap<Vec<u8>, Vec<u8>>,
    ) -> Vec<MoveValue> {
        kvs.iter()
            .map(|(key, value)| MoveValue {
                key: key.clone(),
                dest_key: tombstone_manager.to_tombstone(key),
                value: value.clone(),
            })
            .collect()
    }

    /// Splits `move_values` into parallel source / destination key lists.
    fn split_keys(move_values: &[MoveValue]) -> (Vec<Vec<u8>>, Vec<Vec<u8>>) {
        move_values
            .iter()
            .map(|mv| (mv.key.clone(), mv.dest_key.clone()))
            .unzip()
    }

    #[tokio::test]
    async fn test_create_tombstone() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        kv_backend
            .put(PutRequest::new().with_key("bar").with_value("baz"))
            .await
            .unwrap();
        kv_backend
            .put(PutRequest::new().with_key("foo").with_value("hi"))
            .await
            .unwrap();
        let moved_keys = tombstone_manager
            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        assert_eq!(2, moved_keys);
        assert!(!kv_backend.exists(b"bar").await.unwrap());
        assert!(!kv_backend.exists(b"foo").await.unwrap());
        assert_eq!(
            kv_backend
                .get(&tombstone_manager.to_tombstone(b"bar"))
                .await
                .unwrap()
                .unwrap()
                .value,
            b"baz"
        );
        assert_eq!(
            kv_backend
                .get(&tombstone_manager.to_tombstone(b"foo"))
                .await
                .unwrap()
                .unwrap()
                .value,
            b"hi"
        );
        assert_eq!(kv_backend.len(), 2);
    }

    #[tokio::test]
    async fn test_create_tombstone_with_non_exist_values() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());

        kv_backend
            .put(PutRequest::new().with_key("bar").with_value("baz"))
            .await
            .unwrap();
        kv_backend
            .put(PutRequest::new().with_key("foo").with_value("hi"))
            .await
            .unwrap();

        let moved_keys = tombstone_manager
            .create(vec![b"bar".to_vec(), b"baz".to_vec()])
            .await
            .unwrap();
        // Only "bar" exists, so only it is moved.
        assert_eq!(1, moved_keys);
        check_moved_values(
            kv_backend.clone(),
            &[MoveValue {
                key: b"bar".to_vec(),
                dest_key: tombstone_manager.to_tombstone(b"bar"),
                value: b"baz".to_vec(),
            }],
        )
        .await;
        // The missing key gets no tombstone, and unrelated keys are left untouched.
        assert!(!kv_backend
            .exists(&tombstone_manager.to_tombstone(b"baz"))
            .await
            .unwrap());
        assert_eq!(
            kv_backend.get(b"foo").await.unwrap().unwrap().value,
            b"hi"
        );
    }

    #[tokio::test]
    async fn test_restore_tombstone() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        kv_backend
            .put(PutRequest::new().with_key("bar").with_value("baz"))
            .await
            .unwrap();
        kv_backend
            .put(PutRequest::new().with_key("foo").with_value("hi"))
            .await
            .unwrap();
        let expected_kvs = kv_backend.dump();
        tombstone_manager
            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        let restored_keys = tombstone_manager
            .restore(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        assert_eq!(2, restored_keys);
        assert_eq!(expected_kvs, kv_backend.dump());
    }

    #[tokio::test]
    async fn test_delete_tombstone() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        kv_backend
            .put(PutRequest::new().with_key("bar").with_value("baz"))
            .await
            .unwrap();
        kv_backend
            .put(PutRequest::new().with_key("foo").with_value("hi"))
            .await
            .unwrap();
        tombstone_manager
            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        let deleted_keys = tombstone_manager
            .delete(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        assert_eq!(2, deleted_keys);
        assert!(kv_backend.is_empty());
    }

    #[tokio::test]
    async fn test_move_values() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        let kvs = HashMap::from([
            (b"bar".to_vec(), b"baz".to_vec()),
            (b"foo".to_vec(), b"hi".to_vec()),
            (b"baz".to_vec(), b"hello".to_vec()),
        ]);
        put_kvs(&kv_backend, &kvs).await;
        let move_values = to_move_values(&tombstone_manager, &kvs);
        let (keys, dest_keys) = split_keys(&move_values);
        let moved_keys = tombstone_manager
            .move_values(keys.clone(), dest_keys.clone())
            .await
            .unwrap();
        assert_eq!(kvs.len(), moved_keys);
        check_moved_values(kv_backend.clone(), &move_values).await;
        // Moves again: the sources are gone, so nothing is moved.
        let moved_keys = tombstone_manager
            .move_values(keys.clone(), dest_keys.clone())
            .await
            .unwrap();
        assert_eq!(0, moved_keys);
        check_moved_values(kv_backend.clone(), &move_values).await;
    }

    #[tokio::test]
    async fn test_move_values_with_max_txn_ops() {
        common_telemetry::init_default_ut_logging();
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let mut tombstone_manager = TombstoneManager::new(kv_backend.clone());
        tombstone_manager.set_max_txn_ops(4);
        let kvs = HashMap::from([
            (b"bar".to_vec(), b"baz".to_vec()),
            (b"foo".to_vec(), b"hi".to_vec()),
            (b"baz".to_vec(), b"hello".to_vec()),
            (b"qux".to_vec(), b"world".to_vec()),
            (b"quux".to_vec(), b"world".to_vec()),
            (b"quuux".to_vec(), b"world".to_vec()),
            (b"quuuux".to_vec(), b"world".to_vec()),
            (b"quuuuux".to_vec(), b"world".to_vec()),
            (b"quuuuuux".to_vec(), b"world".to_vec()),
        ]);
        put_kvs(&kv_backend, &kvs).await;
        let move_values = to_move_values(&tombstone_manager, &kvs);
        let (keys, dest_keys) = split_keys(&move_values);
        let moved_keys = tombstone_manager
            .move_values(keys.clone(), dest_keys.clone())
            .await
            .unwrap();
        assert_eq!(kvs.len(), moved_keys);
        check_moved_values(kv_backend.clone(), &move_values).await;
        // Moves again: the sources are gone, so nothing is moved.
        let moved_keys = tombstone_manager
            .move_values(keys.clone(), dest_keys.clone())
            .await
            .unwrap();
        assert_eq!(0, moved_keys);
        check_moved_values(kv_backend.clone(), &move_values).await;
    }

    #[tokio::test]
    async fn test_move_values_with_non_exists_values() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        let kvs = HashMap::from([
            (b"bar".to_vec(), b"baz".to_vec()),
            (b"foo".to_vec(), b"hi".to_vec()),
            (b"baz".to_vec(), b"hello".to_vec()),
        ]);
        put_kvs(&kv_backend, &kvs).await;
        let move_values = to_move_values(&tombstone_manager, &kvs);
        let (mut keys, mut dest_keys) = split_keys(&move_values);
        keys.push(b"non-exists".to_vec());
        dest_keys.push(b"hi/non-exists".to_vec());
        let moved_keys = tombstone_manager
            .move_values(keys.clone(), dest_keys.clone())
            .await
            .unwrap();
        check_moved_values(kv_backend.clone(), &move_values).await;
        assert_eq!(3, moved_keys);
        // Moves again: the sources are gone, so nothing is moved.
        let moved_keys = tombstone_manager
            .move_values(keys.clone(), dest_keys.clone())
            .await
            .unwrap();
        check_moved_values(kv_backend.clone(), &move_values).await;
        assert_eq!(0, moved_keys);
    }

    #[tokio::test]
    async fn test_move_values_changed() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        let mut kvs = HashMap::from([
            (b"bar".to_vec(), b"baz".to_vec()),
            (b"foo".to_vec(), b"hi".to_vec()),
            (b"baz".to_vec(), b"hello".to_vec()),
        ]);
        put_kvs(&kv_backend, &kvs).await;

        // Changes "baz" after the initial write; the move must carry the latest value.
        kv_backend
            .put(PutRequest::new().with_key("baz").with_value("changed"))
            .await
            .unwrap();
        kvs.insert(b"baz".to_vec(), b"changed".to_vec());

        let move_values = to_move_values(&tombstone_manager, &kvs);
        let (keys, dest_keys) = split_keys(&move_values);
        let moved_keys = tombstone_manager
            .move_values(keys, dest_keys)
            .await
            .unwrap();
        assert_eq!(kvs.len(), moved_keys);
        check_moved_values(kv_backend.clone(), &move_values).await;
    }

    #[tokio::test]
    async fn test_move_values_overwrite_dest_values() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        let kvs = HashMap::from([
            (b"bar".to_vec(), b"baz".to_vec()),
            (b"foo".to_vec(), b"hi".to_vec()),
            (b"baz".to_vec(), b"hello".to_vec()),
        ]);
        put_kvs(&kv_backend, &kvs).await;

        // Prepares
        let move_values = to_move_values(&tombstone_manager, &kvs);
        let (keys, dest_keys) = split_keys(&move_values);
        tombstone_manager
            .move_values(keys, dest_keys)
            .await
            .unwrap();
        check_moved_values(kv_backend.clone(), &move_values).await;

        // Overwrites existing dest keys.
        let kvs = HashMap::from([
            (b"bar".to_vec(), b"new baz".to_vec()),
            (b"foo".to_vec(), b"new hi".to_vec()),
            (b"baz".to_vec(), b"new baz".to_vec()),
        ]);
        put_kvs(&kv_backend, &kvs).await;
        let move_values = to_move_values(&tombstone_manager, &kvs);
        let (keys, dest_keys) = split_keys(&move_values);
        tombstone_manager
            .move_values(keys, dest_keys)
            .await
            .unwrap();
        check_moved_values(kv_backend.clone(), &move_values).await;
    }

    #[tokio::test]
    async fn test_move_values_with_different_lengths() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());

        let keys = vec![b"bar".to_vec(), b"foo".to_vec()];
        let dest_keys = vec![b"bar".to_vec(), b"foo".to_vec(), b"baz".to_vec()];

        let err = tombstone_manager
            .move_values(keys, dest_keys)
            .await
            .unwrap_err();
        assert!(err
            .to_string()
            .contains("The length of keys(2) does not match the length of dest_keys(3)."));

        let moved_keys = tombstone_manager.move_values(vec![], vec![]).await.unwrap();
        assert_eq!(0, moved_keys);
    }
}
695}