common_meta/ddl/drop_database/
start.rs1use std::any::Any;
16
17use common_procedure::Status;
18use serde::{Deserialize, Serialize};
19use snafu::ensure;
20
21use crate::ddl::drop_database::cursor::DropDatabaseCursor;
22use crate::ddl::drop_database::end::DropDatabaseEnd;
23use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
24use crate::ddl::DdlContext;
25use crate::error::{self, Result};
26use crate::key::schema_name::SchemaNameKey;
27
28#[derive(Debug, Serialize, Deserialize)]
29pub(crate) struct DropDatabaseStart;
30
31#[async_trait::async_trait]
32#[typetag::serde]
33impl State for DropDatabaseStart {
34 async fn next(
38 &mut self,
39 ddl_ctx: &DdlContext,
40 ctx: &mut DropDatabaseContext,
41 ) -> Result<(Box<dyn State>, Status)> {
42 let exists = ddl_ctx
43 .table_metadata_manager
44 .schema_manager()
45 .exists(SchemaNameKey {
46 catalog: &ctx.catalog,
47 schema: &ctx.schema,
48 })
49 .await?;
50
51 if !exists && ctx.drop_if_exists {
52 return Ok((Box::new(DropDatabaseEnd), Status::done()));
53 }
54
55 ensure!(
56 exists,
57 error::SchemaNotFoundSnafu {
58 table_schema: &ctx.schema,
59 }
60 );
61
62 Ok((
63 Box::new(DropDatabaseCursor::new(DropTableTarget::Logical)),
64 Status::executing(true),
65 ))
66 }
67
68 fn as_any(&self) -> &dyn Any {
69 self
70 }
71}
72
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use crate::ddl::drop_database::cursor::DropDatabaseCursor;
    use crate::ddl::drop_database::end::DropDatabaseEnd;
    use crate::ddl::drop_database::start::DropDatabaseStart;
    use crate::ddl::drop_database::{DropDatabaseContext, State};
    use crate::error;
    use crate::key::schema_name::SchemaNameKey;
    use crate::test_util::{new_ddl_context, MockDatanodeManager};

    /// Builds the context shared by all tests: catalog `foo`, schema `bar`,
    /// no tables, and the given `drop_if_exists` flag.
    fn foo_bar_context(drop_if_exists: bool) -> DropDatabaseContext {
        DropDatabaseContext {
            catalog: "foo".to_string(),
            schema: "bar".to_string(),
            drop_if_exists,
            tables: None,
        }
    }

    /// A missing schema without `drop_if_exists` must yield `SchemaNotFound`.
    #[tokio::test]
    async fn test_schema_not_exists_err() {
        let ddl_context = new_ddl_context(Arc::new(MockDatanodeManager::new(())));
        let mut start = DropDatabaseStart;
        let mut ctx = foo_bar_context(false);

        let result = start.next(&ddl_context, &mut ctx).await;

        assert_matches!(result.unwrap_err(), error::Error::SchemaNotFound { .. });
    }

    /// A missing schema with `drop_if_exists` goes straight to the end state.
    #[tokio::test]
    async fn test_schema_not_exists() {
        let ddl_context = new_ddl_context(Arc::new(MockDatanodeManager::new(())));
        let mut start = DropDatabaseStart;
        let mut ctx = foo_bar_context(true);

        let (next_state, status) = start.next(&ddl_context, &mut ctx).await.unwrap();

        assert!(next_state.as_any().is::<DropDatabaseEnd>());
        assert!(status.is_done());
    }

    /// An existing schema moves on to the cursor state and asks for persistence.
    #[tokio::test]
    async fn test_next() {
        let ddl_context = new_ddl_context(Arc::new(MockDatanodeManager::new(())));
        ddl_context
            .table_metadata_manager
            .schema_manager()
            .create(SchemaNameKey::new("foo", "bar"), None, true)
            .await
            .unwrap();
        let mut start = DropDatabaseStart;
        let mut ctx = foo_bar_context(false);

        let (next_state, status) = start.next(&ddl_context, &mut ctx).await.unwrap();

        assert!(next_state.as_any().is::<DropDatabaseCursor>());
        assert!(status.need_persist());
    }
}