common_meta/ddl/alter_logical_tables/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::alter_table_expr::Kind;
16use api::v1::region::{
17    alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests,
18    RegionColumnDef, RegionRequest, RegionRequestHeader,
19};
20use api::v1::{self, AlterTableExpr};
21use common_telemetry::tracing_context::TracingContext;
22use store_api::storage::RegionId;
23
24use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
25use crate::error::Result;
26use crate::peer::Peer;
27use crate::rpc::router::{find_leader_regions, RegionRoute};
28
29impl AlterLogicalTablesProcedure {
30    pub(crate) fn make_request(
31        &self,
32        peer: &Peer,
33        region_routes: &[RegionRoute],
34    ) -> Result<RegionRequest> {
35        let alter_requests = self.make_alter_region_requests(peer, region_routes)?;
36        let request = RegionRequest {
37            header: Some(RegionRequestHeader {
38                tracing_context: TracingContext::from_current_span().to_w3c(),
39                ..Default::default()
40            }),
41            body: Some(region_request::Body::Alters(alter_requests)),
42        };
43
44        Ok(request)
45    }
46
47    fn make_alter_region_requests(
48        &self,
49        peer: &Peer,
50        region_routes: &[RegionRoute],
51    ) -> Result<AlterRequests> {
52        let tasks = &self.data.tasks;
53        let regions_on_this_peer = find_leader_regions(region_routes, peer);
54        let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len());
55        for (task, table) in self
56            .data
57            .tasks
58            .iter()
59            .zip(self.data.table_info_values.iter())
60        {
61            for region_number in &regions_on_this_peer {
62                let region_id = RegionId::new(table.table_info.ident.table_id, *region_number);
63                let request = make_alter_region_request(
64                    region_id,
65                    &task.alter_table,
66                    table.table_info.ident.version,
67                );
68                requests.push(request);
69            }
70        }
71
72        Ok(AlterRequests { requests })
73    }
74}
75
76/// Makes an alter region request.
77pub fn make_alter_region_request(
78    region_id: RegionId,
79    alter_table_expr: &AlterTableExpr,
80    schema_version: u64,
81) -> AlterRequest {
82    let region_id = region_id.as_u64();
83    let kind = match &alter_table_expr.kind {
84        Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns(
85            to_region_add_columns(add_columns),
86        )),
87        _ => unreachable!(), // Safety: we have checked the kind in check_input_tasks
88    };
89
90    AlterRequest {
91        region_id,
92        schema_version,
93        kind,
94    }
95}
96
97fn to_region_add_columns(add_columns: &v1::AddColumns) -> AddColumns {
98    let add_columns = add_columns
99        .add_columns
100        .iter()
101        .map(|add_column| {
102            let region_column_def = RegionColumnDef {
103                column_def: add_column.column_def.clone(),
104                ..Default::default() // other fields are not used in alter logical table
105            };
106            AddColumn {
107                column_def: Some(region_column_def),
108                ..Default::default() // other fields are not used in alter logical table
109            }
110        })
111        .collect();
112    AddColumns { add_columns }
113}