common_meta/ddl/alter_logical_tables/
validator.rs1use std::collections::HashSet;
16
17use api::v1::alter_table_expr::Kind;
18use api::v1::AlterTableExpr;
19use snafu::{ensure, OptionExt};
20use store_api::storage::TableId;
21use table::table_reference::TableReference;
22
23use crate::ddl::utils::table_id::get_all_table_ids_by_names;
24use crate::ddl::utils::table_info::get_all_table_info_values_by_table_ids;
25use crate::error::{
26 AlterLogicalTablesInvalidArgumentsSnafu, Result, TableInfoNotFoundSnafu,
27 TableRouteNotFoundSnafu,
28};
29use crate::key::table_info::TableInfoValue;
30use crate::key::table_route::{PhysicalTableRouteValue, TableRouteManager, TableRouteValue};
31use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
32
33pub struct AlterLogicalTableValidator<'a> {
35 physical_table_id: TableId,
36 alters: Vec<&'a AlterTableExpr>,
37}
38
39impl<'a> AlterLogicalTableValidator<'a> {
40 pub fn new(physical_table_id: TableId, alters: Vec<&'a AlterTableExpr>) -> Self {
41 Self {
42 physical_table_id,
43 alters,
44 }
45 }
46
47 fn validate_schema(&self) -> Result<()> {
49 let is_same_schema = self.alters.windows(2).all(|pair| {
50 pair[0].catalog_name == pair[1].catalog_name
51 && pair[0].schema_name == pair[1].schema_name
52 });
53
54 ensure!(
55 is_same_schema,
56 AlterLogicalTablesInvalidArgumentsSnafu {
57 err_msg: "Schemas of the alter table expressions are not the same"
58 }
59 );
60
61 Ok(())
62 }
63
64 fn validate_alter_kind(&self) -> Result<()> {
67 for alter in &self.alters {
68 let kind = alter
69 .kind
70 .as_ref()
71 .context(AlterLogicalTablesInvalidArgumentsSnafu {
72 err_msg: "Alter kind is missing",
73 })?;
74
75 let Kind::AddColumns(_) = kind else {
76 return AlterLogicalTablesInvalidArgumentsSnafu {
77 err_msg: "Only support add columns operation",
78 }
79 .fail();
80 };
81 }
82
83 Ok(())
84 }
85
86 fn table_names(&self) -> Vec<TableReference> {
87 self.alters
88 .iter()
89 .map(|alter| {
90 TableReference::full(&alter.catalog_name, &alter.schema_name, &alter.table_name)
91 })
92 .collect()
93 }
94
95 async fn validate_physical_table(
104 &self,
105 table_metadata_manager: &TableMetadataManagerRef,
106 ) -> Result<(
107 DeserializedValueWithBytes<TableInfoValue>,
108 PhysicalTableRouteValue,
109 )> {
110 let (table_info, table_route) = table_metadata_manager
111 .get_full_table_info(self.physical_table_id)
112 .await?;
113
114 let table_info = table_info.with_context(|| TableInfoNotFoundSnafu {
115 table: format!("table id - {}", self.physical_table_id),
116 })?;
117
118 let physical_table_route = table_route
119 .context(TableRouteNotFoundSnafu {
120 table_id: self.physical_table_id,
121 })?
122 .into_inner();
123
124 let TableRouteValue::Physical(table_route) = physical_table_route else {
125 return AlterLogicalTablesInvalidArgumentsSnafu {
126 err_msg: format!(
127 "expected a physical table but got a logical table: {:?}",
128 self.physical_table_id
129 ),
130 }
131 .fail();
132 };
133
134 Ok((table_info, table_route))
135 }
136
137 async fn validate_logical_table_routes(
145 &self,
146 table_route_manager: &TableRouteManager,
147 table_ids: &[TableId],
148 ) -> Result<()> {
149 let table_routes = table_route_manager
150 .table_route_storage()
151 .batch_get(table_ids)
152 .await?;
153
154 let physical_table_id = self.physical_table_id;
155
156 let is_same_physical_table = table_routes.iter().all(|r| {
157 if let Some(TableRouteValue::Logical(r)) = r {
158 r.physical_table_id() == physical_table_id
159 } else {
160 false
161 }
162 });
163
164 ensure!(
165 is_same_physical_table,
166 AlterLogicalTablesInvalidArgumentsSnafu {
167 err_msg: "All the tasks should have the same physical table id"
168 }
169 );
170
171 Ok(())
172 }
173
174 pub async fn validate(
184 &self,
185 table_metadata_manager: &TableMetadataManagerRef,
186 ) -> Result<ValidatorResult> {
187 self.validate_schema()?;
188 self.validate_alter_kind()?;
189 let (physical_table_info, physical_table_route) =
190 self.validate_physical_table(table_metadata_manager).await?;
191 let table_names = self.table_names();
192 let table_ids =
193 get_all_table_ids_by_names(table_metadata_manager.table_name_manager(), &table_names)
194 .await?;
195 let mut table_info_values = get_all_table_info_values_by_table_ids(
196 table_metadata_manager.table_info_manager(),
197 &table_ids,
198 &table_names,
199 )
200 .await?;
201 self.validate_logical_table_routes(
202 table_metadata_manager.table_route_manager(),
203 &table_ids,
204 )
205 .await?;
206 let skip_alter = self
207 .alters
208 .iter()
209 .zip(table_info_values.iter())
210 .map(|(task, table)| skip_alter_logical_region(task, table))
211 .collect::<Vec<_>>();
212 retain_unskipped(&mut table_info_values, &skip_alter);
213 let num_skipped = skip_alter.iter().filter(|&&x| x).count();
214
215 Ok(ValidatorResult {
216 num_skipped,
217 skip_alter,
218 table_info_values,
219 physical_table_info,
220 physical_table_route,
221 })
222 }
223}
224
225pub(crate) struct ValidatorResult {
227 pub(crate) num_skipped: usize,
228 pub(crate) skip_alter: Vec<bool>,
229 pub(crate) table_info_values: Vec<DeserializedValueWithBytes<TableInfoValue>>,
230 pub(crate) physical_table_info: DeserializedValueWithBytes<TableInfoValue>,
231 pub(crate) physical_table_route: PhysicalTableRouteValue,
232}
233
/// Removes from `target` every element whose positional flag in `skipped` is
/// `true`, keeping the relative order of the remaining elements.
///
/// `target` and `skipped` are expected to have the same length; this is
/// checked only in debug builds.
///
/// # Panics
///
/// Panics if `skipped` runs out of flags before `target` runs out of
/// elements.
pub(crate) fn retain_unskipped<T>(target: &mut Vec<T>, skipped: &[bool]) {
    debug_assert_eq!(target.len(), skipped.len());
    // `retain` visits elements in order exactly once, so pulling one flag per
    // visit pairs each element with the flag at the same index.
    let mut flags = skipped.iter().copied();
    target.retain(|_| {
        let skip = flags.next().unwrap();
        !skip
    });
}
240
241fn skip_alter_logical_region(alter: &AlterTableExpr, table: &TableInfoValue) -> bool {
243 let existing_columns = table
244 .table_info
245 .meta
246 .schema
247 .column_schemas
248 .iter()
249 .map(|c| &c.name)
250 .collect::<HashSet<_>>();
251
252 let Some(kind) = alter.kind.as_ref() else {
253 return true; };
255 let Kind::AddColumns(add_columns) = kind else {
256 return true; };
258
259 add_columns
263 .add_columns
264 .iter()
265 .map(|add_column| add_column.column_def.as_ref().map(|c| &c.name))
266 .all(|column| {
267 column
268 .map(|c| existing_columns.contains(c))
269 .unwrap_or(false)
270 })
271}
272
#[cfg(test)]
mod tests {
    use super::*;

    /// Elements flagged `true` are dropped; the rest keep their order.
    #[test]
    fn test_retain_unskipped() {
        let skipped = [false, true, false, true, false];
        let mut target: Vec<i32> = (1..=5).collect();

        retain_unskipped(&mut target, &skipped);

        assert_eq!(target, [1, 3, 5]);
    }
}