// common_meta/ddl/drop_database.rs
pub mod cursor;
16pub mod end;
17pub mod executor;
18pub mod metadata;
19pub mod start;
20use std::any::Any;
21use std::fmt::Debug;
22
23use common_error::ext::BoxedError;
24use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
25use common_procedure::{
26    Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
27};
28use futures::stream::BoxStream;
29use serde::{Deserialize, Serialize};
30use snafu::ResultExt;
31use tonic::async_trait;
32
33use self::start::DropDatabaseStart;
34use crate::ddl::DdlContext;
35use crate::ddl::utils::map_to_procedure_error;
36use crate::error::Result;
37use crate::key::table_name::TableNameValue;
38use crate::lock_key::{CatalogLock, SchemaLock};
39
40pub struct DropDatabaseProcedure {
41    runtime_context: DdlContext,
43    context: DropDatabaseContext,
44
45    state: Box<dyn State>,
46}
47
48#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
50pub(crate) enum DropTableTarget {
51    Logical,
52    Physical,
53}
54
55pub(crate) struct DropDatabaseContext {
57    catalog: String,
58    schema: String,
59    drop_if_exists: bool,
60    tables: Option<BoxStream<'static, Result<(String, TableNameValue)>>>,
61}
62
63#[async_trait::async_trait]
64#[typetag::serde(tag = "drop_database_state")]
65pub(crate) trait State: Send + Debug {
66    async fn next(
68        &mut self,
69        ddl_ctx: &DdlContext,
70        ctx: &mut DropDatabaseContext,
71    ) -> Result<(Box<dyn State>, Status)>;
72
73    fn recover(&mut self, _ddl_ctx: &DdlContext) -> Result<()> {
75        Ok(())
76    }
77
78    fn as_any(&self) -> &dyn Any;
80}
81
82impl DropDatabaseProcedure {
83    pub const TYPE_NAME: &'static str = "metasrv-procedure::DropDatabase";
84
85    pub fn new(catalog: String, schema: String, drop_if_exists: bool, context: DdlContext) -> Self {
86        Self {
87            runtime_context: context,
88            context: DropDatabaseContext {
89                catalog,
90                schema,
91                drop_if_exists,
92                tables: None,
93            },
94            state: Box::new(DropDatabaseStart),
95        }
96    }
97
98    pub fn from_json(json: &str, runtime_context: DdlContext) -> ProcedureResult<Self> {
99        let DropDatabaseOwnedData {
100            catalog,
101            schema,
102            drop_if_exists,
103            state,
104        } = serde_json::from_str(json).context(FromJsonSnafu)?;
105
106        Ok(Self {
107            runtime_context,
108            context: DropDatabaseContext {
109                catalog,
110                schema,
111                drop_if_exists,
112                tables: None,
113            },
114            state,
115        })
116    }
117
118    #[cfg(test)]
119    pub(crate) fn state(&self) -> &dyn State {
120        self.state.as_ref()
121    }
122}
123
124#[async_trait]
125impl Procedure for DropDatabaseProcedure {
126    fn type_name(&self) -> &str {
127        Self::TYPE_NAME
128    }
129
130    fn recover(&mut self) -> ProcedureResult<()> {
131        self.state
132            .recover(&self.runtime_context)
133            .map_err(BoxedError::new)
134            .context(ExternalSnafu {
135                clean_poisons: false,
136            })
137    }
138
139    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
140        let state = &mut self.state;
141
142        let (next, status) = state
143            .next(&self.runtime_context, &mut self.context)
144            .await
145            .map_err(map_to_procedure_error)?;
146
147        *state = next;
148        Ok(status)
149    }
150
151    fn dump(&self) -> ProcedureResult<String> {
152        let data = DropDatabaseData {
153            catalog: &self.context.catalog,
154            schema: &self.context.schema,
155            drop_if_exists: self.context.drop_if_exists,
156            state: self.state.as_ref(),
157        };
158
159        serde_json::to_string(&data).context(ToJsonSnafu)
160    }
161
162    fn lock_key(&self) -> LockKey {
163        let lock_key = vec![
164            CatalogLock::Read(&self.context.catalog).into(),
165            SchemaLock::write(&self.context.catalog, &self.context.schema).into(),
166        ];
167
168        LockKey::new(lock_key)
169    }
170}
171
172#[derive(Debug, Serialize)]
173struct DropDatabaseData<'a> {
174    catalog: &'a str,
176    schema: &'a str,
178    drop_if_exists: bool,
179    state: &'a dyn State,
180}
181
182#[derive(Debug, Deserialize)]
183struct DropDatabaseOwnedData {
184    catalog: String,
186    schema: String,
188    drop_if_exists: bool,
189    state: Box<dyn State>,
190}