common_meta/ddl/alter_logical_tables/
validator.rs1use std::collections::HashSet;
16
17use api::v1::alter_table_expr::Kind;
18use api::v1::AlterTableExpr;
19use snafu::{ensure, OptionExt};
20use store_api::storage::TableId;
21use table::table_reference::TableReference;
22
23use crate::ddl::utils::table_id::get_all_table_ids_by_names;
24use crate::ddl::utils::table_info::{
25 all_logical_table_routes_have_same_physical_id, get_all_table_info_values_by_table_ids,
26};
27use crate::error::{
28 AlterLogicalTablesInvalidArgumentsSnafu, Result, TableInfoNotFoundSnafu,
29 TableRouteNotFoundSnafu,
30};
31use crate::key::table_info::TableInfoValue;
32use crate::key::table_route::{PhysicalTableRouteValue, TableRouteManager, TableRouteValue};
33use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
34
35pub struct AlterLogicalTableValidator<'a> {
37 physical_table_id: TableId,
38 alters: Vec<&'a AlterTableExpr>,
39}
40
41impl<'a> AlterLogicalTableValidator<'a> {
42 pub fn new(physical_table_id: TableId, alters: Vec<&'a AlterTableExpr>) -> Self {
43 Self {
44 physical_table_id,
45 alters,
46 }
47 }
48
49 fn validate_schema(&self) -> Result<()> {
51 let is_same_schema = self.alters.windows(2).all(|pair| {
52 pair[0].catalog_name == pair[1].catalog_name
53 && pair[0].schema_name == pair[1].schema_name
54 });
55
56 ensure!(
57 is_same_schema,
58 AlterLogicalTablesInvalidArgumentsSnafu {
59 err_msg: "Schemas of the alter table expressions are not the same"
60 }
61 );
62
63 Ok(())
64 }
65
66 fn validate_alter_kind(&self) -> Result<()> {
69 for alter in &self.alters {
70 let kind = alter
71 .kind
72 .as_ref()
73 .context(AlterLogicalTablesInvalidArgumentsSnafu {
74 err_msg: "Alter kind is missing",
75 })?;
76
77 let Kind::AddColumns(_) = kind else {
78 return AlterLogicalTablesInvalidArgumentsSnafu {
79 err_msg: "Only support add columns operation",
80 }
81 .fail();
82 };
83 }
84
85 Ok(())
86 }
87
88 fn table_names(&self) -> Vec<TableReference> {
89 self.alters
90 .iter()
91 .map(|alter| {
92 TableReference::full(&alter.catalog_name, &alter.schema_name, &alter.table_name)
93 })
94 .collect()
95 }
96
97 async fn validate_physical_table(
106 &self,
107 table_metadata_manager: &TableMetadataManagerRef,
108 ) -> Result<(
109 DeserializedValueWithBytes<TableInfoValue>,
110 PhysicalTableRouteValue,
111 )> {
112 let (table_info, table_route) = table_metadata_manager
113 .get_full_table_info(self.physical_table_id)
114 .await?;
115
116 let table_info = table_info.with_context(|| TableInfoNotFoundSnafu {
117 table: format!("table id - {}", self.physical_table_id),
118 })?;
119
120 let physical_table_route = table_route
121 .context(TableRouteNotFoundSnafu {
122 table_id: self.physical_table_id,
123 })?
124 .into_inner();
125
126 let TableRouteValue::Physical(table_route) = physical_table_route else {
127 return AlterLogicalTablesInvalidArgumentsSnafu {
128 err_msg: format!(
129 "expected a physical table but got a logical table: {:?}",
130 self.physical_table_id
131 ),
132 }
133 .fail();
134 };
135
136 Ok((table_info, table_route))
137 }
138
139 async fn validate_logical_table_routes(
147 &self,
148 table_route_manager: &TableRouteManager,
149 table_ids: &[TableId],
150 ) -> Result<()> {
151 let all_logical_table_routes_have_same_physical_id =
152 all_logical_table_routes_have_same_physical_id(
153 table_route_manager,
154 table_ids,
155 self.physical_table_id,
156 )
157 .await?;
158
159 ensure!(
160 all_logical_table_routes_have_same_physical_id,
161 AlterLogicalTablesInvalidArgumentsSnafu {
162 err_msg: "All the tasks should have the same physical table id"
163 }
164 );
165
166 Ok(())
167 }
168
169 pub async fn validate(
179 &self,
180 table_metadata_manager: &TableMetadataManagerRef,
181 ) -> Result<ValidatorResult> {
182 self.validate_schema()?;
183 self.validate_alter_kind()?;
184 let (physical_table_info, physical_table_route) =
185 self.validate_physical_table(table_metadata_manager).await?;
186 let table_names = self.table_names();
187 let table_ids =
188 get_all_table_ids_by_names(table_metadata_manager.table_name_manager(), &table_names)
189 .await?;
190 let mut table_info_values = get_all_table_info_values_by_table_ids(
191 table_metadata_manager.table_info_manager(),
192 &table_ids,
193 &table_names,
194 )
195 .await?;
196 self.validate_logical_table_routes(
197 table_metadata_manager.table_route_manager(),
198 &table_ids,
199 )
200 .await?;
201 let skip_alter = self
202 .alters
203 .iter()
204 .zip(table_info_values.iter())
205 .map(|(task, table)| skip_alter_logical_region(task, table))
206 .collect::<Vec<_>>();
207 retain_unskipped(&mut table_info_values, &skip_alter);
208 let num_skipped = skip_alter.iter().filter(|&&x| x).count();
209
210 Ok(ValidatorResult {
211 num_skipped,
212 skip_alter,
213 table_info_values,
214 physical_table_info,
215 physical_table_route,
216 })
217 }
218}
219
220pub(crate) struct ValidatorResult {
222 pub(crate) num_skipped: usize,
223 pub(crate) skip_alter: Vec<bool>,
224 pub(crate) table_info_values: Vec<DeserializedValueWithBytes<TableInfoValue>>,
225 pub(crate) physical_table_info: DeserializedValueWithBytes<TableInfoValue>,
226 pub(crate) physical_table_route: PhysicalTableRouteValue,
227}
228
/// Removes from `target` every element whose positional flag in `skipped` is `true`,
/// keeping the remaining elements in their original order.
///
/// # Panics
/// In builds with debug assertions, panics when `target` and `skipped` differ in length.
/// In any build, panics when `skipped` runs out of flags before `target` runs out of
/// elements.
pub(crate) fn retain_unskipped<T>(target: &mut Vec<T>, skipped: &[bool]) {
    debug_assert_eq!(target.len(), skipped.len());

    // `Vec::retain` visits elements in order, so the flags can be consumed in lockstep.
    let mut flags = skipped.iter();
    target.retain(|_| {
        let is_skipped = *flags.next().unwrap();
        !is_skipped
    });
}
235
236fn skip_alter_logical_region(alter: &AlterTableExpr, table: &TableInfoValue) -> bool {
238 let existing_columns = table
239 .table_info
240 .meta
241 .schema
242 .column_schemas
243 .iter()
244 .map(|c| &c.name)
245 .collect::<HashSet<_>>();
246
247 let Some(kind) = alter.kind.as_ref() else {
248 return true; };
250 let Kind::AddColumns(add_columns) = kind else {
251 return true; };
253
254 add_columns
258 .add_columns
259 .iter()
260 .map(|add_column| add_column.column_def.as_ref().map(|c| &c.name))
261 .all(|column| {
262 column
263 .map(|c| existing_columns.contains(c))
264 .unwrap_or(false)
265 })
266}
267
#[cfg(test)]
mod tests {
    use super::*;

    /// Elements flagged `true` are dropped; the others keep their relative order.
    #[test]
    fn test_retain_unskipped() {
        let skipped = [false, true, false, true, false];
        let mut target: Vec<i32> = (1..=5).collect();

        retain_unskipped(&mut target, &skipped);

        assert_eq!(target, [1, 3, 5]);
    }
}