common_meta/ddl/drop_database/
executor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_procedure::Status;
18use common_telemetry::info;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use table::metadata::TableId;
22use table::table_name::TableName;
23
24use crate::ddl::DdlContext;
25use crate::ddl::drop_database::cursor::DropDatabaseCursor;
26use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
27use crate::ddl::drop_table::executor::DropTableExecutor;
28use crate::ddl::utils::get_region_wal_options;
29use crate::error::{self, Result};
30use crate::key::table_route::TableRouteValue;
31use crate::region_keeper::OperatingRegionGuard;
32use crate::rpc::router::{RegionRoute, operating_leader_regions};
33
34#[derive(Debug, Serialize, Deserialize)]
35pub(crate) struct DropDatabaseExecutor {
36    table_id: TableId,
37    physical_table_id: TableId,
38    table_name: TableName,
39    /// The physical table region routes.
40    pub(crate) physical_region_routes: Vec<RegionRoute>,
41    pub(crate) target: DropTableTarget,
42    #[serde(skip)]
43    dropping_regions: Vec<OperatingRegionGuard>,
44}
45
46impl DropDatabaseExecutor {
47    /// Returns a new [DropDatabaseExecutor].
48    pub fn new(
49        table_id: TableId,
50        physical_table_id: TableId,
51        table_name: TableName,
52        physical_region_routes: Vec<RegionRoute>,
53        target: DropTableTarget,
54    ) -> Self {
55        Self {
56            table_id,
57            physical_table_id,
58            table_name,
59            physical_region_routes,
60            target,
61            dropping_regions: vec![],
62        }
63    }
64}
65
66impl DropDatabaseExecutor {
67    /// Registers the operating regions.
68    pub(crate) fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
69        if !self.dropping_regions.is_empty() {
70            return Ok(());
71        }
72        let dropping_regions = operating_leader_regions(&self.physical_region_routes);
73        let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
74        for (region_id, datanode_id) in dropping_regions {
75            let guard = ddl_ctx
76                .memory_region_keeper
77                .register(datanode_id, region_id)
78                .context(error::RegionOperatingRaceSnafu {
79                    region_id,
80                    peer_id: datanode_id,
81                })?;
82            dropping_region_guards.push(guard);
83        }
84        self.dropping_regions = dropping_region_guards;
85        Ok(())
86    }
87}
88
89#[async_trait::async_trait]
90#[typetag::serde]
91impl State for DropDatabaseExecutor {
92    fn recover(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
93        self.register_dropping_regions(ddl_ctx)
94    }
95
96    async fn next(
97        &mut self,
98        ddl_ctx: &DdlContext,
99        _ctx: &mut DropDatabaseContext,
100    ) -> Result<(Box<dyn State>, Status)> {
101        self.register_dropping_regions(ddl_ctx)?;
102        let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true);
103        // Deletes metadata for table permanently.
104        let table_route_value = TableRouteValue::new(
105            self.table_id,
106            self.physical_table_id,
107            self.physical_region_routes.clone(),
108        );
109
110        // Deletes topic-region mapping if dropping physical table
111        let region_wal_options = get_region_wal_options(
112            &ddl_ctx.table_metadata_manager,
113            &table_route_value,
114            self.physical_table_id,
115        )
116        .await?;
117
118        executor
119            .on_destroy_metadata(ddl_ctx, &table_route_value, &region_wal_options)
120            .await?;
121        executor.invalidate_table_cache(ddl_ctx).await?;
122        executor
123            .on_drop_regions(
124                &ddl_ctx.node_manager,
125                &ddl_ctx.leader_region_registry,
126                &self.physical_region_routes,
127                true,
128                false,
129                false,
130            )
131            .await?;
132        info!("Table: {}({}) is dropped", self.table_name, self.table_id);
133
134        Ok((
135            Box::new(DropDatabaseCursor::new(self.target)),
136            Status::executing(false),
137        ))
138    }
139
140    fn as_any(&self) -> &dyn Any {
141        self
142    }
143}
144
145#[cfg(test)]
146mod tests {
147    use std::sync::Arc;
148
149    use api::region::RegionResponse;
150    use api::v1::region::RegionRequest;
151    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
152    use common_error::ext::BoxedError;
153    use common_query::request::QueryRequest;
154    use common_recordbatch::SendableRecordBatchStream;
155    use table::table_name::TableName;
156
157    use crate::ddl::drop_database::cursor::DropDatabaseCursor;
158    use crate::ddl::drop_database::executor::DropDatabaseExecutor;
159    use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
160    use crate::ddl::test_util::{create_logical_table, create_physical_table};
161    use crate::error::{self, Error, Result};
162    use crate::key::datanode_table::DatanodeTableKey;
163    use crate::peer::Peer;
164    use crate::rpc::router::region_distribution;
165    use crate::test_util::{MockDatanodeHandler, MockDatanodeManager, new_ddl_context};
166
167    #[derive(Clone)]
168    pub struct NaiveDatanodeHandler;
169
170    #[async_trait::async_trait]
171    impl MockDatanodeHandler for NaiveDatanodeHandler {
172        async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
173            Ok(RegionResponse::new(0))
174        }
175
176        async fn handle_query(
177            &self,
178            _peer: &Peer,
179            _request: QueryRequest,
180        ) -> Result<SendableRecordBatchStream> {
181            unreachable!()
182        }
183    }
184
185    #[tokio::test]
186    async fn test_next_with_physical_table() {
187        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
188        let ddl_context = new_ddl_context(node_manager);
189        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
190        let (_, table_route) = ddl_context
191            .table_metadata_manager
192            .table_route_manager()
193            .get_physical_table_route(physical_table_id)
194            .await
195            .unwrap();
196        {
197            let mut state = DropDatabaseExecutor::new(
198                physical_table_id,
199                physical_table_id,
200                TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
201                table_route.region_routes.clone(),
202                DropTableTarget::Physical,
203            );
204            let mut ctx = DropDatabaseContext {
205                catalog: DEFAULT_CATALOG_NAME.to_string(),
206                schema: DEFAULT_SCHEMA_NAME.to_string(),
207                drop_if_exists: false,
208                tables: None,
209            };
210            let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
211            assert!(!status.need_persist());
212            let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
213            assert_eq!(cursor.target, DropTableTarget::Physical);
214        }
215        // Execute again
216        let mut ctx = DropDatabaseContext {
217            catalog: DEFAULT_CATALOG_NAME.to_string(),
218            schema: DEFAULT_SCHEMA_NAME.to_string(),
219            drop_if_exists: false,
220            tables: None,
221        };
222        let mut state = DropDatabaseExecutor::new(
223            physical_table_id,
224            physical_table_id,
225            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
226            table_route.region_routes.clone(),
227            DropTableTarget::Physical,
228        );
229        let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
230        assert!(!status.need_persist());
231        let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
232        assert_eq!(cursor.target, DropTableTarget::Physical);
233    }
234
235    #[tokio::test]
236    async fn test_next_logical_table() {
237        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
238        let ddl_context = new_ddl_context(node_manager);
239        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
240        create_logical_table(ddl_context.clone(), physical_table_id, "metric").await;
241        let logical_table_id = physical_table_id + 1;
242        let (_, table_route) = ddl_context
243            .table_metadata_manager
244            .table_route_manager()
245            .get_physical_table_route(logical_table_id)
246            .await
247            .unwrap();
248        {
249            let mut state = DropDatabaseExecutor::new(
250                logical_table_id,
251                physical_table_id,
252                TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
253                table_route.region_routes.clone(),
254                DropTableTarget::Logical,
255            );
256            let mut ctx = DropDatabaseContext {
257                catalog: DEFAULT_CATALOG_NAME.to_string(),
258                schema: DEFAULT_SCHEMA_NAME.to_string(),
259                drop_if_exists: false,
260                tables: None,
261            };
262            let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
263            assert!(!status.need_persist());
264            let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
265            assert_eq!(cursor.target, DropTableTarget::Logical);
266        }
267        // Execute again
268        let mut ctx = DropDatabaseContext {
269            catalog: DEFAULT_CATALOG_NAME.to_string(),
270            schema: DEFAULT_SCHEMA_NAME.to_string(),
271            drop_if_exists: false,
272            tables: None,
273        };
274        let mut state = DropDatabaseExecutor::new(
275            logical_table_id,
276            physical_table_id,
277            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
278            table_route.region_routes,
279            DropTableTarget::Logical,
280        );
281        let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
282        assert!(!status.need_persist());
283        let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
284        assert_eq!(cursor.target, DropTableTarget::Logical);
285        // Checks table info
286        ddl_context
287            .table_metadata_manager
288            .table_info_manager()
289            .get(physical_table_id)
290            .await
291            .unwrap()
292            .unwrap();
293        // Checks table route
294        let table_route = ddl_context
295            .table_metadata_manager
296            .table_route_manager()
297            .table_route_storage()
298            .get(physical_table_id)
299            .await
300            .unwrap()
301            .unwrap();
302        let region_routes = table_route.region_routes().unwrap();
303        for datanode_id in region_distribution(region_routes).into_keys() {
304            ddl_context
305                .table_metadata_manager
306                .datanode_table_manager()
307                .get(&DatanodeTableKey::new(datanode_id, physical_table_id))
308                .await
309                .unwrap()
310                .unwrap();
311        }
312    }
313
314    #[derive(Clone)]
315    pub struct RetryErrorDatanodeHandler;
316
317    #[async_trait::async_trait]
318    impl MockDatanodeHandler for RetryErrorDatanodeHandler {
319        async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
320            Err(Error::RetryLater {
321                source: BoxedError::new(
322                    error::UnexpectedSnafu {
323                        err_msg: "retry later",
324                    }
325                    .build(),
326                ),
327                clean_poisons: false,
328            })
329        }
330
331        async fn handle_query(
332            &self,
333            _peer: &Peer,
334            _request: QueryRequest,
335        ) -> Result<SendableRecordBatchStream> {
336            unreachable!()
337        }
338    }
339
340    #[tokio::test]
341    async fn test_next_retryable_err() {
342        let node_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
343        let ddl_context = new_ddl_context(node_manager);
344        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
345        let (_, table_route) = ddl_context
346            .table_metadata_manager
347            .table_route_manager()
348            .get_physical_table_route(physical_table_id)
349            .await
350            .unwrap();
351        let mut state = DropDatabaseExecutor::new(
352            physical_table_id,
353            physical_table_id,
354            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
355            table_route.region_routes,
356            DropTableTarget::Physical,
357        );
358        let mut ctx = DropDatabaseContext {
359            catalog: DEFAULT_CATALOG_NAME.to_string(),
360            schema: DEFAULT_SCHEMA_NAME.to_string(),
361            drop_if_exists: false,
362            tables: None,
363        };
364        let err = state.next(&ddl_context, &mut ctx).await.unwrap_err();
365        assert!(err.is_retry_later());
366    }
367
368    #[tokio::test]
369    async fn test_on_recovery() {
370        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
371        let ddl_context = new_ddl_context(node_manager);
372        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
373        let (_, table_route) = ddl_context
374            .table_metadata_manager
375            .table_route_manager()
376            .get_physical_table_route(physical_table_id)
377            .await
378            .unwrap();
379        {
380            let mut state = DropDatabaseExecutor::new(
381                physical_table_id,
382                physical_table_id,
383                TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
384                table_route.region_routes.clone(),
385                DropTableTarget::Physical,
386            );
387            let mut ctx = DropDatabaseContext {
388                catalog: DEFAULT_CATALOG_NAME.to_string(),
389                schema: DEFAULT_SCHEMA_NAME.to_string(),
390                drop_if_exists: false,
391                tables: None,
392            };
393            state.recover(&ddl_context).unwrap();
394            assert_eq!(state.dropping_regions.len(), 1);
395            let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
396            assert!(!status.need_persist());
397            let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
398            assert_eq!(cursor.target, DropTableTarget::Physical);
399        }
400    }
401}