common_meta/ddl/drop_database/
executor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17
18use common_procedure::Status;
19use common_telemetry::info;
20use serde::{Deserialize, Serialize};
21use snafu::OptionExt;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::ddl::drop_database::cursor::DropDatabaseCursor;
26use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
27use crate::ddl::drop_table::executor::DropTableExecutor;
28use crate::ddl::utils::extract_region_wal_options;
29use crate::ddl::DdlContext;
30use crate::error::{self, Result};
31use crate::key::table_route::TableRouteValue;
32use crate::region_keeper::OperatingRegionGuard;
33use crate::rpc::router::{operating_leader_regions, RegionRoute};
34
35#[derive(Debug, Serialize, Deserialize)]
36pub(crate) struct DropDatabaseExecutor {
37    table_id: TableId,
38    physical_table_id: TableId,
39    table_name: TableName,
40    /// The physical table region routes.
41    pub(crate) physical_region_routes: Vec<RegionRoute>,
42    pub(crate) target: DropTableTarget,
43    #[serde(skip)]
44    dropping_regions: Vec<OperatingRegionGuard>,
45}
46
47impl DropDatabaseExecutor {
48    /// Returns a new [DropDatabaseExecutor].
49    pub fn new(
50        table_id: TableId,
51        physical_table_id: TableId,
52        table_name: TableName,
53        physical_region_routes: Vec<RegionRoute>,
54        target: DropTableTarget,
55    ) -> Self {
56        Self {
57            table_id,
58            physical_table_id,
59            table_name,
60            physical_region_routes,
61            target,
62            dropping_regions: vec![],
63        }
64    }
65}
66
67impl DropDatabaseExecutor {
68    /// Registers the operating regions.
69    pub(crate) fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
70        if !self.dropping_regions.is_empty() {
71            return Ok(());
72        }
73        let dropping_regions = operating_leader_regions(&self.physical_region_routes);
74        let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
75        for (region_id, datanode_id) in dropping_regions {
76            let guard = ddl_ctx
77                .memory_region_keeper
78                .register(datanode_id, region_id)
79                .context(error::RegionOperatingRaceSnafu {
80                    region_id,
81                    peer_id: datanode_id,
82                })?;
83            dropping_region_guards.push(guard);
84        }
85        self.dropping_regions = dropping_region_guards;
86        Ok(())
87    }
88}
89
90#[async_trait::async_trait]
91#[typetag::serde]
92impl State for DropDatabaseExecutor {
93    fn recover(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
94        self.register_dropping_regions(ddl_ctx)
95    }
96
97    async fn next(
98        &mut self,
99        ddl_ctx: &DdlContext,
100        _ctx: &mut DropDatabaseContext,
101    ) -> Result<(Box<dyn State>, Status)> {
102        self.register_dropping_regions(ddl_ctx)?;
103        let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true);
104        // Deletes metadata for table permanently.
105        let table_route_value = TableRouteValue::new(
106            self.table_id,
107            self.physical_table_id,
108            self.physical_region_routes.clone(),
109        );
110
111        // Deletes topic-region mapping if dropping physical table
112        let region_wal_options =
113            if let TableRouteValue::Physical(table_route_value) = &table_route_value {
114                let datanode_table_values = ddl_ctx
115                    .table_metadata_manager
116                    .datanode_table_manager()
117                    .regions(self.physical_table_id, table_route_value)
118                    .await?;
119                extract_region_wal_options(&datanode_table_values)?
120            } else {
121                HashMap::new()
122            };
123
124        executor
125            .on_destroy_metadata(ddl_ctx, &table_route_value, &region_wal_options)
126            .await?;
127        executor.invalidate_table_cache(ddl_ctx).await?;
128        executor
129            .on_drop_regions(ddl_ctx, &self.physical_region_routes, true)
130            .await?;
131        info!("Table: {}({}) is dropped", self.table_name, self.table_id);
132
133        Ok((
134            Box::new(DropDatabaseCursor::new(self.target)),
135            Status::executing(false),
136        ))
137    }
138
139    fn as_any(&self) -> &dyn Any {
140        self
141    }
142}
143
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::region::RegionResponse;
    use api::v1::region::RegionRequest;
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_error::ext::BoxedError;
    use common_query::request::QueryRequest;
    use common_recordbatch::SendableRecordBatchStream;
    use table::table_name::TableName;

    use crate::ddl::drop_database::cursor::DropDatabaseCursor;
    use crate::ddl::drop_database::executor::DropDatabaseExecutor;
    use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
    use crate::ddl::test_util::{create_logical_table, create_physical_table};
    use crate::error::{self, Error, Result};
    use crate::key::datanode_table::DatanodeTableKey;
    use crate::peer::Peer;
    use crate::rpc::router::region_distribution;
    use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};

    /// Mock datanode handler that answers every region request with
    /// `RegionResponse::new(0)`. Queries are not expected in these tests.
    #[derive(Clone)]
    pub struct NaiveDatanodeHandler;

    #[async_trait::async_trait]
    impl MockDatanodeHandler for NaiveDatanodeHandler {
        async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
            Ok(RegionResponse::new(0))
        }

        async fn handle_query(
            &self,
            _peer: &Peer,
            _request: QueryRequest,
        ) -> Result<SendableRecordBatchStream> {
            unreachable!()
        }
    }

    /// Mock datanode handler that fails every region request with a
    /// `RetryLater` error. Queries are not expected in these tests.
    #[derive(Clone)]
    pub struct RetryErrorDatanodeHandler;

    #[async_trait::async_trait]
    impl MockDatanodeHandler for RetryErrorDatanodeHandler {
        async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
            let cause = error::UnexpectedSnafu {
                err_msg: "retry later",
            }
            .build();
            Err(Error::RetryLater {
                source: BoxedError::new(cause),
                clean_poisons: false,
            })
        }

        async fn handle_query(
            &self,
            _peer: &Peer,
            _request: QueryRequest,
        ) -> Result<SendableRecordBatchStream> {
            unreachable!()
        }
    }

    /// Builds the drop-database context used by every test: default catalog and
    /// schema, `drop_if_exists` off, no table list.
    fn new_drop_ctx() -> DropDatabaseContext {
        DropDatabaseContext {
            catalog: DEFAULT_CATALOG_NAME.to_string(),
            schema: DEFAULT_SCHEMA_NAME.to_string(),
            drop_if_exists: false,
            tables: None,
        }
    }

    /// Asserts that `state` is a [DropDatabaseCursor] whose target equals `expected`.
    fn assert_cursor_target(state: &dyn State, expected: DropTableTarget) {
        let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
        assert_eq!(cursor.target, expected);
    }

    #[tokio::test]
    async fn test_next_with_physical_table() {
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let ddl_context = new_ddl_context(node_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        let (_, physical_route) = ddl_context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(physical_table_id)
            .await
            .unwrap();

        // Two identical rounds: the initial execution and the "execute again" run.
        // Each round owns its executor, so the executor (and the region guards it
        // holds) is dropped before the next round registers the same regions.
        for _ in 0..2 {
            let mut executor = DropDatabaseExecutor::new(
                physical_table_id,
                physical_table_id,
                TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
                physical_route.region_routes.clone(),
                DropTableTarget::Physical,
            );
            let mut ctx = new_drop_ctx();
            let (next_state, status) = executor.next(&ddl_context, &mut ctx).await.unwrap();
            assert!(!status.need_persist());
            assert_cursor_target(&*next_state, DropTableTarget::Physical);
        }
    }

    #[tokio::test]
    async fn test_next_logical_table() {
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let ddl_context = new_ddl_context(node_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        create_logical_table(ddl_context.clone(), physical_table_id, "metric").await;
        let logical_table_id = physical_table_id + 1;
        let (_, physical_route) = ddl_context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(logical_table_id)
            .await
            .unwrap();

        // First execution; the scope ends before the second one so the first
        // executor and its region guards are gone by then.
        {
            let mut executor = DropDatabaseExecutor::new(
                logical_table_id,
                physical_table_id,
                TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
                physical_route.region_routes.clone(),
                DropTableTarget::Logical,
            );
            let mut ctx = new_drop_ctx();
            let (next_state, status) = executor.next(&ddl_context, &mut ctx).await.unwrap();
            assert!(!status.need_persist());
            assert_cursor_target(&*next_state, DropTableTarget::Logical);
        }

        // Execute again
        let mut ctx = new_drop_ctx();
        let mut executor = DropDatabaseExecutor::new(
            logical_table_id,
            physical_table_id,
            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
            physical_route.region_routes,
            DropTableTarget::Logical,
        );
        let (next_state, status) = executor.next(&ddl_context, &mut ctx).await.unwrap();
        assert!(!status.need_persist());
        assert_cursor_target(&*next_state, DropTableTarget::Logical);

        // The physical table's metadata must still be present after dropping the
        // logical table: table info first ...
        let metadata_manager = &ddl_context.table_metadata_manager;
        metadata_manager
            .table_info_manager()
            .get(physical_table_id)
            .await
            .unwrap()
            .unwrap();
        // ... then the table route ...
        let stored_route = metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get(physical_table_id)
            .await
            .unwrap()
            .unwrap();
        // ... and finally one datanode table entry per datanode in the route.
        let region_routes = stored_route.region_routes().unwrap();
        for datanode_id in region_distribution(region_routes).into_keys() {
            let key = DatanodeTableKey::new(datanode_id, physical_table_id);
            metadata_manager
                .datanode_table_manager()
                .get(&key)
                .await
                .unwrap()
                .unwrap();
        }
    }

    #[tokio::test]
    async fn test_next_retryable_err() {
        let node_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
        let ddl_context = new_ddl_context(node_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        let (_, physical_route) = ddl_context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(physical_table_id)
            .await
            .unwrap();
        let mut executor = DropDatabaseExecutor::new(
            physical_table_id,
            physical_table_id,
            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
            physical_route.region_routes,
            DropTableTarget::Physical,
        );
        let mut ctx = new_drop_ctx();

        // The datanode handler always answers "retry later"; `next` must surface it.
        let err = executor.next(&ddl_context, &mut ctx).await.unwrap_err();
        assert!(err.is_retry_later());
    }

    #[tokio::test]
    async fn test_on_recovery() {
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let ddl_context = new_ddl_context(node_manager);
        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
        let (_, physical_route) = ddl_context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(physical_table_id)
            .await
            .unwrap();
        let mut executor = DropDatabaseExecutor::new(
            physical_table_id,
            physical_table_id,
            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
            physical_route.region_routes.clone(),
            DropTableTarget::Physical,
        );
        let mut ctx = new_drop_ctx();

        // Recovery must register the region guards up front ...
        executor.recover(&ddl_context).unwrap();
        assert_eq!(executor.dropping_regions.len(), 1);

        // ... and a following `next` must still succeed with the guards already held.
        let (next_state, status) = executor.next(&ddl_context, &mut ctx).await.unwrap();
        assert!(!status.need_persist());
        assert_cursor_target(&*next_state, DropTableTarget::Physical);
    }
}