common_meta/ddl/create_table/
template.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::column_def::try_as_column_def;
18use api::v1::meta::Partition;
19use api::v1::region::{CreateRequest, RegionColumnDef};
20use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
21use common_telemetry::warn;
22use snafu::{OptionExt, ResultExt};
23use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
24use store_api::storage::{RegionId, RegionNumber};
25use table::metadata::{RawTableInfo, TableId};
26
27use crate::error::{self, Result};
28use crate::wal_provider::prepare_wal_options;
29
30/// Constructs a [CreateRequest] based on the provided [RawTableInfo].
31///
32/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions.
33pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Result<CreateRequest> {
34    let primary_key_indices = &raw_table_info.meta.primary_key_indices;
35    let column_defs = raw_table_info
36        .meta
37        .schema
38        .column_schemas
39        .iter()
40        .enumerate()
41        .map(|(i, c)| {
42            let is_primary_key = primary_key_indices.contains(&i);
43            let column_def = try_as_column_def(c, is_primary_key)
44                .context(error::ConvertColumnDefSnafu { column: &c.name })?;
45
46            Ok(RegionColumnDef {
47                column_def: Some(column_def),
48                // The column id will be overridden by the metric engine.
49                // So we just use the index as the column id.
50                column_id: i as u32,
51            })
52        })
53        .collect::<Result<Vec<_>>>()?;
54
55    let options = HashMap::from(&raw_table_info.meta.options);
56    let template = CreateRequest {
57        region_id: 0,
58        engine: raw_table_info.meta.engine.clone(),
59        column_defs,
60        primary_key: primary_key_indices.iter().map(|i| *i as u32).collect(),
61        path: String::new(),
62        options,
63        partition: None,
64    };
65
66    Ok(template)
67}
68
69pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<CreateRequest> {
70    let column_defs = create_table_expr
71        .column_defs
72        .iter()
73        .enumerate()
74        .map(|(i, c)| {
75            let semantic_type = if create_table_expr.time_index == c.name {
76                SemanticType::Timestamp
77            } else if create_table_expr.primary_keys.contains(&c.name) {
78                SemanticType::Tag
79            } else {
80                SemanticType::Field
81            };
82
83            RegionColumnDef {
84                column_def: Some(ColumnDef {
85                    name: c.name.clone(),
86                    data_type: c.data_type,
87                    is_nullable: c.is_nullable,
88                    default_constraint: c.default_constraint.clone(),
89                    semantic_type: semantic_type as i32,
90                    comment: String::new(),
91                    datatype_extension: c.datatype_extension.clone(),
92                    options: c.options.clone(),
93                }),
94                column_id: i as u32,
95            }
96        })
97        .collect::<Vec<_>>();
98
99    let primary_key = create_table_expr
100        .primary_keys
101        .iter()
102        .map(|key| {
103            column_defs
104                .iter()
105                .find_map(|c| {
106                    c.column_def.as_ref().and_then(|x| {
107                        if &x.name == key {
108                            Some(c.column_id)
109                        } else {
110                            None
111                        }
112                    })
113                })
114                .context(error::PrimaryKeyNotFoundSnafu { key })
115        })
116        .collect::<Result<_>>()?;
117
118    let template = CreateRequest {
119        region_id: 0,
120        engine: create_table_expr.engine.clone(),
121        column_defs,
122        primary_key,
123        path: String::new(),
124        options: create_table_expr.table_options.clone(),
125        partition: None,
126    };
127
128    Ok(template)
129}
130
131/// Builder for [PbCreateRegionRequest].
132pub struct CreateRequestBuilder {
133    template: CreateRequest,
134    /// Optional. Only for metric engine.
135    physical_table_id: Option<TableId>,
136}
137
138impl CreateRequestBuilder {
139    pub fn new(template: CreateRequest, physical_table_id: Option<TableId>) -> Self {
140        Self {
141            template,
142            physical_table_id,
143        }
144    }
145
146    pub fn template(&self) -> &CreateRequest {
147        &self.template
148    }
149
150    pub fn build_one(
151        &self,
152        region_id: RegionId,
153        storage_path: String,
154        region_wal_options: &HashMap<RegionNumber, String>,
155        partition_exprs: &HashMap<RegionNumber, String>,
156    ) -> CreateRequest {
157        let mut request = self.template.clone();
158
159        request.region_id = region_id.as_u64();
160        request.path = storage_path;
161        // Stores the encoded wal options into the request options.
162        prepare_wal_options(&mut request.options, region_id, region_wal_options);
163        request.partition = Some(prepare_partition_expr(region_id, partition_exprs));
164
165        if let Some(physical_table_id) = self.physical_table_id {
166            // Logical table has the same region numbers with physical table, and they have a one-to-one mapping.
167            // For example, region 0 of logical table must resides with region 0 of physical table. So here we can
168            // simply concat the physical table id and the logical region number to get the physical region id.
169            let physical_region_id = RegionId::new(physical_table_id, region_id.region_number());
170
171            request.options.insert(
172                LOGICAL_TABLE_METADATA_KEY.to_string(),
173                physical_region_id.as_u64().to_string(),
174            );
175        }
176
177        request
178    }
179}
180
181fn prepare_partition_expr(
182    region_id: RegionId,
183    partition_exprs: &HashMap<RegionNumber, String>,
184) -> Partition {
185    let expr = partition_exprs.get(&region_id.region_number()).cloned();
186    if expr.is_none() {
187        warn!("region {} has no partition expr", region_id);
188    }
189
190    Partition {
191        expression: expr.unwrap_or_default(),
192        ..Default::default()
193    }
194}
195
196#[cfg(test)]
197mod tests {
198    use std::collections::HashMap;
199
200    use store_api::storage::{RegionId, RegionNumber};
201
202    use super::*;
203
204    #[test]
205    fn test_build_one_sets_partition_expr_per_region() {
206        // minimal template
207        let template = CreateRequest {
208            region_id: 0,
209            engine: "mito".to_string(),
210            column_defs: vec![],
211            primary_key: vec![],
212            path: String::new(),
213            options: Default::default(),
214            partition: None,
215        };
216        let builder = CreateRequestBuilder::new(template, None);
217
218        let mut partition_exprs: HashMap<RegionNumber, String> = HashMap::new();
219        let expr_a =
220            r#"{"Expr":{"lhs":{"Column":"a"},"op":"Eq","rhs":{"Value":{"UInt32":1}}}}"#.to_string();
221        partition_exprs.insert(0, expr_a.clone());
222
223        let r0 = builder.build_one(
224            RegionId::new(42, 0),
225            "/p".to_string(),
226            &Default::default(),
227            &partition_exprs,
228        );
229        assert_eq!(r0.partition.as_ref().unwrap().expression, expr_a);
230    }
231}