1mod executor;
16mod update_metadata;
17mod validator;
18
19use api::region::RegionResponse;
20use async_trait::async_trait;
21use common_catalog::format_full_table_name;
22use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
23use common_procedure::{Context, LockKey, Procedure, Status};
24use common_telemetry::{debug, error, info, warn};
25pub use executor::make_alter_region_request;
26use serde::{Deserialize, Serialize};
27use snafu::ResultExt;
28use store_api::metadata::ColumnMetadata;
29use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
30use strum::AsRefStr;
31use table::metadata::TableId;
32
33use crate::cache_invalidator::Context as CacheContext;
34use crate::ddl::alter_logical_tables::executor::AlterLogicalTablesExecutor;
35use crate::ddl::alter_logical_tables::validator::{
36 retain_unskipped, AlterLogicalTableValidator, ValidatorResult,
37};
38use crate::ddl::utils::{extract_column_metadatas, map_to_procedure_error, sync_follower_regions};
39use crate::ddl::DdlContext;
40use crate::error::Result;
41use crate::instruction::CacheIdent;
42use crate::key::table_info::TableInfoValue;
43use crate::key::table_route::PhysicalTableRouteValue;
44use crate::key::DeserializedValueWithBytes;
45use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
46use crate::metrics;
47use crate::rpc::ddl::AlterTableTask;
48use crate::rpc::router::RegionRoute;
49
/// Procedure that applies a batch of alter tasks to logical tables that share one physical
/// table.
///
/// The procedure walks through the steps of [`AlterTablesState`]; everything that has to survive
/// between steps lives in [`AlterTablesData`], which is what `dump` serializes.
pub struct AlterLogicalTablesProcedure {
    /// Runtime handles used by the steps: the table metadata manager (validation), the node
    /// manager (region requests) and the cache invalidator. Not part of the serialized state.
    pub context: DdlContext,
    /// Serializable state of the procedure.
    pub data: AlterTablesData,
}
54
55fn build_validator_from_alter_table_data<'a>(
57 data: &'a AlterTablesData,
58) -> AlterLogicalTableValidator<'a> {
59 let phsycial_table_id = data.physical_table_id;
60 let alters = data
61 .tasks
62 .iter()
63 .map(|task| &task.alter_table)
64 .collect::<Vec<_>>();
65 AlterLogicalTableValidator::new(phsycial_table_id, alters)
66}
67
68fn build_executor_from_alter_expr<'a>(data: &'a AlterTablesData) -> AlterLogicalTablesExecutor<'a> {
70 debug_assert_eq!(data.tasks.len(), data.table_info_values.len());
71 let alters = data
72 .tasks
73 .iter()
74 .zip(data.table_info_values.iter())
75 .map(|(task, table_info)| (table_info.table_info.ident.table_id, &task.alter_table))
76 .collect::<Vec<_>>();
77 AlterLogicalTablesExecutor::new(alters)
78}
79
80impl AlterLogicalTablesProcedure {
81 pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterLogicalTables";
82
83 pub fn new(
84 tasks: Vec<AlterTableTask>,
85 physical_table_id: TableId,
86 context: DdlContext,
87 ) -> Self {
88 Self {
89 context,
90 data: AlterTablesData {
91 state: AlterTablesState::Prepare,
92 tasks,
93 table_info_values: vec![],
94 physical_table_id,
95 physical_table_info: None,
96 physical_table_route: None,
97 physical_columns: vec![],
98 table_cache_keys_to_invalidate: vec![],
99 },
100 }
101 }
102
103 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
104 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
105 Ok(Self { context, data })
106 }
107
108 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
109 let validator = build_validator_from_alter_table_data(&self.data);
110 let ValidatorResult {
111 num_skipped,
112 skip_alter,
113 table_info_values,
114 physical_table_info,
115 physical_table_route,
116 } = validator
117 .validate(&self.context.table_metadata_manager)
118 .await?;
119
120 let num_tasks = self.data.tasks.len();
121 if num_skipped == num_tasks {
122 info!("All the alter tasks are finished, will skip the procedure.");
123 let cache_ident_keys = AlterLogicalTablesExecutor::build_cache_ident_keys(
124 &physical_table_info,
125 &table_info_values
126 .iter()
127 .map(|v| v.get_inner_ref())
128 .collect::<Vec<_>>(),
129 );
130 self.data.table_cache_keys_to_invalidate = cache_ident_keys;
131 self.data.state = AlterTablesState::InvalidateTableCache;
133 return Ok(Status::executing(true));
134 } else if num_skipped > 0 {
135 info!(
136 "There are {} alter tasks, {} of them were already finished.",
137 num_tasks, num_skipped
138 );
139 }
140
141 retain_unskipped(&mut self.data.tasks, &skip_alter);
143 self.data.physical_table_info = Some(physical_table_info);
144 self.data.physical_table_route = Some(physical_table_route);
145 self.data.table_info_values = table_info_values;
146 debug_assert_eq!(self.data.tasks.len(), self.data.table_info_values.len());
147 self.data.state = AlterTablesState::SubmitAlterRegionRequests;
148 Ok(Status::executing(true))
149 }
150
151 pub(crate) async fn on_submit_alter_region_requests(&mut self) -> Result<Status> {
152 let physical_table_route = &self.data.physical_table_route.as_ref().unwrap();
154 let executor = build_executor_from_alter_expr(&self.data);
155 let mut results = executor
156 .on_alter_regions(
157 &self.context.node_manager,
158 &physical_table_route.region_routes,
159 )
160 .await?;
161
162 if let Some(column_metadatas) =
163 extract_column_metadatas(&mut results, ALTER_PHYSICAL_EXTENSION_KEY)?
164 {
165 self.data.physical_columns = column_metadatas;
166 } else {
167 warn!("altering logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
168 }
169 self.submit_sync_region_requests(results, &physical_table_route.region_routes)
170 .await;
171 self.data.state = AlterTablesState::UpdateMetadata;
172 Ok(Status::executing(true))
173 }
174
175 async fn submit_sync_region_requests(
176 &self,
177 results: Vec<RegionResponse>,
178 region_routes: &[RegionRoute],
179 ) {
180 let table_info = &self.data.physical_table_info.as_ref().unwrap().table_info;
181 if let Err(err) = sync_follower_regions(
182 &self.context,
183 self.data.physical_table_id,
184 &results,
185 region_routes,
186 table_info.meta.engine.as_str(),
187 )
188 .await
189 {
190 error!(err; "Failed to sync regions for table {}, table_id: {}",
191 format_full_table_name(&table_info.catalog_name, &table_info.schema_name, &table_info.name),
192 self.data.physical_table_id
193 );
194 }
195 }
196
197 pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
198 self.update_physical_table_metadata().await?;
199 self.update_logical_tables_metadata().await?;
200
201 let logical_table_info_values = self
202 .data
203 .table_info_values
204 .iter()
205 .map(|v| v.get_inner_ref())
206 .collect::<Vec<_>>();
207
208 let cache_ident_keys = AlterLogicalTablesExecutor::build_cache_ident_keys(
209 self.data.physical_table_info.as_ref().unwrap(),
210 &logical_table_info_values,
211 );
212 self.data.table_cache_keys_to_invalidate = cache_ident_keys;
213 self.data.clear_metadata_fields();
214
215 self.data.state = AlterTablesState::InvalidateTableCache;
216 Ok(Status::executing(true))
217 }
218
219 pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> {
220 let to_invalidate = &self.data.table_cache_keys_to_invalidate;
221
222 let ctx = CacheContext {
223 subject: Some(format!(
224 "Invalidate table cache by altering logical tables, physical_table_id: {}",
225 self.data.physical_table_id,
226 )),
227 };
228
229 self.context
230 .cache_invalidator
231 .invalidate(&ctx, to_invalidate)
232 .await?;
233 Ok(Status::done())
234 }
235}
236
237#[async_trait]
238impl Procedure for AlterLogicalTablesProcedure {
239 fn type_name(&self) -> &str {
240 Self::TYPE_NAME
241 }
242
243 async fn execute(&mut self, _ctx: &Context) -> ProcedureResult<Status> {
244 let state = &self.data.state;
245
246 let step = state.as_ref();
247
248 let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
249 .with_label_values(&[step])
250 .start_timer();
251 debug!(
252 "Executing alter logical tables procedure, state: {:?}",
253 state
254 );
255
256 match state {
257 AlterTablesState::Prepare => self.on_prepare().await,
258 AlterTablesState::SubmitAlterRegionRequests => {
259 self.on_submit_alter_region_requests().await
260 }
261 AlterTablesState::UpdateMetadata => self.on_update_metadata().await,
262 AlterTablesState::InvalidateTableCache => self.on_invalidate_table_cache().await,
263 }
264 .map_err(map_to_procedure_error)
265 }
266
267 fn dump(&self) -> ProcedureResult<String> {
268 serde_json::to_string(&self.data).context(ToJsonSnafu)
269 }
270
271 fn lock_key(&self) -> LockKey {
272 let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
276 let table_ref = self.data.tasks[0].table_ref();
277 lock_key.push(CatalogLock::Read(table_ref.catalog).into());
278 lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
279 lock_key.push(TableLock::Write(self.data.physical_table_id).into());
280 lock_key.extend(
281 self.data
282 .table_info_values
283 .iter()
284 .map(|table| TableLock::Write(table.table_info.ident.table_id).into()),
285 );
286
287 LockKey::new(lock_key)
288 }
289}
290
/// Serializable state of [`AlterLogicalTablesProcedure`]; this is what `dump` writes and
/// `from_json` reads back.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTablesData {
    /// Step that `execute` runs next.
    state: AlterTablesState,
    /// Alter tasks to apply. `on_prepare` removes the tasks the validator marked as skipped.
    tasks: Vec<AlterTableTask>,
    /// Table infos of the logical tables, filled in by `on_prepare`. Paired with `tasks` by
    /// index, so both are expected to have the same length.
    table_info_values: Vec<DeserializedValueWithBytes<TableInfoValue>>,
    /// Id of the physical table the tasks belong to.
    physical_table_id: TableId,
    /// Table info of the physical table; `None` until `on_prepare` has run.
    physical_table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
    /// Route of the physical table; `None` until `on_prepare` has run.
    physical_table_route: Option<PhysicalTableRouteValue>,
    /// Column metadata extracted from the region responses under
    /// `ALTER_PHYSICAL_EXTENSION_KEY`; stays empty when the responses do not carry it.
    physical_columns: Vec<ColumnMetadata>,
    /// Cache keys passed to the cache invalidator in the `InvalidateTableCache` step.
    table_cache_keys_to_invalidate: Vec<CacheIdent>,
}
305
306impl AlterTablesData {
307 fn clear_metadata_fields(&mut self) {
310 self.tasks.clear();
311 self.table_info_values.clear();
312 self.physical_table_id = 0;
313 self.physical_table_info = None;
314 self.physical_table_route = None;
315 self.physical_columns.clear();
316 }
317}
318
/// Steps of [`AlterLogicalTablesProcedure`].
///
/// `AsRefStr` provides the variant name as a string, which `execute` uses as the metric label.
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterTablesState {
    /// Initial state; handled by `on_prepare`.
    Prepare,
    /// Handled by `on_submit_alter_region_requests`.
    SubmitAlterRegionRequests,
    /// Handled by `on_update_metadata`.
    UpdateMetadata,
    /// Final step; handled by `on_invalidate_table_cache`, which finishes the procedure.
    InvalidateTableCache,
}