cli/metadata/control/del/
table.rs1use async_trait::async_trait;
16use clap::Parser;
17use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
18use common_catalog::format_full_table_name;
19use common_error::ext::BoxedError;
20use common_meta::ddl::utils::get_region_wal_options;
21use common_meta::key::table_name::TableNameManager;
22use common_meta::key::TableMetadataManager;
23use common_meta::kv_backend::KvBackendRef;
24use store_api::storage::TableId;
25
26use crate::error::{InvalidArgumentsSnafu, TableNotFoundSnafu};
27use crate::metadata::common::StoreConfig;
28use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
29use crate::metadata::control::utils::get_table_id_by_name;
30use crate::Tool;
31
32#[derive(Debug, Default, Parser)]
34pub struct DelTableCommand {
35 #[clap(long)]
37 table_id: Option<u32>,
38
39 #[clap(long)]
41 table_name: Option<String>,
42
43 #[clap(long, default_value = DEFAULT_SCHEMA_NAME)]
45 schema_name: String,
46
47 #[clap(long, default_value = DEFAULT_CATALOG_NAME)]
49 catalog_name: String,
50
51 #[clap(flatten)]
52 store: StoreConfig,
53}
54
55impl DelTableCommand {
56 fn validate(&self) -> Result<(), BoxedError> {
57 if matches!(
58 (&self.table_id, &self.table_name),
59 (Some(_), Some(_)) | (None, None)
60 ) {
61 return Err(BoxedError::new(
62 InvalidArgumentsSnafu {
63 msg: "You must specify either --table-id or --table-name.",
64 }
65 .build(),
66 ));
67 }
68 Ok(())
69 }
70}
71
72impl DelTableCommand {
73 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
74 self.validate()?;
75 let kv_backend = self.store.build().await?;
76 Ok(Box::new(DelTableTool {
77 table_id: self.table_id,
78 table_name: self.table_name.clone(),
79 schema_name: self.schema_name.clone(),
80 catalog_name: self.catalog_name.clone(),
81 table_name_manager: TableNameManager::new(kv_backend.clone()),
82 table_metadata_deleter: TableMetadataDeleter::new(kv_backend),
83 }))
84 }
85}
86
87struct DelTableTool {
88 table_id: Option<u32>,
89 table_name: Option<String>,
90 schema_name: String,
91 catalog_name: String,
92 table_name_manager: TableNameManager,
93 table_metadata_deleter: TableMetadataDeleter,
94}
95
96#[async_trait]
97impl Tool for DelTableTool {
98 async fn do_work(&self) -> Result<(), BoxedError> {
99 let table_id = if let Some(table_name) = &self.table_name {
100 let catalog_name = &self.catalog_name;
101 let schema_name = &self.schema_name;
102
103 let Some(table_id) = get_table_id_by_name(
104 &self.table_name_manager,
105 catalog_name,
106 schema_name,
107 table_name,
108 )
109 .await?
110 else {
111 println!(
112 "Table({}) not found",
113 format_full_table_name(catalog_name, schema_name, table_name)
114 );
115 return Ok(());
116 };
117 table_id
118 } else {
119 self.table_id.unwrap()
121 };
122 self.table_metadata_deleter.delete(table_id).await?;
123 println!("Table({}) deleted", table_id);
124
125 Ok(())
126 }
127}
128
129struct TableMetadataDeleter {
130 table_metadata_manager: TableMetadataManager,
131}
132
133impl TableMetadataDeleter {
134 fn new(kv_backend: KvBackendRef) -> Self {
135 Self {
136 table_metadata_manager: TableMetadataManager::new_with_custom_tombstone_prefix(
137 kv_backend,
138 CLI_TOMBSTONE_PREFIX,
139 ),
140 }
141 }
142
143 async fn delete(&self, table_id: TableId) -> Result<(), BoxedError> {
144 let (table_info, table_route) = self
145 .table_metadata_manager
146 .get_full_table_info(table_id)
147 .await
148 .map_err(BoxedError::new)?;
149 let Some(table_info) = table_info else {
150 return Err(BoxedError::new(TableNotFoundSnafu { table_id }.build()));
151 };
152 let Some(table_route) = table_route else {
153 return Err(BoxedError::new(TableNotFoundSnafu { table_id }.build()));
154 };
155 let physical_table_id = self
156 .table_metadata_manager
157 .table_route_manager()
158 .get_physical_table_id(table_id)
159 .await
160 .map_err(BoxedError::new)?;
161
162 let table_name = table_info.table_name();
163 let region_wal_options = get_region_wal_options(
164 &self.table_metadata_manager,
165 &table_route,
166 physical_table_id,
167 )
168 .await
169 .map_err(BoxedError::new)?;
170
171 self.table_metadata_manager
172 .delete_table_metadata(table_id, &table_name, &table_route, ®ion_wal_options)
173 .await
174 .map_err(BoxedError::new)?;
175 Ok(())
176 }
177}
178
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::chroot::ChrootKvBackend;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::{KvBackend, KvBackendRef};
    use common_meta::rpc::store::RangeRequest;

    use crate::metadata::control::del::table::TableMetadataDeleter;
    use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
    use crate::metadata::control::test_utils::prepare_physical_table_metadata;

    /// Deleting a table id that has no metadata reports `TableNotFound`.
    #[tokio::test]
    async fn test_delete_table_not_found() {
        let backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
        let deleter = TableMetadataDeleter::new(backend);

        let missing_table_id = 1;
        let error = deleter.delete(missing_table_id).await.unwrap_err();

        assert_eq!(error.status_code(), StatusCode::TableNotFound);
    }

    /// After deletion, every key written for the table is found under the CLI
    /// tombstone prefix.
    #[tokio::test]
    async fn test_delete_table_metadata() {
        let backend = Arc::new(MemoryKvBackend::new());
        let manager = TableMetadataManager::new(backend.clone());

        // Write metadata for a single physical table.
        let table_id = 1024;
        let (table_info, table_route) = prepare_physical_table_metadata("my_table", table_id).await;
        manager
            .create_table_metadata(
                table_info,
                TableRouteValue::Physical(table_route),
                HashMap::new(),
            )
            .await
            .unwrap();

        let keys_before_delete = backend.len();
        assert!(keys_before_delete > 0);

        // Delete through the CLI deleter.
        let deleter = TableMetadataDeleter::new(backend.clone());
        deleter.delete(table_id).await.unwrap();

        // Range over everything below the tombstone prefix.
        let tombstones =
            ChrootKvBackend::new(CLI_TOMBSTONE_PREFIX.as_bytes().to_vec(), backend.clone());
        let request = RangeRequest::default().with_range(vec![0], vec![0]);
        let response = tombstones.range(request).await.unwrap();
        assert_eq!(response.kvs.len(), keys_before_delete);
    }
}