Skip to main content

common_meta/ddl/
drop_table.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15pub mod executor;
16mod metadata;
17
18use std::collections::HashMap;
19
20use async_trait::async_trait;
21use common_error::ext::BoxedError;
22use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
23use common_procedure::{
24    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
25    Result as ProcedureResult, Status,
26};
27use common_telemetry::info;
28use common_telemetry::tracing::warn;
29use common_wal::options::WalOptions;
30use serde::{Deserialize, Serialize};
31use snafu::{OptionExt, ResultExt};
32use store_api::storage::RegionNumber;
33use strum::AsRefStr;
34use table::metadata::TableId;
35use table::table_reference::TableReference;
36
37use self::executor::DropTableExecutor;
38use crate::ddl::DdlContext;
39use crate::ddl::utils::map_to_procedure_error;
40use crate::error::{self, Result};
41use crate::key::table_route::TableRouteValue;
42use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
43use crate::metrics;
44use crate::region_keeper::OperatingRegionGuard;
45use crate::rpc::ddl::DropTableTask;
46use crate::rpc::router::{RegionRoute, operating_leader_region_roles};
47
/// Procedure that drops a table.
///
/// It walks through the steps of [`DropTableState`]:
/// 1. Delete the metadata logically.
/// 2. Invalidate the table cache.
/// 3. Drop the regions on datanodes.
/// 4. Delete the metadata tombstone.
pub struct DropTableProcedure {
    /// The context of procedure runtime.
    pub context: DdlContext,
    /// The serializable data; this is what `dump` persists.
    pub data: DropTableData,
    /// Guards for the regions being dropped, registered in the memory region keeper.
    /// They live outside `data`, so they are not serialized and must be
    /// registered again after the procedure is recovered.
    pub(crate) dropping_regions: Vec<OperatingRegionGuard>,
    /// The drop table executor, built from `data`.
    executor: DropTableExecutor,
}
58
59impl DropTableProcedure {
60    pub const TYPE_NAME: &'static str = "metasrv-procedure::DropTable";
61
62    pub fn new(task: DropTableTask, context: DdlContext) -> Self {
63        let data = DropTableData::new(task);
64        let executor = data.build_executor();
65        Self {
66            context,
67            data,
68            dropping_regions: vec![],
69            executor,
70        }
71    }
72
73    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
74        let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
75        let executor = data.build_executor();
76
77        Ok(Self {
78            context,
79            data,
80            dropping_regions: vec![],
81            executor,
82        })
83    }
84
85    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
86        if self.executor.on_prepare(&self.context).await?.stop() {
87            return Ok(Status::done());
88        }
89        self.fill_table_metadata().await?;
90        self.data.state = DropTableState::DeleteMetadata;
91
92        Ok(Status::executing(true))
93    }
94
95    /// Register dropping regions if doesn't exist.
96    fn register_dropping_regions(&mut self) -> Result<()> {
97        let dropping_regions = operating_leader_region_roles(&self.data.physical_region_routes);
98
99        if !self.dropping_regions.is_empty() {
100            return Ok(());
101        }
102
103        let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
104
105        for (region_id, datanode_id, role) in dropping_regions {
106            let guard = self
107                .context
108                .memory_region_keeper
109                .register_with_role(datanode_id, region_id, role)
110                .context(error::RegionOperatingRaceSnafu {
111                    region_id,
112                    peer_id: datanode_id,
113                })?;
114            dropping_region_guards.push(guard);
115        }
116
117        self.dropping_regions = dropping_region_guards;
118        Ok(())
119    }
120
121    /// Removes the table metadata.
122    pub(crate) async fn on_delete_metadata(&mut self) -> Result<Status> {
123        self.register_dropping_regions()?;
124        // NOTES: If the meta server is crashed after the `RemoveMetadata`,
125        // Corresponding regions of this table on the Datanode will be closed automatically.
126        // Then any future dropping operation will fail.
127
128        // TODO(weny): Considers introducing a RegionStatus to indicate the region is dropping.
129        let table_id = self.data.table_id();
130        let table_route_value = &TableRouteValue::new(
131            self.data.task.table_id,
132            // Safety: checked
133            self.data.physical_table_id.unwrap(),
134            self.data.physical_region_routes.clone(),
135        );
136        // Deletes table metadata logically.
137        self.executor
138            .on_delete_metadata(
139                &self.context,
140                table_route_value,
141                &self.data.region_wal_options,
142            )
143            .await?;
144        info!("Deleted table metadata for table {table_id}");
145        self.data.state = DropTableState::InvalidateTableCache;
146        Ok(Status::executing(true))
147    }
148
149    /// Broadcasts invalidate table cache instruction.
150    async fn on_broadcast(&mut self) -> Result<Status> {
151        self.executor.invalidate_table_cache(&self.context).await?;
152        self.data.state = DropTableState::DatanodeDropRegions;
153
154        Ok(Status::executing(true))
155    }
156
157    pub async fn on_datanode_drop_regions(&mut self, retrying: bool) -> Result<Status> {
158        if retrying {
159            info!(
160                "Remapping region routes addresses for retrying drop regions for table_id: {}",
161                self.data.table_id()
162            );
163            let storage = self
164                .context
165                .table_metadata_manager
166                .table_route_manager()
167                .table_route_storage();
168            // The peer addresses may change during retries,
169            // so we always remap the region routes.
170            storage
171                .remap_region_routes(&mut self.data.physical_region_routes)
172                .await?;
173        }
174
175        self.executor
176            .on_drop_regions(
177                &self.context.node_manager,
178                &self.context.leader_region_registry,
179                &self.data.physical_region_routes,
180                false,
181                false,
182                false,
183            )
184            .await?;
185        self.data.state = DropTableState::DeleteTombstone;
186        Ok(Status::executing(true))
187    }
188
189    /// Deletes metadata tombstone.
190    async fn on_delete_metadata_tombstone(&mut self) -> Result<Status> {
191        let table_route_value = &TableRouteValue::new(
192            self.data.task.table_id,
193            // Safety: checked
194            self.data.physical_table_id.unwrap(),
195            self.data.physical_region_routes.clone(),
196        );
197        self.executor
198            .on_delete_metadata_tombstone(
199                &self.context,
200                table_route_value,
201                &self.data.region_wal_options,
202            )
203            .await?;
204
205        self.dropping_regions.clear();
206        Ok(Status::done())
207    }
208}
209
210#[async_trait]
211impl Procedure for DropTableProcedure {
212    fn type_name(&self) -> &str {
213        Self::TYPE_NAME
214    }
215
216    fn recover(&mut self) -> ProcedureResult<()> {
217        // Only registers regions if the metadata is deleted.
218        let register_operating_regions = matches!(
219            self.data.state,
220            DropTableState::DeleteMetadata
221                | DropTableState::InvalidateTableCache
222                | DropTableState::DatanodeDropRegions
223        );
224        if register_operating_regions {
225            self.register_dropping_regions()
226                .map_err(BoxedError::new)
227                .context(ExternalSnafu {
228                    clean_poisons: false,
229                })?;
230        }
231
232        Ok(())
233    }
234
235    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
236        let state = &self.data.state;
237        let _timer = metrics::METRIC_META_PROCEDURE_DROP_TABLE
238            .with_label_values(&[state.as_ref()])
239            .start_timer();
240
241        match self.data.state {
242            DropTableState::Prepare => self.on_prepare().await,
243            DropTableState::DeleteMetadata => self.on_delete_metadata().await,
244            DropTableState::InvalidateTableCache => self.on_broadcast().await,
245            DropTableState::DatanodeDropRegions => {
246                let retrying = ctx.is_retrying().await.unwrap_or(false);
247                self.on_datanode_drop_regions(retrying).await
248            }
249            DropTableState::DeleteTombstone => self.on_delete_metadata_tombstone().await,
250        }
251        .map_err(map_to_procedure_error)
252    }
253
254    fn dump(&self) -> ProcedureResult<String> {
255        serde_json::to_string(&self.data).context(ToJsonSnafu)
256    }
257
258    fn lock_key(&self) -> LockKey {
259        let table_ref = &self.data.table_ref();
260        let table_id = self.data.table_id();
261        let lock_key = vec![
262            CatalogLock::Read(table_ref.catalog).into(),
263            SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
264            TableLock::Write(table_id).into(),
265        ];
266
267        LockKey::new(lock_key)
268    }
269
270    fn rollback_supported(&self) -> bool {
271        !matches!(self.data.state, DropTableState::Prepare) && self.data.allow_rollback
272    }
273
274    async fn rollback(&mut self, _: &ProcedureContext) -> ProcedureResult<()> {
275        warn!(
276            "Rolling back the drop table procedure, table: {}",
277            self.data.table_id()
278        );
279
280        let table_route_value = &TableRouteValue::new(
281            self.data.task.table_id,
282            // Safety: checked
283            self.data.physical_table_id.unwrap(),
284            self.data.physical_region_routes.clone(),
285        );
286        self.executor
287            .on_restore_metadata(
288                &self.context,
289                table_route_value,
290                &self.data.region_wal_options,
291            )
292            .await
293            .map_err(ProcedureError::external)
294    }
295}
296
/// Serializable state of [`DropTableProcedure`].
///
/// It is persisted by `dump` and restored by `from_json`.
#[derive(Debug, Serialize, Deserialize)]
pub struct DropTableData {
    /// Current step of the procedure.
    pub state: DropTableState,
    /// The drop table task being executed.
    pub task: DropTableTask,
    /// Region routes of the physical table.
    /// Empty until filled (presumably by `fill_table_metadata`).
    pub physical_region_routes: Vec<RegionRoute>,
    /// Id of the physical table that backs `task.table_id`.
    /// `None` until filled during `Prepare`.
    pub physical_table_id: Option<TableId>,
    /// WAL options per region number.
    /// `#[serde(default)]`: dumps without this field deserialize to an empty map.
    #[serde(default)]
    pub region_wal_options: HashMap<RegionNumber, WalOptions>,
    /// Whether `rollback` may restore the metadata.
    /// `#[serde(default)]`: dumps without this field deserialize to `false`.
    #[serde(default)]
    pub allow_rollback: bool,
}
308
309impl DropTableData {
310    pub fn new(task: DropTableTask) -> Self {
311        Self {
312            state: DropTableState::Prepare,
313            task,
314            physical_region_routes: vec![],
315            physical_table_id: None,
316            region_wal_options: HashMap::new(),
317            allow_rollback: false,
318        }
319    }
320
321    fn table_ref(&self) -> TableReference<'_> {
322        self.task.table_ref()
323    }
324
325    fn table_id(&self) -> TableId {
326        self.task.table_id
327    }
328
329    fn build_executor(&self) -> DropTableExecutor {
330        DropTableExecutor::new(
331            self.task.table_name(),
332            self.task.table_id,
333            self.task.drop_if_exists,
334        )
335    }
336}
337
/// The state of drop table.
///
/// Steps advance in declaration order; `execute` dispatches on the current
/// state. The variant name (via `AsRefStr`) is used as the metric label.
#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum DropTableState {
    /// Prepares to drop the table.
    Prepare,
    /// Deletes metadata logically.
    DeleteMetadata,
    /// Invalidates the table cache.
    InvalidateTableCache,
    /// Drops regions on the Datanode.
    DatanodeDropRegions,
    /// Deletes the metadata tombstone permanently.
    DeleteTombstone,
}