Skip to main content

common_meta/ddl/drop_database/
start.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::any::Any;
16
17use common_procedure::Status;
18use serde::{Deserialize, Serialize};
19use snafu::ensure;
20
21use crate::ddl::DdlContext;
22use crate::ddl::drop_database::cursor::DropDatabaseCursor;
23use crate::ddl::drop_database::end::DropDatabaseEnd;
24use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
25use crate::error::{self, Result};
26use crate::key::schema_name::SchemaNameKey;
27
28#[derive(Debug, Serialize, Deserialize)]
29pub(crate) struct DropDatabaseStart;
30
31#[async_trait::async_trait]
32#[typetag::serde]
33impl State for DropDatabaseStart {
34    /// Checks whether schema exists.
35    /// - Early returns if schema not exists and `drop_if_exists` is `true`.
36    /// - Throws an error if schema not exists and `drop_if_exists` is `false`.
37    async fn next(
38        &mut self,
39        ddl_ctx: &DdlContext,
40        ctx: &mut DropDatabaseContext,
41    ) -> Result<(Box<dyn State>, Status)> {
42        let exists = ddl_ctx
43            .table_metadata_manager
44            .schema_manager()
45            .exists(SchemaNameKey {
46                catalog: &ctx.catalog,
47                schema: &ctx.schema,
48            })
49            .await?;
50
51        if !exists && ctx.drop_if_exists {
52            return Ok((Box::new(DropDatabaseEnd), Status::done()));
53        }
54
55        ensure!(
56            exists,
57            error::SchemaNotFoundSnafu {
58                table_schema: &ctx.schema,
59            }
60        );
61
62        Ok((
63            Box::new(DropDatabaseCursor::new(DropTableTarget::Logical)),
64            Status::executing(true),
65        ))
66    }
67
68    fn as_any(&self) -> &dyn Any {
69        self
70    }
71}
72
#[cfg(test)]
mod tests {
    use std::assert_matches;
    use std::sync::Arc;

    use crate::ddl::drop_database::cursor::DropDatabaseCursor;
    use crate::ddl::drop_database::end::DropDatabaseEnd;
    use crate::ddl::drop_database::start::DropDatabaseStart;
    use crate::ddl::drop_database::{DropDatabaseContext, State};
    use crate::error;
    use crate::key::schema_name::SchemaNameKey;
    use crate::test_util::{MockDatanodeManager, new_ddl_context};

    /// Builds a context for dropping schema `bar` in catalog `foo`.
    fn drop_ctx(drop_if_exists: bool) -> DropDatabaseContext {
        DropDatabaseContext {
            catalog: "foo".to_string(),
            schema: "bar".to_string(),
            drop_if_exists,
            tables: None,
            retrying: false,
        }
    }

    /// A missing schema without `drop_if_exists` must surface `SchemaNotFound`.
    #[tokio::test]
    async fn test_schema_not_exists_err() {
        let datanodes = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(datanodes);
        let mut ctx = drop_ctx(false);
        let mut start = DropDatabaseStart;

        let outcome = start.next(&ddl_context, &mut ctx).await;

        assert_matches!(outcome.unwrap_err(), error::Error::SchemaNotFound { .. });
    }

    /// A missing schema with `drop_if_exists` ends the procedure successfully.
    #[tokio::test]
    async fn test_schema_not_exists() {
        let datanodes = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(datanodes);
        let mut ctx = drop_ctx(true);
        let mut start = DropDatabaseStart;

        let (next_state, status) = start.next(&ddl_context, &mut ctx).await.unwrap();

        assert!(next_state.as_any().is::<DropDatabaseEnd>());
        assert!(status.is_done());
    }

    /// An existing schema moves on to the cursor state and requests persistence.
    #[tokio::test]
    async fn test_next() {
        let datanodes = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(datanodes);
        // Register the schema first so the existence check succeeds.
        ddl_context
            .table_metadata_manager
            .schema_manager()
            .create(SchemaNameKey::new("foo", "bar"), None, true)
            .await
            .unwrap();
        let mut ctx = drop_ctx(false);
        let mut start = DropDatabaseStart;

        let (next_state, status) = start.next(&ddl_context, &mut ctx).await.unwrap();

        assert!(next_state.as_any().is::<DropDatabaseCursor>());
        assert!(status.need_persist());
    }
}