common_meta/ddl/drop_table/
executor.rs1use std::collections::HashMap;
16
17use api::v1::region::{
18 CloseRequest as PbCloseRegionRequest, DropRequest as PbDropRegionRequest, RegionRequest,
19 RegionRequestHeader, region_request,
20};
21use common_error::ext::ErrorExt;
22use common_error::status_code::StatusCode;
23use common_telemetry::tracing_context::TracingContext;
24use common_telemetry::{debug, error};
25use common_wal::options::WalOptions;
26use futures::future::join_all;
27use snafu::ensure;
28use store_api::storage::{RegionId, RegionNumber};
29use table::metadata::TableId;
30use table::table_name::TableName;
31
32use crate::cache_invalidator::Context;
33use crate::ddl::DdlContext;
34use crate::ddl::utils::{add_peer_context_if_needed, convert_region_routes_to_detecting_regions};
35use crate::error::{self, Result};
36use crate::instruction::CacheIdent;
37use crate::key::table_name::TableNameKey;
38use crate::key::table_route::TableRouteValue;
39use crate::node_manager::NodeManagerRef;
40use crate::region_registry::LeaderRegionRegistryRef;
41use crate::rpc::router::{
42 RegionRoute, find_follower_regions, find_followers, find_leader_regions, find_leaders,
43 operating_leader_regions,
44};
45
46#[derive(Debug)]
48pub enum Control<T> {
49 Continue(T),
50 Stop,
51}
52
53impl<T> Control<T> {
54 pub fn stop(&self) -> bool {
56 matches!(self, Control::Stop)
57 }
58}
59
60impl DropTableExecutor {
61 pub fn new(table: TableName, table_id: TableId, drop_if_exists: bool) -> Self {
63 Self {
64 table,
65 table_id,
66 drop_if_exists,
67 }
68 }
69}
70
71pub struct DropTableExecutor {
76 table: TableName,
77 table_id: TableId,
78 drop_if_exists: bool,
79}
80
81impl DropTableExecutor {
82 pub async fn on_prepare(&self, ctx: &DdlContext) -> Result<Control<()>> {
86 let table_ref = self.table.table_ref();
87
88 let exist = ctx
89 .table_metadata_manager
90 .table_name_manager()
91 .exists(TableNameKey::new(
92 table_ref.catalog,
93 table_ref.schema,
94 table_ref.table,
95 ))
96 .await?;
97
98 if !exist && self.drop_if_exists {
99 return Ok(Control::Stop);
100 }
101
102 ensure!(
103 exist,
104 error::TableNotFoundSnafu {
105 table_name: table_ref.to_string()
106 }
107 );
108
109 Ok(Control::Continue(()))
110 }
111
112 pub async fn on_delete_metadata(
114 &self,
115 ctx: &DdlContext,
116 table_route_value: &TableRouteValue,
117 region_wal_options: &HashMap<RegionNumber, WalOptions>,
118 ) -> Result<()> {
119 ctx.table_metadata_manager
120 .delete_table_metadata(
121 self.table_id,
122 &self.table,
123 table_route_value,
124 region_wal_options,
125 )
126 .await
127 }
128
129 pub async fn on_delete_metadata_tombstone(
131 &self,
132 ctx: &DdlContext,
133 table_route_value: &TableRouteValue,
134 region_wal_options: &HashMap<u32, WalOptions>,
135 ) -> Result<()> {
136 ctx.table_metadata_manager
137 .delete_table_metadata_tombstone(
138 self.table_id,
139 &self.table,
140 table_route_value,
141 region_wal_options,
142 )
143 .await
144 }
145
146 pub async fn on_destroy_metadata(
148 &self,
149 ctx: &DdlContext,
150 table_route_value: &TableRouteValue,
151 region_wal_options: &HashMap<u32, WalOptions>,
152 ) -> Result<()> {
153 ctx.table_metadata_manager
154 .destroy_table_metadata(
155 self.table_id,
156 &self.table,
157 table_route_value,
158 region_wal_options,
159 )
160 .await?;
161
162 let detecting_regions = if table_route_value.is_physical() {
163 let regions = table_route_value.region_routes().unwrap();
165 convert_region_routes_to_detecting_regions(regions)
166 } else {
167 vec![]
168 };
169 ctx.deregister_failure_detectors(detecting_regions).await;
170 Ok(())
171 }
172
173 pub async fn on_restore_metadata(
175 &self,
176 ctx: &DdlContext,
177 table_route_value: &TableRouteValue,
178 region_wal_options: &HashMap<u32, WalOptions>,
179 ) -> Result<()> {
180 ctx.table_metadata_manager
181 .restore_table_metadata(
182 self.table_id,
183 &self.table,
184 table_route_value,
185 region_wal_options,
186 )
187 .await
188 }
189
190 pub async fn invalidate_table_cache(&self, ctx: &DdlContext) -> Result<()> {
192 let cache_invalidator = &ctx.cache_invalidator;
193 let ctx = Context {
194 subject: Some(format!(
195 "Invalidate table cache by dropping table {}, table_id: {}",
196 self.table.table_ref(),
197 self.table_id,
198 )),
199 };
200
201 cache_invalidator
202 .invalidate(
203 &ctx,
204 &[
205 CacheIdent::TableName(self.table.table_ref().into()),
206 CacheIdent::TableId(self.table_id),
207 ],
208 )
209 .await?;
210
211 Ok(())
212 }
213
214 pub async fn on_drop_regions(
216 &self,
217 node_manager: &NodeManagerRef,
218 leader_region_registry: &LeaderRegionRegistryRef,
219 region_routes: &[RegionRoute],
220 fast_path: bool,
221 force: bool,
222 partial_drop: bool,
223 ) -> Result<()> {
224 let leaders = find_leaders(region_routes);
226 let mut drop_region_tasks = Vec::with_capacity(leaders.len());
227 let table_id = self.table_id;
228 for datanode in leaders {
229 let requester = node_manager.datanode(&datanode).await;
230 let regions = find_leader_regions(region_routes, &datanode);
231 let region_ids = regions
232 .iter()
233 .map(|region_number| RegionId::new(table_id, *region_number))
234 .collect::<Vec<_>>();
235
236 for region_id in region_ids {
237 debug!("Dropping region {region_id} on Datanode {datanode:?}");
238 let request = RegionRequest {
239 header: Some(RegionRequestHeader {
240 tracing_context: TracingContext::from_current_span().to_w3c(),
241 ..Default::default()
242 }),
243 body: Some(region_request::Body::Drop(PbDropRegionRequest {
244 region_id: region_id.as_u64(),
245 fast_path,
246 force,
247 partial_drop,
248 soft_drop: false,
249 })),
250 };
251 let datanode = datanode.clone();
252 let requester = requester.clone();
253 drop_region_tasks.push(async move {
254 if let Err(err) = requester.handle(request).await
255 && err.status_code() != StatusCode::RegionNotFound
256 {
257 return Err(add_peer_context_if_needed(datanode)(err));
258 }
259 Ok(())
260 });
261 }
262 }
263
264 join_all(drop_region_tasks)
265 .await
266 .into_iter()
267 .collect::<Result<Vec<_>>>()?;
268
269 let followers = find_followers(region_routes);
271 let mut close_region_tasks = Vec::with_capacity(followers.len());
272 for datanode in followers {
273 let requester = node_manager.datanode(&datanode).await;
274 let regions = find_follower_regions(region_routes, &datanode);
275 let region_ids = regions
276 .iter()
277 .map(|region_number| RegionId::new(table_id, *region_number))
278 .collect::<Vec<_>>();
279
280 for region_id in region_ids {
281 debug!("Closing region {region_id} on Datanode {datanode:?}");
282 let request = RegionRequest {
283 header: Some(RegionRequestHeader {
284 tracing_context: TracingContext::from_current_span().to_w3c(),
285 ..Default::default()
286 }),
287 body: Some(region_request::Body::Close(PbCloseRegionRequest {
288 region_id: region_id.as_u64(),
289 })),
290 };
291
292 let datanode = datanode.clone();
293 let requester = requester.clone();
294 close_region_tasks.push(async move {
295 if let Err(err) = requester.handle(request).await
296 && err.status_code() != StatusCode::RegionNotFound
297 {
298 return Err(add_peer_context_if_needed(datanode)(err));
299 }
300 Ok(())
301 });
302 }
303 }
304
305 if let Err(err) = join_all(close_region_tasks)
309 .await
310 .into_iter()
311 .collect::<Result<Vec<_>>>()
312 {
313 error!(err; "Failed to close follower regions on datanodes, table_id: {}", table_id);
314 }
315
316 let region_ids = operating_leader_regions(region_routes);
318 leader_region_registry.batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id));
319
320 Ok(())
321 }
322}
323
324#[cfg(test)]
325mod tests {
326 use std::assert_matches;
327 use std::collections::HashMap;
328 use std::sync::Arc;
329
330 use api::v1::{ColumnDataType, SemanticType};
331 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
332 use table::metadata::TableInfo;
333 use table::table_name::TableName;
334
335 use super::*;
336 use crate::ddl::test_util::columns::TestColumnDefBuilder;
337 use crate::ddl::test_util::create_table::{
338 TestCreateTableExprBuilder, build_raw_table_info_from_expr,
339 };
340 use crate::key::table_route::TableRouteValue;
341 use crate::test_util::{MockDatanodeManager, new_ddl_context};
342
343 fn test_create_raw_table_info(name: &str) -> TableInfo {
344 let create_table = TestCreateTableExprBuilder::default()
345 .column_defs([
346 TestColumnDefBuilder::default()
347 .name("ts")
348 .data_type(ColumnDataType::TimestampMillisecond)
349 .semantic_type(SemanticType::Timestamp)
350 .build()
351 .unwrap()
352 .into(),
353 TestColumnDefBuilder::default()
354 .name("host")
355 .data_type(ColumnDataType::String)
356 .semantic_type(SemanticType::Tag)
357 .build()
358 .unwrap()
359 .into(),
360 TestColumnDefBuilder::default()
361 .name("cpu")
362 .data_type(ColumnDataType::Float64)
363 .semantic_type(SemanticType::Field)
364 .build()
365 .unwrap()
366 .into(),
367 ])
368 .time_index("ts")
369 .primary_keys(["host".into()])
370 .table_name(name)
371 .build()
372 .unwrap()
373 .into();
374 build_raw_table_info_from_expr(&create_table)
375 }
376
377 #[tokio::test]
378 async fn test_on_prepare() {
379 let node_manager = Arc::new(MockDatanodeManager::new(()));
381 let ctx = new_ddl_context(node_manager);
382 let executor = DropTableExecutor::new(
383 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"),
384 1024,
385 true,
386 );
387 let ctrl = executor.on_prepare(&ctx).await.unwrap();
388 assert!(ctrl.stop());
389
390 let executor = DropTableExecutor::new(
392 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"),
393 1024,
394 false,
395 );
396 let err = executor.on_prepare(&ctx).await.unwrap_err();
397 assert_matches!(err, error::Error::TableNotFound { .. });
398
399 let executor = DropTableExecutor::new(
401 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"),
402 1024,
403 false,
404 );
405 let raw_table_info = test_create_raw_table_info("my_table");
406 ctx.table_metadata_manager
407 .create_table_metadata(
408 raw_table_info,
409 TableRouteValue::physical(vec![]),
410 HashMap::new(),
411 )
412 .await
413 .unwrap();
414 let ctrl = executor.on_prepare(&ctx).await.unwrap();
415 assert!(!ctrl.stop());
416 }
417}