common_meta/ddl/create_logical_tables/
check.rs1use snafu::ensure;
16
17use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
18use crate::error::{CreateLogicalTablesInvalidArgumentsSnafu, Result, TableAlreadyExistsSnafu};
19use crate::key::table_name::TableNameKey;
20
21impl CreateLogicalTablesProcedure {
22 pub(crate) fn check_input_tasks(&self) -> Result<()> {
23 self.check_schema()?;
24
25 Ok(())
26 }
27
28 pub async fn check_tables_already_exist(&mut self) -> Result<()> {
29 let table_name_keys = self
30 .data
31 .all_create_table_exprs()
32 .iter()
33 .map(|expr| TableNameKey::new(&expr.catalog_name, &expr.schema_name, &expr.table_name))
34 .collect::<Vec<_>>();
35 let table_ids_already_exists = self
36 .context
37 .table_metadata_manager
38 .table_name_manager()
39 .batch_get(table_name_keys)
40 .await?
41 .iter()
42 .map(|x| x.map(|x| x.table_id()))
43 .collect::<Vec<_>>();
44
45 self.data.table_ids_already_exists = table_ids_already_exists;
46
47 let tasks = &mut self.data.tasks;
49 for (task, table_id) in tasks.iter().zip(self.data.table_ids_already_exists.iter()) {
50 if table_id.is_some() {
51 ensure!(
53 task.create_table.create_if_not_exists,
54 TableAlreadyExistsSnafu {
55 table_name: task.create_table.table_name.to_string(),
56 }
57 );
58 continue;
59 }
60 }
61
62 Ok(())
63 }
64
65 fn check_schema(&self) -> Result<()> {
67 let is_same_schema = self.data.tasks.windows(2).all(|pair| {
68 pair[0].create_table.catalog_name == pair[1].create_table.catalog_name
69 && pair[0].create_table.schema_name == pair[1].create_table.schema_name
70 });
71
72 ensure!(
73 is_same_schema,
74 CreateLogicalTablesInvalidArgumentsSnafu {
75 err_msg: "Schemas of the tasks are not the same"
76 }
77 );
78
79 Ok(())
80 }
81}