common_meta/ddl/alter_logical_tables/
update_metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_grpc_expr::alter_expr_to_request;
16use snafu::ResultExt;
17use table::metadata::{RawTableInfo, TableInfo};
18
19use crate::ddl::alter_logical_tables::executor::AlterLogicalTablesExecutor;
20use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
21use crate::ddl::utils::table_info::batch_update_table_info_values;
22use crate::error;
23use crate::error::{ConvertAlterTableRequestSnafu, Result};
24use crate::key::table_info::TableInfoValue;
25use crate::key::DeserializedValueWithBytes;
26use crate::rpc::ddl::AlterTableTask;
27use crate::rpc::router::region_distribution;
28
29impl AlterLogicalTablesProcedure {
30    pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> {
31        // Safety: must exist.
32        let physical_table_info = self.data.physical_table_info.as_ref().unwrap();
33        let physical_table_route = self.data.physical_table_route.as_ref().unwrap();
34        let region_distribution = region_distribution(&physical_table_route.region_routes);
35
36        // Updates physical table's metadata.
37        AlterLogicalTablesExecutor::on_alter_metadata(
38            self.data.physical_table_id,
39            &self.context.table_metadata_manager,
40            physical_table_info,
41            region_distribution,
42            &self.data.physical_columns,
43        )
44        .await?;
45
46        Ok(())
47    }
48
49    pub(crate) async fn update_logical_tables_metadata(&mut self) -> Result<()> {
50        let table_info_values = self.build_update_metadata()?;
51        batch_update_table_info_values(&self.context.table_metadata_manager, table_info_values)
52            .await
53    }
54
55    pub(crate) fn build_update_metadata(
56        &self,
57    ) -> Result<Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>> {
58        let mut table_info_values_to_update = Vec::with_capacity(self.data.tasks.len());
59        for (task, table) in self
60            .data
61            .tasks
62            .iter()
63            .zip(self.data.table_info_values.iter())
64        {
65            table_info_values_to_update.push(self.build_new_table_info(task, table)?);
66        }
67
68        Ok(table_info_values_to_update)
69    }
70
71    fn build_new_table_info(
72        &self,
73        task: &AlterTableTask,
74        table: &DeserializedValueWithBytes<TableInfoValue>,
75    ) -> Result<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)> {
76        // Builds new_meta
77        let table_info = TableInfo::try_from(table.table_info.clone())
78            .context(error::ConvertRawTableInfoSnafu)?;
79        let table_ref = task.table_ref();
80        let request = alter_expr_to_request(
81            table.table_info.ident.table_id,
82            task.alter_table.clone(),
83            Some(&table_info.meta),
84        )
85        .context(ConvertAlterTableRequestSnafu)?;
86        let new_meta = table_info
87            .meta
88            .builder_with_alter_kind(table_ref.table, &request.alter_kind)
89            .context(error::TableSnafu)?
90            .build()
91            .with_context(|_| error::BuildTableMetaSnafu {
92                table_name: table_ref.table,
93            })?;
94        let version = table_info.ident.version + 1;
95        let mut new_table = table_info;
96        new_table.meta = new_meta;
97        new_table.ident.version = version;
98
99        let mut raw_table_info = RawTableInfo::from(new_table);
100        raw_table_info.sort_columns();
101
102        Ok((table.clone(), raw_table_info))
103    }
104}