common_meta/ddl/create_logical_tables/metadata.rs

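//! Metadata-preparation steps for [`CreateLogicalTablesProcedure`]: filling in
//! the physical table's routing and partition information, merging partition
//! columns into the logical tables, and allocating table ids.
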
use std::collections::HashSet;
use std::sync::Arc;

use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use snafu::OptionExt;

use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::error::Result;
use crate::key::table_route::TableRouteValue;

impl CreateLogicalTablesProcedure {
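    /// Fetches the physical table's region numbers and partition column names
    /// from the table metadata manager and stores them in the procedure data.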
    pub(crate) async fn fill_physical_table_info(&mut self) -> Result<()> {
        let physical_region_numbers = self
            .context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(self.data.physical_table_id)
            .await
            .map(|(_, route)| TableRouteValue::Physical(route).region_numbers())?;

        self.data.physical_region_numbers = physical_region_numbers;

        let physical_table_info = self
            .context
            .table_metadata_manager
            .table_info_manager()
            .get(self.data.physical_table_id)
            .await?
            .with_context(|| crate::error::TableInfoNotFoundSnafu {
                table: format!("physical table {}", self.data.physical_table_id),
            })?;

        let physical_partition_columns: Vec<String> = physical_table_info
            .table_info
            .meta
            .partition_key_indices
            .iter()
            .map(|&idx| {
                physical_table_info.table_info.meta.schema.column_schemas()[idx]
                    .name
                    .clone()
            })
            .collect();

        self.data.physical_partition_columns = physical_partition_columns;

        Ok(())
    }

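    /// Merges the physical table's partition columns into each logical table's
    /// schema: any partition column the logical table does not already define is
    /// appended as a nullable string column and added to its primary key indices.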
    pub(crate) fn merge_partition_columns_into_logical_tables(&mut self) -> Result<()> {
        let partition_columns = &self.data.physical_partition_columns;

        if partition_columns.is_empty() {
            return Ok(());
        }

        for task in &mut self.data.tasks {
            let column_schemas = task.table_info.meta.schema.column_schemas();
            let existing_column_names: HashSet<_> =
                column_schemas.iter().map(|c| &c.name).collect();

            let mut new_columns = Vec::new();
            let mut new_primary_key_indices = task.table_info.meta.primary_key_indices.clone();

            for partition_column in partition_columns {
                if !existing_column_names.contains(partition_column) {
                    // The new column lands after the existing columns and any
                    // columns already queued for addition.
                    let new_column_index = column_schemas.len() + new_columns.len();

                    // Missing partition columns are appended as nullable strings.
                    let column_schema = ColumnSchema::new(
                        partition_column.clone(),
                        ConcreteDataType::string_datatype(),
                        true,
                    );
                    new_columns.push(column_schema);

                    // Partition columns join the logical table's primary key.
                    new_primary_key_indices.push(new_column_index);
                }
            }

            if !new_columns.is_empty() {
                let mut updated_columns = column_schemas.to_vec();
                updated_columns.extend(new_columns);

                let new_schema = Arc::new(Schema::new(updated_columns));

                task.table_info.meta.schema = new_schema;
                task.table_info.meta.primary_key_indices = new_primary_key_indices;
            }
        }

        Ok(())
    }

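    /// Assigns a table id to each task: reuses the id when the table already
    /// exists, otherwise allocates a fresh one, then sorts the task's columns.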
    pub(crate) async fn allocate_table_ids(&mut self) -> Result<()> {
        for (task, table_id) in self
            .data
            .tasks
            .iter_mut()
            .zip(self.data.table_ids_already_exists.iter())
        {
            let table_id = if let Some(table_id) = table_id {
                *table_id
            } else {
                self.context
                    .table_metadata_allocator
                    .allocate_table_id(&task.create_table.table_id)
                    .await?
            };
            task.set_table_id(table_id);

            task.sort_columns();
        }

        Ok(())
    }
}