common_meta/ddl/create_logical_tables/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use datatypes::prelude::ConcreteDataType;
19use datatypes::schema::{ColumnSchema, Schema};
20use snafu::OptionExt;
21
22use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
23use crate::error::Result;
24use crate::key::table_route::TableRouteValue;
25
26impl CreateLogicalTablesProcedure {
27    pub(crate) async fn fill_physical_table_info(&mut self) -> Result<()> {
28        let physical_region_numbers = self
29            .context
30            .table_metadata_manager
31            .table_route_manager()
32            .get_physical_table_route(self.data.physical_table_id)
33            .await
34            .map(|(_, route)| TableRouteValue::Physical(route).region_numbers())?;
35
36        self.data.physical_region_numbers = physical_region_numbers;
37
38        // Extract partition column names from the physical table
39        let physical_table_info = self
40            .context
41            .table_metadata_manager
42            .table_info_manager()
43            .get(self.data.physical_table_id)
44            .await?
45            .with_context(|| crate::error::TableInfoNotFoundSnafu {
46                table: format!("physical table {}", self.data.physical_table_id),
47            })?;
48
49        let physical_partition_columns: Vec<String> = physical_table_info
50            .table_info
51            .meta
52            .partition_key_indices
53            .iter()
54            .map(|&idx| {
55                physical_table_info.table_info.meta.schema.column_schemas()[idx]
56                    .name
57                    .clone()
58            })
59            .collect();
60
61        self.data.physical_partition_columns = physical_partition_columns;
62
63        Ok(())
64    }
65
66    pub(crate) fn merge_partition_columns_into_logical_tables(&mut self) -> Result<()> {
67        let partition_columns = &self.data.physical_partition_columns;
68
69        // Skip if no partition columns to add
70        if partition_columns.is_empty() {
71            return Ok(());
72        }
73
74        for task in &mut self.data.tasks {
75            // Get existing column names in the logical table
76            let column_schemas = task.table_info.meta.schema.column_schemas();
77            let existing_column_names: HashSet<_> =
78                column_schemas.iter().map(|c| &c.name).collect();
79
80            let mut new_columns = Vec::new();
81            let mut new_primary_key_indices = task.table_info.meta.primary_key_indices.clone();
82
83            // Add missing partition columns
84            for partition_column in partition_columns {
85                if !existing_column_names.contains(partition_column) {
86                    let new_column_index = column_schemas.len() + new_columns.len();
87
88                    // Create new column schema for the partition column
89                    let column_schema = ColumnSchema::new(
90                        partition_column.clone(),
91                        ConcreteDataType::string_datatype(),
92                        true,
93                    );
94                    new_columns.push(column_schema);
95
96                    // Add to primary key indices (partition columns are part of primary key)
97                    new_primary_key_indices.push(new_column_index);
98                }
99            }
100
101            // If we added new columns, update the table info
102            if !new_columns.is_empty() {
103                let mut updated_columns = column_schemas.to_vec();
104                updated_columns.extend(new_columns);
105
106                // Create new schema with updated columns
107                let new_schema = Arc::new(Schema::new(updated_columns));
108
109                // Update the table info
110                task.table_info.meta.schema = new_schema;
111                task.table_info.meta.primary_key_indices = new_primary_key_indices;
112            }
113        }
114
115        Ok(())
116    }
117
118    pub(crate) async fn allocate_table_ids(&mut self) -> Result<()> {
119        for (task, table_id) in self
120            .data
121            .tasks
122            .iter_mut()
123            .zip(self.data.table_ids_already_exists.iter())
124        {
125            let table_id = if let Some(table_id) = table_id {
126                *table_id
127            } else {
128                self.context
129                    .table_metadata_allocator
130                    .allocate_table_id(&task.create_table.table_id)
131                    .await?
132            };
133            task.set_table_id(table_id);
134
135            // sort columns in task
136            task.sort_columns();
137        }
138
139        Ok(())
140    }
141}