common_meta/ddl/alter_logical_tables/
executor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::region::RegionResponse;
18use api::v1::alter_table_expr::Kind;
19use api::v1::region::{
20    AddColumn, AddColumns, AlterRequest, AlterRequests, RegionColumnDef, RegionRequest,
21    RegionRequestHeader, alter_request, region_request,
22};
23use api::v1::{self, AlterTableExpr};
24use common_telemetry::tracing_context::TracingContext;
25use common_telemetry::{debug, warn};
26use futures::future;
27use store_api::metadata::ColumnMetadata;
28use store_api::storage::{RegionId, RegionNumber, TableId};
29
30use crate::ddl::utils::{add_peer_context_if_needed, raw_table_info};
31use crate::error::Result;
32use crate::instruction::CacheIdent;
33use crate::key::table_info::TableInfoValue;
34use crate::key::{DeserializedValueWithBytes, RegionDistribution, TableMetadataManagerRef};
35use crate::node_manager::NodeManagerRef;
36use crate::rpc::router::{RegionRoute, find_leaders, region_distribution};
37
38/// [AlterLogicalTablesExecutor] performs:
39/// - Alters logical regions on the datanodes.
40/// - Updates table metadata for alter table operation.
41pub struct AlterLogicalTablesExecutor<'a> {
42    /// The alter table expressions.
43    ///
44    /// The first element is the logical table id, the second element is the alter table expression.
45    alters: Vec<(TableId, &'a AlterTableExpr)>,
46}
47
48impl<'a> AlterLogicalTablesExecutor<'a> {
49    pub fn new(alters: Vec<(TableId, &'a AlterTableExpr)>) -> Self {
50        Self { alters }
51    }
52
53    /// Alters logical regions on the datanodes.
54    pub(crate) async fn on_alter_regions(
55        &self,
56        node_manager: &NodeManagerRef,
57        region_routes: &[RegionRoute],
58    ) -> Result<Vec<RegionResponse>> {
59        let region_distribution = region_distribution(region_routes);
60        let leaders = find_leaders(region_routes)
61            .into_iter()
62            .map(|p| (p.id, p))
63            .collect::<HashMap<_, _>>();
64        let mut alter_region_tasks = Vec::with_capacity(leaders.len());
65        for (datanode_id, region_role_set) in region_distribution {
66            if region_role_set.leader_regions.is_empty() {
67                continue;
68            }
69            // Safety: must exists.
70            let peer = leaders.get(&datanode_id).unwrap();
71            let requester = node_manager.datanode(peer).await;
72            let requests = self.make_alter_region_request(&region_role_set.leader_regions);
73            let requester = requester.clone();
74            let peer = peer.clone();
75
76            debug!("Sending alter region requests to datanode {}", peer);
77            alter_region_tasks.push(async move {
78                requester
79                    .handle(make_request(requests))
80                    .await
81                    .map_err(add_peer_context_if_needed(peer))
82            });
83        }
84
85        future::join_all(alter_region_tasks)
86            .await
87            .into_iter()
88            .collect::<Result<Vec<_>>>()
89    }
90
91    fn make_alter_region_request(&self, region_numbers: &[RegionNumber]) -> AlterRequests {
92        let mut requests = Vec::with_capacity(region_numbers.len() * self.alters.len());
93        for (table_id, alter) in self.alters.iter() {
94            for region_number in region_numbers {
95                let region_id = RegionId::new(*table_id, *region_number);
96                let request = make_alter_region_request(region_id, alter);
97                requests.push(request);
98            }
99        }
100
101        AlterRequests { requests }
102    }
103
104    /// Updates table metadata for alter table operation.
105    ///
106    /// ## Panic:
107    /// - If the region distribution is not set when updating table metadata.
108    pub(crate) async fn on_alter_metadata(
109        physical_table_id: TableId,
110        table_metadata_manager: &TableMetadataManagerRef,
111        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
112        region_distribution: RegionDistribution,
113        physical_columns: &[ColumnMetadata],
114    ) -> Result<()> {
115        if physical_columns.is_empty() {
116            warn!(
117                "No physical columns found, leaving the physical table's schema unchanged when altering logical tables"
118            );
119            return Ok(());
120        }
121
122        let table_ref = current_table_info_value.table_ref();
123        let table_id = physical_table_id;
124
125        // Generates new table info
126        let old_raw_table_info = current_table_info_value.table_info.clone();
127        let new_raw_table_info =
128            raw_table_info::build_new_physical_table_info(old_raw_table_info, physical_columns);
129
130        debug!(
131            "Starting update table: {} metadata, table_id: {}, new table info: {:?}",
132            table_ref, table_id, new_raw_table_info
133        );
134
135        table_metadata_manager
136            .update_table_info(
137                current_table_info_value,
138                Some(region_distribution),
139                new_raw_table_info,
140            )
141            .await?;
142
143        Ok(())
144    }
145
146    /// Builds the cache ident keys for the alter logical tables.
147    ///
148    /// The cache ident keys are:
149    /// - The table id of the logical tables.
150    /// - The table name of the logical tables.
151    /// - The table id of the physical table.
152    pub(crate) fn build_cache_ident_keys(
153        physical_table_info: &TableInfoValue,
154        logical_table_info_values: &[&TableInfoValue],
155    ) -> Vec<CacheIdent> {
156        let mut cache_keys = Vec::with_capacity(logical_table_info_values.len() * 2 + 2);
157        cache_keys.extend(logical_table_info_values.iter().flat_map(|table| {
158            vec![
159                CacheIdent::TableId(table.table_info.ident.table_id),
160                CacheIdent::TableName(table.table_name()),
161            ]
162        }));
163        cache_keys.push(CacheIdent::TableId(
164            physical_table_info.table_info.ident.table_id,
165        ));
166        cache_keys.push(CacheIdent::TableName(physical_table_info.table_name()));
167
168        cache_keys
169    }
170}
171
172fn make_request(alter_requests: AlterRequests) -> RegionRequest {
173    RegionRequest {
174        header: Some(RegionRequestHeader {
175            tracing_context: TracingContext::from_current_span().to_w3c(),
176            ..Default::default()
177        }),
178        body: Some(region_request::Body::Alters(alter_requests)),
179    }
180}
181
182/// Makes an alter region request.
183pub fn make_alter_region_request(
184    region_id: RegionId,
185    alter_table_expr: &AlterTableExpr,
186) -> AlterRequest {
187    let region_id = region_id.as_u64();
188    let kind = match &alter_table_expr.kind {
189        Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns(
190            to_region_add_columns(add_columns),
191        )),
192        _ => unreachable!(), // Safety: we have checked the kind in check_input_tasks
193    };
194
195    AlterRequest {
196        region_id,
197        schema_version: 0,
198        kind,
199    }
200}
201
202fn to_region_add_columns(add_columns: &v1::AddColumns) -> AddColumns {
203    let add_columns = add_columns
204        .add_columns
205        .iter()
206        .map(|add_column| {
207            let region_column_def = RegionColumnDef {
208                column_def: add_column.column_def.clone(),
209                ..Default::default() // other fields are not used in alter logical table
210            };
211            AddColumn {
212                column_def: Some(region_column_def),
213                ..Default::default() // other fields are not used in alter logical table
214            }
215        })
216        .collect();
217    AddColumns { add_columns }
218}