common_meta/ddl/
alter_logical_tables.rs1mod check;
16mod metadata;
17mod region_request;
18mod table_cache_keys;
19mod update_metadata;
20
21use api::region::RegionResponse;
22use async_trait::async_trait;
23use common_catalog::format_full_table_name;
24use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
25use common_procedure::{Context, LockKey, Procedure, Status};
26use common_telemetry::{error, info, warn};
27use futures_util::future;
28pub use region_request::make_alter_region_request;
29use serde::{Deserialize, Serialize};
30use snafu::{ensure, ResultExt};
31use store_api::metadata::ColumnMetadata;
32use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
33use strum::AsRefStr;
34use table::metadata::TableId;
35
36use crate::ddl::utils::{
37 add_peer_context_if_needed, map_to_procedure_error, sync_follower_regions,
38};
39use crate::ddl::DdlContext;
40use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result};
41use crate::instruction::CacheIdent;
42use crate::key::table_info::TableInfoValue;
43use crate::key::table_route::PhysicalTableRouteValue;
44use crate::key::DeserializedValueWithBytes;
45use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
46use crate::metrics;
47use crate::rpc::ddl::AlterTableTask;
48use crate::rpc::router::{find_leaders, RegionRoute};
49
50pub struct AlterLogicalTablesProcedure {
51 pub context: DdlContext,
52 pub data: AlterTablesData,
53}
54
55impl AlterLogicalTablesProcedure {
56 pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterLogicalTables";
57
58 pub fn new(
59 tasks: Vec<AlterTableTask>,
60 physical_table_id: TableId,
61 context: DdlContext,
62 ) -> Self {
63 Self {
64 context,
65 data: AlterTablesData {
66 state: AlterTablesState::Prepare,
67 tasks,
68 table_info_values: vec![],
69 physical_table_id,
70 physical_table_info: None,
71 physical_table_route: None,
72 physical_columns: vec![],
73 table_cache_keys_to_invalidate: vec![],
74 },
75 }
76 }
77
78 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
79 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
80 Ok(Self { context, data })
81 }
82
83 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
84 self.check_input_tasks()?;
86 self.fill_table_info_values().await?;
88 self.check_physical_table().await?;
90 self.fill_physical_table_info().await?;
92 let finished_tasks = self.check_finished_tasks()?;
94 let already_finished_count = finished_tasks
95 .iter()
96 .map(|x| if *x { 1 } else { 0 })
97 .sum::<usize>();
98 let apply_tasks_count = self.data.tasks.len();
99 if already_finished_count == apply_tasks_count {
100 info!("All the alter tasks are finished, will skip the procedure.");
101 self.data.state = AlterTablesState::InvalidateTableCache;
103 return Ok(Status::executing(true));
104 } else if already_finished_count > 0 {
105 info!(
106 "There are {} alter tasks, {} of them were already finished.",
107 apply_tasks_count, already_finished_count
108 );
109 }
110 self.filter_task(&finished_tasks)?;
111
112 self.data.state = AlterTablesState::SubmitAlterRegionRequests;
114 Ok(Status::executing(true))
115 }
116
117 pub(crate) async fn on_submit_alter_region_requests(&mut self) -> Result<Status> {
118 let physical_table_route = &self.data.physical_table_route.as_ref().unwrap();
120 let leaders = find_leaders(&physical_table_route.region_routes);
121 let mut alter_region_tasks = Vec::with_capacity(leaders.len());
122
123 for peer in leaders {
124 let requester = self.context.node_manager.datanode(&peer).await;
125 let request = self.make_request(&peer, &physical_table_route.region_routes)?;
126
127 alter_region_tasks.push(async move {
128 requester
129 .handle(request)
130 .await
131 .map_err(add_peer_context_if_needed(peer))
132 });
133 }
134
135 let mut results = future::join_all(alter_region_tasks)
136 .await
137 .into_iter()
138 .collect::<Result<Vec<_>>>()?;
139
140 let phy_raw_schemas = results
142 .iter_mut()
143 .map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))
144 .collect::<Vec<_>>();
145
146 if phy_raw_schemas.is_empty() {
147 self.submit_sync_region_requests(results, &physical_table_route.region_routes)
148 .await;
149 self.data.state = AlterTablesState::UpdateMetadata;
150 return Ok(Status::executing(true));
151 }
152
153 let first = phy_raw_schemas.first().unwrap();
156 ensure!(
157 phy_raw_schemas.iter().all(|x| x == first),
158 MetadataCorruptionSnafu {
159 err_msg: "The physical schemas from datanodes are not the same."
160 }
161 );
162
163 if let Some(phy_raw_schema) = first {
165 self.data.physical_columns =
166 ColumnMetadata::decode_list(phy_raw_schema).context(DecodeJsonSnafu)?;
167 } else {
168 warn!("altering logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
169 }
170
171 self.submit_sync_region_requests(results, &physical_table_route.region_routes)
172 .await;
173 self.data.state = AlterTablesState::UpdateMetadata;
174 Ok(Status::executing(true))
175 }
176
177 async fn submit_sync_region_requests(
178 &self,
179 results: Vec<RegionResponse>,
180 region_routes: &[RegionRoute],
181 ) {
182 let table_info = &self.data.physical_table_info.as_ref().unwrap().table_info;
183 if let Err(err) = sync_follower_regions(
184 &self.context,
185 self.data.physical_table_id,
186 results,
187 region_routes,
188 table_info.meta.engine.as_str(),
189 )
190 .await
191 {
192 error!(err; "Failed to sync regions for table {}, table_id: {}",
193 format_full_table_name(&table_info.catalog_name, &table_info.schema_name, &table_info.name),
194 self.data.physical_table_id
195 );
196 }
197 }
198
199 pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
200 self.update_physical_table_metadata().await?;
201 self.update_logical_tables_metadata().await?;
202
203 self.data.build_cache_keys_to_invalidate();
204 self.data.clear_metadata_fields();
205
206 self.data.state = AlterTablesState::InvalidateTableCache;
207 Ok(Status::executing(true))
208 }
209
210 pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> {
211 let to_invalidate = &self.data.table_cache_keys_to_invalidate;
212
213 self.context
214 .cache_invalidator
215 .invalidate(&Default::default(), to_invalidate)
216 .await?;
217 Ok(Status::done())
218 }
219}
220
221#[async_trait]
222impl Procedure for AlterLogicalTablesProcedure {
223 fn type_name(&self) -> &str {
224 Self::TYPE_NAME
225 }
226
227 async fn execute(&mut self, _ctx: &Context) -> ProcedureResult<Status> {
228 let state = &self.data.state;
229
230 let step = state.as_ref();
231
232 let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
233 .with_label_values(&[step])
234 .start_timer();
235
236 match state {
237 AlterTablesState::Prepare => self.on_prepare().await,
238 AlterTablesState::SubmitAlterRegionRequests => {
239 self.on_submit_alter_region_requests().await
240 }
241 AlterTablesState::UpdateMetadata => self.on_update_metadata().await,
242 AlterTablesState::InvalidateTableCache => self.on_invalidate_table_cache().await,
243 }
244 .map_err(map_to_procedure_error)
245 }
246
247 fn dump(&self) -> ProcedureResult<String> {
248 serde_json::to_string(&self.data).context(ToJsonSnafu)
249 }
250
251 fn lock_key(&self) -> LockKey {
252 let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
256 let table_ref = self.data.tasks[0].table_ref();
257 lock_key.push(CatalogLock::Read(table_ref.catalog).into());
258 lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
259 lock_key.push(TableLock::Write(self.data.physical_table_id).into());
260 lock_key.extend(
261 self.data
262 .table_info_values
263 .iter()
264 .map(|table| TableLock::Write(table.table_info.ident.table_id).into()),
265 );
266
267 LockKey::new(lock_key)
268 }
269}
270
271#[derive(Debug, Serialize, Deserialize)]
272pub struct AlterTablesData {
273 state: AlterTablesState,
274 tasks: Vec<AlterTableTask>,
275 table_info_values: Vec<DeserializedValueWithBytes<TableInfoValue>>,
278 physical_table_id: TableId,
280 physical_table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
281 physical_table_route: Option<PhysicalTableRouteValue>,
282 physical_columns: Vec<ColumnMetadata>,
283 table_cache_keys_to_invalidate: Vec<CacheIdent>,
284}
285
286impl AlterTablesData {
287 fn clear_metadata_fields(&mut self) {
290 self.tasks.clear();
291 self.table_info_values.clear();
292 self.physical_table_id = 0;
293 self.physical_table_info = None;
294 self.physical_table_route = None;
295 self.physical_columns.clear();
296 }
297}
298
299#[derive(Debug, Serialize, Deserialize, AsRefStr)]
300enum AlterTablesState {
301 Prepare,
303 SubmitAlterRegionRequests,
304 UpdateMetadata,
306 InvalidateTableCache,
308}