common_meta/ddl/alter_logical_tables/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1;
16use api::v1::alter_table_expr::Kind;
17use api::v1::region::{
18    alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests,
19    RegionColumnDef, RegionRequest, RegionRequestHeader,
20};
21use common_telemetry::tracing_context::TracingContext;
22use store_api::storage::RegionId;
23
24use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
25use crate::error::Result;
26use crate::key::table_info::TableInfoValue;
27use crate::peer::Peer;
28use crate::rpc::ddl::AlterTableTask;
29use crate::rpc::router::{find_leader_regions, RegionRoute};
30
31impl AlterLogicalTablesProcedure {
32    pub(crate) fn make_request(
33        &self,
34        peer: &Peer,
35        region_routes: &[RegionRoute],
36    ) -> Result<RegionRequest> {
37        let alter_requests = self.make_alter_region_requests(peer, region_routes)?;
38        let request = RegionRequest {
39            header: Some(RegionRequestHeader {
40                tracing_context: TracingContext::from_current_span().to_w3c(),
41                ..Default::default()
42            }),
43            body: Some(region_request::Body::Alters(alter_requests)),
44        };
45
46        Ok(request)
47    }
48
49    fn make_alter_region_requests(
50        &self,
51        peer: &Peer,
52        region_routes: &[RegionRoute],
53    ) -> Result<AlterRequests> {
54        let tasks = &self.data.tasks;
55        let regions_on_this_peer = find_leader_regions(region_routes, peer);
56        let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len());
57        for (task, table) in self
58            .data
59            .tasks
60            .iter()
61            .zip(self.data.table_info_values.iter())
62        {
63            for region_number in &regions_on_this_peer {
64                let region_id = RegionId::new(table.table_info.ident.table_id, *region_number);
65                let request = self.make_alter_region_request(region_id, task, table)?;
66                requests.push(request);
67            }
68        }
69
70        Ok(AlterRequests { requests })
71    }
72
73    fn make_alter_region_request(
74        &self,
75        region_id: RegionId,
76        task: &AlterTableTask,
77        table: &TableInfoValue,
78    ) -> Result<AlterRequest> {
79        let region_id = region_id.as_u64();
80        let schema_version = table.table_info.ident.version;
81        let kind = match &task.alter_table.kind {
82            Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns(
83                to_region_add_columns(add_columns),
84            )),
85            _ => unreachable!(), // Safety: we have checked the kind in check_input_tasks
86        };
87
88        Ok(AlterRequest {
89            region_id,
90            schema_version,
91            kind,
92        })
93    }
94}
95
96fn to_region_add_columns(add_columns: &v1::AddColumns) -> AddColumns {
97    let add_columns = add_columns
98        .add_columns
99        .iter()
100        .map(|add_column| {
101            let region_column_def = RegionColumnDef {
102                column_def: add_column.column_def.clone(),
103                ..Default::default() // other fields are not used in alter logical table
104            };
105            AddColumn {
106                column_def: Some(region_column_def),
107                ..Default::default() // other fields are not used in alter logical table
108            }
109        })
110        .collect();
111    AddColumns { add_columns }
112}