1pub mod executor;
16mod metadata;
17
18use std::collections::HashMap;
19
20use async_trait::async_trait;
21use common_error::ext::BoxedError;
22use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
23use common_procedure::{
24 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
25 Result as ProcedureResult, Status,
26};
27use common_telemetry::info;
28use common_telemetry::tracing::warn;
29use common_wal::options::WalOptions;
30use serde::{Deserialize, Serialize};
31use snafu::{OptionExt, ResultExt};
32use store_api::storage::RegionNumber;
33use strum::AsRefStr;
34use table::metadata::TableId;
35use table::table_reference::TableReference;
36
37use self::executor::DropTableExecutor;
38use crate::ddl::DdlContext;
39use crate::ddl::utils::map_to_procedure_error;
40use crate::error::{self, Result};
41use crate::key::table_route::TableRouteValue;
42use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
43use crate::metrics;
44use crate::region_keeper::OperatingRegionGuard;
45use crate::rpc::ddl::DropTableTask;
46use crate::rpc::router::{RegionRoute, operating_leader_regions};
47
48pub struct DropTableProcedure {
49 pub context: DdlContext,
51 pub data: DropTableData,
53 pub(crate) dropping_regions: Vec<OperatingRegionGuard>,
55 executor: DropTableExecutor,
57}
58
59impl DropTableProcedure {
60 pub const TYPE_NAME: &'static str = "metasrv-procedure::DropTable";
61
62 pub fn new(task: DropTableTask, context: DdlContext) -> Self {
63 let data = DropTableData::new(task);
64 let executor = data.build_executor();
65 Self {
66 context,
67 data,
68 dropping_regions: vec![],
69 executor,
70 }
71 }
72
73 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
74 let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
75 let executor = data.build_executor();
76
77 Ok(Self {
78 context,
79 data,
80 dropping_regions: vec![],
81 executor,
82 })
83 }
84
85 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
86 if self.executor.on_prepare(&self.context).await?.stop() {
87 return Ok(Status::done());
88 }
89 self.fill_table_metadata().await?;
90 self.data.state = DropTableState::DeleteMetadata;
91
92 Ok(Status::executing(true))
93 }
94
95 fn register_dropping_regions(&mut self) -> Result<()> {
97 let dropping_regions = operating_leader_regions(&self.data.physical_region_routes);
98
99 if !self.dropping_regions.is_empty() {
100 return Ok(());
101 }
102
103 let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
104
105 for (region_id, datanode_id) in dropping_regions {
106 let guard = self
107 .context
108 .memory_region_keeper
109 .register(datanode_id, region_id)
110 .context(error::RegionOperatingRaceSnafu {
111 region_id,
112 peer_id: datanode_id,
113 })?;
114 dropping_region_guards.push(guard);
115 }
116
117 self.dropping_regions = dropping_region_guards;
118 Ok(())
119 }
120
121 pub(crate) async fn on_delete_metadata(&mut self) -> Result<Status> {
123 self.register_dropping_regions()?;
124 let table_id = self.data.table_id();
130 let table_route_value = &TableRouteValue::new(
131 self.data.task.table_id,
132 self.data.physical_table_id.unwrap(),
134 self.data.physical_region_routes.clone(),
135 );
136 self.executor
138 .on_delete_metadata(
139 &self.context,
140 table_route_value,
141 &self.data.region_wal_options,
142 )
143 .await?;
144 info!("Deleted table metadata for table {table_id}");
145 self.data.state = DropTableState::InvalidateTableCache;
146 Ok(Status::executing(true))
147 }
148
149 async fn on_broadcast(&mut self) -> Result<Status> {
151 self.executor.invalidate_table_cache(&self.context).await?;
152 self.data.state = DropTableState::DatanodeDropRegions;
153
154 Ok(Status::executing(true))
155 }
156
157 pub async fn on_datanode_drop_regions(&mut self) -> Result<Status> {
158 self.executor
159 .on_drop_regions(
160 &self.context.node_manager,
161 &self.context.leader_region_registry,
162 &self.data.physical_region_routes,
163 false,
164 false,
165 false,
166 )
167 .await?;
168 self.data.state = DropTableState::DeleteTombstone;
169 Ok(Status::executing(true))
170 }
171
172 async fn on_delete_metadata_tombstone(&mut self) -> Result<Status> {
174 let table_route_value = &TableRouteValue::new(
175 self.data.task.table_id,
176 self.data.physical_table_id.unwrap(),
178 self.data.physical_region_routes.clone(),
179 );
180 self.executor
181 .on_delete_metadata_tombstone(
182 &self.context,
183 table_route_value,
184 &self.data.region_wal_options,
185 )
186 .await?;
187
188 self.dropping_regions.clear();
189 Ok(Status::done())
190 }
191}
192
193#[async_trait]
194impl Procedure for DropTableProcedure {
195 fn type_name(&self) -> &str {
196 Self::TYPE_NAME
197 }
198
199 fn recover(&mut self) -> ProcedureResult<()> {
200 let register_operating_regions = matches!(
202 self.data.state,
203 DropTableState::DeleteMetadata
204 | DropTableState::InvalidateTableCache
205 | DropTableState::DatanodeDropRegions
206 );
207 if register_operating_regions {
208 self.register_dropping_regions()
209 .map_err(BoxedError::new)
210 .context(ExternalSnafu {
211 clean_poisons: false,
212 })?;
213 }
214
215 Ok(())
216 }
217
218 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
219 let state = &self.data.state;
220 let _timer = metrics::METRIC_META_PROCEDURE_DROP_TABLE
221 .with_label_values(&[state.as_ref()])
222 .start_timer();
223
224 match self.data.state {
225 DropTableState::Prepare => self.on_prepare().await,
226 DropTableState::DeleteMetadata => self.on_delete_metadata().await,
227 DropTableState::InvalidateTableCache => self.on_broadcast().await,
228 DropTableState::DatanodeDropRegions => self.on_datanode_drop_regions().await,
229 DropTableState::DeleteTombstone => self.on_delete_metadata_tombstone().await,
230 }
231 .map_err(map_to_procedure_error)
232 }
233
234 fn dump(&self) -> ProcedureResult<String> {
235 serde_json::to_string(&self.data).context(ToJsonSnafu)
236 }
237
238 fn lock_key(&self) -> LockKey {
239 let table_ref = &self.data.table_ref();
240 let table_id = self.data.table_id();
241 let lock_key = vec![
242 CatalogLock::Read(table_ref.catalog).into(),
243 SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
244 TableLock::Write(table_id).into(),
245 ];
246
247 LockKey::new(lock_key)
248 }
249
250 fn rollback_supported(&self) -> bool {
251 !matches!(self.data.state, DropTableState::Prepare) && self.data.allow_rollback
252 }
253
254 async fn rollback(&mut self, _: &ProcedureContext) -> ProcedureResult<()> {
255 warn!(
256 "Rolling back the drop table procedure, table: {}",
257 self.data.table_id()
258 );
259
260 let table_route_value = &TableRouteValue::new(
261 self.data.task.table_id,
262 self.data.physical_table_id.unwrap(),
264 self.data.physical_region_routes.clone(),
265 );
266 self.executor
267 .on_restore_metadata(
268 &self.context,
269 table_route_value,
270 &self.data.region_wal_options,
271 )
272 .await
273 .map_err(ProcedureError::external)
274 }
275}
276
277#[derive(Debug, Serialize, Deserialize)]
278pub struct DropTableData {
279 pub state: DropTableState,
280 pub task: DropTableTask,
281 pub physical_region_routes: Vec<RegionRoute>,
282 pub physical_table_id: Option<TableId>,
283 #[serde(default)]
284 pub region_wal_options: HashMap<RegionNumber, WalOptions>,
285 #[serde(default)]
286 pub allow_rollback: bool,
287}
288
289impl DropTableData {
290 pub fn new(task: DropTableTask) -> Self {
291 Self {
292 state: DropTableState::Prepare,
293 task,
294 physical_region_routes: vec![],
295 physical_table_id: None,
296 region_wal_options: HashMap::new(),
297 allow_rollback: false,
298 }
299 }
300
301 fn table_ref(&self) -> TableReference<'_> {
302 self.task.table_ref()
303 }
304
305 fn table_id(&self) -> TableId {
306 self.task.table_id
307 }
308
309 fn build_executor(&self) -> DropTableExecutor {
310 DropTableExecutor::new(
311 self.task.table_name(),
312 self.task.table_id,
313 self.task.drop_if_exists,
314 )
315 }
316}
317
318#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq)]
320pub enum DropTableState {
321 Prepare,
323 DeleteMetadata,
325 InvalidateTableCache,
327 DatanodeDropRegions,
329 DeleteTombstone,
331}