cli/metadata/control/del/
key.rs1use async_trait::async_trait;
16use clap::Parser;
17use common_error::ext::BoxedError;
18use common_meta::key::tombstone::TombstoneManager;
19use common_meta::kv_backend::KvBackendRef;
20use common_meta::rpc::store::RangeRequest;
21
22use crate::metadata::common::StoreConfig;
23use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
24use crate::Tool;
25
26#[derive(Debug, Default, Parser)]
28pub struct DelKeyCommand {
29 key: String,
31
32 #[clap(long)]
34 prefix: bool,
35
36 #[clap(flatten)]
37 store: StoreConfig,
38}
39
40impl DelKeyCommand {
41 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
42 let kv_backend = self.store.build().await?;
43 Ok(Box::new(DelKeyTool {
44 key: self.key.to_string(),
45 prefix: self.prefix,
46 key_deleter: KeyDeleter::new(kv_backend),
47 }))
48 }
49}
50
51struct KeyDeleter {
52 kv_backend: KvBackendRef,
53 tombstone_manager: TombstoneManager,
54}
55
56impl KeyDeleter {
57 fn new(kv_backend: KvBackendRef) -> Self {
58 Self {
59 kv_backend: kv_backend.clone(),
60 tombstone_manager: TombstoneManager::new_with_prefix(kv_backend, CLI_TOMBSTONE_PREFIX),
61 }
62 }
63
64 async fn delete(&self, key: &str, prefix: bool) -> Result<usize, BoxedError> {
65 let mut req = RangeRequest::default().with_keys_only();
66 if prefix {
67 req = req.with_prefix(key.as_bytes());
68 } else {
69 req = req.with_key(key.as_bytes());
70 }
71 let resp = self.kv_backend.range(req).await.map_err(BoxedError::new)?;
72 let keys = resp.kvs.iter().map(|kv| kv.key.clone()).collect::<Vec<_>>();
73 self.tombstone_manager
74 .create(keys)
75 .await
76 .map_err(BoxedError::new)
77 }
78}
79
80struct DelKeyTool {
81 key: String,
82 prefix: bool,
83 key_deleter: KeyDeleter,
84}
85
86#[async_trait]
87impl Tool for DelKeyTool {
88 async fn do_work(&self) -> Result<(), BoxedError> {
89 let deleted = self.key_deleter.delete(&self.key, self.prefix).await?;
90 println!("{}", deleted);
92 Ok(())
93 }
94}
95
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::kv_backend::chroot::ChrootKvBackend;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::{KvBackend, KvBackendRef};
    use common_meta::rpc::store::RangeRequest;

    use crate::metadata::control::del::key::KeyDeleter;
    use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
    use crate::metadata::control::test_utils::put_key;

    /// Deleting by prefix reports every matching key, a later exact delete of
    /// one of them reports nothing, and the original key-value pairs are
    /// readable under the CLI tombstone prefix.
    #[tokio::test]
    async fn test_delete_keys() {
        // Fixture entries, listed in the order the range query is expected to
        // return them.
        let entries = [("foo", "bar"), ("foo/bar", "baz"), ("foo/baz", "qux")];

        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let key_deleter = KeyDeleter::new(kv_backend.clone());
        for &(key, value) in &entries {
            put_key(&kv_backend, key, value).await;
        }

        let removed_by_prefix = key_deleter.delete("foo", true).await.unwrap();
        assert_eq!(removed_by_prefix, 3);
        // The key was already removed by the prefix delete above.
        let removed_exact = key_deleter.delete("foo/bar", false).await.unwrap();
        assert_eq!(removed_exact, 0);

        // Inspect what was written under the tombstone prefix.
        let tombstones =
            ChrootKvBackend::new(CLI_TOMBSTONE_PREFIX.as_bytes().to_vec(), kv_backend);
        let resp = tombstones
            .range(RangeRequest::default().with_prefix(b"foo"))
            .await
            .unwrap();
        assert_eq!(resp.kvs.len(), 3);
        for (kv, &(key, value)) in resp.kvs.iter().zip(&entries) {
            assert_eq!(kv.key, key.as_bytes());
            assert_eq!(kv.value, value.as_bytes());
        }
    }
}