common_meta/key/
tombstone.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use snafu::ensure;
18
19use crate::error::{self, Result};
20use crate::key::txn_helper::TxnOpGetResponseSet;
21use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
22use crate::kv_backend::KvBackendRef;
23use crate::rpc::store::BatchGetRequest;
24
25/// [TombstoneManager] provides the ability to:
26/// - logically delete values
27/// - restore the deleted values
28pub(crate) struct TombstoneManager {
29    kv_backend: KvBackendRef,
30}
31
/// Prefix prepended to an original key to form its tombstone key.
const TOMBSTONE_PREFIX: &str = "__tombstone/";

/// Returns the tombstone key for `key`: `key` prefixed with [TOMBSTONE_PREFIX].
fn to_tombstone(key: &[u8]) -> Vec<u8> {
    let prefix = TOMBSTONE_PREFIX.as_bytes();
    let mut tombstone_key = Vec::with_capacity(prefix.len() + key.len());
    tombstone_key.extend_from_slice(prefix);
    tombstone_key.extend_from_slice(key);
    tombstone_key
}
37
38impl TombstoneManager {
39    /// Returns [TombstoneManager].
40    pub fn new(kv_backend: KvBackendRef) -> Self {
41        Self { kv_backend }
42    }
43
44    /// Moves value to `dest_key`.
45    ///
46    /// Puts `value` to `dest_key` if the value of `src_key` equals `value`.
47    ///
48    /// Otherwise retrieves the value of `src_key`.
49    fn build_move_value_txn(
50        &self,
51        src_key: Vec<u8>,
52        value: Vec<u8>,
53        dest_key: Vec<u8>,
54    ) -> (Txn, impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>>) {
55        let txn = Txn::new()
56            .when(vec![Compare::with_value(
57                src_key.clone(),
58                CompareOp::Equal,
59                value.clone(),
60            )])
61            .and_then(vec![
62                TxnOp::Put(dest_key.clone(), value.clone()),
63                TxnOp::Delete(src_key.clone()),
64            ])
65            .or_else(vec![TxnOp::Get(src_key.clone())]);
66
67        (txn, TxnOpGetResponseSet::filter(src_key))
68    }
69
70    async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<()> {
71        ensure!(
72            keys.len() == dest_keys.len(),
73            error::UnexpectedSnafu {
74                err_msg: "The length of keys does not match the length of dest_keys."
75            }
76        );
77        // The key -> dest key mapping.
78        let lookup_table = keys.iter().zip(dest_keys.iter()).collect::<HashMap<_, _>>();
79
80        let resp = self
81            .kv_backend
82            .batch_get(BatchGetRequest::new().with_keys(keys.to_vec()))
83            .await?;
84        let mut results = resp
85            .kvs
86            .into_iter()
87            .map(|kv| (kv.key, kv.value))
88            .collect::<HashMap<_, _>>();
89
90        const MAX_RETRIES: usize = 8;
91        for _ in 0..MAX_RETRIES {
92            let (txns, (keys, filters)): (Vec<_>, (Vec<_>, Vec<_>)) = results
93                .iter()
94                .map(|(key, value)| {
95                    let (txn, filter) = self.build_move_value_txn(
96                        key.clone(),
97                        value.clone(),
98                        lookup_table[&key].clone(),
99                    );
100                    (txn, (key.clone(), filter))
101                })
102                .unzip();
103            let mut resp = self.kv_backend.txn(Txn::merge_all(txns)).await?;
104            if resp.succeeded {
105                return Ok(());
106            }
107            let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
108            // Updates results.
109            for (idx, mut filter) in filters.into_iter().enumerate() {
110                if let Some(value) = filter(&mut set) {
111                    results.insert(keys[idx].clone(), value);
112                } else {
113                    results.remove(&keys[idx]);
114                }
115            }
116        }
117
118        error::MoveValuesSnafu {
119            err_msg: format!(
120                "keys: {:?}",
121                keys.iter().map(|key| String::from_utf8_lossy(key)),
122            ),
123        }
124        .fail()
125    }
126
127    /// Moves values to `dest_key`.
128    async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<()> {
129        let chunk_size = self.kv_backend.max_txn_ops() / 2;
130        if keys.len() > chunk_size {
131            let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
132            let dest_keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
133            for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
134                self.move_values_inner(keys, dest_keys).await?;
135            }
136
137            Ok(())
138        } else {
139            self.move_values_inner(&keys, &dest_keys).await
140        }
141    }
142
143    /// Creates tombstones for keys.
144    ///
145    /// Preforms to:
146    /// - deletes origin values.
147    /// - stores tombstone values.
148    pub(crate) async fn create(&self, keys: Vec<Vec<u8>>) -> Result<()> {
149        let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
150            .into_iter()
151            .map(|key| {
152                let tombstone_key = to_tombstone(&key);
153                (key, tombstone_key)
154            })
155            .unzip();
156
157        self.move_values(keys, dest_keys).await
158    }
159
160    /// Restores tombstones for keys.
161    ///
162    /// Preforms to:
163    /// - restore origin value.
164    /// - deletes tombstone values.
165    pub(crate) async fn restore(&self, keys: Vec<Vec<u8>>) -> Result<()> {
166        let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
167            .into_iter()
168            .map(|key| {
169                let tombstone_key = to_tombstone(&key);
170                (tombstone_key, key)
171            })
172            .unzip();
173
174        self.move_values(keys, dest_keys).await
175    }
176
177    /// Deletes tombstones values for the specified `keys`.
178    pub(crate) async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<()> {
179        let operations = keys
180            .iter()
181            .map(|key| TxnOp::Delete(to_tombstone(key)))
182            .collect::<Vec<_>>();
183
184        let txn = Txn::new().and_then(operations);
185        // Always success.
186        let _ = self.kv_backend.txn(txn).await?;
187        Ok(())
188    }
189}
190
#[cfg(test)]
mod tests {

