common_meta/ddl/create_logical_tables/
check.rsuse snafu::ensure;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::error::{CreateLogicalTablesInvalidArgumentsSnafu, Result, TableAlreadyExistsSnafu};
use crate::key::table_name::TableNameKey;
impl CreateLogicalTablesProcedure {
pub(crate) fn check_input_tasks(&self) -> Result<()> {
self.check_schema()?;
Ok(())
}
pub(crate) async fn check_tables_already_exist(&mut self) -> Result<()> {
let table_name_keys = self
.data
.all_create_table_exprs()
.iter()
.map(|expr| TableNameKey::new(&expr.catalog_name, &expr.schema_name, &expr.table_name))
.collect::<Vec<_>>();
let table_ids_already_exists = self
.context
.table_metadata_manager
.table_name_manager()
.batch_get(table_name_keys)
.await?
.iter()
.map(|x| x.map(|x| x.table_id()))
.collect::<Vec<_>>();
self.data.table_ids_already_exists = table_ids_already_exists;
let tasks = &mut self.data.tasks;
for (task, table_id) in tasks.iter().zip(self.data.table_ids_already_exists.iter()) {
if table_id.is_some() {
ensure!(
task.create_table.create_if_not_exists,
TableAlreadyExistsSnafu {
table_name: task.create_table.table_name.to_string(),
}
);
continue;
}
}
Ok(())
}
fn check_schema(&self) -> Result<()> {
let is_same_schema = self.data.tasks.windows(2).all(|pair| {
pair[0].create_table.catalog_name == pair[1].create_table.catalog_name
&& pair[0].create_table.schema_name == pair[1].create_table.schema_name
});
ensure!(
is_same_schema,
CreateLogicalTablesInvalidArgumentsSnafu {
err_msg: "Schemas of the tasks are not the same"
}
);
Ok(())
}
}