common_meta/ddl/utils/
raw_table_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use api::v1::SemanticType;
19use common_telemetry::debug;
20use common_telemetry::tracing::warn;
21use datatypes::schema::Schema;
22use store_api::metadata::ColumnMetadata;
23use table::metadata::TableInfo;
24
25/// Generate the new physical table info.
26pub(crate) fn build_new_physical_table_info(
27    mut table_info: TableInfo,
28    physical_columns: &[ColumnMetadata],
29) -> TableInfo {
30    debug!(
31        "building new physical table info for table: {}, table_id: {}",
32        table_info.name, table_info.ident.table_id
33    );
34    let existing_columns = table_info
35        .meta
36        .schema
37        .column_schemas()
38        .iter()
39        .map(|col| col.name.clone())
40        .collect::<HashSet<_>>();
41    let primary_key_indices = &mut table_info.meta.primary_key_indices;
42    let value_indices = &mut table_info.meta.value_indices;
43    value_indices.clear();
44    let column_ids = &mut table_info.meta.column_ids;
45    column_ids.clear();
46
47    let mut columns = Vec::with_capacity(physical_columns.len());
48    for (idx, col) in physical_columns.iter().enumerate() {
49        match col.semantic_type {
50            SemanticType::Tag => {
51                // push new primary key to the end.
52                if !existing_columns.contains(&col.column_schema.name) {
53                    primary_key_indices.push(idx);
54                }
55            }
56            SemanticType::Field => value_indices.push(idx),
57            SemanticType::Timestamp => {
58                value_indices.push(idx);
59            }
60        }
61
62        columns.push(col.column_schema.clone());
63        column_ids.push(col.column_id);
64    }
65
66    table_info.meta.schema = Arc::new(Schema::new_with_version(
67        columns,
68        table_info.meta.schema.version(),
69    ));
70    table_info
71}
72
73/// Updates the column IDs in the table info based on the provided column metadata.
74///
75/// This function validates that the column metadata matches the existing table schema
76/// before updating the column ids. If the column metadata doesn't match the table schema,
77/// the table info remains unchanged.
78pub(crate) fn update_table_info_column_ids(
79    table_info: &mut TableInfo,
80    column_metadatas: &[ColumnMetadata],
81) {
82    let mut table_column_names = table_info
83        .meta
84        .schema
85        .column_schemas()
86        .iter()
87        .map(|c| c.name.as_str())
88        .collect::<Vec<_>>();
89    table_column_names.sort_unstable();
90
91    let mut column_names = column_metadatas
92        .iter()
93        .map(|c| c.column_schema.name.as_str())
94        .collect::<Vec<_>>();
95    column_names.sort_unstable();
96
97    if table_column_names != column_names {
98        warn!(
99            "Column metadata doesn't match the table schema for table {}, table_id: {}, column in table: {:?}, column in metadata: {:?}",
100            table_info.name, table_info.ident.table_id, table_column_names, column_names,
101        );
102        return;
103    }
104
105    let name_to_id = column_metadatas
106        .iter()
107        .map(|c| (c.column_schema.name.clone(), c.column_id))
108        .collect::<HashMap<_, _>>();
109
110    let schema = table_info.meta.schema.column_schemas();
111    let mut column_ids = Vec::with_capacity(schema.len());
112    for column_schema in schema {
113        if let Some(id) = name_to_id.get(&column_schema.name) {
114            column_ids.push(*id);
115        }
116    }
117
118    table_info.meta.column_ids = column_ids;
119}