cli/metadata/control/del/
table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use clap::Parser;
17use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
18use common_catalog::format_full_table_name;
19use common_error::ext::BoxedError;
20use common_meta::ddl::utils::get_region_wal_options;
21use common_meta::key::table_name::TableNameManager;
22use common_meta::key::TableMetadataManager;
23use common_meta::kv_backend::KvBackendRef;
24use store_api::storage::TableId;
25
26use crate::error::{InvalidArgumentsSnafu, TableNotFoundSnafu};
27use crate::metadata::common::StoreConfig;
28use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
29use crate::metadata::control::utils::get_table_id_by_name;
30use crate::Tool;
31
/// Delete table metadata logically from the metadata store.
// NOTE: this struct derives clap's `Parser`, so the `///` comments on the
// struct and its fields double as the CLI help text. Keep them user-facing;
// maintainer notes belong in plain `//` comments like this one.
//
// The target table is selected with `--table-id` or `--table-name`;
// `validate` rejects invocations that pass both or neither.
#[derive(Debug, Default, Parser)]
pub struct DelTableCommand {
    /// The table id to delete from the metadata store.
    #[clap(long)]
    table_id: Option<u32>,

    /// The table name to delete from the metadata store.
    #[clap(long)]
    table_name: Option<String>,

    /// The schema name of the table.
    // Only consulted when the table is looked up by name.
    #[clap(long, default_value = DEFAULT_SCHEMA_NAME)]
    schema_name: String,

    /// The catalog name of the table.
    // Only consulted when the table is looked up by name.
    #[clap(long, default_value = DEFAULT_CATALOG_NAME)]
    catalog_name: String,

