common_meta/ddl/utils/
raw_table_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use api::v1::SemanticType;
18use common_telemetry::debug;
19use common_telemetry::tracing::warn;
20use store_api::metadata::ColumnMetadata;
21use table::metadata::RawTableInfo;
22
23/// Generate the new physical table info.
24pub(crate) fn build_new_physical_table_info(
25    mut raw_table_info: RawTableInfo,
26    physical_columns: &[ColumnMetadata],
27) -> RawTableInfo {
28    debug!(
29        "building new physical table info for table: {}, table_id: {}",
30        raw_table_info.name, raw_table_info.ident.table_id
31    );
32    let existing_columns = raw_table_info
33        .meta
34        .schema
35        .column_schemas
36        .iter()
37        .map(|col| col.name.clone())
38        .collect::<HashSet<_>>();
39    let primary_key_indices = &mut raw_table_info.meta.primary_key_indices;
40    let value_indices = &mut raw_table_info.meta.value_indices;
41    value_indices.clear();
42    let time_index = &mut raw_table_info.meta.schema.timestamp_index;
43    let columns = &mut raw_table_info.meta.schema.column_schemas;
44    columns.clear();
45    let column_ids = &mut raw_table_info.meta.column_ids;
46    column_ids.clear();
47
48    for (idx, col) in physical_columns.iter().enumerate() {
49        match col.semantic_type {
50            SemanticType::Tag => {
51                // push new primary key to the end.
52                if !existing_columns.contains(&col.column_schema.name) {
53                    primary_key_indices.push(idx);
54                }
55            }
56            SemanticType::Field => value_indices.push(idx),
57            SemanticType::Timestamp => {
58                value_indices.push(idx);
59                *time_index = Some(idx);
60            }
61        }
62
63        columns.push(col.column_schema.clone());
64        column_ids.push(col.column_id);
65    }
66
67    if let Some(time_index) = *time_index {
68        raw_table_info.meta.schema.column_schemas[time_index].set_time_index();
69    }
70
71    raw_table_info
72}
73
74/// Updates the column IDs in the table info based on the provided column metadata.
75///
76/// This function validates that the column metadata matches the existing table schema
77/// before updating the column ids. If the column metadata doesn't match the table schema,
78/// the table info remains unchanged.
79pub(crate) fn update_table_info_column_ids(
80    raw_table_info: &mut RawTableInfo,
81    column_metadatas: &[ColumnMetadata],
82) {
83    let mut table_column_names = raw_table_info
84        .meta
85        .schema
86        .column_schemas
87        .iter()
88        .map(|c| c.name.as_str())
89        .collect::<Vec<_>>();
90    table_column_names.sort_unstable();
91
92    let mut column_names = column_metadatas
93        .iter()
94        .map(|c| c.column_schema.name.as_str())
95        .collect::<Vec<_>>();
96    column_names.sort_unstable();
97
98    if table_column_names != column_names {
99        warn!(
100            "Column metadata doesn't match the table schema for table {}, table_id: {}, column in table: {:?}, column in metadata: {:?}",
101            raw_table_info.name,
102            raw_table_info.ident.table_id,
103            table_column_names,
104            column_names,
105        );
106        return;
107    }
108
109    let name_to_id = column_metadatas
110        .iter()
111        .map(|c| (c.column_schema.name.clone(), c.column_id))
112        .collect::<HashMap<_, _>>();
113
114    let schema = &raw_table_info.meta.schema.column_schemas;
115    let mut column_ids = Vec::with_capacity(schema.len());
116    for column_schema in schema {
117        if let Some(id) = name_to_id.get(&column_schema.name) {
118            column_ids.push(*id);
119        }
120    }
121
122    raw_table_info.meta.column_ids = column_ids;
123}