1mod executor;
16mod metadata;
17mod region_request;
18
19use std::vec;
20
21use api::region::RegionResponse;
22use api::v1::alter_table_expr::Kind;
23use api::v1::RenameTable;
24use async_trait::async_trait;
25use common_error::ext::BoxedError;
26use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
27use common_procedure::{
28 Context as ProcedureContext, ContextProvider, Error as ProcedureError, LockKey, PoisonKey,
29 PoisonKeys, Procedure, ProcedureId, Status, StringKey,
30};
31use common_telemetry::{error, info, warn};
32use serde::{Deserialize, Serialize};
33use snafu::{ensure, ResultExt};
34use store_api::metadata::ColumnMetadata;
35use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
36use strum::AsRefStr;
37use table::metadata::{RawTableInfo, TableId, TableInfo};
38use table::table_reference::TableReference;
39
40use crate::ddl::alter_table::executor::AlterTableExecutor;
41use crate::ddl::utils::{
42 extract_column_metadatas, handle_multiple_results, map_to_procedure_error,
43 sync_follower_regions, MultipleResults,
44};
45use crate::ddl::DdlContext;
46use crate::error::{AbortProcedureSnafu, NoLeaderSnafu, PutPoisonSnafu, Result, RetryLaterSnafu};
47use crate::key::table_info::TableInfoValue;
48use crate::key::{DeserializedValueWithBytes, RegionDistribution};
49use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
50use crate::metrics;
51use crate::poison_key::table_poison_key;
52use crate::rpc::ddl::AlterTableTask;
53use crate::rpc::router::{find_leaders, region_distribution, RegionRoute};
54
/// Procedure that alters a table, advancing through the steps of [`AlterTableState`].
pub struct AlterTableProcedure {
    /// Runtime context; the steps read the table metadata manager, the node manager and the
    /// cache invalidator from it.
    context: DdlContext,
    /// Serializable state of the procedure: written by `dump` and restored by `from_json`.
    data: AlterTableData,
    /// New table info built in `on_prepare`. It is not part of `data`, so it is `None` right
    /// after `from_json`; `on_update_metadata` rebuilds it in that case.
    new_table_info: Option<TableInfo>,
    /// Executor built from `data` by `build_executor_from_alter_expr`.
    executor: AlterTableExecutor,
}
68
69fn build_executor_from_alter_expr(alter_data: &AlterTableData) -> AlterTableExecutor {
74 let table_name = alter_data.table_ref().into();
75 let table_id = alter_data.table_id;
76 let alter_kind = alter_data.task.alter_table.kind.as_ref().unwrap();
77 let new_table_name = if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
78 Some(new_table_name.to_string())
79 } else {
80 None
81 };
82 AlterTableExecutor::new(table_name, table_id, new_table_name)
83}
84
85impl AlterTableProcedure {
/// Type name of this procedure, as returned by `Procedure::type_name`.
pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable";
87
88 pub fn new(table_id: TableId, task: AlterTableTask, context: DdlContext) -> Result<Self> {
89 task.validate()?;
90 let data = AlterTableData::new(task, table_id);
91 let executor = build_executor_from_alter_expr(&data);
92 Ok(Self {
93 context,
94 data,
95 new_table_info: None,
96 executor,
97 })
98 }
99
100 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
101 let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
102 let executor = build_executor_from_alter_expr(&data);
103
104 Ok(AlterTableProcedure {
105 context,
106 data,
107 new_table_info: None,
108 executor,
109 })
110 }
111
112 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
114 self.executor
115 .on_prepare(&self.context.table_metadata_manager)
116 .await?;
117 self.fill_table_info().await?;
118
119 let table_info_value = self.data.table_info_value.as_ref().unwrap();
121 let new_table_info = AlterTableExecutor::validate_alter_table_expr(
122 &table_info_value.table_info,
123 self.data.task.alter_table.clone(),
124 )?;
125 self.new_table_info = Some(new_table_info);
126
127 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
129 if matches!(alter_kind, Kind::RenameTable { .. }) {
130 self.data.state = AlterTableState::UpdateMetadata;
131 } else {
132 self.data.state = AlterTableState::SubmitAlterRegionRequests;
133 };
134 Ok(Status::executing(true))
135 }
136
137 fn table_poison_key(&self) -> PoisonKey {
138 table_poison_key(self.data.table_id())
139 }
140
141 async fn put_poison(
142 &self,
143 ctx_provider: &dyn ContextProvider,
144 procedure_id: ProcedureId,
145 ) -> Result<()> {
146 let poison_key = self.table_poison_key();
147 ctx_provider
148 .try_put_poison(&poison_key, procedure_id)
149 .await
150 .context(PutPoisonSnafu)
151 }
152
153 pub async fn submit_alter_region_requests(
154 &mut self,
155 procedure_id: ProcedureId,
156 ctx_provider: &dyn ContextProvider,
157 ) -> Result<Status> {
158 let table_id = self.data.table_id();
159 let (_, physical_table_route) = self
160 .context
161 .table_metadata_manager
162 .table_route_manager()
163 .get_physical_table_route(table_id)
164 .await?;
165
166 self.data.region_distribution =
167 Some(region_distribution(&physical_table_route.region_routes));
168 let leaders = find_leaders(&physical_table_route.region_routes);
169 let alter_kind = self.make_region_alter_kind()?;
170
171 info!(
172 "Submitting alter region requests for table {}, table_id: {}, alter_kind: {:?}",
173 self.data.table_ref(),
174 table_id,
175 alter_kind,
176 );
177
178 ensure!(!leaders.is_empty(), NoLeaderSnafu { table_id });
179 self.put_poison(ctx_provider, procedure_id).await?;
181 let results = self
182 .executor
183 .on_alter_regions(
184 &self.context.node_manager,
185 &physical_table_route.region_routes,
186 alter_kind,
187 )
188 .await;
189
190 match handle_multiple_results(results) {
191 MultipleResults::PartialRetryable(error) => {
192 Err(error)
194 }
195 MultipleResults::PartialNonRetryable(error) => {
196 error!(error; "Partial non-retryable errors occurred during alter table, table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
197 Ok(Status::poisoned(
199 Some(self.table_poison_key()),
200 ProcedureError::external(error),
201 ))
202 }
203 MultipleResults::AllRetryable(error) => {
204 let err = BoxedError::new(error);
206 Err(err).context(RetryLaterSnafu {
207 clean_poisons: true,
208 })
209 }
210 MultipleResults::Ok(results) => {
211 self.submit_sync_region_requests(&results, &physical_table_route.region_routes)
212 .await;
213 self.handle_alter_region_response(results)?;
214 Ok(Status::executing_with_clean_poisons(true))
215 }
216 MultipleResults::AllNonRetryable(error) => {
217 error!(error; "All alter requests returned non-retryable errors for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
218 let err = BoxedError::new(error);
222 Err(err).context(AbortProcedureSnafu {
223 clean_poisons: true,
224 })
225 }
226 }
227 }
228
229 fn handle_alter_region_response(&mut self, mut results: Vec<RegionResponse>) -> Result<()> {
230 if let Some(column_metadatas) =
231 extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
232 {
233 self.data.column_metadatas = column_metadatas;
234 } else {
235 warn!("altering table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged");
236 }
237 self.data.state = AlterTableState::UpdateMetadata;
238 Ok(())
239 }
240
241 async fn submit_sync_region_requests(
242 &mut self,
243 results: &[RegionResponse],
244 region_routes: &[RegionRoute],
245 ) {
246 let table_info = self.data.table_info().unwrap();
248 if let Err(err) = sync_follower_regions(
249 &self.context,
250 self.data.table_id(),
251 results,
252 region_routes,
253 table_info.meta.engine.as_str(),
254 )
255 .await
256 {
257 error!(err; "Failed to sync regions for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
258 }
259 }
260
261 pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
263 let table_id = self.data.table_id();
264 let table_ref = self.data.table_ref();
265 let table_info_value = self.data.table_info_value.as_ref().unwrap();
267 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
269
270 let new_info = match &self.new_table_info {
272 Some(cached) => cached.clone(),
273 None => AlterTableExecutor::validate_alter_table_expr(
274 &table_info_value.table_info,
275 self.data.task.alter_table.clone(),
276 )
277 .inspect_err(|e| {
278 error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id);
280 })?,
281 };
282
283 self.executor
285 .on_alter_metadata(
286 &self.context.table_metadata_manager,
287 table_info_value,
288 self.data.region_distribution.as_ref(),
289 new_info.into(),
290 &self.data.column_metadatas,
291 )
292 .await?;
293
294 info!("Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}");
295 self.data.state = AlterTableState::InvalidateTableCache;
296 Ok(Status::executing(true))
297 }
298
299 async fn on_broadcast(&mut self) -> Result<Status> {
301 self.executor
302 .invalidate_table_cache(&self.context.cache_invalidator)
303 .await?;
304 Ok(Status::done())
305 }
306
307 fn lock_key_inner(&self) -> Vec<StringKey> {
308 let mut lock_key = vec![];
309 let table_ref = self.data.table_ref();
310 let table_id = self.data.table_id();
311 lock_key.push(CatalogLock::Read(table_ref.catalog).into());
312 lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
313 lock_key.push(TableLock::Write(table_id).into());
314
315 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
317 if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
318 lock_key.push(
319 TableNameLock::new(table_ref.catalog, table_ref.schema, new_table_name).into(),
320 )
321 }
322
323 lock_key
324 }
325
/// Test-only read access to the procedure data.
#[cfg(test)]
pub(crate) fn data(&self) -> &AlterTableData {
    &self.data
}
330
/// Test-only mutable access to the procedure data.
#[cfg(test)]
pub(crate) fn mut_data(&mut self) -> &mut AlterTableData {
    &mut self.data
}
335}
336
337#[async_trait]
338impl Procedure for AlterTableProcedure {
339 fn type_name(&self) -> &str {
340 Self::TYPE_NAME
341 }
342
343 async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
344 let state = &self.data.state;
345
346 let step = state.as_ref();
347
348 let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
349 .with_label_values(&[step])
350 .start_timer();
351
352 match state {
353 AlterTableState::Prepare => self.on_prepare().await,
354 AlterTableState::SubmitAlterRegionRequests => {
355 self.submit_alter_region_requests(ctx.procedure_id, ctx.provider.as_ref())
356 .await
357 }
358 AlterTableState::UpdateMetadata => self.on_update_metadata().await,
359 AlterTableState::InvalidateTableCache => self.on_broadcast().await,
360 }
361 .map_err(map_to_procedure_error)
362 }
363
364 fn dump(&self) -> ProcedureResult<String> {
365 serde_json::to_string(&self.data).context(ToJsonSnafu)
366 }
367
368 fn lock_key(&self) -> LockKey {
369 let key = self.lock_key_inner();
370
371 LockKey::new(key)
372 }
373
374 fn poison_keys(&self) -> PoisonKeys {
375 PoisonKeys::new(vec![self.table_poison_key()])
376 }
377}
378
/// Steps of the alter table procedure. `execute` dispatches on the current state, and the
/// state name is used as the metric label.
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterTableState {
    /// Initial state; handled by `on_prepare`.
    Prepare,
    /// Handled by `submit_alter_region_requests`; entered from `Prepare` for every alter kind
    /// except a rename.
    SubmitAlterRegionRequests,
    /// Handled by `on_update_metadata`.
    UpdateMetadata,
    /// Handled by `on_broadcast`, after which the procedure is done.
    InvalidateTableCache,
}
390
/// Serializable data of the alter table procedure (what `dump` writes and `from_json` reads).
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTableData {
    /// Step the procedure is currently in.
    state: AlterTableState,
    /// The alter table task being executed.
    task: AlterTableTask,
    /// Id of the table being altered.
    table_id: TableId,
    /// Column metadatas taken from the region responses in `handle_alter_region_response` and
    /// handed to the executor in `on_update_metadata`. Defaults to empty when absent from the
    /// serialized form.
    #[serde(default)]
    column_metadatas: Vec<ColumnMetadata>,
    /// Table info value the alteration is validated against. Starts as `None`.
    /// NOTE(review): presumably filled by `fill_table_info`, which is outside this view.
    table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
    /// Region distribution computed from the physical table route in
    /// `submit_alter_region_requests`; stays `None` when that step is skipped.
    region_distribution: Option<RegionDistribution>,
}
404
405impl AlterTableData {
406 pub fn new(task: AlterTableTask, table_id: TableId) -> Self {
407 Self {
408 state: AlterTableState::Prepare,
409 task,
410 table_id,
411 column_metadatas: vec![],
412 table_info_value: None,
413 region_distribution: None,
414 }
415 }
416
417 fn table_ref(&self) -> TableReference {
418 self.task.table_ref()
419 }
420
421 fn table_id(&self) -> TableId {
422 self.table_id
423 }
424
425 fn table_info(&self) -> Option<&RawTableInfo> {
426 self.table_info_value
427 .as_ref()
428 .map(|value| &value.table_info)
429 }
430
431 #[cfg(test)]
432 pub(crate) fn column_metadatas(&self) -> &[ColumnMetadata] {
433 &self.column_metadatas
434 }
435
436 #[cfg(test)]
437 pub(crate) fn set_column_metadatas(&mut self, column_metadatas: Vec<ColumnMetadata>) {
438 self.column_metadatas = column_metadatas;
439 }
440}