Skip to main content

common_meta/ddl/drop_table/
executor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::region::{
18    CloseRequest as PbCloseRegionRequest, DropRequest as PbDropRegionRequest, RegionRequest,
19    RegionRequestHeader, region_request,
20};
21use common_error::ext::ErrorExt;
22use common_error::status_code::StatusCode;
23use common_telemetry::tracing_context::TracingContext;
24use common_telemetry::{debug, error};
25use common_wal::options::WalOptions;
26use futures::future::join_all;
27use snafu::ensure;
28use store_api::storage::{RegionId, RegionNumber};
29use table::metadata::TableId;
30use table::table_name::TableName;
31
32use crate::cache_invalidator::Context;
33use crate::ddl::DdlContext;
34use crate::ddl::utils::{add_peer_context_if_needed, convert_region_routes_to_detecting_regions};
35use crate::error::{self, Result};
36use crate::instruction::CacheIdent;
37use crate::key::table_name::TableNameKey;
38use crate::key::table_route::TableRouteValue;
39use crate::node_manager::NodeManagerRef;
40use crate::region_registry::LeaderRegionRegistryRef;
41use crate::rpc::router::{
42    RegionRoute, find_follower_regions, find_followers, find_leader_regions, find_leaders,
43    operating_leader_regions,
44};
45
/// [Control] indicates to the caller whether to go to the next step.
///
/// The enum is comparable and clonable whenever `T` is, so callers can use it directly in
/// equality assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Control<T> {
    /// Go to the next step, carrying the value produced by the current step.
    Continue(T),
    /// Stop here; the caller should not run the following steps.
    Stop,
}

