common_meta/ddl/drop_database/
cursor.rs1use std::any::Any;
16
17use common_catalog::format_full_table_name;
18use common_procedure::Status;
19use futures::TryStreamExt;
20use serde::{Deserialize, Serialize};
21use snafu::OptionExt;
22use table::metadata::{TableId, TableType};
23use table::table_name::TableName;
24
25use crate::cache_invalidator::Context;
26use crate::ddl::drop_database::executor::DropDatabaseExecutor;
27use crate::ddl::drop_database::metadata::DropDatabaseRemoveMetadata;
28use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
29use crate::ddl::DdlContext;
30use crate::error::{Result, TableInfoNotFoundSnafu};
31use crate::instruction::CacheIdent;
32use crate::key::table_route::TableRouteValue;
33
/// Procedure state that walks the table entries of the database being dropped
/// and picks the follow-up state for each entry it visits.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DropDatabaseCursor {
    /// Kind of table (logical or physical) this cursor hands over for dropping;
    /// entries whose route does not match this kind are skipped.
    pub(crate) target: DropTableTarget,
}
38
39impl DropDatabaseCursor {
40 pub fn new(target: DropTableTarget) -> Self {
42 Self { target }
43 }
44
45 fn handle_reach_end(
46 &mut self,
47 ctx: &mut DropDatabaseContext,
48 ) -> Result<(Box<dyn State>, Status)> {
49 ctx.tables.take();
51 match self.target {
52 DropTableTarget::Logical => Ok((
53 Box::new(DropDatabaseCursor::new(DropTableTarget::Physical)),
54 Status::executing(true),
55 )),
56 DropTableTarget::Physical => Ok((
57 Box::new(DropDatabaseRemoveMetadata),
58 Status::executing(true),
59 )),
60 }
61 }
62
63 async fn handle_table(
64 &mut self,
65 ddl_ctx: &DdlContext,
66 ctx: &mut DropDatabaseContext,
67 table_name: String,
68 table_id: TableId,
69 table_route_value: TableRouteValue,
70 ) -> Result<(Box<dyn State>, Status)> {
71 match (self.target, table_route_value) {
72 (DropTableTarget::Logical, TableRouteValue::Logical(route)) => {
73 let physical_table_id = route.physical_table_id();
74
75 let (_, table_route) = ddl_ctx
76 .table_metadata_manager
77 .table_route_manager()
78 .get_physical_table_route(physical_table_id)
79 .await?;
80 Ok((
81 Box::new(DropDatabaseExecutor::new(
82 table_id,
83 table_id,
84 TableName::new(&ctx.catalog, &ctx.schema, &table_name),
85 table_route.region_routes,
86 self.target,
87 )),
88 Status::executing(true),
89 ))
90 }
91 (DropTableTarget::Physical, TableRouteValue::Physical(table_route)) => Ok((
92 Box::new(DropDatabaseExecutor::new(
93 table_id,
94 table_id,
95 TableName::new(&ctx.catalog, &ctx.schema, &table_name),
96 table_route.region_routes,
97 self.target,
98 )),
99 Status::executing(true),
100 )),
101 _ => Ok((
102 Box::new(DropDatabaseCursor::new(self.target)),
103 Status::executing(false),
104 )),
105 }
106 }
107
108 async fn handle_view(
109 &self,
110 ddl_ctx: &DdlContext,
111 ctx: &mut DropDatabaseContext,
112 table_name: String,
113 table_id: TableId,
114 ) -> Result<(Box<dyn State>, Status)> {
115 let view_name = TableName::new(&ctx.catalog, &ctx.schema, &table_name);
116 ddl_ctx
117 .table_metadata_manager
118 .destroy_view_info(table_id, &view_name)
119 .await?;
120
121 let cache_invalidator = &ddl_ctx.cache_invalidator;
122 let ctx = Context {
123 subject: Some("Invalidate table cache by dropping table".to_string()),
124 };
125
126 cache_invalidator
127 .invalidate(
128 &ctx,
129 &[
130 CacheIdent::TableName(view_name),
131 CacheIdent::TableId(table_id),
132 ],
133 )
134 .await?;
135
136 Ok((
137 Box::new(DropDatabaseCursor::new(self.target)),
138 Status::executing(false),
139 ))
140 }
141}
142
143#[async_trait::async_trait]
144#[typetag::serde]
145impl State for DropDatabaseCursor {
146 async fn next(
147 &mut self,
148 ddl_ctx: &DdlContext,
149 ctx: &mut DropDatabaseContext,
150 ) -> Result<(Box<dyn State>, Status)> {
151 if ctx.tables.as_deref().is_none() {
152 let tables = ddl_ctx
153 .table_metadata_manager
154 .table_name_manager()
155 .tables(&ctx.catalog, &ctx.schema);
156 ctx.tables = Some(tables);
157 }
158 match ctx.tables.as_mut().unwrap().try_next().await? {
160 Some((table_name, table_name_value)) => {
161 let table_id = table_name_value.table_id();
162
163 let table_info_value = ddl_ctx
164 .table_metadata_manager
165 .table_info_manager()
166 .get(table_id)
167 .await?
168 .with_context(|| TableInfoNotFoundSnafu {
169 table: format_full_table_name(&ctx.catalog, &ctx.schema, &table_name),
170 })?;
171
172 if table_info_value.table_info.table_type == TableType::View {
173 return self.handle_view(ddl_ctx, ctx, table_name, table_id).await;
174 }
175
176 match ddl_ctx
177 .table_metadata_manager
178 .table_route_manager()
179 .table_route_storage()
180 .get(table_id)
181 .await?
182 {
183 Some(table_route_value) => {
184 self.handle_table(ddl_ctx, ctx, table_name, table_id, table_route_value)
185 .await
186 }
187 None => Ok((
188 Box::new(DropDatabaseCursor::new(self.target)),
189 Status::executing(false),
190 )),
191 }
192 }
193 None => self.handle_reach_end(ctx),
194 }
195 }
196
197 fn as_any(&self) -> &dyn Any {
198 self
199 }
200}
201
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};

    use crate::ddl::drop_database::cursor::DropDatabaseCursor;
    use crate::ddl::drop_database::executor::DropDatabaseExecutor;
    use crate::ddl::drop_database::metadata::DropDatabaseRemoveMetadata;
    use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
    use crate::ddl::test_util::{create_logical_table, create_physical_table};
    use crate::test_util::{new_ddl_context, MockDatanodeManager};

    /// Builds a context for the default catalog and schema with no table
    /// stream opened yet.
    fn default_context() -> DropDatabaseContext {
        DropDatabaseContext {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            drop_if_exists: false,
            tables: None,
        }
    }

    #[tokio::test]
    async fn test_next_without_logical_tables() {
        let node_manager = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(node_manager);
        create_physical_table(&ddl_context, "phy").await;
        let mut ctx = default_context();
        let mut logical_cursor = DropDatabaseCursor::new(DropTableTarget::Logical);

        // First tick: the only entry is a physical table, so the logical
        // cursor skips it without asking for persistence.
        let (mut after_skip, status) = logical_cursor
            .next(&ddl_context, &mut ctx)
            .await
            .unwrap();
        assert!(!status.need_persist());
        let skipped = after_skip
            .as_any()
            .downcast_ref::<DropDatabaseCursor>()
            .unwrap();
        assert_eq!(skipped.target, DropTableTarget::Logical);

        // Second tick: the stream is exhausted, so the cursor switches to the
        // physical target and the cached stream is released.
        let (mut physical_cursor, status) =
            after_skip.next(&ddl_context, &mut ctx).await.unwrap();
        assert!(status.need_persist());
        assert!(ctx.tables.is_none());
        let switched = physical_cursor
            .as_any()
            .downcast_ref::<DropDatabaseCursor>()
            .unwrap();
        assert_eq!(switched.target, DropTableTarget::Physical);

        // Third tick: the physical table is handed to the executor.
        let (last_state, status) = physical_cursor
            .next(&ddl_context, &mut ctx)
            .await
            .unwrap();
        assert!(status.need_persist());
        let executor = last_state
            .as_any()
            .downcast_ref::<DropDatabaseExecutor>()
            .unwrap();
        assert_eq!(executor.target, DropTableTarget::Physical);
    }

    #[tokio::test]
    async fn test_next_with_logical_tables() {
        let node_manager = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(node_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        create_logical_table(ddl_context.clone(), physical_table_id, "metric_0").await;
        let mut ctx = default_context();
        let mut logical_cursor = DropDatabaseCursor::new(DropTableTarget::Logical);

        // The first tick must yield an executor for the logical table.
        let (next_state, status) = logical_cursor
            .next(&ddl_context, &mut ctx)
            .await
            .unwrap();
        assert!(status.need_persist());
        let executor = next_state
            .as_any()
            .downcast_ref::<DropDatabaseExecutor>()
            .unwrap();

        // The executor must carry the region routes of the physical table.
        let (_, expected_route) = ddl_context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(physical_table_id)
            .await
            .unwrap();
        assert_eq!(expected_route.region_routes, executor.physical_region_routes);
        assert_eq!(executor.target, DropTableTarget::Logical);
    }

    #[tokio::test]
    async fn test_reach_the_end() {
        let node_manager = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(node_manager);
        let mut ctx = default_context();
        let mut physical_cursor = DropDatabaseCursor::new(DropTableTarget::Physical);

        // With no tables at all, a physical cursor goes straight to metadata removal.
        let (next_state, status) = physical_cursor
            .next(&ddl_context, &mut ctx)
            .await
            .unwrap();
        assert!(status.need_persist());
        assert!(next_state.as_any().is::<DropDatabaseRemoveMetadata>());
        assert!(ctx.tables.is_none());
    }
}