common_meta/ddl/
drop_database.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod cursor;
16pub mod end;
17pub mod executor;
18pub mod metadata;
19pub mod start;
20use std::any::Any;
21use std::fmt::Debug;
22
23use common_error::ext::BoxedError;
24use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
25use common_procedure::{
26    Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
27};
28use futures::stream::BoxStream;
29use serde::{Deserialize, Serialize};
30use snafu::ResultExt;
31use tonic::async_trait;
32
33use self::start::DropDatabaseStart;
34use crate::ddl::utils::map_to_procedure_error;
35use crate::ddl::DdlContext;
36use crate::error::Result;
37use crate::key::table_name::TableNameValue;
38use crate::lock_key::{CatalogLock, SchemaLock};
39
/// Procedure that drops a database, driven as a state machine.
///
/// Each call to `execute` asks the current [State] for the next state and
/// replaces `state` with it. Only `catalog`, `schema`, `drop_if_exists` and the
/// current state are persisted by `dump`; the runtime context is supplied
/// again when the procedure is rebuilt with `from_json`.
pub struct DropDatabaseProcedure {
    /// The context of procedure runtime.
    runtime_context: DdlContext,
    /// The mutable execution context handed to every [State] transition.
    context: DropDatabaseContext,

    /// The current state of the state machine.
    state: Box<dyn State>,
}
47
/// Target of dropping tables.
///
/// The variant is serializable, so it can be stored inside a persisted state.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum DropTableTarget {
    /// NOTE(review): presumably selects the logical tables — confirm against the states that use it.
    Logical,
    /// NOTE(review): presumably selects the physical tables — confirm against the states that use it.
    Physical,
}
54
/// Context of [DropDatabaseProcedure] execution.
///
/// A mutable reference to this context is passed to every [State::next] call.
pub(crate) struct DropDatabaseContext {
    /// The catalog name of the database to drop.
    catalog: String,
    /// The schema name of the database to drop.
    schema: String,
    /// The `drop_if_exists` flag supplied when the procedure was created.
    drop_if_exists: bool,
    /// A stream of `(String, TableNameValue)` items; the `String` is presumably the table name.
    ///
    /// It is `None` when the procedure is created or restored from JSON and is
    /// never persisted, so it is presumably (re)opened by a state on demand — confirm.
    tables: Option<BoxStream<'static, Result<(String, TableNameValue)>>>,
}
62
/// A single step of the [DropDatabaseProcedure] state machine.
///
/// Implementations are (de)serialized as trait objects through `typetag`,
/// using `drop_database_state` as the tag.
#[async_trait::async_trait]
#[typetag::serde(tag = "drop_database_state")]
pub(crate) trait State: Send + Debug {
    /// Yields the next [State] and [Status].
    ///
    /// `ddl_ctx` is the shared runtime context; `ctx` is the mutable
    /// execution context of the running procedure.
    async fn next(
        &mut self,
        ddl_ctx: &DdlContext,
        ctx: &mut DropDatabaseContext,
    ) -> Result<(Box<dyn State>, Status)>;

    /// The hook is called during the recovery.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn recover(&mut self, _ddl_ctx: &DdlContext) -> Result<()> {
        Ok(())
    }

    /// Returns as [Any](std::any::Any).
    ///
    /// NOTE(review): presumably used to downcast to the concrete state type — confirm.
    fn as_any(&self) -> &dyn Any;
}
81
82impl DropDatabaseProcedure {
83    pub const TYPE_NAME: &'static str = "metasrv-procedure::DropDatabase";
84
85    pub fn new(catalog: String, schema: String, drop_if_exists: bool, context: DdlContext) -> Self {
86        Self {
87            runtime_context: context,
88            context: DropDatabaseContext {
89                catalog,
90                schema,
91                drop_if_exists,
92                tables: None,
93            },
94            state: Box::new(DropDatabaseStart),
95        }
96    }
97
98    pub fn from_json(json: &str, runtime_context: DdlContext) -> ProcedureResult<Self> {
99        let DropDatabaseOwnedData {
100            catalog,
101            schema,
102            drop_if_exists,
103            state,
104        } = serde_json::from_str(json).context(FromJsonSnafu)?;
105
106        Ok(Self {
107            runtime_context,
108            context: DropDatabaseContext {
109                catalog,
110                schema,
111                drop_if_exists,
112                tables: None,
113            },
114            state,
115        })
116    }
117
118    #[cfg(test)]
119    pub(crate) fn state(&self) -> &dyn State {
120        self.state.as_ref()
121    }
122}
123
124#[async_trait]
125impl Procedure for DropDatabaseProcedure {
126    fn type_name(&self) -> &str {
127        Self::TYPE_NAME
128    }
129
130    fn recover(&mut self) -> ProcedureResult<()> {
131        self.state
132            .recover(&self.runtime_context)
133            .map_err(BoxedError::new)
134            .context(ExternalSnafu {
135                clean_poisons: false,
136            })
137    }
138
139    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
140        let state = &mut self.state;
141
142        let (next, status) = state
143            .next(&self.runtime_context, &mut self.context)
144            .await
145            .map_err(map_to_procedure_error)?;
146
147        *state = next;
148        Ok(status)
149    }
150
151    fn dump(&self) -> ProcedureResult<String> {
152        let data = DropDatabaseData {
153            catalog: &self.context.catalog,
154            schema: &self.context.schema,
155            drop_if_exists: self.context.drop_if_exists,
156            state: self.state.as_ref(),
157        };
158
159        serde_json::to_string(&data).context(ToJsonSnafu)
160    }
161
162    fn lock_key(&self) -> LockKey {
163        let lock_key = vec![
164            CatalogLock::Read(&self.context.catalog).into(),
165            SchemaLock::write(&self.context.catalog, &self.context.schema).into(),
166        ];
167
168        LockKey::new(lock_key)
169    }
170}
171
/// Borrowed view of the procedure that is serialized by `dump`.
///
/// Its fields mirror [DropDatabaseOwnedData], which is used for deserialization.
#[derive(Debug, Serialize)]
struct DropDatabaseData<'a> {
    /// The catalog name
    catalog: &'a str,
    /// The schema name
    schema: &'a str,
    /// The `drop_if_exists` flag of the procedure.
    drop_if_exists: bool,
    /// The current state of the procedure.
    state: &'a dyn State,
}
181
/// Owned counterpart of [DropDatabaseData], deserialized by `from_json`.
#[derive(Debug, Deserialize)]
struct DropDatabaseOwnedData {
    /// The catalog name
    catalog: String,
    /// The schema name
    schema: String,
    /// The `drop_if_exists` flag of the procedure.
    drop_if_exists: bool,
    /// The state the procedure was in when it was dumped.
    state: Box<dyn State>,
}