common_meta/ddl/alter_logical_tables/
update_metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_grpc_expr::alter_expr_to_request;
16use common_telemetry::warn;
17use itertools::Itertools;
18use snafu::ResultExt;
19use table::metadata::{RawTableInfo, TableInfo};
20
21use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
22use crate::ddl::physical_table_metadata;
23use crate::error;
24use crate::error::{ConvertAlterTableRequestSnafu, Result};
25use crate::key::table_info::TableInfoValue;
26use crate::key::DeserializedValueWithBytes;
27use crate::rpc::ddl::AlterTableTask;
28
29impl AlterLogicalTablesProcedure {
30    pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> {
31        if self.data.physical_columns.is_empty() {
32            warn!("No physical columns found, leaving the physical table's schema unchanged when altering logical tables");
33            return Ok(());
34        }
35
36        // Safety: must exist.
37        let physical_table_info = self.data.physical_table_info.as_ref().unwrap();
38
39        // Generates new table info
40        let old_raw_table_info = physical_table_info.table_info.clone();
41        let new_raw_table_info = physical_table_metadata::build_new_physical_table_info(
42            old_raw_table_info,
43            &self.data.physical_columns,
44        );
45
46        // Updates physical table's metadata, and we don't need to touch per-region settings.
47        self.context
48            .table_metadata_manager
49            .update_table_info(physical_table_info, None, new_raw_table_info)
50            .await?;
51
52        Ok(())
53    }
54
55    pub(crate) async fn update_logical_tables_metadata(&mut self) -> Result<()> {
56        let table_info_values = self.build_update_metadata()?;
57        let manager = &self.context.table_metadata_manager;
58        let chunk_size = manager.batch_update_table_info_value_chunk_size();
59        if table_info_values.len() > chunk_size {
60            let chunks = table_info_values
61                .into_iter()
62                .chunks(chunk_size)
63                .into_iter()
64                .map(|check| check.collect::<Vec<_>>())
65                .collect::<Vec<_>>();
66            for chunk in chunks {
67                manager.batch_update_table_info_values(chunk).await?;
68            }
69        } else {
70            manager
71                .batch_update_table_info_values(table_info_values)
72                .await?;
73        }
74
75        Ok(())
76    }
77
78    pub(crate) fn build_update_metadata(
79        &self,
80    ) -> Result<Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>> {
81        let mut table_info_values_to_update = Vec::with_capacity(self.data.tasks.len());
82        for (task, table) in self
83            .data
84            .tasks
85            .iter()
86            .zip(self.data.table_info_values.iter())
87        {
88            table_info_values_to_update.push(self.build_new_table_info(task, table)?);
89        }
90
91        Ok(table_info_values_to_update)
92    }
93
94    fn build_new_table_info(
95        &self,
96        task: &AlterTableTask,
97        table: &DeserializedValueWithBytes<TableInfoValue>,
98    ) -> Result<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)> {
99        // Builds new_meta
100        let table_info = TableInfo::try_from(table.table_info.clone())
101            .context(error::ConvertRawTableInfoSnafu)?;
102        let table_ref = task.table_ref();
103        let request =
104            alter_expr_to_request(table.table_info.ident.table_id, task.alter_table.clone())
105                .context(ConvertAlterTableRequestSnafu)?;
106        let new_meta = table_info
107            .meta
108            .builder_with_alter_kind(table_ref.table, &request.alter_kind)
109            .context(error::TableSnafu)?
110            .build()
111            .with_context(|_| error::BuildTableMetaSnafu {
112                table_name: table_ref.table,
113            })?;
114        let version = table_info.ident.version + 1;
115        let mut new_table = table_info;
116        new_table.meta = new_meta;
117        new_table.ident.version = version;
118
119        let mut raw_table_info = RawTableInfo::from(new_table);
120        raw_table_info.sort_columns();
121
122        Ok((table.clone(), raw_table_info))
123    }
124}