    // Metadata store configuration; `build` calls `store.build()` to obtain
    // the kv backend that the tool operates on.
    #[clap(flatten)]
    store: StoreConfig,
}
54
55impl DelTableCommand {
56    fn validate(&self) -> Result<(), BoxedError> {
57        if matches!(
58            (&self.table_id, &self.table_name),
59            (Some(_), Some(_)) | (None, None)
60        ) {
61            return Err(BoxedError::new(
62                InvalidArgumentsSnafu {
63                    msg: "You must specify either --table-id or --table-name.",
64                }
65                .build(),
66            ));
67        }
68        Ok(())
69    }
70}
71
72impl DelTableCommand {
73    pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
74        self.validate()?;
75        let kv_backend = self.store.build().await?;
76        Ok(Box::new(DelTableTool {
77            table_id: self.table_id,
78            table_name: self.table_name.clone(),
79            schema_name: self.schema_name.clone(),
80            catalog_name: self.catalog_name.clone(),
81            table_name_manager: TableNameManager::new(kv_backend.clone()),
82            table_metadata_deleter: TableMetadataDeleter::new(kv_backend),
83        }))
84    }
85}
86
/// Runtime state of the table deletion tool, assembled by
/// `DelTableCommand::build` after the arguments have been validated.
struct DelTableTool {
    /// Table id from the command line; used when `table_name` is `None`.
    table_id: Option<u32>,
    /// Table name from the command line; when present it is resolved to a
    /// table id through `table_name_manager`.
    table_name: Option<String>,
    /// Schema used when resolving `table_name`.
    schema_name: String,
    /// Catalog used when resolving `table_name`.
    catalog_name: String,
    /// Used to look up a table id by its full name.
    table_name_manager: TableNameManager,
    /// Performs the actual metadata deletion for a table id.
    table_metadata_deleter: TableMetadataDeleter,
}
95
96#[async_trait]
97impl Tool for DelTableTool {
98    async fn do_work(&self) -> Result<(), BoxedError> {
99        let table_id = if let Some(table_name) = &self.table_name {
100            let catalog_name = &self.catalog_name;
101            let schema_name = &self.schema_name;
102
103            let Some(table_id) = get_table_id_by_name(
104                &self.table_name_manager,
105                catalog_name,
106                schema_name,
107                table_name,
108            )
109            .await?
110            else {
111                println!(
112                    "Table({}) not found",
113                    format_full_table_name(catalog_name, schema_name, table_name)
114                );
115                return Ok(());
116            };
117            table_id
118        } else {
119            // Safety: we have validated that table_id or table_name is not None
120            self.table_id.unwrap()
121        };
122        self.table_metadata_deleter.delete(table_id).await?;
123        println!("Table({}) deleted", table_id);
124
125        Ok(())
126    }
127}
128
/// Deletes the metadata of a single table through a `TableMetadataManager`
/// that is configured with the CLI tombstone prefix (see `new`).
struct TableMetadataDeleter {
    /// Manager used both to read the table's metadata and to delete it.
    table_metadata_manager: TableMetadataManager,
}
132
133impl TableMetadataDeleter {
134    fn new(kv_backend: KvBackendRef) -> Self {
135        Self {
136            table_metadata_manager: TableMetadataManager::new_with_custom_tombstone_prefix(
137                kv_backend,
138                CLI_TOMBSTONE_PREFIX,
139            ),
140        }
141    }
142
143    async fn delete(&self, table_id: TableId) -> Result<(), BoxedError> {
144        let (table_info, table_route) = self
145            .table_metadata_manager
146            .get_full_table_info(table_id)
147            .await
148            .map_err(BoxedError::new)?;
149        let Some(table_info) = table_info else {
150            return Err(BoxedError::new(TableNotFoundSnafu { table_id }.build()));
151        };
152        let Some(table_route) = table_route else {
153            return Err(BoxedError::new(TableNotFoundSnafu { table_id }.build()));
154        };
155        let physical_table_id = self
156            .table_metadata_manager
157            .table_route_manager()
158            .get_physical_table_id(table_id)
159            .await
160            .map_err(BoxedError::new)?;
161
162        let table_name = table_info.table_name();
163        let region_wal_options = get_region_wal_options(
164            &self.table_metadata_manager,
165            &table_route,
166            physical_table_id,
167        )
168        .await
169        .map_err(BoxedError::new)?;
170
171        self.table_metadata_manager
172            .delete_table_metadata(table_id, &table_name, &table_route, &region_wal_options)
173            .await
174            .map_err(BoxedError::new)?;
175        Ok(())
176    }
177}
178
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::chroot::ChrootKvBackend;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::{KvBackend, KvBackendRef};
    use common_meta::rpc::store::RangeRequest;

    use crate::metadata::control::del::table::TableMetadataDeleter;
    use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
    use crate::metadata::control::test_utils::prepare_physical_table_metadata;

    /// Deleting a table id from an empty store must fail with `TableNotFound`.
    #[tokio::test]
    async fn test_delete_table_not_found() {
        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;

        let table_metadata_deleter = TableMetadataDeleter::new(kv_backend);
        let table_id = 1;
        let err = table_metadata_deleter.delete(table_id).await.unwrap_err();
        assert_eq!(err.status_code(), StatusCode::TableNotFound);
    }

    /// Creates metadata for a physical table, deletes it, and checks how many
    /// keys end up under the CLI tombstone prefix.
    #[tokio::test]
    async fn test_delete_table_metadata() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_metadata_manager = TableMetadataManager::new(kv_backend.clone());
        let table_id = 1024;
        let (table_info, table_route) = prepare_physical_table_metadata("my_table", table_id).await;
        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::Physical(table_route),
                HashMap::new(),
            )
            .await
            .unwrap();

        // Record how many keys the table's metadata occupies before deletion.
        let total_keys = kv_backend.len();
        assert!(total_keys > 0);

        let table_metadata_deleter = TableMetadataDeleter::new(kv_backend.clone());
        table_metadata_deleter.delete(table_id).await.unwrap();

        // Check that the number of keys under the CLI tombstone prefix equals
        // the number of keys that existed before the delete.
        // NOTE(review): the `[0]`..`[0]` range presumably selects every key in
        // the chroot-ed backend — confirm against `RangeRequest::with_range`.
        let chroot =
            ChrootKvBackend::new(CLI_TOMBSTONE_PREFIX.as_bytes().to_vec(), kv_backend.clone());
        let req = RangeRequest::default().with_range(vec![0], vec![0]);
        let resp = chroot.range(req).await.unwrap();
        assert_eq!(resp.kvs.len(), total_keys);
    }
}