common_meta/ddl/
drop_database.rs1pub mod cursor;
16pub mod end;
17pub mod executor;
18pub mod metadata;
19pub mod start;
20use std::any::Any;
21use std::fmt::Debug;
22
23use common_error::ext::BoxedError;
24use common_procedure::error::{Error as ProcedureError, ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
25use common_procedure::{
26 Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
27};
28use futures::stream::BoxStream;
29use serde::{Deserialize, Serialize};
30use snafu::ResultExt;
31use tonic::async_trait;
32
33use self::start::DropDatabaseStart;
34use crate::ddl::DdlContext;
35use crate::error::Result;
36use crate::key::table_name::TableNameValue;
37use crate::lock_key::{CatalogLock, SchemaLock};
38
39pub struct DropDatabaseProcedure {
40 runtime_context: DdlContext,
42 context: DropDatabaseContext,
43
44 state: Box<dyn State>,
45}
46
47#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
49pub(crate) enum DropTableTarget {
50 Logical,
51 Physical,
52}
53
54pub(crate) struct DropDatabaseContext {
56 catalog: String,
57 schema: String,
58 drop_if_exists: bool,
59 tables: Option<BoxStream<'static, Result<(String, TableNameValue)>>>,
60}
61
62#[async_trait::async_trait]
63#[typetag::serde(tag = "drop_database_state")]
64pub(crate) trait State: Send + Debug {
65 async fn next(
67 &mut self,
68 ddl_ctx: &DdlContext,
69 ctx: &mut DropDatabaseContext,
70 ) -> Result<(Box<dyn State>, Status)>;
71
72 fn recover(&mut self, _ddl_ctx: &DdlContext) -> Result<()> {
74 Ok(())
75 }
76
77 fn as_any(&self) -> &dyn Any;
79}
80
81impl DropDatabaseProcedure {
82 pub const TYPE_NAME: &'static str = "metasrv-procedure::DropDatabase";
83
84 pub fn new(catalog: String, schema: String, drop_if_exists: bool, context: DdlContext) -> Self {
85 Self {
86 runtime_context: context,
87 context: DropDatabaseContext {
88 catalog,
89 schema,
90 drop_if_exists,
91 tables: None,
92 },
93 state: Box::new(DropDatabaseStart),
94 }
95 }
96
97 pub fn from_json(json: &str, runtime_context: DdlContext) -> ProcedureResult<Self> {
98 let DropDatabaseOwnedData {
99 catalog,
100 schema,
101 drop_if_exists,
102 state,
103 } = serde_json::from_str(json).context(FromJsonSnafu)?;
104
105 Ok(Self {
106 runtime_context,
107 context: DropDatabaseContext {
108 catalog,
109 schema,
110 drop_if_exists,
111 tables: None,
112 },
113 state,
114 })
115 }
116
117 #[cfg(test)]
118 pub(crate) fn state(&self) -> &dyn State {
119 self.state.as_ref()
120 }
121}
122
123#[async_trait]
124impl Procedure for DropDatabaseProcedure {
125 fn type_name(&self) -> &str {
126 Self::TYPE_NAME
127 }
128
129 fn recover(&mut self) -> ProcedureResult<()> {
130 self.state
131 .recover(&self.runtime_context)
132 .map_err(BoxedError::new)
133 .context(ExternalSnafu {
134 clean_poisons: false,
135 })
136 }
137
138 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
139 let state = &mut self.state;
140
141 let (next, status) = state
142 .next(&self.runtime_context, &mut self.context)
143 .await
144 .map_err(|e| {
145 if e.is_retry_later() {
146 ProcedureError::retry_later(e)
147 } else {
148 ProcedureError::external(e)
149 }
150 })?;
151
152 *state = next;
153 Ok(status)
154 }
155
156 fn dump(&self) -> ProcedureResult<String> {
157 let data = DropDatabaseData {
158 catalog: &self.context.catalog,
159 schema: &self.context.schema,
160 drop_if_exists: self.context.drop_if_exists,
161 state: self.state.as_ref(),
162 };
163
164 serde_json::to_string(&data).context(ToJsonSnafu)
165 }
166
167 fn lock_key(&self) -> LockKey {
168 let lock_key = vec![
169 CatalogLock::Read(&self.context.catalog).into(),
170 SchemaLock::write(&self.context.catalog, &self.context.schema).into(),
171 ];
172
173 LockKey::new(lock_key)
174 }
175}
176
177#[derive(Debug, Serialize)]
178struct DropDatabaseData<'a> {
179 catalog: &'a str,
181 schema: &'a str,
183 drop_if_exists: bool,
184 state: &'a dyn State,
185}
186
187#[derive(Debug, Deserialize)]
188struct DropDatabaseOwnedData {
189 catalog: String,
191 schema: String,
193 drop_if_exists: bool,
194 state: Box<dyn State>,
195}