1use std::any::Any;
16
17use common_procedure::Status;
18use common_telemetry::info;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use table::metadata::TableId;
22use table::table_name::TableName;
23
24use crate::ddl::DdlContext;
25use crate::ddl::drop_database::cursor::DropDatabaseCursor;
26use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
27use crate::ddl::drop_table::executor::DropTableExecutor;
28use crate::ddl::utils::get_region_wal_options;
29use crate::error::{self, Result};
30use crate::key::table_route::TableRouteValue;
31use crate::region_keeper::OperatingRegionGuard;
32use crate::rpc::router::{RegionRoute, operating_leader_regions};
33
34#[derive(Debug, Serialize, Deserialize)]
35pub(crate) struct DropDatabaseExecutor {
36 table_id: TableId,
37 physical_table_id: TableId,
38 table_name: TableName,
39 pub(crate) physical_region_routes: Vec<RegionRoute>,
41 pub(crate) target: DropTableTarget,
42 #[serde(skip)]
43 dropping_regions: Vec<OperatingRegionGuard>,
44}
45
46impl DropDatabaseExecutor {
47 pub fn new(
49 table_id: TableId,
50 physical_table_id: TableId,
51 table_name: TableName,
52 physical_region_routes: Vec<RegionRoute>,
53 target: DropTableTarget,
54 ) -> Self {
55 Self {
56 table_id,
57 physical_table_id,
58 table_name,
59 physical_region_routes,
60 target,
61 dropping_regions: vec![],
62 }
63 }
64}
65
66impl DropDatabaseExecutor {
67 pub(crate) fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
69 if !self.dropping_regions.is_empty() {
70 return Ok(());
71 }
72 let dropping_regions = operating_leader_regions(&self.physical_region_routes);
73 let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
74 for (region_id, datanode_id) in dropping_regions {
75 let guard = ddl_ctx
76 .memory_region_keeper
77 .register(datanode_id, region_id)
78 .context(error::RegionOperatingRaceSnafu {
79 region_id,
80 peer_id: datanode_id,
81 })?;
82 dropping_region_guards.push(guard);
83 }
84 self.dropping_regions = dropping_region_guards;
85 Ok(())
86 }
87}
88
89#[async_trait::async_trait]
90#[typetag::serde]
91impl State for DropDatabaseExecutor {
92 fn recover(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
93 self.register_dropping_regions(ddl_ctx)
94 }
95
96 async fn next(
97 &mut self,
98 ddl_ctx: &DdlContext,
99 _ctx: &mut DropDatabaseContext,
100 ) -> Result<(Box<dyn State>, Status)> {
101 self.register_dropping_regions(ddl_ctx)?;
102 let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true);
103 let table_route_value = TableRouteValue::new(
105 self.table_id,
106 self.physical_table_id,
107 self.physical_region_routes.clone(),
108 );
109
110 let region_wal_options = get_region_wal_options(
112 &ddl_ctx.table_metadata_manager,
113 &table_route_value,
114 self.physical_table_id,
115 )
116 .await?;
117
118 executor
119 .on_destroy_metadata(ddl_ctx, &table_route_value, ®ion_wal_options)
120 .await?;
121 executor.invalidate_table_cache(ddl_ctx).await?;
122 executor
123 .on_drop_regions(
124 &ddl_ctx.node_manager,
125 &ddl_ctx.leader_region_registry,
126 &self.physical_region_routes,
127 true,
128 false,
129 false,
130 )
131 .await?;
132 info!("Table: {}({}) is dropped", self.table_name, self.table_id);
133
134 Ok((
135 Box::new(DropDatabaseCursor::new(self.target)),
136 Status::executing(false),
137 ))
138 }
139
140 fn as_any(&self) -> &dyn Any {
141 self
142 }
143}
144
145#[cfg(test)]
146mod tests {
147 use std::sync::Arc;
148
149 use api::region::RegionResponse;
150 use api::v1::region::RegionRequest;
151 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
152 use common_error::ext::BoxedError;
153 use common_query::request::QueryRequest;
154 use common_recordbatch::SendableRecordBatchStream;
155 use table::table_name::TableName;
156
157 use crate::ddl::drop_database::cursor::DropDatabaseCursor;
158 use crate::ddl::drop_database::executor::DropDatabaseExecutor;
159 use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
160 use crate::ddl::test_util::{create_logical_table, create_physical_table};
161 use crate::error::{self, Error, Result};
162 use crate::key::datanode_table::DatanodeTableKey;
163 use crate::peer::Peer;
164 use crate::rpc::router::region_distribution;
165 use crate::test_util::{MockDatanodeHandler, MockDatanodeManager, new_ddl_context};
166
167 #[derive(Clone)]
168 pub struct NaiveDatanodeHandler;
169
170 #[async_trait::async_trait]
171 impl MockDatanodeHandler for NaiveDatanodeHandler {
172 async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
173 Ok(RegionResponse::new(0))
174 }
175
176 async fn handle_query(
177 &self,
178 _peer: &Peer,
179 _request: QueryRequest,
180 ) -> Result<SendableRecordBatchStream> {
181 unreachable!()
182 }
183 }
184
185 #[tokio::test]
186 async fn test_next_with_physical_table() {
187 let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
188 let ddl_context = new_ddl_context(node_manager);
189 let physical_table_id = create_physical_table(&ddl_context, "phy").await;
190 let (_, table_route) = ddl_context
191 .table_metadata_manager
192 .table_route_manager()
193 .get_physical_table_route(physical_table_id)
194 .await
195 .unwrap();
196 {
197 let mut state = DropDatabaseExecutor::new(
198 physical_table_id,
199 physical_table_id,
200 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
201 table_route.region_routes.clone(),
202 DropTableTarget::Physical,
203 );
204 let mut ctx = DropDatabaseContext {
205 catalog: DEFAULT_CATALOG_NAME.to_string(),
206 schema: DEFAULT_SCHEMA_NAME.to_string(),
207 drop_if_exists: false,
208 tables: None,
209 };
210 let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
211 assert!(!status.need_persist());
212 let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
213 assert_eq!(cursor.target, DropTableTarget::Physical);
214 }
215 let mut ctx = DropDatabaseContext {
217 catalog: DEFAULT_CATALOG_NAME.to_string(),
218 schema: DEFAULT_SCHEMA_NAME.to_string(),
219 drop_if_exists: false,
220 tables: None,
221 };
222 let mut state = DropDatabaseExecutor::new(
223 physical_table_id,
224 physical_table_id,
225 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
226 table_route.region_routes.clone(),
227 DropTableTarget::Physical,
228 );
229 let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
230 assert!(!status.need_persist());
231 let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
232 assert_eq!(cursor.target, DropTableTarget::Physical);
233 }
234
235 #[tokio::test]
236 async fn test_next_logical_table() {
237 let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
238 let ddl_context = new_ddl_context(node_manager);
239 let physical_table_id = create_physical_table(&ddl_context, "phy").await;
240 create_logical_table(ddl_context.clone(), physical_table_id, "metric").await;
241 let logical_table_id = physical_table_id + 1;
242 let (_, table_route) = ddl_context
243 .table_metadata_manager
244 .table_route_manager()
245 .get_physical_table_route(logical_table_id)
246 .await
247 .unwrap();
248 {
249 let mut state = DropDatabaseExecutor::new(
250 logical_table_id,
251 physical_table_id,
252 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
253 table_route.region_routes.clone(),
254 DropTableTarget::Logical,
255 );
256 let mut ctx = DropDatabaseContext {
257 catalog: DEFAULT_CATALOG_NAME.to_string(),
258 schema: DEFAULT_SCHEMA_NAME.to_string(),
259 drop_if_exists: false,
260 tables: None,
261 };
262 let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
263 assert!(!status.need_persist());
264 let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
265 assert_eq!(cursor.target, DropTableTarget::Logical);
266 }
267 let mut ctx = DropDatabaseContext {
269 catalog: DEFAULT_CATALOG_NAME.to_string(),
270 schema: DEFAULT_SCHEMA_NAME.to_string(),
271 drop_if_exists: false,
272 tables: None,
273 };
274 let mut state = DropDatabaseExecutor::new(
275 logical_table_id,
276 physical_table_id,
277 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
278 table_route.region_routes,
279 DropTableTarget::Logical,
280 );
281 let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
282 assert!(!status.need_persist());
283 let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
284 assert_eq!(cursor.target, DropTableTarget::Logical);
285 ddl_context
287 .table_metadata_manager
288 .table_info_manager()
289 .get(physical_table_id)
290 .await
291 .unwrap()
292 .unwrap();
293 let table_route = ddl_context
295 .table_metadata_manager
296 .table_route_manager()
297 .table_route_storage()
298 .get(physical_table_id)
299 .await
300 .unwrap()
301 .unwrap();
302 let region_routes = table_route.region_routes().unwrap();
303 for datanode_id in region_distribution(region_routes).into_keys() {
304 ddl_context
305 .table_metadata_manager
306 .datanode_table_manager()
307 .get(&DatanodeTableKey::new(datanode_id, physical_table_id))
308 .await
309 .unwrap()
310 .unwrap();
311 }
312 }
313
314 #[derive(Clone)]
315 pub struct RetryErrorDatanodeHandler;
316
317 #[async_trait::async_trait]
318 impl MockDatanodeHandler for RetryErrorDatanodeHandler {
319 async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
320 Err(Error::RetryLater {
321 source: BoxedError::new(
322 error::UnexpectedSnafu {
323 err_msg: "retry later",
324 }
325 .build(),
326 ),
327 clean_poisons: false,
328 })
329 }
330
331 async fn handle_query(
332 &self,
333 _peer: &Peer,
334 _request: QueryRequest,
335 ) -> Result<SendableRecordBatchStream> {
336 unreachable!()
337 }
338 }
339
340 #[tokio::test]
341 async fn test_next_retryable_err() {
342 let node_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
343 let ddl_context = new_ddl_context(node_manager);
344 let physical_table_id = create_physical_table(&ddl_context, "phy").await;
345 let (_, table_route) = ddl_context
346 .table_metadata_manager
347 .table_route_manager()
348 .get_physical_table_route(physical_table_id)
349 .await
350 .unwrap();
351 let mut state = DropDatabaseExecutor::new(
352 physical_table_id,
353 physical_table_id,
354 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
355 table_route.region_routes,
356 DropTableTarget::Physical,
357 );
358 let mut ctx = DropDatabaseContext {
359 catalog: DEFAULT_CATALOG_NAME.to_string(),
360 schema: DEFAULT_SCHEMA_NAME.to_string(),
361 drop_if_exists: false,
362 tables: None,
363 };
364 let err = state.next(&ddl_context, &mut ctx).await.unwrap_err();
365 assert!(err.is_retry_later());
366 }
367
368 #[tokio::test]
369 async fn test_on_recovery() {
370 let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
371 let ddl_context = new_ddl_context(node_manager);
372 let physical_table_id = create_physical_table(&ddl_context, "phy").await;
373 let (_, table_route) = ddl_context
374 .table_metadata_manager
375 .table_route_manager()
376 .get_physical_table_route(physical_table_id)
377 .await
378 .unwrap();
379 {
380 let mut state = DropDatabaseExecutor::new(
381 physical_table_id,
382 physical_table_id,
383 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
384 table_route.region_routes.clone(),
385 DropTableTarget::Physical,
386 );
387 let mut ctx = DropDatabaseContext {
388 catalog: DEFAULT_CATALOG_NAME.to_string(),
389 schema: DEFAULT_SCHEMA_NAME.to_string(),
390 drop_if_exists: false,
391 tables: None,
392 };
393 state.recover(&ddl_context).unwrap();
394 assert_eq!(state.dropping_regions.len(), 1);
395 let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
396 assert!(!status.need_persist());
397 let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
398 assert_eq!(cursor.target, DropTableTarget::Physical);
399 }
400 }
401}