common_meta/ddl/create_table/
template.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::column_def::try_as_column_def;
18use api::v1::meta::Partition;
19use api::v1::region::{CreateRequest, RegionColumnDef};
20use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
21use common_telemetry::warn;
22use snafu::{OptionExt, ResultExt};
23use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
24use store_api::storage::{RegionId, RegionNumber};
25use table::metadata::{TableId, TableInfo};
26
27use crate::error::{self, Result};
28use crate::reconciliation::utils::build_column_metadata_from_table_info;
29use crate::wal_provider::prepare_wal_options;
30
31/// Constructs a [CreateRequest] based on the provided [TableInfo].
32///
33/// Note: This function is primarily intended for creating logical tables.
34///
35/// Logical table templates keep the original column order and primary key indices from
36/// `TableInfo` (including internal columns when present), because these are used to
37/// reconstruct the logical schema on the engine side.
38pub fn build_template_from_raw_table_info(table_info: &TableInfo) -> Result<CreateRequest> {
39    let primary_key_indices = &table_info.meta.primary_key_indices;
40    let column_defs = table_info
41        .meta
42        .schema
43        .column_schemas()
44        .iter()
45        .enumerate()
46        .map(|(i, c)| {
47            let is_primary_key = primary_key_indices.contains(&i);
48            let column_def = try_as_column_def(c, is_primary_key)
49                .context(error::ConvertColumnDefSnafu { column: &c.name })?;
50            Ok(RegionColumnDef {
51                column_def: Some(column_def),
52                // The column id will be overridden by the metric engine.
53                // So we just use the index as the column id.
54                column_id: i as u32,
55            })
56        })
57        .collect::<Result<Vec<_>>>()?;
58
59    let options = HashMap::from(&table_info.meta.options);
60    let template = CreateRequest {
61        region_id: 0,
62        engine: table_info.meta.engine.clone(),
63        column_defs,
64        primary_key: table_info
65            .meta
66            .primary_key_indices
67            .iter()
68            .map(|i| *i as u32)
69            .collect(),
70        path: String::new(),
71        options,
72        partition: None,
73    };
74
75    Ok(template)
76}
77
78/// Constructs a [CreateRequest] based on the provided [TableInfo] for physical table.
79///
80/// Note: This function is primarily intended for creating physical table.
81///
82/// Physical table templates mark primary
83/// keys by tag semantic type to match the physical storage layout.
84pub fn build_template_from_raw_table_info_for_physical_table(
85    table_info: &TableInfo,
86) -> Result<CreateRequest> {
87    let name_to_ids = table_info
88        .name_to_ids()
89        .context(error::MissingColumnIdsSnafu)?;
90    let column_metadatas = build_column_metadata_from_table_info(
91        table_info.meta.schema.column_schemas(),
92        &table_info.meta.primary_key_indices,
93        &name_to_ids,
94    )?;
95    let column_defs = column_metadatas
96        .iter()
97        .map(|c| {
98            let column_def =
99                try_as_column_def(&c.column_schema, c.semantic_type == SemanticType::Tag).context(
100                    error::ConvertColumnDefSnafu {
101                        column: &c.column_schema.name,
102                    },
103                )?;
104            let region_column_def = RegionColumnDef {
105                column_def: Some(column_def),
106                column_id: c.column_id,
107            };
108
109            Ok(region_column_def)
110        })
111        .collect::<Result<Vec<_>>>()?;
112    // Preserve the order of primary key indices as defined in the original table info.
113    let primary_key = table_info
114        .meta
115        .primary_key_indices
116        .iter()
117        .map(|idx| column_metadatas[*idx].column_id)
118        .collect();
119
120    let options = HashMap::from(&table_info.meta.options);
121    let template = CreateRequest {
122        region_id: 0,
123        engine: table_info.meta.engine.clone(),
124        column_defs,
125        primary_key,
126        path: String::new(),
127        options,
128        partition: None,
129    };
130
131    Ok(template)
132}
133
134pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<CreateRequest> {
135    let column_defs = create_table_expr
136        .column_defs
137        .iter()
138        .enumerate()
139        .map(|(i, c)| {
140            let semantic_type = if create_table_expr.time_index == c.name {
141                SemanticType::Timestamp
142            } else if create_table_expr.primary_keys.contains(&c.name) {
143                SemanticType::Tag
144            } else {
145                SemanticType::Field
146            };
147
148            RegionColumnDef {
149                column_def: Some(ColumnDef {
150                    name: c.name.clone(),
151                    data_type: c.data_type,
152                    is_nullable: c.is_nullable,
153                    default_constraint: c.default_constraint.clone(),
154                    semantic_type: semantic_type as i32,
155                    comment: String::new(),
156                    datatype_extension: c.datatype_extension.clone(),
157                    options: c.options.clone(),
158                }),
159                column_id: i as u32,
160            }
161        })
162        .collect::<Vec<_>>();
163
164    let primary_key = create_table_expr
165        .primary_keys
166        .iter()
167        .map(|key| {
168            column_defs
169                .iter()
170                .find_map(|c| {
171                    c.column_def.as_ref().and_then(|x| {
172                        if &x.name == key {
173                            Some(c.column_id)
174                        } else {
175                            None
176                        }
177                    })
178                })
179                .context(error::PrimaryKeyNotFoundSnafu { key })
180        })
181        .collect::<Result<_>>()?;
182
183    let template = CreateRequest {
184        region_id: 0,
185        engine: create_table_expr.engine.clone(),
186        column_defs,
187        primary_key,
188        path: String::new(),
189        options: create_table_expr.table_options.clone(),
190        partition: None,
191    };
192
193    Ok(template)
194}
195
196/// Builder for [PbCreateRegionRequest].
197pub struct CreateRequestBuilder {
198    template: CreateRequest,
199    /// Optional. Only for metric engine.
200    physical_table_id: Option<TableId>,
201}
202
203impl CreateRequestBuilder {
204    pub fn new(template: CreateRequest, physical_table_id: Option<TableId>) -> Self {
205        Self {
206            template,
207            physical_table_id,
208        }
209    }
210
211    pub fn template(&self) -> &CreateRequest {
212        &self.template
213    }
214
215    pub fn build_one(
216        &self,
217        region_id: RegionId,
218        storage_path: String,
219        region_wal_options: &HashMap<RegionNumber, String>,
220        partition_exprs: &HashMap<RegionNumber, String>,
221    ) -> CreateRequest {
222        let mut request = self.template.clone();
223
224        request.region_id = region_id.as_u64();
225        request.path = storage_path;
226        // Stores the encoded wal options into the request options.
227        prepare_wal_options(&mut request.options, region_id, region_wal_options);
228        request.partition = Some(prepare_partition_expr(region_id, partition_exprs));
229
230        if let Some(physical_table_id) = self.physical_table_id {
231            // Logical table has the same region numbers with physical table, and they have a one-to-one mapping.
232            // For example, region 0 of logical table must resides with region 0 of physical table. So here we can
233            // simply concat the physical table id and the logical region number to get the physical region id.
234            let physical_region_id = RegionId::new(physical_table_id, region_id.region_number());
235
236            request.options.insert(
237                LOGICAL_TABLE_METADATA_KEY.to_string(),
238                physical_region_id.as_u64().to_string(),
239            );
240        }
241
242        request
243    }
244}
245
246fn prepare_partition_expr(
247    region_id: RegionId,
248    partition_exprs: &HashMap<RegionNumber, String>,
249) -> Partition {
250    let expr = partition_exprs.get(&region_id.region_number()).cloned();
251    if expr.is_none() {
252        warn!("region {} has no partition expr", region_id);
253    }
254
255    Partition {
256        expression: expr.unwrap_or_default(),
257        ..Default::default()
258    }
259}
260
261#[cfg(test)]
262mod tests {
263    use std::collections::HashMap;
264
265    use store_api::storage::{RegionId, RegionNumber};
266
267    use super::*;
268    use crate::key::test_utils;
269
270    #[test]
271    fn test_build_one_sets_partition_expr_per_region() {
272        // minimal template
273        let template = CreateRequest {
274            region_id: 0,
275            engine: "mito".to_string(),
276            column_defs: vec![],
277            primary_key: vec![],
278            path: String::new(),
279            options: Default::default(),
280            partition: None,
281        };
282        let builder = CreateRequestBuilder::new(template, None);
283
284        let mut partition_exprs: HashMap<RegionNumber, String> = HashMap::new();
285        let expr_a =
286            r#"{"Expr":{"lhs":{"Column":"a"},"op":"Eq","rhs":{"Value":{"UInt32":1}}}}"#.to_string();
287        partition_exprs.insert(0, expr_a.clone());
288
289        let r0 = builder.build_one(
290            RegionId::new(42, 0),
291            "/p".to_string(),
292            &Default::default(),
293            &partition_exprs,
294        );
295        assert_eq!(r0.partition.as_ref().unwrap().expression, expr_a);
296    }
297
298    #[test]
299    fn test_build_template_for_physical_table_primary_key_matches_indices() {
300        let mut table_info = test_utils::new_test_table_info(42);
301        table_info.meta.primary_key_indices = vec![0, 2];
302        table_info.meta.column_ids = vec![10, 20, 30];
303
304        let template = build_template_from_raw_table_info_for_physical_table(&table_info).unwrap();
305        assert_eq!(template.primary_key, vec![10, 30]);
306    }
307}