    use std::collections::HashMap;
    use std::sync::Arc;

    use super::to_tombstone;
    use crate::error::Error;
    use crate::key::tombstone::TombstoneManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::KvBackend;
    use crate::rpc::store::PutRequest;

    /// A source `key` whose `value` is expected to end up under `dest_key`.
    #[derive(Debug, Clone)]
    struct MoveValue {
        key: Vec<u8>,
        dest_key: Vec<u8>,
        value: Vec<u8>,
    }

    /// Asserts that every `key` is gone and its `dest_key` holds `value`.
    async fn check_moved_values(
        kv_backend: Arc<MemoryKvBackend<Error>>,
        move_values: &[MoveValue],
    ) {
        for MoveValue {
            key,
            dest_key,
            value,
        } in move_values
        {
            assert!(kv_backend.get(key).await.unwrap().is_none());
            assert_eq!(
                &kv_backend.get(dest_key).await.unwrap().unwrap().value,
                value,
            );
        }
    }

    #[tokio::test]
    async fn test_create_tombstone() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        kv_backend
            .put(PutRequest::new().with_key("bar").with_value("baz"))
            .await
            .unwrap();
        kv_backend
            .put(PutRequest::new().with_key("foo").with_value("hi"))
            .await
            .unwrap();
        tombstone_manager
            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        assert!(!kv_backend.exists(b"bar").await.unwrap());
        assert!(!kv_backend.exists(b"foo").await.unwrap());
        assert_eq!(
            kv_backend
                .get(&to_tombstone(b"bar"))
                .await
                .unwrap()
                .unwrap()
                .value,
            b"baz"
        );
        assert_eq!(
            kv_backend
                .get(&to_tombstone(b"foo"))
                .await
                .unwrap()
                .unwrap()
                .value,
            b"hi"
        );
        assert_eq!(kv_backend.len(), 2);
    }

    #[tokio::test]
    async fn test_create_tombstone_with_non_exist_values() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());

        kv_backend
            .put(PutRequest::new().with_key("bar").with_value("baz"))
            .await
            .unwrap();
        kv_backend
            .put(PutRequest::new().with_key("foo").with_value("hi"))
            .await
            .unwrap();

        tombstone_manager
            .create(vec![b"bar".to_vec(), b"baz".to_vec()])
            .await
            .unwrap();
        check_moved_values(
            kv_backend.clone(),
            &[MoveValue {
                key: b"bar".to_vec(),
                dest_key: to_tombstone(b"bar"),
                value: b"baz".to_vec(),
            }],
        )
        .await;
    }

    #[tokio::test]
    async fn test_restore_tombstone() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        kv_backend
            .put(PutRequest::new().with_key("bar").with_value("baz"))
            .await
            .unwrap();
        kv_backend
            .put(PutRequest::new().with_key("foo").with_value("hi"))
            .await
            .unwrap();
        let expected_kvs = kv_backend.dump();
        tombstone_manager
            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        tombstone_manager
            .restore(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        assert_eq!(expected_kvs, kv_backend.dump());
    }

    #[tokio::test]
    async fn test_delete_tombstone() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        kv_backend
            .put(PutRequest::new().with_key("bar").with_value("baz"))
            .await
            .unwrap();
        kv_backend
            .put(PutRequest::new().with_key("foo").with_value("hi"))
            .await
            .unwrap();
        tombstone_manager
            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        tombstone_manager
            .delete(vec![b"bar".to_vec(), b"foo".to_vec()])
            .await
            .unwrap();
        assert!(kv_backend.is_empty());
    }

    #[tokio::test]
    async fn test_move_values() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        let kvs = HashMap::from([
            (b"bar".to_vec(), b"baz".to_vec()),
            (b"foo".to_vec(), b"hi".to_vec()),
            (b"baz".to_vec(), b"hello".to_vec()),
        ]);
        for (key, value) in &kvs {
            kv_backend
                .put(
                    PutRequest::new()
                        .with_key(key.clone())
                        .with_value(value.clone()),
                )
                .await
                .unwrap();
        }
        let move_values = kvs
            .iter()
            .map(|(key, value)| MoveValue {
                key: key.clone(),
                dest_key: to_tombstone(key),
                value: value.clone(),
            })
            .collect::<Vec<_>>();
        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
            .clone()
            .into_iter()
            .map(|kv| (kv.key, kv.dest_key))
            .unzip();
        tombstone_manager
            .move_values(keys.clone(), dest_keys.clone())
            .await
            .unwrap();
        check_moved_values(kv_backend.clone(), &move_values).await;
        // Moves again
        tombstone_manager
            .move_values(keys.clone(), dest_keys.clone())
            .await
            .unwrap();
        check_moved_values(kv_backend.clone(), &move_values).await;
    }

    #[tokio::test]
    async fn test_move_values_with_non_exists_values() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        let kvs = HashMap::from([
            (b"bar".to_vec(), b"baz".to_vec()),
            (b"foo".to_vec(), b"hi".to_vec()),
            (b"baz".to_vec(), b"hello".to_vec()),
        ]);
        for (key, value) in &kvs {
            kv_backend
                .put(
                    PutRequest::new()
                        .with_key(key.clone())
                        .with_value(value.clone()),
                )
                .await
                .unwrap();
        }
        let move_values = kvs
            .iter()
            .map(|(key, value)| MoveValue {
                key: key.clone(),
                dest_key: to_tombstone(key),
                value: value.clone(),
            })
            .collect::<Vec<_>>();
        let (mut keys, mut dest_keys): (Vec<_>, Vec<_>) = move_values
            .clone()
            .into_iter()
            .map(|kv| (kv.key, kv.dest_key))
            .unzip();
        keys.push(b"non-exists".to_vec());
        dest_keys.push(b"hi/non-exists".to_vec());
        tombstone_manager
            .move_values(keys.clone(), dest_keys.clone())
            .await
            .unwrap();
        check_moved_values(kv_backend.clone(), &move_values).await;
        // Moves again
        tombstone_manager
            .move_values(keys.clone(), dest_keys.clone())
            .await
            .unwrap();
        check_moved_values(kv_backend.clone(), &move_values).await;
    }

    #[tokio::test]
    async fn test_move_values_changed() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        let kvs = HashMap::from([
            (b"bar".to_vec(), b"baz".to_vec()),
            (b"foo".to_vec(), b"hi".to_vec()),
            (b"baz".to_vec(), b"hello".to_vec()),
        ]);
        for (key, value) in &kvs {
            kv_backend
                .put(
                    PutRequest::new()
                        .with_key(key.clone())
                        .with_value(value.clone()),
                )
                .await
                .unwrap();
        }

        kv_backend
            .put(PutRequest::new().with_key("baz").with_value("changed"))
            .await
            .unwrap();

        let move_values = kvs
            .iter()
            .map(|(key, value)| MoveValue {
                key: key.clone(),
                dest_key: to_tombstone(key),
                value: value.clone(),
            })
            .collect::<Vec<_>>();
        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
            .clone()
            .into_iter()
            .map(|kv| (kv.key, kv.dest_key))
            .unzip();
        tombstone_manager
            .move_values(keys, dest_keys)
            .await
            .unwrap();

        // "baz" was overwritten before the move, so its destination must hold
        // the latest value rather than the one originally put.
        let move_values = move_values
            .into_iter()
            .map(|mut move_value| {
                if move_value.key == b"baz" {
                    move_value.value = b"changed".to_vec();
                }
                move_value
            })
            .collect::<Vec<_>>();
        check_moved_values(kv_backend.clone(), &move_values).await;
    }

    #[tokio::test]
    async fn test_move_values_overwrite_dest_values() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
        let kvs = HashMap::from([
            (b"bar".to_vec(), b"baz".to_vec()),
            (b"foo".to_vec(), b"hi".to_vec()),
            (b"baz".to_vec(), b"hello".to_vec()),
        ]);
        for (key, value) in &kvs {
            kv_backend
                .put(
                    PutRequest::new()
                        .with_key(key.clone())
                        .with_value(value.clone()),
                )
                .await
                .unwrap();
        }

        // Prepares
        let move_values = kvs
            .iter()
            .map(|(key, value)| MoveValue {
                key: key.clone(),
                dest_key: to_tombstone(key),
                value: value.clone(),
            })
            .collect::<Vec<_>>();
        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
            .clone()
            .into_iter()
            .map(|kv| (kv.key, kv.dest_key))
            .unzip();
        tombstone_manager
            .move_values(keys, dest_keys)
            .await
            .unwrap();
        check_moved_values(kv_backend.clone(), &move_values).await;

        // Overwrites existing dest keys.
        let kvs = HashMap::from([
            (b"bar".to_vec(), b"new baz".to_vec()),
            (b"foo".to_vec(), b"new hi".to_vec()),
            (b"baz".to_vec(), b"new baz".to_vec()),
        ]);
        for (key, value) in &kvs {
            kv_backend
                .put(
                    PutRequest::new()
                        .with_key(key.clone())
                        .with_value(value.clone()),
                )
                .await
                .unwrap();
        }
        let move_values = kvs
            .iter()
            .map(|(key, value)| MoveValue {
                key: key.clone(),
                dest_key: to_tombstone(key),
                value: value.clone(),
            })
            .collect::<Vec<_>>();
        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
            .clone()
            .into_iter()
            .map(|kv| (kv.key, kv.dest_key))
            .unzip();
        tombstone_manager
            .move_values(keys, dest_keys)
            .await
            .unwrap();
        check_moved_values(kv_backend.clone(), &move_values).await;
    }
}