1mod executor;
16mod metadata;
17mod region_request;
18
19use std::vec;
20
21use api::region::RegionResponse;
22use api::v1::alter_table_expr::Kind;
23use api::v1::RenameTable;
24use async_trait::async_trait;
25use common_error::ext::BoxedError;
26use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
27use common_procedure::{
28 Context as ProcedureContext, ContextProvider, Error as ProcedureError, LockKey, PoisonKey,
29 PoisonKeys, Procedure, ProcedureId, Status, StringKey,
30};
31use common_telemetry::{error, info, warn};
32use serde::{Deserialize, Serialize};
33use snafu::{ensure, ResultExt};
34use store_api::metadata::ColumnMetadata;
35use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
36use strum::AsRefStr;
37use table::metadata::{RawTableInfo, TableId, TableInfo};
38use table::table_reference::TableReference;
39
40use crate::ddl::alter_table::executor::AlterTableExecutor;
41use crate::ddl::utils::{
42 extract_column_metadatas, handle_multiple_results, map_to_procedure_error,
43 sync_follower_regions, MultipleResults,
44};
45use crate::ddl::DdlContext;
46use crate::error::{AbortProcedureSnafu, NoLeaderSnafu, PutPoisonSnafu, Result, RetryLaterSnafu};
47use crate::key::table_info::TableInfoValue;
48use crate::key::{DeserializedValueWithBytes, RegionDistribution};
49use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
50use crate::metrics;
51use crate::poison_key::table_poison_key;
52use crate::rpc::ddl::AlterTableTask;
53use crate::rpc::router::{find_leaders, region_distribution, RegionRoute};
54
/// Procedure that alters a table.
///
/// Steps: prepare, submit region alter requests (skipped for renames), update the
/// table metadata, then invalidate the table cache.
pub struct AlterTableProcedure {
    /// Runtime DDL context; provides the table metadata manager, the node manager
    /// and the cache invalidator used by the steps.
    context: DdlContext,
    /// Persisted procedure data; this is what `dump` serializes.
    data: AlterTableData,
    /// Table info computed from the alter expression in the prepare step.
    ///
    /// Not persisted: it is `None` after `from_json` and is then recomputed in
    /// `on_update_metadata`.
    new_table_info: Option<TableInfo>,
    /// Executor built from `data` by `build_executor_from_alter_expr`.
    executor: AlterTableExecutor,
}
68
69fn build_executor_from_alter_expr(alter_data: &AlterTableData) -> AlterTableExecutor {
74 let table_name = alter_data.table_ref().into();
75 let table_id = alter_data.table_id;
76 let alter_kind = alter_data.task.alter_table.kind.as_ref().unwrap();
77 let new_table_name = if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
78 Some(new_table_name.to_string())
79 } else {
80 None
81 };
82 AlterTableExecutor::new(table_name, table_id, new_table_name)
83}
84
85impl AlterTableProcedure {
86 pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable";
87
88 pub fn new(table_id: TableId, task: AlterTableTask, context: DdlContext) -> Result<Self> {
89 task.validate()?;
90 let data = AlterTableData::new(task, table_id);
91 let executor = build_executor_from_alter_expr(&data);
92 Ok(Self {
93 context,
94 data,
95 new_table_info: None,
96 executor,
97 })
98 }
99
100 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
101 let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
102 let executor = build_executor_from_alter_expr(&data);
103
104 Ok(AlterTableProcedure {
105 context,
106 data,
107 new_table_info: None,
108 executor,
109 })
110 }
111
112 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
114 self.executor
115 .on_prepare(&self.context.table_metadata_manager)
116 .await?;
117 self.fill_table_info().await?;
118
119 let table_info_value = self.data.table_info_value.as_ref().unwrap();
121 let new_table_info = AlterTableExecutor::validate_alter_table_expr(
122 &table_info_value.table_info,
123 self.data.task.alter_table.clone(),
124 )?;
125 self.new_table_info = Some(new_table_info);
126
127 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
129 if matches!(alter_kind, Kind::RenameTable { .. }) {
130 self.data.state = AlterTableState::UpdateMetadata;
131 } else {
132 self.data.state = AlterTableState::SubmitAlterRegionRequests;
133 };
134 Ok(Status::executing(true))
135 }
136
137 fn table_poison_key(&self) -> PoisonKey {
138 table_poison_key(self.data.table_id())
139 }
140
141 async fn put_poison(
142 &self,
143 ctx_provider: &dyn ContextProvider,
144 procedure_id: ProcedureId,
145 ) -> Result<()> {
146 let poison_key = self.table_poison_key();
147 ctx_provider
148 .try_put_poison(&poison_key, procedure_id)
149 .await
150 .context(PutPoisonSnafu)
151 }
152
153 pub async fn submit_alter_region_requests(
154 &mut self,
155 procedure_id: ProcedureId,
156 ctx_provider: &dyn ContextProvider,
157 ) -> Result<Status> {
158 let table_id = self.data.table_id();
159 let (_, physical_table_route) = self
160 .context
161 .table_metadata_manager
162 .table_route_manager()
163 .get_physical_table_route(table_id)
164 .await?;
165
166 self.data.region_distribution =
167 Some(region_distribution(&physical_table_route.region_routes));
168 let leaders = find_leaders(&physical_table_route.region_routes);
169 let alter_kind = self.make_region_alter_kind()?;
170
171 info!(
172 "Submitting alter region requests for table {}, table_id: {}, alter_kind: {:?}",
173 self.data.table_ref(),
174 table_id,
175 alter_kind,
176 );
177
178 ensure!(!leaders.is_empty(), NoLeaderSnafu { table_id });
179 self.put_poison(ctx_provider, procedure_id).await?;
181 let results = self
182 .executor
183 .on_alter_regions(
184 &self.context.node_manager,
185 &physical_table_route.region_routes,
186 alter_kind,
187 )
188 .await;
189
190 match handle_multiple_results(results) {
191 MultipleResults::PartialRetryable(error) => {
192 Err(error)
194 }
195 MultipleResults::PartialNonRetryable(error) => {
196 error!(error; "Partial non-retryable errors occurred during alter table, table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
197 Ok(Status::poisoned(
199 Some(self.table_poison_key()),
200 ProcedureError::external(error),
201 ))
202 }
203 MultipleResults::AllRetryable(error) => {
204 let err = BoxedError::new(error);
206 Err(err).context(RetryLaterSnafu {
207 clean_poisons: true,
208 })
209 }
210 MultipleResults::Ok(results) => {
211 self.submit_sync_region_requests(&results, &physical_table_route.region_routes)
212 .await;
213 self.handle_alter_region_response(results)?;
214 Ok(Status::executing_with_clean_poisons(true))
215 }
216 MultipleResults::AllNonRetryable(error) => {
217 error!(error; "All alter requests returned non-retryable errors for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
218 let err = BoxedError::new(error);
222 Err(err).context(AbortProcedureSnafu {
223 clean_poisons: true,
224 })
225 }
226 }
227 }
228
229 fn handle_alter_region_response(&mut self, mut results: Vec<RegionResponse>) -> Result<()> {
230 self.data.state = AlterTableState::UpdateMetadata;
231 if let Some(column_metadatas) =
232 extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
233 {
234 self.data.column_metadatas = column_metadatas;
235 } else {
236 warn!("altering table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged");
237 }
238
239 Ok(())
240 }
241
242 async fn submit_sync_region_requests(
243 &mut self,
244 results: &[RegionResponse],
245 region_routes: &[RegionRoute],
246 ) {
247 let table_info = self.data.table_info().unwrap();
249 if let Err(err) = sync_follower_regions(
250 &self.context,
251 self.data.table_id(),
252 results,
253 region_routes,
254 table_info.meta.engine.as_str(),
255 )
256 .await
257 {
258 error!(err; "Failed to sync regions for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
259 }
260 }
261
262 pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
264 let table_id = self.data.table_id();
265 let table_ref = self.data.table_ref();
266 let table_info_value = self.data.table_info_value.as_ref().unwrap();
268 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
270
271 let new_info = match &self.new_table_info {
273 Some(cached) => cached.clone(),
274 None => AlterTableExecutor::validate_alter_table_expr(
275 &table_info_value.table_info,
276 self.data.task.alter_table.clone(),
277 )
278 .inspect_err(|e| {
279 error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id);
281 })?,
282 };
283
284 self.executor
286 .on_alter_metadata(
287 &self.context.table_metadata_manager,
288 table_info_value,
289 self.data.region_distribution.as_ref(),
290 new_info.into(),
291 &self.data.column_metadatas,
292 )
293 .await?;
294
295 info!("Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}");
296 self.data.state = AlterTableState::InvalidateTableCache;
297 Ok(Status::executing(true))
298 }
299
300 async fn on_broadcast(&mut self) -> Result<Status> {
302 self.executor
303 .invalidate_table_cache(&self.context.cache_invalidator)
304 .await?;
305 Ok(Status::done())
306 }
307
308 fn lock_key_inner(&self) -> Vec<StringKey> {
309 let mut lock_key = vec![];
310 let table_ref = self.data.table_ref();
311 let table_id = self.data.table_id();
312 lock_key.push(CatalogLock::Read(table_ref.catalog).into());
313 lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
314 lock_key.push(TableLock::Write(table_id).into());
315
316 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
318 if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
319 lock_key.push(
320 TableNameLock::new(table_ref.catalog, table_ref.schema, new_table_name).into(),
321 )
322 }
323
324 lock_key
325 }
326
327 #[cfg(test)]
328 pub(crate) fn data(&self) -> &AlterTableData {
329 &self.data
330 }
331
332 #[cfg(test)]
333 pub(crate) fn mut_data(&mut self) -> &mut AlterTableData {
334 &mut self.data
335 }
336}
337
338#[async_trait]
339impl Procedure for AlterTableProcedure {
340 fn type_name(&self) -> &str {
341 Self::TYPE_NAME
342 }
343
344 async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
345 let state = &self.data.state;
346
347 let step = state.as_ref();
348
349 let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
350 .with_label_values(&[step])
351 .start_timer();
352
353 match state {
354 AlterTableState::Prepare => self.on_prepare().await,
355 AlterTableState::SubmitAlterRegionRequests => {
356 self.submit_alter_region_requests(ctx.procedure_id, ctx.provider.as_ref())
357 .await
358 }
359 AlterTableState::UpdateMetadata => self.on_update_metadata().await,
360 AlterTableState::InvalidateTableCache => self.on_broadcast().await,
361 }
362 .map_err(map_to_procedure_error)
363 }
364
365 fn dump(&self) -> ProcedureResult<String> {
366 serde_json::to_string(&self.data).context(ToJsonSnafu)
367 }
368
369 fn lock_key(&self) -> LockKey {
370 let key = self.lock_key_inner();
371
372 LockKey::new(key)
373 }
374
375 fn poison_keys(&self) -> PoisonKeys {
376 PoisonKeys::new(vec![self.table_poison_key()])
377 }
378}
379
/// Steps of [`AlterTableProcedure`].
///
/// The variant name (via `AsRefStr`) is used as the metric label for the step.
/// Renames go from `Prepare` directly to `UpdateMetadata`.
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterTableState {
    /// Runs prepare checks, loads the table info and validates the alter expression.
    Prepare,
    /// Sends alter requests to the table's regions.
    SubmitAlterRegionRequests,
    /// Writes the altered table metadata.
    UpdateMetadata,
    /// Invalidates the table cache; the procedure is done after this step.
    InvalidateTableCache,
}
391
/// Persisted data of an [`AlterTableProcedure`].
///
/// `dump` serializes it and `from_json` restores it.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTableData {
    /// Current step of the procedure.
    state: AlterTableState,
    /// The alter task being executed.
    task: AlterTableTask,
    /// Id of the table being altered.
    table_id: TableId,
    /// Column metadata extracted from the region responses.
    ///
    /// It is empty until the region step records it. If the field is missing from
    /// a dump, it deserializes as empty.
    #[serde(default)]
    column_metadatas: Vec<ColumnMetadata>,
    /// Table info loaded in the prepare step.
    // NOTE(review): presumably populated by `fill_table_info`; confirm in the metadata module.
    table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
    /// Region distribution of the physical table route.
    ///
    /// Recorded when region requests are submitted; it stays `None` for renames.
    region_distribution: Option<RegionDistribution>,
}
405
406impl AlterTableData {
407 pub fn new(task: AlterTableTask, table_id: TableId) -> Self {
408 Self {
409 state: AlterTableState::Prepare,
410 task,
411 table_id,
412 column_metadatas: vec![],
413 table_info_value: None,
414 region_distribution: None,
415 }
416 }
417
418 fn table_ref(&self) -> TableReference {
419 self.task.table_ref()
420 }
421
422 fn table_id(&self) -> TableId {
423 self.table_id
424 }
425
426 fn table_info(&self) -> Option<&RawTableInfo> {
427 self.table_info_value
428 .as_ref()
429 .map(|value| &value.table_info)
430 }
431
432 #[cfg(test)]
433 pub(crate) fn column_metadatas(&self) -> &[ColumnMetadata] {
434 &self.column_metadatas
435 }
436
437 #[cfg(test)]
438 pub(crate) fn set_column_metadatas(&mut self, column_metadatas: Vec<ColumnMetadata>) {
439 self.column_metadatas = column_metadatas;
440 }
441}