common_meta/ddl/alter_logical_tables/
check.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::alter_table_expr::Kind;
18use snafu::{ensure, OptionExt};
19
20use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
21use crate::error::{AlterLogicalTablesInvalidArgumentsSnafu, Result};
22use crate::key::table_info::TableInfoValue;
23use crate::key::table_route::TableRouteValue;
24use crate::rpc::ddl::AlterTableTask;
25
26impl AlterLogicalTablesProcedure {
27    pub(crate) fn check_input_tasks(&self) -> Result<()> {
28        self.check_schema()?;
29        self.check_alter_kind()?;
30        Ok(())
31    }
32
33    pub(crate) async fn check_physical_table(&self) -> Result<()> {
34        let table_route_manager = self.context.table_metadata_manager.table_route_manager();
35        let table_ids = self
36            .data
37            .table_info_values
38            .iter()
39            .map(|v| v.table_info.ident.table_id)
40            .collect::<Vec<_>>();
41        let table_routes = table_route_manager
42            .table_route_storage()
43            .batch_get(&table_ids)
44            .await?;
45        let physical_table_id = self.data.physical_table_id;
46        let is_same_physical_table = table_routes.iter().all(|r| {
47            if let Some(TableRouteValue::Logical(r)) = r {
48                r.physical_table_id() == physical_table_id
49            } else {
50                false
51            }
52        });
53
54        ensure!(
55            is_same_physical_table,
56            AlterLogicalTablesInvalidArgumentsSnafu {
57                err_msg: "All the tasks should have the same physical table id"
58            }
59        );
60
61        Ok(())
62    }
63
64    pub(crate) fn check_finished_tasks(&self) -> Result<Vec<bool>> {
65        let task = &self.data.tasks;
66        let table_info_values = &self.data.table_info_values;
67
68        Ok(task
69            .iter()
70            .zip(table_info_values.iter())
71            .map(|(task, table)| Self::check_finished_task(task, table))
72            .collect())
73    }
74
75    // Checks if the schemas of the tasks are the same
76    fn check_schema(&self) -> Result<()> {
77        let is_same_schema = self.data.tasks.windows(2).all(|pair| {
78            pair[0].alter_table.catalog_name == pair[1].alter_table.catalog_name
79                && pair[0].alter_table.schema_name == pair[1].alter_table.schema_name
80        });
81
82        ensure!(
83            is_same_schema,
84            AlterLogicalTablesInvalidArgumentsSnafu {
85                err_msg: "Schemas of the tasks are not the same"
86            }
87        );
88
89        Ok(())
90    }
91
92    fn check_alter_kind(&self) -> Result<()> {
93        for task in &self.data.tasks {
94            let kind = task.alter_table.kind.as_ref().context(
95                AlterLogicalTablesInvalidArgumentsSnafu {
96                    err_msg: "Alter kind is missing",
97                },
98            )?;
99            let Kind::AddColumns(_) = kind else {
100                return AlterLogicalTablesInvalidArgumentsSnafu {
101                    err_msg: "Only support add columns operation",
102                }
103                .fail();
104            };
105        }
106
107        Ok(())
108    }
109
110    fn check_finished_task(task: &AlterTableTask, table: &TableInfoValue) -> bool {
111        let columns = table
112            .table_info
113            .meta
114            .schema
115            .column_schemas
116            .iter()
117            .map(|c| &c.name)
118            .collect::<HashSet<_>>();
119
120        let Some(kind) = task.alter_table.kind.as_ref() else {
121            return true; // Never get here since we have checked it in `check_alter_kind`
122        };
123        let Kind::AddColumns(add_columns) = kind else {
124            return true; // Never get here since we have checked it in `check_alter_kind`
125        };
126
127        // We only check that all columns have been finished. That is to say,
128        // if one part is finished but another part is not, it will be considered
129        // unfinished.
130        add_columns
131            .add_columns
132            .iter()
133            .map(|add_column| add_column.column_def.as_ref().map(|c| &c.name))
134            .all(|column| column.map(|c| columns.contains(c)).unwrap_or(false))
135    }
136}