common_meta/ddl/drop_table/
executor.rs1use std::collections::HashMap;
16
17use api::v1::region::{
18 CloseRequest as PbCloseRegionRequest, DropRequest as PbDropRegionRequest, RegionRequest,
19 RegionRequestHeader, region_request,
20};
21use common_error::ext::ErrorExt;
22use common_error::status_code::StatusCode;
23use common_telemetry::tracing_context::TracingContext;
24use common_telemetry::{debug, error};
25use common_wal::options::WalOptions;
26use futures::future::join_all;
27use snafu::ensure;
28use store_api::storage::{RegionId, RegionNumber};
29use table::metadata::TableId;
30use table::table_name::TableName;
31
32use crate::cache_invalidator::Context;
33use crate::ddl::DdlContext;
34use crate::ddl::utils::{add_peer_context_if_needed, convert_region_routes_to_detecting_regions};
35use crate::error::{self, Result};
36use crate::instruction::CacheIdent;
37use crate::key::table_name::TableNameKey;
38use crate::key::table_route::TableRouteValue;
39use crate::node_manager::NodeManagerRef;
40use crate::region_registry::LeaderRegionRegistryRef;
41use crate::rpc::router::{
42 RegionRoute, find_follower_regions, find_followers, find_leader_regions, find_leaders,
43 operating_leader_regions,
44};
45
46#[derive(Debug)]
48pub enum Control<T> {
49 Continue(T),
50 Stop,
51}
52
53impl<T> Control<T> {
54 pub fn stop(&self) -> bool {
56 matches!(self, Control::Stop)
57 }
58}
59
60impl DropTableExecutor {
61 pub fn new(table: TableName, table_id: TableId, drop_if_exists: bool) -> Self {
63 Self {
64 table,
65 table_id,
66 drop_if_exists,
67 }
68 }
69}
70
71pub struct DropTableExecutor {
76 table: TableName,
77 table_id: TableId,
78 drop_if_exists: bool,
79}
80
81impl DropTableExecutor {
82 pub async fn on_prepare(&self, ctx: &DdlContext) -> Result<Control<()>> {
86 let table_ref = self.table.table_ref();
87
88 let exist = ctx
89 .table_metadata_manager
90 .table_name_manager()
91 .exists(TableNameKey::new(
92 table_ref.catalog,
93 table_ref.schema,
94 table_ref.table,
95 ))
96 .await?;
97
98 if !exist && self.drop_if_exists {
99 return Ok(Control::Stop);
100 }
101
102 ensure!(
103 exist,
104 error::TableNotFoundSnafu {
105 table_name: table_ref.to_string()
106 }
107 );
108
109 Ok(Control::Continue(()))
110 }
111
112 pub async fn on_delete_metadata(
114 &self,
115 ctx: &DdlContext,
116 table_route_value: &TableRouteValue,
117 region_wal_options: &HashMap<RegionNumber, WalOptions>,
118 ) -> Result<()> {
119 ctx.table_metadata_manager
120 .delete_table_metadata(
121 self.table_id,
122 &self.table,
123 table_route_value,
124 region_wal_options,
125 )
126 .await
127 }
128
129 pub async fn on_delete_metadata_tombstone(
131 &self,
132 ctx: &DdlContext,
133 table_route_value: &TableRouteValue,
134 region_wal_options: &HashMap<u32, WalOptions>,
135 ) -> Result<()> {
136 ctx.table_metadata_manager
137 .delete_table_metadata_tombstone(
138 self.table_id,
139 &self.table,
140 table_route_value,
141 region_wal_options,
142 )
143 .await
144 }
145
146 pub async fn on_destroy_metadata(
148 &self,
149 ctx: &DdlContext,
150 table_route_value: &TableRouteValue,
151 region_wal_options: &HashMap<u32, WalOptions>,
152 ) -> Result<()> {
153 ctx.table_metadata_manager
154 .destroy_table_metadata(
155 self.table_id,
156 &self.table,
157 table_route_value,
158 region_wal_options,
159 )
160 .await?;
161
162 let detecting_regions = if table_route_value.is_physical() {
163 let regions = table_route_value.region_routes().unwrap();
165 convert_region_routes_to_detecting_regions(regions)
166 } else {
167 vec![]
168 };
169 ctx.deregister_failure_detectors(detecting_regions).await;
170 Ok(())
171 }
172
173 pub async fn on_restore_metadata(
175 &self,
176 ctx: &DdlContext,
177 table_route_value: &TableRouteValue,
178 region_wal_options: &HashMap<u32, WalOptions>,
179 ) -> Result<()> {
180 ctx.table_metadata_manager
181 .restore_table_metadata(
182 self.table_id,
183 &self.table,
184 table_route_value,
185 region_wal_options,
186 )
187 .await
188 }
189
190 pub async fn invalidate_table_cache(&self, ctx: &DdlContext) -> Result<()> {
192 let cache_invalidator = &ctx.cache_invalidator;
193 let ctx = Context {
194 subject: Some(format!(
195 "Invalidate table cache by dropping table {}, table_id: {}",
196 self.table.table_ref(),
197 self.table_id,
198 )),
199 };
200
201 cache_invalidator
202 .invalidate(
203 &ctx,
204 &[
205 CacheIdent::TableName(self.table.table_ref().into()),
206 CacheIdent::TableId(self.table_id),
207 ],
208 )
209 .await?;
210
211 Ok(())
212 }
213
214 pub async fn on_drop_regions(
216 &self,
217 node_manager: &NodeManagerRef,
218 leader_region_registry: &LeaderRegionRegistryRef,
219 region_routes: &[RegionRoute],
220 fast_path: bool,
221 force: bool,
222 partial_drop: bool,
223 ) -> Result<()> {
224 let leaders = find_leaders(region_routes);
226 let mut drop_region_tasks = Vec::with_capacity(leaders.len());
227 let table_id = self.table_id;
228 for datanode in leaders {
229 let requester = node_manager.datanode(&datanode).await;
230 let regions = find_leader_regions(region_routes, &datanode);
231 let region_ids = regions
232 .iter()
233 .map(|region_number| RegionId::new(table_id, *region_number))
234 .collect::<Vec<_>>();
235
236 for region_id in region_ids {
237 debug!("Dropping region {region_id} on Datanode {datanode:?}");
238 let request = RegionRequest {
239 header: Some(RegionRequestHeader {
240 tracing_context: TracingContext::from_current_span().to_w3c(),
241 ..Default::default()
242 }),
243 body: Some(region_request::Body::Drop(PbDropRegionRequest {
244 region_id: region_id.as_u64(),
245 fast_path,
246 force,
247 partial_drop,
248 })),
249 };
250 let datanode = datanode.clone();
251 let requester = requester.clone();
252 drop_region_tasks.push(async move {
253 if let Err(err) = requester.handle(request).await
254 && err.status_code() != StatusCode::RegionNotFound
255 {
256 return Err(add_peer_context_if_needed(datanode)(err));
257 }
258 Ok(())
259 });
260 }
261 }
262
263 join_all(drop_region_tasks)
264 .await
265 .into_iter()
266 .collect::<Result<Vec<_>>>()?;
267
268 let followers = find_followers(region_routes);
270 let mut close_region_tasks = Vec::with_capacity(followers.len());
271 for datanode in followers {
272 let requester = node_manager.datanode(&datanode).await;
273 let regions = find_follower_regions(region_routes, &datanode);
274 let region_ids = regions
275 .iter()
276 .map(|region_number| RegionId::new(table_id, *region_number))
277 .collect::<Vec<_>>();
278
279 for region_id in region_ids {
280 debug!("Closing region {region_id} on Datanode {datanode:?}");
281 let request = RegionRequest {
282 header: Some(RegionRequestHeader {
283 tracing_context: TracingContext::from_current_span().to_w3c(),
284 ..Default::default()
285 }),
286 body: Some(region_request::Body::Close(PbCloseRegionRequest {
287 region_id: region_id.as_u64(),
288 })),
289 };
290
291 let datanode = datanode.clone();
292 let requester = requester.clone();
293 close_region_tasks.push(async move {
294 if let Err(err) = requester.handle(request).await
295 && err.status_code() != StatusCode::RegionNotFound
296 {
297 return Err(add_peer_context_if_needed(datanode)(err));
298 }
299 Ok(())
300 });
301 }
302 }
303
304 if let Err(err) = join_all(close_region_tasks)
308 .await
309 .into_iter()
310 .collect::<Result<Vec<_>>>()
311 {
312 error!(err; "Failed to close follower regions on datanodes, table_id: {}", table_id);
313 }
314
315 let region_ids = operating_leader_regions(region_routes);
317 leader_region_registry.batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id));
318
319 Ok(())
320 }
321}
322
323#[cfg(test)]
324mod tests {
325 use std::assert_matches::assert_matches;
326 use std::collections::HashMap;
327 use std::sync::Arc;
328
329 use api::v1::{ColumnDataType, SemanticType};
330 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
331 use table::metadata::TableInfo;
332 use table::table_name::TableName;
333
334 use super::*;
335 use crate::ddl::test_util::columns::TestColumnDefBuilder;
336 use crate::ddl::test_util::create_table::{
337 TestCreateTableExprBuilder, build_raw_table_info_from_expr,
338 };
339 use crate::key::table_route::TableRouteValue;
340 use crate::test_util::{MockDatanodeManager, new_ddl_context};
341
342 fn test_create_raw_table_info(name: &str) -> TableInfo {
343 let create_table = TestCreateTableExprBuilder::default()
344 .column_defs([
345 TestColumnDefBuilder::default()
346 .name("ts")
347 .data_type(ColumnDataType::TimestampMillisecond)
348 .semantic_type(SemanticType::Timestamp)
349 .build()
350 .unwrap()
351 .into(),
352 TestColumnDefBuilder::default()
353 .name("host")
354 .data_type(ColumnDataType::String)
355 .semantic_type(SemanticType::Tag)
356 .build()
357 .unwrap()
358 .into(),
359 TestColumnDefBuilder::default()
360 .name("cpu")
361 .data_type(ColumnDataType::Float64)
362 .semantic_type(SemanticType::Field)
363 .build()
364 .unwrap()
365 .into(),
366 ])
367 .time_index("ts")
368 .primary_keys(["host".into()])
369 .table_name(name)
370 .build()
371 .unwrap()
372 .into();
373 build_raw_table_info_from_expr(&create_table)
374 }
375
376 #[tokio::test]
377 async fn test_on_prepare() {
378 let node_manager = Arc::new(MockDatanodeManager::new(()));
380 let ctx = new_ddl_context(node_manager);
381 let executor = DropTableExecutor::new(
382 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"),
383 1024,
384 true,
385 );
386 let ctrl = executor.on_prepare(&ctx).await.unwrap();
387 assert!(ctrl.stop());
388
389 let executor = DropTableExecutor::new(
391 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"),
392 1024,
393 false,
394 );
395 let err = executor.on_prepare(&ctx).await.unwrap_err();
396 assert_matches!(err, error::Error::TableNotFound { .. });
397
398 let executor = DropTableExecutor::new(
400 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"),
401 1024,
402 false,
403 );
404 let raw_table_info = test_create_raw_table_info("my_table");
405 ctx.table_metadata_manager
406 .create_table_metadata(
407 raw_table_info,
408 TableRouteValue::physical(vec![]),
409 HashMap::new(),
410 )
411 .await
412 .unwrap();
413 let ctrl = executor.on_prepare(&ctx).await.unwrap();
414 assert!(!ctrl.stop());
415 }
416}