impl<T> Control<T> {
    /// Returns true if it's [Control::Stop].
    pub fn stop(&self) -> bool {
        matches!(self, Control::Stop)
    }
}
59
60impl DropTableExecutor {
61    /// Returns the [DropTableExecutor].
62    pub fn new(table: TableName, table_id: TableId, drop_if_exists: bool) -> Self {
63        Self {
64            table,
65            table_id,
66            drop_if_exists,
67        }
68    }
69}
70
71/// [DropTableExecutor] performs:
72/// - Drops the metadata of the table.
73/// - Invalidates the cache on the Frontend nodes.
74/// - Drops the regions on the Datanode nodes.
75pub struct DropTableExecutor {
76    table: TableName,
77    table_id: TableId,
78    drop_if_exists: bool,
79}
80
81impl DropTableExecutor {
82    /// Checks whether table exists.
83    /// - Early returns if table not exists and `drop_if_exists` is `true`.
84    /// - Throws an error if table not exists and `drop_if_exists` is `false`.
85    pub async fn on_prepare(&self, ctx: &DdlContext) -> Result<Control<()>> {
86        let table_ref = self.table.table_ref();
87
88        let exist = ctx
89            .table_metadata_manager
90            .table_name_manager()
91            .exists(TableNameKey::new(
92                table_ref.catalog,
93                table_ref.schema,
94                table_ref.table,
95            ))
96            .await?;
97
98        if !exist && self.drop_if_exists {
99            return Ok(Control::Stop);
100        }
101
102        ensure!(
103            exist,
104            error::TableNotFoundSnafu {
105                table_name: table_ref.to_string()
106            }
107        );
108
109        Ok(Control::Continue(()))
110    }
111
112    /// Deletes the table metadata **logically**.
113    pub async fn on_delete_metadata(
114        &self,
115        ctx: &DdlContext,
116        table_route_value: &TableRouteValue,
117        region_wal_options: &HashMap<RegionNumber, WalOptions>,
118    ) -> Result<()> {
119        ctx.table_metadata_manager
120            .delete_table_metadata(
121                self.table_id,
122                &self.table,
123                table_route_value,
124                region_wal_options,
125            )
126            .await
127    }
128
129    /// Deletes the table metadata tombstone **permanently**.
130    pub async fn on_delete_metadata_tombstone(
131        &self,
132        ctx: &DdlContext,
133        table_route_value: &TableRouteValue,
134        region_wal_options: &HashMap<u32, WalOptions>,
135    ) -> Result<()> {
136        ctx.table_metadata_manager
137            .delete_table_metadata_tombstone(
138                self.table_id,
139                &self.table,
140                table_route_value,
141                region_wal_options,
142            )
143            .await
144    }
145
146    /// Deletes metadata for table **permanently**.
147    pub async fn on_destroy_metadata(
148        &self,
149        ctx: &DdlContext,
150        table_route_value: &TableRouteValue,
151        region_wal_options: &HashMap<u32, WalOptions>,
152    ) -> Result<()> {
153        ctx.table_metadata_manager
154            .destroy_table_metadata(
155                self.table_id,
156                &self.table,
157                table_route_value,
158                region_wal_options,
159            )
160            .await?;
161
162        let detecting_regions = if table_route_value.is_physical() {
163            // Safety: checked.
164            let regions = table_route_value.region_routes().unwrap();
165            convert_region_routes_to_detecting_regions(regions)
166        } else {
167            vec![]
168        };
169        ctx.deregister_failure_detectors(detecting_regions).await;
170        Ok(())
171    }
172
173    /// Restores the table metadata.
174    pub async fn on_restore_metadata(
175        &self,
176        ctx: &DdlContext,
177        table_route_value: &TableRouteValue,
178        region_wal_options: &HashMap<u32, WalOptions>,
179    ) -> Result<()> {
180        ctx.table_metadata_manager
181            .restore_table_metadata(
182                self.table_id,
183                &self.table,
184                table_route_value,
185                region_wal_options,
186            )
187            .await
188    }
189
190    /// Invalidates caches for the table.
191    pub async fn invalidate_table_cache(&self, ctx: &DdlContext) -> Result<()> {
192        let cache_invalidator = &ctx.cache_invalidator;
193        let ctx = Context {
194            subject: Some(format!(
195                "Invalidate table cache by dropping table {}, table_id: {}",
196                self.table.table_ref(),
197                self.table_id,
198            )),
199        };
200
201        cache_invalidator
202            .invalidate(
203                &ctx,
204                &[
205                    CacheIdent::TableName(self.table.table_ref().into()),
206                    CacheIdent::TableId(self.table_id),
207                ],
208            )
209            .await?;
210
211        Ok(())
212    }
213
214    /// Drops region on datanode.
215    pub async fn on_drop_regions(
216        &self,
217        node_manager: &NodeManagerRef,
218        leader_region_registry: &LeaderRegionRegistryRef,
219        region_routes: &[RegionRoute],
220        fast_path: bool,
221        force: bool,
222        partial_drop: bool,
223    ) -> Result<()> {
224        // Drops leader regions on datanodes.
225        let leaders = find_leaders(region_routes);
226        let mut drop_region_tasks = Vec::with_capacity(leaders.len());
227        let table_id = self.table_id;
228        for datanode in leaders {
229            let requester = node_manager.datanode(&datanode).await;
230            let regions = find_leader_regions(region_routes, &datanode);
231            let region_ids = regions
232                .iter()
233                .map(|region_number| RegionId::new(table_id, *region_number))
234                .collect::<Vec<_>>();
235
236            for region_id in region_ids {
237                debug!("Dropping region {region_id} on Datanode {datanode:?}");
238                let request = RegionRequest {
239                    header: Some(RegionRequestHeader {
240                        tracing_context: TracingContext::from_current_span().to_w3c(),
241                        ..Default::default()
242                    }),
243                    body: Some(region_request::Body::Drop(PbDropRegionRequest {
244                        region_id: region_id.as_u64(),
245                        fast_path,
246                        force,
247                        partial_drop,
248                        soft_drop: false,
249                    })),
250                };
251                let datanode = datanode.clone();
252                let requester = requester.clone();
253                drop_region_tasks.push(async move {
254                    if let Err(err) = requester.handle(request).await
255                        && err.status_code() != StatusCode::RegionNotFound
256                    {
257                        return Err(add_peer_context_if_needed(datanode)(err));
258                    }
259                    Ok(())
260                });
261            }
262        }
263
264        join_all(drop_region_tasks)
265            .await
266            .into_iter()
267            .collect::<Result<Vec<_>>>()?;
268
269        // Drops follower regions on datanodes.
270        let followers = find_followers(region_routes);
271        let mut close_region_tasks = Vec::with_capacity(followers.len());
272        for datanode in followers {
273            let requester = node_manager.datanode(&datanode).await;
274            let regions = find_follower_regions(region_routes, &datanode);
275            let region_ids = regions
276                .iter()
277                .map(|region_number| RegionId::new(table_id, *region_number))
278                .collect::<Vec<_>>();
279
280            for region_id in region_ids {
281                debug!("Closing region {region_id} on Datanode {datanode:?}");
282                let request = RegionRequest {
283                    header: Some(RegionRequestHeader {
284                        tracing_context: TracingContext::from_current_span().to_w3c(),
285                        ..Default::default()
286                    }),
287                    body: Some(region_request::Body::Close(PbCloseRegionRequest {
288                        region_id: region_id.as_u64(),
289                    })),
290                };
291
292                let datanode = datanode.clone();
293                let requester = requester.clone();
294                close_region_tasks.push(async move {
295                    if let Err(err) = requester.handle(request).await
296                        && err.status_code() != StatusCode::RegionNotFound
297                    {
298                        return Err(add_peer_context_if_needed(datanode)(err));
299                    }
300                    Ok(())
301                });
302            }
303        }
304
305        // Failure to close follower regions is not critical.
306        // When a leader region is dropped, follower regions will be unable to renew their leases via metasrv.
307        // Eventually, these follower regions will be automatically closed by the region livekeeper.
308        if let Err(err) = join_all(close_region_tasks)
309            .await
310            .into_iter()
311            .collect::<Result<Vec<_>>>()
312        {
313            error!(err; "Failed to close follower regions on datanodes, table_id: {}", table_id);
314        }
315
316        // Deletes the leader region from registry.
317        let region_ids = operating_leader_regions(region_routes);
318        leader_region_registry.batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id));
319
320        Ok(())
321    }
322}
323
#[cfg(test)]
mod tests {
    use std::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::{ColumnDataType, SemanticType};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use table::metadata::TableInfo;
    use table::table_name::TableName;

