// File: common_meta/ddl/alter_logical_tables/update_metadata.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_grpc_expr::alter_expr_to_request;
use common_telemetry::warn;
use itertools::Itertools;
use snafu::ResultExt;
use table::metadata::{RawTableInfo, TableInfo};

use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::physical_table_metadata;
use crate::error;
use crate::error::{ConvertAlterTableRequestSnafu, Result};
use crate::key::table_info::TableInfoValue;
use crate::key::DeserializedValueWithBytes;
use crate::rpc::ddl::AlterTableTask;

impl AlterLogicalTablesProcedure {
    /// Stores a rebuilt table info for the physical table.
    ///
    /// When `self.data.physical_columns` is empty, a warning is logged and the
    /// method returns `Ok(())` without calling the metadata manager.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `update_table_info`.
    ///
    /// # Panics
    ///
    /// Panics if `self.data.physical_table_info` is `None` (only reached when
    /// there is at least one physical column).
    pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> {
        let physical_columns = &self.data.physical_columns;
        if physical_columns.is_empty() {
            warn!("No physical columns found, leaving the physical table's schema unchanged when altering logical tables");
            return Ok(());
        }

        // Safety: must exist.
        let current = self.data.physical_table_info.as_ref().unwrap();

        // Derive the new raw table info from a copy of the stored one plus the
        // collected physical columns.
        let updated = physical_table_metadata::build_new_physical_table_info(
            current.table_info.clone(),
            physical_columns,
        );

        // Only the physical table's info is updated; `None` means we don't need
        // to touch per-region settings.
        let manager = &self.context.table_metadata_manager;
        manager.update_table_info(current, None, updated).await?;

        Ok(())
    }

    /// Persists the new table info of every logical table being altered.
    ///
    /// The pairs produced by [`Self::build_update_metadata`] are sent in a
    /// single batch when their count does not exceed the manager's chunk size;
    /// otherwise they are split into consecutive batches of at most that size
    /// and submitted one after another, stopping at the first failure.
    ///
    /// # Errors
    ///
    /// Propagates errors from building the new table infos or from any batch
    /// update.
    pub(crate) async fn update_logical_tables_metadata(&mut self) -> Result<()> {
        let pending = self.build_update_metadata()?;
        let manager = &self.context.table_metadata_manager;
        let limit = manager.batch_update_table_info_value_chunk_size();

        // All batches are materialized before the first `.await`.
        // NOTE(review): presumably the itertools chunk adaptor must not be kept
        // alive across an await point — confirm before making this lazy.
        let batches: Vec<Vec<_>> = if pending.len() > limit {
            pending
                .into_iter()
                .chunks(limit)
                .into_iter()
                .map(|group| group.collect::<Vec<_>>())
                .collect()
        } else {
            // Small enough (possibly empty): exactly one call with everything.
            vec![pending]
        };

        for batch in batches {
            manager.batch_update_table_info_values(batch).await?;
        }

        Ok(())
    }

    /// Pairs each alter task with its current table info value and computes
    /// the table info that should replace it.
    ///
    /// Tasks and table info values are matched by position; iteration ends at
    /// the shorter of the two sequences.
    ///
    /// # Errors
    ///
    /// Returns the first error raised while building a new table info.
    pub(crate) fn build_update_metadata(
        &self,
    ) -> Result<Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>> {
        let data = &self.data;
        data.tasks
            .iter()
            .zip(data.table_info_values.iter())
            .map(|(task, current)| self.build_new_table_info(task, current))
            .collect()
    }

    /// Applies one alter task to one table's info.
    ///
    /// Returns a clone of the current value together with the new raw table
    /// info, whose meta reflects the alteration, whose version is incremented
    /// by one, and whose columns have been passed through `sort_columns`.
    ///
    /// # Errors
    ///
    /// Fails if the raw table info cannot be converted, if the alter
    /// expression cannot be turned into a request, or if the new table meta
    /// cannot be built.
    fn build_new_table_info(
        &self,
        task: &AlterTableTask,
        table: &DeserializedValueWithBytes<TableInfoValue>,
    ) -> Result<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)> {
        let current_raw = &table.table_info;

        // Work on an owned, typed copy of the stored table info.
        let mut altered = TableInfo::try_from(current_raw.clone())
            .context(error::ConvertRawTableInfoSnafu)?;

        let table_ref = task.table_ref();
        let alter_request =
            alter_expr_to_request(current_raw.ident.table_id, task.alter_table.clone())
                .context(ConvertAlterTableRequestSnafu)?;

        // Build the replacement meta completely before mutating `altered`.
        let rebuilt_meta = altered
            .meta
            .builder_with_alter_kind(table_ref.table, &alter_request.alter_kind)
            .context(error::TableSnafu)?
            .build()
            .with_context(|_| error::BuildTableMetaSnafu {
                table_name: table_ref.table,
            })?;

        altered.meta = rebuilt_meta;
        altered.ident.version += 1;

        let mut new_raw = RawTableInfo::from(altered);
        new_raw.sort_columns();

        Ok((table.clone(), new_raw))
    }
}