common_meta/ddl/drop_database/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_procedure::Status;
18use serde::{Deserialize, Serialize};
19
20use crate::cache_invalidator::Context;
21use crate::ddl::drop_database::end::DropDatabaseEnd;
22use crate::ddl::drop_database::{DropDatabaseContext, State};
23use crate::ddl::DdlContext;
24use crate::error::Result;
25use crate::instruction::CacheIdent;
26use crate::key::schema_name::{SchemaName, SchemaNameKey};
27
28#[derive(Debug, Serialize, Deserialize)]
29pub(crate) struct DropDatabaseRemoveMetadata;
30
31#[async_trait::async_trait]
32#[typetag::serde]
33impl State for DropDatabaseRemoveMetadata {
34    async fn next(
35        &mut self,
36        ddl_ctx: &DdlContext,
37        ctx: &mut DropDatabaseContext,
38    ) -> Result<(Box<dyn State>, Status)> {
39        ddl_ctx
40            .table_metadata_manager
41            .schema_manager()
42            .delete(SchemaNameKey::new(&ctx.catalog, &ctx.schema))
43            .await?;
44
45        return Ok((Box::new(DropMetadataBroadcast), Status::executing(true)));
46    }
47
48    fn as_any(&self) -> &dyn Any {
49        self
50    }
51}
52
53#[derive(Debug, Serialize, Deserialize)]
54pub(crate) struct DropMetadataBroadcast;
55
56impl DropMetadataBroadcast {
57    /// Invalidates frontend caches
58    async fn invalidate_schema_cache(
59        &self,
60        ddl_ctx: &DdlContext,
61        db_ctx: &mut DropDatabaseContext,
62    ) -> Result<()> {
63        let cache_invalidator = &ddl_ctx.cache_invalidator;
64        let ctx = Context {
65            subject: Some("Invalidate schema cache by dropping database".to_string()),
66        };
67
68        cache_invalidator
69            .invalidate(
70                &ctx,
71                &[CacheIdent::SchemaName(SchemaName {
72                    catalog_name: db_ctx.catalog.clone(),
73                    schema_name: db_ctx.schema.clone(),
74                })],
75            )
76            .await?;
77
78        Ok(())
79    }
80}
81
82#[async_trait::async_trait]
83#[typetag::serde]
84impl State for DropMetadataBroadcast {
85    async fn next(
86        &mut self,
87        ddl_ctx: &DdlContext,
88        ctx: &mut DropDatabaseContext,
89    ) -> Result<(Box<dyn State>, Status)> {
90        self.invalidate_schema_cache(ddl_ctx, ctx).await?;
91        Ok((Box::new(DropDatabaseEnd), Status::done()))
92    }
93
94    fn as_any(&self) -> &dyn Any {
95        self
96    }
97}
98
99#[cfg(test)]
100mod tests {
101    use std::sync::Arc;
102
103    use crate::ddl::drop_database::end::DropDatabaseEnd;
104    use crate::ddl::drop_database::metadata::{DropDatabaseRemoveMetadata, DropMetadataBroadcast};
105    use crate::ddl::drop_database::{DropDatabaseContext, State};
106    use crate::key::schema_name::SchemaNameKey;
107    use crate::test_util::{new_ddl_context, MockDatanodeManager};
108
109    #[tokio::test]
110    async fn test_next() {
111        let node_manager = Arc::new(MockDatanodeManager::new(()));
112        let ddl_context = new_ddl_context(node_manager);
113        ddl_context
114            .table_metadata_manager
115            .schema_manager()
116            .create(SchemaNameKey::new("foo", "bar"), None, true)
117            .await
118            .unwrap();
119        let mut state = DropDatabaseRemoveMetadata;
120        let mut ctx = DropDatabaseContext {
121            catalog: "foo".to_string(),
122            schema: "bar".to_string(),
123            drop_if_exists: true,
124            tables: None,
125        };
126        let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
127        state
128            .as_any()
129            .downcast_ref::<DropMetadataBroadcast>()
130            .unwrap();
131        assert!(!status.is_done());
132        assert!(!ddl_context
133            .table_metadata_manager
134            .schema_manager()
135            .exists(SchemaNameKey::new("foo", "bar"))
136            .await
137            .unwrap());
138
139        let mut state = DropMetadataBroadcast;
140        let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
141        state.as_any().downcast_ref::<DropDatabaseEnd>().unwrap();
142        assert!(status.is_done());
143
144        // Schema not exists
145        let mut state = DropDatabaseRemoveMetadata;
146        let mut ctx = DropDatabaseContext {
147            catalog: "foo".to_string(),
148            schema: "bar".to_string(),
149            drop_if_exists: true,
150            tables: None,
151        };
152        let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
153        state
154            .as_any()
155            .downcast_ref::<DropMetadataBroadcast>()
156            .unwrap();
157        assert!(!status.is_done());
158    }
159}