common_meta/ddl/
create_table_template.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::column_def::try_as_column_def;
18use api::v1::meta::Partition;
19use api::v1::region::{CreateRequest, RegionColumnDef};
20use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
21use common_telemetry::warn;
22use snafu::{OptionExt, ResultExt};
23use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
24use store_api::storage::{RegionId, RegionNumber};
25use table::metadata::{RawTableInfo, TableId};
26
27use crate::error::{self, Result};
28use crate::wal_options_allocator::prepare_wal_options;
29
30/// Builds a [CreateRequest] from a [RawTableInfo].
31///
32/// Note: **This method is only used for creating logical tables.**
33pub(crate) fn build_template_from_raw_table_info(
34    raw_table_info: &RawTableInfo,
35) -> Result<CreateRequest> {
36    let primary_key_indices = &raw_table_info.meta.primary_key_indices;
37    let column_defs = raw_table_info
38        .meta
39        .schema
40        .column_schemas
41        .iter()
42        .enumerate()
43        .map(|(i, c)| {
44            let is_primary_key = primary_key_indices.contains(&i);
45            let column_def = try_as_column_def(c, is_primary_key)
46                .context(error::ConvertColumnDefSnafu { column: &c.name })?;
47
48            Ok(RegionColumnDef {
49                column_def: Some(column_def),
50                // The column id will be overridden by the metric engine.
51                // So we just use the index as the column id.
52                column_id: i as u32,
53            })
54        })
55        .collect::<Result<Vec<_>>>()?;
56
57    let options = HashMap::from(&raw_table_info.meta.options);
58    let template = CreateRequest {
59        region_id: 0,
60        engine: METRIC_ENGINE_NAME.to_string(),
61        column_defs,
62        primary_key: primary_key_indices.iter().map(|i| *i as u32).collect(),
63        path: String::new(),
64        options,
65        partition: None,
66    };
67
68    Ok(template)
69}
70
71pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<CreateRequest> {
72    let column_defs = create_table_expr
73        .column_defs
74        .iter()
75        .enumerate()
76        .map(|(i, c)| {
77            let semantic_type = if create_table_expr.time_index == c.name {
78                SemanticType::Timestamp
79            } else if create_table_expr.primary_keys.contains(&c.name) {
80                SemanticType::Tag
81            } else {
82                SemanticType::Field
83            };
84
85            RegionColumnDef {
86                column_def: Some(ColumnDef {
87                    name: c.name.clone(),
88                    data_type: c.data_type,
89                    is_nullable: c.is_nullable,
90                    default_constraint: c.default_constraint.clone(),
91                    semantic_type: semantic_type as i32,
92                    comment: String::new(),
93                    datatype_extension: c.datatype_extension,
94                    options: c.options.clone(),
95                }),
96                column_id: i as u32,
97            }
98        })
99        .collect::<Vec<_>>();
100
101    let primary_key = create_table_expr
102        .primary_keys
103        .iter()
104        .map(|key| {
105            column_defs
106                .iter()
107                .find_map(|c| {
108                    c.column_def.as_ref().and_then(|x| {
109                        if &x.name == key {
110                            Some(c.column_id)
111                        } else {
112                            None
113                        }
114                    })
115                })
116                .context(error::PrimaryKeyNotFoundSnafu { key })
117        })
118        .collect::<Result<_>>()?;
119
120    let template = CreateRequest {
121        region_id: 0,
122        engine: create_table_expr.engine.to_string(),
123        column_defs,
124        primary_key,
125        path: String::new(),
126        options: create_table_expr.table_options.clone(),
127        partition: None,
128    };
129
130    Ok(template)
131}
132
133/// Builder for [PbCreateRegionRequest].
134pub struct CreateRequestBuilder {
135    template: CreateRequest,
136    /// Optional. Only for metric engine.
137    physical_table_id: Option<TableId>,
138}
139
140impl CreateRequestBuilder {
141    pub(crate) fn new(template: CreateRequest, physical_table_id: Option<TableId>) -> Self {
142        Self {
143            template,
144            physical_table_id,
145        }
146    }
147
148    pub fn template(&self) -> &CreateRequest {
149        &self.template
150    }
151
152    pub fn build_one(
153        &self,
154        region_id: RegionId,
155        storage_path: String,
156        region_wal_options: &HashMap<RegionNumber, String>,
157        partition_exprs: &HashMap<RegionNumber, String>,
158    ) -> CreateRequest {
159        let mut request = self.template.clone();
160
161        request.region_id = region_id.as_u64();
162        request.path = storage_path;
163        // Stores the encoded wal options into the request options.
164        prepare_wal_options(&mut request.options, region_id, region_wal_options);
165        request.partition = Some(prepare_partition_expr(region_id, partition_exprs));
166
167        if let Some(physical_table_id) = self.physical_table_id {
168            // Logical table has the same region numbers with physical table, and they have a one-to-one mapping.
169            // For example, region 0 of logical table must resides with region 0 of physical table. So here we can
170            // simply concat the physical table id and the logical region number to get the physical region id.
171            let physical_region_id = RegionId::new(physical_table_id, region_id.region_number());
172
173            request.options.insert(
174                LOGICAL_TABLE_METADATA_KEY.to_string(),
175                physical_region_id.as_u64().to_string(),
176            );
177        }
178
179        request
180    }
181}
182
183fn prepare_partition_expr(
184    region_id: RegionId,
185    partition_exprs: &HashMap<RegionNumber, String>,
186) -> Partition {
187    let expr = partition_exprs.get(&region_id.region_number()).cloned();
188    if expr.is_none() {
189        warn!("region {} has no partition expr", region_id);
190    }
191
192    Partition {
193        expression: expr.unwrap_or_default(),
194        ..Default::default()
195    }
196}
197
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use store_api::storage::{RegionId, RegionNumber};

    use super::*;

    /// `build_one` must copy the partition expression registered for the region number
    /// into the resulting request.
    #[test]
    fn test_build_one_sets_partition_expr_per_region() {
        // A minimal template without columns or options is enough for this check.
        let builder = CreateRequestBuilder::new(
            CreateRequest {
                region_id: 0,
                engine: "mito".to_string(),
                column_defs: vec![],
                primary_key: vec![],
                path: String::new(),
                options: Default::default(),
                partition: None,
            },
            None,
        );

        let expected_expr =
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"Eq","rhs":{"Value":{"UInt32":1}}}}"#.to_string();
        let partition_exprs: HashMap<RegionNumber, String> =
            HashMap::from([(0, expected_expr.clone())]);
        let wal_options = HashMap::new();

        let request = builder.build_one(
            RegionId::new(42, 0),
            "/p".to_string(),
            &wal_options,
            &partition_exprs,
        );

        let partition = request.partition.as_ref().unwrap();
        assert_eq!(partition.expression, expected_expr);
    }
}
233}