1mod executor;
16mod metadata;
17mod region_request;
18
19use std::vec;
20
21use api::region::RegionResponse;
22use api::v1::RenameTable;
23use api::v1::alter_table_expr::Kind;
24use async_trait::async_trait;
25use common_error::ext::BoxedError;
26use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
27use common_procedure::{
28 Context as ProcedureContext, ContextProvider, Error as ProcedureError, LockKey, PoisonKey,
29 PoisonKeys, Procedure, ProcedureId, Status, StringKey,
30};
31use common_telemetry::{error, info, warn};
32use serde::{Deserialize, Serialize};
33use snafu::{ResultExt, ensure};
34use store_api::metadata::ColumnMetadata;
35use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
36use strum::AsRefStr;
37use table::metadata::{RawTableInfo, TableId, TableInfo};
38use table::table_reference::TableReference;
39
40use crate::ddl::DdlContext;
41use crate::ddl::alter_table::executor::AlterTableExecutor;
42use crate::ddl::utils::{
43 MultipleResults, extract_column_metadatas, handle_multiple_results, map_to_procedure_error,
44 sync_follower_regions,
45};
46use crate::error::{AbortProcedureSnafu, NoLeaderSnafu, PutPoisonSnafu, Result, RetryLaterSnafu};
47use crate::key::table_info::TableInfoValue;
48use crate::key::{DeserializedValueWithBytes, RegionDistribution};
49use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
50use crate::metrics;
51use crate::poison_key::table_poison_key;
52use crate::rpc::ddl::AlterTableTask;
53use crate::rpc::router::{RegionRoute, find_leaders, region_distribution};
54
/// Procedure that alters a table.
///
/// It validates the request, submits alter requests to the table's regions, updates the
/// table metadata and finally invalidates the table cache.
pub struct AlterTableProcedure {
    /// The DDL context; the steps take the table metadata manager, the node manager and
    /// the cache invalidator from it.
    context: DdlContext,
    /// The state of the procedure; this is the only part written by `dump` and read back
    /// by `from_json`.
    data: AlterTableData,
    /// The new table info built in `on_prepare`.
    ///
    /// It is not part of `data`, so it is `None` on a procedure restored with `from_json`;
    /// `on_update_metadata` rebuilds it in that case.
    new_table_info: Option<TableInfo>,
    /// The executor that performs the individual alter steps.
    executor: AlterTableExecutor,
}
68
69fn build_executor_from_alter_expr(alter_data: &AlterTableData) -> AlterTableExecutor {
74 let table_name = alter_data.table_ref().into();
75 let table_id = alter_data.table_id;
76 let alter_kind = alter_data.task.alter_table.kind.as_ref().unwrap();
77 let new_table_name = if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
78 Some(new_table_name.to_string())
79 } else {
80 None
81 };
82 AlterTableExecutor::new(table_name, table_id, new_table_name)
83}
84
85impl AlterTableProcedure {
    /// Type name of this procedure; it is what [`Procedure::type_name`] returns.
    pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable";
87
88 pub fn new(table_id: TableId, task: AlterTableTask, context: DdlContext) -> Result<Self> {
89 task.validate()?;
90 let data = AlterTableData::new(task, table_id);
91 let executor = build_executor_from_alter_expr(&data);
92 Ok(Self {
93 context,
94 data,
95 new_table_info: None,
96 executor,
97 })
98 }
99
100 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
101 let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
102 let executor = build_executor_from_alter_expr(&data);
103
104 Ok(AlterTableProcedure {
105 context,
106 data,
107 new_table_info: None,
108 executor,
109 })
110 }
111
112 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
114 self.executor
115 .on_prepare(&self.context.table_metadata_manager)
116 .await?;
117 self.fill_table_info().await?;
118
119 let table_info_value = self.data.table_info_value.as_ref().unwrap();
121 let new_table_info = AlterTableExecutor::validate_alter_table_expr(
122 &table_info_value.table_info,
123 self.data.task.alter_table.clone(),
124 )?;
125 self.new_table_info = Some(new_table_info);
126
127 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
129 if matches!(alter_kind, Kind::RenameTable { .. }) {
130 self.data.state = AlterTableState::UpdateMetadata;
131 } else {
132 self.data.state = AlterTableState::SubmitAlterRegionRequests;
133 };
134 Ok(Status::executing(true))
135 }
136
137 fn table_poison_key(&self) -> PoisonKey {
138 table_poison_key(self.data.table_id())
139 }
140
141 async fn put_poison(
142 &self,
143 ctx_provider: &dyn ContextProvider,
144 procedure_id: ProcedureId,
145 ) -> Result<()> {
146 let poison_key = self.table_poison_key();
147 ctx_provider
148 .try_put_poison(&poison_key, procedure_id)
149 .await
150 .context(PutPoisonSnafu)
151 }
152
153 pub async fn submit_alter_region_requests(
154 &mut self,
155 procedure_id: ProcedureId,
156 ctx_provider: &dyn ContextProvider,
157 ) -> Result<Status> {
158 let table_id = self.data.table_id();
159 let (_, physical_table_route) = self
160 .context
161 .table_metadata_manager
162 .table_route_manager()
163 .get_physical_table_route(table_id)
164 .await?;
165
166 self.data.region_distribution =
167 Some(region_distribution(&physical_table_route.region_routes));
168 let leaders = find_leaders(&physical_table_route.region_routes);
169 let alter_kind = self.make_region_alter_kind()?;
170
171 info!(
172 "Submitting alter region requests for table {}, table_id: {}, alter_kind: {:?}",
173 self.data.table_ref(),
174 table_id,
175 alter_kind,
176 );
177
178 ensure!(!leaders.is_empty(), NoLeaderSnafu { table_id });
179 self.put_poison(ctx_provider, procedure_id).await?;
181 let results = self
182 .executor
183 .on_alter_regions(
184 &self.context.node_manager,
185 &physical_table_route.region_routes,
186 alter_kind,
187 )
188 .await;
189
190 match handle_multiple_results(results) {
191 MultipleResults::PartialRetryable(error) => {
192 Err(error)
194 }
195 MultipleResults::PartialNonRetryable(error) => {
196 error!(error; "Partial non-retryable errors occurred during alter table, table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
197 Ok(Status::poisoned(
199 Some(self.table_poison_key()),
200 ProcedureError::external(error),
201 ))
202 }
203 MultipleResults::AllRetryable(error) => {
204 let err = BoxedError::new(error);
206 Err(err).context(RetryLaterSnafu {
207 clean_poisons: true,
208 })
209 }
210 MultipleResults::Ok(results) => {
211 self.submit_sync_region_requests(&results, &physical_table_route.region_routes)
212 .await;
213 self.handle_alter_region_response(results)?;
214 Ok(Status::executing_with_clean_poisons(true))
215 }
216 MultipleResults::AllNonRetryable(error) => {
217 error!(error; "All alter requests returned non-retryable errors for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
218 let err = BoxedError::new(error);
222 Err(err).context(AbortProcedureSnafu {
223 clean_poisons: true,
224 })
225 }
226 }
227 }
228
229 fn handle_alter_region_response(&mut self, mut results: Vec<RegionResponse>) -> Result<()> {
230 if let Some(column_metadatas) =
231 extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
232 {
233 self.data.column_metadatas = column_metadatas;
234 } else {
235 warn!(
236 "altering table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"
237 );
238 }
239 self.data.state = AlterTableState::UpdateMetadata;
240 Ok(())
241 }
242
243 async fn submit_sync_region_requests(
244 &mut self,
245 results: &[RegionResponse],
246 region_routes: &[RegionRoute],
247 ) {
248 let table_info = self.data.table_info().unwrap();
250 if let Err(err) = sync_follower_regions(
251 &self.context,
252 self.data.table_id(),
253 results,
254 region_routes,
255 table_info.meta.engine.as_str(),
256 )
257 .await
258 {
259 error!(err; "Failed to sync regions for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
260 }
261 }
262
263 pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
265 let table_id = self.data.table_id();
266 let table_ref = self.data.table_ref();
267 let table_info_value = self.data.table_info_value.as_ref().unwrap();
269 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
271
272 let new_info = match &self.new_table_info {
274 Some(cached) => cached.clone(),
275 None => AlterTableExecutor::validate_alter_table_expr(
276 &table_info_value.table_info,
277 self.data.task.alter_table.clone(),
278 )
279 .inspect_err(|e| {
280 error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id);
282 })?,
283 };
284
285 self.executor
287 .on_alter_metadata(
288 &self.context.table_metadata_manager,
289 table_info_value,
290 self.data.region_distribution.as_ref(),
291 new_info.into(),
292 &self.data.column_metadatas,
293 )
294 .await?;
295
296 info!(
297 "Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}"
298 );
299 self.data.state = AlterTableState::InvalidateTableCache;
300 Ok(Status::executing(true))
301 }
302
303 async fn on_broadcast(&mut self) -> Result<Status> {
305 self.executor
306 .invalidate_table_cache(&self.context.cache_invalidator)
307 .await?;
308 Ok(Status::done())
309 }
310
311 fn lock_key_inner(&self) -> Vec<StringKey> {
312 let mut lock_key = vec![];
313 let table_ref = self.data.table_ref();
314 let table_id = self.data.table_id();
315 lock_key.push(CatalogLock::Read(table_ref.catalog).into());
316 lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
317 lock_key.push(TableLock::Write(table_id).into());
318
319 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
321 if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
322 lock_key.push(
323 TableNameLock::new(table_ref.catalog, table_ref.schema, new_table_name).into(),
324 )
325 }
326
327 lock_key
328 }
329
330 #[cfg(test)]
331 pub(crate) fn data(&self) -> &AlterTableData {
332 &self.data
333 }
334
335 #[cfg(test)]
336 pub(crate) fn mut_data(&mut self) -> &mut AlterTableData {
337 &mut self.data
338 }
339}
340
341#[async_trait]
342impl Procedure for AlterTableProcedure {
343 fn type_name(&self) -> &str {
344 Self::TYPE_NAME
345 }
346
347 async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
348 let state = &self.data.state;
349
350 let step = state.as_ref();
351
352 let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
353 .with_label_values(&[step])
354 .start_timer();
355
356 match state {
357 AlterTableState::Prepare => self.on_prepare().await,
358 AlterTableState::SubmitAlterRegionRequests => {
359 self.submit_alter_region_requests(ctx.procedure_id, ctx.provider.as_ref())
360 .await
361 }
362 AlterTableState::UpdateMetadata => self.on_update_metadata().await,
363 AlterTableState::InvalidateTableCache => self.on_broadcast().await,
364 }
365 .map_err(map_to_procedure_error)
366 }
367
368 fn dump(&self) -> ProcedureResult<String> {
369 serde_json::to_string(&self.data).context(ToJsonSnafu)
370 }
371
372 fn lock_key(&self) -> LockKey {
373 let key = self.lock_key_inner();
374
375 LockKey::new(key)
376 }
377
378 fn poison_keys(&self) -> PoisonKeys {
379 PoisonKeys::new(vec![self.table_poison_key()])
380 }
381}
382
/// The steps of [`AlterTableProcedure`].
///
/// The variant name (via `AsRefStr`) is used as the metric label of the step, and the
/// state is serialized as part of [`AlterTableData`].
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterTableState {
    /// Prepares the procedure and validates the alter request; the initial state.
    Prepare,
    /// Submits the alter requests to the regions; skipped for a `RenameTable` request.
    SubmitAlterRegionRequests,
    /// Updates the table metadata.
    UpdateMetadata,
    /// Invalidates the table cache; the last step.
    InvalidateTableCache,
}
394
/// The serializable data of [`AlterTableProcedure`]; it is what `dump` writes and
/// `from_json` reads.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTableData {
    /// The step the procedure is currently in.
    state: AlterTableState,
    /// The alter table task to carry out.
    task: AlterTableTask,
    /// The id of the table to alter.
    table_id: TableId,
    /// The column metadata taken from the region responses.
    /// Falls back to an empty list when the field is missing from the serialized data.
    #[serde(default)]
    column_metadatas: Vec<ColumnMetadata>,
    /// The table info value of the table; `None` when the data is created.
    // NOTE(review): presumably filled by `fill_table_info` (defined outside this file) — confirm.
    table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
    /// The region distribution of the physical table route; recorded by
    /// `submit_alter_region_requests` and `None` until then.
    region_distribution: Option<RegionDistribution>,
}
408
409impl AlterTableData {
410 pub fn new(task: AlterTableTask, table_id: TableId) -> Self {
411 Self {
412 state: AlterTableState::Prepare,
413 task,
414 table_id,
415 column_metadatas: vec![],
416 table_info_value: None,
417 region_distribution: None,
418 }
419 }
420
421 fn table_ref(&self) -> TableReference {
422 self.task.table_ref()
423 }
424
425 fn table_id(&self) -> TableId {
426 self.table_id
427 }
428
429 fn table_info(&self) -> Option<&RawTableInfo> {
430 self.table_info_value
431 .as_ref()
432 .map(|value| &value.table_info)
433 }
434
435 #[cfg(test)]
436 pub(crate) fn column_metadatas(&self) -> &[ColumnMetadata] {
437 &self.column_metadatas
438 }
439
440 #[cfg(test)]
441 pub(crate) fn set_column_metadatas(&mut self, column_metadatas: Vec<ColumnMetadata>) {
442 self.column_metadatas = column_metadatas;
443 }
444}