common_meta/ddl/alter_logical_tables/
region_request.rs1use api::v1;
16use api::v1::alter_table_expr::Kind;
17use api::v1::region::{
18 alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests,
19 RegionColumnDef, RegionRequest, RegionRequestHeader,
20};
21use common_telemetry::tracing_context::TracingContext;
22use store_api::storage::RegionId;
23
24use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
25use crate::error::Result;
26use crate::key::table_info::TableInfoValue;
27use crate::peer::Peer;
28use crate::rpc::ddl::AlterTableTask;
29use crate::rpc::router::{find_leader_regions, RegionRoute};
30
31impl AlterLogicalTablesProcedure {
32 pub(crate) fn make_request(
33 &self,
34 peer: &Peer,
35 region_routes: &[RegionRoute],
36 ) -> Result<RegionRequest> {
37 let alter_requests = self.make_alter_region_requests(peer, region_routes)?;
38 let request = RegionRequest {
39 header: Some(RegionRequestHeader {
40 tracing_context: TracingContext::from_current_span().to_w3c(),
41 ..Default::default()
42 }),
43 body: Some(region_request::Body::Alters(alter_requests)),
44 };
45
46 Ok(request)
47 }
48
49 fn make_alter_region_requests(
50 &self,
51 peer: &Peer,
52 region_routes: &[RegionRoute],
53 ) -> Result<AlterRequests> {
54 let tasks = &self.data.tasks;
55 let regions_on_this_peer = find_leader_regions(region_routes, peer);
56 let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len());
57 for (task, table) in self
58 .data
59 .tasks
60 .iter()
61 .zip(self.data.table_info_values.iter())
62 {
63 for region_number in ®ions_on_this_peer {
64 let region_id = RegionId::new(table.table_info.ident.table_id, *region_number);
65 let request = self.make_alter_region_request(region_id, task, table)?;
66 requests.push(request);
67 }
68 }
69
70 Ok(AlterRequests { requests })
71 }
72
73 fn make_alter_region_request(
74 &self,
75 region_id: RegionId,
76 task: &AlterTableTask,
77 table: &TableInfoValue,
78 ) -> Result<AlterRequest> {
79 let region_id = region_id.as_u64();
80 let schema_version = table.table_info.ident.version;
81 let kind = match &task.alter_table.kind {
82 Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns(
83 to_region_add_columns(add_columns),
84 )),
85 _ => unreachable!(), };
87
88 Ok(AlterRequest {
89 region_id,
90 schema_version,
91 kind,
92 })
93 }
94}
95
96fn to_region_add_columns(add_columns: &v1::AddColumns) -> AddColumns {
97 let add_columns = add_columns
98 .add_columns
99 .iter()
100 .map(|add_column| {
101 let region_column_def = RegionColumnDef {
102 column_def: add_column.column_def.clone(),
103 ..Default::default() };
105 AddColumn {
106 column_def: Some(region_column_def),
107 ..Default::default() }
109 })
110 .collect();
111 AddColumns { add_columns }
112}