common_meta/ddl/
drop_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod executor;
16mod metadata;
17
18use std::collections::HashMap;
19
20use async_trait::async_trait;
21use common_error::ext::BoxedError;
22use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
23use common_procedure::{
24    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
25    Result as ProcedureResult, Status,
26};
27use common_telemetry::info;
28use common_telemetry::tracing::warn;
29use common_wal::options::WalOptions;
30use serde::{Deserialize, Serialize};
31use snafu::{OptionExt, ResultExt};
32use store_api::storage::RegionNumber;
33use strum::AsRefStr;
34use table::metadata::TableId;
35use table::table_reference::TableReference;
36
37use self::executor::DropTableExecutor;
38use crate::ddl::utils::handle_retry_error;
39use crate::ddl::DdlContext;
40use crate::error::{self, Result};
41use crate::key::table_route::TableRouteValue;
42use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
43use crate::metrics;
44use crate::region_keeper::OperatingRegionGuard;
45use crate::rpc::ddl::DropTableTask;
46use crate::rpc::router::{operating_leader_regions, RegionRoute};
47
48pub struct DropTableProcedure {
49    /// The context of procedure runtime.
50    pub context: DdlContext,
51    /// The serializable data.
52    pub data: DropTableData,
53    /// The guards of opening regions.
54    pub(crate) dropping_regions: Vec<OperatingRegionGuard>,
55    /// The drop table executor.
56    executor: DropTableExecutor,
57}
58
59impl DropTableProcedure {
60    pub const TYPE_NAME: &'static str = "metasrv-procedure::DropTable";
61
62    pub fn new(task: DropTableTask, context: DdlContext) -> Self {
63        let data = DropTableData::new(task);
64        let executor = data.build_executor();
65        Self {
66            context,
67            data,
68            dropping_regions: vec![],
69            executor,
70        }
71    }
72
73    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
74        let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
75        let executor = data.build_executor();
76
77        Ok(Self {
78            context,
79            data,
80            dropping_regions: vec![],
81            executor,
82        })
83    }
84
85    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
86        if self.executor.on_prepare(&self.context).await?.stop() {
87            return Ok(Status::done());
88        }
89        self.fill_table_metadata().await?;
90        self.data.state = DropTableState::DeleteMetadata;
91
92        Ok(Status::executing(true))
93    }
94
95    /// Register dropping regions if doesn't exist.
96    fn register_dropping_regions(&mut self) -> Result<()> {
97        let dropping_regions = operating_leader_regions(&self.data.physical_region_routes);
98
99        if !self.dropping_regions.is_empty() {
100            return Ok(());
101        }
102
103        let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
104
105        for (region_id, datanode_id) in dropping_regions {
106            let guard = self
107                .context
108                .memory_region_keeper
109                .register(datanode_id, region_id)
110                .context(error::RegionOperatingRaceSnafu {
111                    region_id,
112                    peer_id: datanode_id,
113                })?;
114            dropping_region_guards.push(guard);
115        }
116
117        self.dropping_regions = dropping_region_guards;
118        Ok(())
119    }
120
121    /// Removes the table metadata.
122    pub(crate) async fn on_delete_metadata(&mut self) -> Result<Status> {
123        self.register_dropping_regions()?;
124        // NOTES: If the meta server is crashed after the `RemoveMetadata`,
125        // Corresponding regions of this table on the Datanode will be closed automatically.
126        // Then any future dropping operation will fail.
127
128        // TODO(weny): Considers introducing a RegionStatus to indicate the region is dropping.
129        let table_id = self.data.table_id();
130        let table_route_value = &TableRouteValue::new(
131            self.data.task.table_id,
132            // Safety: checked
133            self.data.physical_table_id.unwrap(),
134            self.data.physical_region_routes.clone(),
135        );
136        // Deletes table metadata logically.
137        self.executor
138            .on_delete_metadata(
139                &self.context,
140                table_route_value,
141                &self.data.region_wal_options,
142            )
143            .await?;
144        info!("Deleted table metadata for table {table_id}");
145        self.data.state = DropTableState::InvalidateTableCache;
146        Ok(Status::executing(true))
147    }
148
149    /// Broadcasts invalidate table cache instruction.
150    async fn on_broadcast(&mut self) -> Result<Status> {
151        self.executor.invalidate_table_cache(&self.context).await?;
152        self.data.state = DropTableState::DatanodeDropRegions;
153
154        Ok(Status::executing(true))
155    }
156
157    pub async fn on_datanode_drop_regions(&mut self) -> Result<Status> {
158        self.executor
159            .on_drop_regions(&self.context, &self.data.physical_region_routes, false)
160            .await?;
161        self.data.state = DropTableState::DeleteTombstone;
162        Ok(Status::executing(true))
163    }
164
165    /// Deletes metadata tombstone.
166    async fn on_delete_metadata_tombstone(&mut self) -> Result<Status> {
167        let table_route_value = &TableRouteValue::new(
168            self.data.task.table_id,
169            // Safety: checked
170            self.data.physical_table_id.unwrap(),
171            self.data.physical_region_routes.clone(),
172        );
173        self.executor
174            .on_delete_metadata_tombstone(
175                &self.context,
176                table_route_value,
177                &self.data.region_wal_options,
178            )
179            .await?;
180
181        self.dropping_regions.clear();
182        Ok(Status::done())
183    }
184}
185
186#[async_trait]
187impl Procedure for DropTableProcedure {
188    fn type_name(&self) -> &str {
189        Self::TYPE_NAME
190    }
191
192    fn recover(&mut self) -> ProcedureResult<()> {
193        // Only registers regions if the metadata is deleted.
194        let register_operating_regions = matches!(
195            self.data.state,
196            DropTableState::DeleteMetadata
197                | DropTableState::InvalidateTableCache
198                | DropTableState::DatanodeDropRegions
199        );
200        if register_operating_regions {
201            self.register_dropping_regions()
202                .map_err(BoxedError::new)
203                .context(ExternalSnafu {
204                    clean_poisons: false,
205                })?;
206        }
207
208        Ok(())
209    }
210
211    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
212        let state = &self.data.state;
213        let _timer = metrics::METRIC_META_PROCEDURE_DROP_TABLE
214            .with_label_values(&[state.as_ref()])
215            .start_timer();
216
217        match self.data.state {
218            DropTableState::Prepare => self.on_prepare().await,
219            DropTableState::DeleteMetadata => self.on_delete_metadata().await,
220            DropTableState::InvalidateTableCache => self.on_broadcast().await,
221            DropTableState::DatanodeDropRegions => self.on_datanode_drop_regions().await,
222            DropTableState::DeleteTombstone => self.on_delete_metadata_tombstone().await,
223        }
224        .map_err(handle_retry_error)
225    }
226
227    fn dump(&self) -> ProcedureResult<String> {
228        serde_json::to_string(&self.data).context(ToJsonSnafu)
229    }
230
231    fn lock_key(&self) -> LockKey {
232        let table_ref = &self.data.table_ref();
233        let table_id = self.data.table_id();
234        let lock_key = vec![
235            CatalogLock::Read(table_ref.catalog).into(),
236            SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
237            TableLock::Write(table_id).into(),
238        ];
239
240        LockKey::new(lock_key)
241    }
242
243    fn rollback_supported(&self) -> bool {
244        !matches!(self.data.state, DropTableState::Prepare) && self.data.allow_rollback
245    }
246
247    async fn rollback(&mut self, _: &ProcedureContext) -> ProcedureResult<()> {
248        warn!(
249            "Rolling back the drop table procedure, table: {}",
250            self.data.table_id()
251        );
252
253        let table_route_value = &TableRouteValue::new(
254            self.data.task.table_id,
255            // Safety: checked
256            self.data.physical_table_id.unwrap(),
257            self.data.physical_region_routes.clone(),
258        );
259        self.executor
260            .on_restore_metadata(
261                &self.context,
262                table_route_value,
263                &self.data.region_wal_options,
264            )
265            .await
266            .map_err(ProcedureError::external)
267    }
268}
269
/// The serializable state of a drop table procedure; this is what `dump` persists
/// and `from_json` restores.
#[derive(Debug, Serialize, Deserialize)]
pub struct DropTableData {
    /// The current step of the procedure.
    pub state: DropTableState,
    /// The drop table request being executed.
    pub task: DropTableTask,
    /// Region routes used to build the table route value and to drop the regions;
    /// presumably filled by `fill_table_metadata` during the prepare step — TODO confirm.
    pub physical_region_routes: Vec<RegionRoute>,
    /// `None` until the table metadata has been filled; later steps unwrap it.
    pub physical_table_id: Option<TableId>,
    /// WAL options per region, passed to the executor's metadata operations.
    /// A dump missing this field deserializes to an empty map.
    #[serde(default)]
    pub region_wal_options: HashMap<RegionNumber, WalOptions>,
    /// Gates `rollback_supported`; a dump missing this field deserializes to `false`.
    #[serde(default)]
    pub allow_rollback: bool,
}
281
282impl DropTableData {
283    pub fn new(task: DropTableTask) -> Self {
284        Self {
285            state: DropTableState::Prepare,
286            task,
287            physical_region_routes: vec![],
288            physical_table_id: None,
289            region_wal_options: HashMap::new(),
290            allow_rollback: false,
291        }
292    }
293
294    fn table_ref(&self) -> TableReference {
295        self.task.table_ref()
296    }
297
298    fn table_id(&self) -> TableId {
299        self.task.table_id
300    }
301
302    fn build_executor(&self) -> DropTableExecutor {
303        DropTableExecutor::new(
304            self.task.table_name(),
305            self.task.table_id,
306            self.task.drop_if_exists,
307        )
308    }
309}
310
/// The steps of the drop table procedure, in execution order.
///
/// The variant name (via `AsRefStr`) is used as the metrics label in `execute`.
#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum DropTableState {
    /// Prepares to drop the table and fills the table metadata.
    Prepare,
    /// Deletes the table metadata logically (registers the dropping-region guards first).
    DeleteMetadata,
    /// Broadcasts the invalidate-table-cache instruction.
    InvalidateTableCache,
    /// Drops the table's regions on the Datanodes.
    DatanodeDropRegions,
    /// Deletes the metadata tombstone permanently; this is the final step.
    DeleteTombstone,
}