common_meta/ddl/alter_table/
executor.rs1use std::collections::HashMap;
16
17use api::region::RegionResponse;
18use api::v1::AlterTableExpr;
19use api::v1::region::region_request::Body;
20use api::v1::region::{AlterRequest, RegionRequest, RegionRequestHeader, alter_request};
21use common_catalog::format_full_table_name;
22use common_grpc_expr::alter_expr_to_request;
23use common_telemetry::tracing_context::TracingContext;
24use common_telemetry::{debug, info};
25use futures::future;
26use snafu::{ResultExt, ensure};
27use store_api::metadata::ColumnMetadata;
28use store_api::storage::{RegionId, TableId};
29use table::metadata::TableInfo;
30use table::requests::AlterKind;
31use table::table_name::TableName;
32
33use crate::cache_invalidator::{CacheInvalidatorRef, Context};
34use crate::ddl::utils::{add_peer_context_if_needed, raw_table_info};
35use crate::error::{self, Result, UnexpectedSnafu};
36use crate::instruction::CacheIdent;
37use crate::key::table_info::TableInfoValue;
38use crate::key::table_name::TableNameKey;
39use crate::key::{DeserializedValueWithBytes, RegionDistribution, TableMetadataManagerRef};
40use crate::node_manager::NodeManagerRef;
41use crate::rpc::router::{RegionRoute, find_leaders, region_distribution};
42
43pub struct AlterTableExecutor {
47 table: TableName,
48 table_id: TableId,
49 new_table_name: Option<String>,
51}
52
53impl AlterTableExecutor {
54 pub fn new(table: TableName, table_id: TableId, new_table_name: Option<String>) -> Self {
56 Self {
57 table,
58 table_id,
59 new_table_name,
60 }
61 }
62
63 pub(crate) async fn on_prepare(
69 &self,
70 table_metadata_manager: &TableMetadataManagerRef,
71 ) -> Result<()> {
72 let catalog = &self.table.catalog_name;
73 let schema = &self.table.schema_name;
74 let table_name = &self.table.table_name;
75
76 let manager = table_metadata_manager;
77 if let Some(new_table_name) = &self.new_table_name {
78 let new_table_name_key = TableNameKey::new(catalog, schema, new_table_name);
79 let exists = manager
80 .table_name_manager()
81 .exists(new_table_name_key)
82 .await?;
83 ensure!(
84 !exists,
85 error::TableAlreadyExistsSnafu {
86 table_name: format_full_table_name(catalog, schema, new_table_name),
87 }
88 )
89 }
90
91 let table_name_key = TableNameKey::new(catalog, schema, table_name);
92 let exists = manager.table_name_manager().exists(table_name_key).await?;
93 ensure!(
94 exists,
95 error::TableNotFoundSnafu {
96 table_name: format_full_table_name(catalog, schema, table_name),
97 }
98 );
99
100 Ok(())
101 }
102
103 pub(crate) fn validate_alter_table_expr(
110 table_info: &TableInfo,
111 alter_table_expr: AlterTableExpr,
112 ) -> Result<TableInfo> {
113 build_new_table_info(table_info, alter_table_expr)
114 }
115
116 pub(crate) async fn on_alter_metadata(
118 &self,
119 table_metadata_manager: &TableMetadataManagerRef,
120 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
121 region_distribution: Option<&RegionDistribution>,
122 mut raw_table_info: TableInfo,
123 column_metadatas: &[ColumnMetadata],
124 metadata_only_alter: bool,
125 ) -> Result<()> {
126 let table_ref = self.table.table_ref();
127 let table_id = self.table_id;
128
129 if let Some(new_table_name) = &self.new_table_name {
130 debug!(
131 "Starting update table: {} metadata, table_id: {}, new table info: {:?}, new table name: {}",
132 table_ref, table_id, raw_table_info, new_table_name
133 );
134
135 table_metadata_manager
136 .rename_table(current_table_info_value, new_table_name.clone())
137 .await?;
138 } else {
139 debug!(
140 "Starting update table: {} metadata, table_id: {}, new table info: {:?}",
141 table_ref, table_id, raw_table_info
142 );
143
144 ensure!(
145 metadata_only_alter || region_distribution.is_some(),
146 UnexpectedSnafu {
147 err_msg: "region distribution is not set when updating table metadata",
148 }
149 );
150
151 if !column_metadatas.is_empty() {
152 raw_table_info::update_table_info_column_ids(&mut raw_table_info, column_metadatas);
153 }
154 table_metadata_manager
155 .update_table_info(
156 current_table_info_value,
157 region_distribution.cloned(),
158 raw_table_info,
159 )
160 .await?;
161 }
162
163 Ok(())
164 }
165
166 pub(crate) async fn on_alter_regions(
168 &self,
169 node_manager: &NodeManagerRef,
170 region_routes: &[RegionRoute],
171 kind: Option<alter_request::Kind>,
172 ) -> Vec<Result<RegionResponse>> {
173 let region_distribution = region_distribution(region_routes);
174 let leaders = find_leaders(region_routes)
175 .into_iter()
176 .map(|p| (p.id, p))
177 .collect::<HashMap<_, _>>();
178 let total_num_region = region_distribution
179 .values()
180 .map(|r| r.leader_regions.len())
181 .sum::<usize>();
182 let mut alter_region_tasks = Vec::with_capacity(total_num_region);
183 for (datanode_id, region_role_set) in region_distribution {
184 if region_role_set.leader_regions.is_empty() {
185 continue;
186 }
187 let peer = leaders.get(&datanode_id).unwrap();
189 let requester = node_manager.datanode(peer).await;
190
191 for region_id in region_role_set.leader_regions {
192 let region_id = RegionId::new(self.table_id, region_id);
193 let request = make_alter_region_request(region_id, kind.clone());
194
195 let requester = requester.clone();
196 let peer = peer.clone();
197
198 alter_region_tasks.push(async move {
199 requester
200 .handle(request)
201 .await
202 .map_err(add_peer_context_if_needed(peer))
203 });
204 }
205 }
206
207 future::join_all(alter_region_tasks)
208 .await
209 .into_iter()
210 .collect::<Vec<_>>()
211 }
212
213 pub(crate) async fn invalidate_table_cache(
215 &self,
216 cache_invalidator: &CacheInvalidatorRef,
217 ) -> Result<()> {
218 let ctx = Context {
219 subject: Some(format!(
220 "Invalidate table cache by altering table {}, table_id: {}",
221 self.table.table_ref(),
222 self.table_id,
223 )),
224 };
225
226 cache_invalidator
227 .invalidate(
228 &ctx,
229 &[
230 CacheIdent::TableName(self.table.clone()),
231 CacheIdent::TableId(self.table_id),
232 ],
233 )
234 .await?;
235
236 Ok(())
237 }
238}
239
240pub(crate) fn make_alter_region_request(
242 region_id: RegionId,
243 kind: Option<alter_request::Kind>,
244) -> RegionRequest {
245 RegionRequest {
246 header: Some(RegionRequestHeader {
247 tracing_context: TracingContext::from_current_span().to_w3c(),
248 ..Default::default()
249 }),
250 body: Some(Body::Alter(AlterRequest {
251 region_id: region_id.as_u64(),
252 kind,
253 ..Default::default()
254 })),
255 }
256}
257
258fn build_new_table_info(
265 table_info: &TableInfo,
266 alter_table_expr: AlterTableExpr,
267) -> Result<TableInfo> {
268 let table_info = table_info.clone();
269 let schema_name = &table_info.schema_name;
270 let catalog_name = &table_info.catalog_name;
271 let table_name = &table_info.name;
272 let table_id = table_info.ident.table_id;
273 let request = alter_expr_to_request(table_id, alter_table_expr, Some(&table_info.meta))
274 .context(error::ConvertAlterTableRequestSnafu)?;
275
276 let new_meta = table_info
277 .meta
278 .builder_with_alter_kind(table_name, &request.alter_kind)
279 .context(error::TableSnafu)?
280 .build()
281 .with_context(|_| error::BuildTableMetaSnafu {
282 table_name: format_full_table_name(catalog_name, schema_name, table_name),
283 })?;
284
285 let mut new_info = table_info.clone();
286 new_info.meta = new_meta;
287 new_info.ident.version = table_info.ident.version + 1;
288 match request.alter_kind {
289 AlterKind::AddColumns { columns } => {
290 new_info.meta.next_column_id += columns.len() as u32;
294 }
295 AlterKind::RenameTable { new_table_name } => {
296 new_info.name = new_table_name.clone();
297 }
298 AlterKind::DropColumns { .. }
299 | AlterKind::ModifyColumnTypes { .. }
300 | AlterKind::SetTableOptions { .. }
301 | AlterKind::UnsetTableOptions { .. }
302 | AlterKind::SetRepartitionColumnHint { .. }
303 | AlterKind::UnsetRepartitionColumnHint
304 | AlterKind::SetIndexes { .. }
305 | AlterKind::UnsetIndexes { .. }
306 | AlterKind::DropDefaults { .. }
307 | AlterKind::SetDefaults { .. } => {}
308 }
309
310 info!(
311 "Built new table info: {:?} for table {}, table_id: {}",
312 new_info.meta, table_name, table_id
313 );
314 Ok(new_info)
315}