common_meta/ddl/drop_table/
executor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::region::{
18    region_request, CloseRequest as PbCloseRegionRequest, DropRequest as PbDropRegionRequest,
19    RegionRequest, RegionRequestHeader,
20};
21use common_error::ext::ErrorExt;
22use common_error::status_code::StatusCode;
23use common_telemetry::tracing_context::TracingContext;
24use common_telemetry::{debug, error};
25use common_wal::options::WalOptions;
26use futures::future::join_all;
27use snafu::ensure;
28use store_api::storage::{RegionId, RegionNumber};
29use table::metadata::TableId;
30use table::table_name::TableName;
31
32use crate::cache_invalidator::Context;
33use crate::ddl::utils::{add_peer_context_if_needed, convert_region_routes_to_detecting_regions};
34use crate::ddl::DdlContext;
35use crate::error::{self, Result};
36use crate::instruction::CacheIdent;
37use crate::key::table_name::TableNameKey;
38use crate::key::table_route::TableRouteValue;
39use crate::rpc::router::{
40    find_follower_regions, find_followers, find_leader_regions, find_leaders,
41    operating_leader_regions, RegionRoute,
42};
43
/// [Control] tells the caller whether it should proceed to the next step.
#[derive(Debug)]
pub enum Control<T> {
    /// Proceed to the next step, carrying a value of type `T`.
    Continue(T),
    /// Do not proceed to the next step.
    Stop,
}

