common_meta/ddl/alter_logical_tables/
update_metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_grpc_expr::alter_expr_to_request;
16use snafu::ResultExt;
17use table::metadata::{RawTableInfo, TableInfo};
18
19use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
20use crate::ddl::alter_logical_tables::executor::AlterLogicalTablesExecutor;
21use crate::ddl::utils::table_info::batch_update_table_info_values;
22use crate::error;
23use crate::error::{ConvertAlterTableRequestSnafu, Result};
24use crate::key::DeserializedValueWithBytes;
25use crate::key::table_info::TableInfoValue;
26use crate::rpc::ddl::AlterTableTask;
27use crate::rpc::router::region_distribution;
28
29impl AlterLogicalTablesProcedure {
30    pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> {
31        self.fetch_physical_table_route_if_non_exist().await?;
32        // Safety: must exist.
33        let physical_table_info = self.data.physical_table_info.as_ref().unwrap();
34        // Safety: fetched in `fetch_physical_table_route_if_non_exist`.
35        let physical_table_route = self.physical_table_route.as_ref().unwrap();
36        let region_distribution = region_distribution(&physical_table_route.region_routes);
37
38        // Updates physical table's metadata.
39        AlterLogicalTablesExecutor::on_alter_metadata(
40            self.data.physical_table_id,
41            &self.context.table_metadata_manager,
42            physical_table_info,
43            region_distribution,
44            &self.data.physical_columns,
45        )
46        .await?;
47
48        Ok(())
49    }
50
51    pub(crate) async fn update_logical_tables_metadata(&mut self) -> Result<()> {
52        let table_info_values = self.build_update_metadata()?;
53        batch_update_table_info_values(&self.context.table_metadata_manager, table_info_values)
54            .await
55    }
56
57    pub(crate) fn build_update_metadata(
58        &self,
59    ) -> Result<Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>> {
60        let mut table_info_values_to_update = Vec::with_capacity(self.data.tasks.len());
61        for (task, table) in self
62            .data
63            .tasks
64            .iter()
65            .zip(self.data.table_info_values.iter())
66        {
67            table_info_values_to_update.push(self.build_new_table_info(task, table)?);
68        }
69
70        Ok(table_info_values_to_update)
71    }
72
73    fn build_new_table_info(
74        &self,
75        task: &AlterTableTask,
76        table: &DeserializedValueWithBytes<TableInfoValue>,
77    ) -> Result<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)> {
78        // Builds new_meta
79        let table_info = TableInfo::try_from(table.table_info.clone())
80            .context(error::ConvertRawTableInfoSnafu)?;
81        let table_ref = task.table_ref();
82        let request = alter_expr_to_request(
83            table.table_info.ident.table_id,
84            task.alter_table.clone(),
85            Some(&table_info.meta),
86        )
87        .context(ConvertAlterTableRequestSnafu)?;
88        let new_meta = table_info
89            .meta
90            .builder_with_alter_kind(table_ref.table, &request.alter_kind)
91            .context(error::TableSnafu)?
92            .build()
93            .with_context(|_| error::BuildTableMetaSnafu {
94                table_name: table_ref.table,
95            })?;
96        let version = table_info.ident.version + 1;
97        let mut new_table = table_info;
98        new_table.meta = new_meta;
99        new_table.ident.version = version;
100
101        let mut raw_table_info = RawTableInfo::from(new_table);
102        raw_table_info.sort_columns();
103
104        Ok((table.clone(), raw_table_info))
105    }
106}