operator/
expr_helper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::options_from_column_schema;
24use api::v1::{
25    set_index, unset_index, AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer,
26    ColumnDataType, ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
27    DropColumn, DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend,
28    ModifyColumnType, ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions,
29    SetFulltext, SetIndex, SetInverted, SetSkipping, SetTableOptions,
30    SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31    UnsetIndex, UnsetInverted, UnsetSkipping, UnsetTableOptions,
32};
33use common_error::ext::BoxedError;
34use common_grpc_expr::util::ColumnExpr;
35use common_time::Timezone;
36use datafusion::sql::planner::object_name_to_table_reference;
37use datatypes::schema::{
38    ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SkippingIndexType, COMMENT_KEY,
39};
40use file_engine::FileOptions;
41use query::sql::{
42    check_file_to_table_schema_compatibility, file_column_schemas_to_table,
43    infer_file_table_schema, prepare_file_table_files,
44};
45use session::context::QueryContextRef;
46use session::table_name::table_idents_to_full_name;
47use snafu::{ensure, OptionExt, ResultExt};
48use sql::ast::{ColumnOption, ObjectName};
49use sql::statements::alter::{
50    AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
51};
52use sql::statements::create::{
53    Column as SqlColumn, CreateExternalTable, CreateFlow, CreateTable, CreateView, TableConstraint,
54};
55use sql::statements::{
56    column_to_schema, sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type,
57};
58use sql::util::extract_tables_from_query;
59use table::requests::{TableOptions, FILE_TABLE_META_KEY};
60use table::table_reference::TableReference;
61#[cfg(feature = "enterprise")]
62pub use trigger::to_create_trigger_task_expr;
63
64use crate::error::{
65    BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
66    ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
67    IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidFlowNameSnafu, InvalidSqlSnafu,
68    NotSupportedSnafu, ParseSqlSnafu, PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu,
69    UnrecognizedTableOptionSnafu,
70};
71
72pub fn create_table_expr_by_column_schemas(
73    table_name: &TableReference<'_>,
74    column_schemas: &[api::v1::ColumnSchema],
75    engine: &str,
76    desc: Option<&str>,
77) -> Result<CreateTableExpr> {
78    let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
79    let expr = common_grpc_expr::util::build_create_table_expr(
80        None,
81        table_name,
82        column_exprs,
83        engine,
84        desc.unwrap_or("Created on insertion"),
85    )
86    .context(BuildCreateExprOnInsertionSnafu)?;
87
88    validate_create_expr(&expr)?;
89    Ok(expr)
90}
91
92pub(crate) fn extract_add_columns_expr(
93    schema: &Schema,
94    column_exprs: Vec<ColumnExpr>,
95) -> Result<Option<AddColumns>> {
96    let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
97        .context(FindNewColumnsOnInsertionSnafu)?;
98    if let Some(add_columns) = &add_columns {
99        validate_add_columns_expr(add_columns)?;
100    }
101    Ok(add_columns)
102}
103
// When the `CREATE EXTERNAL TABLE` statement is in expanded form, like
// ```sql
// CREATE EXTERNAL TABLE city (
//   host string,
//   ts timestamp,
//   cpu float64,
//   memory float64,
//   TIME INDEX (ts),
//   PRIMARY KEY(host)
// ) WITH (location='/var/data/city.csv', format='csv');
// ```
// the user needs to specify the TIME INDEX column. If there is no suitable
// column in the file to use as the TIME INDEX, an additional placeholder column
// needs to be created as the TIME INDEX, and a `DEFAULT <value>` constraint
// should be added.
//
// When the `CREATE EXTERNAL TABLE` statement is in inferred form, like
// ```sql
// CREATE EXTERNAL TABLE IF NOT EXISTS city WITH (location='/var/data/city.csv',format='csv');
// ```
// 1. If the TIME INDEX column can be inferred from metadata, use that column
//    as the TIME INDEX. Otherwise,
// 2. If a column named `greptime_timestamp` exists (it must be of type
//    TIMESTAMP, otherwise an error is returned), use that column as the
//    TIME INDEX. Otherwise,
// 3. Automatically create the `greptime_timestamp` column and add a `DEFAULT 0`
//    constraint.
127pub(crate) async fn create_external_expr(
128    create: CreateExternalTable,
129    query_ctx: &QueryContextRef,
130) -> Result<CreateTableExpr> {
131    let (catalog_name, schema_name, table_name) =
132        table_idents_to_full_name(&create.name, query_ctx)
133            .map_err(BoxedError::new)
134            .context(ExternalSnafu)?;
135
136    let mut table_options = create.options.into_map();
137
138    let (object_store, files) = prepare_file_table_files(&table_options)
139        .await
140        .context(PrepareFileTableSnafu)?;
141
142    let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
143        .await
144        .context(InferFileTableSchemaSnafu)?
145        .column_schemas;
146
147    let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
148        // expanded form
149        let time_index = find_time_index(&create.constraints)?;
150        let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
151        let column_schemas =
152            columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
153        (time_index, primary_keys, column_schemas)
154    } else {
155        // inferred form
156        let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
157        let primary_keys = vec![];
158        (time_index, primary_keys, column_schemas)
159    };
160
161    check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
162        .context(SchemaIncompatibleSnafu)?;
163
164    let meta = FileOptions {
165        files,
166        file_column_schemas,
167    };
168    table_options.insert(
169        FILE_TABLE_META_KEY.to_string(),
170        serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
171    );
172
173    let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
174    let expr = CreateTableExpr {
175        catalog_name,
176        schema_name,
177        table_name,
178        desc: String::default(),
179        column_defs,
180        time_index,
181        primary_keys,
182        create_if_not_exists: create.if_not_exists,
183        table_options,
184        table_id: None,
185        engine: create.engine.to_string(),
186    };
187
188    Ok(expr)
189}
190
191/// Convert `CreateTable` statement to [`CreateTableExpr`] gRPC request.
192pub fn create_to_expr(
193    create: &CreateTable,
194    query_ctx: &QueryContextRef,
195) -> Result<CreateTableExpr> {
196    let (catalog_name, schema_name, table_name) =
197        table_idents_to_full_name(&create.name, query_ctx)
198            .map_err(BoxedError::new)
199            .context(ExternalSnafu)?;
200
201    let time_index = find_time_index(&create.constraints)?;
202    let table_options = HashMap::from(
203        &TableOptions::try_from_iter(create.options.to_str_map())
204            .context(UnrecognizedTableOptionSnafu)?,
205    );
206
207    let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
208
209    let expr = CreateTableExpr {
210        catalog_name,
211        schema_name,
212        table_name,
213        desc: String::default(),
214        column_defs: columns_to_expr(
215            &create.columns,
216            &time_index,
217            &primary_keys,
218            Some(&query_ctx.timezone()),
219        )?,
220        time_index,
221        primary_keys,
222        create_if_not_exists: create.if_not_exists,
223        table_options,
224        table_id: None,
225        engine: create.engine.to_string(),
226    };
227
228    validate_create_expr(&expr)?;
229    Ok(expr)
230}
231
232/// Validate the [`CreateTableExpr`] request.
233pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
234    // construct column list
235    let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
236    for (idx, column) in create.column_defs.iter().enumerate() {
237        if let Some(indices) = column_to_indices.get(&column.name) {
238            return InvalidSqlSnafu {
239                err_msg: format!(
240                    "column name `{}` is duplicated at index {} and {}",
241                    column.name, indices, idx
242                ),
243            }
244            .fail();
245        }
246        column_to_indices.insert(&column.name, idx);
247    }
248
249    // verify time_index exists
250    let _ = column_to_indices
251        .get(&create.time_index)
252        .with_context(|| InvalidSqlSnafu {
253            err_msg: format!(
254                "column name `{}` is not found in column list",
255                create.time_index
256            ),
257        })?;
258
259    // verify primary_key exists
260    for pk in &create.primary_keys {
261        let _ = column_to_indices
262            .get(&pk)
263            .with_context(|| InvalidSqlSnafu {
264                err_msg: format!("column name `{}` is not found in column list", pk),
265            })?;
266    }
267
268    // construct primary_key set
269    let mut pk_set = HashSet::new();
270    for pk in &create.primary_keys {
271        if !pk_set.insert(pk) {
272            return InvalidSqlSnafu {
273                err_msg: format!("column name `{}` is duplicated in primary keys", pk),
274            }
275            .fail();
276        }
277    }
278
279    // verify time index is not primary key
280    if pk_set.contains(&create.time_index) {
281        return InvalidSqlSnafu {
282            err_msg: format!(
283                "column name `{}` is both primary key and time index",
284                create.time_index
285            ),
286        }
287        .fail();
288    }
289
290    for column in &create.column_defs {
291        // verify do not contain interval type column issue #3235
292        if is_interval_type(&column.data_type()) {
293            return InvalidSqlSnafu {
294                err_msg: format!(
295                    "column name `{}` is interval type, which is not supported",
296                    column.name
297                ),
298            }
299            .fail();
300        }
301        // verify do not contain datetime type column issue #5489
302        if is_date_time_type(&column.data_type()) {
303            return InvalidSqlSnafu {
304                err_msg: format!(
305                    "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
306                    column.name
307                ),
308            }
309            .fail();
310        }
311    }
312    Ok(())
313}
314
315fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
316    for add_column in &add_columns.add_columns {
317        let Some(column_def) = &add_column.column_def else {
318            continue;
319        };
320        if is_date_time_type(&column_def.data_type()) {
321            return InvalidSqlSnafu {
322                    err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
323                }
324                .fail();
325        }
326        if is_interval_type(&column_def.data_type()) {
327            return InvalidSqlSnafu {
328                err_msg: format!(
329                    "column name `{}` is interval type, which is not supported",
330                    column_def.name
331                ),
332            }
333            .fail();
334        }
335    }
336    Ok(())
337}
338
339fn is_date_time_type(data_type: &ColumnDataType) -> bool {
340    matches!(data_type, ColumnDataType::Datetime)
341}
342
343fn is_interval_type(data_type: &ColumnDataType) -> bool {
344    matches!(
345        data_type,
346        ColumnDataType::IntervalYearMonth
347            | ColumnDataType::IntervalDayTime
348            | ColumnDataType::IntervalMonthDayNano
349    )
350}
351
352fn find_primary_keys(
353    columns: &[SqlColumn],
354    constraints: &[TableConstraint],
355) -> Result<Vec<String>> {
356    let columns_pk = columns
357        .iter()
358        .filter_map(|x| {
359            if x.options().iter().any(|o| {
360                matches!(
361                    o.option,
362                    ColumnOption::Unique {
363                        is_primary: true,
364                        ..
365                    }
366                )
367            }) {
368                Some(x.name().value.clone())
369            } else {
370                None
371            }
372        })
373        .collect::<Vec<String>>();
374
375    ensure!(
376        columns_pk.len() <= 1,
377        IllegalPrimaryKeysDefSnafu {
378            msg: "not allowed to inline multiple primary keys in columns options"
379        }
380    );
381
382    let constraints_pk = constraints
383        .iter()
384        .filter_map(|constraint| match constraint {
385            TableConstraint::PrimaryKey { columns, .. } => {
386                Some(columns.iter().map(|ident| ident.value.clone()))
387            }
388            _ => None,
389        })
390        .flatten()
391        .collect::<Vec<String>>();
392
393    ensure!(
394        columns_pk.is_empty() || constraints_pk.is_empty(),
395        IllegalPrimaryKeysDefSnafu {
396            msg: "found definitions of primary keys in multiple places"
397        }
398    );
399
400    let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
401    primary_keys.extend(columns_pk);
402    primary_keys.extend(constraints_pk);
403    Ok(primary_keys)
404}
405
406pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
407    let time_index = constraints
408        .iter()
409        .filter_map(|constraint| match constraint {
410            TableConstraint::TimeIndex { column, .. } => Some(&column.value),
411            _ => None,
412        })
413        .collect::<Vec<&String>>();
414    ensure!(
415        time_index.len() == 1,
416        InvalidSqlSnafu {
417            err_msg: "must have one and only one TimeIndex columns",
418        }
419    );
420    Ok(time_index.first().unwrap().to_string())
421}
422
423fn columns_to_expr(
424    column_defs: &[SqlColumn],
425    time_index: &str,
426    primary_keys: &[String],
427    timezone: Option<&Timezone>,
428) -> Result<Vec<api::v1::ColumnDef>> {
429    let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
430    column_schemas_to_defs(column_schemas, primary_keys)
431}
432
433fn columns_to_column_schemas(
434    columns: &[SqlColumn],
435    time_index: &str,
436    timezone: Option<&Timezone>,
437) -> Result<Vec<ColumnSchema>> {
438    columns
439        .iter()
440        .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
441        .collect::<Result<Vec<ColumnSchema>>>()
442}
443
444pub fn column_schemas_to_defs(
445    column_schemas: Vec<ColumnSchema>,
446    primary_keys: &[String],
447) -> Result<Vec<api::v1::ColumnDef>> {
448    let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
449        .iter()
450        .map(|c| {
451            ColumnDataTypeWrapper::try_from(c.data_type.clone())
452                .map(|w| w.to_parts())
453                .context(ColumnDataTypeSnafu)
454        })
455        .collect::<Result<Vec<_>>>()?;
456
457    column_schemas
458        .iter()
459        .zip(column_datatypes)
460        .map(|(schema, datatype)| {
461            let semantic_type = if schema.is_time_index() {
462                SemanticType::Timestamp
463            } else if primary_keys.contains(&schema.name) {
464                SemanticType::Tag
465            } else {
466                SemanticType::Field
467            } as i32;
468            let comment = schema
469                .metadata()
470                .get(COMMENT_KEY)
471                .cloned()
472                .unwrap_or_default();
473
474            Ok(api::v1::ColumnDef {
475                name: schema.name.clone(),
476                data_type: datatype.0 as i32,
477                is_nullable: schema.is_nullable(),
478                default_constraint: match schema.default_constraint() {
479                    None => vec![],
480                    Some(v) => {
481                        v.clone()
482                            .try_into()
483                            .context(ConvertColumnDefaultConstraintSnafu {
484                                column_name: &schema.name,
485                            })?
486                    }
487                },
488                semantic_type,
489                comment,
490                datatype_extension: datatype.1,
491                options: options_from_column_schema(schema),
492            })
493        })
494        .collect()
495}
496
497/// Converts a SQL alter table statement into a gRPC alter table expression.
498pub(crate) fn to_alter_table_expr(
499    alter_table: AlterTable,
500    query_ctx: &QueryContextRef,
501) -> Result<AlterTableExpr> {
502    let (catalog_name, schema_name, table_name) =
503        table_idents_to_full_name(alter_table.table_name(), query_ctx)
504            .map_err(BoxedError::new)
505            .context(ExternalSnafu)?;
506
507    let kind = match alter_table.alter_operation {
508        AlterTableOperation::AddConstraint(_) => {
509            return NotSupportedSnafu {
510                feat: "ADD CONSTRAINT",
511            }
512            .fail();
513        }
514        AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
515            add_columns: add_columns
516                .into_iter()
517                .map(|add_column| {
518                    let column_def = sql_column_def_to_grpc_column_def(
519                        &add_column.column_def,
520                        Some(&query_ctx.timezone()),
521                    )
522                    .map_err(BoxedError::new)
523                    .context(ExternalSnafu)?;
524                    if is_interval_type(&column_def.data_type()) {
525                        return NotSupportedSnafu {
526                            feat: "Add column with interval type",
527                        }
528                        .fail();
529                    }
530                    Ok(AddColumn {
531                        column_def: Some(column_def),
532                        location: add_column.location.as_ref().map(From::from),
533                        add_if_not_exists: add_column.add_if_not_exists,
534                    })
535                })
536                .collect::<Result<Vec<AddColumn>>>()?,
537        }),
538        AlterTableOperation::ModifyColumnType {
539            column_name,
540            target_type,
541        } => {
542            let target_type =
543                sql_data_type_to_concrete_data_type(&target_type).context(ParseSqlSnafu)?;
544            let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
545                .map(|w| w.to_parts())
546                .context(ColumnDataTypeSnafu)?;
547            if is_interval_type(&target_type) {
548                return NotSupportedSnafu {
549                    feat: "Modify column type to interval type",
550                }
551                .fail();
552            }
553            AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
554                modify_column_types: vec![ModifyColumnType {
555                    column_name: column_name.value,
556                    target_type: target_type as i32,
557                    target_type_extension,
558                }],
559            })
560        }
561        AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
562            drop_columns: vec![DropColumn {
563                name: name.value.to_string(),
564            }],
565        }),
566        AlterTableOperation::RenameTable { new_table_name } => {
567            AlterTableKind::RenameTable(RenameTable {
568                new_table_name: new_table_name.to_string(),
569            })
570        }
571        AlterTableOperation::SetTableOptions { options } => {
572            AlterTableKind::SetTableOptions(SetTableOptions {
573                table_options: options.into_iter().map(Into::into).collect(),
574            })
575        }
576        AlterTableOperation::UnsetTableOptions { keys } => {
577            AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
578        }
579        AlterTableOperation::SetIndex { options } => AlterTableKind::SetIndex(match options {
580            sql::statements::alter::SetIndexOperation::Fulltext {
581                column_name,
582                options,
583            } => SetIndex {
584                options: Some(set_index::Options::Fulltext(SetFulltext {
585                    column_name: column_name.value,
586                    enable: options.enable,
587                    analyzer: match options.analyzer {
588                        FulltextAnalyzer::English => Analyzer::English.into(),
589                        FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
590                    },
591                    case_sensitive: options.case_sensitive,
592                    backend: match options.backend {
593                        FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
594                        FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
595                    },
596                })),
597            },
598            sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
599                options: Some(set_index::Options::Inverted(SetInverted {
600                    column_name: column_name.value,
601                })),
602            },
603            sql::statements::alter::SetIndexOperation::Skipping {
604                column_name,
605                options,
606            } => SetIndex {
607                options: Some(set_index::Options::Skipping(SetSkipping {
608                    column_name: column_name.value,
609                    enable: true,
610                    granularity: options.granularity as u64,
611                    skipping_index_type: match options.index_type {
612                        SkippingIndexType::BloomFilter => PbSkippingIndexType::BloomFilter.into(),
613                    },
614                })),
615            },
616        }),
617        AlterTableOperation::UnsetIndex { options } => AlterTableKind::UnsetIndex(match options {
618            sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => UnsetIndex {
619                options: Some(unset_index::Options::Fulltext(UnsetFulltext {
620                    column_name: column_name.value,
621                })),
622            },
623            sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => UnsetIndex {
624                options: Some(unset_index::Options::Inverted(UnsetInverted {
625                    column_name: column_name.value,
626                })),
627            },
628            sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => UnsetIndex {
629                options: Some(unset_index::Options::Skipping(UnsetSkipping {
630                    column_name: column_name.value,
631                })),
632            },
633        }),
634        AlterTableOperation::DropDefaults { columns } => {
635            AlterTableKind::DropDefaults(DropDefaults {
636                drop_defaults: columns
637                    .into_iter()
638                    .map(|col| {
639                        let column_name = col.0.to_string();
640                        Ok(api::v1::DropDefault { column_name })
641                    })
642                    .collect::<Result<Vec<_>>>()?,
643            })
644        }
645    };
646
647    Ok(AlterTableExpr {
648        catalog_name,
649        schema_name,
650        table_name,
651        kind: Some(kind),
652    })
653}
654
655/// Try to cast the `[AlterDatabase]` statement into gRPC `[AlterDatabaseExpr]`.
656pub fn to_alter_database_expr(
657    alter_database: AlterDatabase,
658    query_ctx: &QueryContextRef,
659) -> Result<AlterDatabaseExpr> {
660    let catalog = query_ctx.current_catalog();
661    let schema = alter_database.database_name;
662
663    let kind = match alter_database.alter_operation {
664        AlterDatabaseOperation::SetDatabaseOption { options } => {
665            let options = options.into_iter().map(Into::into).collect();
666            AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
667                set_database_options: options,
668            })
669        }
670        AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
671            AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
672        }
673    };
674
675    Ok(AlterDatabaseExpr {
676        catalog_name: catalog.to_string(),
677        schema_name: schema.to_string(),
678        kind: Some(kind),
679    })
680}
681
682/// Try to cast the `[CreateViewExpr]` statement into gRPC `[CreateViewExpr]`.
683pub fn to_create_view_expr(
684    stmt: CreateView,
685    logical_plan: Vec<u8>,
686    table_names: Vec<TableName>,
687    columns: Vec<String>,
688    plan_columns: Vec<String>,
689    definition: String,
690    query_ctx: QueryContextRef,
691) -> Result<CreateViewExpr> {
692    let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
693        .map_err(BoxedError::new)
694        .context(ExternalSnafu)?;
695
696    let expr = CreateViewExpr {
697        catalog_name,
698        schema_name,
699        view_name,
700        logical_plan,
701        create_if_not_exists: stmt.if_not_exists,
702        or_replace: stmt.or_replace,
703        table_names,
704        columns,
705        plan_columns,
706        definition,
707    };
708
709    Ok(expr)
710}
711
712pub fn to_create_flow_task_expr(
713    create_flow: CreateFlow,
714    query_ctx: &QueryContextRef,
715) -> Result<CreateFlowExpr> {
716    // retrieve sink table name
717    let sink_table_ref =
718        object_name_to_table_reference(create_flow.sink_table_name.clone().into(), true)
719            .with_context(|_| ConvertIdentifierSnafu {
720                ident: create_flow.sink_table_name.to_string(),
721            })?;
722    let catalog = sink_table_ref
723        .catalog()
724        .unwrap_or(query_ctx.current_catalog())
725        .to_string();
726    let schema = sink_table_ref
727        .schema()
728        .map(|s| s.to_owned())
729        .unwrap_or(query_ctx.current_schema());
730
731    let sink_table_name = TableName {
732        catalog_name: catalog,
733        schema_name: schema,
734        table_name: sink_table_ref.table().to_string(),
735    };
736
737    let source_table_names = extract_tables_from_query(&create_flow.query)
738        .map(|name| {
739            let reference = object_name_to_table_reference(name.clone().into(), true)
740                .with_context(|_| ConvertIdentifierSnafu {
741                    ident: name.to_string(),
742                })?;
743            let catalog = reference
744                .catalog()
745                .unwrap_or(query_ctx.current_catalog())
746                .to_string();
747            let schema = reference
748                .schema()
749                .map(|s| s.to_string())
750                .unwrap_or(query_ctx.current_schema());
751
752            let table_name = TableName {
753                catalog_name: catalog,
754                schema_name: schema,
755                table_name: reference.table().to_string(),
756            };
757            Ok(table_name)
758        })
759        .collect::<Result<Vec<_>>>()?;
760
761    Ok(CreateFlowExpr {
762        catalog_name: query_ctx.current_catalog().to_string(),
763        flow_name: sanitize_flow_name(create_flow.flow_name)?,
764        source_table_names,
765        sink_table_name: Some(sink_table_name),
766        or_replace: create_flow.or_replace,
767        create_if_not_exists: create_flow.if_not_exists,
768        expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
769        comment: create_flow.comment.unwrap_or_default(),
770        sql: create_flow.query.to_string(),
771        flow_options: HashMap::new(),
772    })
773}
774
775/// sanitize the flow name, remove possible quotes
776fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
777    ensure!(
778        flow_name.0.len() == 1,
779        InvalidFlowNameSnafu {
780            name: flow_name.to_string(),
781        }
782    );
783    // safety: we've checked flow_name.0 has exactly one element.
784    Ok(flow_name.0.swap_remove(0).value)
785}
786
#[cfg(test)]
mod tests {
    use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
    use datatypes::value::Value;
    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;
    use store_api::storage::ColumnDefaultConstraint;

