1use std::any::Any;
16
17use common_procedure::Status;
18use common_telemetry::info;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use table::metadata::TableId;
22use table::table_name::TableName;
23
24use crate::ddl::drop_database::cursor::DropDatabaseCursor;
25use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
26use crate::ddl::drop_table::executor::DropTableExecutor;
27use crate::ddl::utils::get_region_wal_options;
28use crate::ddl::DdlContext;
29use crate::error::{self, Result};
30use crate::key::table_route::TableRouteValue;
31use crate::region_keeper::OperatingRegionGuard;
32use crate::rpc::router::{operating_leader_regions, RegionRoute};
33
34#[derive(Debug, Serialize, Deserialize)]
35pub(crate) struct DropDatabaseExecutor {
36 table_id: TableId,
37 physical_table_id: TableId,
38 table_name: TableName,
39 pub(crate) physical_region_routes: Vec<RegionRoute>,
41 pub(crate) target: DropTableTarget,
42 #[serde(skip)]
43 dropping_regions: Vec<OperatingRegionGuard>,
44}
45
46impl DropDatabaseExecutor {
47 pub fn new(
49 table_id: TableId,
50 physical_table_id: TableId,
51 table_name: TableName,
52 physical_region_routes: Vec<RegionRoute>,
53 target: DropTableTarget,
54 ) -> Self {
55 Self {
56 table_id,
57 physical_table_id,
58 table_name,
59 physical_region_routes,
60 target,
61 dropping_regions: vec![],
62 }
63 }
64}
65
66impl DropDatabaseExecutor {
67 pub(crate) fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
69 if !self.dropping_regions.is_empty() {
70 return Ok(());
71 }
72 let dropping_regions = operating_leader_regions(&self.physical_region_routes);
73 let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
74 for (region_id, datanode_id) in dropping_regions {
75 let guard = ddl_ctx
76 .memory_region_keeper
77 .register(datanode_id, region_id)
78 .context(error::RegionOperatingRaceSnafu {
79 region_id,
80 peer_id: datanode_id,
81 })?;
82 dropping_region_guards.push(guard);
83 }
84 self.dropping_regions = dropping_region_guards;
85 Ok(())
86 }
87}
88
89#[async_trait::async_trait]
90#[typetag::serde]
91impl State for DropDatabaseExecutor {
92 fn recover(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
93 self.register_dropping_regions(ddl_ctx)
94 }
95
96 async fn next(
97 &mut self,
98 ddl_ctx: &DdlContext,
99 _ctx: &mut DropDatabaseContext,
100 ) -> Result<(Box<dyn State>, Status)> {
101 self.register_dropping_regions(ddl_ctx)?;
102 let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true);
103 let table_route_value = TableRouteValue::new(
105 self.table_id,
106 self.physical_table_id,
107 self.physical_region_routes.clone(),
108 );
109
110 let region_wal_options = get_region_wal_options(
112 &ddl_ctx.table_metadata_manager,
113 &table_route_value,
114 self.physical_table_id,
115 )
116 .await?;
117
118 executor
119 .on_destroy_metadata(ddl_ctx, &table_route_value, ®ion_wal_options)
120 .await?;
121 executor.invalidate_table_cache(ddl_ctx).await?;
122 executor
123 .on_drop_regions(ddl_ctx, &self.physical_region_routes, true)
124 .await?;
125 info!("Table: {}({}) is dropped", self.table_name, self.table_id);
126
127 Ok((
128 Box::new(DropDatabaseCursor::new(self.target)),
129 Status::executing(false),
130 ))
131 }
132
133 fn as_any(&self) -> &dyn Any {
134 self
135 }
136}
137
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::region::RegionResponse;
    use api::v1::region::RegionRequest;
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_error::ext::BoxedError;
    use common_query::request::QueryRequest;
    use common_recordbatch::SendableRecordBatchStream;
    use table::table_name::TableName;

    use crate::ddl::drop_database::cursor::DropDatabaseCursor;
    use crate::ddl::drop_database::executor::DropDatabaseExecutor;
    use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
    use crate::ddl::test_util::{create_logical_table, create_physical_table};
    use crate::error::{self, Error, Result};
    use crate::key::datanode_table::DatanodeTableKey;
    use crate::peer::Peer;
    use crate::rpc::router::region_distribution;
    use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};

    /// Builds a context for the default catalog and schema.
    fn default_context() -> DropDatabaseContext {
        DropDatabaseContext {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            drop_if_exists: false,
            tables: None,
        }
    }

    /// Runs one `next` step on `executor` and asserts that it succeeds without
    /// requesting persistence and yields a cursor for `expected`.
    async fn assert_advances_to_cursor(
        mut executor: DropDatabaseExecutor,
        ddl_context: &crate::ddl::DdlContext,
        expected: DropTableTarget,
    ) {
        let mut ctx = default_context();
        let (next_state, status) = executor.next(ddl_context, &mut ctx).await.unwrap();
        assert!(!status.need_persist());
        let cursor = next_state
            .as_any()
            .downcast_ref::<DropDatabaseCursor>()
            .unwrap();
        assert_eq!(cursor.target, expected);
    }

    /// Datanode handler that answers every region request with success.
    #[derive(Clone)]
    pub struct NaiveDatanodeHandler;

    #[async_trait::async_trait]
    impl MockDatanodeHandler for NaiveDatanodeHandler {
        async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
            let response = RegionResponse::new(0);
            Ok(response)
        }

        async fn handle_query(
            &self,
            _peer: &Peer,
            _request: QueryRequest,
        ) -> Result<SendableRecordBatchStream> {
            unreachable!()
        }
    }

    #[tokio::test]
    async fn test_next_with_physical_table() {
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let ddl_context = new_ddl_context(node_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        let (_, table_route) = ddl_context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(physical_table_id)
            .await
            .unwrap();

        // Run the same step twice; the second pass must succeed as well.
        for _ in 0..2 {
            let executor = DropDatabaseExecutor::new(
                physical_table_id,
                physical_table_id,
                TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
                table_route.region_routes.clone(),
                DropTableTarget::Physical,
            );
            assert_advances_to_cursor(executor, &ddl_context, DropTableTarget::Physical).await;
        }
    }

    #[tokio::test]
    async fn test_next_logical_table() {
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let ddl_context = new_ddl_context(node_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        create_logical_table(ddl_context.clone(), physical_table_id, "metric").await;
        let logical_table_id = physical_table_id + 1;
        let (_, logical_route) = ddl_context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(logical_table_id)
            .await
            .unwrap();

        // Run the same step twice; the second pass must succeed as well.
        for _ in 0..2 {
            let executor = DropDatabaseExecutor::new(
                logical_table_id,
                physical_table_id,
                TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
                logical_route.region_routes.clone(),
                DropTableTarget::Logical,
            );
            assert_advances_to_cursor(executor, &ddl_context, DropTableTarget::Logical).await;
        }

        // The table info of the physical table is still present.
        let metadata_manager = &ddl_context.table_metadata_manager;
        metadata_manager
            .table_info_manager()
            .get(physical_table_id)
            .await
            .unwrap()
            .unwrap();

        // The table route of the physical table is still present.
        let physical_route = metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get(physical_table_id)
            .await
            .unwrap()
            .unwrap();
        let region_routes = physical_route.region_routes().unwrap();

        // Every datanode in the distribution still has its datanode-table entry.
        for datanode_id in region_distribution(region_routes).into_keys() {
            let key = DatanodeTableKey::new(datanode_id, physical_table_id);
            metadata_manager
                .datanode_table_manager()
                .get(&key)
                .await
                .unwrap()
                .unwrap();
        }
    }

    /// Datanode handler that fails every region request with `RetryLater`.
    #[derive(Clone)]
    pub struct RetryErrorDatanodeHandler;

    #[async_trait::async_trait]
    impl MockDatanodeHandler for RetryErrorDatanodeHandler {
        async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
            let source = BoxedError::new(
                error::UnexpectedSnafu {
                    err_msg: "retry later",
                }
                .build(),
            );
            Err(Error::RetryLater {
                source,
                clean_poisons: false,
            })
        }

        async fn handle_query(
            &self,
            _peer: &Peer,
            _request: QueryRequest,
        ) -> Result<SendableRecordBatchStream> {
            unreachable!()
        }
    }

    #[tokio::test]
    async fn test_next_retryable_err() {
        let node_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
        let ddl_context = new_ddl_context(node_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        let (_, table_route) = ddl_context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(physical_table_id)
            .await
            .unwrap();

        let mut executor = DropDatabaseExecutor::new(
            physical_table_id,
            physical_table_id,
            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
            table_route.region_routes,
            DropTableTarget::Physical,
        );
        let mut ctx = default_context();

        // The datanode error must surface as a retryable error.
        let err = executor.next(&ddl_context, &mut ctx).await.unwrap_err();
        assert!(err.is_retry_later());
    }

    #[tokio::test]
    async fn test_on_recovery() {
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let ddl_context = new_ddl_context(node_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        let (_, table_route) = ddl_context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(physical_table_id)
            .await
            .unwrap();

        let mut executor = DropDatabaseExecutor::new(
            physical_table_id,
            physical_table_id,
            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
            table_route.region_routes,
            DropTableTarget::Physical,
        );

        // Recovery registers the guards; the following step reuses them.
        executor.recover(&ddl_context).unwrap();
        assert_eq!(executor.dropping_regions.len(), 1);
        assert_advances_to_cursor(executor, &ddl_context, DropTableTarget::Physical).await;
    }
}