common_grpc_expr/
util.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::column_def::{contains_fulltext, contains_skipping};
19use api::v1::{
20    AddColumn, AddColumns, Column, ColumnDataType, ColumnDataTypeExtension, ColumnDef,
21    ColumnOptions, ColumnSchema, CreateTableExpr, JsonTypeExtension, SemanticType,
22};
23use datatypes::schema::Schema;
24use snafu::{ensure, OptionExt, ResultExt};
25use table::metadata::TableId;
26use table::table_reference::TableReference;
27
28use crate::error::{
29    self, DuplicatedColumnNameSnafu, DuplicatedTimestampColumnSnafu,
30    InvalidStringIndexColumnTypeSnafu, MissingTimestampColumnSnafu, Result,
31    UnknownColumnDataTypeSnafu,
32};
33pub struct ColumnExpr<'a> {
34    pub column_name: &'a str,
35    pub datatype: i32,
36    pub semantic_type: i32,
37    pub datatype_extension: &'a Option<ColumnDataTypeExtension>,
38    pub options: &'a Option<ColumnOptions>,
39}
40
41impl<'a> ColumnExpr<'a> {
42    #[inline]
43    pub fn from_columns(columns: &'a [Column]) -> Vec<Self> {
44        columns.iter().map(Self::from).collect()
45    }
46
47    #[inline]
48    pub fn from_column_schemas(schemas: &'a [ColumnSchema]) -> Vec<Self> {
49        schemas.iter().map(Self::from).collect()
50    }
51}
52
53impl<'a> From<&'a Column> for ColumnExpr<'a> {
54    fn from(column: &'a Column) -> Self {
55        Self {
56            column_name: &column.column_name,
57            datatype: column.datatype,
58            semantic_type: column.semantic_type,
59            datatype_extension: &column.datatype_extension,
60            options: &column.options,
61        }
62    }
63}
64
65impl<'a> From<&'a ColumnSchema> for ColumnExpr<'a> {
66    fn from(schema: &'a ColumnSchema) -> Self {
67        Self {
68            column_name: &schema.column_name,
69            datatype: schema.datatype,
70            semantic_type: schema.semantic_type,
71            datatype_extension: &schema.datatype_extension,
72            options: &schema.options,
73        }
74    }
75}
76
77fn infer_column_datatype(
78    datatype: i32,
79    datatype_extension: &Option<ColumnDataTypeExtension>,
80) -> Result<ColumnDataType> {
81    let column_type =
82        ColumnDataType::try_from(datatype).context(UnknownColumnDataTypeSnafu { datatype })?;
83
84    if matches!(&column_type, ColumnDataType::Binary) {
85        if let Some(ext) = datatype_extension {
86            let type_ext = ext
87                .type_ext
88                .as_ref()
89                .context(error::MissingFieldSnafu { field: "type_ext" })?;
90            if *type_ext == TypeExt::JsonType(JsonTypeExtension::JsonBinary.into()) {
91                return Ok(ColumnDataType::Json);
92            }
93        }
94    }
95
96    Ok(column_type)
97}
98
99pub fn build_create_table_expr(
100    table_id: Option<TableId>,
101    table_name: &TableReference<'_>,
102    column_exprs: Vec<ColumnExpr>,
103    engine: &str,
104    desc: &str,
105) -> Result<CreateTableExpr> {
106    // Check for duplicate names. If found, raise an error.
107    //
108    // The introduction of hashset incurs additional memory overhead
109    // but achieves a time complexity of O(1).
110    //
111    // The separate iteration over `column_exprs` is because the CPU prefers
112    // smaller loops, and avoid cloning String.
113    let mut distinct_names = HashSet::with_capacity(column_exprs.len());
114    for ColumnExpr { column_name, .. } in &column_exprs {
115        ensure!(
116            distinct_names.insert(*column_name),
117            DuplicatedColumnNameSnafu { name: *column_name }
118        );
119    }
120
121    let mut column_defs = Vec::with_capacity(column_exprs.len());
122    let mut primary_keys = Vec::with_capacity(column_exprs.len());
123    let mut time_index = None;
124
125    for expr in column_exprs {
126        let ColumnExpr {
127            column_name,
128            datatype,
129            semantic_type,
130            datatype_extension,
131            options,
132        } = expr;
133
134        let mut is_nullable = true;
135        match semantic_type {
136            v if v == SemanticType::Tag as i32 => primary_keys.push(column_name.to_owned()),
137            v if v == SemanticType::Timestamp as i32 => {
138                ensure!(
139                    time_index.is_none(),
140                    DuplicatedTimestampColumnSnafu {
141                        exists: time_index.as_ref().unwrap(),
142                        duplicated: column_name,
143                    }
144                );
145                time_index = Some(column_name.to_owned());
146                // Timestamp column must not be null.
147                is_nullable = false;
148            }
149            _ => {}
150        }
151
152        let column_type = infer_column_datatype(datatype, datatype_extension)?;
153
154        ensure!(
155            (!contains_fulltext(options) && !contains_skipping(options))
156                || column_type == ColumnDataType::String,
157            InvalidStringIndexColumnTypeSnafu {
158                column_name,
159                column_type,
160            }
161        );
162
163        column_defs.push(ColumnDef {
164            name: column_name.to_owned(),
165            data_type: datatype,
166            is_nullable,
167            default_constraint: vec![],
168            semantic_type,
169            comment: String::new(),
170            datatype_extension: *datatype_extension,
171            options: options.clone(),
172        });
173    }
174
175    let time_index = time_index.context(MissingTimestampColumnSnafu {
176        msg: format!("table is {}", table_name.table),
177    })?;
178
179    Ok(CreateTableExpr {
180        catalog_name: table_name.catalog.to_string(),
181        schema_name: table_name.schema.to_string(),
182        table_name: table_name.table.to_string(),
183        desc: desc.to_string(),
184        column_defs,
185        time_index,
186        primary_keys,
187        create_if_not_exists: true,
188        table_options: Default::default(),
189        table_id: table_id.map(|id| api::v1::TableId { id }),
190        engine: engine.to_string(),
191    })
192}
193
194/// Find columns that are not present in the schema and return them as `AddColumns`
195/// for adding columns automatically.
196/// It always sets `add_if_not_exists` to `true` for now.
197pub fn extract_new_columns(
198    schema: &Schema,
199    column_exprs: Vec<ColumnExpr>,
200) -> Result<Option<AddColumns>> {
201    let columns_to_add = column_exprs
202        .into_iter()
203        .filter(|expr| schema.column_schema_by_name(expr.column_name).is_none())
204        .map(|expr| {
205            let column_def = Some(ColumnDef {
206                name: expr.column_name.to_string(),
207                data_type: expr.datatype,
208                is_nullable: true,
209                default_constraint: vec![],
210                semantic_type: expr.semantic_type,
211                comment: String::new(),
212                datatype_extension: *expr.datatype_extension,
213                options: expr.options.clone(),
214            });
215            AddColumn {
216                column_def,
217                location: None,
218                add_if_not_exists: true,
219            }
220        })
221        .collect::<Vec<_>>();
222
223    if columns_to_add.is_empty() {
224        Ok(None)
225    } else {
226        let mut distinct_names = HashSet::with_capacity(columns_to_add.len());
227        for add_column in &columns_to_add {
228            let name = add_column.column_def.as_ref().unwrap().name.as_str();
229            ensure!(
230                distinct_names.insert(name),
231                DuplicatedColumnNameSnafu { name }
232            );
233        }
234
235        Ok(Some(AddColumns {
236            add_columns: columns_to_add,
237        }))
238    }
239}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::{assert_eq, vec};

    use api::helper::ColumnDataTypeWrapper;
    use api::v1::column::Values;
    use api::v1::column_data_type_extension::TypeExt;
    use api::v1::{
        Column, ColumnDataType, ColumnDataTypeExtension, Decimal128, DecimalTypeExtension,
        IntervalMonthDayNano, SemanticType,
    };
    use common_catalog::consts::MITO_ENGINE;
    use common_time::interval::IntervalUnit;
    use common_time::timestamp::TimeUnit;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, SchemaBuilder};
    use snafu::ResultExt;

    use super::*;
    use crate::error;
    use crate::error::ColumnDataTypeSnafu;

    /// Builds a `datatypes` column schema from a raw gRPC data type code.
    #[inline]
    fn build_column_schema(
        column_name: &str,
        datatype: i32,
        nullable: bool,
    ) -> error::Result<ColumnSchema> {
        let datatype_wrapper =
            ColumnDataTypeWrapper::try_new(datatype, None).context(ColumnDataTypeSnafu)?;

        Ok(ColumnSchema::new(
            column_name,
            datatype_wrapper.into(),
            nullable,
        ))
    }

    /// Runs `build_create_table_expr` the way an insertion-triggered table
    /// creation would, using the given gRPC columns.
    fn build_create_expr_from_insertion(
        catalog_name: &str,
        schema_name: &str,
        table_id: Option<TableId>,
        table_name: &str,
        columns: &[Column],
        engine: &str,
    ) -> Result<CreateTableExpr> {
        let table_name = TableReference::full(catalog_name, schema_name, table_name);
        let column_exprs = ColumnExpr::from_columns(columns);
        build_create_table_expr(
            table_id,
            &table_name,
            column_exprs,
            engine,
            "Created on insertion",
        )
    }

    /// Resolves the concrete data type described by a column definition's raw
    /// type code and type extension.
    fn concrete_type(column_def: &ColumnDef) -> ConcreteDataType {
        ConcreteDataType::from(
            ColumnDataTypeWrapper::try_new(column_def.data_type, column_def.datatype_extension)
                .unwrap(),
        )
    }

    #[test]
    fn test_build_create_table_request() {
        let table_id = Some(10);
        let table_name = "test_metric";

        // Without any column there is no timestamp column, so building must fail.
        assert!(
            build_create_expr_from_insertion("", "", table_id, table_name, &[], MITO_ENGINE)
                .is_err()
        );

        let insert_batch = mock_insert_batch();

        let create_expr = build_create_expr_from_insertion(
            "",
            "",
            table_id,
            table_name,
            &insert_batch.0,
            MITO_ENGINE,
        )
        .unwrap();

        assert_eq!(table_id, create_expr.table_id.map(|x| x.id));
        assert_eq!(table_name, create_expr.table_name);
        assert_eq!("Created on insertion".to_string(), create_expr.desc);
        assert_eq!(
            vec![create_expr.column_defs[0].name.clone()],
            create_expr.primary_keys
        );

        let column_defs = create_expr.column_defs;
        assert_eq!(column_defs[5].name, create_expr.time_index);
        assert_eq!(7, column_defs.len());

        let expected_types = [
            ("host", ConcreteDataType::string_datatype()),
            ("cpu", ConcreteDataType::float64_datatype()),
            ("memory", ConcreteDataType::float64_datatype()),
            (
                "time",
                ConcreteDataType::time_datatype(TimeUnit::Millisecond),
            ),
            (
                "interval",
                ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano),
            ),
            ("ts", ConcreteDataType::timestamp_millisecond_datatype()),
            ("decimals", ConcreteDataType::decimal128_datatype(38, 10)),
        ];
        for (name, expected_type) in expected_types {
            let column_def = column_defs.iter().find(|c| c.name == name).unwrap();
            assert_eq!(
                expected_type,
                concrete_type(column_def),
                "unexpected data type for column {}",
                name
            );
        }
    }

    #[test]
    fn test_find_new_columns() {
        let cpu_column = build_column_schema("cpu", 10, true).unwrap();
        let ts_column = build_column_schema("ts", 15, false)
            .unwrap()
            .with_time_index(true);
        let columns = vec![cpu_column, ts_column];

        let schema = Arc::new(SchemaBuilder::try_from(columns).unwrap().build().unwrap());

        assert!(extract_new_columns(&schema, ColumnExpr::from_columns(&[]))
            .unwrap()
            .is_none());

        let insert_batch = mock_insert_batch();

        let add_columns = extract_new_columns(&schema, ColumnExpr::from_columns(&insert_batch.0))
            .unwrap()
            .unwrap();

        // `cpu` and `ts` already exist in the schema; the rest must be added in
        // the order they appear in the batch.
        let expected_columns = [
            ("host", ConcreteDataType::string_datatype()),
            ("memory", ConcreteDataType::float64_datatype()),
            (
                "time",
                ConcreteDataType::time_datatype(TimeUnit::Millisecond),
            ),
            (
                "interval",
                ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano),
            ),
            ("decimals", ConcreteDataType::decimal128_datatype(38, 10)),
        ];
        assert_eq!(5, add_columns.add_columns.len());

        for (add_column, (name, expected_type)) in
            add_columns.add_columns.iter().zip(expected_columns)
        {
            let column_def = add_column.column_def.as_ref().unwrap();
            assert_eq!(name, column_def.name);
            assert_eq!(
                expected_type,
                concrete_type(column_def),
                "unexpected data type for column {}",
                name
            );
            // Check the flag on the column under test, not only on the first one.
            assert!(add_column.add_if_not_exists);
        }
    }

    /// Returns a batch of seven gRPC columns (one tag, one timestamp, five
    /// fields) together with its row count.
    fn mock_insert_batch() -> (Vec<Column>, u32) {
        let row_count = 2;

        let columns = vec![
            Column {
                column_name: "host".to_string(),
                semantic_type: SemanticType::Tag as i32,
                values: Some(Values {
                    string_values: vec!["host1".to_string(), "host2".to_string()],
                    ..Default::default()
                }),
                null_mask: vec![0],
                datatype: ColumnDataType::String as i32,
                ..Default::default()
            },
            Column {
                column_name: "cpu".to_string(),
                semantic_type: SemanticType::Field as i32,
                values: Some(Values {
                    f64_values: vec![0.31],
                    ..Default::default()
                }),
                null_mask: vec![2],
                datatype: ColumnDataType::Float64 as i32,
                ..Default::default()
            },
            Column {
                column_name: "memory".to_string(),
                semantic_type: SemanticType::Field as i32,
                values: Some(Values {
                    f64_values: vec![0.1],
                    ..Default::default()
                }),
                null_mask: vec![1],
                datatype: ColumnDataType::Float64 as i32,
                ..Default::default()
            },
            Column {
                column_name: "time".to_string(),
                semantic_type: SemanticType::Field as i32,
                values: Some(Values {
                    time_millisecond_values: vec![100, 101],
                    ..Default::default()
                }),
                null_mask: vec![0],
                datatype: ColumnDataType::TimeMillisecond as i32,
                ..Default::default()
            },
            Column {
                column_name: "interval".to_string(),
                semantic_type: SemanticType::Field as i32,
                values: Some(Values {
                    interval_month_day_nano_values: vec![
                        IntervalMonthDayNano {
                            months: 1,
                            days: 2,
                            nanoseconds: 3,
                        },
                        IntervalMonthDayNano {
                            months: 4,
                            days: 5,
                            nanoseconds: 6,
                        },
                    ],
                    ..Default::default()
                }),
                null_mask: vec![0],
                datatype: ColumnDataType::IntervalMonthDayNano as i32,
                ..Default::default()
            },
            Column {
                column_name: "ts".to_string(),
                semantic_type: SemanticType::Timestamp as i32,
                values: Some(Values {
                    timestamp_millisecond_values: vec![100, 101],
                    ..Default::default()
                }),
                null_mask: vec![0],
                datatype: ColumnDataType::TimestampMillisecond as i32,
                ..Default::default()
            },
            Column {
                column_name: "decimals".to_string(),
                semantic_type: SemanticType::Field as i32,
                values: Some(Values {
                    decimal128_values: vec![
                        Decimal128 { hi: 0, lo: 123 },
                        Decimal128 { hi: 0, lo: 456 },
                    ],
                    ..Default::default()
                }),
                null_mask: vec![0],
                datatype: ColumnDataType::Decimal128 as i32,
                datatype_extension: Some(ColumnDataTypeExtension {
                    type_ext: Some(TypeExt::DecimalType(DecimalTypeExtension {
                        precision: 38,
                        scale: 10,
                    })),
                }),
                options: None,
            },
        ];

        (columns, row_count)
    }
}