Skip to main content

common_meta/ddl/
drop_database.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod cursor;
16pub mod end;
17pub mod executor;
18pub mod metadata;
19pub mod start;
20use std::any::Any;
21use std::fmt::Debug;
22
23use common_error::ext::BoxedError;
24use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
25use common_procedure::{
26    Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
27};
28use futures::stream::BoxStream;
29use serde::{Deserialize, Serialize};
30use snafu::ResultExt;
31use tonic::async_trait;
32
33use self::start::DropDatabaseStart;
34use crate::ddl::DdlContext;
35use crate::ddl::utils::map_to_procedure_error;
36use crate::error::Result;
37use crate::key::table_name::TableNameValue;
38use crate::lock_key::{CatalogLock, SchemaLock};
39
40pub struct DropDatabaseProcedure {
41    /// The context of procedure runtime.
42    runtime_context: DdlContext,
43    context: DropDatabaseContext,
44
45    state: Box<dyn State>,
46}
47
48/// Target of dropping tables.
49#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
50pub(crate) enum DropTableTarget {
51    Logical,
52    Physical,
53}
54
55/// Context of [DropDatabaseProcedure] execution.
56pub(crate) struct DropDatabaseContext {
57    catalog: String,
58    schema: String,
59    drop_if_exists: bool,
60    tables: Option<BoxStream<'static, Result<(String, TableNameValue)>>>,
61    retrying: bool,
62}
63
64#[async_trait::async_trait]
65#[typetag::serde(tag = "drop_database_state")]
66pub(crate) trait State: Send + Debug {
67    /// Yields the next [State] and [Status].
68    async fn next(
69        &mut self,
70        ddl_ctx: &DdlContext,
71        ctx: &mut DropDatabaseContext,
72    ) -> Result<(Box<dyn State>, Status)>;
73
74    /// The hook is called during the recovery.
75    fn recover(&mut self, _ddl_ctx: &DdlContext) -> Result<()> {
76        Ok(())
77    }
78
79    /// Returns as [Any](std::any::Any).
80    fn as_any(&self) -> &dyn Any;
81}
82
83impl DropDatabaseProcedure {
84    pub const TYPE_NAME: &'static str = "metasrv-procedure::DropDatabase";
85
86    pub fn new(catalog: String, schema: String, drop_if_exists: bool, context: DdlContext) -> Self {
87        Self {
88            runtime_context: context,
89            context: DropDatabaseContext {
90                catalog,
91                schema,
92                drop_if_exists,
93                tables: None,
94                retrying: false,
95            },
96            state: Box::new(DropDatabaseStart),
97        }
98    }
99
100    pub fn from_json(json: &str, runtime_context: DdlContext) -> ProcedureResult<Self> {
101        let DropDatabaseOwnedData {
102            catalog,
103            schema,
104            drop_if_exists,
105            state,
106        } = serde_json::from_str(json).context(FromJsonSnafu)?;
107
108        Ok(Self {
109            runtime_context,
110            context: DropDatabaseContext {
111                catalog,
112                schema,
113                drop_if_exists,
114                tables: None,
115                retrying: false,
116            },
117            state,
118        })
119    }
120
121    #[cfg(test)]
122    pub(crate) fn state(&self) -> &dyn State {
123        self.state.as_ref()
124    }
125}
126
127#[async_trait]
128impl Procedure for DropDatabaseProcedure {
129    fn type_name(&self) -> &str {
130        Self::TYPE_NAME
131    }
132
133    fn recover(&mut self) -> ProcedureResult<()> {
134        self.state
135            .recover(&self.runtime_context)
136            .map_err(BoxedError::new)
137            .context(ExternalSnafu {
138                clean_poisons: false,
139            })
140    }
141
142    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
143        let state = &mut self.state;
144
145        self.context.retrying = ctx.is_retrying().await.unwrap_or(false);
146        let (next, status) = state
147            .next(&self.runtime_context, &mut self.context)
148            .await
149            .map_err(map_to_procedure_error)?;
150
151        *state = next;
152        Ok(status)
153    }
154
155    fn dump(&self) -> ProcedureResult<String> {
156        let data = DropDatabaseData {
157            catalog: &self.context.catalog,
158            schema: &self.context.schema,
159            drop_if_exists: self.context.drop_if_exists,
160            state: self.state.as_ref(),
161        };
162
163        serde_json::to_string(&data).context(ToJsonSnafu)
164    }
165
166    fn lock_key(&self) -> LockKey {
167        let lock_key = vec![
168            CatalogLock::Read(&self.context.catalog).into(),
169            SchemaLock::write(&self.context.catalog, &self.context.schema).into(),
170        ];
171
172        LockKey::new(lock_key)
173    }
174}
175
176#[derive(Debug, Serialize)]
177struct DropDatabaseData<'a> {
178    // The catalog name
179    catalog: &'a str,
180    // The schema name
181    schema: &'a str,
182    drop_if_exists: bool,
183    state: &'a dyn State,
184}
185
186#[derive(Debug, Deserialize)]
187struct DropDatabaseOwnedData {
188    // The catalog name
189    catalog: String,
190    // The schema name
191    schema: String,
192    drop_if_exists: bool,
193    state: Box<dyn State>,
194}