common_meta/key/
tombstone.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use snafu::ensure;
18
19use crate::error::{self, Result};
20use crate::key::txn_helper::TxnOpGetResponseSet;
21use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
22use crate::kv_backend::KvBackendRef;
23use crate::rpc::store::BatchGetRequest;
24
/// [TombstoneManager] provides the ability to:
/// - logically delete values
/// - restore the deleted values
///
/// A value is logically deleted by moving it from its key to the matching
/// tombstone key, i.e. the original key prefixed with `tombstone_prefix`.
pub struct TombstoneManager {
    /// Backend used for every read and transaction issued by the manager.
    kv_backend: KvBackendRef,
    /// Prefix prepended to a key to form its tombstone key.
    tombstone_prefix: String,
}
32
/// Default tombstone key prefix, used by [TombstoneManager::new].
const TOMBSTONE_PREFIX: &str = "__tombstone/";
34
35impl TombstoneManager {
36    /// Returns [TombstoneManager].
37    pub fn new(kv_backend: KvBackendRef) -> Self {
38        Self {
39            kv_backend,
40            tombstone_prefix: TOMBSTONE_PREFIX.to_string(),
41        }
42    }
43
44    /// Returns [TombstoneManager] with a custom tombstone prefix.
45    pub fn new_with_prefix(kv_backend: KvBackendRef, prefix: &str) -> Self {
46        Self {
47            kv_backend,
48            tombstone_prefix: prefix.to_string(),
49        }
50    }
51
52    pub fn to_tombstone(&self, key: &[u8]) -> Vec<u8> {
53        [self.tombstone_prefix.as_bytes(), key].concat()
54    }
55
56    /// Moves value to `dest_key`.
57    ///
58    /// Puts `value` to `dest_key` if the value of `src_key` equals `value`.
59    ///
60    /// Otherwise retrieves the value of `src_key`.
61    fn build_move_value_txn(
62        &self,
63        src_key: Vec<u8>,
64        value: Vec<u8>,
65        dest_key: Vec<u8>,
66    ) -> (Txn, impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>>) {
67        let txn = Txn::new()
68            .when(vec![Compare::with_value(
69                src_key.clone(),
70                CompareOp::Equal,
71                value.clone(),
72            )])
73            .and_then(vec![
74                TxnOp::Put(dest_key.clone(), value.clone()),
75                TxnOp::Delete(src_key.clone()),
76            ])
77            .or_else(vec![TxnOp::Get(src_key.clone())]);
78
79        (txn, TxnOpGetResponseSet::filter(src_key))
80    }
81
82    async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<usize> {
83        ensure!(
84            keys.len() == dest_keys.len(),
85            error::UnexpectedSnafu {
86                err_msg: "The length of keys does not match the length of dest_keys."
87            }
88        );
89        // The key -> dest key mapping.
90        let lookup_table = keys.iter().zip(dest_keys.iter()).collect::<HashMap<_, _>>();
91
92        let resp = self
93            .kv_backend
94            .batch_get(BatchGetRequest::new().with_keys(keys.to_vec()))
95            .await?;
96        let mut results = resp
97            .kvs
98            .into_iter()
99            .map(|kv| (kv.key, kv.value))
100            .collect::<HashMap<_, _>>();
101
102        const MAX_RETRIES: usize = 8;
103        for _ in 0..MAX_RETRIES {
104            let (txns, (keys, filters)): (Vec<_>, (Vec<_>, Vec<_>)) = results
105                .iter()
106                .map(|(key, value)| {
107                    let (txn, filter) = self.build_move_value_txn(
108                        key.clone(),
109                        value.clone(),
110                        lookup_table[&key].clone(),
111                    );
112                    (txn, (key.clone(), filter))
113                })
114                .unzip();
115            let mut resp = self.kv_backend.txn(Txn::merge_all(txns)).await?;
116            if resp.succeeded {
117                return Ok(keys.len());
118            }
119            let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
120            // Updates results.
121            for (idx, mut filter) in filters.into_iter().enumerate() {
122                if let Some(value) = filter(&mut set) {
123                    results.insert(keys[idx].clone(), value);
124                } else {
125                    results.remove(&keys[idx]);
126                }
127            }
128        }
129
130        error::MoveValuesSnafu {
131            err_msg: format!(
132                "keys: {:?}",
133                keys.iter().map(|key| String::from_utf8_lossy(key)),
134            ),
135        }
136        .fail()
137    }
138
139    /// Moves values to `dest_key`.
140    ///
141    /// Returns the number of keys that were moved.
142    async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<usize> {
143        let chunk_size = self.kv_backend.max_txn_ops() / 2;
144        if keys.len() > chunk_size {
145            let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
146            let dest_keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
147            for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
148                self.move_values_inner(keys, dest_keys).await?;
149            }
150
151            Ok(keys.len())
152        } else {
153            self.move_values_inner(&keys, &dest_keys).await
154        }
155    }
156
157    /// Creates tombstones for keys.
158    ///
159    /// Preforms to:
160    /// - deletes origin values.
161    /// - stores tombstone values.
162    ///
163    /// Returns the number of keys that were moved.
164    pub async fn create(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
165        let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
166            .into_iter()
167            .map(|key| {
168                let tombstone_key = self.to_tombstone(&key);
169                (key, tombstone_key)
170            })
171            .unzip();
172
173        self.move_values(keys, dest_keys).await
174    }
175
176    /// Restores tombstones for keys.
177    ///
178    /// Preforms to:
179    /// - restore origin value.
180    /// - deletes tombstone values.
181    ///
182    /// Returns the number of keys that were restored.
183    pub async fn restore(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
184        let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
185            .into_iter()
186            .map(|key| {
187                let tombstone_key = self.to_tombstone(&key);
188                (tombstone_key, key)
189            })
190            .unzip();
191
192        self.move_values(keys, dest_keys).await
193    }
194
195    /// Deletes tombstones values for the specified `keys`.
196    ///
197    /// Returns the number of keys that were deleted.
198    pub async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
199        let operations = keys
200            .iter()
201            .map(|key| TxnOp::Delete(self.to_tombstone(key)))
202            .collect::<Vec<_>>();
203
204        let txn = Txn::new().and_then(operations);
205        // Always success.
206        let _ = self.kv_backend.txn(txn).await?;
207        Ok(keys.len())
208    }
209}
210
211#[cfg(test)]
212mod tests {
213
214    use std::collections::HashMap;
215    use std::sync::Arc;
216
217    use crate::error::Error;
218    use crate::key::tombstone::TombstoneManager;
219    use crate::kv_backend::memory::MemoryKvBackend;
220    use crate::kv_backend::KvBackend;
221    use crate::rpc::store::PutRequest;
222
    /// Test fixture describing one expected move: after the move `key` holds no
    /// value and `dest_key` holds `value`.
    #[derive(Debug, Clone)]
    struct MoveValue {
        // Source key the value is moved away from.
        key: Vec<u8>,
        // Destination key the value is moved to.
        dest_key: Vec<u8>,
        // Value expected at `dest_key` after the move.
        value: Vec<u8>,
    }
229
230    async fn check_moved_values(
231        kv_backend: Arc<MemoryKvBackend<Error>>,
232        move_values: &[MoveValue],
233    ) {
234        for MoveValue {
235            key,
236            dest_key,
237            value,
238        } in move_values
239        {
240            assert!(kv_backend.get(key).await.unwrap().is_none());
241            assert_eq!(
242                &kv_backend.get(dest_key).await.unwrap().unwrap().value,
243                value,
244            );
245        }
246    }
247
248    #[tokio::test]
249    async fn test_create_tombstone() {
250        let kv_backend = Arc::new(MemoryKvBackend::default());
251        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
252        kv_backend
253            .put(PutRequest::new().with_key("bar").with_value("baz"))
254            .await
255            .unwrap();
256        kv_backend
257            .put(PutRequest::new().with_key("foo").with_value("hi"))
258            .await
259            .unwrap();
260        tombstone_manager
261            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
262            .await
263            .unwrap();
264        assert!(!kv_backend.exists(b"bar").await.unwrap());
265        assert!(!kv_backend.exists(b"foo").await.unwrap());
266        assert_eq!(
267            kv_backend
268                .get(&tombstone_manager.to_tombstone(b"bar"))
269                .await
270                .unwrap()
271                .unwrap()
272                .value,
273            b"baz"
274        );
275        assert_eq!(
276            kv_backend
277                .get(&tombstone_manager.to_tombstone(b"foo"))
278                .await
279                .unwrap()
280                .unwrap()
281                .value,
282            b"hi"
283        );
284        assert_eq!(kv_backend.len(), 2);
285    }
286
287    #[tokio::test]
288    async fn test_create_tombstone_with_non_exist_values() {
289        let kv_backend = Arc::new(MemoryKvBackend::default());
290        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
291
292        kv_backend
293            .put(PutRequest::new().with_key("bar").with_value("baz"))
294            .await
295            .unwrap();
296        kv_backend
297            .put(PutRequest::new().with_key("foo").with_value("hi"))
298            .await
299            .unwrap();
300
301        tombstone_manager
302            .create(vec![b"bar".to_vec(), b"baz".to_vec()])
303            .await
304            .unwrap();
305        check_moved_values(
306            kv_backend.clone(),
307            &[MoveValue {
308                key: b"bar".to_vec(),
309                dest_key: tombstone_manager.to_tombstone(b"bar"),
310                value: b"baz".to_vec(),
311            }],
312        )
313        .await;
314    }
315
316    #[tokio::test]
317    async fn test_restore_tombstone() {
318        let kv_backend = Arc::new(MemoryKvBackend::default());
319        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
320        kv_backend
321            .put(PutRequest::new().with_key("bar").with_value("baz"))
322            .await
323            .unwrap();
324        kv_backend
325            .put(PutRequest::new().with_key("foo").with_value("hi"))
326            .await
327            .unwrap();
328        let expected_kvs = kv_backend.dump();
329        tombstone_manager
330            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
331            .await
332            .unwrap();
333        tombstone_manager
334            .restore(vec![b"bar".to_vec(), b"foo".to_vec()])
335            .await
336            .unwrap();
337        assert_eq!(expected_kvs, kv_backend.dump());
338    }
339
340    #[tokio::test]
341    async fn test_delete_tombstone() {
342        let kv_backend = Arc::new(MemoryKvBackend::default());
343        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
344        kv_backend
345            .put(PutRequest::new().with_key("bar").with_value("baz"))
346            .await
347            .unwrap();
348        kv_backend
349            .put(PutRequest::new().with_key("foo").with_value("hi"))
350            .await
351            .unwrap();
352        tombstone_manager
353            .create(vec![b"bar".to_vec(), b"foo".to_vec()])
354            .await
355            .unwrap();
356        tombstone_manager
357            .delete(vec![b"bar".to_vec(), b"foo".to_vec()])
358            .await
359            .unwrap();
360        assert!(kv_backend.is_empty());
361    }
362
363    #[tokio::test]
364    async fn test_move_values() {
365        let kv_backend = Arc::new(MemoryKvBackend::default());
366        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
367        let kvs = HashMap::from([
368            (b"bar".to_vec(), b"baz".to_vec()),
369            (b"foo".to_vec(), b"hi".to_vec()),
370            (b"baz".to_vec(), b"hello".to_vec()),
371        ]);
372        for (key, value) in &kvs {
373            kv_backend
374                .put(
375                    PutRequest::new()
376                        .with_key(key.clone())
377                        .with_value(value.clone()),
378                )
379                .await
380                .unwrap();
381        }
382        let move_values = kvs
383            .iter()
384            .map(|(key, value)| MoveValue {
385                key: key.clone(),
386                dest_key: tombstone_manager.to_tombstone(key),
387                value: value.clone(),
388            })
389            .collect::<Vec<_>>();
390        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
391            .clone()
392            .into_iter()
393            .map(|kv| (kv.key, kv.dest_key))
394            .unzip();
395        tombstone_manager
396            .move_values(keys.clone(), dest_keys.clone())
397            .await
398            .unwrap();
399        check_moved_values(kv_backend.clone(), &move_values).await;
400        // Moves again
401        tombstone_manager
402            .move_values(keys.clone(), dest_keys.clone())
403            .await
404            .unwrap();
405        check_moved_values(kv_backend.clone(), &move_values).await;
406    }
407
408    #[tokio::test]
409    async fn test_move_values_with_non_exists_values() {
410        let kv_backend = Arc::new(MemoryKvBackend::default());
411        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
412        let kvs = HashMap::from([
413            (b"bar".to_vec(), b"baz".to_vec()),
414            (b"foo".to_vec(), b"hi".to_vec()),
415            (b"baz".to_vec(), b"hello".to_vec()),
416        ]);
417        for (key, value) in &kvs {
418            kv_backend
419                .put(
420                    PutRequest::new()
421                        .with_key(key.clone())
422                        .with_value(value.clone()),
423                )
424                .await
425                .unwrap();
426        }
427        let move_values = kvs
428            .iter()
429            .map(|(key, value)| MoveValue {
430                key: key.clone(),
431                dest_key: tombstone_manager.to_tombstone(key),
432                value: value.clone(),
433            })
434            .collect::<Vec<_>>();
435        let (mut keys, mut dest_keys): (Vec<_>, Vec<_>) = move_values
436            .clone()
437            .into_iter()
438            .map(|kv| (kv.key, kv.dest_key))
439            .unzip();
440        keys.push(b"non-exists".to_vec());
441        dest_keys.push(b"hi/non-exists".to_vec());
442        tombstone_manager
443            .move_values(keys.clone(), dest_keys.clone())
444            .await
445            .unwrap();
446        check_moved_values(kv_backend.clone(), &move_values).await;
447        // Moves again
448        tombstone_manager
449            .move_values(keys.clone(), dest_keys.clone())
450            .await
451            .unwrap();
452        check_moved_values(kv_backend.clone(), &move_values).await;
453    }
454
455    #[tokio::test]
456    async fn test_move_values_changed() {
457        let kv_backend = Arc::new(MemoryKvBackend::default());
458        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
459        let kvs = HashMap::from([
460            (b"bar".to_vec(), b"baz".to_vec()),
461            (b"foo".to_vec(), b"hi".to_vec()),
462            (b"baz".to_vec(), b"hello".to_vec()),
463        ]);
464        for (key, value) in &kvs {
465            kv_backend
466                .put(
467                    PutRequest::new()
468                        .with_key(key.clone())
469                        .with_value(value.clone()),
470                )
471                .await
472                .unwrap();
473        }
474
475        kv_backend
476            .put(PutRequest::new().with_key("baz").with_value("changed"))
477            .await
478            .unwrap();
479
480        let move_values = kvs
481            .iter()
482            .map(|(key, value)| MoveValue {
483                key: key.clone(),
484                dest_key: tombstone_manager.to_tombstone(key),
485                value: value.clone(),
486            })
487            .collect::<Vec<_>>();
488        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
489            .clone()
490            .into_iter()
491            .map(|kv| (kv.key, kv.dest_key))
492            .unzip();
493        tombstone_manager
494            .move_values(keys, dest_keys)
495            .await
496            .unwrap();
497    }
498
499    #[tokio::test]
500    async fn test_move_values_overwrite_dest_values() {
501        let kv_backend = Arc::new(MemoryKvBackend::default());
502        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
503        let kvs = HashMap::from([
504            (b"bar".to_vec(), b"baz".to_vec()),
505            (b"foo".to_vec(), b"hi".to_vec()),
506            (b"baz".to_vec(), b"hello".to_vec()),
507        ]);
508        for (key, value) in &kvs {
509            kv_backend
510                .put(
511                    PutRequest::new()
512                        .with_key(key.clone())
513                        .with_value(value.clone()),
514                )
515                .await
516                .unwrap();
517        }
518
519        // Prepares
520        let move_values = kvs
521            .iter()
522            .map(|(key, value)| MoveValue {
523                key: key.clone(),
524                dest_key: tombstone_manager.to_tombstone(key),
525                value: value.clone(),
526            })
527            .collect::<Vec<_>>();
528        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
529            .clone()
530            .into_iter()
531            .map(|kv| (kv.key, kv.dest_key))
532            .unzip();
533        tombstone_manager
534            .move_values(keys, dest_keys)
535            .await
536            .unwrap();
537        check_moved_values(kv_backend.clone(), &move_values).await;
538
539        // Overwrites existing dest keys.
540        let kvs = HashMap::from([
541            (b"bar".to_vec(), b"new baz".to_vec()),
542            (b"foo".to_vec(), b"new hi".to_vec()),
543            (b"baz".to_vec(), b"new baz".to_vec()),
544        ]);
545        for (key, value) in &kvs {
546            kv_backend
547                .put(
548                    PutRequest::new()
549                        .with_key(key.clone())
550                        .with_value(value.clone()),
551                )
552                .await
553                .unwrap();
554        }
555        let move_values = kvs
556            .iter()
557            .map(|(key, value)| MoveValue {
558                key: key.clone(),
559                dest_key: tombstone_manager.to_tombstone(key),
560                value: value.clone(),
561            })
562            .collect::<Vec<_>>();
563        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
564            .clone()
565            .into_iter()
566            .map(|kv| (kv.key, kv.dest_key))
567            .unzip();
568        tombstone_manager
569            .move_values(keys, dest_keys)
570            .await
571            .unwrap();
572        check_moved_values(kv_backend.clone(), &move_values).await;
573    }
574}