// cli/metadata/control/del/table.rs
use async_trait::async_trait;
16use clap::Parser;
17use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
18use common_catalog::format_full_table_name;
19use common_error::ext::BoxedError;
20use common_meta::ddl::utils::get_region_wal_options;
21use common_meta::key::TableMetadataManager;
22use common_meta::key::table_name::TableNameManager;
23use common_meta::kv_backend::KvBackendRef;
24use store_api::storage::TableId;
25
26use crate::Tool;
27use crate::common::StoreConfig;
28use crate::error::{InvalidArgumentsSnafu, TableNotFoundSnafu};
29use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
30use crate::metadata::control::utils::get_table_id_by_name;
31
32#[derive(Debug, Default, Parser)]
34pub struct DelTableCommand {
35 #[clap(long)]
37 table_id: Option<u32>,
38
39 #[clap(long)]
41 table_name: Option<String>,
42
43 #[clap(long, default_value = DEFAULT_SCHEMA_NAME)]
45 schema_name: String,
46
47 #[clap(long, default_value = DEFAULT_CATALOG_NAME)]
49 catalog_name: String,
50
51 #[clap(flatten)]
53 store: StoreConfig,
54}
55
56impl DelTableCommand {
57 fn validate(&self) -> Result<(), BoxedError> {
58 if matches!(
59 (&self.table_id, &self.table_name),
60 (Some(_), Some(_)) | (None, None)
61 ) {
62 return Err(BoxedError::new(
63 InvalidArgumentsSnafu {
64 msg: "You must specify either --table-id or --table-name.",
65 }
66 .build(),
67 ));
68 }
69 Ok(())
70 }
71}
72
73impl DelTableCommand {
74 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
75 self.validate()?;
76 let kv_backend = self.store.build().await?;
77 Ok(Box::new(DelTableTool {
78 table_id: self.table_id,
79 table_name: self.table_name.clone(),
80 schema_name: self.schema_name.clone(),
81 catalog_name: self.catalog_name.clone(),
82 table_name_manager: TableNameManager::new(kv_backend.clone()),
83 table_metadata_deleter: TableMetadataDeleter::new(kv_backend),
84 }))
85 }
86}
87
88struct DelTableTool {
89 table_id: Option<u32>,
90 table_name: Option<String>,
91 schema_name: String,
92 catalog_name: String,
93 table_name_manager: TableNameManager,
94 table_metadata_deleter: TableMetadataDeleter,
95}
96
97#[async_trait]
98impl Tool for DelTableTool {
99 async fn do_work(&self) -> Result<(), BoxedError> {
100 let table_id = if let Some(table_name) = &self.table_name {
101 let catalog_name = &self.catalog_name;
102 let schema_name = &self.schema_name;
103
104 let Some(table_id) = get_table_id_by_name(
105 &self.table_name_manager,
106 catalog_name,
107 schema_name,
108 table_name,
109 )
110 .await?
111 else {
112 println!(
113 "Table({}) not found",
114 format_full_table_name(catalog_name, schema_name, table_name)
115 );
116 return Ok(());
117 };
118 table_id
119 } else {
120 self.table_id.unwrap()
122 };
123 self.table_metadata_deleter.delete(table_id).await?;
124 println!("Table({}) deleted", table_id);
125
126 Ok(())
127 }
128}
129
130struct TableMetadataDeleter {
131 table_metadata_manager: TableMetadataManager,
132}
133
134impl TableMetadataDeleter {
135 fn new(kv_backend: KvBackendRef) -> Self {
136 Self {
137 table_metadata_manager: TableMetadataManager::new_with_custom_tombstone_prefix(
138 kv_backend,
139 CLI_TOMBSTONE_PREFIX,
140 ),
141 }
142 }
143
144 async fn delete(&self, table_id: TableId) -> Result<(), BoxedError> {
145 let (table_info, table_route) = self
146 .table_metadata_manager
147 .get_full_table_info(table_id)
148 .await
149 .map_err(BoxedError::new)?;
150 let Some(table_info) = table_info else {
151 return Err(BoxedError::new(TableNotFoundSnafu { table_id }.build()));
152 };
153 let Some(table_route) = table_route else {
154 return Err(BoxedError::new(TableNotFoundSnafu { table_id }.build()));
155 };
156 let physical_table_id = self
157 .table_metadata_manager
158 .table_route_manager()
159 .get_physical_table_id(table_id)
160 .await
161 .map_err(BoxedError::new)?;
162
163 let table_name = table_info.table_name();
164 let region_wal_options = get_region_wal_options(
165 &self.table_metadata_manager,
166 &table_route,
167 physical_table_id,
168 )
169 .await
170 .map_err(BoxedError::new)?;
171
172 self.table_metadata_manager
173 .delete_table_metadata(table_id, &table_name, &table_route, ®ion_wal_options)
174 .await
175 .map_err(BoxedError::new)?;
176 Ok(())
177 }
178}
179
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use common_meta::key::TableMetadataManager;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::kv_backend::chroot::ChrootKvBackend;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::{KvBackend, KvBackendRef};
    use common_meta::rpc::store::RangeRequest;

    use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
    use crate::metadata::control::del::table::TableMetadataDeleter;
    use crate::metadata::control::test_utils::prepare_physical_table_metadata;

    /// Deleting from an empty store must fail with `TableNotFound`.
    #[tokio::test]
    async fn test_delete_table_not_found() {
        let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let deleter = TableMetadataDeleter::new(kv_backend);

        let table_id = 1;
        let result = deleter.delete(table_id).await;

        let err = result.unwrap_err();
        assert_eq!(err.status_code(), StatusCode::TableNotFound);
    }

    /// After a delete, as many keys as were originally written must be
    /// visible under the CLI tombstone prefix.
    #[tokio::test]
    async fn test_delete_table_metadata() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_id = 1024;

        // Seed the store with the metadata of one physical table.
        let manager = TableMetadataManager::new(kv_backend.clone());
        let (table_info, table_route) = prepare_physical_table_metadata("my_table", table_id).await;
        manager
            .create_table_metadata(
                table_info,
                TableRouteValue::Physical(table_route),
                HashMap::new(),
            )
            .await
            .unwrap();

        let total_keys = kv_backend.len();
        assert!(total_keys > 0);

        let deleter = TableMetadataDeleter::new(kv_backend.clone());
        deleter.delete(table_id).await.unwrap();

        // View the same store rooted at the tombstone prefix.
        let tombstone_view =
            ChrootKvBackend::new(CLI_TOMBSTONE_PREFIX.as_bytes().to_vec(), kv_backend.clone());
        // NOTE(review): key == range_end == [0] presumably selects every key — confirm.
        let all_keys = RangeRequest::default().with_range(vec![0], vec![0]);
        let tombstoned = tombstone_view.range(all_keys).await.unwrap();
        assert_eq!(tombstoned.kvs.len(), total_keys);
    }
}