common_meta/ddl/alter_logical_tables/
validator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::alter_table_expr::Kind;
18use api::v1::AlterTableExpr;
19use snafu::{ensure, OptionExt};
20use store_api::storage::TableId;
21use table::table_reference::TableReference;
22
23use crate::ddl::utils::table_id::get_all_table_ids_by_names;
24use crate::ddl::utils::table_info::get_all_table_info_values_by_table_ids;
25use crate::error::{
26    AlterLogicalTablesInvalidArgumentsSnafu, Result, TableInfoNotFoundSnafu,
27    TableRouteNotFoundSnafu,
28};
29use crate::key::table_info::TableInfoValue;
30use crate::key::table_route::{PhysicalTableRouteValue, TableRouteManager, TableRouteValue};
31use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
32
/// [AlterLogicalTableValidator] validates the alter logical expressions.
///
/// It borrows the expressions for the lifetime `'a`; the checks themselves are
/// run by [`AlterLogicalTableValidator::validate`].
pub struct AlterLogicalTableValidator<'a> {
    /// Id of the physical table that every altered logical table must belong to.
    physical_table_id: TableId,
    /// The alter table expressions to validate, one per logical table.
    alters: Vec<&'a AlterTableExpr>,
}
38
39impl<'a> AlterLogicalTableValidator<'a> {
40    pub fn new(physical_table_id: TableId, alters: Vec<&'a AlterTableExpr>) -> Self {
41        Self {
42            physical_table_id,
43            alters,
44        }
45    }
46
47    /// Validates all alter table expressions have the same schema and catalog.
48    fn validate_schema(&self) -> Result<()> {
49        let is_same_schema = self.alters.windows(2).all(|pair| {
50            pair[0].catalog_name == pair[1].catalog_name
51                && pair[0].schema_name == pair[1].schema_name
52        });
53
54        ensure!(
55            is_same_schema,
56            AlterLogicalTablesInvalidArgumentsSnafu {
57                err_msg: "Schemas of the alter table expressions are not the same"
58            }
59        );
60
61        Ok(())
62    }
63
64    /// Validates that all alter table expressions are of the supported kind.
65    /// Currently only supports `AddColumns` operations.
66    fn validate_alter_kind(&self) -> Result<()> {
67        for alter in &self.alters {
68            let kind = alter
69                .kind
70                .as_ref()
71                .context(AlterLogicalTablesInvalidArgumentsSnafu {
72                    err_msg: "Alter kind is missing",
73                })?;
74
75            let Kind::AddColumns(_) = kind else {
76                return AlterLogicalTablesInvalidArgumentsSnafu {
77                    err_msg: "Only support add columns operation",
78                }
79                .fail();
80            };
81        }
82
83        Ok(())
84    }
85
86    fn table_names(&self) -> Vec<TableReference> {
87        self.alters
88            .iter()
89            .map(|alter| {
90                TableReference::full(&alter.catalog_name, &alter.schema_name, &alter.table_name)
91            })
92            .collect()
93    }
94
95    /// Validates that the physical table info and route exist.
96    ///
97    /// This method performs the following validations:
98    /// 1. Retrieves the full table info and route for the given physical table id
99    /// 2. Ensures the table info and table route exists
100    /// 3. Verifies that the table route is actually a physical table route, not a logical one
101    ///
102    /// Returns a tuple containing the validated table info and physical table route.
103    async fn validate_physical_table(
104        &self,
105        table_metadata_manager: &TableMetadataManagerRef,
106    ) -> Result<(
107        DeserializedValueWithBytes<TableInfoValue>,
108        PhysicalTableRouteValue,
109    )> {
110        let (table_info, table_route) = table_metadata_manager
111            .get_full_table_info(self.physical_table_id)
112            .await?;
113
114        let table_info = table_info.with_context(|| TableInfoNotFoundSnafu {
115            table: format!("table id - {}", self.physical_table_id),
116        })?;
117
118        let physical_table_route = table_route
119            .context(TableRouteNotFoundSnafu {
120                table_id: self.physical_table_id,
121            })?
122            .into_inner();
123
124        let TableRouteValue::Physical(table_route) = physical_table_route else {
125            return AlterLogicalTablesInvalidArgumentsSnafu {
126                err_msg: format!(
127                    "expected a physical table but got a logical table: {:?}",
128                    self.physical_table_id
129                ),
130            }
131            .fail();
132        };
133
134        Ok((table_info, table_route))
135    }
136
137    /// Validates that all logical table routes have the same physical table id.
138    ///
139    /// This method performs the following validations:
140    /// 1. Retrieves table routes for all the given table ids.
141    /// 2. Ensures that all retrieved routes are logical table routes (not physical)
142    /// 3. Verifies that all logical table routes reference the same physical table id.
143    /// 4. Returns an error if any route is not logical or references a different physical table.
144    async fn validate_logical_table_routes(
145        &self,
146        table_route_manager: &TableRouteManager,
147        table_ids: &[TableId],
148    ) -> Result<()> {
149        let table_routes = table_route_manager
150            .table_route_storage()
151            .batch_get(table_ids)
152            .await?;
153
154        let physical_table_id = self.physical_table_id;
155
156        let is_same_physical_table = table_routes.iter().all(|r| {
157            if let Some(TableRouteValue::Logical(r)) = r {
158                r.physical_table_id() == physical_table_id
159            } else {
160                false
161            }
162        });
163
164        ensure!(
165            is_same_physical_table,
166            AlterLogicalTablesInvalidArgumentsSnafu {
167                err_msg: "All the tasks should have the same physical table id"
168            }
169        );
170
171        Ok(())
172    }
173
174    /// Validates the alter logical expressions.
175    ///
176    /// This method performs the following validations:
177    /// 1. Validates that all alter table expressions have the same schema and catalog.
178    /// 2. Validates that all alter table expressions are of the supported kind.
179    /// 3. Validates that the physical table info and route exist.
180    /// 4. Validates that all logical table routes have the same physical table id.
181    ///
182    /// Returns a [ValidatorResult] containing the validation results.
183    pub async fn validate(
184        &self,
185        table_metadata_manager: &TableMetadataManagerRef,
186    ) -> Result<ValidatorResult> {
187        self.validate_schema()?;
188        self.validate_alter_kind()?;
189        let (physical_table_info, physical_table_route) =
190            self.validate_physical_table(table_metadata_manager).await?;
191        let table_names = self.table_names();
192        let table_ids =
193            get_all_table_ids_by_names(table_metadata_manager.table_name_manager(), &table_names)
194                .await?;
195        let mut table_info_values = get_all_table_info_values_by_table_ids(
196            table_metadata_manager.table_info_manager(),
197            &table_ids,
198            &table_names,
199        )
200        .await?;
201        self.validate_logical_table_routes(
202            table_metadata_manager.table_route_manager(),
203            &table_ids,
204        )
205        .await?;
206        let skip_alter = self
207            .alters
208            .iter()
209            .zip(table_info_values.iter())
210            .map(|(task, table)| skip_alter_logical_region(task, table))
211            .collect::<Vec<_>>();
212        retain_unskipped(&mut table_info_values, &skip_alter);
213        let num_skipped = skip_alter.iter().filter(|&&x| x).count();
214
215        Ok(ValidatorResult {
216            num_skipped,
217            skip_alter,
218            table_info_values,
219            physical_table_info,
220            physical_table_route,
221        })
222    }
223}
224
/// The result of the validator.
///
/// Built by [`AlterLogicalTableValidator::validate`] once all checks pass.
pub(crate) struct ValidatorResult {
    /// Number of `true` entries in `skip_alter`.
    pub(crate) num_skipped: usize,
    /// One flag per alter expression, in input order; `true` means every
    /// column to add already exists, so the alter can be skipped.
    pub(crate) skip_alter: Vec<bool>,
    /// Table info values of the logical tables whose alter is not skipped.
    pub(crate) table_info_values: Vec<DeserializedValueWithBytes<TableInfoValue>>,
    /// Table info of the physical table.
    pub(crate) physical_table_info: DeserializedValueWithBytes<TableInfoValue>,
    /// Route of the physical table.
    pub(crate) physical_table_route: PhysicalTableRouteValue,
}
233
/// Removes from `target` every element whose flag in `skipped` is `true`.
///
/// `skipped[i]` decides the fate of `target[i]`; the surviving elements keep
/// their relative order.
///
/// # Panics
///
/// Panics if `skipped` has fewer entries than `target`. In debug builds, any
/// length mismatch trips the assertion.
pub(crate) fn retain_unskipped<T>(target: &mut Vec<T>, skipped: &[bool]) {
    debug_assert_eq!(target.len(), skipped.len());
    // `Vec::retain` visits elements exactly once, in order, so walking the
    // flags in lockstep pairs each element with its own flag.
    let mut flags = skipped.iter().copied();
    target.retain(|_| {
        let skip = flags.next().unwrap();
        !skip
    });
}
240
241/// Returns true if does not required to alter the logical region.
242fn skip_alter_logical_region(alter: &AlterTableExpr, table: &TableInfoValue) -> bool {
243    let existing_columns = table
244        .table_info
245        .meta
246        .schema
247        .column_schemas
248        .iter()
249        .map(|c| &c.name)
250        .collect::<HashSet<_>>();
251
252    let Some(kind) = alter.kind.as_ref() else {
253        return true; // Never get here since we have checked it in `validate_alter_kind`
254    };
255    let Kind::AddColumns(add_columns) = kind else {
256        return true; // Never get here since we have checked it in `validate_alter_kind`
257    };
258
259    // We only check that all columns have been finished. That is to say,
260    // if one part is finished but another part is not, it will be considered
261    // unfinished.
262    add_columns
263        .add_columns
264        .iter()
265        .map(|add_column| add_column.column_def.as_ref().map(|c| &c.name))
266        .all(|column| {
267            column
268                .map(|c| existing_columns.contains(c))
269                .unwrap_or(false)
270        })
271}
272
#[cfg(test)]
mod tests {
    use super::retain_unskipped;

    /// Elements flagged `true` are dropped; the others keep their order.
    #[test]
    fn test_retain_unskipped() {
        let skipped = [false, true, false, true, false];
        let mut target: Vec<i32> = (1..=5).collect();

        retain_unskipped(&mut target, &skipped);

        assert_eq!(target, [1, 3, 5]);
    }
}