// common_meta/ddl/drop_database/metadata.rs
use std::any::Any;
16
17use common_procedure::Status;
18use serde::{Deserialize, Serialize};
19
20use crate::cache_invalidator::Context;
21use crate::ddl::drop_database::end::DropDatabaseEnd;
22use crate::ddl::drop_database::{DropDatabaseContext, State};
23use crate::ddl::DdlContext;
24use crate::error::Result;
25use crate::instruction::CacheIdent;
26use crate::key::schema_name::{SchemaName, SchemaNameKey};
27
/// Drop-database state whose `State::next` deletes the schema-name key of the
/// database being dropped and then hands over to `DropMetadataBroadcast`.
///
/// The type carries no data; it is serializable so it can be stored as a
/// `typetag`-registered `State`.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DropDatabaseRemoveMetadata;
30
31#[async_trait::async_trait]
32#[typetag::serde]
33impl State for DropDatabaseRemoveMetadata {
34 async fn next(
35 &mut self,
36 ddl_ctx: &DdlContext,
37 ctx: &mut DropDatabaseContext,
38 ) -> Result<(Box<dyn State>, Status)> {
39 ddl_ctx
40 .table_metadata_manager
41 .schema_manager()
42 .delete(SchemaNameKey::new(&ctx.catalog, &ctx.schema))
43 .await?;
44
45 return Ok((Box::new(DropMetadataBroadcast), Status::executing(true)));
46 }
47
48 fn as_any(&self) -> &dyn Any {
49 self
50 }
51}
52
/// Drop-database state whose `State::next` invalidates the schema-name cache
/// entry of the database being dropped and then moves to `DropDatabaseEnd`.
///
/// The type carries no data; it is serializable so it can be stored as a
/// `typetag`-registered `State`.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DropMetadataBroadcast;
55
56impl DropMetadataBroadcast {
57 async fn invalidate_schema_cache(
59 &self,
60 ddl_ctx: &DdlContext,
61 db_ctx: &mut DropDatabaseContext,
62 ) -> Result<()> {
63 let cache_invalidator = &ddl_ctx.cache_invalidator;
64 let ctx = Context {
65 subject: Some("Invalidate schema cache by dropping database".to_string()),
66 };
67
68 cache_invalidator
69 .invalidate(
70 &ctx,
71 &[CacheIdent::SchemaName(SchemaName {
72 catalog_name: db_ctx.catalog.clone(),
73 schema_name: db_ctx.schema.clone(),
74 })],
75 )
76 .await?;
77
78 Ok(())
79 }
80}
81
82#[async_trait::async_trait]
83#[typetag::serde]
84impl State for DropMetadataBroadcast {
85 async fn next(
86 &mut self,
87 ddl_ctx: &DdlContext,
88 ctx: &mut DropDatabaseContext,
89 ) -> Result<(Box<dyn State>, Status)> {
90 self.invalidate_schema_cache(ddl_ctx, ctx).await?;
91 Ok((Box::new(DropDatabaseEnd), Status::done()))
92 }
93
94 fn as_any(&self) -> &dyn Any {
95 self
96 }
97}
98
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::ddl::drop_database::end::DropDatabaseEnd;
    use crate::ddl::drop_database::metadata::{DropDatabaseRemoveMetadata, DropMetadataBroadcast};
    use crate::ddl::drop_database::{DropDatabaseContext, State};
    use crate::key::schema_name::SchemaNameKey;
    use crate::test_util::{new_ddl_context, MockDatanodeManager};

    /// Catalog name of the database created and dropped by the test.
    const CATALOG: &str = "foo";
    /// Schema name of the database created and dropped by the test.
    const SCHEMA: &str = "bar";

    /// Builds a fresh drop-database context targeting `CATALOG`.`SCHEMA`.
    fn drop_context() -> DropDatabaseContext {
        DropDatabaseContext {
            catalog: CATALOG.to_string(),
            schema: SCHEMA.to_string(),
            drop_if_exists: true,
            tables: None,
        }
    }

    #[tokio::test]
    async fn test_next() {
        // Arrange: a DDL context that already contains the schema.
        let node_manager = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(node_manager);
        ddl_context
            .table_metadata_manager
            .schema_manager()
            .create(SchemaNameKey::new(CATALOG, SCHEMA), None, true)
            .await
            .unwrap();

        // Removing the metadata deletes the schema key and moves on to the
        // broadcast state without finishing the procedure.
        let mut ctx = drop_context();
        let mut remove_metadata = DropDatabaseRemoveMetadata;
        let (next_state, status) = remove_metadata
            .next(&ddl_context, &mut ctx)
            .await
            .unwrap();
        assert!(next_state.as_any().is::<DropMetadataBroadcast>());
        assert!(!status.is_done());
        let schema_exists = ddl_context
            .table_metadata_manager
            .schema_manager()
            .exists(SchemaNameKey::new(CATALOG, SCHEMA))
            .await
            .unwrap();
        assert!(!schema_exists);

        // Broadcasting the invalidation finishes the procedure.
        let mut broadcast = DropMetadataBroadcast;
        let (next_state, status) = broadcast.next(&ddl_context, &mut ctx).await.unwrap();
        assert!(next_state.as_any().is::<DropDatabaseEnd>());
        assert!(status.is_done());

        // Running the removal again, now that the schema key is gone, still succeeds.
        let mut ctx = drop_context();
        let mut remove_metadata = DropDatabaseRemoveMetadata;
        let (next_state, status) = remove_metadata
            .next(&ddl_context, &mut ctx)
            .await
            .unwrap();
        assert!(next_state.as_any().is::<DropMetadataBroadcast>());
        assert!(!status.is_done());
    }
}