common_meta/ddl/
drop_database.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod cursor;
16pub mod end;
17pub mod executor;
18pub mod metadata;
19pub mod start;
20use std::any::Any;
21use std::fmt::Debug;
22
23use common_error::ext::BoxedError;
24use common_procedure::error::{Error as ProcedureError, ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
25use common_procedure::{
26    Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
27};
28use futures::stream::BoxStream;
29use serde::{Deserialize, Serialize};
30use snafu::ResultExt;
31use tonic::async_trait;
32
33use self::start::DropDatabaseStart;
34use crate::ddl::DdlContext;
35use crate::error::Result;
36use crate::key::table_name::TableNameValue;
37use crate::lock_key::{CatalogLock, SchemaLock};
38
39pub struct DropDatabaseProcedure {
40    /// The context of procedure runtime.
41    runtime_context: DdlContext,
42    context: DropDatabaseContext,
43
44    state: Box<dyn State>,
45}
46
47/// Target of dropping tables.
48#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
49pub(crate) enum DropTableTarget {
50    Logical,
51    Physical,
52}
53
54/// Context of [DropDatabaseProcedure] execution.
55pub(crate) struct DropDatabaseContext {
56    catalog: String,
57    schema: String,
58    drop_if_exists: bool,
59    tables: Option<BoxStream<'static, Result<(String, TableNameValue)>>>,
60}
61
62#[async_trait::async_trait]
63#[typetag::serde(tag = "drop_database_state")]
64pub(crate) trait State: Send + Debug {
65    /// Yields the next [State] and [Status].
66    async fn next(
67        &mut self,
68        ddl_ctx: &DdlContext,
69        ctx: &mut DropDatabaseContext,
70    ) -> Result<(Box<dyn State>, Status)>;
71
72    /// The hook is called during the recovery.
73    fn recover(&mut self, _ddl_ctx: &DdlContext) -> Result<()> {
74        Ok(())
75    }
76
77    /// Returns as [Any](std::any::Any).
78    fn as_any(&self) -> &dyn Any;
79}
80
81impl DropDatabaseProcedure {
82    pub const TYPE_NAME: &'static str = "metasrv-procedure::DropDatabase";
83
84    pub fn new(catalog: String, schema: String, drop_if_exists: bool, context: DdlContext) -> Self {
85        Self {
86            runtime_context: context,
87            context: DropDatabaseContext {
88                catalog,
89                schema,
90                drop_if_exists,
91                tables: None,
92            },
93            state: Box::new(DropDatabaseStart),
94        }
95    }
96
97    pub fn from_json(json: &str, runtime_context: DdlContext) -> ProcedureResult<Self> {
98        let DropDatabaseOwnedData {
99            catalog,
100            schema,
101            drop_if_exists,
102            state,
103        } = serde_json::from_str(json).context(FromJsonSnafu)?;
104
105        Ok(Self {
106            runtime_context,
107            context: DropDatabaseContext {
108                catalog,
109                schema,
110                drop_if_exists,
111                tables: None,
112            },
113            state,
114        })
115    }
116
117    #[cfg(test)]
118    pub(crate) fn state(&self) -> &dyn State {
119        self.state.as_ref()
120    }
121}
122
123#[async_trait]
124impl Procedure for DropDatabaseProcedure {
125    fn type_name(&self) -> &str {
126        Self::TYPE_NAME
127    }
128
129    fn recover(&mut self) -> ProcedureResult<()> {
130        self.state
131            .recover(&self.runtime_context)
132            .map_err(BoxedError::new)
133            .context(ExternalSnafu {
134                clean_poisons: false,
135            })
136    }
137
138    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
139        let state = &mut self.state;
140
141        let (next, status) = state
142            .next(&self.runtime_context, &mut self.context)
143            .await
144            .map_err(|e| {
145                if e.is_retry_later() {
146                    ProcedureError::retry_later(e)
147                } else {
148                    ProcedureError::external(e)
149                }
150            })?;
151
152        *state = next;
153        Ok(status)
154    }
155
156    fn dump(&self) -> ProcedureResult<String> {
157        let data = DropDatabaseData {
158            catalog: &self.context.catalog,
159            schema: &self.context.schema,
160            drop_if_exists: self.context.drop_if_exists,
161            state: self.state.as_ref(),
162        };
163
164        serde_json::to_string(&data).context(ToJsonSnafu)
165    }
166
167    fn lock_key(&self) -> LockKey {
168        let lock_key = vec![
169            CatalogLock::Read(&self.context.catalog).into(),
170            SchemaLock::write(&self.context.catalog, &self.context.schema).into(),
171        ];
172
173        LockKey::new(lock_key)
174    }
175}
176
177#[derive(Debug, Serialize)]
178struct DropDatabaseData<'a> {
179    // The catalog name
180    catalog: &'a str,
181    // The schema name
182    schema: &'a str,
183    drop_if_exists: bool,
184    state: &'a dyn State,
185}
186
187#[derive(Debug, Deserialize)]
188struct DropDatabaseOwnedData {
189    // The catalog name
190    catalog: String,
191    // The schema name
192    schema: String,
193    drop_if_exists: bool,
194    state: Box<dyn State>,
195}