common_grpc_expr/
util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::column_def::{contains_fulltext, contains_skipping};
19use api::v1::{
20    AddColumn, AddColumns, Column, ColumnDataType, ColumnDataTypeExtension, ColumnDef,
21    ColumnOptions, ColumnSchema, CreateTableExpr, JsonTypeExtension, SemanticType,
22};
23use datatypes::schema::Schema;
24use snafu::{OptionExt, ResultExt, ensure};
25use table::metadata::TableId;
26use table::table_reference::TableReference;
27
28use crate::error::{
29    self, DuplicatedColumnNameSnafu, DuplicatedTimestampColumnSnafu,
30    InvalidStringIndexColumnTypeSnafu, MissingTimestampColumnSnafu, Result,
31    UnknownColumnDataTypeSnafu,
32};
33pub struct ColumnExpr<'a> {
34    pub column_name: &'a str,
35    pub datatype: i32,
36    pub semantic_type: i32,
37    pub datatype_extension: &'a Option<ColumnDataTypeExtension>,
38    pub options: &'a Option<ColumnOptions>,
39}
40
41impl<'a> ColumnExpr<'a> {
42    #[inline]
43    pub fn from_columns(columns: &'a [Column]) -> Vec<Self> {
44        columns.iter().map(Self::from).collect()
45    }
46
47    #[inline]
48    pub fn from_column_schemas(schemas: &'a [ColumnSchema]) -> Vec<Self> {
49        schemas.iter().map(Self::from).collect()
50    }
51}
52
53impl<'a> From<&'a Column> for ColumnExpr<'a> {
54    fn from(column: &'a Column) -> Self {
55        Self {
56            column_name: &column.column_name,
57            datatype: column.datatype,
58            semantic_type: column.semantic_type,
59            datatype_extension: &column.datatype_extension,
60            options: &column.options,
61        }
62    }
63}
64
65impl<'a> From<&'a ColumnSchema> for ColumnExpr<'a> {
66    fn from(schema: &'a ColumnSchema) -> Self {
67        Self {
68            column_name: &schema.column_name,
69            datatype: schema.datatype,
70            semantic_type: schema.semantic_type,
71            datatype_extension: &schema.datatype_extension,
72            options: &schema.options,
73        }
74    }
75}
76
77fn infer_column_datatype(
78    datatype: i32,
79    datatype_extension: &Option<ColumnDataTypeExtension>,
80) -> Result<ColumnDataType> {
81    let column_type =
82        ColumnDataType::try_from(datatype).context(UnknownColumnDataTypeSnafu { datatype })?;
83
84    if matches!(&column_type, ColumnDataType::Binary)
85        && let Some(ext) = datatype_extension
86    {
87        let type_ext = ext
88            .type_ext
89            .as_ref()
90            .context(error::MissingFieldSnafu { field: "type_ext" })?;
91        if *type_ext == TypeExt::JsonType(JsonTypeExtension::JsonBinary.into()) {
92            return Ok(ColumnDataType::Json);
93        }
94    }
95
96    Ok(column_type)
97}
98
99pub fn build_create_table_expr(
100    table_id: Option<TableId>,
101    table_name: &TableReference<'_>,
102    column_exprs: Vec<ColumnExpr>,
103    engine: &str,
104    desc: &str,
105) -> Result<CreateTableExpr> {
106    // Check for duplicate names. If found, raise an error.
107    //
108    // The introduction of hashset incurs additional memory overhead
109    // but achieves a time complexity of O(1).
110    //
111    // The separate iteration over `column_exprs` is because the CPU prefers
112    // smaller loops, and avoid cloning String.
113    let mut distinct_names = HashSet::with_capacity(column_exprs.len());
114    for ColumnExpr { column_name, .. } in &column_exprs {
115        ensure!(
116            distinct_names.insert(*column_name),
117            DuplicatedColumnNameSnafu { name: *column_name }
118        );
119    }
120
121    let mut column_defs = Vec::with_capacity(column_exprs.len());
122    let mut primary_keys = Vec::with_capacity(column_exprs.len());
123    let mut time_index = None;
124
125    for expr in column_exprs {
126        let ColumnExpr {
127            column_name,
128            datatype,
129            semantic_type,
130            datatype_extension,
131            options,
132        } = expr;
133
134        let mut is_nullable = true;
135        match semantic_type {
136            v if v == SemanticType::Tag as i32 => primary_keys.push(column_name.to_owned()),
137            v if v == SemanticType::Timestamp as i32 => {
138                ensure!(
139                    time_index.is_none(),
140                    DuplicatedTimestampColumnSnafu {
141                        exists: time_index.as_ref().unwrap(),
142                        duplicated: column_name,
143                    }
144                );
145                time_index = Some(column_name.to_owned());
146                // Timestamp column must not be null.
147                is_nullable = false;
148            }
149            _ => {}
150        }
151
152        let column_type = infer_column_datatype(datatype, datatype_extension)?;
153
154        ensure!(
155            (!contains_fulltext(options) && !contains_skipping(options))
156                || column_type == ColumnDataType::String,
157            InvalidStringIndexColumnTypeSnafu {
158                column_name,
159                column_type,
160            }
161        );
162
163        column_defs.push(ColumnDef {
164            name: column_name.to_owned(),
165            data_type: datatype,
166            is_nullable,
167            default_constraint: vec![],
168            semantic_type,
169            comment: String::new(),
170            datatype_extension: *datatype_extension,
171            options: options.clone(),
172        });
173    }
174
175    let time_index = time_index.context(MissingTimestampColumnSnafu {
176        msg: format!("table is {}", table_name.table),
177    })?;
178
179    Ok(CreateTableExpr {
180        catalog_name: table_name.catalog.to_string(),
181        schema_name: table_name.schema.to_string(),
182        table_name: table_name.table.to_string(),
183        desc: desc.to_string(),
184        column_defs,
185        time_index,
186        primary_keys,
187        create_if_not_exists: true,
188        table_options: Default::default(),
189        table_id: table_id.map(|id| api::v1::TableId { id }),
190        engine: engine.to_string(),
191    })
192}
193
194/// Find columns that are not present in the schema and return them as `AddColumns`
195/// for adding columns automatically.
196/// It always sets `add_if_not_exists` to `true` for now.
197pub fn extract_new_columns(
198    schema: &Schema,
199    column_exprs: Vec<ColumnExpr>,
200) -> Result<Option<AddColumns>> {
201    let columns_to_add = column_exprs
202        .into_iter()
203        .filter(|expr| schema.column_schema_by_name(expr.column_name).is_none())
204        .map(|expr| {
205            let column_def = Some(ColumnDef {
206                name: expr.column_name.to_string(),
207                data_type: expr.datatype,
208                is_nullable: true,
209                default_constraint: vec![],
210                semantic_type: expr.semantic_type,
211                comment: String::new(),
212                datatype_extension: *expr.datatype_extension,
213                options: expr.options.clone(),
214            });
215            AddColumn {
216                column_def,
217                location: None,
218                add_if_not_exists: true,
219            }
220        })
221        .collect::<Vec<_>>();
222
223    if columns_to_add.is_empty() {
224        Ok(None)
225    } else {
226        let mut distinct_names = HashSet::with_capacity(columns_to_add.len());
227        for add_column in &columns_to_add {
228            let name = add_column.column_def.as_ref().unwrap().name.as_str();
229            ensure!(
230                distinct_names.insert(name),
231                DuplicatedColumnNameSnafu { name }
232            );
233        }
234
235        Ok(Some(AddColumns {
236            add_columns: columns_to_add,
237        }))
238    }
239}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::{assert_eq, vec};

    use api::helper::ColumnDataTypeWrapper;
    use api::v1::column::Values;
    use api::v1::column_data_type_extension::TypeExt;
    use api::v1::{
        Column, ColumnDataType, ColumnDataTypeExtension, Decimal128, DecimalTypeExtension,
        IntervalMonthDayNano, SemanticType,
    };
    use common_catalog::consts::MITO_ENGINE;
    use common_time::interval::IntervalUnit;
    use common_time::timestamp::TimeUnit;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, SchemaBuilder};
    use snafu::ResultExt;

    use super::*;
    use crate::error;
    use crate::error::ColumnDataTypeSnafu;

    #[inline]
    fn build_column_schema(
        column_name: &str,
        datatype: i32,
        nullable: bool,
    ) -> error::Result<ColumnSchema> {
        let datatype_wrapper =
            ColumnDataTypeWrapper::try_new(datatype, None).context(ColumnDataTypeSnafu)?;

        Ok(ColumnSchema::new(
            column_name,
            datatype_wrapper.into(),
            nullable,
        ))
    }

    /// Converts the gRPC type of `column_def` (including its type extension) into a
    /// `ConcreteDataType`, panicking if the conversion fails.
    fn concrete_type_of(column_def: &ColumnDef) -> ConcreteDataType {
        ConcreteDataType::from(
            ColumnDataTypeWrapper::try_new(column_def.data_type, column_def.datatype_extension)
                .unwrap(),
        )
    }

    /// Returns the column definition named `name`, panicking if it is absent.
    fn find_column<'a>(column_defs: &'a [ColumnDef], name: &str) -> &'a ColumnDef {
        column_defs
            .iter()
            .find(|c| c.name == name)
            .unwrap_or_else(|| panic!("column `{name}` not found"))
    }

    fn build_create_expr_from_insertion(
        catalog_name: &str,
        schema_name: &str,
        table_id: Option<TableId>,
        table_name: &str,
        columns: &[Column],
        engine: &str,
    ) -> Result<CreateTableExpr> {
        let table_name = TableReference::full(catalog_name, schema_name, table_name);
        let column_exprs = ColumnExpr::from_columns(columns);
        build_create_table_expr(
            table_id,
            &table_name,
            column_exprs,
            engine,
            "Created on insertion",
        )
    }

    #[test]
    fn test_build_create_table_request() {
        let table_id = Some(10);
        let table_name = "test_metric";

        assert!(
            build_create_expr_from_insertion("", "", table_id, table_name, &[], MITO_ENGINE)
                .is_err()
        );

        let insert_batch = mock_insert_batch();

        let create_expr = build_create_expr_from_insertion(
            "",
            "",
            table_id,
            table_name,
            &insert_batch.0,
            MITO_ENGINE,
        )
        .unwrap();

        assert_eq!(table_id, create_expr.table_id.map(|x| x.id));
        assert_eq!(table_name, create_expr.table_name);
        assert_eq!("Created on insertion".to_string(), create_expr.desc);
        assert_eq!(
            vec![create_expr.column_defs[0].name.clone()],
            create_expr.primary_keys
        );

        let column_defs = create_expr.column_defs;
        assert_eq!(column_defs[5].name, create_expr.time_index);
        assert_eq!(7, column_defs.len());
        // The timestamp column is forced to be non-nullable.
        assert!(!find_column(&column_defs, "ts").is_nullable);

        let expected_types = [
            ("host", ConcreteDataType::string_datatype()),
            ("cpu", ConcreteDataType::float64_datatype()),
            ("memory", ConcreteDataType::float64_datatype()),
            (
                "time",
                ConcreteDataType::time_datatype(TimeUnit::Millisecond),
            ),
            (
                "interval",
                ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano),
            ),
            ("ts", ConcreteDataType::timestamp_millisecond_datatype()),
            ("decimals", ConcreteDataType::decimal128_datatype(38, 10)),
        ];
        for (name, expected) in expected_types {
            assert_eq!(
                expected,
                concrete_type_of(find_column(&column_defs, name)),
                "column `{name}`"
            );
        }
    }

    #[test]
    fn test_find_new_columns() {
        let cpu_column = build_column_schema("cpu", 10, true).unwrap();
        let ts_column = build_column_schema("ts", 15, false)
            .unwrap()
            .with_time_index(true);
        let columns = vec![cpu_column, ts_column];

        let schema = Arc::new(SchemaBuilder::try_from(columns).unwrap().build().unwrap());

        assert!(
            extract_new_columns(&schema, ColumnExpr::from_columns(&[]))
                .unwrap()
                .is_none()
        );

        let insert_batch = mock_insert_batch();

        let add_columns = extract_new_columns(&schema, ColumnExpr::from_columns(&insert_batch.0))
            .unwrap()
            .unwrap();

        // `cpu` and `ts` already exist in the schema, so only the remaining columns are
        // reported, in insertion order.
        let expected = [
            ("host", ConcreteDataType::string_datatype()),
            ("memory", ConcreteDataType::float64_datatype()),
            (
                "time",
                ConcreteDataType::time_datatype(TimeUnit::Millisecond),
            ),
            (
                "interval",
                ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano),
            ),
            ("decimals", ConcreteDataType::decimal128_datatype(38, 10)),
        ];
        assert_eq!(expected.len(), add_columns.add_columns.len());

        for (add_column, (name, expected_type)) in add_columns.add_columns.iter().zip(expected) {
            let column_def = add_column.column_def.as_ref().unwrap();
            assert_eq!(name, column_def.name);
            assert_eq!(
                expected_type,
                concrete_type_of(column_def),
                "column `{name}`"
            );
            assert!(add_column.add_if_not_exists, "column `{name}`");
        }
    }

    fn mock_insert_batch() -> (Vec<Column>, u32) {
        let row_count = 2;

        let host_vals = Values {
            string_values: vec!["host1".to_string(), "host2".to_string()],
            ..Default::default()
        };
        let host_column = Column {
            column_name: "host".to_string(),
            semantic_type: SemanticType::Tag as i32,
            values: Some(host_vals),
            null_mask: vec![0],
            datatype: ColumnDataType::String as i32,
            ..Default::default()
        };

        let cpu_vals = Values {
            f64_values: vec![0.31],
            ..Default::default()
        };
        let cpu_column = Column {
            column_name: "cpu".to_string(),
            semantic_type: SemanticType::Field as i32,
            values: Some(cpu_vals),
            null_mask: vec![2],
            datatype: ColumnDataType::Float64 as i32,
            ..Default::default()
        };

        let mem_vals = Values {
            f64_values: vec![0.1],
            ..Default::default()
        };
        let mem_column = Column {
            column_name: "memory".to_string(),
            semantic_type: SemanticType::Field as i32,
            values: Some(mem_vals),
            null_mask: vec![1],
            datatype: ColumnDataType::Float64 as i32,
            ..Default::default()
        };

        let time_vals = Values {
            time_millisecond_values: vec![100, 101],
            ..Default::default()
        };
        let time_column = Column {
            column_name: "time".to_string(),
            semantic_type: SemanticType::Field as i32,
            values: Some(time_vals),
            null_mask: vec![0],
            datatype: ColumnDataType::TimeMillisecond as i32,
            ..Default::default()
        };

        let interval1 = IntervalMonthDayNano {
            months: 1,
            days: 2,
            nanoseconds: 3,
        };
        let interval2 = IntervalMonthDayNano {
            months: 4,
            days: 5,
            nanoseconds: 6,
        };
        let interval_vals = Values {
            interval_month_day_nano_values: vec![interval1, interval2],
            ..Default::default()
        };
        let interval_column = Column {
            column_name: "interval".to_string(),
            semantic_type: SemanticType::Field as i32,
            values: Some(interval_vals),
            null_mask: vec![0],
            datatype: ColumnDataType::IntervalMonthDayNano as i32,
            ..Default::default()
        };

        let ts_vals = Values {
            timestamp_millisecond_values: vec![100, 101],
            ..Default::default()
        };
        let ts_column = Column {
            column_name: "ts".to_string(),
            semantic_type: SemanticType::Timestamp as i32,
            values: Some(ts_vals),
            null_mask: vec![0],
            datatype: ColumnDataType::TimestampMillisecond as i32,
            ..Default::default()
        };
        let decimal_vals = Values {
            decimal128_values: vec![Decimal128 { hi: 0, lo: 123 }, Decimal128 { hi: 0, lo: 456 }],
            ..Default::default()
        };
        let decimal_column = Column {
            column_name: "decimals".to_string(),
            semantic_type: SemanticType::Field as i32,
            values: Some(decimal_vals),
            null_mask: vec![0],
            datatype: ColumnDataType::Decimal128 as i32,
            datatype_extension: Some(ColumnDataTypeExtension {
                type_ext: Some(TypeExt::DecimalType(DecimalTypeExtension {
                    precision: 38,
                    scale: 10,
                })),
            }),
            options: None,
        };

        (
            vec![
                host_column,
                cpu_column,
                mem_column,
                time_column,
                interval_column,
                ts_column,
                decimal_column,
            ],
            row_count,
        )
    }
}