common_meta/ddl/utils/
raw_table_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use api::v1::SemanticType;
18use common_telemetry::debug;
19use common_telemetry::tracing::warn;
20use store_api::metadata::ColumnMetadata;
21use table::metadata::RawTableInfo;
22
23/// Generate the new physical table info.
24pub(crate) fn build_new_physical_table_info(
25    mut raw_table_info: RawTableInfo,
26    physical_columns: &[ColumnMetadata],
27) -> RawTableInfo {
28    debug!(
29        "building new physical table info for table: {}, table_id: {}",
30        raw_table_info.name, raw_table_info.ident.table_id
31    );
32    let existing_columns = raw_table_info
33        .meta
34        .schema
35        .column_schemas
36        .iter()
37        .map(|col| col.name.clone())
38        .collect::<HashSet<_>>();
39    let primary_key_indices = &mut raw_table_info.meta.primary_key_indices;
40    let value_indices = &mut raw_table_info.meta.value_indices;
41    value_indices.clear();
42    let time_index = &mut raw_table_info.meta.schema.timestamp_index;
43    let columns = &mut raw_table_info.meta.schema.column_schemas;
44    columns.clear();
45    let column_ids = &mut raw_table_info.meta.column_ids;
46    column_ids.clear();
47
48    for (idx, col) in physical_columns.iter().enumerate() {
49        match col.semantic_type {
50            SemanticType::Tag => {
51                // push new primary key to the end.
52                if !existing_columns.contains(&col.column_schema.name) {
53                    primary_key_indices.push(idx);
54                }
55            }
56            SemanticType::Field => value_indices.push(idx),
57            SemanticType::Timestamp => *time_index = Some(idx),
58        }
59
60        columns.push(col.column_schema.clone());
61        column_ids.push(col.column_id);
62    }
63
64    if let Some(time_index) = *time_index {
65        raw_table_info.meta.schema.column_schemas[time_index].set_time_index();
66    }
67
68    raw_table_info
69}
70
71/// Updates the column IDs in the table info based on the provided column metadata.
72///
73/// This function validates that the column metadata matches the existing table schema
74/// before updating the column ids. If the column metadata doesn't match the table schema,
75/// the table info remains unchanged.
76pub(crate) fn update_table_info_column_ids(
77    raw_table_info: &mut RawTableInfo,
78    column_metadatas: &[ColumnMetadata],
79) {
80    let mut table_column_names = raw_table_info
81        .meta
82        .schema
83        .column_schemas
84        .iter()
85        .map(|c| c.name.as_str())
86        .collect::<Vec<_>>();
87    table_column_names.sort_unstable();
88
89    let mut column_names = column_metadatas
90        .iter()
91        .map(|c| c.column_schema.name.as_str())
92        .collect::<Vec<_>>();
93    column_names.sort_unstable();
94
95    if table_column_names != column_names {
96        warn!(
97            "Column metadata doesn't match the table schema for table {}, table_id: {}, column in table: {:?}, column in metadata: {:?}",
98            raw_table_info.name,
99            raw_table_info.ident.table_id,
100            table_column_names,
101            column_names,
102        );
103        return;
104    }
105
106    let name_to_id = column_metadatas
107        .iter()
108        .map(|c| (c.column_schema.name.clone(), c.column_id))
109        .collect::<HashMap<_, _>>();
110
111    let schema = &raw_table_info.meta.schema.column_schemas;
112    let mut column_ids = Vec::with_capacity(schema.len());
113    for column_schema in schema {
114        if let Some(id) = name_to_id.get(&column_schema.name) {
115            column_ids.push(*id);
116        }
117    }
118
119    raw_table_info.meta.column_ids = column_ids;
120}