    use super::*;

    /// Parses `sql` with the GreptimeDB dialect and returns the last parsed statement.
    ///
    /// Panics if parsing fails or yields no statement; test-only convenience.
    fn parse_stmt(sql: &str) -> Statement {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap()
            .pop()
            .unwrap()
    }

    /// Formats a [`TableName`] as `catalog.schema.table`.
    fn to_dot_sep(name: TableName) -> String {
        format!("{}.{}.{}", name.catalog_name, name.schema_name, name.table_name)
    }

    /// Builds a query context whose session timezone is `+08:00`.
    fn utc_plus_8_query_context() -> std::sync::Arc<QueryContext> {
        QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("+08:00").unwrap())
            .build()
            .into()
    }

    #[test]
    fn test_create_flow_tql_expr() {
        let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
        let Statement::CreateFlow(create_flow) = parse_stmt(sql) else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        assert_eq!("calc_reqs", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.public.cnt_reqs",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        // A TQL query does not report any source tables.
        assert!(expr.source_table_names.is_empty());
        assert_eq!(
            r#"TQL EVAL (0, 15, '5s') count_values("status_code", http_requests)"#,
            expr.sql
        );
    }

    #[test]
    fn test_create_flow_expr() {
        // Unqualified sink/source tables resolve to the default catalog and schema.
        let sql = r"
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
    DISTINCT number as dis
FROM
    distinct_basic;";
        let Statement::CreateFlow(create_flow) = parse_stmt(sql) else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        assert_eq!("test_distinct_basic", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.public.out_distinct_basic",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.public.distinct_basic",
            to_dot_sep(expr.source_table_names[0].clone())
        );
        assert_eq!(
            r"SELECT
    DISTINCT number as dis
FROM
    distinct_basic",
            expr.sql
        );

        // A quoted flow name is unquoted; schema-qualified tables keep their schema.
        let sql = r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let Statement::CreateFlow(create_flow) = parse_stmt(sql) else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        assert_eq!("task_2", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.schema_1.table_1",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.schema_2.table_2",
            to_dot_sep(expr.source_table_names[0].clone())
        );
        assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);

        // A qualified flow name is rejected.
        let sql = r"
CREATE FLOW abc.`task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let Statement::CreateFlow(create_flow) = parse_stmt(sql) else {
            unreachable!()
        };
        let err = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap_err();
        assert!(err
            .to_string()
            .contains("Invalid flow name: abc.`task_2`"));
    }

    #[test]
    fn test_create_to_expr() {
        let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
        let Statement::CreateTable(create_table) = parse_stmt(sql) else {
            unreachable!()
        };
        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
        assert_eq!("3days", expr.table_options.get("ttl").unwrap());
        assert_eq!(
            "1.0MiB",
            expr.table_options.get("write_buffer_size").unwrap()
        );
    }

    #[test]
    fn test_invalid_create_to_expr() {
        let cases = [
            // duplicate column declaration
            "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
            // duplicate primary key
            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
            // time index is primary key
            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));"
        ];

        for sql in cases {
            let Statement::CreateTable(create_table) = parse_stmt(sql) else {
                unreachable!()
            };
            assert!(
                create_to_expr(&create_table, &QueryContext::arc()).is_err(),
                "expected create_to_expr to fail for: {sql}"
            );
        }
    }

    #[test]
    fn test_create_to_expr_with_default_timestamp_value() {
        let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
        let Statement::CreateTable(create_table) = parse_stmt(sql) else {
            unreachable!()
        };

        // The default literal follows the session timezone: unchanged under the
        // system timezone (UTC), shifted back eight hours under `+08:00`.
        let cases = [
            (QueryContext::arc(), "2024-01-30 00:01:01+0000"),
            (utc_plus_8_query_context(), "2024-01-29 16:01:01+0000"),
        ];
        for (ctx, expected) in cases {
            let expr = create_to_expr(&create_table, &ctx).unwrap();
            assert_ts_column_default(&expr.column_defs[1], expected);
        }
    }

    /// Checks that `ts_column` is the millisecond `ts` column with a non-empty default,
    /// and returns the decoded default constraint.
    fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
        assert_eq!("ts", ts_column.name);
        assert_eq!(
            ColumnDataType::TimestampMillisecond as i32,
            ts_column.data_type
        );
        assert!(!ts_column.default_constraint.is_empty());

        ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
    }

    /// Runs [`assert_ts_column`] and additionally checks that the default is a
    /// timestamp whose ISO-8601 rendering equals `expected`.
    fn assert_ts_column_default(ts_column: &api::v1::ColumnDef, expected: &str) {
        let constraint = assert_ts_column(ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                if ts.to_iso8601_string() == expected),
            "expected timestamp default {expected}"
        );
    }

    #[test]
    fn test_to_alter_expr() {
        // ALTER DATABASE ... SET keeps the options in declaration order.
        let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
        let Statement::AlterDatabase(alter_database) = parse_stmt(sql) else {
            unreachable!()
        };
        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();
        let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
            set_database_options,
        }) = kind
        else {
            unreachable!()
        };

        assert_eq!(2, set_database_options.len());
        assert_eq!("key1", set_database_options[0].key);
        assert_eq!("value1", set_database_options[0].value);
        assert_eq!("key2", set_database_options[1].key);
        assert_eq!("value2", set_database_options[1].value);

        // ALTER DATABASE ... UNSET lists every key; order is not asserted.
        let sql = "ALTER DATABASE greptime UNSET key1, key2;";
        let Statement::AlterDatabase(alter_database) = parse_stmt(sql) else {
            unreachable!()
        };
        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();
        let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
            unreachable!()
        };

        assert_eq!(2, keys.len());
        assert!(keys.contains(&"key1".to_string()));
        assert!(keys.contains(&"key2".to_string()));

        // ALTER TABLE ... ADD COLUMN resolves the timestamp default in the session timezone.
        let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
        let Statement::AlterTable(alter_table) = parse_stmt(sql) else {
            unreachable!()
        };

        let cases = [
            (QueryContext::arc(), "2024-01-30 00:01:01+0000"),
            (utc_plus_8_query_context(), "2024-01-29 16:01:01+0000"),
        ];
        for (ctx, expected) in cases {
            let expr = to_alter_table_expr(alter_table.clone(), &ctx).unwrap();
            let kind = expr.kind.unwrap();
            let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
                unreachable!()
            };

            assert_eq!(1, add_columns.len());
            let ts_column = add_columns[0].column_def.as_ref().unwrap();
            assert_ts_column_default(ts_column, expected);
        }
    }

    #[test]
    fn test_to_alter_modify_column_type_expr() {
        let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
        let Statement::AlterTable(alter_table) = parse_stmt(sql) else {
            unreachable!()
        };

        // query context with system timezone UTC.
        let expr = to_alter_table_expr(alter_table, &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();
        let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
            modify_column_types,
        }) = kind
        else {
            unreachable!()
        };

        assert_eq!(1, modify_column_types.len());
        let modify_column_type = &modify_column_types[0];

        assert_eq!("mem_usage", modify_column_type.column_name);
        assert_eq!(
            ColumnDataType::String as i32,
            modify_column_type.target_type
        );
        assert!(modify_column_type.target_type_extension.is_none());
    }

    /// Two fixed table names in `greptime.public`, used as view dependencies.
    fn new_test_table_names() -> Vec<TableName> {
        vec![
            TableName {
                catalog_name: "greptime".to_string(),
                schema_name: "public".to_string(),
                table_name: "a_table".to_string(),
            },
            TableName {
                catalog_name: "greptime".to_string(),
                schema_name: "public".to_string(),
                table_name: "b_table".to_string(),
            },
        ]
    }

    #[test]
    fn test_to_create_view_expr() {
        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
        let Statement::CreateView(create_view) = parse_stmt(sql) else {
            unreachable!()
        };

        let logical_plan = vec![1, 2, 3];
        let table_names = new_test_table_names();
        let columns = vec!["a".to_string()];
        let plan_columns = vec!["number".to_string()];

        let expr = to_create_view_expr(
            create_view,
            logical_plan.clone(),
            table_names.clone(),
            columns.clone(),
            plan_columns.clone(),
            sql.to_string(),
            QueryContext::arc(),
        )
        .unwrap();

        assert_eq!("greptime", expr.catalog_name);
        assert_eq!("public", expr.schema_name);
        assert_eq!("test", expr.view_name);
        assert!(!expr.create_if_not_exists);
        assert!(!expr.or_replace);
        assert_eq!(logical_plan, expr.logical_plan);
        assert_eq!(table_names, expr.table_names);
        assert_eq!(sql, expr.definition);
        assert_eq!(columns, expr.columns);
        assert_eq!(plan_columns, expr.plan_columns);
    }

    #[test]
    fn test_to_create_view_expr_complex() {
        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
        let Statement::CreateView(create_view) = parse_stmt(sql) else {
            unreachable!()
        };

        let logical_plan = vec![1, 2, 3];
        let table_names = new_test_table_names();
        let columns = vec!["a".to_string()];
        let plan_columns = vec!["number".to_string()];

        let expr = to_create_view_expr(
            create_view,
            logical_plan.clone(),
            table_names.clone(),
            columns.clone(),
            plan_columns.clone(),
            sql.to_string(),
            QueryContext::arc(),
        )
        .unwrap();

        assert_eq!("greptime", expr.catalog_name);
        assert_eq!("test", expr.schema_name);
        assert_eq!("test_view", expr.view_name);
        assert!(expr.create_if_not_exists);
        assert!(expr.or_replace);
        assert_eq!(logical_plan, expr.logical_plan);
        assert_eq!(table_names, expr.table_names);
        assert_eq!(sql, expr.definition);
        assert_eq!(columns, expr.columns);
        assert_eq!(plan_columns, expr.plan_columns);
    }
}