// common_meta/ddl/drop_database.rs
pub mod cursor;
16pub mod end;
17pub mod executor;
18pub mod metadata;
19pub mod start;
20use std::any::Any;
21use std::fmt::Debug;
22
23use common_error::ext::BoxedError;
24use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
25use common_procedure::{
26 Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
27};
28use futures::stream::BoxStream;
29use serde::{Deserialize, Serialize};
30use snafu::ResultExt;
31use tonic::async_trait;
32
33use self::start::DropDatabaseStart;
34use crate::ddl::utils::map_to_procedure_error;
35use crate::ddl::DdlContext;
36use crate::error::Result;
37use crate::key::table_name::TableNameValue;
38use crate::lock_key::{CatalogLock, SchemaLock};
39
40pub struct DropDatabaseProcedure {
41 runtime_context: DdlContext,
43 context: DropDatabaseContext,
44
45 state: Box<dyn State>,
46}
47
48#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
50pub(crate) enum DropTableTarget {
51 Logical,
52 Physical,
53}
54
55pub(crate) struct DropDatabaseContext {
57 catalog: String,
58 schema: String,
59 drop_if_exists: bool,
60 tables: Option<BoxStream<'static, Result<(String, TableNameValue)>>>,
61}
62
63#[async_trait::async_trait]
64#[typetag::serde(tag = "drop_database_state")]
65pub(crate) trait State: Send + Debug {
66 async fn next(
68 &mut self,
69 ddl_ctx: &DdlContext,
70 ctx: &mut DropDatabaseContext,
71 ) -> Result<(Box<dyn State>, Status)>;
72
73 fn recover(&mut self, _ddl_ctx: &DdlContext) -> Result<()> {
75 Ok(())
76 }
77
78 fn as_any(&self) -> &dyn Any;
80}
81
82impl DropDatabaseProcedure {
83 pub const TYPE_NAME: &'static str = "metasrv-procedure::DropDatabase";
84
85 pub fn new(catalog: String, schema: String, drop_if_exists: bool, context: DdlContext) -> Self {
86 Self {
87 runtime_context: context,
88 context: DropDatabaseContext {
89 catalog,
90 schema,
91 drop_if_exists,
92 tables: None,
93 },
94 state: Box::new(DropDatabaseStart),
95 }
96 }
97
98 pub fn from_json(json: &str, runtime_context: DdlContext) -> ProcedureResult<Self> {
99 let DropDatabaseOwnedData {
100 catalog,
101 schema,
102 drop_if_exists,
103 state,
104 } = serde_json::from_str(json).context(FromJsonSnafu)?;
105
106 Ok(Self {
107 runtime_context,
108 context: DropDatabaseContext {
109 catalog,
110 schema,
111 drop_if_exists,
112 tables: None,
113 },
114 state,
115 })
116 }
117
118 #[cfg(test)]
119 pub(crate) fn state(&self) -> &dyn State {
120 self.state.as_ref()
121 }
122}
123
124#[async_trait]
125impl Procedure for DropDatabaseProcedure {
126 fn type_name(&self) -> &str {
127 Self::TYPE_NAME
128 }
129
130 fn recover(&mut self) -> ProcedureResult<()> {
131 self.state
132 .recover(&self.runtime_context)
133 .map_err(BoxedError::new)
134 .context(ExternalSnafu {
135 clean_poisons: false,
136 })
137 }
138
139 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
140 let state = &mut self.state;
141
142 let (next, status) = state
143 .next(&self.runtime_context, &mut self.context)
144 .await
145 .map_err(map_to_procedure_error)?;
146
147 *state = next;
148 Ok(status)
149 }
150
151 fn dump(&self) -> ProcedureResult<String> {
152 let data = DropDatabaseData {
153 catalog: &self.context.catalog,
154 schema: &self.context.schema,
155 drop_if_exists: self.context.drop_if_exists,
156 state: self.state.as_ref(),
157 };
158
159 serde_json::to_string(&data).context(ToJsonSnafu)
160 }
161
162 fn lock_key(&self) -> LockKey {
163 let lock_key = vec![
164 CatalogLock::Read(&self.context.catalog).into(),
165 SchemaLock::write(&self.context.catalog, &self.context.schema).into(),
166 ];
167
168 LockKey::new(lock_key)
169 }
170}
171
172#[derive(Debug, Serialize)]
173struct DropDatabaseData<'a> {
174 catalog: &'a str,
176 schema: &'a str,
178 drop_if_exists: bool,
179 state: &'a dyn State,
180}
181
182#[derive(Debug, Deserialize)]
183struct DropDatabaseOwnedData {
184 catalog: String,
186 schema: String,
188 drop_if_exists: bool,
189 state: Box<dyn State>,
190}