common_meta/ddl/drop_database/
start.rs1use std::any::Any;
16
17use common_procedure::Status;
18use serde::{Deserialize, Serialize};
19use snafu::ensure;
20
21use crate::ddl::DdlContext;
22use crate::ddl::drop_database::cursor::DropDatabaseCursor;
23use crate::ddl::drop_database::end::DropDatabaseEnd;
24use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
25use crate::error::{self, Result};
26use crate::key::schema_name::SchemaNameKey;
27
/// Drop-database [`State`] that checks whether the target schema exists.
///
/// Its `next` implementation either transitions to [`DropDatabaseCursor`] (schema found),
/// finishes with [`DropDatabaseEnd`] (schema missing and `drop_if_exists` set), or fails
/// with a schema-not-found error.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DropDatabaseStart;
30
31#[async_trait::async_trait]
32#[typetag::serde]
33impl State for DropDatabaseStart {
34 async fn next(
38 &mut self,
39 ddl_ctx: &DdlContext,
40 ctx: &mut DropDatabaseContext,
41 ) -> Result<(Box<dyn State>, Status)> {
42 let exists = ddl_ctx
43 .table_metadata_manager
44 .schema_manager()
45 .exists(SchemaNameKey {
46 catalog: &ctx.catalog,
47 schema: &ctx.schema,
48 })
49 .await?;
50
51 if !exists && ctx.drop_if_exists {
52 return Ok((Box::new(DropDatabaseEnd), Status::done()));
53 }
54
55 ensure!(
56 exists,
57 error::SchemaNotFoundSnafu {
58 table_schema: &ctx.schema,
59 }
60 );
61
62 Ok((
63 Box::new(DropDatabaseCursor::new(DropTableTarget::Logical)),
64 Status::executing(true),
65 ))
66 }
67
68 fn as_any(&self) -> &dyn Any {
69 self
70 }
71}
72
#[cfg(test)]
mod tests {
    use std::assert_matches;
    use std::sync::Arc;

    use crate::ddl::drop_database::cursor::DropDatabaseCursor;
    use crate::ddl::drop_database::end::DropDatabaseEnd;
    use crate::ddl::drop_database::start::DropDatabaseStart;
    use crate::ddl::drop_database::{DropDatabaseContext, State};
    use crate::error;
    use crate::key::schema_name::SchemaNameKey;
    use crate::test_util::{MockDatanodeManager, new_ddl_context};

    /// Builds the context shared by every test: catalog `foo`, schema `bar`, no tables
    /// loaded, not retrying; only `drop_if_exists` varies.
    fn drop_context(drop_if_exists: bool) -> DropDatabaseContext {
        DropDatabaseContext {
            catalog: String::from("foo"),
            schema: String::from("bar"),
            drop_if_exists,
            tables: None,
            retrying: false,
        }
    }

    /// A missing schema without `drop_if_exists` must surface `SchemaNotFound`.
    #[tokio::test]
    async fn test_schema_not_exists_err() {
        let ddl_context = new_ddl_context(Arc::new(MockDatanodeManager::new(())));
        let mut ctx = drop_context(false);
        let mut start = DropDatabaseStart;

        let failure = start.next(&ddl_context, &mut ctx).await.unwrap_err();

        assert_matches!(failure, error::Error::SchemaNotFound { .. });
    }

    /// A missing schema with `drop_if_exists` finishes the procedure right away.
    #[tokio::test]
    async fn test_schema_not_exists() {
        let ddl_context = new_ddl_context(Arc::new(MockDatanodeManager::new(())));
        let mut ctx = drop_context(true);
        let mut start = DropDatabaseStart;

        let (next_state, status) = start.next(&ddl_context, &mut ctx).await.unwrap();

        assert!(next_state.as_any().is::<DropDatabaseEnd>());
        assert!(status.is_done());
    }

    /// An existing schema moves on to the cursor state and requests persistence.
    #[tokio::test]
    async fn test_next() {
        let ddl_context = new_ddl_context(Arc::new(MockDatanodeManager::new(())));
        ddl_context
            .table_metadata_manager
            .schema_manager()
            .create(SchemaNameKey::new("foo", "bar"), None, true)
            .await
            .unwrap();
        let mut ctx = drop_context(false);
        let mut start = DropDatabaseStart;

        let (next_state, status) = start.next(&ddl_context, &mut ctx).await.unwrap();

        assert!(next_state.as_any().is::<DropDatabaseCursor>());
        assert!(status.need_persist());
    }
}