impl<T> Control<T> {
    /// Returns `true` only for [Control::Stop]; every [Control::Continue] yields `false`.
    pub fn stop(&self) -> bool {
        match self {
            Self::Stop => true,
            Self::Continue(_) => false,
        }
    }
}
57
58impl DropTableExecutor {
59    /// Returns the [DropTableExecutor].
60    pub fn new(table: TableName, table_id: TableId, drop_if_exists: bool) -> Self {
61        Self {
62            table,
63            table_id,
64            drop_if_exists,
65        }
66    }
67}
68
69/// [DropTableExecutor] performs:
70/// - Drops the metadata of the table.
71/// - Invalidates the cache on the Frontend nodes.
72/// - Drops the regions on the Datanode nodes.
73pub struct DropTableExecutor {
74    table: TableName,
75    table_id: TableId,
76    drop_if_exists: bool,
77}
78
79impl DropTableExecutor {
80    /// Checks whether table exists.
81    /// - Early returns if table not exists and `drop_if_exists` is `true`.
82    /// - Throws an error if table not exists and `drop_if_exists` is `false`.
83    pub async fn on_prepare(&self, ctx: &DdlContext) -> Result<Control<()>> {
84        let table_ref = self.table.table_ref();
85
86        let exist = ctx
87            .table_metadata_manager
88            .table_name_manager()
89            .exists(TableNameKey::new(
90                table_ref.catalog,
91                table_ref.schema,
92                table_ref.table,
93            ))
94            .await?;
95
96        if !exist && self.drop_if_exists {
97            return Ok(Control::Stop);
98        }
99
100        ensure!(
101            exist,
102            error::TableNotFoundSnafu {
103                table_name: table_ref.to_string()
104            }
105        );
106
107        Ok(Control::Continue(()))
108    }
109
110    /// Deletes the table metadata **logically**.
111    pub async fn on_delete_metadata(
112        &self,
113        ctx: &DdlContext,
114        table_route_value: &TableRouteValue,
115        region_wal_options: &HashMap<RegionNumber, WalOptions>,
116    ) -> Result<()> {
117        ctx.table_metadata_manager
118            .delete_table_metadata(
119                self.table_id,
120                &self.table,
121                table_route_value,
122                region_wal_options,
123            )
124            .await
125    }
126
127    /// Deletes the table metadata tombstone **permanently**.
128    pub async fn on_delete_metadata_tombstone(
129        &self,
130        ctx: &DdlContext,
131        table_route_value: &TableRouteValue,
132        region_wal_options: &HashMap<u32, WalOptions>,
133    ) -> Result<()> {
134        ctx.table_metadata_manager
135            .delete_table_metadata_tombstone(
136                self.table_id,
137                &self.table,
138                table_route_value,
139                region_wal_options,
140            )
141            .await
142    }
143
144    /// Deletes metadata for table **permanently**.
145    pub async fn on_destroy_metadata(
146        &self,
147        ctx: &DdlContext,
148        table_route_value: &TableRouteValue,
149        region_wal_options: &HashMap<u32, WalOptions>,
150    ) -> Result<()> {
151        ctx.table_metadata_manager
152            .destroy_table_metadata(
153                self.table_id,
154                &self.table,
155                table_route_value,
156                region_wal_options,
157            )
158            .await?;
159
160        let detecting_regions = if table_route_value.is_physical() {
161            // Safety: checked.
162            let regions = table_route_value.region_routes().unwrap();
163            convert_region_routes_to_detecting_regions(regions)
164        } else {
165            vec![]
166        };
167        ctx.deregister_failure_detectors(detecting_regions).await;
168        Ok(())
169    }
170
171    /// Restores the table metadata.
172    pub async fn on_restore_metadata(
173        &self,
174        ctx: &DdlContext,
175        table_route_value: &TableRouteValue,
176        region_wal_options: &HashMap<u32, WalOptions>,
177    ) -> Result<()> {
178        ctx.table_metadata_manager
179            .restore_table_metadata(
180                self.table_id,
181                &self.table,
182                table_route_value,
183                region_wal_options,
184            )
185            .await
186    }
187
188    /// Invalidates caches for the table.
189    pub async fn invalidate_table_cache(&self, ctx: &DdlContext) -> Result<()> {
190        let cache_invalidator = &ctx.cache_invalidator;
191        let ctx = Context {
192            subject: Some(format!(
193                "Invalidate table cache by dropping table {}, table_id: {}",
194                self.table.table_ref(),
195                self.table_id,
196            )),
197        };
198
199        cache_invalidator
200            .invalidate(
201                &ctx,
202                &[
203                    CacheIdent::TableName(self.table.table_ref().into()),
204                    CacheIdent::TableId(self.table_id),
205                ],
206            )
207            .await?;
208
209        Ok(())
210    }
211
212    /// Drops region on datanode.
213    pub async fn on_drop_regions(
214        &self,
215        ctx: &DdlContext,
216        region_routes: &[RegionRoute],
217        fast_path: bool,
218    ) -> Result<()> {
219        // Drops leader regions on datanodes.
220        let leaders = find_leaders(region_routes);
221        let mut drop_region_tasks = Vec::with_capacity(leaders.len());
222        let table_id = self.table_id;
223        for datanode in leaders {
224            let requester = ctx.node_manager.datanode(&datanode).await;
225            let regions = find_leader_regions(region_routes, &datanode);
226            let region_ids = regions
227                .iter()
228                .map(|region_number| RegionId::new(table_id, *region_number))
229                .collect::<Vec<_>>();
230
231            for region_id in region_ids {
232                debug!("Dropping region {region_id} on Datanode {datanode:?}");
233                let request = RegionRequest {
234                    header: Some(RegionRequestHeader {
235                        tracing_context: TracingContext::from_current_span().to_w3c(),
236                        ..Default::default()
237                    }),
238                    body: Some(region_request::Body::Drop(PbDropRegionRequest {
239                        region_id: region_id.as_u64(),
240                        fast_path,
241                    })),
242                };
243                let datanode = datanode.clone();
244                let requester = requester.clone();
245                drop_region_tasks.push(async move {
246                    if let Err(err) = requester.handle(request).await {
247                        if err.status_code() != StatusCode::RegionNotFound {
248                            return Err(add_peer_context_if_needed(datanode)(err));
249                        }
250                    }
251                    Ok(())
252                });
253            }
254        }
255
256        join_all(drop_region_tasks)
257            .await
258            .into_iter()
259            .collect::<Result<Vec<_>>>()?;
260
261        // Drops follower regions on datanodes.
262        let followers = find_followers(region_routes);
263        let mut close_region_tasks = Vec::with_capacity(followers.len());
264        for datanode in followers {
265            let requester = ctx.node_manager.datanode(&datanode).await;
266            let regions = find_follower_regions(region_routes, &datanode);
267            let region_ids = regions
268                .iter()
269                .map(|region_number| RegionId::new(table_id, *region_number))
270                .collect::<Vec<_>>();
271
272            for region_id in region_ids {
273                debug!("Closing region {region_id} on Datanode {datanode:?}");
274                let request = RegionRequest {
275                    header: Some(RegionRequestHeader {
276                        tracing_context: TracingContext::from_current_span().to_w3c(),
277                        ..Default::default()
278                    }),
279                    body: Some(region_request::Body::Close(PbCloseRegionRequest {
280                        region_id: region_id.as_u64(),
281                    })),
282                };
283
284                let datanode = datanode.clone();
285                let requester = requester.clone();
286                close_region_tasks.push(async move {
287                    if let Err(err) = requester.handle(request).await {
288                        if err.status_code() != StatusCode::RegionNotFound {
289                            return Err(add_peer_context_if_needed(datanode)(err));
290                        }
291                    }
292                    Ok(())
293                });
294            }
295        }
296
297        // Failure to close follower regions is not critical.
298        // When a leader region is dropped, follower regions will be unable to renew their leases via metasrv.
299        // Eventually, these follower regions will be automatically closed by the region livekeeper.
300        if let Err(err) = join_all(close_region_tasks)
301            .await
302            .into_iter()
303            .collect::<Result<Vec<_>>>()
304        {
305            error!(err; "Failed to close follower regions on datanodes, table_id: {}", table_id);
306        }
307
308        // Deletes the leader region from registry.
309        let region_ids = operating_leader_regions(region_routes);
310        ctx.leader_region_registry
311            .batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id));
312
313        Ok(())
314    }
315}
316
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::{ColumnDataType, SemanticType};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use table::metadata::RawTableInfo;
    use table::table_name::TableName;

    use super::*;
    use crate::ddl::test_util::columns::TestColumnDefBuilder;
    use crate::ddl::test_util::create_table::{
        build_raw_table_info_from_expr, TestCreateTableExprBuilder,
    };
    use crate::key::table_route::TableRouteValue;
    use crate::test_util::{new_ddl_context, MockDatanodeManager};

    /// Builds the raw table info of a table called `name` with three columns:
    /// `ts` (time index), `host` (tag, primary key) and `cpu` (field).
    fn test_create_raw_table_info(name: &str) -> RawTableInfo {
        let expr = TestCreateTableExprBuilder::default()
            .column_defs([
                TestColumnDefBuilder::default()
                    .name("ts")
                    .data_type(ColumnDataType::TimestampMillisecond)
                    .semantic_type(SemanticType::Timestamp)
                    .build()
                    .unwrap()
                    .into(),
                TestColumnDefBuilder::default()
                    .name("host")
                    .data_type(ColumnDataType::String)
                    .semantic_type(SemanticType::Tag)
                    .build()
                    .unwrap()
                    .into(),
                TestColumnDefBuilder::default()
                    .name("cpu")
                    .data_type(ColumnDataType::Float64)
                    .semantic_type(SemanticType::Field)
                    .build()
                    .unwrap()
                    .into(),
            ])
            .time_index("ts")
            .primary_keys(["host".into()])
            .table_name(name)
            .build()
            .unwrap()
            .into();
        build_raw_table_info_from_expr(&expr)
    }

    /// Builds an executor for `my_table` (id 1024) in the default catalog and schema.
    fn new_executor(drop_if_exists: bool) -> DropTableExecutor {
        let table = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table");
        DropTableExecutor::new(table, 1024, drop_if_exists)
    }

    #[tokio::test]
    async fn test_on_prepare() {
        let node_manager = Arc::new(MockDatanodeManager::new(()));
        let ctx = new_ddl_context(node_manager);

        // Missing table with `drop_if_exists`: the procedure stops without an error.
        let lenient = new_executor(true);
        let ctrl = lenient.on_prepare(&ctx).await.unwrap();
        assert!(ctrl.stop());

        // Missing table without `drop_if_exists`: the table is reported as not found.
        let strict = new_executor(false);
        let err = strict.on_prepare(&ctx).await.unwrap_err();
        assert_matches!(err, error::Error::TableNotFound { .. });

        // Existing table: the procedure continues.
        let strict = new_executor(false);
        ctx.table_metadata_manager
            .create_table_metadata(
                test_create_raw_table_info("my_table"),
                TableRouteValue::physical(vec![]),
                HashMap::new(),
            )
            .await
            .unwrap();
        let ctrl = strict.on_prepare(&ctx).await.unwrap();
        assert!(!ctrl.stop());
    }
}
410}