common_meta/ddl/alter_table/
executor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::region::RegionResponse;
18use api::v1::region::region_request::Body;
19use api::v1::region::{alter_request, AlterRequest, RegionRequest, RegionRequestHeader};
20use api::v1::AlterTableExpr;
21use common_catalog::format_full_table_name;
22use common_grpc_expr::alter_expr_to_request;
23use common_telemetry::tracing_context::TracingContext;
24use common_telemetry::{debug, info};
25use futures::future;
26use snafu::{ensure, ResultExt};
27use store_api::metadata::ColumnMetadata;
28use store_api::storage::{RegionId, TableId};
29use table::metadata::{RawTableInfo, TableInfo};
30use table::requests::AlterKind;
31use table::table_name::TableName;
32
33use crate::cache_invalidator::{CacheInvalidatorRef, Context};
34use crate::ddl::utils::{add_peer_context_if_needed, raw_table_info};
35use crate::error::{self, Result, UnexpectedSnafu};
36use crate::instruction::CacheIdent;
37use crate::key::table_info::TableInfoValue;
38use crate::key::table_name::TableNameKey;
39use crate::key::{DeserializedValueWithBytes, RegionDistribution, TableMetadataManagerRef};
40use crate::node_manager::NodeManagerRef;
41use crate::rpc::router::{find_leaders, region_distribution, RegionRoute};
42
43/// [AlterTableExecutor] performs:
44/// - Alters the metadata of the table.
45/// - Alters regions on the datanode nodes.
46pub struct AlterTableExecutor {
47    table: TableName,
48    table_id: TableId,
49    /// The new table name if the alter kind is rename table.
50    new_table_name: Option<String>,
51}
52
53impl AlterTableExecutor {
54    /// Creates a new [`AlterTableExecutor`].
55    pub fn new(table: TableName, table_id: TableId, new_table_name: Option<String>) -> Self {
56        Self {
57            table,
58            table_id,
59            new_table_name,
60        }
61    }
62
63    /// Prepares to alter the table.
64    ///
65    /// ## Checks:
66    /// - The new table name doesn't exist (rename).
67    /// - Table exists.
68    pub(crate) async fn on_prepare(
69        &self,
70        table_metadata_manager: &TableMetadataManagerRef,
71    ) -> Result<()> {
72        let catalog = &self.table.catalog_name;
73        let schema = &self.table.schema_name;
74        let table_name = &self.table.table_name;
75
76        let manager = table_metadata_manager;
77        if let Some(new_table_name) = &self.new_table_name {
78            let new_table_name_key = TableNameKey::new(catalog, schema, new_table_name);
79            let exists = manager
80                .table_name_manager()
81                .exists(new_table_name_key)
82                .await?;
83            ensure!(
84                !exists,
85                error::TableAlreadyExistsSnafu {
86                    table_name: format_full_table_name(catalog, schema, new_table_name),
87                }
88            )
89        }
90
91        let table_name_key = TableNameKey::new(catalog, schema, table_name);
92        let exists = manager.table_name_manager().exists(table_name_key).await?;
93        ensure!(
94            exists,
95            error::TableNotFoundSnafu {
96                table_name: format_full_table_name(catalog, schema, table_name),
97            }
98        );
99
100        Ok(())
101    }
102
103    /// Validates the alter table expression and builds the new table info.
104    ///
105    /// This validation is performed early to ensure the alteration is valid before
106    /// proceeding to the `on_alter_metadata` state, where regions have already been altered.
107    /// Building the new table info here allows us to catch any issues with the
108    /// alteration before committing metadata changes.
109    pub(crate) fn validate_alter_table_expr(
110        table_info: &RawTableInfo,
111        alter_table_expr: AlterTableExpr,
112    ) -> Result<TableInfo> {
113        build_new_table_info(table_info, alter_table_expr)
114    }
115
116    /// Updates table metadata for alter table operation.
117    pub(crate) async fn on_alter_metadata(
118        &self,
119        table_metadata_manager: &TableMetadataManagerRef,
120        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
121        region_distribution: Option<&RegionDistribution>,
122        mut raw_table_info: RawTableInfo,
123        column_metadatas: &[ColumnMetadata],
124    ) -> Result<()> {
125        let table_ref = self.table.table_ref();
126        let table_id = self.table_id;
127
128        if let Some(new_table_name) = &self.new_table_name {
129            debug!(
130                "Starting update table: {} metadata, table_id: {}, new table info: {:?}, new table name: {}",
131                table_ref, table_id, raw_table_info, new_table_name
132            );
133
134            table_metadata_manager
135                .rename_table(current_table_info_value, new_table_name.to_string())
136                .await?;
137        } else {
138            debug!(
139                "Starting update table: {} metadata, table_id: {}, new table info: {:?}",
140                table_ref, table_id, raw_table_info
141            );
142
143            ensure!(
144                region_distribution.is_some(),
145                UnexpectedSnafu {
146                    err_msg: "region distribution is not set when updating table metadata",
147                }
148            );
149
150            if !column_metadatas.is_empty() {
151                raw_table_info::update_table_info_column_ids(&mut raw_table_info, column_metadatas);
152            }
153            table_metadata_manager
154                .update_table_info(
155                    current_table_info_value,
156                    region_distribution.cloned(),
157                    raw_table_info,
158                )
159                .await?;
160        }
161
162        Ok(())
163    }
164
165    /// Alters regions on the datanode nodes.
166    pub(crate) async fn on_alter_regions(
167        &self,
168        node_manager: &NodeManagerRef,
169        region_routes: &[RegionRoute],
170        kind: Option<alter_request::Kind>,
171    ) -> Vec<Result<RegionResponse>> {
172        let region_distribution = region_distribution(region_routes);
173        let leaders = find_leaders(region_routes)
174            .into_iter()
175            .map(|p| (p.id, p))
176            .collect::<HashMap<_, _>>();
177        let total_num_region = region_distribution
178            .values()
179            .map(|r| r.leader_regions.len())
180            .sum::<usize>();
181        let mut alter_region_tasks = Vec::with_capacity(total_num_region);
182        for (datanode_id, region_role_set) in region_distribution {
183            if region_role_set.leader_regions.is_empty() {
184                continue;
185            }
186            // Safety: must exists.
187            let peer = leaders.get(&datanode_id).unwrap();
188            let requester = node_manager.datanode(peer).await;
189
190            for region_id in region_role_set.leader_regions {
191                let region_id = RegionId::new(self.table_id, region_id);
192                let request = make_alter_region_request(region_id, kind.clone());
193
194                let requester = requester.clone();
195                let peer = peer.clone();
196
197                alter_region_tasks.push(async move {
198                    requester
199                        .handle(request)
200                        .await
201                        .map_err(add_peer_context_if_needed(peer))
202                });
203            }
204        }
205
206        future::join_all(alter_region_tasks)
207            .await
208            .into_iter()
209            .collect::<Vec<_>>()
210    }
211
212    /// Invalidates cache for the table.
213    pub(crate) async fn invalidate_table_cache(
214        &self,
215        cache_invalidator: &CacheInvalidatorRef,
216    ) -> Result<()> {
217        let ctx = Context {
218            subject: Some(format!(
219                "Invalidate table cache by altering table {}, table_id: {}",
220                self.table.table_ref(),
221                self.table_id,
222            )),
223        };
224
225        cache_invalidator
226            .invalidate(
227                &ctx,
228                &[
229                    CacheIdent::TableName(self.table.clone()),
230                    CacheIdent::TableId(self.table_id),
231                ],
232            )
233            .await?;
234
235        Ok(())
236    }
237}
238
239/// Makes alter region request.
240pub(crate) fn make_alter_region_request(
241    region_id: RegionId,
242    kind: Option<alter_request::Kind>,
243) -> RegionRequest {
244    RegionRequest {
245        header: Some(RegionRequestHeader {
246            tracing_context: TracingContext::from_current_span().to_w3c(),
247            ..Default::default()
248        }),
249        body: Some(Body::Alter(AlterRequest {
250            region_id: region_id.as_u64(),
251            kind,
252            ..Default::default()
253        })),
254    }
255}
256
257/// Builds new table info after alteration.
258///
259/// This function creates a new table info by applying the alter table expression
260/// to the existing table info. For add column operations, it increments the
261/// `next_column_id` by the number of columns being added, which may result in gaps
262/// in the column id sequence.
263fn build_new_table_info(
264    table_info: &RawTableInfo,
265    alter_table_expr: AlterTableExpr,
266) -> Result<TableInfo> {
267    let table_info =
268        TableInfo::try_from(table_info.clone()).context(error::ConvertRawTableInfoSnafu)?;
269    let schema_name = &table_info.schema_name;
270    let catalog_name = &table_info.catalog_name;
271    let table_name = &table_info.name;
272    let table_id = table_info.ident.table_id;
273    let request = alter_expr_to_request(table_id, alter_table_expr)
274        .context(error::ConvertAlterTableRequestSnafu)?;
275
276    let new_meta = table_info
277        .meta
278        .builder_with_alter_kind(table_name, &request.alter_kind)
279        .context(error::TableSnafu)?
280        .build()
281        .with_context(|_| error::BuildTableMetaSnafu {
282            table_name: format_full_table_name(catalog_name, schema_name, table_name),
283        })?;
284
285    let mut new_info = table_info.clone();
286    new_info.meta = new_meta;
287    new_info.ident.version = table_info.ident.version + 1;
288    match request.alter_kind {
289        AlterKind::AddColumns { columns } => {
290            // Bumps the column id for the new columns.
291            // It may bump more than the actual number of columns added if there are
292            // existing columns, but it's fine.
293            new_info.meta.next_column_id += columns.len() as u32;
294        }
295        AlterKind::RenameTable { new_table_name } => {
296            new_info.name = new_table_name.to_string();
297        }
298        AlterKind::DropColumns { .. }
299        | AlterKind::ModifyColumnTypes { .. }
300        | AlterKind::SetTableOptions { .. }
301        | AlterKind::UnsetTableOptions { .. }
302        | AlterKind::SetIndex { .. }
303        | AlterKind::UnsetIndex { .. }
304        | AlterKind::DropDefaults { .. } => {}
305    }
306
307    info!(
308        "Built new table info: {:?} for table {}, table_id: {}",
309        new_info.meta, table_name, table_id
310    );
311
312    Ok(new_info)
313}