common_meta/ddl/drop_table/
executor.rs1use std::collections::HashMap;
16
17use api::v1::region::{
18 region_request, CloseRequest as PbCloseRegionRequest, DropRequest as PbDropRegionRequest,
19 RegionRequest, RegionRequestHeader,
20};
21use common_error::ext::ErrorExt;
22use common_error::status_code::StatusCode;
23use common_telemetry::tracing_context::TracingContext;
24use common_telemetry::{debug, error};
25use common_wal::options::WalOptions;
26use futures::future::join_all;
27use snafu::ensure;
28use store_api::storage::{RegionId, RegionNumber};
29use table::metadata::TableId;
30use table::table_name::TableName;
31
32use crate::cache_invalidator::Context;
33use crate::ddl::utils::{add_peer_context_if_needed, convert_region_routes_to_detecting_regions};
34use crate::ddl::DdlContext;
35use crate::error::{self, Result};
36use crate::instruction::CacheIdent;
37use crate::key::table_name::TableNameKey;
38use crate::key::table_route::TableRouteValue;
39use crate::rpc::router::{
40 find_follower_regions, find_followers, find_leader_regions, find_leaders,
41 operating_leader_regions, RegionRoute,
42};
43
44#[derive(Debug)]
46pub enum Control<T> {
47 Continue(T),
48 Stop,
49}
50
51impl<T> Control<T> {
52 pub fn stop(&self) -> bool {
54 matches!(self, Control::Stop)
55 }
56}
57
58impl DropTableExecutor {
59 pub fn new(table: TableName, table_id: TableId, drop_if_exists: bool) -> Self {
61 Self {
62 table,
63 table_id,
64 drop_if_exists,
65 }
66 }
67}
68
69pub struct DropTableExecutor {
74 table: TableName,
75 table_id: TableId,
76 drop_if_exists: bool,
77}
78
79impl DropTableExecutor {
80 pub async fn on_prepare(&self, ctx: &DdlContext) -> Result<Control<()>> {
84 let table_ref = self.table.table_ref();
85
86 let exist = ctx
87 .table_metadata_manager
88 .table_name_manager()
89 .exists(TableNameKey::new(
90 table_ref.catalog,
91 table_ref.schema,
92 table_ref.table,
93 ))
94 .await?;
95
96 if !exist && self.drop_if_exists {
97 return Ok(Control::Stop);
98 }
99
100 ensure!(
101 exist,
102 error::TableNotFoundSnafu {
103 table_name: table_ref.to_string()
104 }
105 );
106
107 Ok(Control::Continue(()))
108 }
109
110 pub async fn on_delete_metadata(
112 &self,
113 ctx: &DdlContext,
114 table_route_value: &TableRouteValue,
115 region_wal_options: &HashMap<RegionNumber, WalOptions>,
116 ) -> Result<()> {
117 ctx.table_metadata_manager
118 .delete_table_metadata(
119 self.table_id,
120 &self.table,
121 table_route_value,
122 region_wal_options,
123 )
124 .await
125 }
126
127 pub async fn on_delete_metadata_tombstone(
129 &self,
130 ctx: &DdlContext,
131 table_route_value: &TableRouteValue,
132 region_wal_options: &HashMap<u32, WalOptions>,
133 ) -> Result<()> {
134 ctx.table_metadata_manager
135 .delete_table_metadata_tombstone(
136 self.table_id,
137 &self.table,
138 table_route_value,
139 region_wal_options,
140 )
141 .await
142 }
143
144 pub async fn on_destroy_metadata(
146 &self,
147 ctx: &DdlContext,
148 table_route_value: &TableRouteValue,
149 region_wal_options: &HashMap<u32, WalOptions>,
150 ) -> Result<()> {
151 ctx.table_metadata_manager
152 .destroy_table_metadata(
153 self.table_id,
154 &self.table,
155 table_route_value,
156 region_wal_options,
157 )
158 .await?;
159
160 let detecting_regions = if table_route_value.is_physical() {
161 let regions = table_route_value.region_routes().unwrap();
163 convert_region_routes_to_detecting_regions(regions)
164 } else {
165 vec![]
166 };
167 ctx.deregister_failure_detectors(detecting_regions).await;
168 Ok(())
169 }
170
171 pub async fn on_restore_metadata(
173 &self,
174 ctx: &DdlContext,
175 table_route_value: &TableRouteValue,
176 region_wal_options: &HashMap<u32, WalOptions>,
177 ) -> Result<()> {
178 ctx.table_metadata_manager
179 .restore_table_metadata(
180 self.table_id,
181 &self.table,
182 table_route_value,
183 region_wal_options,
184 )
185 .await
186 }
187
188 pub async fn invalidate_table_cache(&self, ctx: &DdlContext) -> Result<()> {
190 let cache_invalidator = &ctx.cache_invalidator;
191 let ctx = Context {
192 subject: Some(format!(
193 "Invalidate table cache by dropping table {}, table_id: {}",
194 self.table.table_ref(),
195 self.table_id,
196 )),
197 };
198
199 cache_invalidator
200 .invalidate(
201 &ctx,
202 &[
203 CacheIdent::TableName(self.table.table_ref().into()),
204 CacheIdent::TableId(self.table_id),
205 ],
206 )
207 .await?;
208
209 Ok(())
210 }
211
212 pub async fn on_drop_regions(
214 &self,
215 ctx: &DdlContext,
216 region_routes: &[RegionRoute],
217 fast_path: bool,
218 ) -> Result<()> {
219 let leaders = find_leaders(region_routes);
221 let mut drop_region_tasks = Vec::with_capacity(leaders.len());
222 let table_id = self.table_id;
223 for datanode in leaders {
224 let requester = ctx.node_manager.datanode(&datanode).await;
225 let regions = find_leader_regions(region_routes, &datanode);
226 let region_ids = regions
227 .iter()
228 .map(|region_number| RegionId::new(table_id, *region_number))
229 .collect::<Vec<_>>();
230
231 for region_id in region_ids {
232 debug!("Dropping region {region_id} on Datanode {datanode:?}");
233 let request = RegionRequest {
234 header: Some(RegionRequestHeader {
235 tracing_context: TracingContext::from_current_span().to_w3c(),
236 ..Default::default()
237 }),
238 body: Some(region_request::Body::Drop(PbDropRegionRequest {
239 region_id: region_id.as_u64(),
240 fast_path,
241 })),
242 };
243 let datanode = datanode.clone();
244 let requester = requester.clone();
245 drop_region_tasks.push(async move {
246 if let Err(err) = requester.handle(request).await {
247 if err.status_code() != StatusCode::RegionNotFound {
248 return Err(add_peer_context_if_needed(datanode)(err));
249 }
250 }
251 Ok(())
252 });
253 }
254 }
255
256 join_all(drop_region_tasks)
257 .await
258 .into_iter()
259 .collect::<Result<Vec<_>>>()?;
260
261 let followers = find_followers(region_routes);
263 let mut close_region_tasks = Vec::with_capacity(followers.len());
264 for datanode in followers {
265 let requester = ctx.node_manager.datanode(&datanode).await;
266 let regions = find_follower_regions(region_routes, &datanode);
267 let region_ids = regions
268 .iter()
269 .map(|region_number| RegionId::new(table_id, *region_number))
270 .collect::<Vec<_>>();
271
272 for region_id in region_ids {
273 debug!("Closing region {region_id} on Datanode {datanode:?}");
274 let request = RegionRequest {
275 header: Some(RegionRequestHeader {
276 tracing_context: TracingContext::from_current_span().to_w3c(),
277 ..Default::default()
278 }),
279 body: Some(region_request::Body::Close(PbCloseRegionRequest {
280 region_id: region_id.as_u64(),
281 })),
282 };
283
284 let datanode = datanode.clone();
285 let requester = requester.clone();
286 close_region_tasks.push(async move {
287 if let Err(err) = requester.handle(request).await {
288 if err.status_code() != StatusCode::RegionNotFound {
289 return Err(add_peer_context_if_needed(datanode)(err));
290 }
291 }
292 Ok(())
293 });
294 }
295 }
296
297 if let Err(err) = join_all(close_region_tasks)
301 .await
302 .into_iter()
303 .collect::<Result<Vec<_>>>()
304 {
305 error!(err; "Failed to close follower regions on datanodes, table_id: {}", table_id);
306 }
307
308 let region_ids = operating_leader_regions(region_routes);
310 ctx.leader_region_registry
311 .batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id));
312
313 Ok(())
314 }
315}
316
317#[cfg(test)]
318mod tests {
319 use std::assert_matches::assert_matches;
320 use std::collections::HashMap;
321 use std::sync::Arc;
322
323 use api::v1::{ColumnDataType, SemanticType};
324 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
325 use table::metadata::RawTableInfo;
326 use table::table_name::TableName;
327
328 use super::*;
329 use crate::ddl::test_util::columns::TestColumnDefBuilder;
330 use crate::ddl::test_util::create_table::{
331 build_raw_table_info_from_expr, TestCreateTableExprBuilder,
332 };
333 use crate::key::table_route::TableRouteValue;
334 use crate::test_util::{new_ddl_context, MockDatanodeManager};
335
336 fn test_create_raw_table_info(name: &str) -> RawTableInfo {
337 let create_table = TestCreateTableExprBuilder::default()
338 .column_defs([
339 TestColumnDefBuilder::default()
340 .name("ts")
341 .data_type(ColumnDataType::TimestampMillisecond)
342 .semantic_type(SemanticType::Timestamp)
343 .build()
344 .unwrap()
345 .into(),
346 TestColumnDefBuilder::default()
347 .name("host")
348 .data_type(ColumnDataType::String)
349 .semantic_type(SemanticType::Tag)
350 .build()
351 .unwrap()
352 .into(),
353 TestColumnDefBuilder::default()
354 .name("cpu")
355 .data_type(ColumnDataType::Float64)
356 .semantic_type(SemanticType::Field)
357 .build()
358 .unwrap()
359 .into(),
360 ])
361 .time_index("ts")
362 .primary_keys(["host".into()])
363 .table_name(name)
364 .build()
365 .unwrap()
366 .into();
367 build_raw_table_info_from_expr(&create_table)
368 }
369
370 #[tokio::test]
371 async fn test_on_prepare() {
372 let node_manager = Arc::new(MockDatanodeManager::new(()));
374 let ctx = new_ddl_context(node_manager);
375 let executor = DropTableExecutor::new(
376 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"),
377 1024,
378 true,
379 );
380 let ctrl = executor.on_prepare(&ctx).await.unwrap();
381 assert!(ctrl.stop());
382
383 let executor = DropTableExecutor::new(
385 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"),
386 1024,
387 false,
388 );
389 let err = executor.on_prepare(&ctx).await.unwrap_err();
390 assert_matches!(err, error::Error::TableNotFound { .. });
391
392 let executor = DropTableExecutor::new(
394 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"),
395 1024,
396 false,
397 );
398 let raw_table_info = test_create_raw_table_info("my_table");
399 ctx.table_metadata_manager
400 .create_table_metadata(
401 raw_table_info,
402 TableRouteValue::physical(vec![]),
403 HashMap::new(),
404 )
405 .await
406 .unwrap();
407 let ctrl = executor.on_prepare(&ctx).await.unwrap();
408 assert!(!ctrl.stop());
409 }
410}