common_meta/ddl/
drop_database.rs1pub mod cursor;
16pub mod end;
17pub mod executor;
18pub mod metadata;
19pub mod start;
20use std::any::Any;
21use std::fmt::Debug;
22
23use common_error::ext::BoxedError;
24use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
25use common_procedure::{
26 Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
27};
28use futures::stream::BoxStream;
29use serde::{Deserialize, Serialize};
30use snafu::ResultExt;
31use tonic::async_trait;
32
33use self::start::DropDatabaseStart;
34use crate::ddl::DdlContext;
35use crate::ddl::utils::map_to_procedure_error;
36use crate::error::Result;
37use crate::key::table_name::TableNameValue;
38use crate::lock_key::{CatalogLock, SchemaLock};
39
40pub struct DropDatabaseProcedure {
41 runtime_context: DdlContext,
43 context: DropDatabaseContext,
44
45 state: Box<dyn State>,
46}
47
48#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
50pub(crate) enum DropTableTarget {
51 Logical,
52 Physical,
53}
54
55pub(crate) struct DropDatabaseContext {
57 catalog: String,
58 schema: String,
59 drop_if_exists: bool,
60 tables: Option<BoxStream<'static, Result<(String, TableNameValue)>>>,
61 retrying: bool,
62}
63
64#[async_trait::async_trait]
65#[typetag::serde(tag = "drop_database_state")]
66pub(crate) trait State: Send + Debug {
67 async fn next(
69 &mut self,
70 ddl_ctx: &DdlContext,
71 ctx: &mut DropDatabaseContext,
72 ) -> Result<(Box<dyn State>, Status)>;
73
74 fn recover(&mut self, _ddl_ctx: &DdlContext) -> Result<()> {
76 Ok(())
77 }
78
79 fn as_any(&self) -> &dyn Any;
81}
82
83impl DropDatabaseProcedure {
84 pub const TYPE_NAME: &'static str = "metasrv-procedure::DropDatabase";
85
86 pub fn new(catalog: String, schema: String, drop_if_exists: bool, context: DdlContext) -> Self {
87 Self {
88 runtime_context: context,
89 context: DropDatabaseContext {
90 catalog,
91 schema,
92 drop_if_exists,
93 tables: None,
94 retrying: false,
95 },
96 state: Box::new(DropDatabaseStart),
97 }
98 }
99
100 pub fn from_json(json: &str, runtime_context: DdlContext) -> ProcedureResult<Self> {
101 let DropDatabaseOwnedData {
102 catalog,
103 schema,
104 drop_if_exists,
105 state,
106 } = serde_json::from_str(json).context(FromJsonSnafu)?;
107
108 Ok(Self {
109 runtime_context,
110 context: DropDatabaseContext {
111 catalog,
112 schema,
113 drop_if_exists,
114 tables: None,
115 retrying: false,
116 },
117 state,
118 })
119 }
120
121 #[cfg(test)]
122 pub(crate) fn state(&self) -> &dyn State {
123 self.state.as_ref()
124 }
125}
126
127#[async_trait]
128impl Procedure for DropDatabaseProcedure {
129 fn type_name(&self) -> &str {
130 Self::TYPE_NAME
131 }
132
133 fn recover(&mut self) -> ProcedureResult<()> {
134 self.state
135 .recover(&self.runtime_context)
136 .map_err(BoxedError::new)
137 .context(ExternalSnafu {
138 clean_poisons: false,
139 })
140 }
141
142 async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
143 let state = &mut self.state;
144
145 self.context.retrying = ctx.is_retrying().await.unwrap_or(false);
146 let (next, status) = state
147 .next(&self.runtime_context, &mut self.context)
148 .await
149 .map_err(map_to_procedure_error)?;
150
151 *state = next;
152 Ok(status)
153 }
154
155 fn dump(&self) -> ProcedureResult<String> {
156 let data = DropDatabaseData {
157 catalog: &self.context.catalog,
158 schema: &self.context.schema,
159 drop_if_exists: self.context.drop_if_exists,
160 state: self.state.as_ref(),
161 };
162
163 serde_json::to_string(&data).context(ToJsonSnafu)
164 }
165
166 fn lock_key(&self) -> LockKey {
167 let lock_key = vec![
168 CatalogLock::Read(&self.context.catalog).into(),
169 SchemaLock::write(&self.context.catalog, &self.context.schema).into(),
170 ];
171
172 LockKey::new(lock_key)
173 }
174}
175
176#[derive(Debug, Serialize)]
177struct DropDatabaseData<'a> {
178 catalog: &'a str,
180 schema: &'a str,
182 drop_if_exists: bool,
183 state: &'a dyn State,
184}
185
186#[derive(Debug, Deserialize)]
187struct DropDatabaseOwnedData {
188 catalog: String,
190 schema: String,
192 drop_if_exists: bool,
193 state: Box<dyn State>,
194}