common_meta/ddl/alter_logical_tables/
region_request.rs1use api::v1::alter_table_expr::Kind;
16use api::v1::region::{
17 alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests,
18 RegionColumnDef, RegionRequest, RegionRequestHeader,
19};
20use api::v1::{self, AlterTableExpr};
21use common_telemetry::tracing_context::TracingContext;
22use store_api::storage::RegionId;
23
24use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
25use crate::error::Result;
26use crate::peer::Peer;
27use crate::rpc::router::{find_leader_regions, RegionRoute};
28
29impl AlterLogicalTablesProcedure {
30 pub(crate) fn make_request(
31 &self,
32 peer: &Peer,
33 region_routes: &[RegionRoute],
34 ) -> Result<RegionRequest> {
35 let alter_requests = self.make_alter_region_requests(peer, region_routes)?;
36 let request = RegionRequest {
37 header: Some(RegionRequestHeader {
38 tracing_context: TracingContext::from_current_span().to_w3c(),
39 ..Default::default()
40 }),
41 body: Some(region_request::Body::Alters(alter_requests)),
42 };
43
44 Ok(request)
45 }
46
47 fn make_alter_region_requests(
48 &self,
49 peer: &Peer,
50 region_routes: &[RegionRoute],
51 ) -> Result<AlterRequests> {
52 let tasks = &self.data.tasks;
53 let regions_on_this_peer = find_leader_regions(region_routes, peer);
54 let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len());
55 for (task, table) in self
56 .data
57 .tasks
58 .iter()
59 .zip(self.data.table_info_values.iter())
60 {
61 for region_number in ®ions_on_this_peer {
62 let region_id = RegionId::new(table.table_info.ident.table_id, *region_number);
63 let request = make_alter_region_request(
64 region_id,
65 &task.alter_table,
66 table.table_info.ident.version,
67 );
68 requests.push(request);
69 }
70 }
71
72 Ok(AlterRequests { requests })
73 }
74}
75
76pub fn make_alter_region_request(
78 region_id: RegionId,
79 alter_table_expr: &AlterTableExpr,
80 schema_version: u64,
81) -> AlterRequest {
82 let region_id = region_id.as_u64();
83 let kind = match &alter_table_expr.kind {
84 Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns(
85 to_region_add_columns(add_columns),
86 )),
87 _ => unreachable!(), };
89
90 AlterRequest {
91 region_id,
92 schema_version,
93 kind,
94 }
95}
96
97fn to_region_add_columns(add_columns: &v1::AddColumns) -> AddColumns {
98 let add_columns = add_columns
99 .add_columns
100 .iter()
101 .map(|add_column| {
102 let region_column_def = RegionColumnDef {
103 column_def: add_column.column_def.clone(),
104 ..Default::default() };
106 AddColumn {
107 column_def: Some(region_column_def),
108 ..Default::default() }
110 })
111 .collect();
112 AddColumns { add_columns }
113}