common_meta/ddl/drop_database/
executor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17
18use common_procedure::Status;
19use common_telemetry::info;
20use serde::{Deserialize, Serialize};
21use snafu::OptionExt;
22use table::metadata::TableId;
23use table::table_name::TableName;
24
25use crate::ddl::drop_database::cursor::DropDatabaseCursor;
26use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
27use crate::ddl::drop_table::executor::DropTableExecutor;
28use crate::ddl::utils::extract_region_wal_options;
29use crate::ddl::DdlContext;
30use crate::error::{self, Result};
31use crate::key::table_route::TableRouteValue;
32use crate::region_keeper::OperatingRegionGuard;
33use crate::rpc::router::{operating_leader_regions, RegionRoute};
34
35#[derive(Debug, Serialize, Deserialize)]
36pub(crate) struct DropDatabaseExecutor {
37    table_id: TableId,
38    physical_table_id: TableId,
39    table_name: TableName,
40    /// The physical table region routes.
41    pub(crate) physical_region_routes: Vec<RegionRoute>,
42    pub(crate) target: DropTableTarget,
43    #[serde(skip)]
44    dropping_regions: Vec<OperatingRegionGuard>,
45}
46
47impl DropDatabaseExecutor {
48    /// Returns a new [DropDatabaseExecutor].
49    pub fn new(
50        table_id: TableId,
51        physical_table_id: TableId,
52        table_name: TableName,
53        physical_region_routes: Vec<RegionRoute>,
54        target: DropTableTarget,
55    ) -> Self {
56        Self {
57            table_id,
58            physical_table_id,
59            table_name,
60            physical_region_routes,
61            target,
62            dropping_regions: vec![],
63        }
64    }
65}
66
67impl DropDatabaseExecutor {
68    /// Registers the operating regions.
69    pub(crate) fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
70        if !self.dropping_regions.is_empty() {
71            return Ok(());
72        }
73        let dropping_regions = operating_leader_regions(&self.physical_region_routes);
74        let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
75        for (region_id, datanode_id) in dropping_regions {
76            let guard = ddl_ctx
77                .memory_region_keeper
78                .register(datanode_id, region_id)
79                .context(error::RegionOperatingRaceSnafu {
80                    region_id,
81                    peer_id: datanode_id,
82                })?;
83            dropping_region_guards.push(guard);
84        }
85        self.dropping_regions = dropping_region_guards;
86        Ok(())
87    }
88}
89
90#[async_trait::async_trait]
91#[typetag::serde]
92impl State for DropDatabaseExecutor {
93    fn recover(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
94        self.register_dropping_regions(ddl_ctx)
95    }
96
97    async fn next(
98        &mut self,
99        ddl_ctx: &DdlContext,
100        _ctx: &mut DropDatabaseContext,
101    ) -> Result<(Box<dyn State>, Status)> {
102        self.register_dropping_regions(ddl_ctx)?;
103        let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true);
104        // Deletes metadata for table permanently.
105        let table_route_value = TableRouteValue::new(
106            self.table_id,
107            self.physical_table_id,
108            self.physical_region_routes.clone(),
109        );
110
111        // Deletes topic-region mapping if dropping physical table
112        let region_wal_options =
113            if let TableRouteValue::Physical(table_route_value) = &table_route_value {
114                let datanode_table_values = ddl_ctx
115                    .table_metadata_manager
116                    .datanode_table_manager()
117                    .regions(self.physical_table_id, table_route_value)
118                    .await?;
119                extract_region_wal_options(&datanode_table_values)?
120            } else {
121                HashMap::new()
122            };
123
124        executor
125            .on_destroy_metadata(ddl_ctx, &table_route_value, &region_wal_options)
126            .await?;
127        executor.invalidate_table_cache(ddl_ctx).await?;
128        executor
129            .on_drop_regions(ddl_ctx, &self.physical_region_routes, true)
130            .await?;
131        info!("Table: {}({}) is dropped", self.table_name, self.table_id);
132
133        Ok((
134            Box::new(DropDatabaseCursor::new(self.target)),
135            Status::executing(false),
136        ))
137    }
138
139    fn as_any(&self) -> &dyn Any {
140        self
141    }
142}
143
144#[cfg(test)]
145mod tests {
146    use std::sync::Arc;
147
148    use api::region::RegionResponse;
149    use api::v1::region::RegionRequest;
150    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
151    use common_error::ext::BoxedError;
152    use common_query::request::QueryRequest;
153    use common_recordbatch::SendableRecordBatchStream;
154    use table::table_name::TableName;
155
156    use crate::ddl::drop_database::cursor::DropDatabaseCursor;
157    use crate::ddl::drop_database::executor::DropDatabaseExecutor;
158    use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
159    use crate::ddl::test_util::{create_logical_table, create_physical_table};
160    use crate::error::{self, Error, Result};
161    use crate::key::datanode_table::DatanodeTableKey;
162    use crate::peer::Peer;
163    use crate::rpc::router::region_distribution;
164    use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};
165
166    #[derive(Clone)]
167    pub struct NaiveDatanodeHandler;
168
169    #[async_trait::async_trait]
170    impl MockDatanodeHandler for NaiveDatanodeHandler {
171        async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
172            Ok(RegionResponse::new(0))
173        }
174
175        async fn handle_query(
176            &self,
177            _peer: &Peer,
178            _request: QueryRequest,
179        ) -> Result<SendableRecordBatchStream> {
180            unreachable!()
181        }
182    }
183
184    #[tokio::test]
185    async fn test_next_with_physical_table() {
186        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
187        let ddl_context = new_ddl_context(node_manager);
188        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
189        let (_, table_route) = ddl_context
190            .table_metadata_manager
191            .table_route_manager()
192            .get_physical_table_route(physical_table_id)
193            .await
194            .unwrap();
195        {
196            let mut state = DropDatabaseExecutor::new(
197                physical_table_id,
198                physical_table_id,
199                TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
200                table_route.region_routes.clone(),
201                DropTableTarget::Physical,
202            );
203            let mut ctx = DropDatabaseContext {
204                catalog: DEFAULT_CATALOG_NAME.to_string(),
205                schema: DEFAULT_SCHEMA_NAME.to_string(),
206                drop_if_exists: false,
207                tables: None,
208            };
209            let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
210            assert!(!status.need_persist());
211            let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
212            assert_eq!(cursor.target, DropTableTarget::Physical);
213        }
214        // Execute again
215        let mut ctx = DropDatabaseContext {
216            catalog: DEFAULT_CATALOG_NAME.to_string(),
217            schema: DEFAULT_SCHEMA_NAME.to_string(),
218            drop_if_exists: false,
219            tables: None,
220        };
221        let mut state = DropDatabaseExecutor::new(
222            physical_table_id,
223            physical_table_id,
224            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
225            table_route.region_routes.clone(),
226            DropTableTarget::Physical,
227        );
228        let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
229        assert!(!status.need_persist());
230        let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
231        assert_eq!(cursor.target, DropTableTarget::Physical);
232    }
233
234    #[tokio::test]
235    async fn test_next_logical_table() {
236        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
237        let ddl_context = new_ddl_context(node_manager);
238        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
239        create_logical_table(ddl_context.clone(), physical_table_id, "metric").await;
240        let logical_table_id = physical_table_id + 1;
241        let (_, table_route) = ddl_context
242            .table_metadata_manager
243            .table_route_manager()
244            .get_physical_table_route(logical_table_id)
245            .await
246            .unwrap();
247        {
248            let mut state = DropDatabaseExecutor::new(
249                logical_table_id,
250                physical_table_id,
251                TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
252                table_route.region_routes.clone(),
253                DropTableTarget::Logical,
254            );
255            let mut ctx = DropDatabaseContext {
256                catalog: DEFAULT_CATALOG_NAME.to_string(),
257                schema: DEFAULT_SCHEMA_NAME.to_string(),
258                drop_if_exists: false,
259                tables: None,
260            };
261            let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
262            assert!(!status.need_persist());
263            let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
264            assert_eq!(cursor.target, DropTableTarget::Logical);
265        }
266        // Execute again
267        let mut ctx = DropDatabaseContext {
268            catalog: DEFAULT_CATALOG_NAME.to_string(),
269            schema: DEFAULT_SCHEMA_NAME.to_string(),
270            drop_if_exists: false,
271            tables: None,
272        };
273        let mut state = DropDatabaseExecutor::new(
274            logical_table_id,
275            physical_table_id,
276            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
277            table_route.region_routes,
278            DropTableTarget::Logical,
279        );
280        let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
281        assert!(!status.need_persist());
282        let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
283        assert_eq!(cursor.target, DropTableTarget::Logical);
284        // Checks table info
285        ddl_context
286            .table_metadata_manager
287            .table_info_manager()
288            .get(physical_table_id)
289            .await
290            .unwrap()
291            .unwrap();
292        // Checks table route
293        let table_route = ddl_context
294            .table_metadata_manager
295            .table_route_manager()
296            .table_route_storage()
297            .get(physical_table_id)
298            .await
299            .unwrap()
300            .unwrap();
301        let region_routes = table_route.region_routes().unwrap();
302        for datanode_id in region_distribution(region_routes).into_keys() {
303            ddl_context
304                .table_metadata_manager
305                .datanode_table_manager()
306                .get(&DatanodeTableKey::new(datanode_id, physical_table_id))
307                .await
308                .unwrap()
309                .unwrap();
310        }
311    }
312
313    #[derive(Clone)]
314    pub struct RetryErrorDatanodeHandler;
315
316    #[async_trait::async_trait]
317    impl MockDatanodeHandler for RetryErrorDatanodeHandler {
318        async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
319            Err(Error::RetryLater {
320                source: BoxedError::new(
321                    error::UnexpectedSnafu {
322                        err_msg: "retry later",
323                    }
324                    .build(),
325                ),
326            })
327        }
328
329        async fn handle_query(
330            &self,
331            _peer: &Peer,
332            _request: QueryRequest,
333        ) -> Result<SendableRecordBatchStream> {
334            unreachable!()
335        }
336    }
337
338    #[tokio::test]
339    async fn test_next_retryable_err() {
340        let node_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
341        let ddl_context = new_ddl_context(node_manager);
342        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
343        let (_, table_route) = ddl_context
344            .table_metadata_manager
345            .table_route_manager()
346            .get_physical_table_route(physical_table_id)
347            .await
348            .unwrap();
349        let mut state = DropDatabaseExecutor::new(
350            physical_table_id,
351            physical_table_id,
352            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
353            table_route.region_routes,
354            DropTableTarget::Physical,
355        );
356        let mut ctx = DropDatabaseContext {
357            catalog: DEFAULT_CATALOG_NAME.to_string(),
358            schema: DEFAULT_SCHEMA_NAME.to_string(),
359            drop_if_exists: false,
360            tables: None,
361        };
362        let err = state.next(&ddl_context, &mut ctx).await.unwrap_err();
363        assert!(err.is_retry_later());
364    }
365
366    #[tokio::test]
367    async fn test_on_recovery() {
368        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
369        let ddl_context = new_ddl_context(node_manager);
370        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
371        let (_, table_route) = ddl_context
372            .table_metadata_manager
373            .table_route_manager()
374            .get_physical_table_route(physical_table_id)
375            .await
376            .unwrap();
377        {
378            let mut state = DropDatabaseExecutor::new(
379                physical_table_id,
380                physical_table_id,
381                TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
382                table_route.region_routes.clone(),
383                DropTableTarget::Physical,
384            );
385            let mut ctx = DropDatabaseContext {
386                catalog: DEFAULT_CATALOG_NAME.to_string(),
387                schema: DEFAULT_SCHEMA_NAME.to_string(),
388                drop_if_exists: false,
389                tables: None,
390            };
391            state.recover(&ddl_context).unwrap();
392            assert_eq!(state.dropping_regions.len(), 1);
393            let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
394            assert!(!status.need_persist());
395            let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
396            assert_eq!(cursor.target, DropTableTarget::Physical);
397        }
398    }
399}