common_meta/ddl/alter_logical_tables/
executor.rs1use std::collections::HashMap;
16
17use api::region::RegionResponse;
18use api::v1::alter_table_expr::Kind;
19use api::v1::region::{
20 AddColumn, AddColumns, AlterRequest, AlterRequests, RegionColumnDef, RegionRequest,
21 RegionRequestHeader, alter_request, region_request,
22};
23use api::v1::{self, AlterTableExpr};
24use common_telemetry::tracing_context::TracingContext;
25use common_telemetry::{debug, warn};
26use futures::future;
27use store_api::metadata::ColumnMetadata;
28use store_api::storage::{RegionId, RegionNumber, TableId};
29
30use crate::ddl::utils::{add_peer_context_if_needed, raw_table_info};
31use crate::error::Result;
32use crate::instruction::CacheIdent;
33use crate::key::table_info::TableInfoValue;
34use crate::key::{DeserializedValueWithBytes, RegionDistribution, TableMetadataManagerRef};
35use crate::node_manager::NodeManagerRef;
36use crate::rpc::router::{RegionRoute, find_leaders, region_distribution};
37
38pub struct AlterLogicalTablesExecutor<'a> {
42 alters: Vec<(TableId, &'a AlterTableExpr)>,
46}
47
48impl<'a> AlterLogicalTablesExecutor<'a> {
49 pub fn new(alters: Vec<(TableId, &'a AlterTableExpr)>) -> Self {
50 Self { alters }
51 }
52
53 pub(crate) async fn on_alter_regions(
55 &self,
56 node_manager: &NodeManagerRef,
57 region_routes: &[RegionRoute],
58 ) -> Result<Vec<RegionResponse>> {
59 let region_distribution = region_distribution(region_routes);
60 let leaders = find_leaders(region_routes)
61 .into_iter()
62 .map(|p| (p.id, p))
63 .collect::<HashMap<_, _>>();
64 let mut alter_region_tasks = Vec::with_capacity(leaders.len());
65 for (datanode_id, region_role_set) in region_distribution {
66 if region_role_set.leader_regions.is_empty() {
67 continue;
68 }
69 let peer = leaders.get(&datanode_id).unwrap();
71 let requester = node_manager.datanode(peer).await;
72 let requests = self.make_alter_region_request(®ion_role_set.leader_regions);
73 let requester = requester.clone();
74 let peer = peer.clone();
75
76 debug!("Sending alter region requests to datanode {}", peer);
77 alter_region_tasks.push(async move {
78 requester
79 .handle(make_request(requests))
80 .await
81 .map_err(add_peer_context_if_needed(peer))
82 });
83 }
84
85 future::join_all(alter_region_tasks)
86 .await
87 .into_iter()
88 .collect::<Result<Vec<_>>>()
89 }
90
91 fn make_alter_region_request(&self, region_numbers: &[RegionNumber]) -> AlterRequests {
92 let mut requests = Vec::with_capacity(region_numbers.len() * self.alters.len());
93 for (table_id, alter) in self.alters.iter() {
94 for region_number in region_numbers {
95 let region_id = RegionId::new(*table_id, *region_number);
96 let request = make_alter_region_request(region_id, alter);
97 requests.push(request);
98 }
99 }
100
101 AlterRequests { requests }
102 }
103
104 pub(crate) async fn on_alter_metadata(
109 physical_table_id: TableId,
110 table_metadata_manager: &TableMetadataManagerRef,
111 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
112 region_distribution: RegionDistribution,
113 physical_columns: &[ColumnMetadata],
114 ) -> Result<()> {
115 if physical_columns.is_empty() {
116 warn!(
117 "No physical columns found, leaving the physical table's schema unchanged when altering logical tables"
118 );
119 return Ok(());
120 }
121
122 let table_ref = current_table_info_value.table_ref();
123 let table_id = physical_table_id;
124
125 let old_raw_table_info = current_table_info_value.table_info.clone();
127 let new_raw_table_info =
128 raw_table_info::build_new_physical_table_info(old_raw_table_info, physical_columns);
129
130 debug!(
131 "Starting update table: {} metadata, table_id: {}, new table info: {:?}",
132 table_ref, table_id, new_raw_table_info
133 );
134
135 table_metadata_manager
136 .update_table_info(
137 current_table_info_value,
138 Some(region_distribution),
139 new_raw_table_info,
140 )
141 .await?;
142
143 Ok(())
144 }
145
146 pub(crate) fn build_cache_ident_keys(
153 physical_table_info: &TableInfoValue,
154 logical_table_info_values: &[&TableInfoValue],
155 ) -> Vec<CacheIdent> {
156 let mut cache_keys = Vec::with_capacity(logical_table_info_values.len() * 2 + 2);
157 cache_keys.extend(logical_table_info_values.iter().flat_map(|table| {
158 vec![
159 CacheIdent::TableId(table.table_info.ident.table_id),
160 CacheIdent::TableName(table.table_name()),
161 ]
162 }));
163 cache_keys.push(CacheIdent::TableId(
164 physical_table_info.table_info.ident.table_id,
165 ));
166 cache_keys.push(CacheIdent::TableName(physical_table_info.table_name()));
167
168 cache_keys
169 }
170}
171
172fn make_request(alter_requests: AlterRequests) -> RegionRequest {
173 RegionRequest {
174 header: Some(RegionRequestHeader {
175 tracing_context: TracingContext::from_current_span().to_w3c(),
176 ..Default::default()
177 }),
178 body: Some(region_request::Body::Alters(alter_requests)),
179 }
180}
181
182pub fn make_alter_region_request(
184 region_id: RegionId,
185 alter_table_expr: &AlterTableExpr,
186) -> AlterRequest {
187 let region_id = region_id.as_u64();
188 let kind = match &alter_table_expr.kind {
189 Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns(
190 to_region_add_columns(add_columns),
191 )),
192 _ => unreachable!(), };
194
195 AlterRequest {
196 region_id,
197 schema_version: 0,
198 kind,
199 }
200}
201
202fn to_region_add_columns(add_columns: &v1::AddColumns) -> AddColumns {
203 let add_columns = add_columns
204 .add_columns
205 .iter()
206 .map(|add_column| {
207 let region_column_def = RegionColumnDef {
208 column_def: add_column.column_def.clone(),
209 ..Default::default() };
211 AddColumn {
212 column_def: Some(region_column_def),
213 ..Default::default() }
215 })
216 .collect();
217 AddColumns { add_columns }
218}