common_meta/ddl/utils/
raw_table_info.rs1use std::collections::{HashMap, HashSet};
16
17use api::v1::SemanticType;
18use common_telemetry::debug;
19use common_telemetry::tracing::warn;
20use store_api::metadata::ColumnMetadata;
21use table::metadata::RawTableInfo;
22
23pub(crate) fn build_new_physical_table_info(
25 mut raw_table_info: RawTableInfo,
26 physical_columns: &[ColumnMetadata],
27) -> RawTableInfo {
28 debug!(
29 "building new physical table info for table: {}, table_id: {}",
30 raw_table_info.name, raw_table_info.ident.table_id
31 );
32 let existing_columns = raw_table_info
33 .meta
34 .schema
35 .column_schemas
36 .iter()
37 .map(|col| col.name.clone())
38 .collect::<HashSet<_>>();
39 let primary_key_indices = &mut raw_table_info.meta.primary_key_indices;
40 let value_indices = &mut raw_table_info.meta.value_indices;
41 value_indices.clear();
42 let time_index = &mut raw_table_info.meta.schema.timestamp_index;
43 let columns = &mut raw_table_info.meta.schema.column_schemas;
44 columns.clear();
45 let column_ids = &mut raw_table_info.meta.column_ids;
46 column_ids.clear();
47
48 for (idx, col) in physical_columns.iter().enumerate() {
49 match col.semantic_type {
50 SemanticType::Tag => {
51 if !existing_columns.contains(&col.column_schema.name) {
53 primary_key_indices.push(idx);
54 }
55 }
56 SemanticType::Field => value_indices.push(idx),
57 SemanticType::Timestamp => *time_index = Some(idx),
58 }
59
60 columns.push(col.column_schema.clone());
61 column_ids.push(col.column_id);
62 }
63
64 if let Some(time_index) = *time_index {
65 raw_table_info.meta.schema.column_schemas[time_index].set_time_index();
66 }
67
68 raw_table_info
69}
70
71pub(crate) fn update_table_info_column_ids(
77 raw_table_info: &mut RawTableInfo,
78 column_metadatas: &[ColumnMetadata],
79) {
80 let mut table_column_names = raw_table_info
81 .meta
82 .schema
83 .column_schemas
84 .iter()
85 .map(|c| c.name.as_str())
86 .collect::<Vec<_>>();
87 table_column_names.sort_unstable();
88
89 let mut column_names = column_metadatas
90 .iter()
91 .map(|c| c.column_schema.name.as_str())
92 .collect::<Vec<_>>();
93 column_names.sort_unstable();
94
95 if table_column_names != column_names {
96 warn!(
97 "Column metadata doesn't match the table schema for table {}, table_id: {}, column in table: {:?}, column in metadata: {:?}",
98 raw_table_info.name,
99 raw_table_info.ident.table_id,
100 table_column_names,
101 column_names,
102 );
103 return;
104 }
105
106 let name_to_id = column_metadatas
107 .iter()
108 .map(|c| (c.column_schema.name.clone(), c.column_id))
109 .collect::<HashMap<_, _>>();
110
111 let schema = &raw_table_info.meta.schema.column_schemas;
112 let mut column_ids = Vec::with_capacity(schema.len());
113 for column_schema in schema {
114 if let Some(id) = name_to_id.get(&column_schema.name) {
115 column_ids.push(*id);
116 }
117 }
118
119 raw_table_info.meta.column_ids = column_ids;
120}