1pub mod executor;
16mod metadata;
17
18use std::collections::HashMap;
19
20use async_trait::async_trait;
21use common_error::ext::BoxedError;
22use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
23use common_procedure::{
24 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
25 Result as ProcedureResult, Status,
26};
27use common_telemetry::info;
28use common_telemetry::tracing::warn;
29use common_wal::options::WalOptions;
30use serde::{Deserialize, Serialize};
31use snafu::{OptionExt, ResultExt};
32use store_api::storage::RegionNumber;
33use strum::AsRefStr;
34use table::metadata::TableId;
35use table::table_reference::TableReference;
36
37use self::executor::DropTableExecutor;
38use crate::ddl::DdlContext;
39use crate::ddl::utils::map_to_procedure_error;
40use crate::error::{self, Result};
41use crate::key::table_route::TableRouteValue;
42use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
43use crate::metrics;
44use crate::region_keeper::OperatingRegionGuard;
45use crate::rpc::ddl::DropTableTask;
46use crate::rpc::router::{RegionRoute, operating_leader_region_roles};
47
48pub struct DropTableProcedure {
49 pub context: DdlContext,
51 pub data: DropTableData,
53 pub(crate) dropping_regions: Vec<OperatingRegionGuard>,
55 executor: DropTableExecutor,
57}
58
59impl DropTableProcedure {
60 pub const TYPE_NAME: &'static str = "metasrv-procedure::DropTable";
61
62 pub fn new(task: DropTableTask, context: DdlContext) -> Self {
63 let data = DropTableData::new(task);
64 let executor = data.build_executor();
65 Self {
66 context,
67 data,
68 dropping_regions: vec![],
69 executor,
70 }
71 }
72
73 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
74 let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
75 let executor = data.build_executor();
76
77 Ok(Self {
78 context,
79 data,
80 dropping_regions: vec![],
81 executor,
82 })
83 }
84
85 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
86 if self.executor.on_prepare(&self.context).await?.stop() {
87 return Ok(Status::done());
88 }
89 self.fill_table_metadata().await?;
90 self.data.state = DropTableState::DeleteMetadata;
91
92 Ok(Status::executing(true))
93 }
94
95 fn register_dropping_regions(&mut self) -> Result<()> {
97 let dropping_regions = operating_leader_region_roles(&self.data.physical_region_routes);
98
99 if !self.dropping_regions.is_empty() {
100 return Ok(());
101 }
102
103 let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
104
105 for (region_id, datanode_id, role) in dropping_regions {
106 let guard = self
107 .context
108 .memory_region_keeper
109 .register_with_role(datanode_id, region_id, role)
110 .context(error::RegionOperatingRaceSnafu {
111 region_id,
112 peer_id: datanode_id,
113 })?;
114 dropping_region_guards.push(guard);
115 }
116
117 self.dropping_regions = dropping_region_guards;
118 Ok(())
119 }
120
121 pub(crate) async fn on_delete_metadata(&mut self) -> Result<Status> {
123 self.register_dropping_regions()?;
124 let table_id = self.data.table_id();
130 let table_route_value = &TableRouteValue::new(
131 self.data.task.table_id,
132 self.data.physical_table_id.unwrap(),
134 self.data.physical_region_routes.clone(),
135 );
136 self.executor
138 .on_delete_metadata(
139 &self.context,
140 table_route_value,
141 &self.data.region_wal_options,
142 )
143 .await?;
144 info!("Deleted table metadata for table {table_id}");
145 self.data.state = DropTableState::InvalidateTableCache;
146 Ok(Status::executing(true))
147 }
148
149 async fn on_broadcast(&mut self) -> Result<Status> {
151 self.executor.invalidate_table_cache(&self.context).await?;
152 self.data.state = DropTableState::DatanodeDropRegions;
153
154 Ok(Status::executing(true))
155 }
156
157 pub async fn on_datanode_drop_regions(&mut self, retrying: bool) -> Result<Status> {
158 if retrying {
159 info!(
160 "Remapping region routes addresses for retrying drop regions for table_id: {}",
161 self.data.table_id()
162 );
163 let storage = self
164 .context
165 .table_metadata_manager
166 .table_route_manager()
167 .table_route_storage();
168 storage
171 .remap_region_routes(&mut self.data.physical_region_routes)
172 .await?;
173 }
174
175 self.executor
176 .on_drop_regions(
177 &self.context.node_manager,
178 &self.context.leader_region_registry,
179 &self.data.physical_region_routes,
180 false,
181 false,
182 false,
183 )
184 .await?;
185 self.data.state = DropTableState::DeleteTombstone;
186 Ok(Status::executing(true))
187 }
188
189 async fn on_delete_metadata_tombstone(&mut self) -> Result<Status> {
191 let table_route_value = &TableRouteValue::new(
192 self.data.task.table_id,
193 self.data.physical_table_id.unwrap(),
195 self.data.physical_region_routes.clone(),
196 );
197 self.executor
198 .on_delete_metadata_tombstone(
199 &self.context,
200 table_route_value,
201 &self.data.region_wal_options,
202 )
203 .await?;
204
205 self.dropping_regions.clear();
206 Ok(Status::done())
207 }
208}
209
210#[async_trait]
211impl Procedure for DropTableProcedure {
212 fn type_name(&self) -> &str {
213 Self::TYPE_NAME
214 }
215
216 fn recover(&mut self) -> ProcedureResult<()> {
217 let register_operating_regions = matches!(
219 self.data.state,
220 DropTableState::DeleteMetadata
221 | DropTableState::InvalidateTableCache
222 | DropTableState::DatanodeDropRegions
223 );
224 if register_operating_regions {
225 self.register_dropping_regions()
226 .map_err(BoxedError::new)
227 .context(ExternalSnafu {
228 clean_poisons: false,
229 })?;
230 }
231
232 Ok(())
233 }
234
235 async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
236 let state = &self.data.state;
237 let _timer = metrics::METRIC_META_PROCEDURE_DROP_TABLE
238 .with_label_values(&[state.as_ref()])
239 .start_timer();
240
241 match self.data.state {
242 DropTableState::Prepare => self.on_prepare().await,
243 DropTableState::DeleteMetadata => self.on_delete_metadata().await,
244 DropTableState::InvalidateTableCache => self.on_broadcast().await,
245 DropTableState::DatanodeDropRegions => {
246 let retrying = ctx.is_retrying().await.unwrap_or(false);
247 self.on_datanode_drop_regions(retrying).await
248 }
249 DropTableState::DeleteTombstone => self.on_delete_metadata_tombstone().await,
250 }
251 .map_err(map_to_procedure_error)
252 }
253
254 fn dump(&self) -> ProcedureResult<String> {
255 serde_json::to_string(&self.data).context(ToJsonSnafu)
256 }
257
258 fn lock_key(&self) -> LockKey {
259 let table_ref = &self.data.table_ref();
260 let table_id = self.data.table_id();
261 let lock_key = vec![
262 CatalogLock::Read(table_ref.catalog).into(),
263 SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
264 TableLock::Write(table_id).into(),
265 ];
266
267 LockKey::new(lock_key)
268 }
269
270 fn rollback_supported(&self) -> bool {
271 !matches!(self.data.state, DropTableState::Prepare) && self.data.allow_rollback
272 }
273
274 async fn rollback(&mut self, _: &ProcedureContext) -> ProcedureResult<()> {
275 warn!(
276 "Rolling back the drop table procedure, table: {}",
277 self.data.table_id()
278 );
279
280 let table_route_value = &TableRouteValue::new(
281 self.data.task.table_id,
282 self.data.physical_table_id.unwrap(),
284 self.data.physical_region_routes.clone(),
285 );
286 self.executor
287 .on_restore_metadata(
288 &self.context,
289 table_route_value,
290 &self.data.region_wal_options,
291 )
292 .await
293 .map_err(ProcedureError::external)
294 }
295}
296
297#[derive(Debug, Serialize, Deserialize)]
298pub struct DropTableData {
299 pub state: DropTableState,
300 pub task: DropTableTask,
301 pub physical_region_routes: Vec<RegionRoute>,
302 pub physical_table_id: Option<TableId>,
303 #[serde(default)]
304 pub region_wal_options: HashMap<RegionNumber, WalOptions>,
305 #[serde(default)]
306 pub allow_rollback: bool,
307}
308
309impl DropTableData {
310 pub fn new(task: DropTableTask) -> Self {
311 Self {
312 state: DropTableState::Prepare,
313 task,
314 physical_region_routes: vec![],
315 physical_table_id: None,
316 region_wal_options: HashMap::new(),
317 allow_rollback: false,
318 }
319 }
320
321 fn table_ref(&self) -> TableReference<'_> {
322 self.task.table_ref()
323 }
324
325 fn table_id(&self) -> TableId {
326 self.task.table_id
327 }
328
329 fn build_executor(&self) -> DropTableExecutor {
330 DropTableExecutor::new(
331 self.task.table_name(),
332 self.task.table_id,
333 self.task.drop_if_exists,
334 )
335 }
336}
337
338#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq)]
340pub enum DropTableState {
341 Prepare,
343 DeleteMetadata,
345 InvalidateTableCache,
347 DatanodeDropRegions,
349 DeleteTombstone,
351}