common_meta/ddl/create_logical_tables/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use datatypes::prelude::ConcreteDataType;
18use datatypes::schema::{ColumnSchema, RawSchema};
19use snafu::OptionExt;
20
21use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
22use crate::error::Result;
23use crate::key::table_route::TableRouteValue;
24
25impl CreateLogicalTablesProcedure {
26    pub(crate) async fn fill_physical_table_info(&mut self) -> Result<()> {
27        let physical_region_numbers = self
28            .context
29            .table_metadata_manager
30            .table_route_manager()
31            .get_physical_table_route(self.data.physical_table_id)
32            .await
33            .map(|(_, route)| TableRouteValue::Physical(route).region_numbers())?;
34
35        self.data.physical_region_numbers = physical_region_numbers;
36
37        // Extract partition column names from the physical table
38        let physical_table_info = self
39            .context
40            .table_metadata_manager
41            .table_info_manager()
42            .get(self.data.physical_table_id)
43            .await?
44            .with_context(|| crate::error::TableInfoNotFoundSnafu {
45                table: format!("physical table {}", self.data.physical_table_id),
46            })?;
47
48        let physical_partition_columns: Vec<String> = physical_table_info
49            .table_info
50            .meta
51            .partition_key_indices
52            .iter()
53            .map(|&idx| {
54                physical_table_info.table_info.meta.schema.column_schemas[idx]
55                    .name
56                    .clone()
57            })
58            .collect();
59
60        self.data.physical_partition_columns = physical_partition_columns;
61
62        Ok(())
63    }
64
65    pub(crate) fn merge_partition_columns_into_logical_tables(&mut self) -> Result<()> {
66        let partition_columns = &self.data.physical_partition_columns;
67
68        // Skip if no partition columns to add
69        if partition_columns.is_empty() {
70            return Ok(());
71        }
72
73        for task in &mut self.data.tasks {
74            // Get existing column names in the logical table
75            let existing_column_names: HashSet<_> = task
76                .table_info
77                .meta
78                .schema
79                .column_schemas
80                .iter()
81                .map(|c| &c.name)
82                .collect();
83
84            let mut new_columns = Vec::new();
85            let mut new_primary_key_indices = task.table_info.meta.primary_key_indices.clone();
86
87            // Add missing partition columns
88            for partition_column in partition_columns {
89                if !existing_column_names.contains(partition_column) {
90                    let new_column_index =
91                        task.table_info.meta.schema.column_schemas.len() + new_columns.len();
92
93                    // Create new column schema for the partition column
94                    let column_schema = ColumnSchema::new(
95                        partition_column.clone(),
96                        ConcreteDataType::string_datatype(),
97                        true,
98                    );
99                    new_columns.push(column_schema);
100
101                    // Add to primary key indices (partition columns are part of primary key)
102                    new_primary_key_indices.push(new_column_index);
103                }
104            }
105
106            // If we added new columns, update the table info
107            if !new_columns.is_empty() {
108                let mut updated_columns = task.table_info.meta.schema.column_schemas.clone();
109                updated_columns.extend(new_columns);
110
111                // Create new schema with updated columns
112                let new_schema = RawSchema::new(updated_columns);
113
114                // Update the table info
115                task.table_info.meta.schema = new_schema;
116                task.table_info.meta.primary_key_indices = new_primary_key_indices;
117            }
118        }
119
120        Ok(())
121    }
122
123    pub(crate) async fn allocate_table_ids(&mut self) -> Result<()> {
124        for (task, table_id) in self
125            .data
126            .tasks
127            .iter_mut()
128            .zip(self.data.table_ids_already_exists.iter())
129        {
130            let table_id = if let Some(table_id) = table_id {
131                *table_id
132            } else {
133                self.context
134                    .table_metadata_allocator
135                    .allocate_table_id(&task.create_table.table_id)
136                    .await?
137            };
138            task.set_table_id(table_id);
139
140            // sort columns in task
141            task.sort_columns();
142        }
143
144        Ok(())
145    }
146}