1mod check;
16mod metadata;
17mod region_request;
18mod update_metadata;
19
20use std::vec;
21
22use api::region::RegionResponse;
23use api::v1::alter_table_expr::Kind;
24use api::v1::RenameTable;
25use async_trait::async_trait;
26use common_error::ext::BoxedError;
27use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
28use common_procedure::{
29 Context as ProcedureContext, ContextProvider, Error as ProcedureError, LockKey, PoisonKey,
30 PoisonKeys, Procedure, ProcedureId, Status, StringKey,
31};
32use common_telemetry::{debug, error, info};
33use futures::future::{self};
34use serde::{Deserialize, Serialize};
35use snafu::{ensure, ResultExt};
36use store_api::storage::RegionId;
37use strum::AsRefStr;
38use table::metadata::{RawTableInfo, TableId, TableInfo};
39use table::table_reference::TableReference;
40
41use crate::cache_invalidator::Context;
42use crate::ddl::utils::{
43 add_peer_context_if_needed, handle_multiple_results, sync_follower_regions, MultipleResults,
44};
45use crate::ddl::DdlContext;
46use crate::error::{AbortProcedureSnafu, Error, NoLeaderSnafu, PutPoisonSnafu, Result};
47use crate::instruction::CacheIdent;
48use crate::key::table_info::TableInfoValue;
49use crate::key::{DeserializedValueWithBytes, RegionDistribution};
50use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
51use crate::metrics;
52use crate::poison_key::table_poison_key;
53use crate::rpc::ddl::AlterTableTask;
54use crate::rpc::router::{find_leader_regions, find_leaders, region_distribution, RegionRoute};
55
/// Procedure that alters a table.
///
/// The current step is tracked by the state stored in [`AlterTableData`]; the
/// `Procedure::execute` implementation dispatches on that state.
pub struct AlterTableProcedure {
    /// Handles used by the steps (table metadata manager, node manager and cache invalidator).
    context: DdlContext,
    /// The data that is serialized by `dump` and restored by `from_json`.
    data: AlterTableData,
    /// The new table info built in `on_prepare`.
    ///
    /// It is `None` right after `new` or `from_json`; in that case `on_update_metadata`
    /// builds it again from the stored table info value.
    new_table_info: Option<TableInfo>,
}
67
68impl AlterTableProcedure {
    /// The name returned by `Procedure::type_name` for this procedure.
    pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable";
70
71 pub fn new(table_id: TableId, task: AlterTableTask, context: DdlContext) -> Result<Self> {
72 task.validate()?;
73 Ok(Self {
74 context,
75 data: AlterTableData::new(task, table_id),
76 new_table_info: None,
77 })
78 }
79
80 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
81 let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
82 Ok(AlterTableProcedure {
83 context,
84 data,
85 new_table_info: None,
86 })
87 }
88
89 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
91 self.check_alter().await?;
92 self.fill_table_info().await?;
93
94 let table_info_value = self.data.table_info_value.as_ref().unwrap();
99 self.new_table_info = Some(self.build_new_table_info(&table_info_value.table_info)?);
100
101 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
103 if matches!(alter_kind, Kind::RenameTable { .. }) {
104 self.data.state = AlterTableState::UpdateMetadata;
105 } else {
106 self.data.state = AlterTableState::SubmitAlterRegionRequests;
107 };
108 Ok(Status::executing(true))
109 }
110
111 fn table_poison_key(&self) -> PoisonKey {
112 table_poison_key(self.data.table_id())
113 }
114
115 async fn put_poison(
116 &self,
117 ctx_provider: &dyn ContextProvider,
118 procedure_id: ProcedureId,
119 ) -> Result<()> {
120 let poison_key = self.table_poison_key();
121 ctx_provider
122 .try_put_poison(&poison_key, procedure_id)
123 .await
124 .context(PutPoisonSnafu)
125 }
126
127 pub async fn submit_alter_region_requests(
128 &mut self,
129 procedure_id: ProcedureId,
130 ctx_provider: &dyn ContextProvider,
131 ) -> Result<Status> {
132 let table_id = self.data.table_id();
133 let (_, physical_table_route) = self
134 .context
135 .table_metadata_manager
136 .table_route_manager()
137 .get_physical_table_route(table_id)
138 .await?;
139
140 self.data.region_distribution =
141 Some(region_distribution(&physical_table_route.region_routes));
142
143 let leaders = find_leaders(&physical_table_route.region_routes);
144 let mut alter_region_tasks = Vec::with_capacity(leaders.len());
145 let alter_kind = self.make_region_alter_kind()?;
146
147 info!(
148 "Submitting alter region requests for table {}, table_id: {}, alter_kind: {:?}",
149 self.data.table_ref(),
150 table_id,
151 alter_kind,
152 );
153
154 ensure!(!leaders.is_empty(), NoLeaderSnafu { table_id });
155 self.put_poison(ctx_provider, procedure_id).await?;
157 for datanode in leaders {
158 let requester = self.context.node_manager.datanode(&datanode).await;
159 let regions = find_leader_regions(&physical_table_route.region_routes, &datanode);
160
161 for region in regions {
162 let region_id = RegionId::new(table_id, region);
163 let request = self.make_alter_region_request(region_id, alter_kind.clone())?;
164 debug!("Submitting {request:?} to {datanode}");
165
166 let datanode = datanode.clone();
167 let requester = requester.clone();
168
169 alter_region_tasks.push(async move {
170 requester
171 .handle(request)
172 .await
173 .map_err(add_peer_context_if_needed(datanode))
174 });
175 }
176 }
177
178 let results = future::join_all(alter_region_tasks)
179 .await
180 .into_iter()
181 .collect::<Vec<_>>();
182
183 match handle_multiple_results(results) {
184 MultipleResults::PartialRetryable(error) => {
185 Err(error)
187 }
188 MultipleResults::PartialNonRetryable(error) => {
189 error!(error; "Partial non-retryable errors occurred during alter table, table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
190 Ok(Status::poisoned(
192 Some(self.table_poison_key()),
193 ProcedureError::external(error),
194 ))
195 }
196 MultipleResults::AllRetryable(error) => {
197 Err(error)
199 }
200 MultipleResults::Ok(results) => {
201 self.submit_sync_region_requests(results, &physical_table_route.region_routes)
202 .await;
203 self.data.state = AlterTableState::UpdateMetadata;
204 Ok(Status::executing_with_clean_poisons(true))
205 }
206 MultipleResults::AllNonRetryable(error) => {
207 error!(error; "All alter requests returned non-retryable errors for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
208 let err = BoxedError::new(error);
212 Err(err).context(AbortProcedureSnafu {
213 clean_poisons: true,
214 })
215 }
216 }
217 }
218
219 async fn submit_sync_region_requests(
220 &mut self,
221 results: Vec<RegionResponse>,
222 region_routes: &[RegionRoute],
223 ) {
224 let table_info = self.data.table_info().unwrap();
226 if let Err(err) = sync_follower_regions(
227 &self.context,
228 self.data.table_id(),
229 results,
230 region_routes,
231 table_info.meta.engine.as_str(),
232 )
233 .await
234 {
235 error!(err; "Failed to sync regions for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
236 }
237 }
238
239 pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
241 let table_id = self.data.table_id();
242 let table_ref = self.data.table_ref();
243 let table_info_value = self.data.table_info_value.as_ref().unwrap();
245 let new_info = match &self.new_table_info {
247 Some(cached) => cached.clone(),
248 None => self.build_new_table_info(&table_info_value.table_info)
249 .inspect_err(|e| {
250 error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id);
252 })?,
253 };
254
255 debug!(
256 "Starting update table: {} metadata, new table info {:?}",
257 table_ref.to_string(),
258 new_info
259 );
260
261 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
263 if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
264 self.on_update_metadata_for_rename(new_table_name.to_string(), table_info_value)
265 .await?;
266 } else {
267 let region_distribution = self.data.region_distribution.as_ref().unwrap().clone();
269 self.on_update_metadata_for_alter(
270 new_info.into(),
271 region_distribution,
272 table_info_value,
273 )
274 .await?;
275 }
276
277 info!("Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}");
278 self.data.state = AlterTableState::InvalidateTableCache;
279 Ok(Status::executing(true))
280 }
281
282 async fn on_broadcast(&mut self) -> Result<Status> {
284 let cache_invalidator = &self.context.cache_invalidator;
285
286 cache_invalidator
287 .invalidate(
288 &Context::default(),
289 &[
290 CacheIdent::TableId(self.data.table_id()),
291 CacheIdent::TableName(self.data.table_ref().into()),
292 ],
293 )
294 .await?;
295
296 Ok(Status::done())
297 }
298
299 fn lock_key_inner(&self) -> Vec<StringKey> {
300 let mut lock_key = vec![];
301 let table_ref = self.data.table_ref();
302 let table_id = self.data.table_id();
303 lock_key.push(CatalogLock::Read(table_ref.catalog).into());
304 lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
305 lock_key.push(TableLock::Write(table_id).into());
306
307 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
309 if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
310 lock_key.push(
311 TableNameLock::new(table_ref.catalog, table_ref.schema, new_table_name).into(),
312 )
313 }
314
315 lock_key
316 }
317}
318
319#[async_trait]
320impl Procedure for AlterTableProcedure {
321 fn type_name(&self) -> &str {
322 Self::TYPE_NAME
323 }
324
325 async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
326 let error_handler = |e: Error| {
327 if e.is_retry_later() {
328 ProcedureError::retry_later(e)
329 } else if e.need_clean_poisons() {
330 ProcedureError::external_and_clean_poisons(e)
331 } else {
332 ProcedureError::external(e)
333 }
334 };
335
336 let state = &self.data.state;
337
338 let step = state.as_ref();
339
340 let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
341 .with_label_values(&[step])
342 .start_timer();
343
344 match state {
345 AlterTableState::Prepare => self.on_prepare().await,
346 AlterTableState::SubmitAlterRegionRequests => {
347 self.submit_alter_region_requests(ctx.procedure_id, ctx.provider.as_ref())
348 .await
349 }
350 AlterTableState::UpdateMetadata => self.on_update_metadata().await,
351 AlterTableState::InvalidateTableCache => self.on_broadcast().await,
352 }
353 .map_err(error_handler)
354 }
355
356 fn dump(&self) -> ProcedureResult<String> {
357 serde_json::to_string(&self.data).context(ToJsonSnafu)
358 }
359
360 fn lock_key(&self) -> LockKey {
361 let key = self.lock_key_inner();
362
363 LockKey::new(key)
364 }
365
366 fn poison_keys(&self) -> PoisonKeys {
367 PoisonKeys::new(vec![self.table_poison_key()])
368 }
369}
370
/// The steps of [`AlterTableProcedure`].
///
/// The variant name (through `AsRefStr`) is used as the metric label in `execute`.
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterTableState {
    /// Initial state; handled by `on_prepare`.
    Prepare,
    /// Sends the alter requests to the leader regions; handled by
    /// `submit_alter_region_requests`.
    SubmitAlterRegionRequests,
    /// Updates the table metadata; handled by `on_update_metadata`.
    UpdateMetadata,
    /// Invalidates the table cache entries; handled by `on_broadcast`, which finishes the
    /// procedure.
    InvalidateTableCache,
}
382
/// The serializable data of [`AlterTableProcedure`]; written by `dump` and read by
/// `from_json`.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTableData {
    /// The current step of the procedure.
    state: AlterTableState,
    /// The alter table task being executed.
    task: AlterTableTask,
    /// The id of the table being altered.
    table_id: TableId,
    /// The stored table info value; `None` until it is filled in.
    // NOTE(review): presumably set by `fill_table_info` during the prepare step — confirm.
    table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
    /// The region distribution recorded by `submit_alter_region_requests` and read by
    /// `on_update_metadata` for non-rename alterations.
    region_distribution: Option<RegionDistribution>,
}
394
395impl AlterTableData {
396 pub fn new(task: AlterTableTask, table_id: TableId) -> Self {
397 Self {
398 state: AlterTableState::Prepare,
399 task,
400 table_id,
401 table_info_value: None,
402 region_distribution: None,
403 }
404 }
405
406 fn table_ref(&self) -> TableReference {
407 self.task.table_ref()
408 }
409
410 fn table_id(&self) -> TableId {
411 self.table_id
412 }
413
414 fn table_info(&self) -> Option<&RawTableInfo> {
415 self.table_info_value
416 .as_ref()
417 .map(|value| &value.table_info)
418 }
419}