Skip to main content

common_meta/ddl/create_table/
template.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::column_def::try_as_column_def;
18use api::v1::meta::Partition;
19use api::v1::region::{CreateRequest, RegionColumnDef};
20use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
21use common_telemetry::warn;
22use snafu::{OptionExt, ResultExt};
23use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
24use store_api::region_request::RegionRequirements;
25use store_api::storage::{RegionId, RegionNumber};
26use table::metadata::{TableId, TableInfo};
27
28use crate::error::{self, Result, SerializeWalOptionsSnafu};
29use crate::reconciliation::utils::build_column_metadata_from_table_info;
30use crate::wal_provider::{RegionWalOptions, serialize_wal_options};
31
32/// Constructs a [CreateRequest] based on the provided [TableInfo].
33///
34/// Note: This function is primarily intended for creating logical tables.
35///
36/// Logical table templates keep the original column order and primary key indices from
37/// `TableInfo` (including internal columns when present), because these are used to
38/// reconstruct the logical schema on the engine side.
39pub fn build_template_from_raw_table_info(table_info: &TableInfo) -> Result<CreateRequest> {
40    let primary_key_indices = &table_info.meta.primary_key_indices;
41    let column_defs = table_info
42        .meta
43        .schema
44        .column_schemas()
45        .iter()
46        .enumerate()
47        .map(|(i, c)| {
48            let is_primary_key = primary_key_indices.contains(&i);
49            let column_def = try_as_column_def(c, is_primary_key)
50                .context(error::ConvertColumnDefSnafu { column: &c.name })?;
51            Ok(RegionColumnDef {
52                column_def: Some(column_def),
53                // The column id will be overridden by the metric engine.
54                // So we just use the index as the column id.
55                column_id: i as u32,
56            })
57        })
58        .collect::<Result<Vec<_>>>()?;
59
60    let options = HashMap::from(&table_info.meta.options);
61    let template = CreateRequest {
62        region_id: 0,
63        engine: table_info.meta.engine.clone(),
64        column_defs,
65        primary_key: table_info
66            .meta
67            .primary_key_indices
68            .iter()
69            .map(|i| *i as u32)
70            .collect(),
71        path: String::new(),
72        options,
73        partition: None,
74        requirements: None,
75    };
76
77    Ok(template)
78}
79
80/// Constructs a [CreateRequest] based on the provided [TableInfo] for physical table.
81///
82/// Note: This function is primarily intended for creating physical table.
83///
84/// Physical table templates mark primary
85/// keys by tag semantic type to match the physical storage layout.
86pub fn build_template_from_raw_table_info_for_physical_table(
87    table_info: &TableInfo,
88) -> Result<CreateRequest> {
89    let name_to_ids = table_info
90        .name_to_ids()
91        .context(error::MissingColumnIdsSnafu)?;
92    let column_metadatas = build_column_metadata_from_table_info(
93        table_info.meta.schema.column_schemas(),
94        &table_info.meta.primary_key_indices,
95        &name_to_ids,
96    )?;
97    let column_defs = column_metadatas
98        .iter()
99        .map(|c| {
100            let column_def =
101                try_as_column_def(&c.column_schema, c.semantic_type == SemanticType::Tag).context(
102                    error::ConvertColumnDefSnafu {
103                        column: &c.column_schema.name,
104                    },
105                )?;
106            let region_column_def = RegionColumnDef {
107                column_def: Some(column_def),
108                column_id: c.column_id,
109            };
110
111            Ok(region_column_def)
112        })
113        .collect::<Result<Vec<_>>>()?;
114    // Preserve the order of primary key indices as defined in the original table info.
115    let primary_key = table_info
116        .meta
117        .primary_key_indices
118        .iter()
119        .map(|idx| column_metadatas[*idx].column_id)
120        .collect();
121
122    let options = HashMap::from(&table_info.meta.options);
123    let template = CreateRequest {
124        region_id: 0,
125        engine: table_info.meta.engine.clone(),
126        column_defs,
127        primary_key,
128        path: String::new(),
129        options,
130        partition: None,
131        requirements: None,
132    };
133
134    Ok(template)
135}
136
137pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<CreateRequest> {
138    let column_defs = create_table_expr
139        .column_defs
140        .iter()
141        .enumerate()
142        .map(|(i, c)| {
143            let semantic_type = if create_table_expr.time_index == c.name {
144                SemanticType::Timestamp
145            } else if create_table_expr.primary_keys.contains(&c.name) {
146                SemanticType::Tag
147            } else {
148                SemanticType::Field
149            };
150
151            RegionColumnDef {
152                column_def: Some(ColumnDef {
153                    name: c.name.clone(),
154                    data_type: c.data_type,
155                    is_nullable: c.is_nullable,
156                    default_constraint: c.default_constraint.clone(),
157                    semantic_type: semantic_type as i32,
158                    comment: String::new(),
159                    datatype_extension: c.datatype_extension.clone(),
160                    options: c.options.clone(),
161                }),
162                column_id: i as u32,
163            }
164        })
165        .collect::<Vec<_>>();
166
167    let primary_key = create_table_expr
168        .primary_keys
169        .iter()
170        .map(|key| {
171            column_defs
172                .iter()
173                .find_map(|c| {
174                    c.column_def.as_ref().and_then(|x| {
175                        if &x.name == key {
176                            Some(c.column_id)
177                        } else {
178                            None
179                        }
180                    })
181                })
182                .context(error::PrimaryKeyNotFoundSnafu { key })
183        })
184        .collect::<Result<_>>()?;
185
186    let template = CreateRequest {
187        region_id: 0,
188        engine: create_table_expr.engine.clone(),
189        column_defs,
190        primary_key,
191        path: String::new(),
192        options: create_table_expr.table_options.clone(),
193        partition: None,
194        requirements: None,
195    };
196
197    Ok(template)
198}
199
200/// Builder for [PbCreateRegionRequest].
201pub struct CreateRequestBuilder {
202    template: CreateRequest,
203    /// Optional. Only for metric engine.
204    physical_table_id: Option<TableId>,
205    requirements: RegionRequirements,
206}
207
208impl CreateRequestBuilder {
209    pub fn new(template: CreateRequest, physical_table_id: Option<TableId>) -> Self {
210        Self {
211            template,
212            physical_table_id,
213            requirements: RegionRequirements::empty(),
214        }
215    }
216
217    pub fn template(&self) -> &CreateRequest {
218        &self.template
219    }
220
221    pub fn with_requirements(mut self, requirements: RegionRequirements) -> Self {
222        self.requirements = requirements;
223        self
224    }
225
226    pub fn build_one(
227        &self,
228        region_id: RegionId,
229        storage_path: String,
230        region_wal_options: &RegionWalOptions,
231        partition_exprs: &HashMap<RegionNumber, String>,
232    ) -> Result<CreateRequest> {
233        let mut request = self.template.clone();
234
235        request.region_id = region_id.as_u64();
236        request.path = storage_path;
237        request.requirements = Some(self.requirements.into());
238        // Stores the encoded wal options into the request options.
239        serialize_wal_options(&mut request.options, region_id, region_wal_options)
240            .context(SerializeWalOptionsSnafu { region_id })?;
241        request.partition = Some(prepare_partition_expr(region_id, partition_exprs));
242
243        if let Some(physical_table_id) = self.physical_table_id {
244            // Logical table has the same region numbers with physical table, and they have a one-to-one mapping.
245            // For example, region 0 of logical table must resides with region 0 of physical table. So here we can
246            // simply concat the physical table id and the logical region number to get the physical region id.
247            let physical_region_id = RegionId::new(physical_table_id, region_id.region_number());
248
249            request.options.insert(
250                LOGICAL_TABLE_METADATA_KEY.to_string(),
251                physical_region_id.as_u64().to_string(),
252            );
253        }
254
255        Ok(request)
256    }
257}
258
259fn prepare_partition_expr(
260    region_id: RegionId,
261    partition_exprs: &HashMap<RegionNumber, String>,
262) -> Partition {
263    let expr = partition_exprs.get(&region_id.region_number()).cloned();
264    if expr.is_none() {
265        warn!("region {} has no partition expr", region_id);
266    }
267
268    Partition {
269        expression: expr.unwrap_or_default(),
270        ..Default::default()
271    }
272}
273
274#[cfg(test)]
275mod tests {
276    use std::collections::HashMap;
277
278    use store_api::storage::{RegionId, RegionNumber};
279
280    use super::*;
281    use crate::key::test_utils;
282
283    #[test]
284    fn test_build_one_sets_partition_expr_per_region() {
285        // minimal template
286        let template = CreateRequest {
287            region_id: 0,
288            engine: "mito".to_string(),
289            column_defs: vec![],
290            primary_key: vec![],
291            path: String::new(),
292            options: Default::default(),
293            partition: None,
294            requirements: None,
295        };
296        let builder = CreateRequestBuilder::new(template, None);
297
298        let mut partition_exprs: HashMap<RegionNumber, String> = HashMap::new();
299        let expr_a =
300            r#"{"Expr":{"lhs":{"Column":"a"},"op":"Eq","rhs":{"Value":{"UInt32":1}}}}"#.to_string();
301        partition_exprs.insert(0, expr_a.clone());
302
303        let r0 = builder
304            .build_one(
305                RegionId::new(42, 0),
306                "/p".to_string(),
307                &Default::default(),
308                &partition_exprs,
309            )
310            .unwrap();
311        assert_eq!(r0.partition.as_ref().unwrap().expression, expr_a);
312        assert_eq!(
313            r0.requirements.map(RegionRequirements::from),
314            Some(RegionRequirements::empty())
315        );
316    }
317
318    #[test]
319    fn test_build_one_sets_explicit_requirements() {
320        let template = CreateRequest {
321            region_id: 0,
322            engine: "mito".to_string(),
323            column_defs: vec![],
324            primary_key: vec![],
325            path: String::new(),
326            options: Default::default(),
327            partition: None,
328            requirements: None,
329        };
330        let builder = CreateRequestBuilder::new(template, None)
331            .with_requirements(RegionRequirements::object_storage());
332
333        let request = builder
334            .build_one(
335                RegionId::new(42, 0),
336                "/p".to_string(),
337                &Default::default(),
338                &Default::default(),
339            )
340            .unwrap();
341
342        assert_eq!(
343            request.requirements.map(RegionRequirements::from),
344            Some(RegionRequirements::object_storage())
345        );
346    }
347
348    #[test]
349    fn test_build_template_for_physical_table_primary_key_matches_indices() {
350        let mut table_info = test_utils::new_test_table_info(42);
351        table_info.meta.primary_key_indices = vec![0, 2];
352        table_info.meta.column_ids = vec![10, 20, 30];
353
354        let template = build_template_from_raw_table_info_for_physical_table(&table_info).unwrap();
355        assert_eq!(template.primary_key, vec![10, 30]);
356    }
357}