use std::any::Any;
use std::collections::HashMap;

use common_procedure::Status;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::TableId;
use table::table_name::TableName;

use crate::ddl::drop_database::cursor::DropDatabaseCursor;
use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
use crate::ddl::drop_table::executor::DropTableExecutor;
use crate::ddl::utils::extract_region_wal_options;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::table_route::TableRouteValue;
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::router::{operating_leader_regions, RegionRoute};

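/// Drops a single table that belongs to the database being dropped: it destroys the
/// table metadata, invalidates cached entries, and drops the table's regions, then
/// hands control back to [DropDatabaseCursor] to pick up the next table.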
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DropDatabaseExecutor {
    table_id: TableId,
    physical_table_id: TableId,
    table_name: TableName,
    pub(crate) physical_region_routes: Vec<RegionRoute>,
    pub(crate) target: DropTableTarget,
    #[serde(skip)]
    dropping_regions: Vec<OperatingRegionGuard>,
}

impl DropDatabaseExecutor {
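    /// Returns a new [DropDatabaseExecutor] with no dropping-region guards held yet.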
    pub fn new(
        table_id: TableId,
        physical_table_id: TableId,
        table_name: TableName,
        physical_region_routes: Vec<RegionRoute>,
        target: DropTableTarget,
    ) -> Self {
        Self {
            table_id,
            physical_table_id,
            table_name,
            physical_region_routes,
            target,
            dropping_regions: vec![],
        }
    }
}

impl DropDatabaseExecutor {
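    /// Registers a guard for every leader region of the physical table in the memory
    /// region keeper. Does nothing if guards are already held; returns a
    /// `RegionOperatingRace` error if another procedure is operating on one of the regions.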
    pub(crate) fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
        if !self.dropping_regions.is_empty() {
            return Ok(());
        }
        let dropping_regions = operating_leader_regions(&self.physical_region_routes);
        let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
        for (region_id, datanode_id) in dropping_regions {
            let guard = ddl_ctx
                .memory_region_keeper
                .register(datanode_id, region_id)
                .context(error::RegionOperatingRaceSnafu {
                    region_id,
                    peer_id: datanode_id,
                })?;
            dropping_region_guards.push(guard);
        }
        self.dropping_regions = dropping_region_guards;
        Ok(())
    }
}

#[async_trait::async_trait]
#[typetag::serde]
impl State for DropDatabaseExecutor {
    fn recover(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
        self.register_dropping_regions(ddl_ctx)
    }

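    /// Destroys the table metadata, invalidates cached table entries, drops the regions
    /// on the datanodes, and then moves back to [DropDatabaseCursor].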
    async fn next(
        &mut self,
        ddl_ctx: &DdlContext,
        _ctx: &mut DropDatabaseContext,
    ) -> Result<(Box<dyn State>, Status)> {
        self.register_dropping_regions(ddl_ctx)?;
        let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true);
        let table_route_value = TableRouteValue::new(
            self.table_id,
            self.physical_table_id,
            self.physical_region_routes.clone(),
        );

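        // WAL options are gathered only when this is a physical table route; for a
        // logical table there is nothing to collect here.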
        let region_wal_options =
            if let TableRouteValue::Physical(table_route_value) = &table_route_value {
                let datanode_table_values = ddl_ctx
                    .table_metadata_manager
                    .datanode_table_manager()
                    .regions(self.physical_table_id, table_route_value)
                    .await?;
                extract_region_wal_options(&datanode_table_values)?
            } else {
                HashMap::new()
            };

        executor
            .on_destroy_metadata(ddl_ctx, &table_route_value, &region_wal_options)
            .await?;
        executor.invalidate_table_cache(ddl_ctx).await?;
        executor
            .on_drop_regions(ddl_ctx, &self.physical_region_routes, true)
            .await?;
        info!("Table: {}({}) is dropped", self.table_name, self.table_id);

        Ok((
            Box::new(DropDatabaseCursor::new(self.target)),
            Status::executing(false),
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::region::RegionResponse;
    use api::v1::region::RegionRequest;
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_error::ext::BoxedError;
    use common_query::request::QueryRequest;
    use common_recordbatch::SendableRecordBatchStream;
    use table::table_name::TableName;

    use crate::ddl::drop_database::cursor::DropDatabaseCursor;
    use crate::ddl::drop_database::executor::DropDatabaseExecutor;
    use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
    use crate::ddl::test_util::{create_logical_table, create_physical_table};
    use crate::error::{self, Error, Result};
    use crate::key::datanode_table::DatanodeTableKey;
    use crate::peer::Peer;
    use crate::rpc::router::region_distribution;
    use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};

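    /// A handler that accepts every region request, so drop-region calls always succeed.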
    #[derive(Clone)]
    pub struct NaiveDatanodeHandler;

    #[async_trait::async_trait]
    impl MockDatanodeHandler for NaiveDatanodeHandler {
        async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
            Ok(RegionResponse::new(0))
        }

        async fn handle_query(
            &self,
            _peer: &Peer,
            _request: QueryRequest,
        ) -> Result<SendableRecordBatchStream> {
            unreachable!()
        }
    }

    #[tokio::test]
    async fn test_next_with_physical_table() {
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let ddl_context = new_ddl_context(node_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        let (_, table_route) = ddl_context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(physical_table_id)
            .await
            .unwrap();
        {
            let mut state = DropDatabaseExecutor::new(
                physical_table_id,
                physical_table_id,
                TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
                table_route.region_routes.clone(),
                DropTableTarget::Physical,
            );
            let mut ctx = DropDatabaseContext {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                drop_if_exists: false,
                tables: None,
            };
            let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
            assert!(!status.need_persist());
            let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
            assert_eq!(cursor.target, DropTableTarget::Physical);
        }
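        // Executes the drop a second time with the same routes to verify the step is idempotent.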
        let mut ctx = DropDatabaseContext {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            drop_if_exists: false,
            tables: None,
        };
        let mut state = DropDatabaseExecutor::new(
            physical_table_id,
            physical_table_id,
            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
            table_route.region_routes.clone(),
            DropTableTarget::Physical,
        );
        let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
        assert!(!status.need_persist());
        let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
        assert_eq!(cursor.target, DropTableTarget::Physical);
    }

    #[tokio::test]
    async fn test_next_logical_table() {
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let ddl_context = new_ddl_context(node_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        create_logical_table(ddl_context.clone(), physical_table_id, "metric").await;
        let logical_table_id = physical_table_id + 1;
        let (_, table_route) = ddl_context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(logical_table_id)
            .await
            .unwrap();
        {
            let mut state = DropDatabaseExecutor::new(
                logical_table_id,
                physical_table_id,
                TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
                table_route.region_routes.clone(),
                DropTableTarget::Logical,
            );
            let mut ctx = DropDatabaseContext {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                drop_if_exists: false,
                tables: None,
            };
            let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
            assert!(!status.need_persist());
            let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
            assert_eq!(cursor.target, DropTableTarget::Logical);
        }
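        // Executes the drop a second time with the same routes to verify the step is idempotent.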
        let mut ctx = DropDatabaseContext {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            drop_if_exists: false,
            tables: None,
        };
        let mut state = DropDatabaseExecutor::new(
            logical_table_id,
            physical_table_id,
            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
            table_route.region_routes,
            DropTableTarget::Logical,
        );
        let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
        assert!(!status.need_persist());
        let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
        assert_eq!(cursor.target, DropTableTarget::Logical);
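        // Dropping the logical table must leave the physical table's metadata, route,
        // and datanode mappings intact.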
        ddl_context
            .table_metadata_manager
            .table_info_manager()
            .get(physical_table_id)
            .await
            .unwrap()
            .unwrap();
        let table_route = ddl_context
            .table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get(physical_table_id)
            .await
            .unwrap()
            .unwrap();
        let region_routes = table_route.region_routes().unwrap();
        for datanode_id in region_distribution(region_routes).into_keys() {
            ddl_context
                .table_metadata_manager
                .datanode_table_manager()
                .get(&DatanodeTableKey::new(datanode_id, physical_table_id))
                .await
                .unwrap()
                .unwrap();
        }
    }

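    /// A handler that always returns a retryable error, used to verify that `next`
    /// propagates retry-later errors.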
    #[derive(Clone)]
    pub struct RetryErrorDatanodeHandler;

    #[async_trait::async_trait]
    impl MockDatanodeHandler for RetryErrorDatanodeHandler {
        async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
            Err(Error::RetryLater {
                source: BoxedError::new(
                    error::UnexpectedSnafu {
                        err_msg: "retry later",
                    }
                    .build(),
                ),
            })
        }

        async fn handle_query(
            &self,
            _peer: &Peer,
            _request: QueryRequest,
        ) -> Result<SendableRecordBatchStream> {
            unreachable!()
        }
    }

    #[tokio::test]
    async fn test_next_retryable_err() {
        let node_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
        let ddl_context = new_ddl_context(node_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        let (_, table_route) = ddl_context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(physical_table_id)
            .await
            .unwrap();
        let mut state = DropDatabaseExecutor::new(
            physical_table_id,
            physical_table_id,
            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
            table_route.region_routes,
            DropTableTarget::Physical,
        );
        let mut ctx = DropDatabaseContext {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            drop_if_exists: false,
            tables: None,
        };
        let err = state.next(&ddl_context, &mut ctx).await.unwrap_err();
        assert!(err.is_retry_later());
    }

    #[tokio::test]
    async fn test_on_recovery() {
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let ddl_context = new_ddl_context(node_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        let (_, table_route) = ddl_context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(physical_table_id)
            .await
            .unwrap();
        {
            let mut state = DropDatabaseExecutor::new(
                physical_table_id,
                physical_table_id,
                TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
                table_route.region_routes.clone(),
                DropTableTarget::Physical,
            );
            let mut ctx = DropDatabaseContext {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                drop_if_exists: false,
                tables: None,
            };
            state.recover(&ddl_context).unwrap();
            assert_eq!(state.dropping_regions.len(), 1);
            let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
            assert!(!status.need_persist());
            let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
            assert_eq!(cursor.target, DropTableTarget::Physical);
        }
    }
}