    use super::*;
    use crate::ddl::test_util::columns::TestColumnDefBuilder;
    use crate::ddl::test_util::create_table::{
        TestCreateTableExprBuilder, build_raw_table_info_from_expr,
    };
    use crate::key::table_route::TableRouteValue;
    use crate::test_util::{MockDatanodeManager, new_ddl_context};

    /// Builds the [TableInfo] of a test table called `name` with three columns: the `ts` time
    /// index, the `host` tag (which is also the primary key) and the `cpu` field.
    fn test_create_raw_table_info(name: &str) -> TableInfo {
        // Builds a single column definition; the target type is inferred from `column_defs`.
        let column = |column_name: &str, data_type: ColumnDataType, semantic_type: SemanticType| {
            TestColumnDefBuilder::default()
                .name(column_name)
                .data_type(data_type)
                .semantic_type(semantic_type)
                .build()
                .unwrap()
                .into()
        };

        let expr = TestCreateTableExprBuilder::default()
            .column_defs([
                column(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                column("host", ColumnDataType::String, SemanticType::Tag),
                column("cpu", ColumnDataType::Float64, SemanticType::Field),
            ])
            .time_index("ts")
            .primary_keys(["host".into()])
            .table_name(name)
            .build()
            .unwrap()
            .into();
        build_raw_table_info_from_expr(&expr)
    }

    #[tokio::test]
    async fn test_on_prepare() {
        let ctx = new_ddl_context(Arc::new(MockDatanodeManager::new(())));
        // Every case targets the same table and only differs in `drop_if_exists`.
        let new_executor = |drop_if_exists: bool| {
            DropTableExecutor::new(
                TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"),
                1024,
                drop_if_exists,
            )
        };

        // Drops if exists
        let control = new_executor(true).on_prepare(&ctx).await.unwrap();
        assert!(control.stop());

        // Drops a non-exists table
        let err = new_executor(false).on_prepare(&ctx).await.unwrap_err();
        assert_matches!(err, error::Error::TableNotFound { .. });

        // Drops a exists table
        let executor = new_executor(false);
        ctx.table_metadata_manager
            .create_table_metadata(
                test_create_raw_table_info("my_table"),
                TableRouteValue::physical(vec![]),
                HashMap::new(),
            )
            .await
            .unwrap();
        let control = executor.on_prepare(&ctx).await.unwrap();
        assert!(!control.stop());
    }
}