common_meta/ddl/drop_database/
start.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::any::Any;
16
17use common_procedure::Status;
18use serde::{Deserialize, Serialize};
19use snafu::ensure;
20
21use crate::ddl::drop_database::cursor::DropDatabaseCursor;
22use crate::ddl::drop_database::end::DropDatabaseEnd;
23use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
24use crate::ddl::DdlContext;
25use crate::error::{self, Result};
26use crate::key::schema_name::SchemaNameKey;
27
28#[derive(Debug, Serialize, Deserialize)]
29pub(crate) struct DropDatabaseStart;
30
31#[async_trait::async_trait]
32#[typetag::serde]
33impl State for DropDatabaseStart {
34    /// Checks whether schema exists.
35    /// - Early returns if schema not exists and `drop_if_exists` is `true`.
36    /// - Throws an error if schema not exists and `drop_if_exists` is `false`.
37    async fn next(
38        &mut self,
39        ddl_ctx: &DdlContext,
40        ctx: &mut DropDatabaseContext,
41    ) -> Result<(Box<dyn State>, Status)> {
42        let exists = ddl_ctx
43            .table_metadata_manager
44            .schema_manager()
45            .exists(SchemaNameKey {
46                catalog: &ctx.catalog,
47                schema: &ctx.schema,
48            })
49            .await?;
50
51        if !exists && ctx.drop_if_exists {
52            return Ok((Box::new(DropDatabaseEnd), Status::done()));
53        }
54
55        ensure!(
56            exists,
57            error::SchemaNotFoundSnafu {
58                table_schema: &ctx.schema,
59            }
60        );
61
62        Ok((
63            Box::new(DropDatabaseCursor::new(DropTableTarget::Logical)),
64            Status::executing(true),
65        ))
66    }
67
68    fn as_any(&self) -> &dyn Any {
69        self
70    }
71}
72
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use crate::ddl::drop_database::cursor::DropDatabaseCursor;
    use crate::ddl::drop_database::end::DropDatabaseEnd;
    use crate::ddl::drop_database::start::DropDatabaseStart;
    use crate::ddl::drop_database::{DropDatabaseContext, State};
    use crate::error;
    use crate::key::schema_name::SchemaNameKey;
    use crate::test_util::{new_ddl_context, MockDatanodeManager};

    /// Builds a context for catalog `foo`, schema `bar` with the given
    /// `drop_if_exists` flag and no tables.
    fn drop_context(drop_if_exists: bool) -> DropDatabaseContext {
        DropDatabaseContext {
            catalog: "foo".to_string(),
            schema: "bar".to_string(),
            drop_if_exists,
            tables: None,
        }
    }

    /// A missing schema without `drop_if_exists` must yield `SchemaNotFound`.
    #[tokio::test]
    async fn test_schema_not_exists_err() {
        let datanodes = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(datanodes);
        let mut ctx = drop_context(false);
        let mut start = DropDatabaseStart;

        let err = start.next(&ddl_context, &mut ctx).await.unwrap_err();

        assert_matches!(err, error::Error::SchemaNotFound { .. });
    }

    /// A missing schema with `drop_if_exists` must end the procedure as done.
    #[tokio::test]
    async fn test_schema_not_exists() {
        let datanodes = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(datanodes);
        let mut ctx = drop_context(true);
        let mut start = DropDatabaseStart;

        let (next_state, status) = start.next(&ddl_context, &mut ctx).await.unwrap();

        assert!(next_state.as_any().is::<DropDatabaseEnd>());
        assert!(status.is_done());
    }

    /// An existing schema must hand over to the cursor state with a status
    /// that needs persisting.
    #[tokio::test]
    async fn test_next() {
        let datanodes = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(datanodes);
        // Register the schema up front so the existence check succeeds.
        ddl_context
            .table_metadata_manager
            .schema_manager()
            .create(SchemaNameKey::new("foo", "bar"), None, true)
            .await
            .unwrap();
        let mut ctx = drop_context(false);
        let mut start = DropDatabaseStart;

        let (next_state, status) = start.next(&ddl_context, &mut ctx).await.unwrap();

        assert!(next_state.as_any().is::<DropDatabaseCursor>());
        assert!(status.need_persist());
    }
}