// common_meta/ddl/drop_table.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15pub mod executor;
16mod metadata;
17
18use std::collections::HashMap;
19
20use async_trait::async_trait;
21use common_error::ext::BoxedError;
22use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
23use common_procedure::{
24    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
25    Result as ProcedureResult, Status,
26};
27use common_telemetry::info;
28use common_telemetry::tracing::warn;
29use common_wal::options::WalOptions;
30use serde::{Deserialize, Serialize};
31use snafu::{OptionExt, ResultExt};
32use store_api::storage::RegionNumber;
33use strum::AsRefStr;
34use table::metadata::TableId;
35use table::table_reference::TableReference;
36
37use self::executor::DropTableExecutor;
38use crate::ddl::DdlContext;
39use crate::ddl::utils::map_to_procedure_error;
40use crate::error::{self, Result};
41use crate::key::table_route::TableRouteValue;
42use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
43use crate::metrics;
44use crate::region_keeper::OperatingRegionGuard;
45use crate::rpc::ddl::DropTableTask;
46use crate::rpc::router::{RegionRoute, operating_leader_regions};
47
48pub struct DropTableProcedure {
49    /// The context of procedure runtime.
50    pub context: DdlContext,
51    /// The serializable data.
52    pub data: DropTableData,
53    /// The guards of opening regions.
54    pub(crate) dropping_regions: Vec<OperatingRegionGuard>,
55    /// The drop table executor.
56    executor: DropTableExecutor,
57}
58
59impl DropTableProcedure {
60    pub const TYPE_NAME: &'static str = "metasrv-procedure::DropTable";
61
62    pub fn new(task: DropTableTask, context: DdlContext) -> Self {
63        let data = DropTableData::new(task);
64        let executor = data.build_executor();
65        Self {
66            context,
67            data,
68            dropping_regions: vec![],
69            executor,
70        }
71    }
72
73    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
74        let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
75        let executor = data.build_executor();
76
77        Ok(Self {
78            context,
79            data,
80            dropping_regions: vec![],
81            executor,
82        })
83    }
84
85    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
86        if self.executor.on_prepare(&self.context).await?.stop() {
87            return Ok(Status::done());
88        }
89        self.fill_table_metadata().await?;
90        self.data.state = DropTableState::DeleteMetadata;
91
92        Ok(Status::executing(true))
93    }
94
95    /// Register dropping regions if doesn't exist.
96    fn register_dropping_regions(&mut self) -> Result<()> {
97        let dropping_regions = operating_leader_regions(&self.data.physical_region_routes);
98
99        if !self.dropping_regions.is_empty() {
100            return Ok(());
101        }
102
103        let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
104
105        for (region_id, datanode_id) in dropping_regions {
106            let guard = self
107                .context
108                .memory_region_keeper
109                .register(datanode_id, region_id)
110                .context(error::RegionOperatingRaceSnafu {
111                    region_id,
112                    peer_id: datanode_id,
113                })?;
114            dropping_region_guards.push(guard);
115        }
116
117        self.dropping_regions = dropping_region_guards;
118        Ok(())
119    }
120
121    /// Removes the table metadata.
122    pub(crate) async fn on_delete_metadata(&mut self) -> Result<Status> {
123        self.register_dropping_regions()?;
124        // NOTES: If the meta server is crashed after the `RemoveMetadata`,
125        // Corresponding regions of this table on the Datanode will be closed automatically.
126        // Then any future dropping operation will fail.
127
128        // TODO(weny): Considers introducing a RegionStatus to indicate the region is dropping.
129        let table_id = self.data.table_id();
130        let table_route_value = &TableRouteValue::new(
131            self.data.task.table_id,
132            // Safety: checked
133            self.data.physical_table_id.unwrap(),
134            self.data.physical_region_routes.clone(),
135        );
136        // Deletes table metadata logically.
137        self.executor
138            .on_delete_metadata(
139                &self.context,
140                table_route_value,
141                &self.data.region_wal_options,
142            )
143            .await?;
144        info!("Deleted table metadata for table {table_id}");
145        self.data.state = DropTableState::InvalidateTableCache;
146        Ok(Status::executing(true))
147    }
148
149    /// Broadcasts invalidate table cache instruction.
150    async fn on_broadcast(&mut self) -> Result<Status> {
151        self.executor.invalidate_table_cache(&self.context).await?;
152        self.data.state = DropTableState::DatanodeDropRegions;
153
154        Ok(Status::executing(true))
155    }
156
157    pub async fn on_datanode_drop_regions(&mut self) -> Result<Status> {
158        self.executor
159            .on_drop_regions(
160                &self.context.node_manager,
161                &self.context.leader_region_registry,
162                &self.data.physical_region_routes,
163                false,
164                false,
165                false,
166            )
167            .await?;
168        self.data.state = DropTableState::DeleteTombstone;
169        Ok(Status::executing(true))
170    }
171
172    /// Deletes metadata tombstone.
173    async fn on_delete_metadata_tombstone(&mut self) -> Result<Status> {
174        let table_route_value = &TableRouteValue::new(
175            self.data.task.table_id,
176            // Safety: checked
177            self.data.physical_table_id.unwrap(),
178            self.data.physical_region_routes.clone(),
179        );
180        self.executor
181            .on_delete_metadata_tombstone(
182                &self.context,
183                table_route_value,
184                &self.data.region_wal_options,
185            )
186            .await?;
187
188        self.dropping_regions.clear();
189        Ok(Status::done())
190    }
191}
192
193#[async_trait]
194impl Procedure for DropTableProcedure {
195    fn type_name(&self) -> &str {
196        Self::TYPE_NAME
197    }
198
199    fn recover(&mut self) -> ProcedureResult<()> {
200        // Only registers regions if the metadata is deleted.
201        let register_operating_regions = matches!(
202            self.data.state,
203            DropTableState::DeleteMetadata
204                | DropTableState::InvalidateTableCache
205                | DropTableState::DatanodeDropRegions
206        );
207        if register_operating_regions {
208            self.register_dropping_regions()
209                .map_err(BoxedError::new)
210                .context(ExternalSnafu {
211                    clean_poisons: false,
212                })?;
213        }
214
215        Ok(())
216    }
217
218    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
219        let state = &self.data.state;
220        let _timer = metrics::METRIC_META_PROCEDURE_DROP_TABLE
221            .with_label_values(&[state.as_ref()])
222            .start_timer();
223
224        match self.data.state {
225            DropTableState::Prepare => self.on_prepare().await,
226            DropTableState::DeleteMetadata => self.on_delete_metadata().await,
227            DropTableState::InvalidateTableCache => self.on_broadcast().await,
228            DropTableState::DatanodeDropRegions => self.on_datanode_drop_regions().await,
229            DropTableState::DeleteTombstone => self.on_delete_metadata_tombstone().await,
230        }
231        .map_err(map_to_procedure_error)
232    }
233
234    fn dump(&self) -> ProcedureResult<String> {
235        serde_json::to_string(&self.data).context(ToJsonSnafu)
236    }
237
238    fn lock_key(&self) -> LockKey {
239        let table_ref = &self.data.table_ref();
240        let table_id = self.data.table_id();
241        let lock_key = vec![
242            CatalogLock::Read(table_ref.catalog).into(),
243            SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
244            TableLock::Write(table_id).into(),
245        ];
246
247        LockKey::new(lock_key)
248    }
249
250    fn rollback_supported(&self) -> bool {
251        !matches!(self.data.state, DropTableState::Prepare) && self.data.allow_rollback
252    }
253
254    async fn rollback(&mut self, _: &ProcedureContext) -> ProcedureResult<()> {
255        warn!(
256            "Rolling back the drop table procedure, table: {}",
257            self.data.table_id()
258        );
259
260        let table_route_value = &TableRouteValue::new(
261            self.data.task.table_id,
262            // Safety: checked
263            self.data.physical_table_id.unwrap(),
264            self.data.physical_region_routes.clone(),
265        );
266        self.executor
267            .on_restore_metadata(
268                &self.context,
269                table_route_value,
270                &self.data.region_wal_options,
271            )
272            .await
273            .map_err(ProcedureError::external)
274    }
275}
276
277#[derive(Debug, Serialize, Deserialize)]
278pub struct DropTableData {
279    pub state: DropTableState,
280    pub task: DropTableTask,
281    pub physical_region_routes: Vec<RegionRoute>,
282    pub physical_table_id: Option<TableId>,
283    #[serde(default)]
284    pub region_wal_options: HashMap<RegionNumber, WalOptions>,
285    #[serde(default)]
286    pub allow_rollback: bool,
287}
288
289impl DropTableData {
290    pub fn new(task: DropTableTask) -> Self {
291        Self {
292            state: DropTableState::Prepare,
293            task,
294            physical_region_routes: vec![],
295            physical_table_id: None,
296            region_wal_options: HashMap::new(),
297            allow_rollback: false,
298        }
299    }
300
301    fn table_ref(&self) -> TableReference<'_> {
302        self.task.table_ref()
303    }
304
305    fn table_id(&self) -> TableId {
306        self.task.table_id
307    }
308
309    fn build_executor(&self) -> DropTableExecutor {
310        DropTableExecutor::new(
311            self.task.table_name(),
312            self.task.table_id,
313            self.task.drop_if_exists,
314        )
315    }
316}
317
318/// The state of drop table.
319#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq)]
320pub enum DropTableState {
321    /// Prepares to drop the table
322    Prepare,
323    /// Deletes metadata logically
324    DeleteMetadata,
325    /// Invalidates Table Cache
326    InvalidateTableCache,
327    /// Drops regions on Datanode
328    DatanodeDropRegions,
329    /// Deletes metadata tombstone permanently
330    DeleteTombstone,
331}