1mod executor;
16mod update_metadata;
17mod validator;
18
19use api::region::RegionResponse;
20use async_trait::async_trait;
21use common_catalog::format_full_table_name;
22use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
23use common_procedure::{Context, LockKey, Procedure, Status};
24use common_telemetry::{debug, error, info, warn};
25pub use executor::make_alter_region_request;
26use serde::{Deserialize, Serialize};
27use snafu::ResultExt;
28use store_api::metadata::ColumnMetadata;
29use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
30use strum::AsRefStr;
31use table::metadata::TableId;
32
33use crate::cache_invalidator::Context as CacheContext;
34use crate::ddl::DdlContext;
35use crate::ddl::alter_logical_tables::executor::AlterLogicalTablesExecutor;
36use crate::ddl::alter_logical_tables::validator::{
37 AlterLogicalTableValidator, ValidatorResult, retain_unskipped,
38};
39use crate::ddl::utils::{extract_column_metadatas, map_to_procedure_error, sync_follower_regions};
40use crate::error::Result;
41use crate::instruction::CacheIdent;
42use crate::key::DeserializedValueWithBytes;
43use crate::key::table_info::TableInfoValue;
44use crate::key::table_route::PhysicalTableRouteValue;
45use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
46use crate::metrics;
47use crate::rpc::ddl::AlterTableTask;
48use crate::rpc::router::RegionRoute;
49
/// Procedure that applies a batch of [`AlterTableTask`]s to logical tables
/// associated with a single physical table (`data.physical_table_id`).
pub struct AlterLogicalTablesProcedure {
    /// DDL context giving access to the table metadata manager, the node
    /// manager and the cache invalidator used by the individual steps.
    pub context: DdlContext,
    /// Serializable procedure state: this is what `dump` writes out and what
    /// `from_json` restores.
    pub data: AlterTablesData,
    /// Route of the physical table. It is not part of `data`, so it is `None`
    /// after `from_json`, it is reset to `None` whenever a step fails, and it
    /// is re-fetched on demand before the region requests are submitted.
    pub physical_table_route: Option<PhysicalTableRouteValue>,
}
56
57fn build_validator_from_alter_table_data<'a>(
59 data: &'a AlterTablesData,
60) -> AlterLogicalTableValidator<'a> {
61 let phsycial_table_id = data.physical_table_id;
62 let alters = data
63 .tasks
64 .iter()
65 .map(|task| &task.alter_table)
66 .collect::<Vec<_>>();
67 AlterLogicalTableValidator::new(phsycial_table_id, alters)
68}
69
70fn build_executor_from_alter_expr<'a>(data: &'a AlterTablesData) -> AlterLogicalTablesExecutor<'a> {
72 debug_assert_eq!(data.tasks.len(), data.table_info_values.len());
73 let alters = data
74 .tasks
75 .iter()
76 .zip(data.table_info_values.iter())
77 .map(|(task, table_info)| (table_info.table_info.ident.table_id, &task.alter_table))
78 .collect::<Vec<_>>();
79 AlterLogicalTablesExecutor::new(alters)
80}
81
82impl AlterLogicalTablesProcedure {
83 pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterLogicalTables";
84
85 pub fn new(
86 tasks: Vec<AlterTableTask>,
87 physical_table_id: TableId,
88 context: DdlContext,
89 ) -> Self {
90 Self {
91 context,
92 data: AlterTablesData {
93 state: AlterTablesState::Prepare,
94 tasks,
95 table_info_values: vec![],
96 physical_table_id,
97 physical_table_info: None,
98 physical_columns: vec![],
99 table_cache_keys_to_invalidate: vec![],
100 },
101 physical_table_route: None,
102 }
103 }
104
105 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
106 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
107 Ok(Self {
108 context,
109 data,
110 physical_table_route: None,
111 })
112 }
113
114 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
115 let validator = build_validator_from_alter_table_data(&self.data);
116 let ValidatorResult {
117 num_skipped,
118 skip_alter,
119 table_info_values,
120 physical_table_info,
121 physical_table_route,
122 } = validator
123 .validate(&self.context.table_metadata_manager)
124 .await?;
125
126 let num_tasks = self.data.tasks.len();
127 if num_skipped == num_tasks {
128 info!("All the alter tasks are finished, will skip the procedure.");
129 let cache_ident_keys = AlterLogicalTablesExecutor::build_cache_ident_keys(
130 &physical_table_info,
131 &table_info_values
132 .iter()
133 .map(|v| v.get_inner_ref())
134 .collect::<Vec<_>>(),
135 );
136 self.data.table_cache_keys_to_invalidate = cache_ident_keys;
137 self.data.state = AlterTablesState::InvalidateTableCache;
139 return Ok(Status::executing(true));
140 } else if num_skipped > 0 {
141 info!(
142 "There are {} alter tasks, {} of them were already finished.",
143 num_tasks, num_skipped
144 );
145 }
146
147 retain_unskipped(&mut self.data.tasks, &skip_alter);
149 self.data.physical_table_info = Some(physical_table_info);
150 self.data.table_info_values = table_info_values;
151 debug_assert_eq!(self.data.tasks.len(), self.data.table_info_values.len());
152 self.physical_table_route = Some(physical_table_route);
153 self.data.state = AlterTablesState::SubmitAlterRegionRequests;
154 Ok(Status::executing(true))
155 }
156
157 pub(crate) async fn on_submit_alter_region_requests(&mut self) -> Result<Status> {
158 self.fetch_physical_table_route_if_non_exist().await?;
159 let region_routes = &self.physical_table_route.as_ref().unwrap().region_routes;
161
162 let executor = build_executor_from_alter_expr(&self.data);
163 let mut results = executor
164 .on_alter_regions(
165 &self.context.node_manager,
166 region_routes,
168 )
169 .await?;
170
171 if let Some(column_metadatas) =
172 extract_column_metadatas(&mut results, ALTER_PHYSICAL_EXTENSION_KEY)?
173 {
174 self.data.physical_columns = column_metadatas;
175 } else {
176 warn!(
177 "altering logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged"
178 );
179 }
180 self.submit_sync_region_requests(results, region_routes)
181 .await;
182 self.data.state = AlterTablesState::UpdateMetadata;
183 Ok(Status::executing(true))
184 }
185
186 async fn submit_sync_region_requests(
187 &self,
188 results: Vec<RegionResponse>,
189 region_routes: &[RegionRoute],
190 ) {
191 let table_info = &self.data.physical_table_info.as_ref().unwrap().table_info;
192 if let Err(err) = sync_follower_regions(
193 &self.context,
194 self.data.physical_table_id,
195 &results,
196 region_routes,
197 table_info.meta.engine.as_str(),
198 )
199 .await
200 {
201 error!(err; "Failed to sync regions for table {}, table_id: {}",
202 format_full_table_name(&table_info.catalog_name, &table_info.schema_name, &table_info.name),
203 self.data.physical_table_id
204 );
205 }
206 }
207
208 pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
209 self.update_physical_table_metadata().await?;
210 self.update_logical_tables_metadata().await?;
211
212 let logical_table_info_values = self
213 .data
214 .table_info_values
215 .iter()
216 .map(|v| v.get_inner_ref())
217 .collect::<Vec<_>>();
218
219 let cache_ident_keys = AlterLogicalTablesExecutor::build_cache_ident_keys(
220 self.data.physical_table_info.as_ref().unwrap(),
221 &logical_table_info_values,
222 );
223 self.data.table_cache_keys_to_invalidate = cache_ident_keys;
224 self.data.clear_metadata_fields();
225
226 self.data.state = AlterTablesState::InvalidateTableCache;
227 Ok(Status::executing(true))
228 }
229
230 pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> {
231 let to_invalidate = &self.data.table_cache_keys_to_invalidate;
232
233 let ctx = CacheContext {
234 subject: Some(format!(
235 "Invalidate table cache by altering logical tables, physical_table_id: {}",
236 self.data.physical_table_id,
237 )),
238 };
239
240 self.context
241 .cache_invalidator
242 .invalidate(&ctx, to_invalidate)
243 .await?;
244 Ok(Status::done())
245 }
246
247 async fn fetch_physical_table_route_if_non_exist(&mut self) -> Result<()> {
249 if self.physical_table_route.is_none() {
250 let (_, physical_table_route) = self
251 .context
252 .table_metadata_manager
253 .table_route_manager()
254 .get_physical_table_route(self.data.physical_table_id)
255 .await?;
256 self.physical_table_route = Some(physical_table_route);
257 }
258
259 Ok(())
260 }
261}
262
263#[async_trait]
264impl Procedure for AlterLogicalTablesProcedure {
265 fn type_name(&self) -> &str {
266 Self::TYPE_NAME
267 }
268
269 async fn execute(&mut self, _ctx: &Context) -> ProcedureResult<Status> {
270 let state = &self.data.state;
271
272 let step = state.as_ref();
273
274 let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
275 .with_label_values(&[step])
276 .start_timer();
277 debug!(
278 "Executing alter logical tables procedure, state: {:?}",
279 state
280 );
281
282 match state {
283 AlterTablesState::Prepare => self.on_prepare().await,
284 AlterTablesState::SubmitAlterRegionRequests => {
285 self.on_submit_alter_region_requests().await
286 }
287 AlterTablesState::UpdateMetadata => self.on_update_metadata().await,
288 AlterTablesState::InvalidateTableCache => self.on_invalidate_table_cache().await,
289 }
290 .inspect_err(|_| {
291 self.physical_table_route = None;
293 })
294 .map_err(map_to_procedure_error)
295 }
296
297 fn dump(&self) -> ProcedureResult<String> {
298 serde_json::to_string(&self.data).context(ToJsonSnafu)
299 }
300
301 fn lock_key(&self) -> LockKey {
302 let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
306 let table_ref = self.data.tasks[0].table_ref();
307 lock_key.push(CatalogLock::Read(table_ref.catalog).into());
308 lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
309 lock_key.push(TableLock::Write(self.data.physical_table_id).into());
310 lock_key.extend(
311 self.data
312 .table_info_values
313 .iter()
314 .map(|table| TableLock::Write(table.table_info.ident.table_id).into()),
315 );
316
317 LockKey::new(lock_key)
318 }
319}
320
/// Serializable state of an `AlterLogicalTablesProcedure`; this is the value
/// written by `dump` and read back by `from_json`.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTablesData {
    /// Step the procedure will execute next.
    state: AlterTablesState,
    /// Alter tasks still to be applied. Tasks reported as skipped by the
    /// validator are removed in `on_prepare`.
    tasks: Vec<AlterTableTask>,
    /// Table info values of the logical tables, filled in by `on_prepare`.
    /// Entries are paired with `tasks` by position, so both vectors are
    /// expected to have the same length.
    table_info_values: Vec<DeserializedValueWithBytes<TableInfoValue>>,
    /// Id of the physical table.
    physical_table_id: TableId,
    /// Table info value of the physical table, filled in by `on_prepare`.
    physical_table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
    /// Column metadata extracted from the alter-region responses; stays empty
    /// when the responses do not carry it.
    physical_columns: Vec<ColumnMetadata>,
    /// Cache keys that the `InvalidateTableCache` step invalidates.
    table_cache_keys_to_invalidate: Vec<CacheIdent>,
}
334
335impl AlterTablesData {
336 fn clear_metadata_fields(&mut self) {
339 self.tasks.clear();
340 self.table_info_values.clear();
341 self.physical_table_id = 0;
342 self.physical_table_info = None;
343 self.physical_columns.clear();
344 }
345}
346
/// Steps of the alter-logical-tables procedure.
///
/// The variant name (via `AsRefStr`) is also used as the label of the
/// per-step timer metric in `execute`.
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterTablesState {
    /// Initial state: validates the tasks (handled by `on_prepare`).
    Prepare,
    /// Sends the alter requests to the regions (handled by
    /// `on_submit_alter_region_requests`).
    SubmitAlterRegionRequests,
    /// Updates the physical and logical table metadata (handled by
    /// `on_update_metadata`).
    UpdateMetadata,
    /// Invalidates the table caches and finishes the procedure (handled by
    /// `on_invalidate_table_cache`).
    InvalidateTableCache,
}