common_meta/ddl/drop_database/
cursor.rs1use std::any::Any;
16
17use common_catalog::format_full_table_name;
18use common_procedure::Status;
19use futures::TryStreamExt;
20use serde::{Deserialize, Serialize};
21use snafu::OptionExt;
22use table::metadata::{TableId, TableType};
23use table::table_name::TableName;
24
25use crate::cache_invalidator::Context;
26use crate::ddl::DdlContext;
27use crate::ddl::drop_database::executor::DropDatabaseExecutor;
28use crate::ddl::drop_database::metadata::DropDatabaseRemoveMetadata;
29use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
30use crate::error::{Result, TableInfoNotFoundSnafu};
31use crate::instruction::CacheIdent;
32use crate::key::table_route::TableRouteValue;
33
34#[derive(Debug, Serialize, Deserialize)]
35pub(crate) struct DropDatabaseCursor {
36 pub(crate) target: DropTableTarget,
37}
38
39impl DropDatabaseCursor {
40 pub fn new(target: DropTableTarget) -> Self {
42 Self { target }
43 }
44
45 fn handle_reach_end(
46 &mut self,
47 ctx: &mut DropDatabaseContext,
48 ) -> Result<(Box<dyn State>, Status)> {
49 ctx.tables.take();
51 match self.target {
52 DropTableTarget::Logical => Ok((
53 Box::new(DropDatabaseCursor::new(DropTableTarget::Physical)),
54 Status::executing(true),
55 )),
56 DropTableTarget::Physical => Ok((
57 Box::new(DropDatabaseRemoveMetadata),
58 Status::executing(true),
59 )),
60 }
61 }
62
63 async fn handle_table(
64 &mut self,
65 ddl_ctx: &DdlContext,
66 ctx: &mut DropDatabaseContext,
67 table_name: String,
68 table_id: TableId,
69 table_route_value: TableRouteValue,
70 ) -> Result<(Box<dyn State>, Status)> {
71 match (self.target, table_route_value) {
72 (DropTableTarget::Logical, TableRouteValue::Logical(route)) => {
73 let physical_table_id = route.physical_table_id();
74
75 let (_, table_route) = ddl_ctx
76 .table_metadata_manager
77 .table_route_manager()
78 .get_physical_table_route(physical_table_id)
79 .await?;
80 Ok((
81 Box::new(DropDatabaseExecutor::new(
82 table_id,
83 table_id,
84 TableName::new(&ctx.catalog, &ctx.schema, &table_name),
85 table_route.region_routes,
86 self.target,
87 )),
88 Status::executing(true),
89 ))
90 }
91 (DropTableTarget::Physical, TableRouteValue::Physical(table_route)) => Ok((
92 Box::new(DropDatabaseExecutor::new(
93 table_id,
94 table_id,
95 TableName::new(&ctx.catalog, &ctx.schema, &table_name),
96 table_route.region_routes,
97 self.target,
98 )),
99 Status::executing(true),
100 )),
101 _ => Ok((
102 Box::new(DropDatabaseCursor::new(self.target)),
103 Status::executing(false),
104 )),
105 }
106 }
107
108 async fn handle_view(
109 &self,
110 ddl_ctx: &DdlContext,
111 ctx: &mut DropDatabaseContext,
112 table_name: String,
113 table_id: TableId,
114 ) -> Result<(Box<dyn State>, Status)> {
115 let view_name = TableName::new(&ctx.catalog, &ctx.schema, &table_name);
116 ddl_ctx
117 .table_metadata_manager
118 .destroy_view_info(table_id, &view_name)
119 .await?;
120
121 let cache_invalidator = &ddl_ctx.cache_invalidator;
122 let ctx = Context {
123 subject: Some("Invalidate table cache by dropping table".to_string()),
124 };
125
126 cache_invalidator
127 .invalidate(
128 &ctx,
129 &[
130 CacheIdent::TableName(view_name),
131 CacheIdent::TableId(table_id),
132 ],
133 )
134 .await?;
135
136 Ok((
137 Box::new(DropDatabaseCursor::new(self.target)),
138 Status::executing(false),
139 ))
140 }
141}
142
143#[async_trait::async_trait]
144#[typetag::serde]
145impl State for DropDatabaseCursor {
146 async fn next(
147 &mut self,
148 ddl_ctx: &DdlContext,
149 ctx: &mut DropDatabaseContext,
150 ) -> Result<(Box<dyn State>, Status)> {
151 if ctx.tables.as_deref().is_none() {
152 let tables = ddl_ctx
153 .table_metadata_manager
154 .table_name_manager()
155 .tables(&ctx.catalog, &ctx.schema);
156 ctx.tables = Some(tables);
157 }
158 match ctx.tables.as_mut().unwrap().try_next().await? {
160 Some((table_name, table_name_value)) => {
161 let table_id = table_name_value.table_id();
162
163 let table_info_value = ddl_ctx
164 .table_metadata_manager
165 .table_info_manager()
166 .get(table_id)
167 .await?
168 .with_context(|| TableInfoNotFoundSnafu {
169 table: format_full_table_name(&ctx.catalog, &ctx.schema, &table_name),
170 })?;
171
172 if table_info_value.table_info.table_type == TableType::View {
173 return self.handle_view(ddl_ctx, ctx, table_name, table_id).await;
174 }
175
176 match ddl_ctx
177 .table_metadata_manager
178 .table_route_manager()
179 .table_route_storage()
180 .get(table_id)
181 .await?
182 {
183 Some(table_route_value) => {
184 self.handle_table(ddl_ctx, ctx, table_name, table_id, table_route_value)
185 .await
186 }
187 None => Ok((
188 Box::new(DropDatabaseCursor::new(self.target)),
189 Status::executing(false),
190 )),
191 }
192 }
193 None => self.handle_reach_end(ctx),
194 }
195 }
196
197 fn as_any(&self) -> &dyn Any {
198 self
199 }
200}
201
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};

    use crate::ddl::drop_database::cursor::DropDatabaseCursor;
    use crate::ddl::drop_database::executor::DropDatabaseExecutor;
    use crate::ddl::drop_database::metadata::DropDatabaseRemoveMetadata;
    use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
    use crate::ddl::test_util::{create_logical_table, create_physical_table};
    use crate::test_util::{MockDatanodeManager, new_ddl_context};

    /// Builds a context for the default catalog/schema with no table stream opened.
    fn default_context() -> DropDatabaseContext {
        DropDatabaseContext {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            drop_if_exists: false,
            tables: None,
            retrying: false,
        }
    }

    /// With only a physical table present, the logical pass skips it, then
    /// switches to the physical pass, which hands the table to the executor.
    #[tokio::test]
    async fn test_next_without_logical_tables() {
        let node_manager = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(node_manager);
        create_physical_table(&ddl_context, "phy").await;

        let mut ctx = default_context();
        let mut logical_pass = DropDatabaseCursor::new(DropTableTarget::Logical);

        // The physical table is skipped while looking for logical tables.
        let (mut skipped, status) = logical_pass.next(&ddl_context, &mut ctx).await.unwrap();
        assert!(!status.need_persist());
        let cursor = skipped
            .as_any()
            .downcast_ref::<DropDatabaseCursor>()
            .unwrap();
        assert_eq!(cursor.target, DropTableTarget::Logical);

        // End of stream: the cursor switches to the physical pass.
        let (mut physical_pass, status) = skipped.next(&ddl_context, &mut ctx).await.unwrap();
        assert!(status.need_persist());
        assert!(ctx.tables.is_none());
        let cursor = physical_pass
            .as_any()
            .downcast_ref::<DropDatabaseCursor>()
            .unwrap();
        assert_eq!(cursor.target, DropTableTarget::Physical);

        // The physical pass picks up the physical table.
        let (dispatched, status) = physical_pass.next(&ddl_context, &mut ctx).await.unwrap();
        assert!(status.need_persist());
        let executor = dispatched
            .as_any()
            .downcast_ref::<DropDatabaseExecutor>()
            .unwrap();
        assert_eq!(executor.target, DropTableTarget::Physical);
    }

    /// A logical table is handed to the executor with its physical table's routes.
    #[tokio::test]
    async fn test_next_with_logical_tables() {
        let node_manager = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(node_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        create_logical_table(ddl_context.clone(), physical_table_id, "metric_0").await;

        let mut ctx = default_context();
        let mut logical_pass = DropDatabaseCursor::new(DropTableTarget::Logical);

        let (dispatched, status) = logical_pass.next(&ddl_context, &mut ctx).await.unwrap();
        assert!(status.need_persist());
        let executor = dispatched
            .as_any()
            .downcast_ref::<DropDatabaseExecutor>()
            .unwrap();

        let (_, table_route) = ddl_context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(physical_table_id)
            .await
            .unwrap();
        assert_eq!(table_route.region_routes, executor.physical_region_routes);
        assert_eq!(executor.target, DropTableTarget::Logical);
    }

    /// An empty database on the physical pass goes straight to metadata removal.
    #[tokio::test]
    async fn test_reach_the_end() {
        let node_manager = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(node_manager);

        let mut ctx = default_context();
        let mut physical_pass = DropDatabaseCursor::new(DropTableTarget::Physical);

        let (finished, status) = physical_pass.next(&ddl_context, &mut ctx).await.unwrap();
        assert!(status.need_persist());
        assert!(
            finished
                .as_any()
                .downcast_ref::<DropDatabaseRemoveMetadata>()
                .is_some()
        );
        assert!(ctx.tables.is_none());
    }
}