// common_meta/ddl/drop_database/executor.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::any::Any;
16
17use common_procedure::Status;
18use common_telemetry::info;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use table::metadata::TableId;
22use table::table_name::TableName;
23
24use crate::ddl::drop_database::cursor::DropDatabaseCursor;
25use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
26use crate::ddl::drop_table::executor::DropTableExecutor;
27use crate::ddl::utils::get_region_wal_options;
28use crate::ddl::DdlContext;
29use crate::error::{self, Result};
30use crate::key::table_route::TableRouteValue;
31use crate::region_keeper::OperatingRegionGuard;
32use crate::rpc::router::{operating_leader_regions, RegionRoute};
33
34#[derive(Debug, Serialize, Deserialize)]
35pub(crate) struct DropDatabaseExecutor {
36    table_id: TableId,
37    physical_table_id: TableId,
38    table_name: TableName,
39    /// The physical table region routes.
40    pub(crate) physical_region_routes: Vec<RegionRoute>,
41    pub(crate) target: DropTableTarget,
42    #[serde(skip)]
43    dropping_regions: Vec<OperatingRegionGuard>,
44}
45
46impl DropDatabaseExecutor {
47    /// Returns a new [DropDatabaseExecutor].
48    pub fn new(
49        table_id: TableId,
50        physical_table_id: TableId,
51        table_name: TableName,
52        physical_region_routes: Vec<RegionRoute>,
53        target: DropTableTarget,
54    ) -> Self {
55        Self {
56            table_id,
57            physical_table_id,
58            table_name,
59            physical_region_routes,
60            target,
61            dropping_regions: vec![],
62        }
63    }
64}
65
66impl DropDatabaseExecutor {
67    /// Registers the operating regions.
68    pub(crate) fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
69        if !self.dropping_regions.is_empty() {
70            return Ok(());
71        }
72        let dropping_regions = operating_leader_regions(&self.physical_region_routes);
73        let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
74        for (region_id, datanode_id) in dropping_regions {
75            let guard = ddl_ctx
76                .memory_region_keeper
77                .register(datanode_id, region_id)
78                .context(error::RegionOperatingRaceSnafu {
79                    region_id,
80                    peer_id: datanode_id,
81                })?;
82            dropping_region_guards.push(guard);
83        }
84        self.dropping_regions = dropping_region_guards;
85        Ok(())
86    }
87}
88
89#[async_trait::async_trait]
90#[typetag::serde]
91impl State for DropDatabaseExecutor {
92    fn recover(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
93        self.register_dropping_regions(ddl_ctx)
94    }
95
96    async fn next(
97        &mut self,
98        ddl_ctx: &DdlContext,
99        _ctx: &mut DropDatabaseContext,
100    ) -> Result<(Box<dyn State>, Status)> {
101        self.register_dropping_regions(ddl_ctx)?;
102        let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true);
103        // Deletes metadata for table permanently.
104        let table_route_value = TableRouteValue::new(
105            self.table_id,
106            self.physical_table_id,
107            self.physical_region_routes.clone(),
108        );
109
110        // Deletes topic-region mapping if dropping physical table
111        let region_wal_options = get_region_wal_options(
112            &ddl_ctx.table_metadata_manager,
113            &table_route_value,
114            self.physical_table_id,
115        )
116        .await?;
117
118        executor
119            .on_destroy_metadata(ddl_ctx, &table_route_value, &region_wal_options)
120            .await?;
121        executor.invalidate_table_cache(ddl_ctx).await?;
122        executor
123            .on_drop_regions(ddl_ctx, &self.physical_region_routes, true)
124            .await?;
125        info!("Table: {}({}) is dropped", self.table_name, self.table_id);
126
127        Ok((
128            Box::new(DropDatabaseCursor::new(self.target)),
129            Status::executing(false),
130        ))
131    }
132
133    fn as_any(&self) -> &dyn Any {
134        self
135    }
136}
137
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::region::RegionResponse;
    use api::v1::region::RegionRequest;
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_error::ext::BoxedError;
    use common_query::request::QueryRequest;
    use common_recordbatch::SendableRecordBatchStream;
    use table::metadata::TableId;
    use table::table_name::TableName;

    use crate::ddl::drop_database::cursor::DropDatabaseCursor;
    use crate::ddl::drop_database::executor::DropDatabaseExecutor;
    use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
    use crate::ddl::test_util::{create_logical_table, create_physical_table};
    use crate::ddl::DdlContext;
    use crate::error::{self, Error, Result};
    use crate::key::datanode_table::DatanodeTableKey;
    use crate::peer::Peer;
    use crate::rpc::router::{region_distribution, RegionRoute};
    use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};

    /// Datanode handler that accepts every region request.
    #[derive(Clone)]
    pub struct NaiveDatanodeHandler;

    #[async_trait::async_trait]
    impl MockDatanodeHandler for NaiveDatanodeHandler {
        async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
            Ok(RegionResponse::new(0))
        }

        async fn handle_query(
            &self,
            _peer: &Peer,
            _request: QueryRequest,
        ) -> Result<SendableRecordBatchStream> {
            unreachable!()
        }
    }

    /// Datanode handler that answers every region request with a retryable error.
    #[derive(Clone)]
    pub struct RetryErrorDatanodeHandler;

    #[async_trait::async_trait]
    impl MockDatanodeHandler for RetryErrorDatanodeHandler {
        async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
            let source = BoxedError::new(
                error::UnexpectedSnafu {
                    err_msg: "retry later",
                }
                .build(),
            );
            Err(Error::RetryLater {
                source,
                clean_poisons: false,
            })
        }

        async fn handle_query(
            &self,
            _peer: &Peer,
            _request: QueryRequest,
        ) -> Result<SendableRecordBatchStream> {
            unreachable!()
        }
    }

    /// Context for the default catalog and schema, without `IF EXISTS`.
    fn default_drop_context() -> DropDatabaseContext {
        DropDatabaseContext {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            drop_if_exists: false,
            tables: None,
        }
    }

    /// Table name inside the default catalog and schema.
    fn default_table_name(table: &'static str) -> TableName {
        TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table)
    }

    /// Region routes of the physical table route resolved for `table_id`.
    async fn physical_routes_of(ddl_context: &DdlContext, table_id: TableId) -> Vec<RegionRoute> {
        let (_, physical_route) = ddl_context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(table_id)
            .await
            .unwrap();
        physical_route.region_routes
    }

    /// Runs `next` once and checks that it yields a non-persisting status and
    /// a [DropDatabaseCursor] carrying `expected_target`.
    async fn assert_advances_to_cursor(
        executor: &mut DropDatabaseExecutor,
        ddl_context: &DdlContext,
        expected_target: DropTableTarget,
    ) {
        let mut ctx = default_drop_context();
        let (next_state, status) = executor.next(ddl_context, &mut ctx).await.unwrap();
        assert!(!status.need_persist());
        let cursor = next_state
            .as_any()
            .downcast_ref::<DropDatabaseCursor>()
            .unwrap();
        assert_eq!(cursor.target, expected_target);
    }

    #[tokio::test]
    async fn test_next_with_physical_table() {
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let ddl_context = new_ddl_context(node_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        let routes = physical_routes_of(&ddl_context, physical_table_id).await;

        // The second pass executes the same drop again with a fresh executor.
        // Each executor is dropped at the end of its pass, releasing its
        // region guards before the next one registers the same regions.
        for _ in 0..2 {
            let mut executor = DropDatabaseExecutor::new(
                physical_table_id,
                physical_table_id,
                default_table_name("phy"),
                routes.clone(),
                DropTableTarget::Physical,
            );
            assert_advances_to_cursor(&mut executor, &ddl_context, DropTableTarget::Physical)
                .await;
        }
    }

    #[tokio::test]
    async fn test_next_logical_table() {
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let ddl_context = new_ddl_context(node_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        create_logical_table(ddl_context.clone(), physical_table_id, "metric").await;
        let logical_table_id = physical_table_id + 1;
        let routes = physical_routes_of(&ddl_context, logical_table_id).await;

        // Execute twice, each time with a fresh executor that is dropped at
        // the end of its pass.
        for _ in 0..2 {
            let mut executor = DropDatabaseExecutor::new(
                logical_table_id,
                physical_table_id,
                default_table_name("metric"),
                routes.clone(),
                DropTableTarget::Logical,
            );
            assert_advances_to_cursor(&mut executor, &ddl_context, DropTableTarget::Logical).await;
        }

        // Checks table info
        ddl_context
            .table_metadata_manager
            .table_info_manager()
            .get(physical_table_id)
            .await
            .unwrap()
            .unwrap();

        // Checks table route
        let stored_route = ddl_context
            .table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get(physical_table_id)
            .await
            .unwrap()
            .unwrap();
        let stored_region_routes = stored_route.region_routes().unwrap();
        for datanode_id in region_distribution(stored_region_routes).into_keys() {
            let key = DatanodeTableKey::new(datanode_id, physical_table_id);
            ddl_context
                .table_metadata_manager
                .datanode_table_manager()
                .get(&key)
                .await
                .unwrap()
                .unwrap();
        }
    }

    #[tokio::test]
    async fn test_next_retryable_err() {
        let node_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
        let ddl_context = new_ddl_context(node_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        let routes = physical_routes_of(&ddl_context, physical_table_id).await;

        let mut executor = DropDatabaseExecutor::new(
            physical_table_id,
            physical_table_id,
            default_table_name("phy"),
            routes,
            DropTableTarget::Physical,
        );
        let mut ctx = default_drop_context();
        let err = executor.next(&ddl_context, &mut ctx).await.unwrap_err();
        assert!(err.is_retry_later());
    }

    #[tokio::test]
    async fn test_on_recovery() {
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let ddl_context = new_ddl_context(node_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        let routes = physical_routes_of(&ddl_context, physical_table_id).await;

        let mut executor = DropDatabaseExecutor::new(
            physical_table_id,
            physical_table_id,
            default_table_name("phy"),
            routes,
            DropTableTarget::Physical,
        );
        // Recovery registers the region guards up front.
        executor.recover(&ddl_context).unwrap();
        assert_eq!(executor.dropping_regions.len(), 1);
        assert_advances_to_cursor(&mut executor, &ddl_context, DropTableTarget::Physical).await;
    }
}