common_meta/ddl/drop_database/
cursor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_catalog::format_full_table_name;
18use common_procedure::Status;
19use futures::TryStreamExt;
20use serde::{Deserialize, Serialize};
21use snafu::OptionExt;
22use table::metadata::{TableId, TableType};
23use table::table_name::TableName;
24
25use crate::cache_invalidator::Context;
26use crate::ddl::drop_database::executor::DropDatabaseExecutor;
27use crate::ddl::drop_database::metadata::DropDatabaseRemoveMetadata;
28use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
29use crate::ddl::DdlContext;
30use crate::error::{Result, TableInfoNotFoundSnafu};
31use crate::instruction::CacheIdent;
32use crate::key::table_route::TableRouteValue;
33
34#[derive(Debug, Serialize, Deserialize)]
35pub(crate) struct DropDatabaseCursor {
36    pub(crate) target: DropTableTarget,
37}
38
39impl DropDatabaseCursor {
40    /// Returns a new [DropDatabaseCursor].
41    pub fn new(target: DropTableTarget) -> Self {
42        Self { target }
43    }
44
45    fn handle_reach_end(
46        &mut self,
47        ctx: &mut DropDatabaseContext,
48    ) -> Result<(Box<dyn State>, Status)> {
49        // Consumes the tables stream.
50        ctx.tables.take();
51        match self.target {
52            DropTableTarget::Logical => Ok((
53                Box::new(DropDatabaseCursor::new(DropTableTarget::Physical)),
54                Status::executing(true),
55            )),
56            DropTableTarget::Physical => Ok((
57                Box::new(DropDatabaseRemoveMetadata),
58                Status::executing(true),
59            )),
60        }
61    }
62
63    async fn handle_table(
64        &mut self,
65        ddl_ctx: &DdlContext,
66        ctx: &mut DropDatabaseContext,
67        table_name: String,
68        table_id: TableId,
69        table_route_value: TableRouteValue,
70    ) -> Result<(Box<dyn State>, Status)> {
71        match (self.target, table_route_value) {
72            (DropTableTarget::Logical, TableRouteValue::Logical(route)) => {
73                let physical_table_id = route.physical_table_id();
74
75                let (_, table_route) = ddl_ctx
76                    .table_metadata_manager
77                    .table_route_manager()
78                    .get_physical_table_route(physical_table_id)
79                    .await?;
80                Ok((
81                    Box::new(DropDatabaseExecutor::new(
82                        table_id,
83                        table_id,
84                        TableName::new(&ctx.catalog, &ctx.schema, &table_name),
85                        table_route.region_routes,
86                        self.target,
87                    )),
88                    Status::executing(true),
89                ))
90            }
91            (DropTableTarget::Physical, TableRouteValue::Physical(table_route)) => Ok((
92                Box::new(DropDatabaseExecutor::new(
93                    table_id,
94                    table_id,
95                    TableName::new(&ctx.catalog, &ctx.schema, &table_name),
96                    table_route.region_routes,
97                    self.target,
98                )),
99                Status::executing(true),
100            )),
101            _ => Ok((
102                Box::new(DropDatabaseCursor::new(self.target)),
103                Status::executing(false),
104            )),
105        }
106    }
107
108    async fn handle_view(
109        &self,
110        ddl_ctx: &DdlContext,
111        ctx: &mut DropDatabaseContext,
112        table_name: String,
113        table_id: TableId,
114    ) -> Result<(Box<dyn State>, Status)> {
115        let view_name = TableName::new(&ctx.catalog, &ctx.schema, &table_name);
116        ddl_ctx
117            .table_metadata_manager
118            .destroy_view_info(table_id, &view_name)
119            .await?;
120
121        let cache_invalidator = &ddl_ctx.cache_invalidator;
122        let ctx = Context {
123            subject: Some("Invalidate table cache by dropping table".to_string()),
124        };
125
126        cache_invalidator
127            .invalidate(
128                &ctx,
129                &[
130                    CacheIdent::TableName(view_name),
131                    CacheIdent::TableId(table_id),
132                ],
133            )
134            .await?;
135
136        Ok((
137            Box::new(DropDatabaseCursor::new(self.target)),
138            Status::executing(false),
139        ))
140    }
141}
142
143#[async_trait::async_trait]
144#[typetag::serde]
145impl State for DropDatabaseCursor {
146    async fn next(
147        &mut self,
148        ddl_ctx: &DdlContext,
149        ctx: &mut DropDatabaseContext,
150    ) -> Result<(Box<dyn State>, Status)> {
151        if ctx.tables.as_deref().is_none() {
152            let tables = ddl_ctx
153                .table_metadata_manager
154                .table_name_manager()
155                .tables(&ctx.catalog, &ctx.schema);
156            ctx.tables = Some(tables);
157        }
158        // Safety: must exist
159        match ctx.tables.as_mut().unwrap().try_next().await? {
160            Some((table_name, table_name_value)) => {
161                let table_id = table_name_value.table_id();
162
163                let table_info_value = ddl_ctx
164                    .table_metadata_manager
165                    .table_info_manager()
166                    .get(table_id)
167                    .await?
168                    .with_context(|| TableInfoNotFoundSnafu {
169                        table: format_full_table_name(&ctx.catalog, &ctx.schema, &table_name),
170                    })?;
171
172                if table_info_value.table_info.table_type == TableType::View {
173                    return self.handle_view(ddl_ctx, ctx, table_name, table_id).await;
174                }
175
176                match ddl_ctx
177                    .table_metadata_manager
178                    .table_route_manager()
179                    .table_route_storage()
180                    .get(table_id)
181                    .await?
182                {
183                    Some(table_route_value) => {
184                        self.handle_table(ddl_ctx, ctx, table_name, table_id, table_route_value)
185                            .await
186                    }
187                    None => Ok((
188                        Box::new(DropDatabaseCursor::new(self.target)),
189                        Status::executing(false),
190                    )),
191                }
192            }
193            None => self.handle_reach_end(ctx),
194        }
195    }
196
197    fn as_any(&self) -> &dyn Any {
198        self
199    }
200}
201
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};

    use crate::ddl::drop_database::cursor::DropDatabaseCursor;
    use crate::ddl::drop_database::executor::DropDatabaseExecutor;
    use crate::ddl::drop_database::metadata::DropDatabaseRemoveMetadata;
    use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
    use crate::ddl::test_util::{create_logical_table, create_physical_table};
    use crate::test_util::{new_ddl_context, MockDatanodeManager};

    /// Builds a context for the default catalog and schema with no table stream yet.
    fn default_drop_database_context() -> DropDatabaseContext {
        DropDatabaseContext {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            drop_if_exists: false,
            tables: None,
        }
    }

    #[tokio::test]
    async fn test_next_without_logical_tables() {
        let datanode_manager = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(datanode_manager);
        create_physical_table(&ddl_context, "phy").await;
        let mut ctx = default_drop_database_context();
        // The procedure always starts with the logical pass.
        let mut logical_cursor = DropDatabaseCursor::new(DropTableTarget::Logical);

        // 1st tick: the only table is physical, so the logical pass skips it.
        let (mut after_skip, status) = logical_cursor
            .next(&ddl_context, &mut ctx)
            .await
            .unwrap();
        assert!(!status.need_persist());
        let skipped = after_skip
            .as_any()
            .downcast_ref::<DropDatabaseCursor>()
            .unwrap();
        assert_eq!(skipped.target, DropTableTarget::Logical);

        // 2nd tick: the stream is exhausted, so the cursor switches to the physical pass.
        let (mut physical_cursor, status) =
            after_skip.next(&ddl_context, &mut ctx).await.unwrap();
        assert!(status.need_persist());
        assert!(ctx.tables.is_none());
        let switched = physical_cursor
            .as_any()
            .downcast_ref::<DropDatabaseCursor>()
            .unwrap();
        assert_eq!(switched.target, DropTableTarget::Physical);

        // 3rd tick: the physical pass hands the physical table to the executor.
        let (final_state, status) = physical_cursor
            .next(&ddl_context, &mut ctx)
            .await
            .unwrap();
        assert!(status.need_persist());
        let executor = final_state
            .as_any()
            .downcast_ref::<DropDatabaseExecutor>()
            .unwrap();
        assert_eq!(executor.target, DropTableTarget::Physical);
    }

    #[tokio::test]
    async fn test_next_with_logical_tables() {
        let datanode_manager = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(datanode_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        create_logical_table(ddl_context.clone(), physical_table_id, "metric_0").await;
        let mut ctx = default_drop_database_context();
        // The procedure always starts with the logical pass.
        let mut logical_cursor = DropDatabaseCursor::new(DropTableTarget::Logical);

        // 1st tick: the logical table is handed to the executor.
        let (next_state, status) = logical_cursor
            .next(&ddl_context, &mut ctx)
            .await
            .unwrap();
        assert!(status.need_persist());
        let executor = next_state
            .as_any()
            .downcast_ref::<DropDatabaseExecutor>()
            .unwrap();

        // The executor must carry the region routes of the backing physical table.
        let (_, physical_route) = ddl_context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(physical_table_id)
            .await
            .unwrap();
        assert_eq!(physical_route.region_routes, executor.physical_region_routes);
        assert_eq!(executor.target, DropTableTarget::Logical);
    }

    #[tokio::test]
    async fn test_reach_the_end() {
        let datanode_manager = Arc::new(MockDatanodeManager::new(()));
        let ddl_context = new_ddl_context(datanode_manager);
        let mut ctx = default_drop_database_context();
        let mut physical_cursor = DropDatabaseCursor::new(DropTableTarget::Physical);

        // 1st tick: no tables exist, so the physical pass ends and metadata removal is next.
        let (next_state, status) = physical_cursor
            .next(&ddl_context, &mut ctx)
            .await
            .unwrap();
        assert!(status.need_persist());
        next_state
            .as_any()
            .downcast_ref::<DropDatabaseRemoveMetadata>()
            .unwrap();
        assert!(ctx.tables.is_none());
    }
}