common_meta/ddl/alter_logical_tables/
check.rs1use std::collections::HashSet;
16
17use api::v1::alter_table_expr::Kind;
18use snafu::{ensure, OptionExt};
19
20use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
21use crate::error::{AlterLogicalTablesInvalidArgumentsSnafu, Result};
22use crate::key::table_info::TableInfoValue;
23use crate::key::table_route::TableRouteValue;
24use crate::rpc::ddl::AlterTableTask;
25
26impl AlterLogicalTablesProcedure {
27 pub(crate) fn check_input_tasks(&self) -> Result<()> {
28 self.check_schema()?;
29 self.check_alter_kind()?;
30 Ok(())
31 }
32
33 pub(crate) async fn check_physical_table(&self) -> Result<()> {
34 let table_route_manager = self.context.table_metadata_manager.table_route_manager();
35 let table_ids = self
36 .data
37 .table_info_values
38 .iter()
39 .map(|v| v.table_info.ident.table_id)
40 .collect::<Vec<_>>();
41 let table_routes = table_route_manager
42 .table_route_storage()
43 .batch_get(&table_ids)
44 .await?;
45 let physical_table_id = self.data.physical_table_id;
46 let is_same_physical_table = table_routes.iter().all(|r| {
47 if let Some(TableRouteValue::Logical(r)) = r {
48 r.physical_table_id() == physical_table_id
49 } else {
50 false
51 }
52 });
53
54 ensure!(
55 is_same_physical_table,
56 AlterLogicalTablesInvalidArgumentsSnafu {
57 err_msg: "All the tasks should have the same physical table id"
58 }
59 );
60
61 Ok(())
62 }
63
64 pub(crate) fn check_finished_tasks(&self) -> Result<Vec<bool>> {
65 let task = &self.data.tasks;
66 let table_info_values = &self.data.table_info_values;
67
68 Ok(task
69 .iter()
70 .zip(table_info_values.iter())
71 .map(|(task, table)| Self::check_finished_task(task, table))
72 .collect())
73 }
74
75 fn check_schema(&self) -> Result<()> {
77 let is_same_schema = self.data.tasks.windows(2).all(|pair| {
78 pair[0].alter_table.catalog_name == pair[1].alter_table.catalog_name
79 && pair[0].alter_table.schema_name == pair[1].alter_table.schema_name
80 });
81
82 ensure!(
83 is_same_schema,
84 AlterLogicalTablesInvalidArgumentsSnafu {
85 err_msg: "Schemas of the tasks are not the same"
86 }
87 );
88
89 Ok(())
90 }
91
92 fn check_alter_kind(&self) -> Result<()> {
93 for task in &self.data.tasks {
94 let kind = task.alter_table.kind.as_ref().context(
95 AlterLogicalTablesInvalidArgumentsSnafu {
96 err_msg: "Alter kind is missing",
97 },
98 )?;
99 let Kind::AddColumns(_) = kind else {
100 return AlterLogicalTablesInvalidArgumentsSnafu {
101 err_msg: "Only support add columns operation",
102 }
103 .fail();
104 };
105 }
106
107 Ok(())
108 }
109
110 fn check_finished_task(task: &AlterTableTask, table: &TableInfoValue) -> bool {
111 let columns = table
112 .table_info
113 .meta
114 .schema
115 .column_schemas
116 .iter()
117 .map(|c| &c.name)
118 .collect::<HashSet<_>>();
119
120 let Some(kind) = task.alter_table.kind.as_ref() else {
121 return true; };
123 let Kind::AddColumns(add_columns) = kind else {
124 return true; };
126
127 add_columns
131 .add_columns
132 .iter()
133 .map(|add_column| add_column.column_def.as_ref().map(|c| &c.name))
134 .all(|column| column.map(|c| columns.contains(c)).unwrap_or(false))
135 }
136}