cli/metadata/control/del/
table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use clap::Parser;
17use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
18use common_catalog::format_full_table_name;
19use common_error::ext::BoxedError;
20use common_meta::ddl::utils::get_region_wal_options;
21use common_meta::key::TableMetadataManager;
22use common_meta::key::table_name::TableNameManager;
23use common_meta::kv_backend::KvBackendRef;
24use store_api::storage::TableId;
25
26use crate::Tool;
27use crate::common::StoreConfig;
28use crate::error::{InvalidArgumentsSnafu, TableNotFoundSnafu};
29use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
30use crate::metadata::control::utils::get_table_id_by_name;
31
32/// Delete table metadata logically from the metadata store.
33#[derive(Debug, Default, Parser)]
34pub struct DelTableCommand {
35    /// The table id to delete from the metadata store.
36    #[clap(long)]
37    table_id: Option<u32>,
38
39    /// The table name to delete from the metadata store.
40    #[clap(long)]
41    table_name: Option<String>,
42
43    /// The schema name of the table.
44    #[clap(long, default_value = DEFAULT_SCHEMA_NAME)]
45    schema_name: String,
46
47    /// The catalog name of the table.
48    #[clap(long, default_value = DEFAULT_CATALOG_NAME)]
49    catalog_name: String,
50
51    /// The store config.
52    #[clap(flatten)]
53    store: StoreConfig,
54}
55
56impl DelTableCommand {
57    fn validate(&self) -> Result<(), BoxedError> {
58        if matches!(
59            (&self.table_id, &self.table_name),
60            (Some(_), Some(_)) | (None, None)
61        ) {
62            return Err(BoxedError::new(
63                InvalidArgumentsSnafu {
64                    msg: "You must specify either --table-id or --table-name.",
65                }
66                .build(),
67            ));
68        }
69        Ok(())
70    }
71}
72
73impl DelTableCommand {
74    pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
75        self.validate()?;
76        let kv_backend = self.store.build().await?;
77        Ok(Box::new(DelTableTool {
78            table_id: self.table_id,
79            table_name: self.table_name.clone(),
80            schema_name: self.schema_name.clone(),
81            catalog_name: self.catalog_name.clone(),
82            table_name_manager: TableNameManager::new(kv_backend.clone()),
83            table_metadata_deleter: TableMetadataDeleter::new(kv_backend),
84        }))
85    }
86}
87
88struct DelTableTool {
89    table_id: Option<u32>,
90    table_name: Option<String>,
91    schema_name: String,
92    catalog_name: String,
93    table_name_manager: TableNameManager,
94    table_metadata_deleter: TableMetadataDeleter,
95}
96
97#[async_trait]
98impl Tool for DelTableTool {
99    async fn do_work(&self) -> Result<(), BoxedError> {
100        let table_id = if let Some(table_name) = &self.table_name {
101            let catalog_name = &self.catalog_name;
102            let schema_name = &self.schema_name;
103
104            let Some(table_id) = get_table_id_by_name(
105                &self.table_name_manager,
106                catalog_name,
107                schema_name,
108                table_name,
109            )
110            .await?
111            else {
112                println!(
113                    "Table({}) not found",
114                    format_full_table_name(catalog_name, schema_name, table_name)
115                );
116                return Ok(());
117            };
118            table_id
119        } else {
120            // Safety: we have validated that table_id or table_name is not None
121            self.table_id.unwrap()
122        };
123        self.table_metadata_deleter.delete(table_id).await?;
124        println!("Table({}) deleted", table_id);
125
126        Ok(())
127    }
128}
129
130struct TableMetadataDeleter {
131    table_metadata_manager: TableMetadataManager,
132}
133
134impl TableMetadataDeleter {
135    fn new(kv_backend: KvBackendRef) -> Self {
136        Self {
137            table_metadata_manager: TableMetadataManager::new_with_custom_tombstone_prefix(
138                kv_backend,
139                CLI_TOMBSTONE_PREFIX,
140            ),
141        }
142    }
143
144    async fn delete(&self, table_id: TableId) -> Result<(), BoxedError> {
145        let (table_info, table_route) = self
146            .table_metadata_manager
147            .get_full_table_info(table_id)
148            .await
149            .map_err(BoxedError::new)?;
150        let Some(table_info) = table_info else {
151            return Err(BoxedError::new(TableNotFoundSnafu { table_id }.build()));
152        };
153        let Some(table_route) = table_route else {
154            return Err(BoxedError::new(TableNotFoundSnafu { table_id }.build()));
155        };
156        let physical_table_id = self
157            .table_metadata_manager
158            .table_route_manager()
159            .get_physical_table_id(table_id)
160            .await
161            .map_err(BoxedError::new)?;
162
163        let table_name = table_info.table_name();
164        let region_wal_options = get_region_wal_options(
165            &self.table_metadata_manager,
166            &table_route,
167            physical_table_id,
168        )
169        .await
170        .map_err(BoxedError::new)?;
171
172        self.table_metadata_manager
173            .delete_table_metadata(table_id, &table_name, &table_route, &region_wal_options)
174            .await
175            .map_err(BoxedError::new)?;
176        Ok(())
177    }
178}
179
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use common_meta::key::TableMetadataManager;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::kv_backend::chroot::ChrootKvBackend;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::{KvBackend, KvBackendRef};
    use common_meta::rpc::store::RangeRequest;

    use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
    use crate::metadata::control::del::table::TableMetadataDeleter;
    use crate::metadata::control::test_utils::prepare_physical_table_metadata;

    /// Deleting from an empty store must fail with `TableNotFound`.
    #[tokio::test]
    async fn test_delete_table_not_found() {
        let empty_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let deleter = TableMetadataDeleter::new(empty_backend);

        let missing_table_id = 1;
        let outcome = deleter.delete(missing_table_id).await;
        assert_eq!(outcome.unwrap_err().status_code(), StatusCode::TableNotFound);
    }

    /// After deletion, the CLI tombstone prefix holds as many entries as the store
    /// held keys before the delete.
    #[tokio::test]
    async fn test_delete_table_metadata() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_id = 1024;

        // Seed the store with the metadata of one physical table.
        let (table_info, table_route) = prepare_physical_table_metadata("my_table", table_id).await;
        let route_value = TableRouteValue::Physical(table_route);
        TableMetadataManager::new(kv_backend.clone())
            .create_table_metadata(table_info, route_value, HashMap::new())
            .await
            .unwrap();

        let keys_before_delete = kv_backend.len();
        assert!(keys_before_delete > 0);

        TableMetadataDeleter::new(kv_backend.clone())
            .delete(table_id)
            .await
            .unwrap();

        // Check that the deleted keys now exist as tombstones: view the store through
        // the CLI tombstone prefix and count what is there.
        // NOTE(review): the `[0]..[0]` range presumably selects every key — confirm.
        let tombstone_prefix = CLI_TOMBSTONE_PREFIX.as_bytes().to_vec();
        let tombstone_view = ChrootKvBackend::new(tombstone_prefix, kv_backend.clone());
        let all_keys = RangeRequest::default().with_range(vec![0], vec![0]);
        let tombstones = tombstone_view.range(all_keys).await.unwrap();
        assert_eq!(tombstones.kvs.len(), keys_before_delete);
    }
}