1mod check;
16mod metadata;
17mod region_request;
18mod update_metadata;
19
20use std::vec;
21
22use api::region::RegionResponse;
23use api::v1::alter_table_expr::Kind;
24use api::v1::RenameTable;
25use async_trait::async_trait;
26use common_error::ext::BoxedError;
27use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
28use common_procedure::{
29 Context as ProcedureContext, ContextProvider, Error as ProcedureError, LockKey, PoisonKey,
30 PoisonKeys, Procedure, ProcedureId, Status, StringKey,
31};
32use common_telemetry::{debug, error, info};
33use futures::future::{self};
34use serde::{Deserialize, Serialize};
35use snafu::{ensure, ResultExt};
36use store_api::storage::RegionId;
37use strum::AsRefStr;
38use table::metadata::{RawTableInfo, TableId, TableInfo};
39use table::table_reference::TableReference;
40
41use crate::cache_invalidator::Context;
42use crate::ddl::utils::{
43 add_peer_context_if_needed, handle_multiple_results, map_to_procedure_error,
44 sync_follower_regions, MultipleResults,
45};
46use crate::ddl::DdlContext;
47use crate::error::{AbortProcedureSnafu, NoLeaderSnafu, PutPoisonSnafu, Result, RetryLaterSnafu};
48use crate::instruction::CacheIdent;
49use crate::key::table_info::TableInfoValue;
50use crate::key::{DeserializedValueWithBytes, RegionDistribution};
51use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
52use crate::metrics;
53use crate::poison_key::table_poison_key;
54use crate::rpc::ddl::AlterTableTask;
55use crate::rpc::router::{find_leader_regions, find_leaders, region_distribution, RegionRoute};
56
/// DDL procedure that alters a table.
///
/// It checks the request, sends alter region requests to the datanodes (skipped for
/// renames), updates the table metadata and finally invalidates the table caches.
pub struct AlterTableProcedure {
    /// Shared DDL context; this procedure uses its table metadata manager, node manager and
    /// cache invalidator.
    context: DdlContext,
    /// Procedure state that is serialized by `dump` and restored by `from_json`.
    data: AlterTableData,
    /// New table info built in the prepare step.
    ///
    /// It is not part of `data`, so it is `None` after the procedure is restored from JSON and
    /// is rebuilt in the update metadata step in that case.
    new_table_info: Option<TableInfo>,
}
68
69impl AlterTableProcedure {
70 pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable";
71
72 pub fn new(table_id: TableId, task: AlterTableTask, context: DdlContext) -> Result<Self> {
73 task.validate()?;
74 Ok(Self {
75 context,
76 data: AlterTableData::new(task, table_id),
77 new_table_info: None,
78 })
79 }
80
81 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
82 let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
83 Ok(AlterTableProcedure {
84 context,
85 data,
86 new_table_info: None,
87 })
88 }
89
90 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
92 self.check_alter().await?;
93 self.fill_table_info().await?;
94
95 let table_info_value = self.data.table_info_value.as_ref().unwrap();
100 self.new_table_info = Some(self.build_new_table_info(&table_info_value.table_info)?);
101
102 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
104 if matches!(alter_kind, Kind::RenameTable { .. }) {
105 self.data.state = AlterTableState::UpdateMetadata;
106 } else {
107 self.data.state = AlterTableState::SubmitAlterRegionRequests;
108 };
109 Ok(Status::executing(true))
110 }
111
112 fn table_poison_key(&self) -> PoisonKey {
113 table_poison_key(self.data.table_id())
114 }
115
116 async fn put_poison(
117 &self,
118 ctx_provider: &dyn ContextProvider,
119 procedure_id: ProcedureId,
120 ) -> Result<()> {
121 let poison_key = self.table_poison_key();
122 ctx_provider
123 .try_put_poison(&poison_key, procedure_id)
124 .await
125 .context(PutPoisonSnafu)
126 }
127
128 pub async fn submit_alter_region_requests(
129 &mut self,
130 procedure_id: ProcedureId,
131 ctx_provider: &dyn ContextProvider,
132 ) -> Result<Status> {
133 let table_id = self.data.table_id();
134 let (_, physical_table_route) = self
135 .context
136 .table_metadata_manager
137 .table_route_manager()
138 .get_physical_table_route(table_id)
139 .await?;
140
141 self.data.region_distribution =
142 Some(region_distribution(&physical_table_route.region_routes));
143
144 let leaders = find_leaders(&physical_table_route.region_routes);
145 let mut alter_region_tasks = Vec::with_capacity(leaders.len());
146 let alter_kind = self.make_region_alter_kind()?;
147
148 info!(
149 "Submitting alter region requests for table {}, table_id: {}, alter_kind: {:?}",
150 self.data.table_ref(),
151 table_id,
152 alter_kind,
153 );
154
155 ensure!(!leaders.is_empty(), NoLeaderSnafu { table_id });
156 self.put_poison(ctx_provider, procedure_id).await?;
158 for datanode in leaders {
159 let requester = self.context.node_manager.datanode(&datanode).await;
160 let regions = find_leader_regions(&physical_table_route.region_routes, &datanode);
161
162 for region in regions {
163 let region_id = RegionId::new(table_id, region);
164 let request = self.make_alter_region_request(region_id, alter_kind.clone())?;
165 debug!("Submitting {request:?} to {datanode}");
166
167 let datanode = datanode.clone();
168 let requester = requester.clone();
169
170 alter_region_tasks.push(async move {
171 requester
172 .handle(request)
173 .await
174 .map_err(add_peer_context_if_needed(datanode))
175 });
176 }
177 }
178
179 let results = future::join_all(alter_region_tasks)
180 .await
181 .into_iter()
182 .collect::<Vec<_>>();
183
184 match handle_multiple_results(results) {
185 MultipleResults::PartialRetryable(error) => {
186 Err(error)
188 }
189 MultipleResults::PartialNonRetryable(error) => {
190 error!(error; "Partial non-retryable errors occurred during alter table, table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
191 Ok(Status::poisoned(
193 Some(self.table_poison_key()),
194 ProcedureError::external(error),
195 ))
196 }
197 MultipleResults::AllRetryable(error) => {
198 let err = BoxedError::new(error);
200 Err(err).context(RetryLaterSnafu {
201 clean_poisons: true,
202 })
203 }
204 MultipleResults::Ok(results) => {
205 self.submit_sync_region_requests(results, &physical_table_route.region_routes)
206 .await;
207 self.data.state = AlterTableState::UpdateMetadata;
208 Ok(Status::executing_with_clean_poisons(true))
209 }
210 MultipleResults::AllNonRetryable(error) => {
211 error!(error; "All alter requests returned non-retryable errors for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
212 let err = BoxedError::new(error);
216 Err(err).context(AbortProcedureSnafu {
217 clean_poisons: true,
218 })
219 }
220 }
221 }
222
223 async fn submit_sync_region_requests(
224 &mut self,
225 results: Vec<RegionResponse>,
226 region_routes: &[RegionRoute],
227 ) {
228 let table_info = self.data.table_info().unwrap();
230 if let Err(err) = sync_follower_regions(
231 &self.context,
232 self.data.table_id(),
233 results,
234 region_routes,
235 table_info.meta.engine.as_str(),
236 )
237 .await
238 {
239 error!(err; "Failed to sync regions for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
240 }
241 }
242
243 pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
245 let table_id = self.data.table_id();
246 let table_ref = self.data.table_ref();
247 let table_info_value = self.data.table_info_value.as_ref().unwrap();
249 let new_info = match &self.new_table_info {
251 Some(cached) => cached.clone(),
252 None => self.build_new_table_info(&table_info_value.table_info)
253 .inspect_err(|e| {
254 error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id);
256 })?,
257 };
258
259 debug!(
260 "Starting update table: {} metadata, new table info {:?}",
261 table_ref.to_string(),
262 new_info
263 );
264
265 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
267 if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
268 self.on_update_metadata_for_rename(new_table_name.to_string(), table_info_value)
269 .await?;
270 } else {
271 let region_distribution = self.data.region_distribution.as_ref().unwrap().clone();
273 self.on_update_metadata_for_alter(
274 new_info.into(),
275 region_distribution,
276 table_info_value,
277 )
278 .await?;
279 }
280
281 info!("Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}");
282 self.data.state = AlterTableState::InvalidateTableCache;
283 Ok(Status::executing(true))
284 }
285
286 async fn on_broadcast(&mut self) -> Result<Status> {
288 let cache_invalidator = &self.context.cache_invalidator;
289
290 cache_invalidator
291 .invalidate(
292 &Context::default(),
293 &[
294 CacheIdent::TableId(self.data.table_id()),
295 CacheIdent::TableName(self.data.table_ref().into()),
296 ],
297 )
298 .await?;
299
300 Ok(Status::done())
301 }
302
303 fn lock_key_inner(&self) -> Vec<StringKey> {
304 let mut lock_key = vec![];
305 let table_ref = self.data.table_ref();
306 let table_id = self.data.table_id();
307 lock_key.push(CatalogLock::Read(table_ref.catalog).into());
308 lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
309 lock_key.push(TableLock::Write(table_id).into());
310
311 let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
313 if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
314 lock_key.push(
315 TableNameLock::new(table_ref.catalog, table_ref.schema, new_table_name).into(),
316 )
317 }
318
319 lock_key
320 }
321}
322
323#[async_trait]
324impl Procedure for AlterTableProcedure {
325 fn type_name(&self) -> &str {
326 Self::TYPE_NAME
327 }
328
329 async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
330 let state = &self.data.state;
331
332 let step = state.as_ref();
333
334 let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
335 .with_label_values(&[step])
336 .start_timer();
337
338 match state {
339 AlterTableState::Prepare => self.on_prepare().await,
340 AlterTableState::SubmitAlterRegionRequests => {
341 self.submit_alter_region_requests(ctx.procedure_id, ctx.provider.as_ref())
342 .await
343 }
344 AlterTableState::UpdateMetadata => self.on_update_metadata().await,
345 AlterTableState::InvalidateTableCache => self.on_broadcast().await,
346 }
347 .map_err(map_to_procedure_error)
348 }
349
350 fn dump(&self) -> ProcedureResult<String> {
351 serde_json::to_string(&self.data).context(ToJsonSnafu)
352 }
353
354 fn lock_key(&self) -> LockKey {
355 let key = self.lock_key_inner();
356
357 LockKey::new(key)
358 }
359
360 fn poison_keys(&self) -> PoisonKeys {
361 PoisonKeys::new(vec![self.table_poison_key()])
362 }
363}
364
/// Steps of [`AlterTableProcedure`].
///
/// The variant name (via `AsRefStr`) is used as the step label of the alter table metric, and
/// the enum is serialized with the procedure data, so variants must not be renamed.
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterTableState {
    /// Checks the alteration and loads the current table info.
    Prepare,
    /// Sends alter region requests to the leader regions (skipped for renames).
    SubmitAlterRegionRequests,
    /// Writes the new table metadata.
    UpdateMetadata,
    /// Invalidates the table caches and finishes the procedure.
    InvalidateTableCache,
}
376
/// Serializable state of an [`AlterTableProcedure`], written by `dump` and read by `from_json`.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTableData {
    /// Current step of the procedure.
    state: AlterTableState,
    /// The alter table task being executed.
    task: AlterTableTask,
    /// Id of the table being altered.
    table_id: TableId,
    /// Current table info; `None` until the prepare step fills it
    /// (presumably via `fill_table_info` — TODO confirm).
    table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
    /// Region distribution recorded when the alter region requests are submitted; stays `None`
    /// for renames, which skip that step.
    region_distribution: Option<RegionDistribution>,
}
388
389impl AlterTableData {
390 pub fn new(task: AlterTableTask, table_id: TableId) -> Self {
391 Self {
392 state: AlterTableState::Prepare,
393 task,
394 table_id,
395 table_info_value: None,
396 region_distribution: None,
397 }
398 }
399
400 fn table_ref(&self) -> TableReference {
401 self.task.table_ref()
402 }
403
404 fn table_id(&self) -> TableId {
405 self.table_id
406 }
407
408 fn table_info(&self) -> Option<&RawTableInfo> {
409 self.table_info_value
410 .as_ref()
411 .map(|value| &value.table_info)
412 }
413}