Skip to main content

cli/metadata/control/del/
table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use clap::Parser;
17use common_error::ext::BoxedError;
18use common_meta::ddl::utils::get_region_wal_options;
19use common_meta::key::TableMetadataManager;
20use common_meta::kv_backend::KvBackendRef;
21use store_api::storage::TableId;
22
23use crate::Tool;
24use crate::common::StoreConfig;
25use crate::error::TableNotFoundSnafu;
26use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
27use crate::metadata::control::selector::TableSelector;
28
/// Delete table metadata logically from the metadata store.
#[derive(Debug, Default, Parser)]
pub struct DelTableCommand {
    /// The selector identifying the table whose metadata is deleted.
    #[clap(flatten)]
    selector: TableSelector,

    /// The store config.
    #[clap(flatten)]
    store: StoreConfig,
}
39
40impl DelTableCommand {
41    pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
42        self.selector.validate()?;
43        let kv_backend = self.store.build().await?;
44        Ok(Box::new(DelTableTool {
45            selector: self.selector.clone(),
46            table_metadata_deleter: TableMetadataDeleter::new(kv_backend),
47        }))
48    }
49}
50
/// The [`Tool`] built by [`DelTableCommand`]: resolves the selected table to a
/// table id and deletes that table's metadata.
struct DelTableTool {
    /// Identifies the table to delete.
    selector: TableSelector,
    /// Performs the deletion; its metadata manager is also used to resolve the table id.
    table_metadata_deleter: TableMetadataDeleter,
}
55
56#[async_trait]
57impl Tool for DelTableTool {
58    async fn do_work(&self) -> Result<(), BoxedError> {
59        let Some(table_id) = self
60            .selector
61            .resolve_table_id(
62                self.table_metadata_deleter
63                    .table_metadata_manager
64                    .table_name_manager(),
65            )
66            .await?
67        else {
68            println!("Table({}) not found", self.selector.formatted_table_name());
69            return Ok(());
70        };
71        self.table_metadata_deleter.delete(table_id).await?;
72        println!("Table({}) deleted", table_id);
73
74        Ok(())
75    }
76}
77
/// Deletes the metadata of a table through a [`TableMetadataManager`].
struct TableMetadataDeleter {
    /// Metadata manager; `new` builds it with `CLI_TOMBSTONE_PREFIX` as its
    /// custom tombstone prefix.
    table_metadata_manager: TableMetadataManager,
}
81
82impl TableMetadataDeleter {
83    fn new(kv_backend: KvBackendRef) -> Self {
84        Self {
85            table_metadata_manager: TableMetadataManager::new_with_custom_tombstone_prefix(
86                kv_backend,
87                CLI_TOMBSTONE_PREFIX,
88            ),
89        }
90    }
91
92    async fn delete(&self, table_id: TableId) -> Result<(), BoxedError> {
93        let (table_info, table_route) = self
94            .table_metadata_manager
95            .get_full_table_info(table_id)
96            .await
97            .map_err(BoxedError::new)?;
98        let Some(table_info) = table_info else {
99            return Err(BoxedError::new(TableNotFoundSnafu { table_id }.build()));
100        };
101        let Some(table_route) = table_route else {
102            return Err(BoxedError::new(TableNotFoundSnafu { table_id }.build()));
103        };
104        let physical_table_id = self
105            .table_metadata_manager
106            .table_route_manager()
107            .get_physical_table_id(table_id)
108            .await
109            .map_err(BoxedError::new)?;
110
111        let table_name = table_info.table_name();
112        let region_wal_options = get_region_wal_options(
113            &self.table_metadata_manager,
114            &table_route,
115            physical_table_id,
116        )
117        .await
118        .map_err(BoxedError::new)?;
119
120        self.table_metadata_manager
121            .delete_table_metadata(table_id, &table_name, &table_route, &region_wal_options)
122            .await
123            .map_err(BoxedError::new)?;
124        Ok(())
125    }
126}
127
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use clap::Parser;
    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use common_meta::key::TableMetadataManager;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::kv_backend::chroot::ChrootKvBackend;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::{KvBackend, KvBackendRef};
    use common_meta::rpc::store::RangeRequest;

    use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
    use crate::metadata::control::del::table::{DelTableCommand, TableMetadataDeleter};
    use crate::metadata::control::test_utils::prepare_physical_table_metadata;

    /// Message expected whenever the selector does not name exactly one target.
    const SELECTOR_ERROR: &str = "You must specify either --table-id or --table-name.";

    /// Parses a `DelTableCommand` from the given selector arguments followed by
    /// the in-memory store arguments shared by every command test.
    fn parse_command(selector_args: &[&str]) -> DelTableCommand {
        let mut args = vec!["table"];
        args.extend_from_slice(selector_args);
        args.extend_from_slice(&["--backend", "memory-store", "--store-addrs", "memory://"]);
        DelTableCommand::parse_from(args)
    }

    /// Builds the command for `selector_args` and returns the output message of
    /// the resulting error, panicking if the build unexpectedly succeeds.
    async fn build_error_message(selector_args: &[&str]) -> String {
        match parse_command(selector_args).build().await {
            Ok(_) => panic!("expected validation failure"),
            Err(err) => err.output_msg(),
        }
    }

    #[tokio::test]
    async fn test_del_table_selector_requires_single_target() {
        // Neither --table-id nor --table-name is given.
        let message = build_error_message(&[]).await;
        assert!(message.contains(SELECTOR_ERROR));
    }

    #[tokio::test]
    async fn test_del_table_selector_rejects_both_targets() {
        // Both --table-id and --table-name are given.
        let message =
            build_error_message(&["--table-id", "1024", "--table-name", "my_table"]).await;
        assert!(message.contains(SELECTOR_ERROR));
    }

    #[tokio::test]
    async fn test_del_table_command_builds_tool_with_table_id() {
        let command = parse_command(&["--table-id", "1024"]);

        let _tool = command.build().await.unwrap();
    }

    #[tokio::test]
    async fn test_del_table_command_builds_tool_with_table_name() {
        let command = parse_command(&["--table-name", "my_table"]);

        let _tool = command.build().await.unwrap();
    }

    #[tokio::test]
    async fn test_delete_table_not_found() {
        // Nothing is stored in the backend, so the lookup must fail.
        let deleter =
            TableMetadataDeleter::new(Arc::new(MemoryKvBackend::new()) as KvBackendRef);

        let table_id = 1;
        let err = deleter.delete(table_id).await.unwrap_err();
        assert_eq!(err.status_code(), StatusCode::TableNotFound);
    }

    #[tokio::test]
    async fn test_delete_table_metadata() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_id = 1024;

        // Seed the backend with the metadata of one physical table.
        let (table_info, table_route) = prepare_physical_table_metadata("my_table", table_id).await;
        TableMetadataManager::new(kv_backend.clone())
            .create_table_metadata(
                table_info,
                TableRouteValue::Physical(table_route),
                HashMap::new(),
            )
            .await
            .unwrap();

        let total_keys = kv_backend.len();
        assert!(total_keys > 0);

        TableMetadataDeleter::new(kv_backend.clone())
            .delete(table_id)
            .await
            .unwrap();

        // After the logical delete, the number of keys found under the CLI
        // tombstone prefix must equal the number of keys created above.
        let tombstones =
            ChrootKvBackend::new(CLI_TOMBSTONE_PREFIX.as_bytes().to_vec(), kv_backend.clone());
        // NOTE(review): the `[0]..[0]` range presumably selects every key under
        // the prefix; confirm against `RangeRequest` semantics.
        let request = RangeRequest::default().with_range(vec![0], vec![0]);
        let response = tombstones.range(request).await.unwrap();
        assert_eq!(response.kvs.len(), total_keys);
    }
}
259}