common_meta/ddl/create_logical_tables/
metadata.rs1use std::collections::HashSet;
16
17use datatypes::prelude::ConcreteDataType;
18use datatypes::schema::{ColumnSchema, RawSchema};
19use snafu::OptionExt;
20
21use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
22use crate::error::Result;
23use crate::key::table_route::TableRouteValue;
24
25impl CreateLogicalTablesProcedure {
26 pub(crate) async fn fill_physical_table_info(&mut self) -> Result<()> {
27 let physical_region_numbers = self
28 .context
29 .table_metadata_manager
30 .table_route_manager()
31 .get_physical_table_route(self.data.physical_table_id)
32 .await
33 .map(|(_, route)| TableRouteValue::Physical(route).region_numbers())?;
34
35 self.data.physical_region_numbers = physical_region_numbers;
36
37 let physical_table_info = self
39 .context
40 .table_metadata_manager
41 .table_info_manager()
42 .get(self.data.physical_table_id)
43 .await?
44 .with_context(|| crate::error::TableInfoNotFoundSnafu {
45 table: format!("physical table {}", self.data.physical_table_id),
46 })?;
47
48 let physical_partition_columns: Vec<String> = physical_table_info
49 .table_info
50 .meta
51 .partition_key_indices
52 .iter()
53 .map(|&idx| {
54 physical_table_info.table_info.meta.schema.column_schemas[idx]
55 .name
56 .clone()
57 })
58 .collect();
59
60 self.data.physical_partition_columns = physical_partition_columns;
61
62 Ok(())
63 }
64
65 pub(crate) fn merge_partition_columns_into_logical_tables(&mut self) -> Result<()> {
66 let partition_columns = &self.data.physical_partition_columns;
67
68 if partition_columns.is_empty() {
70 return Ok(());
71 }
72
73 for task in &mut self.data.tasks {
74 let existing_column_names: HashSet<_> = task
76 .table_info
77 .meta
78 .schema
79 .column_schemas
80 .iter()
81 .map(|c| &c.name)
82 .collect();
83
84 let mut new_columns = Vec::new();
85 let mut new_primary_key_indices = task.table_info.meta.primary_key_indices.clone();
86
87 for partition_column in partition_columns {
89 if !existing_column_names.contains(partition_column) {
90 let new_column_index =
91 task.table_info.meta.schema.column_schemas.len() + new_columns.len();
92
93 let column_schema = ColumnSchema::new(
95 partition_column.clone(),
96 ConcreteDataType::string_datatype(),
97 true,
98 );
99 new_columns.push(column_schema);
100
101 new_primary_key_indices.push(new_column_index);
103 }
104 }
105
106 if !new_columns.is_empty() {
108 let mut updated_columns = task.table_info.meta.schema.column_schemas.clone();
109 updated_columns.extend(new_columns);
110
111 let new_schema = RawSchema::new(updated_columns);
113
114 task.table_info.meta.schema = new_schema;
116 task.table_info.meta.primary_key_indices = new_primary_key_indices;
117 }
118 }
119
120 Ok(())
121 }
122
123 pub(crate) async fn allocate_table_ids(&mut self) -> Result<()> {
124 for (task, table_id) in self
125 .data
126 .tasks
127 .iter_mut()
128 .zip(self.data.table_ids_already_exists.iter())
129 {
130 let table_id = if let Some(table_id) = table_id {
131 *table_id
132 } else {
133 self.context
134 .table_metadata_allocator
135 .allocate_table_id(&task.create_table.table_id)
136 .await?
137 };
138 task.set_table_id(table_id);
139
140 task.sort_columns();
142 }
143
144 Ok(())
145 }
146}