common_meta/ddl/
alter_logical_tables.rs1mod check;
16mod metadata;
17mod region_request;
18mod table_cache_keys;
19mod update_metadata;
20
21use api::region::RegionResponse;
22use async_trait::async_trait;
23use common_catalog::format_full_table_name;
24use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
25use common_procedure::{Context, LockKey, Procedure, Status};
26use common_telemetry::{error, info, warn};
27use futures_util::future;
28use serde::{Deserialize, Serialize};
29use snafu::{ensure, ResultExt};
30use store_api::metadata::ColumnMetadata;
31use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
32use strum::AsRefStr;
33use table::metadata::TableId;
34
35use crate::ddl::utils::{
36 add_peer_context_if_needed, map_to_procedure_error, sync_follower_regions,
37};
38use crate::ddl::DdlContext;
39use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result};
40use crate::instruction::CacheIdent;
41use crate::key::table_info::TableInfoValue;
42use crate::key::table_route::PhysicalTableRouteValue;
43use crate::key::DeserializedValueWithBytes;
44use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
45use crate::metrics;
46use crate::rpc::ddl::AlterTableTask;
47use crate::rpc::router::{find_leaders, RegionRoute};
48
49pub struct AlterLogicalTablesProcedure {
50 pub context: DdlContext,
51 pub data: AlterTablesData,
52}
53
54impl AlterLogicalTablesProcedure {
55 pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterLogicalTables";
56
57 pub fn new(
58 tasks: Vec<AlterTableTask>,
59 physical_table_id: TableId,
60 context: DdlContext,
61 ) -> Self {
62 Self {
63 context,
64 data: AlterTablesData {
65 state: AlterTablesState::Prepare,
66 tasks,
67 table_info_values: vec![],
68 physical_table_id,
69 physical_table_info: None,
70 physical_table_route: None,
71 physical_columns: vec![],
72 table_cache_keys_to_invalidate: vec![],
73 },
74 }
75 }
76
77 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
78 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
79 Ok(Self { context, data })
80 }
81
82 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
83 self.check_input_tasks()?;
85 self.fill_table_info_values().await?;
87 self.check_physical_table().await?;
89 self.fill_physical_table_info().await?;
91 let finished_tasks = self.check_finished_tasks()?;
93 let already_finished_count = finished_tasks
94 .iter()
95 .map(|x| if *x { 1 } else { 0 })
96 .sum::<usize>();
97 let apply_tasks_count = self.data.tasks.len();
98 if already_finished_count == apply_tasks_count {
99 info!("All the alter tasks are finished, will skip the procedure.");
100 self.data.state = AlterTablesState::InvalidateTableCache;
102 return Ok(Status::executing(true));
103 } else if already_finished_count > 0 {
104 info!(
105 "There are {} alter tasks, {} of them were already finished.",
106 apply_tasks_count, already_finished_count
107 );
108 }
109 self.filter_task(&finished_tasks)?;
110
111 self.data.state = AlterTablesState::SubmitAlterRegionRequests;
113 Ok(Status::executing(true))
114 }
115
116 pub(crate) async fn on_submit_alter_region_requests(&mut self) -> Result<Status> {
117 let physical_table_route = &self.data.physical_table_route.as_ref().unwrap();
119 let leaders = find_leaders(&physical_table_route.region_routes);
120 let mut alter_region_tasks = Vec::with_capacity(leaders.len());
121
122 for peer in leaders {
123 let requester = self.context.node_manager.datanode(&peer).await;
124 let request = self.make_request(&peer, &physical_table_route.region_routes)?;
125
126 alter_region_tasks.push(async move {
127 requester
128 .handle(request)
129 .await
130 .map_err(add_peer_context_if_needed(peer))
131 });
132 }
133
134 let mut results = future::join_all(alter_region_tasks)
135 .await
136 .into_iter()
137 .collect::<Result<Vec<_>>>()?;
138
139 let phy_raw_schemas = results
141 .iter_mut()
142 .map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))
143 .collect::<Vec<_>>();
144
145 if phy_raw_schemas.is_empty() {
146 self.submit_sync_region_requests(results, &physical_table_route.region_routes)
147 .await;
148 self.data.state = AlterTablesState::UpdateMetadata;
149 return Ok(Status::executing(true));
150 }
151
152 let first = phy_raw_schemas.first().unwrap();
155 ensure!(
156 phy_raw_schemas.iter().all(|x| x == first),
157 MetadataCorruptionSnafu {
158 err_msg: "The physical schemas from datanodes are not the same."
159 }
160 );
161
162 if let Some(phy_raw_schema) = first {
164 self.data.physical_columns =
165 ColumnMetadata::decode_list(phy_raw_schema).context(DecodeJsonSnafu)?;
166 } else {
167 warn!("altering logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
168 }
169
170 self.submit_sync_region_requests(results, &physical_table_route.region_routes)
171 .await;
172 self.data.state = AlterTablesState::UpdateMetadata;
173 Ok(Status::executing(true))
174 }
175
176 async fn submit_sync_region_requests(
177 &self,
178 results: Vec<RegionResponse>,
179 region_routes: &[RegionRoute],
180 ) {
181 let table_info = &self.data.physical_table_info.as_ref().unwrap().table_info;
182 if let Err(err) = sync_follower_regions(
183 &self.context,
184 self.data.physical_table_id,
185 results,
186 region_routes,
187 table_info.meta.engine.as_str(),
188 )
189 .await
190 {
191 error!(err; "Failed to sync regions for table {}, table_id: {}",
192 format_full_table_name(&table_info.catalog_name, &table_info.schema_name, &table_info.name),
193 self.data.physical_table_id
194 );
195 }
196 }
197
198 pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
199 self.update_physical_table_metadata().await?;
200 self.update_logical_tables_metadata().await?;
201
202 self.data.build_cache_keys_to_invalidate();
203 self.data.clear_metadata_fields();
204
205 self.data.state = AlterTablesState::InvalidateTableCache;
206 Ok(Status::executing(true))
207 }
208
209 pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> {
210 let to_invalidate = &self.data.table_cache_keys_to_invalidate;
211
212 self.context
213 .cache_invalidator
214 .invalidate(&Default::default(), to_invalidate)
215 .await?;
216 Ok(Status::done())
217 }
218}
219
220#[async_trait]
221impl Procedure for AlterLogicalTablesProcedure {
222 fn type_name(&self) -> &str {
223 Self::TYPE_NAME
224 }
225
226 async fn execute(&mut self, _ctx: &Context) -> ProcedureResult<Status> {
227 let state = &self.data.state;
228
229 let step = state.as_ref();
230
231 let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
232 .with_label_values(&[step])
233 .start_timer();
234
235 match state {
236 AlterTablesState::Prepare => self.on_prepare().await,
237 AlterTablesState::SubmitAlterRegionRequests => {
238 self.on_submit_alter_region_requests().await
239 }
240 AlterTablesState::UpdateMetadata => self.on_update_metadata().await,
241 AlterTablesState::InvalidateTableCache => self.on_invalidate_table_cache().await,
242 }
243 .map_err(map_to_procedure_error)
244 }
245
246 fn dump(&self) -> ProcedureResult<String> {
247 serde_json::to_string(&self.data).context(ToJsonSnafu)
248 }
249
250 fn lock_key(&self) -> LockKey {
251 let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
255 let table_ref = self.data.tasks[0].table_ref();
256 lock_key.push(CatalogLock::Read(table_ref.catalog).into());
257 lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
258 lock_key.push(TableLock::Write(self.data.physical_table_id).into());
259 lock_key.extend(
260 self.data
261 .table_info_values
262 .iter()
263 .map(|table| TableLock::Write(table.table_info.ident.table_id).into()),
264 );
265
266 LockKey::new(lock_key)
267 }
268}
269
270#[derive(Debug, Serialize, Deserialize)]
271pub struct AlterTablesData {
272 state: AlterTablesState,
273 tasks: Vec<AlterTableTask>,
274 table_info_values: Vec<DeserializedValueWithBytes<TableInfoValue>>,
277 physical_table_id: TableId,
279 physical_table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
280 physical_table_route: Option<PhysicalTableRouteValue>,
281 physical_columns: Vec<ColumnMetadata>,
282 table_cache_keys_to_invalidate: Vec<CacheIdent>,
283}
284
285impl AlterTablesData {
286 fn clear_metadata_fields(&mut self) {
289 self.tasks.clear();
290 self.table_info_values.clear();
291 self.physical_table_id = 0;
292 self.physical_table_info = None;
293 self.physical_table_route = None;
294 self.physical_columns.clear();
295 }
296}
297
298#[derive(Debug, Serialize, Deserialize, AsRefStr)]
299enum AlterTablesState {
300 Prepare,
302 SubmitAlterRegionRequests,
303 UpdateMetadata,
305 InvalidateTableCache,
307}