Skip to main content

common_meta/ddl/drop_database/
executor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_procedure::Status;
18use common_telemetry::info;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use table::metadata::TableId;
22use table::table_name::TableName;
23
24use crate::ddl::DdlContext;
25use crate::ddl::drop_database::cursor::DropDatabaseCursor;
26use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
27use crate::ddl::drop_table::executor::DropTableExecutor;
28use crate::ddl::utils::get_region_wal_options;
29use crate::error::{self, Result};
30use crate::key::table_route::TableRouteValue;
31use crate::region_keeper::OperatingRegionGuard;
32use crate::rpc::router::{RegionRoute, operating_leader_region_roles};
33
34#[derive(Debug, Serialize, Deserialize)]
35pub(crate) struct DropDatabaseExecutor {
36    table_id: TableId,
37    physical_table_id: TableId,
38    table_name: TableName,
39    /// The physical table region routes.
40    pub(crate) physical_region_routes: Vec<RegionRoute>,
41    pub(crate) target: DropTableTarget,
42    #[serde(skip)]
43    dropping_regions: Vec<OperatingRegionGuard>,
44}
45
46impl DropDatabaseExecutor {
47    /// Returns a new [DropDatabaseExecutor].
48    pub fn new(
49        table_id: TableId,
50        physical_table_id: TableId,
51        table_name: TableName,
52        physical_region_routes: Vec<RegionRoute>,
53        target: DropTableTarget,
54    ) -> Self {
55        Self {
56            table_id,
57            physical_table_id,
58            table_name,
59            physical_region_routes,
60            target,
61            dropping_regions: vec![],
62        }
63    }
64}
65
66impl DropDatabaseExecutor {
67    /// Registers the operating regions.
68    pub(crate) fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
69        if !self.dropping_regions.is_empty() {
70            return Ok(());
71        }
72        let dropping_regions = operating_leader_region_roles(&self.physical_region_routes);
73        let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
74        for (region_id, datanode_id, role) in dropping_regions {
75            let guard = ddl_ctx
76                .memory_region_keeper
77                .register_with_role(datanode_id, region_id, role)
78                .context(error::RegionOperatingRaceSnafu {
79                    region_id,
80                    peer_id: datanode_id,
81                })?;
82            dropping_region_guards.push(guard);
83        }
84        self.dropping_regions = dropping_region_guards;
85        Ok(())
86    }
87}
88
89#[async_trait::async_trait]
90#[typetag::serde]
91impl State for DropDatabaseExecutor {
92    fn recover(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
93        self.register_dropping_regions(ddl_ctx)
94    }
95
96    async fn next(
97        &mut self,
98        ddl_ctx: &DdlContext,
99        ctx: &mut DropDatabaseContext,
100    ) -> Result<(Box<dyn State>, Status)> {
101        self.register_dropping_regions(ddl_ctx)?;
102        let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true);
103        if ctx.retrying {
104            info!(
105                "Remapping region routes addresses for retrying drop regions for table_id: {}",
106                self.table_id
107            );
108            let storage = ddl_ctx
109                .table_metadata_manager
110                .table_route_manager()
111                .table_route_storage();
112            // The peer addresses may change during retries,
113            // so we always remap the region routes.
114            storage
115                .remap_region_routes(&mut self.physical_region_routes)
116                .await?;
117        }
118        // Deletes metadata for table permanently.
119        let table_route_value = TableRouteValue::new(
120            self.table_id,
121            self.physical_table_id,
122            self.physical_region_routes.clone(),
123        );
124
125        // Deletes topic-region mapping if dropping physical table
126        let region_wal_options = get_region_wal_options(
127            &ddl_ctx.table_metadata_manager,
128            &table_route_value,
129            self.physical_table_id,
130        )
131        .await?;
132
133        executor
134            .on_destroy_metadata(ddl_ctx, &table_route_value, &region_wal_options)
135            .await?;
136        executor.invalidate_table_cache(ddl_ctx).await?;
137        executor
138            .on_drop_regions(
139                &ddl_ctx.node_manager,
140                &ddl_ctx.leader_region_registry,
141                &self.physical_region_routes,
142                true,
143                false,
144                false,
145            )
146            .await?;
147        info!("Table: {}({}) is dropped", self.table_name, self.table_id);
148
149        Ok((
150            Box::new(DropDatabaseCursor::new(self.target)),
151            Status::executing(false),
152        ))
153    }
154
155    fn as_any(&self) -> &dyn Any {
156        self
157    }
158}
159
160#[cfg(test)]
161mod tests {
162    use std::collections::HashSet;
163    use std::sync::Arc;
164
165    use api::region::RegionResponse;
166    use api::v1::region::RegionRequest;
167    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
168    use common_error::ext::BoxedError;
169    use common_query::request::QueryRequest;
170    use common_recordbatch::SendableRecordBatchStream;
171    use store_api::region_engine::RegionRole;
172    use store_api::storage::RegionId;
173    use table::table_name::TableName;
174
175    use crate::ddl::drop_database::cursor::DropDatabaseCursor;
176    use crate::ddl::drop_database::executor::DropDatabaseExecutor;
177    use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
178    use crate::ddl::test_util::datanode_handler::DatanodeWatcher;
179    use crate::ddl::test_util::{
180        create_logical_table, create_physical_table, put_datanode_address,
181    };
182    use crate::error::{self, Error, Result};
183    use crate::key::datanode_table::DatanodeTableKey;
184    use crate::peer::Peer;
185    use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
186    use crate::test_util::{MockDatanodeHandler, MockDatanodeManager, new_ddl_context};
187
188    #[derive(Clone)]
189    pub struct NaiveDatanodeHandler;
190
191    #[async_trait::async_trait]
192    impl MockDatanodeHandler for NaiveDatanodeHandler {
193        async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
194            Ok(RegionResponse::new(0))
195        }
196
197        async fn handle_query(
198            &self,
199            _peer: &Peer,
200            _request: QueryRequest,
201        ) -> Result<SendableRecordBatchStream> {
202            unreachable!()
203        }
204    }
205
206    #[tokio::test]
207    async fn test_next_with_physical_table() {
208        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
209        let ddl_context = new_ddl_context(node_manager);
210        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
211        let (_, table_route) = ddl_context
212            .table_metadata_manager
213            .table_route_manager()
214            .get_physical_table_route(physical_table_id)
215            .await
216            .unwrap();
217        {
218            let mut state = DropDatabaseExecutor::new(
219                physical_table_id,
220                physical_table_id,
221                TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
222                table_route.region_routes.clone(),
223                DropTableTarget::Physical,
224            );
225            let mut ctx = DropDatabaseContext {
226                catalog: DEFAULT_CATALOG_NAME.to_string(),
227                schema: DEFAULT_SCHEMA_NAME.to_string(),
228                drop_if_exists: false,
229                tables: None,
230                retrying: false,
231            };
232            let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
233            assert!(!status.need_persist());
234            let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
235            assert_eq!(cursor.target, DropTableTarget::Physical);
236        }
237        // Execute again
238        let mut ctx = DropDatabaseContext {
239            catalog: DEFAULT_CATALOG_NAME.to_string(),
240            schema: DEFAULT_SCHEMA_NAME.to_string(),
241            drop_if_exists: false,
242            tables: None,
243            retrying: false,
244        };
245        let mut state = DropDatabaseExecutor::new(
246            physical_table_id,
247            physical_table_id,
248            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
249            table_route.region_routes.clone(),
250            DropTableTarget::Physical,
251        );
252        let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
253        assert!(!status.need_persist());
254        let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
255        assert_eq!(cursor.target, DropTableTarget::Physical);
256    }
257
258    #[tokio::test]
259    async fn test_next_logical_table() {
260        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
261        let ddl_context = new_ddl_context(node_manager);
262        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
263        create_logical_table(ddl_context.clone(), physical_table_id, "metric").await;
264        let logical_table_id = physical_table_id + 1;
265        let (_, table_route) = ddl_context
266            .table_metadata_manager
267            .table_route_manager()
268            .get_physical_table_route(logical_table_id)
269            .await
270            .unwrap();
271        {
272            let mut state = DropDatabaseExecutor::new(
273                logical_table_id,
274                physical_table_id,
275                TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
276                table_route.region_routes.clone(),
277                DropTableTarget::Logical,
278            );
279            let mut ctx = DropDatabaseContext {
280                catalog: DEFAULT_CATALOG_NAME.to_string(),
281                schema: DEFAULT_SCHEMA_NAME.to_string(),
282                drop_if_exists: false,
283                tables: None,
284                retrying: false,
285            };
286            let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
287            assert!(!status.need_persist());
288            let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
289            assert_eq!(cursor.target, DropTableTarget::Logical);
290        }
291        // Execute again
292        let mut ctx = DropDatabaseContext {
293            catalog: DEFAULT_CATALOG_NAME.to_string(),
294            schema: DEFAULT_SCHEMA_NAME.to_string(),
295            drop_if_exists: false,
296            tables: None,
297            retrying: false,
298        };
299        let mut state = DropDatabaseExecutor::new(
300            logical_table_id,
301            physical_table_id,
302            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
303            table_route.region_routes,
304            DropTableTarget::Logical,
305        );
306        let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
307        assert!(!status.need_persist());
308        let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
309        assert_eq!(cursor.target, DropTableTarget::Logical);
310        // Checks table info
311        ddl_context
312            .table_metadata_manager
313            .table_info_manager()
314            .get(physical_table_id)
315            .await
316            .unwrap()
317            .unwrap();
318        // Checks table route
319        let table_route = ddl_context
320            .table_metadata_manager
321            .table_route_manager()
322            .table_route_storage()
323            .get(physical_table_id)
324            .await
325            .unwrap()
326            .unwrap();
327        let region_routes = table_route.region_routes().unwrap();
328        for datanode_id in region_distribution(region_routes).into_keys() {
329            ddl_context
330                .table_metadata_manager
331                .datanode_table_manager()
332                .get(&DatanodeTableKey::new(datanode_id, physical_table_id))
333                .await
334                .unwrap()
335                .unwrap();
336        }
337    }
338
339    #[derive(Clone)]
340    pub struct RetryErrorDatanodeHandler;
341
342    #[async_trait::async_trait]
343    impl MockDatanodeHandler for RetryErrorDatanodeHandler {
344        async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
345            Err(Error::RetryLater {
346                source: BoxedError::new(
347                    error::UnexpectedSnafu {
348                        err_msg: "retry later",
349                    }
350                    .build(),
351                ),
352                clean_poisons: false,
353            })
354        }
355
356        async fn handle_query(
357            &self,
358            _peer: &Peer,
359            _request: QueryRequest,
360        ) -> Result<SendableRecordBatchStream> {
361            unreachable!()
362        }
363    }
364
365    #[tokio::test]
366    async fn test_next_retryable_err() {
367        let node_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
368        let ddl_context = new_ddl_context(node_manager);
369        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
370        let (_, table_route) = ddl_context
371            .table_metadata_manager
372            .table_route_manager()
373            .get_physical_table_route(physical_table_id)
374            .await
375            .unwrap();
376        let mut state = DropDatabaseExecutor::new(
377            physical_table_id,
378            physical_table_id,
379            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
380            table_route.region_routes,
381            DropTableTarget::Physical,
382        );
383        let mut ctx = DropDatabaseContext {
384            catalog: DEFAULT_CATALOG_NAME.to_string(),
385            schema: DEFAULT_SCHEMA_NAME.to_string(),
386            drop_if_exists: false,
387            tables: None,
388            retrying: false,
389        };
390        let err = state.next(&ddl_context, &mut ctx).await.unwrap_err();
391        assert!(err.is_retry_later());
392    }
393
394    #[tokio::test]
395    async fn test_on_recovery() {
396        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
397        let ddl_context = new_ddl_context(node_manager);
398        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
399        let (_, table_route) = ddl_context
400            .table_metadata_manager
401            .table_route_manager()
402            .get_physical_table_route(physical_table_id)
403            .await
404            .unwrap();
405        {
406            let mut state = DropDatabaseExecutor::new(
407                physical_table_id,
408                physical_table_id,
409                TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
410                table_route.region_routes.clone(),
411                DropTableTarget::Physical,
412            );
413            let mut ctx = DropDatabaseContext {
414                catalog: DEFAULT_CATALOG_NAME.to_string(),
415                schema: DEFAULT_SCHEMA_NAME.to_string(),
416                drop_if_exists: false,
417                tables: None,
418                retrying: false,
419            };
420            state.recover(&ddl_context).unwrap();
421            assert_eq!(state.dropping_regions.len(), 1);
422            let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
423            assert!(!status.need_persist());
424            let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
425            assert_eq!(cursor.target, DropTableTarget::Physical);
426        }
427    }
428
429    #[tokio::test]
430    async fn test_recover_registers_region_role_from_routes() {
431        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
432        let ddl_context = new_ddl_context(node_manager);
433        let region_id = RegionId::new(1024, 1);
434        let mut state = DropDatabaseExecutor::new(
435            1024,
436            1024,
437            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
438            vec![RegionRoute {
439                region: Region::new_test(region_id),
440                leader_peer: Some(Peer::empty(7)),
441                follower_peers: vec![],
442                leader_state: Some(LeaderState::Downgrading),
443                leader_down_since: None,
444                write_route_policy: None,
445            }],
446            DropTableTarget::Physical,
447        );
448
449        state.recover(&ddl_context).unwrap();
450
451        let roles = ddl_context
452            .memory_region_keeper
453            .extract_operating_region_roles(7, &HashSet::from([region_id]));
454        assert_eq!(roles.get(&region_id), Some(&RegionRole::DowngradingLeader));
455    }
456
457    #[tokio::test]
458    async fn test_next_remaps_addresses_when_retrying() {
459        let (tx, mut rx) = tokio::sync::mpsc::channel(8);
460        let node_manager = Arc::new(MockDatanodeManager::new(DatanodeWatcher::new(tx)));
461        let ddl_context = new_ddl_context(node_manager);
462        let physical_table_id = create_physical_table(&ddl_context, "phy").await;
463        let (_, table_route) = ddl_context
464            .table_metadata_manager
465            .table_route_manager()
466            .get_physical_table_route(physical_table_id)
467            .await
468            .unwrap();
469
470        let mut state = DropDatabaseExecutor::new(
471            physical_table_id,
472            physical_table_id,
473            TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
474            table_route.region_routes,
475            DropTableTarget::Physical,
476        );
477        state.physical_region_routes[0]
478            .leader_peer
479            .as_mut()
480            .unwrap()
481            .addr = "old-addr".to_string();
482        let mut ctx = DropDatabaseContext {
483            catalog: DEFAULT_CATALOG_NAME.to_string(),
484            schema: DEFAULT_SCHEMA_NAME.to_string(),
485            drop_if_exists: false,
486            tables: None,
487            retrying: true,
488        };
489
490        put_datanode_address(&ddl_context, 0, "new-addr").await;
491
492        state.next(&ddl_context, &mut ctx).await.unwrap();
493
494        let (peer, _) = rx.try_recv().unwrap();
495        assert_eq!(peer.addr, "new-addr");
496    }
497}