common_meta/ddl/utils/
raw_table_info.rs1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use api::v1::SemanticType;
19use common_telemetry::debug;
20use common_telemetry::tracing::warn;
21use datatypes::schema::Schema;
22use store_api::metadata::ColumnMetadata;
23use table::metadata::TableInfo;
24
25pub(crate) fn build_new_physical_table_info(
27 mut table_info: TableInfo,
28 physical_columns: &[ColumnMetadata],
29) -> TableInfo {
30 debug!(
31 "building new physical table info for table: {}, table_id: {}",
32 table_info.name, table_info.ident.table_id
33 );
34 let existing_columns = table_info
35 .meta
36 .schema
37 .column_schemas()
38 .iter()
39 .map(|col| col.name.clone())
40 .collect::<HashSet<_>>();
41 let primary_key_indices = &mut table_info.meta.primary_key_indices;
42 let value_indices = &mut table_info.meta.value_indices;
43 value_indices.clear();
44 let column_ids = &mut table_info.meta.column_ids;
45 column_ids.clear();
46
47 let mut columns = Vec::with_capacity(physical_columns.len());
48 for (idx, col) in physical_columns.iter().enumerate() {
49 match col.semantic_type {
50 SemanticType::Tag => {
51 if !existing_columns.contains(&col.column_schema.name) {
53 primary_key_indices.push(idx);
54 }
55 }
56 SemanticType::Field => value_indices.push(idx),
57 SemanticType::Timestamp => {
58 value_indices.push(idx);
59 }
60 }
61
62 columns.push(col.column_schema.clone());
63 column_ids.push(col.column_id);
64 }
65
66 table_info.meta.schema = Arc::new(Schema::new_with_version(
67 columns,
68 table_info.meta.schema.version(),
69 ));
70 table_info
71}
72
73pub(crate) fn update_table_info_column_ids(
79 table_info: &mut TableInfo,
80 column_metadatas: &[ColumnMetadata],
81) {
82 let mut table_column_names = table_info
83 .meta
84 .schema
85 .column_schemas()
86 .iter()
87 .map(|c| c.name.as_str())
88 .collect::<Vec<_>>();
89 table_column_names.sort_unstable();
90
91 let mut column_names = column_metadatas
92 .iter()
93 .map(|c| c.column_schema.name.as_str())
94 .collect::<Vec<_>>();
95 column_names.sort_unstable();
96
97 if table_column_names != column_names {
98 warn!(
99 "Column metadata doesn't match the table schema for table {}, table_id: {}, column in table: {:?}, column in metadata: {:?}",
100 table_info.name, table_info.ident.table_id, table_column_names, column_names,
101 );
102 return;
103 }
104
105 let name_to_id = column_metadatas
106 .iter()
107 .map(|c| (c.column_schema.name.clone(), c.column_id))
108 .collect::<HashMap<_, _>>();
109
110 let schema = table_info.meta.schema.column_schemas();
111 let mut column_ids = Vec::with_capacity(schema.len());
112 for column_schema in schema {
113 if let Some(id) = name_to_id.get(&column_schema.name) {
114 column_ids.push(*id);
115 }
116 }
117
118 table_info.meta.column_ids = column_ids;
119}