1mod executor;
16mod metadata;
17mod region_request;
18
19use std::vec;
20
21use api::region::RegionResponse;
22use api::v1::alter_table_expr::Kind;
23use api::v1::{RenameTable, SetTableOptions, UnsetTableOptions};
24use async_trait::async_trait;
25use common_error::ext::BoxedError;
26use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
27use common_procedure::{
28 Context as ProcedureContext, ContextProvider, Error as ProcedureError, LockKey, PoisonKey,
29 PoisonKeys, Procedure, ProcedureId, Status, StringKey,
30};
31use common_telemetry::{error, info, warn};
32use serde::{Deserialize, Serialize};
33use snafu::{ResultExt, ensure};
34use store_api::metadata::ColumnMetadata;
35use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
36use strum::AsRefStr;
37use table::metadata::{TableId, TableInfo};
38use table::requests::REPARTITION_COLUMN_HINT_KEY;
39use table::table_reference::TableReference;
40
41use crate::ddl::DdlContext;
42use crate::ddl::alter_table::executor::AlterTableExecutor;
43use crate::ddl::utils::{
44 MultipleResults, extract_column_metadatas, handle_multiple_results, map_to_procedure_error,
45 sync_follower_regions,
46};
47use crate::error::{AbortProcedureSnafu, NoLeaderSnafu, PutPoisonSnafu, Result, RetryLaterSnafu};
48use crate::key::table_info::TableInfoValue;
49use crate::key::{DeserializedValueWithBytes, RegionDistribution};
50use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
51use crate::metrics;
52use crate::poison_key::table_poison_key;
53use crate::rpc::ddl::AlterTableTask;
54use crate::rpc::router::{RegionRoute, find_leaders, region_distribution};
55
/// Procedure that drives an `ALTER TABLE` through its states (see `AlterTableState`).
pub struct AlterTableProcedure {
    /// DDL context; this procedure reads `table_metadata_manager`, `node_manager`
    /// and `cache_invalidator` from it.
    context: DdlContext,
    /// The state that `dump` serializes to JSON and `from_json` restores.
    data: AlterTableData,
    /// Table info produced by `validate_alter_table_expr` in `on_prepare`.
    ///
    /// It is not part of `data`, so it is `None` right after `new` / `from_json`;
    /// `on_update_metadata` rebuilds it in that case.
    new_table_info: Option<TableInfo>,
    /// Executor built by `build_executor_from_alter_expr`; performs the prepare,
    /// region-alter, metadata-update and cache-invalidation calls.
    executor: AlterTableExecutor,
}
69
70fn build_executor_from_alter_expr(alter_data: &AlterTableData) -> AlterTableExecutor {
75 let table_name = alter_data.table_ref().into();
76 let table_id = alter_data.table_id;
77 let alter_kind = alter_data.task.alter_table.kind.as_ref().unwrap();
78 let new_table_name = if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
79 Some(new_table_name.clone())
80 } else {
81 None
82 };
83 AlterTableExecutor::new(table_name, table_id, new_table_name)
84}
85
86impl AlterTableProcedure {
87 pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable";
88
89 pub fn new(table_id: TableId, task: AlterTableTask, context: DdlContext) -> Result<Self> {
90 task.validate()?;
91 let data = AlterTableData::new(task, table_id);
92 let executor = build_executor_from_alter_expr(&data);
93 Ok(Self {
94 context,
95 data,
96 new_table_info: None,
97 executor,
98 })
99 }
100
101 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
102 let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
103 let executor = build_executor_from_alter_expr(&data);
104
105 Ok(AlterTableProcedure {
106 context,
107 data,
108 new_table_info: None,
109 executor,
110 })
111 }
112
113 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
115 self.executor
116 .on_prepare(&self.context.table_metadata_manager)
117 .await?;
118 self.fill_table_info().await?;
119
120 let table_info_value = self.data.table_info_value.as_ref().unwrap();
122 let new_table_info = AlterTableExecutor::validate_alter_table_expr(
123 &table_info_value.table_info,
124 self.data.task.alter_table.clone(),
125 )?;
126 self.new_table_info = Some(new_table_info);
127
128 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
130 if is_metadata_only_alter(alter_kind) {
131 self.data.state = AlterTableState::UpdateMetadata;
132 } else {
133 self.data.state = AlterTableState::SubmitAlterRegionRequests;
134 };
135 Ok(Status::executing(true))
136 }
137
138 fn table_poison_key(&self) -> PoisonKey {
139 table_poison_key(self.data.table_id())
140 }
141
142 async fn put_poison(
143 &self,
144 ctx_provider: &dyn ContextProvider,
145 procedure_id: ProcedureId,
146 ) -> Result<()> {
147 let poison_key = self.table_poison_key();
148 ctx_provider
149 .try_put_poison(&poison_key, procedure_id)
150 .await
151 .context(PutPoisonSnafu)
152 }
153
154 pub async fn submit_alter_region_requests(
155 &mut self,
156 procedure_id: ProcedureId,
157 ctx_provider: &dyn ContextProvider,
158 ) -> Result<Status> {
159 let table_id = self.data.table_id();
160 let (_, physical_table_route) = self
161 .context
162 .table_metadata_manager
163 .table_route_manager()
164 .get_physical_table_route(table_id)
165 .await?;
166
167 self.data.region_distribution =
168 Some(region_distribution(&physical_table_route.region_routes));
169 let leaders = find_leaders(&physical_table_route.region_routes);
170 let alter_kind = self.make_region_alter_kind()?;
171
172 info!(
173 "Submitting alter region requests for table {}, table_id: {}, alter_kind: {:?}",
174 self.data.table_ref(),
175 table_id,
176 alter_kind,
177 );
178
179 ensure!(!leaders.is_empty(), NoLeaderSnafu { table_id });
180 self.put_poison(ctx_provider, procedure_id).await?;
182 let results = self
183 .executor
184 .on_alter_regions(
185 &self.context.node_manager,
186 &physical_table_route.region_routes,
187 alter_kind,
188 )
189 .await;
190
191 match handle_multiple_results(results) {
192 MultipleResults::PartialRetryable(error) => {
193 Err(error)
195 }
196 MultipleResults::PartialNonRetryable(error) => {
197 error!(error; "Partial non-retryable errors occurred during alter table, table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
198 Ok(Status::poisoned(
200 Some(self.table_poison_key()),
201 ProcedureError::external(error),
202 ))
203 }
204 MultipleResults::AllRetryable(error) => {
205 let err = BoxedError::new(error);
207 Err(err).context(RetryLaterSnafu {
208 clean_poisons: true,
209 })
210 }
211 MultipleResults::Ok(results) => {
212 self.submit_sync_region_requests(&results, &physical_table_route.region_routes)
213 .await;
214 self.handle_alter_region_response(results)?;
215 Ok(Status::executing_with_clean_poisons(true))
216 }
217 MultipleResults::AllNonRetryable(error) => {
218 error!(error; "All alter requests returned non-retryable errors for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
219 let err = BoxedError::new(error);
223 Err(err).context(AbortProcedureSnafu {
224 clean_poisons: true,
225 })
226 }
227 }
228 }
229
230 fn handle_alter_region_response(&mut self, mut results: Vec<RegionResponse>) -> Result<()> {
231 if let Some(column_metadatas) =
232 extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
233 {
234 self.data.column_metadatas = column_metadatas;
235 } else {
236 warn!(
237 "altering table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"
238 );
239 }
240 self.data.state = AlterTableState::UpdateMetadata;
241 Ok(())
242 }
243
244 async fn submit_sync_region_requests(
245 &mut self,
246 results: &[RegionResponse],
247 region_routes: &[RegionRoute],
248 ) {
249 let table_info = self.data.table_info().unwrap();
251 if let Err(err) = sync_follower_regions(
252 &self.context,
253 self.data.table_id(),
254 results,
255 region_routes,
256 table_info.meta.engine.as_str(),
257 )
258 .await
259 {
260 error!(err; "Failed to sync regions for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
261 }
262 }
263
264 pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
266 let table_id = self.data.table_id();
267 let table_ref = self.data.table_ref();
268 let table_info_value = self.data.table_info_value.as_ref().unwrap();
270 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
272 let metadata_only_alter = is_metadata_only_alter(alter_kind);
273
274 let new_info = match &self.new_table_info {
276 Some(cached) => cached.clone(),
277 None => AlterTableExecutor::validate_alter_table_expr(
278 &table_info_value.table_info,
279 self.data.task.alter_table.clone(),
280 )
281 .inspect_err(|e| {
282 error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id);
284 })?,
285 };
286
287 self.executor
289 .on_alter_metadata(
290 &self.context.table_metadata_manager,
291 table_info_value,
292 self.data.region_distribution.as_ref(),
293 new_info,
294 &self.data.column_metadatas,
295 metadata_only_alter,
296 )
297 .await?;
298
299 info!(
300 "Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}"
301 );
302 self.data.state = AlterTableState::InvalidateTableCache;
303 Ok(Status::executing(true))
304 }
305
306 async fn on_broadcast(&mut self) -> Result<Status> {
308 self.executor
309 .invalidate_table_cache(&self.context.cache_invalidator)
310 .await?;
311 Ok(Status::done())
312 }
313
314 fn lock_key_inner(&self) -> Vec<StringKey> {
315 let mut lock_key = vec![];
316 let table_ref = self.data.table_ref();
317 let table_id = self.data.table_id();
318 lock_key.push(CatalogLock::Read(table_ref.catalog).into());
319 lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
320 lock_key.push(TableLock::Write(table_id).into());
321
322 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
324 if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
325 lock_key.push(
326 TableNameLock::new(table_ref.catalog, table_ref.schema, new_table_name).into(),
327 )
328 }
329
330 lock_key
331 }
332
333 #[cfg(test)]
334 pub(crate) fn data(&self) -> &AlterTableData {
335 &self.data
336 }
337
338 #[cfg(test)]
339 pub(crate) fn mut_data(&mut self) -> &mut AlterTableData {
340 &mut self.data
341 }
342}
343
344fn is_metadata_only_alter(alter_kind: &Kind) -> bool {
345 match alter_kind {
346 Kind::RenameTable { .. } => true,
347 Kind::SetTableOptions(SetTableOptions { table_options }) => {
348 table_options.len() == 1 && table_options[0].key.as_str() == REPARTITION_COLUMN_HINT_KEY
349 }
350 Kind::UnsetTableOptions(UnsetTableOptions { keys }) => {
351 keys.len() == 1 && keys[0].as_str() == REPARTITION_COLUMN_HINT_KEY
352 }
353 _ => false,
354 }
355}
356
357#[async_trait]
358impl Procedure for AlterTableProcedure {
359 fn type_name(&self) -> &str {
360 Self::TYPE_NAME
361 }
362
363 async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
364 let state = &self.data.state;
365
366 let step = state.as_ref();
367
368 let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
369 .with_label_values(&[step])
370 .start_timer();
371
372 match state {
373 AlterTableState::Prepare => self.on_prepare().await,
374 AlterTableState::SubmitAlterRegionRequests => {
375 self.submit_alter_region_requests(ctx.procedure_id, ctx.provider.as_ref())
376 .await
377 }
378 AlterTableState::UpdateMetadata => self.on_update_metadata().await,
379 AlterTableState::InvalidateTableCache => self.on_broadcast().await,
380 }
381 .map_err(map_to_procedure_error)
382 }
383
384 fn dump(&self) -> ProcedureResult<String> {
385 serde_json::to_string(&self.data).context(ToJsonSnafu)
386 }
387
388 fn lock_key(&self) -> LockKey {
389 let key = self.lock_key_inner();
390
391 LockKey::new(key)
392 }
393
394 fn poison_keys(&self) -> PoisonKeys {
395 PoisonKeys::new(vec![self.table_poison_key()])
396 }
397}
398
/// The states of [`AlterTableProcedure`].
///
/// The `AsRefStr` name of the current state is used as the metric label in `execute`.
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterTableState {
    /// Initial state; handled by `on_prepare`.
    Prepare,
    /// Handled by `submit_alter_region_requests`; entered from `Prepare` unless
    /// the alteration is metadata-only.
    SubmitAlterRegionRequests,
    /// Handled by `on_update_metadata`.
    UpdateMetadata,
    /// Handled by `on_broadcast`, which finishes the procedure.
    InvalidateTableCache,
}
410
/// The serializable data of [`AlterTableProcedure`] (written by `dump`, read by `from_json`).
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTableData {
    /// The state the procedure executes next.
    state: AlterTableState,
    /// The alter table task being executed.
    task: AlterTableTask,
    /// Id of the table being altered.
    table_id: TableId,
    /// Column metadata taken from the region responses in
    /// `handle_alter_region_response`; defaults to empty when absent from the JSON.
    #[serde(default)]
    column_metadatas: Vec<ColumnMetadata>,
    /// Table info value that the alteration is validated against.
    /// NOTE(review): presumably filled by `fill_table_info`, which is defined
    /// outside this file — confirm.
    table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
    /// Region distribution computed from the physical table route in
    /// `submit_alter_region_requests`; `None` if that step has not run.
    region_distribution: Option<RegionDistribution>,
}
424
425impl AlterTableData {
426 pub fn new(task: AlterTableTask, table_id: TableId) -> Self {
427 Self {
428 state: AlterTableState::Prepare,
429 task,
430 table_id,
431 column_metadatas: vec![],
432 table_info_value: None,
433 region_distribution: None,
434 }
435 }
436
437 fn table_ref(&self) -> TableReference<'_> {
438 self.task.table_ref()
439 }
440
441 fn table_id(&self) -> TableId {
442 self.table_id
443 }
444
445 fn table_info(&self) -> Option<&TableInfo> {
446 self.table_info_value
447 .as_ref()
448 .map(|value| &value.table_info)
449 }
450
451 #[cfg(test)]
452 pub(crate) fn column_metadatas(&self) -> &[ColumnMetadata] {
453 &self.column_metadatas
454 }
455
456 #[cfg(test)]
457 pub(crate) fn set_column_metadatas(&mut self, column_metadatas: Vec<ColumnMetadata>) {
458 self.column_metadatas = column_metadatas;
459 }
460}