1pub mod executor;
16mod metadata;
17
18use std::collections::HashMap;
19
20use async_trait::async_trait;
21use common_error::ext::BoxedError;
22use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
23use common_procedure::{
24 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
25 Result as ProcedureResult, Status,
26};
27use common_telemetry::info;
28use common_telemetry::tracing::warn;
29use common_wal::options::WalOptions;
30use serde::{Deserialize, Serialize};
31use snafu::{OptionExt, ResultExt};
32use store_api::storage::RegionNumber;
33use strum::AsRefStr;
34use table::metadata::TableId;
35use table::table_reference::TableReference;
36
37use self::executor::DropTableExecutor;
38use crate::ddl::DdlContext;
39use crate::ddl::utils::map_to_procedure_error;
40use crate::error::{self, Result};
41use crate::key::table_route::TableRouteValue;
42use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
43use crate::metrics;
44use crate::region_keeper::OperatingRegionGuard;
45use crate::rpc::ddl::DropTableTask;
46use crate::rpc::router::{RegionRoute, operating_leader_regions};
47
48pub struct DropTableProcedure {
49 pub context: DdlContext,
51 pub data: DropTableData,
53 pub(crate) dropping_regions: Vec<OperatingRegionGuard>,
55 executor: DropTableExecutor,
57}
58
59impl DropTableProcedure {
60 pub const TYPE_NAME: &'static str = "metasrv-procedure::DropTable";
61
62 pub fn new(task: DropTableTask, context: DdlContext) -> Self {
63 let data = DropTableData::new(task);
64 let executor = data.build_executor();
65 Self {
66 context,
67 data,
68 dropping_regions: vec![],
69 executor,
70 }
71 }
72
73 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
74 let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
75 let executor = data.build_executor();
76
77 Ok(Self {
78 context,
79 data,
80 dropping_regions: vec![],
81 executor,
82 })
83 }
84
85 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
86 if self.executor.on_prepare(&self.context).await?.stop() {
87 return Ok(Status::done());
88 }
89 self.fill_table_metadata().await?;
90 self.data.state = DropTableState::DeleteMetadata;
91
92 Ok(Status::executing(true))
93 }
94
95 fn register_dropping_regions(&mut self) -> Result<()> {
97 let dropping_regions = operating_leader_regions(&self.data.physical_region_routes);
98
99 if !self.dropping_regions.is_empty() {
100 return Ok(());
101 }
102
103 let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
104
105 for (region_id, datanode_id) in dropping_regions {
106 let guard = self
107 .context
108 .memory_region_keeper
109 .register(datanode_id, region_id)
110 .context(error::RegionOperatingRaceSnafu {
111 region_id,
112 peer_id: datanode_id,
113 })?;
114 dropping_region_guards.push(guard);
115 }
116
117 self.dropping_regions = dropping_region_guards;
118 Ok(())
119 }
120
121 pub(crate) async fn on_delete_metadata(&mut self) -> Result<Status> {
123 self.register_dropping_regions()?;
124 let table_id = self.data.table_id();
130 let table_route_value = &TableRouteValue::new(
131 self.data.task.table_id,
132 self.data.physical_table_id.unwrap(),
134 self.data.physical_region_routes.clone(),
135 );
136 self.executor
138 .on_delete_metadata(
139 &self.context,
140 table_route_value,
141 &self.data.region_wal_options,
142 )
143 .await?;
144 info!("Deleted table metadata for table {table_id}");
145 self.data.state = DropTableState::InvalidateTableCache;
146 Ok(Status::executing(true))
147 }
148
149 async fn on_broadcast(&mut self) -> Result<Status> {
151 self.executor.invalidate_table_cache(&self.context).await?;
152 self.data.state = DropTableState::DatanodeDropRegions;
153
154 Ok(Status::executing(true))
155 }
156
157 pub async fn on_datanode_drop_regions(&mut self) -> Result<Status> {
158 self.executor
159 .on_drop_regions(
160 &self.context.node_manager,
161 &self.context.leader_region_registry,
162 &self.data.physical_region_routes,
163 false,
164 false,
165 )
166 .await?;
167 self.data.state = DropTableState::DeleteTombstone;
168 Ok(Status::executing(true))
169 }
170
171 async fn on_delete_metadata_tombstone(&mut self) -> Result<Status> {
173 let table_route_value = &TableRouteValue::new(
174 self.data.task.table_id,
175 self.data.physical_table_id.unwrap(),
177 self.data.physical_region_routes.clone(),
178 );
179 self.executor
180 .on_delete_metadata_tombstone(
181 &self.context,
182 table_route_value,
183 &self.data.region_wal_options,
184 )
185 .await?;
186
187 self.dropping_regions.clear();
188 Ok(Status::done())
189 }
190}
191
192#[async_trait]
193impl Procedure for DropTableProcedure {
194 fn type_name(&self) -> &str {
195 Self::TYPE_NAME
196 }
197
198 fn recover(&mut self) -> ProcedureResult<()> {
199 let register_operating_regions = matches!(
201 self.data.state,
202 DropTableState::DeleteMetadata
203 | DropTableState::InvalidateTableCache
204 | DropTableState::DatanodeDropRegions
205 );
206 if register_operating_regions {
207 self.register_dropping_regions()
208 .map_err(BoxedError::new)
209 .context(ExternalSnafu {
210 clean_poisons: false,
211 })?;
212 }
213
214 Ok(())
215 }
216
217 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
218 let state = &self.data.state;
219 let _timer = metrics::METRIC_META_PROCEDURE_DROP_TABLE
220 .with_label_values(&[state.as_ref()])
221 .start_timer();
222
223 match self.data.state {
224 DropTableState::Prepare => self.on_prepare().await,
225 DropTableState::DeleteMetadata => self.on_delete_metadata().await,
226 DropTableState::InvalidateTableCache => self.on_broadcast().await,
227 DropTableState::DatanodeDropRegions => self.on_datanode_drop_regions().await,
228 DropTableState::DeleteTombstone => self.on_delete_metadata_tombstone().await,
229 }
230 .map_err(map_to_procedure_error)
231 }
232
233 fn dump(&self) -> ProcedureResult<String> {
234 serde_json::to_string(&self.data).context(ToJsonSnafu)
235 }
236
237 fn lock_key(&self) -> LockKey {
238 let table_ref = &self.data.table_ref();
239 let table_id = self.data.table_id();
240 let lock_key = vec![
241 CatalogLock::Read(table_ref.catalog).into(),
242 SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
243 TableLock::Write(table_id).into(),
244 ];
245
246 LockKey::new(lock_key)
247 }
248
249 fn rollback_supported(&self) -> bool {
250 !matches!(self.data.state, DropTableState::Prepare) && self.data.allow_rollback
251 }
252
253 async fn rollback(&mut self, _: &ProcedureContext) -> ProcedureResult<()> {
254 warn!(
255 "Rolling back the drop table procedure, table: {}",
256 self.data.table_id()
257 );
258
259 let table_route_value = &TableRouteValue::new(
260 self.data.task.table_id,
261 self.data.physical_table_id.unwrap(),
263 self.data.physical_region_routes.clone(),
264 );
265 self.executor
266 .on_restore_metadata(
267 &self.context,
268 table_route_value,
269 &self.data.region_wal_options,
270 )
271 .await
272 .map_err(ProcedureError::external)
273 }
274}
275
276#[derive(Debug, Serialize, Deserialize)]
277pub struct DropTableData {
278 pub state: DropTableState,
279 pub task: DropTableTask,
280 pub physical_region_routes: Vec<RegionRoute>,
281 pub physical_table_id: Option<TableId>,
282 #[serde(default)]
283 pub region_wal_options: HashMap<RegionNumber, WalOptions>,
284 #[serde(default)]
285 pub allow_rollback: bool,
286}
287
288impl DropTableData {
289 pub fn new(task: DropTableTask) -> Self {
290 Self {
291 state: DropTableState::Prepare,
292 task,
293 physical_region_routes: vec![],
294 physical_table_id: None,
295 region_wal_options: HashMap::new(),
296 allow_rollback: false,
297 }
298 }
299
300 fn table_ref(&self) -> TableReference<'_> {
301 self.task.table_ref()
302 }
303
304 fn table_id(&self) -> TableId {
305 self.task.table_id
306 }
307
308 fn build_executor(&self) -> DropTableExecutor {
309 DropTableExecutor::new(
310 self.task.table_name(),
311 self.task.table_id,
312 self.task.drop_if_exists,
313 )
314 }
315}
316
317#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq)]
319pub enum DropTableState {
320 Prepare,
322 DeleteMetadata,
324 InvalidateTableCache,
326 DatanodeDropRegions,
328 DeleteTombstone,
330}