1pub(crate) mod executor;
16mod metadata;
17
18use std::collections::HashMap;
19
20use async_trait::async_trait;
21use common_error::ext::BoxedError;
22use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
23use common_procedure::{
24 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
25 Result as ProcedureResult, Status,
26};
27use common_telemetry::info;
28use common_telemetry::tracing::warn;
29use common_wal::options::WalOptions;
30use serde::{Deserialize, Serialize};
31use snafu::{OptionExt, ResultExt};
32use store_api::storage::RegionNumber;
33use strum::AsRefStr;
34use table::metadata::TableId;
35use table::table_reference::TableReference;
36
37use self::executor::DropTableExecutor;
38use crate::ddl::utils::handle_retry_error;
39use crate::ddl::DdlContext;
40use crate::error::{self, Result};
41use crate::key::table_route::TableRouteValue;
42use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
43use crate::metrics;
44use crate::region_keeper::OperatingRegionGuard;
45use crate::rpc::ddl::DropTableTask;
46use crate::rpc::router::{operating_leader_regions, RegionRoute};
47
48pub struct DropTableProcedure {
49 pub context: DdlContext,
51 pub data: DropTableData,
53 pub(crate) dropping_regions: Vec<OperatingRegionGuard>,
55 executor: DropTableExecutor,
57}
58
59impl DropTableProcedure {
60 pub const TYPE_NAME: &'static str = "metasrv-procedure::DropTable";
61
62 pub fn new(task: DropTableTask, context: DdlContext) -> Self {
63 let data = DropTableData::new(task);
64 let executor = data.build_executor();
65 Self {
66 context,
67 data,
68 dropping_regions: vec![],
69 executor,
70 }
71 }
72
73 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
74 let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
75 let executor = data.build_executor();
76
77 Ok(Self {
78 context,
79 data,
80 dropping_regions: vec![],
81 executor,
82 })
83 }
84
85 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
86 if self.executor.on_prepare(&self.context).await?.stop() {
87 return Ok(Status::done());
88 }
89 self.fill_table_metadata().await?;
90 self.data.state = DropTableState::DeleteMetadata;
91
92 Ok(Status::executing(true))
93 }
94
95 fn register_dropping_regions(&mut self) -> Result<()> {
97 let dropping_regions = operating_leader_regions(&self.data.physical_region_routes);
98
99 if !self.dropping_regions.is_empty() {
100 return Ok(());
101 }
102
103 let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
104
105 for (region_id, datanode_id) in dropping_regions {
106 let guard = self
107 .context
108 .memory_region_keeper
109 .register(datanode_id, region_id)
110 .context(error::RegionOperatingRaceSnafu {
111 region_id,
112 peer_id: datanode_id,
113 })?;
114 dropping_region_guards.push(guard);
115 }
116
117 self.dropping_regions = dropping_region_guards;
118 Ok(())
119 }
120
121 pub(crate) async fn on_delete_metadata(&mut self) -> Result<Status> {
123 self.register_dropping_regions()?;
124 let table_id = self.data.table_id();
130 let table_route_value = &TableRouteValue::new(
131 self.data.task.table_id,
132 self.data.physical_table_id.unwrap(),
134 self.data.physical_region_routes.clone(),
135 );
136 self.executor
138 .on_delete_metadata(
139 &self.context,
140 table_route_value,
141 &self.data.region_wal_options,
142 )
143 .await?;
144 info!("Deleted table metadata for table {table_id}");
145 self.data.state = DropTableState::InvalidateTableCache;
146 Ok(Status::executing(true))
147 }
148
149 async fn on_broadcast(&mut self) -> Result<Status> {
151 self.executor.invalidate_table_cache(&self.context).await?;
152 self.data.state = DropTableState::DatanodeDropRegions;
153
154 Ok(Status::executing(true))
155 }
156
157 pub async fn on_datanode_drop_regions(&mut self) -> Result<Status> {
158 self.executor
159 .on_drop_regions(&self.context, &self.data.physical_region_routes, false)
160 .await?;
161 self.data.state = DropTableState::DeleteTombstone;
162 Ok(Status::executing(true))
163 }
164
165 async fn on_delete_metadata_tombstone(&mut self) -> Result<Status> {
167 let table_route_value = &TableRouteValue::new(
168 self.data.task.table_id,
169 self.data.physical_table_id.unwrap(),
171 self.data.physical_region_routes.clone(),
172 );
173 self.executor
174 .on_delete_metadata_tombstone(
175 &self.context,
176 table_route_value,
177 &self.data.region_wal_options,
178 )
179 .await?;
180
181 self.dropping_regions.clear();
182 Ok(Status::done())
183 }
184}
185
186#[async_trait]
187impl Procedure for DropTableProcedure {
188 fn type_name(&self) -> &str {
189 Self::TYPE_NAME
190 }
191
192 fn recover(&mut self) -> ProcedureResult<()> {
193 let register_operating_regions = matches!(
195 self.data.state,
196 DropTableState::DeleteMetadata
197 | DropTableState::InvalidateTableCache
198 | DropTableState::DatanodeDropRegions
199 );
200 if register_operating_regions {
201 self.register_dropping_regions()
202 .map_err(BoxedError::new)
203 .context(ExternalSnafu {
204 clean_poisons: false,
205 })?;
206 }
207
208 Ok(())
209 }
210
211 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
212 let state = &self.data.state;
213 let _timer = metrics::METRIC_META_PROCEDURE_DROP_TABLE
214 .with_label_values(&[state.as_ref()])
215 .start_timer();
216
217 match self.data.state {
218 DropTableState::Prepare => self.on_prepare().await,
219 DropTableState::DeleteMetadata => self.on_delete_metadata().await,
220 DropTableState::InvalidateTableCache => self.on_broadcast().await,
221 DropTableState::DatanodeDropRegions => self.on_datanode_drop_regions().await,
222 DropTableState::DeleteTombstone => self.on_delete_metadata_tombstone().await,
223 }
224 .map_err(handle_retry_error)
225 }
226
227 fn dump(&self) -> ProcedureResult<String> {
228 serde_json::to_string(&self.data).context(ToJsonSnafu)
229 }
230
231 fn lock_key(&self) -> LockKey {
232 let table_ref = &self.data.table_ref();
233 let table_id = self.data.table_id();
234 let lock_key = vec![
235 CatalogLock::Read(table_ref.catalog).into(),
236 SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
237 TableLock::Write(table_id).into(),
238 ];
239
240 LockKey::new(lock_key)
241 }
242
243 fn rollback_supported(&self) -> bool {
244 !matches!(self.data.state, DropTableState::Prepare) && self.data.allow_rollback
245 }
246
247 async fn rollback(&mut self, _: &ProcedureContext) -> ProcedureResult<()> {
248 warn!(
249 "Rolling back the drop table procedure, table: {}",
250 self.data.table_id()
251 );
252
253 let table_route_value = &TableRouteValue::new(
254 self.data.task.table_id,
255 self.data.physical_table_id.unwrap(),
257 self.data.physical_region_routes.clone(),
258 );
259 self.executor
260 .on_restore_metadata(
261 &self.context,
262 table_route_value,
263 &self.data.region_wal_options,
264 )
265 .await
266 .map_err(ProcedureError::external)
267 }
268}
269
270#[derive(Debug, Serialize, Deserialize)]
271pub struct DropTableData {
272 pub state: DropTableState,
273 pub task: DropTableTask,
274 pub physical_region_routes: Vec<RegionRoute>,
275 pub physical_table_id: Option<TableId>,
276 #[serde(default)]
277 pub region_wal_options: HashMap<RegionNumber, WalOptions>,
278 #[serde(default)]
279 pub allow_rollback: bool,
280}
281
282impl DropTableData {
283 pub fn new(task: DropTableTask) -> Self {
284 Self {
285 state: DropTableState::Prepare,
286 task,
287 physical_region_routes: vec![],
288 physical_table_id: None,
289 region_wal_options: HashMap::new(),
290 allow_rollback: false,
291 }
292 }
293
294 fn table_ref(&self) -> TableReference {
295 self.task.table_ref()
296 }
297
298 fn table_id(&self) -> TableId {
299 self.task.table_id
300 }
301
302 fn build_executor(&self) -> DropTableExecutor {
303 DropTableExecutor::new(
304 self.task.table_name(),
305 self.task.table_id,
306 self.task.drop_if_exists,
307 )
308 }
309}
310
311#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq)]
313pub enum DropTableState {
314 Prepare,
316 DeleteMetadata,
318 InvalidateTableCache,
320 DatanodeDropRegions,
322 DeleteTombstone,
324}