1use std::any::Any;
16
17use common_procedure::Status;
18use common_telemetry::info;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use table::metadata::TableId;
22use table::table_name::TableName;
23
24use crate::ddl::DdlContext;
25use crate::ddl::drop_database::cursor::DropDatabaseCursor;
26use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
27use crate::ddl::drop_table::executor::DropTableExecutor;
28use crate::ddl::utils::get_region_wal_options;
29use crate::error::{self, Result};
30use crate::key::table_route::TableRouteValue;
31use crate::region_keeper::OperatingRegionGuard;
32use crate::rpc::router::{RegionRoute, operating_leader_region_roles};
33
/// Drop-database procedure state that drops one table: its [`State::next`]
/// implementation destroys the table metadata, invalidates the table cache and
/// drops the regions, then hands control to a [`DropDatabaseCursor`].
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DropDatabaseExecutor {
    /// Id of the table being dropped.
    table_id: TableId,
    /// Id of the physical table used to build the table route value and to look
    /// up region WAL options. The tests in this file pass the same id as
    /// `table_id` when the table itself is physical.
    physical_table_id: TableId,
    /// Name of the table being dropped.
    table_name: TableName,
    /// Region routes used both to register operating-region guards and to
    /// address the drop-region requests.
    pub(crate) physical_region_routes: Vec<RegionRoute>,
    /// Forwarded unchanged to the [`DropDatabaseCursor`] that follows this state.
    pub(crate) target: DropTableTarget,
    /// Guards obtained from the memory region keeper. Never serialized; they are
    /// rebuilt by `register_dropping_regions` when empty.
    #[serde(skip)]
    dropping_regions: Vec<OperatingRegionGuard>,
}
45
46impl DropDatabaseExecutor {
47 pub fn new(
49 table_id: TableId,
50 physical_table_id: TableId,
51 table_name: TableName,
52 physical_region_routes: Vec<RegionRoute>,
53 target: DropTableTarget,
54 ) -> Self {
55 Self {
56 table_id,
57 physical_table_id,
58 table_name,
59 physical_region_routes,
60 target,
61 dropping_regions: vec![],
62 }
63 }
64}
65
66impl DropDatabaseExecutor {
67 pub(crate) fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
69 if !self.dropping_regions.is_empty() {
70 return Ok(());
71 }
72 let dropping_regions = operating_leader_region_roles(&self.physical_region_routes);
73 let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
74 for (region_id, datanode_id, role) in dropping_regions {
75 let guard = ddl_ctx
76 .memory_region_keeper
77 .register_with_role(datanode_id, region_id, role)
78 .context(error::RegionOperatingRaceSnafu {
79 region_id,
80 peer_id: datanode_id,
81 })?;
82 dropping_region_guards.push(guard);
83 }
84 self.dropping_regions = dropping_region_guards;
85 Ok(())
86 }
87}
88
89#[async_trait::async_trait]
90#[typetag::serde]
91impl State for DropDatabaseExecutor {
92 fn recover(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
93 self.register_dropping_regions(ddl_ctx)
94 }
95
96 async fn next(
97 &mut self,
98 ddl_ctx: &DdlContext,
99 ctx: &mut DropDatabaseContext,
100 ) -> Result<(Box<dyn State>, Status)> {
101 self.register_dropping_regions(ddl_ctx)?;
102 let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true);
103 if ctx.retrying {
104 info!(
105 "Remapping region routes addresses for retrying drop regions for table_id: {}",
106 self.table_id
107 );
108 let storage = ddl_ctx
109 .table_metadata_manager
110 .table_route_manager()
111 .table_route_storage();
112 storage
115 .remap_region_routes(&mut self.physical_region_routes)
116 .await?;
117 }
118 let table_route_value = TableRouteValue::new(
120 self.table_id,
121 self.physical_table_id,
122 self.physical_region_routes.clone(),
123 );
124
125 let region_wal_options = get_region_wal_options(
127 &ddl_ctx.table_metadata_manager,
128 &table_route_value,
129 self.physical_table_id,
130 )
131 .await?;
132
133 executor
134 .on_destroy_metadata(ddl_ctx, &table_route_value, ®ion_wal_options)
135 .await?;
136 executor.invalidate_table_cache(ddl_ctx).await?;
137 executor
138 .on_drop_regions(
139 &ddl_ctx.node_manager,
140 &ddl_ctx.leader_region_registry,
141 &self.physical_region_routes,
142 true,
143 false,
144 false,
145 )
146 .await?;
147 info!("Table: {}({}) is dropped", self.table_name, self.table_id);
148
149 Ok((
150 Box::new(DropDatabaseCursor::new(self.target)),
151 Status::executing(false),
152 ))
153 }
154
155 fn as_any(&self) -> &dyn Any {
156 self
157 }
158}
159
160#[cfg(test)]
161mod tests {
162 use std::collections::HashSet;
163 use std::sync::Arc;
164
165 use api::region::RegionResponse;
166 use api::v1::region::RegionRequest;
167 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
168 use common_error::ext::BoxedError;
169 use common_query::request::QueryRequest;
170 use common_recordbatch::SendableRecordBatchStream;
171 use store_api::region_engine::RegionRole;
172 use store_api::storage::RegionId;
173 use table::table_name::TableName;
174
175 use crate::ddl::drop_database::cursor::DropDatabaseCursor;
176 use crate::ddl::drop_database::executor::DropDatabaseExecutor;
177 use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
178 use crate::ddl::test_util::datanode_handler::DatanodeWatcher;
179 use crate::ddl::test_util::{
180 create_logical_table, create_physical_table, put_datanode_address,
181 };
182 use crate::error::{self, Error, Result};
183 use crate::key::datanode_table::DatanodeTableKey;
184 use crate::peer::Peer;
185 use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
186 use crate::test_util::{MockDatanodeHandler, MockDatanodeManager, new_ddl_context};
187
188 #[derive(Clone)]
189 pub struct NaiveDatanodeHandler;
190
191 #[async_trait::async_trait]
192 impl MockDatanodeHandler for NaiveDatanodeHandler {
193 async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
194 Ok(RegionResponse::new(0))
195 }
196
197 async fn handle_query(
198 &self,
199 _peer: &Peer,
200 _request: QueryRequest,
201 ) -> Result<SendableRecordBatchStream> {
202 unreachable!()
203 }
204 }
205
206 #[tokio::test]
207 async fn test_next_with_physical_table() {
208 let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
209 let ddl_context = new_ddl_context(node_manager);
210 let physical_table_id = create_physical_table(&ddl_context, "phy").await;
211 let (_, table_route) = ddl_context
212 .table_metadata_manager
213 .table_route_manager()
214 .get_physical_table_route(physical_table_id)
215 .await
216 .unwrap();
217 {
218 let mut state = DropDatabaseExecutor::new(
219 physical_table_id,
220 physical_table_id,
221 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
222 table_route.region_routes.clone(),
223 DropTableTarget::Physical,
224 );
225 let mut ctx = DropDatabaseContext {
226 catalog: DEFAULT_CATALOG_NAME.to_string(),
227 schema: DEFAULT_SCHEMA_NAME.to_string(),
228 drop_if_exists: false,
229 tables: None,
230 retrying: false,
231 };
232 let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
233 assert!(!status.need_persist());
234 let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
235 assert_eq!(cursor.target, DropTableTarget::Physical);
236 }
237 let mut ctx = DropDatabaseContext {
239 catalog: DEFAULT_CATALOG_NAME.to_string(),
240 schema: DEFAULT_SCHEMA_NAME.to_string(),
241 drop_if_exists: false,
242 tables: None,
243 retrying: false,
244 };
245 let mut state = DropDatabaseExecutor::new(
246 physical_table_id,
247 physical_table_id,
248 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
249 table_route.region_routes.clone(),
250 DropTableTarget::Physical,
251 );
252 let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
253 assert!(!status.need_persist());
254 let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
255 assert_eq!(cursor.target, DropTableTarget::Physical);
256 }
257
258 #[tokio::test]
259 async fn test_next_logical_table() {
260 let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
261 let ddl_context = new_ddl_context(node_manager);
262 let physical_table_id = create_physical_table(&ddl_context, "phy").await;
263 create_logical_table(ddl_context.clone(), physical_table_id, "metric").await;
264 let logical_table_id = physical_table_id + 1;
265 let (_, table_route) = ddl_context
266 .table_metadata_manager
267 .table_route_manager()
268 .get_physical_table_route(logical_table_id)
269 .await
270 .unwrap();
271 {
272 let mut state = DropDatabaseExecutor::new(
273 logical_table_id,
274 physical_table_id,
275 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
276 table_route.region_routes.clone(),
277 DropTableTarget::Logical,
278 );
279 let mut ctx = DropDatabaseContext {
280 catalog: DEFAULT_CATALOG_NAME.to_string(),
281 schema: DEFAULT_SCHEMA_NAME.to_string(),
282 drop_if_exists: false,
283 tables: None,
284 retrying: false,
285 };
286 let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
287 assert!(!status.need_persist());
288 let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
289 assert_eq!(cursor.target, DropTableTarget::Logical);
290 }
291 let mut ctx = DropDatabaseContext {
293 catalog: DEFAULT_CATALOG_NAME.to_string(),
294 schema: DEFAULT_SCHEMA_NAME.to_string(),
295 drop_if_exists: false,
296 tables: None,
297 retrying: false,
298 };
299 let mut state = DropDatabaseExecutor::new(
300 logical_table_id,
301 physical_table_id,
302 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
303 table_route.region_routes,
304 DropTableTarget::Logical,
305 );
306 let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
307 assert!(!status.need_persist());
308 let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
309 assert_eq!(cursor.target, DropTableTarget::Logical);
310 ddl_context
312 .table_metadata_manager
313 .table_info_manager()
314 .get(physical_table_id)
315 .await
316 .unwrap()
317 .unwrap();
318 let table_route = ddl_context
320 .table_metadata_manager
321 .table_route_manager()
322 .table_route_storage()
323 .get(physical_table_id)
324 .await
325 .unwrap()
326 .unwrap();
327 let region_routes = table_route.region_routes().unwrap();
328 for datanode_id in region_distribution(region_routes).into_keys() {
329 ddl_context
330 .table_metadata_manager
331 .datanode_table_manager()
332 .get(&DatanodeTableKey::new(datanode_id, physical_table_id))
333 .await
334 .unwrap()
335 .unwrap();
336 }
337 }
338
339 #[derive(Clone)]
340 pub struct RetryErrorDatanodeHandler;
341
342 #[async_trait::async_trait]
343 impl MockDatanodeHandler for RetryErrorDatanodeHandler {
344 async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
345 Err(Error::RetryLater {
346 source: BoxedError::new(
347 error::UnexpectedSnafu {
348 err_msg: "retry later",
349 }
350 .build(),
351 ),
352 clean_poisons: false,
353 })
354 }
355
356 async fn handle_query(
357 &self,
358 _peer: &Peer,
359 _request: QueryRequest,
360 ) -> Result<SendableRecordBatchStream> {
361 unreachable!()
362 }
363 }
364
365 #[tokio::test]
366 async fn test_next_retryable_err() {
367 let node_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
368 let ddl_context = new_ddl_context(node_manager);
369 let physical_table_id = create_physical_table(&ddl_context, "phy").await;
370 let (_, table_route) = ddl_context
371 .table_metadata_manager
372 .table_route_manager()
373 .get_physical_table_route(physical_table_id)
374 .await
375 .unwrap();
376 let mut state = DropDatabaseExecutor::new(
377 physical_table_id,
378 physical_table_id,
379 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
380 table_route.region_routes,
381 DropTableTarget::Physical,
382 );
383 let mut ctx = DropDatabaseContext {
384 catalog: DEFAULT_CATALOG_NAME.to_string(),
385 schema: DEFAULT_SCHEMA_NAME.to_string(),
386 drop_if_exists: false,
387 tables: None,
388 retrying: false,
389 };
390 let err = state.next(&ddl_context, &mut ctx).await.unwrap_err();
391 assert!(err.is_retry_later());
392 }
393
394 #[tokio::test]
395 async fn test_on_recovery() {
396 let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
397 let ddl_context = new_ddl_context(node_manager);
398 let physical_table_id = create_physical_table(&ddl_context, "phy").await;
399 let (_, table_route) = ddl_context
400 .table_metadata_manager
401 .table_route_manager()
402 .get_physical_table_route(physical_table_id)
403 .await
404 .unwrap();
405 {
406 let mut state = DropDatabaseExecutor::new(
407 physical_table_id,
408 physical_table_id,
409 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
410 table_route.region_routes.clone(),
411 DropTableTarget::Physical,
412 );
413 let mut ctx = DropDatabaseContext {
414 catalog: DEFAULT_CATALOG_NAME.to_string(),
415 schema: DEFAULT_SCHEMA_NAME.to_string(),
416 drop_if_exists: false,
417 tables: None,
418 retrying: false,
419 };
420 state.recover(&ddl_context).unwrap();
421 assert_eq!(state.dropping_regions.len(), 1);
422 let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
423 assert!(!status.need_persist());
424 let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
425 assert_eq!(cursor.target, DropTableTarget::Physical);
426 }
427 }
428
429 #[tokio::test]
430 async fn test_recover_registers_region_role_from_routes() {
431 let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
432 let ddl_context = new_ddl_context(node_manager);
433 let region_id = RegionId::new(1024, 1);
434 let mut state = DropDatabaseExecutor::new(
435 1024,
436 1024,
437 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
438 vec![RegionRoute {
439 region: Region::new_test(region_id),
440 leader_peer: Some(Peer::empty(7)),
441 follower_peers: vec![],
442 leader_state: Some(LeaderState::Downgrading),
443 leader_down_since: None,
444 write_route_policy: None,
445 }],
446 DropTableTarget::Physical,
447 );
448
449 state.recover(&ddl_context).unwrap();
450
451 let roles = ddl_context
452 .memory_region_keeper
453 .extract_operating_region_roles(7, &HashSet::from([region_id]));
454 assert_eq!(roles.get(®ion_id), Some(&RegionRole::DowngradingLeader));
455 }
456
457 #[tokio::test]
458 async fn test_next_remaps_addresses_when_retrying() {
459 let (tx, mut rx) = tokio::sync::mpsc::channel(8);
460 let node_manager = Arc::new(MockDatanodeManager::new(DatanodeWatcher::new(tx)));
461 let ddl_context = new_ddl_context(node_manager);
462 let physical_table_id = create_physical_table(&ddl_context, "phy").await;
463 let (_, table_route) = ddl_context
464 .table_metadata_manager
465 .table_route_manager()
466 .get_physical_table_route(physical_table_id)
467 .await
468 .unwrap();
469
470 let mut state = DropDatabaseExecutor::new(
471 physical_table_id,
472 physical_table_id,
473 TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
474 table_route.region_routes,
475 DropTableTarget::Physical,
476 );
477 state.physical_region_routes[0]
478 .leader_peer
479 .as_mut()
480 .unwrap()
481 .addr = "old-addr".to_string();
482 let mut ctx = DropDatabaseContext {
483 catalog: DEFAULT_CATALOG_NAME.to_string(),
484 schema: DEFAULT_SCHEMA_NAME.to_string(),
485 drop_if_exists: false,
486 tables: None,
487 retrying: true,
488 };
489
490 put_datanode_address(&ddl_context, 0, "new-addr").await;
491
492 state.next(&ddl_context, &mut ctx).await.unwrap();
493
494 let (peer, _) = rx.try_recv().unwrap();
495 assert_eq!(peer.addr, "new-addr");
496 }
497}