cli/metadata/control/del/
key.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use clap::Parser;
17use common_error::ext::BoxedError;
18use common_meta::key::tombstone::TombstoneManager;
19use common_meta::kv_backend::KvBackendRef;
20use common_meta::rpc::store::RangeRequest;
21
22use crate::metadata::common::StoreConfig;
23use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
24use crate::Tool;
25
26/// Delete key-value pairs logically from the metadata store.
27#[derive(Debug, Default, Parser)]
28pub struct DelKeyCommand {
29    /// The key to delete from the metadata store.
30    key: String,
31
32    /// Delete key-value pairs with the given prefix.
33    #[clap(long)]
34    prefix: bool,
35
36    #[clap(flatten)]
37    store: StoreConfig,
38}
39
40impl DelKeyCommand {
41    pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
42        let kv_backend = self.store.build().await?;
43        Ok(Box::new(DelKeyTool {
44            key: self.key.to_string(),
45            prefix: self.prefix,
46            key_deleter: KeyDeleter::new(kv_backend),
47        }))
48    }
49}
50
51struct KeyDeleter {
52    kv_backend: KvBackendRef,
53    tombstone_manager: TombstoneManager,
54}
55
56impl KeyDeleter {
57    fn new(kv_backend: KvBackendRef) -> Self {
58        Self {
59            kv_backend: kv_backend.clone(),
60            tombstone_manager: TombstoneManager::new_with_prefix(kv_backend, CLI_TOMBSTONE_PREFIX),
61        }
62    }
63
64    async fn delete(&self, key: &str, prefix: bool) -> Result<usize, BoxedError> {
65        let mut req = RangeRequest::default().with_keys_only();
66        if prefix {
67            req = req.with_prefix(key.as_bytes());
68        } else {
69            req = req.with_key(key.as_bytes());
70        }
71        let resp = self.kv_backend.range(req).await.map_err(BoxedError::new)?;
72        let keys = resp.kvs.iter().map(|kv| kv.key.clone()).collect::<Vec<_>>();
73        self.tombstone_manager
74            .create(keys)
75            .await
76            .map_err(BoxedError::new)
77    }
78}
79
80struct DelKeyTool {
81    key: String,
82    prefix: bool,
83    key_deleter: KeyDeleter,
84}
85
86#[async_trait]
87impl Tool for DelKeyTool {
88    async fn do_work(&self) -> Result<(), BoxedError> {
89        let deleted = self.key_deleter.delete(&self.key, self.prefix).await?;
90        // Print the number of deleted keys.
91        println!("{}", deleted);
92        Ok(())
93    }
94}
95
96#[cfg(test)]
97mod tests {
98    use std::sync::Arc;
99
100    use common_meta::kv_backend::chroot::ChrootKvBackend;
101    use common_meta::kv_backend::memory::MemoryKvBackend;
102    use common_meta::kv_backend::{KvBackend, KvBackendRef};
103    use common_meta::rpc::store::RangeRequest;
104
105    use crate::metadata::control::del::key::KeyDeleter;
106    use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
107    use crate::metadata::control::test_utils::put_key;
108
109    #[tokio::test]
110    async fn test_delete_keys() {
111        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
112        let key_deleter = KeyDeleter::new(kv_backend.clone());
113        put_key(&kv_backend, "foo", "bar").await;
114        put_key(&kv_backend, "foo/bar", "baz").await;
115        put_key(&kv_backend, "foo/baz", "qux").await;
116        let deleted = key_deleter.delete("foo", true).await.unwrap();
117        assert_eq!(deleted, 3);
118        let deleted = key_deleter.delete("foo/bar", false).await.unwrap();
119        assert_eq!(deleted, 0);
120
121        let chroot = ChrootKvBackend::new(CLI_TOMBSTONE_PREFIX.as_bytes().to_vec(), kv_backend);
122        let req = RangeRequest::default().with_prefix(b"foo");
123        let resp = chroot.range(req).await.unwrap();
124        assert_eq!(resp.kvs.len(), 3);
125        assert_eq!(resp.kvs[0].key, b"foo");
126        assert_eq!(resp.kvs[0].value, b"bar");
127        assert_eq!(resp.kvs[1].key, b"foo/bar");
128        assert_eq!(resp.kvs[1].value, b"baz");
129        assert_eq!(resp.kvs[2].key, b"foo/baz");
130        assert_eq!(resp.kvs[2].value, b"qux");
131    }
132}