common_meta/ddl/utils/
raw_table_info.rs1use std::collections::{HashMap, HashSet};
16
17use api::v1::SemanticType;
18use common_telemetry::debug;
19use common_telemetry::tracing::warn;
20use store_api::metadata::ColumnMetadata;
21use table::metadata::RawTableInfo;
22
23pub(crate) fn build_new_physical_table_info(
25 mut raw_table_info: RawTableInfo,
26 physical_columns: &[ColumnMetadata],
27) -> RawTableInfo {
28 debug!(
29 "building new physical table info for table: {}, table_id: {}",
30 raw_table_info.name, raw_table_info.ident.table_id
31 );
32 let existing_columns = raw_table_info
33 .meta
34 .schema
35 .column_schemas
36 .iter()
37 .map(|col| col.name.clone())
38 .collect::<HashSet<_>>();
39 let primary_key_indices = &mut raw_table_info.meta.primary_key_indices;
40 let value_indices = &mut raw_table_info.meta.value_indices;
41 value_indices.clear();
42 let time_index = &mut raw_table_info.meta.schema.timestamp_index;
43 let columns = &mut raw_table_info.meta.schema.column_schemas;
44 columns.clear();
45 let column_ids = &mut raw_table_info.meta.column_ids;
46 column_ids.clear();
47
48 for (idx, col) in physical_columns.iter().enumerate() {
49 match col.semantic_type {
50 SemanticType::Tag => {
51 if !existing_columns.contains(&col.column_schema.name) {
53 primary_key_indices.push(idx);
54 }
55 }
56 SemanticType::Field => value_indices.push(idx),
57 SemanticType::Timestamp => {
58 value_indices.push(idx);
59 *time_index = Some(idx);
60 }
61 }
62
63 columns.push(col.column_schema.clone());
64 column_ids.push(col.column_id);
65 }
66
67 if let Some(time_index) = *time_index {
68 raw_table_info.meta.schema.column_schemas[time_index].set_time_index();
69 }
70
71 raw_table_info
72}
73
74pub(crate) fn update_table_info_column_ids(
80 raw_table_info: &mut RawTableInfo,
81 column_metadatas: &[ColumnMetadata],
82) {
83 let mut table_column_names = raw_table_info
84 .meta
85 .schema
86 .column_schemas
87 .iter()
88 .map(|c| c.name.as_str())
89 .collect::<Vec<_>>();
90 table_column_names.sort_unstable();
91
92 let mut column_names = column_metadatas
93 .iter()
94 .map(|c| c.column_schema.name.as_str())
95 .collect::<Vec<_>>();
96 column_names.sort_unstable();
97
98 if table_column_names != column_names {
99 warn!(
100 "Column metadata doesn't match the table schema for table {}, table_id: {}, column in table: {:?}, column in metadata: {:?}",
101 raw_table_info.name,
102 raw_table_info.ident.table_id,
103 table_column_names,
104 column_names,
105 );
106 return;
107 }
108
109 let name_to_id = column_metadatas
110 .iter()
111 .map(|c| (c.column_schema.name.clone(), c.column_id))
112 .collect::<HashMap<_, _>>();
113
114 let schema = &raw_table_info.meta.schema.column_schemas;
115 let mut column_ids = Vec::with_capacity(schema.len());
116 for column_schema in schema {
117 if let Some(id) = name_to_id.get(&column_schema.name) {
118 column_ids.push(*id);
119 }
120 }
121
122 raw_table_info.meta.column_ids = column_ids;
123}