operator/
expr_helper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::options_from_column_schema;
24use api::v1::{
25    set_index, unset_index, AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer,
26    ColumnDataType, ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
27    DropColumn, DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend,
28    ModifyColumnType, ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions,
29    SetDefaults, SetFulltext, SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30    SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31    UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions,
32};
33use common_error::ext::BoxedError;
34use common_grpc_expr::util::ColumnExpr;
35use common_time::Timezone;
36use datafusion::sql::planner::object_name_to_table_reference;
37use datatypes::schema::{
38    ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SkippingIndexType, COMMENT_KEY,
39};
40use file_engine::FileOptions;
41use query::sql::{
42    check_file_to_table_schema_compatibility, file_column_schemas_to_table,
43    infer_file_table_schema, prepare_file_table_files,
44};
45use session::context::QueryContextRef;
46use session::table_name::table_idents_to_full_name;
47use snafu::{ensure, OptionExt, ResultExt};
48use sql::ast::{ColumnOption, ObjectName};
49use sql::statements::alter::{
50    AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
51};
52use sql::statements::create::{
53    Column as SqlColumn, CreateExternalTable, CreateFlow, CreateTable, CreateView, TableConstraint,
54};
55use sql::statements::{
56    column_to_schema, sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type,
57};
58use sql::util::extract_tables_from_query;
59use table::requests::{TableOptions, FILE_TABLE_META_KEY};
60use table::table_reference::TableReference;
61#[cfg(feature = "enterprise")]
62pub use trigger::to_create_trigger_task_expr;
63
64use crate::error::{
65    BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
66    ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
67    IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidFlowNameSnafu, InvalidSqlSnafu,
68    NotSupportedSnafu, ParseSqlSnafu, PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu,
69    UnrecognizedTableOptionSnafu,
70};
71
72pub fn create_table_expr_by_column_schemas(
73    table_name: &TableReference<'_>,
74    column_schemas: &[api::v1::ColumnSchema],
75    engine: &str,
76    desc: Option<&str>,
77) -> Result<CreateTableExpr> {
78    let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
79    let expr = common_grpc_expr::util::build_create_table_expr(
80        None,
81        table_name,
82        column_exprs,
83        engine,
84        desc.unwrap_or("Created on insertion"),
85    )
86    .context(BuildCreateExprOnInsertionSnafu)?;
87
88    validate_create_expr(&expr)?;
89    Ok(expr)
90}
91
92pub fn extract_add_columns_expr(
93    schema: &Schema,
94    column_exprs: Vec<ColumnExpr>,
95) -> Result<Option<AddColumns>> {
96    let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
97        .context(FindNewColumnsOnInsertionSnafu)?;
98    if let Some(add_columns) = &add_columns {
99        validate_add_columns_expr(add_columns)?;
100    }
101    Ok(add_columns)
102}
103
104//   cpu float64,
105//   memory float64,
106//   TIME INDEX (ts),
107//   PRIMARY KEY(host)
108// ) WITH (location='/var/data/city.csv', format='csv');
109// ```
110// The user needs to specify the TIME INDEX column. If there is no suitable
111// column in the file to use as TIME INDEX, an additional placeholder column
112// needs to be created as the TIME INDEX, and a `DEFAULT <value>` constraint
113// should be added.
114//
115//
116// When the `CREATE EXTERNAL TABLE` statement is in inferred form, like
117// ```sql
118// CREATE EXTERNAL TABLE IF NOT EXISTS city WITH (location='/var/data/city.csv',format='csv');
119// ```
120// 1. If the TIME INDEX column can be inferred from metadata, use that column
121//    as the TIME INDEX. Otherwise,
122// 2. If a column named `greptime_timestamp` exists (with the requirement that
123//    the column is with type TIMESTAMP, otherwise an error is thrown), use
124//    that column as the TIME INDEX. Otherwise,
125// 3. Automatically create the `greptime_timestamp` column and add a `DEFAULT 0`
126//    constraint.
127pub(crate) async fn create_external_expr(
128    create: CreateExternalTable,
129    query_ctx: &QueryContextRef,
130) -> Result<CreateTableExpr> {
131    let (catalog_name, schema_name, table_name) =
132        table_idents_to_full_name(&create.name, query_ctx)
133            .map_err(BoxedError::new)
134            .context(ExternalSnafu)?;
135
136    let mut table_options = create.options.into_map();
137
138    let (object_store, files) = prepare_file_table_files(&table_options)
139        .await
140        .context(PrepareFileTableSnafu)?;
141
142    let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
143        .await
144        .context(InferFileTableSchemaSnafu)?
145        .column_schemas;
146
147    let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
148        // expanded form
149        let time_index = find_time_index(&create.constraints)?;
150        let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
151        let column_schemas =
152            columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
153        (time_index, primary_keys, column_schemas)
154    } else {
155        // inferred form
156        let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
157        let primary_keys = vec![];
158        (time_index, primary_keys, column_schemas)
159    };
160
161    check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
162        .context(SchemaIncompatibleSnafu)?;
163
164    let meta = FileOptions {
165        files,
166        file_column_schemas,
167    };
168    table_options.insert(
169        FILE_TABLE_META_KEY.to_string(),
170        serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
171    );
172
173    let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
174    let expr = CreateTableExpr {
175        catalog_name,
176        schema_name,
177        table_name,
178        desc: String::default(),
179        column_defs,
180        time_index,
181        primary_keys,
182        create_if_not_exists: create.if_not_exists,
183        table_options,
184        table_id: None,
185        engine: create.engine.to_string(),
186    };
187
188    Ok(expr)
189}
190
191/// Convert `CreateTable` statement to [`CreateTableExpr`] gRPC request.
192pub fn create_to_expr(
193    create: &CreateTable,
194    query_ctx: &QueryContextRef,
195) -> Result<CreateTableExpr> {
196    let (catalog_name, schema_name, table_name) =
197        table_idents_to_full_name(&create.name, query_ctx)
198            .map_err(BoxedError::new)
199            .context(ExternalSnafu)?;
200
201    let time_index = find_time_index(&create.constraints)?;
202    let table_options = HashMap::from(
203        &TableOptions::try_from_iter(create.options.to_str_map())
204            .context(UnrecognizedTableOptionSnafu)?,
205    );
206
207    let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
208
209    let expr = CreateTableExpr {
210        catalog_name,
211        schema_name,
212        table_name,
213        desc: String::default(),
214        column_defs: columns_to_expr(
215            &create.columns,
216            &time_index,
217            &primary_keys,
218            Some(&query_ctx.timezone()),
219        )?,
220        time_index,
221        primary_keys,
222        create_if_not_exists: create.if_not_exists,
223        table_options,
224        table_id: None,
225        engine: create.engine.to_string(),
226    };
227
228    validate_create_expr(&expr)?;
229    Ok(expr)
230}
231
232/// Validate the [`CreateTableExpr`] request.
233pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
234    // construct column list
235    let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
236    for (idx, column) in create.column_defs.iter().enumerate() {
237        if let Some(indices) = column_to_indices.get(&column.name) {
238            return InvalidSqlSnafu {
239                err_msg: format!(
240                    "column name `{}` is duplicated at index {} and {}",
241                    column.name, indices, idx
242                ),
243            }
244            .fail();
245        }
246        column_to_indices.insert(&column.name, idx);
247    }
248
249    // verify time_index exists
250    let _ = column_to_indices
251        .get(&create.time_index)
252        .with_context(|| InvalidSqlSnafu {
253            err_msg: format!(
254                "column name `{}` is not found in column list",
255                create.time_index
256            ),
257        })?;
258
259    // verify primary_key exists
260    for pk in &create.primary_keys {
261        let _ = column_to_indices
262            .get(&pk)
263            .with_context(|| InvalidSqlSnafu {
264                err_msg: format!("column name `{}` is not found in column list", pk),
265            })?;
266    }
267
268    // construct primary_key set
269    let mut pk_set = HashSet::new();
270    for pk in &create.primary_keys {
271        if !pk_set.insert(pk) {
272            return InvalidSqlSnafu {
273                err_msg: format!("column name `{}` is duplicated in primary keys", pk),
274            }
275            .fail();
276        }
277    }
278
279    // verify time index is not primary key
280    if pk_set.contains(&create.time_index) {
281        return InvalidSqlSnafu {
282            err_msg: format!(
283                "column name `{}` is both primary key and time index",
284                create.time_index
285            ),
286        }
287        .fail();
288    }
289
290    for column in &create.column_defs {
291        // verify do not contain interval type column issue #3235
292        if is_interval_type(&column.data_type()) {
293            return InvalidSqlSnafu {
294                err_msg: format!(
295                    "column name `{}` is interval type, which is not supported",
296                    column.name
297                ),
298            }
299            .fail();
300        }
301        // verify do not contain datetime type column issue #5489
302        if is_date_time_type(&column.data_type()) {
303            return InvalidSqlSnafu {
304                err_msg: format!(
305                    "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
306                    column.name
307                ),
308            }
309            .fail();
310        }
311    }
312    Ok(())
313}
314
315fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
316    for add_column in &add_columns.add_columns {
317        let Some(column_def) = &add_column.column_def else {
318            continue;
319        };
320        if is_date_time_type(&column_def.data_type()) {
321            return InvalidSqlSnafu {
322                    err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
323                }
324                .fail();
325        }
326        if is_interval_type(&column_def.data_type()) {
327            return InvalidSqlSnafu {
328                err_msg: format!(
329                    "column name `{}` is interval type, which is not supported",
330                    column_def.name
331                ),
332            }
333            .fail();
334        }
335    }
336    Ok(())
337}
338
339fn is_date_time_type(data_type: &ColumnDataType) -> bool {
340    matches!(data_type, ColumnDataType::Datetime)
341}
342
343fn is_interval_type(data_type: &ColumnDataType) -> bool {
344    matches!(
345        data_type,
346        ColumnDataType::IntervalYearMonth
347            | ColumnDataType::IntervalDayTime
348            | ColumnDataType::IntervalMonthDayNano
349    )
350}
351
352fn find_primary_keys(
353    columns: &[SqlColumn],
354    constraints: &[TableConstraint],
355) -> Result<Vec<String>> {
356    let columns_pk = columns
357        .iter()
358        .filter_map(|x| {
359            if x.options().iter().any(|o| {
360                matches!(
361                    o.option,
362                    ColumnOption::Unique {
363                        is_primary: true,
364                        ..
365                    }
366                )
367            }) {
368                Some(x.name().value.clone())
369            } else {
370                None
371            }
372        })
373        .collect::<Vec<String>>();
374
375    ensure!(
376        columns_pk.len() <= 1,
377        IllegalPrimaryKeysDefSnafu {
378            msg: "not allowed to inline multiple primary keys in columns options"
379        }
380    );
381
382    let constraints_pk = constraints
383        .iter()
384        .filter_map(|constraint| match constraint {
385            TableConstraint::PrimaryKey { columns, .. } => {
386                Some(columns.iter().map(|ident| ident.value.clone()))
387            }
388            _ => None,
389        })
390        .flatten()
391        .collect::<Vec<String>>();
392
393    ensure!(
394        columns_pk.is_empty() || constraints_pk.is_empty(),
395        IllegalPrimaryKeysDefSnafu {
396            msg: "found definitions of primary keys in multiple places"
397        }
398    );
399
400    let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
401    primary_keys.extend(columns_pk);
402    primary_keys.extend(constraints_pk);
403    Ok(primary_keys)
404}
405
406pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
407    let time_index = constraints
408        .iter()
409        .filter_map(|constraint| match constraint {
410            TableConstraint::TimeIndex { column, .. } => Some(&column.value),
411            _ => None,
412        })
413        .collect::<Vec<&String>>();
414    ensure!(
415        time_index.len() == 1,
416        InvalidSqlSnafu {
417            err_msg: "must have one and only one TimeIndex columns",
418        }
419    );
420    Ok(time_index.first().unwrap().to_string())
421}
422
423fn columns_to_expr(
424    column_defs: &[SqlColumn],
425    time_index: &str,
426    primary_keys: &[String],
427    timezone: Option<&Timezone>,
428) -> Result<Vec<api::v1::ColumnDef>> {
429    let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
430    column_schemas_to_defs(column_schemas, primary_keys)
431}
432
433fn columns_to_column_schemas(
434    columns: &[SqlColumn],
435    time_index: &str,
436    timezone: Option<&Timezone>,
437) -> Result<Vec<ColumnSchema>> {
438    columns
439        .iter()
440        .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
441        .collect::<Result<Vec<ColumnSchema>>>()
442}
443
444// TODO(weny): refactor this function to use `try_as_column_def`
445pub fn column_schemas_to_defs(
446    column_schemas: Vec<ColumnSchema>,
447    primary_keys: &[String],
448) -> Result<Vec<api::v1::ColumnDef>> {
449    let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
450        .iter()
451        .map(|c| {
452            ColumnDataTypeWrapper::try_from(c.data_type.clone())
453                .map(|w| w.to_parts())
454                .context(ColumnDataTypeSnafu)
455        })
456        .collect::<Result<Vec<_>>>()?;
457
458    column_schemas
459        .iter()
460        .zip(column_datatypes)
461        .map(|(schema, datatype)| {
462            let semantic_type = if schema.is_time_index() {
463                SemanticType::Timestamp
464            } else if primary_keys.contains(&schema.name) {
465                SemanticType::Tag
466            } else {
467                SemanticType::Field
468            } as i32;
469            let comment = schema
470                .metadata()
471                .get(COMMENT_KEY)
472                .cloned()
473                .unwrap_or_default();
474
475            Ok(api::v1::ColumnDef {
476                name: schema.name.clone(),
477                data_type: datatype.0 as i32,
478                is_nullable: schema.is_nullable(),
479                default_constraint: match schema.default_constraint() {
480                    None => vec![],
481                    Some(v) => {
482                        v.clone()
483                            .try_into()
484                            .context(ConvertColumnDefaultConstraintSnafu {
485                                column_name: &schema.name,
486                            })?
487                    }
488                },
489                semantic_type,
490                comment,
491                datatype_extension: datatype.1,
492                options: options_from_column_schema(schema),
493            })
494        })
495        .collect()
496}
497
498/// Converts a SQL alter table statement into a gRPC alter table expression.
499pub(crate) fn to_alter_table_expr(
500    alter_table: AlterTable,
501    query_ctx: &QueryContextRef,
502) -> Result<AlterTableExpr> {
503    let (catalog_name, schema_name, table_name) =
504        table_idents_to_full_name(alter_table.table_name(), query_ctx)
505            .map_err(BoxedError::new)
506            .context(ExternalSnafu)?;
507
508    let kind = match alter_table.alter_operation {
509        AlterTableOperation::AddConstraint(_) => {
510            return NotSupportedSnafu {
511                feat: "ADD CONSTRAINT",
512            }
513            .fail();
514        }
515        AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
516            add_columns: add_columns
517                .into_iter()
518                .map(|add_column| {
519                    let column_def = sql_column_def_to_grpc_column_def(
520                        &add_column.column_def,
521                        Some(&query_ctx.timezone()),
522                    )
523                    .map_err(BoxedError::new)
524                    .context(ExternalSnafu)?;
525                    if is_interval_type(&column_def.data_type()) {
526                        return NotSupportedSnafu {
527                            feat: "Add column with interval type",
528                        }
529                        .fail();
530                    }
531                    Ok(AddColumn {
532                        column_def: Some(column_def),
533                        location: add_column.location.as_ref().map(From::from),
534                        add_if_not_exists: add_column.add_if_not_exists,
535                    })
536                })
537                .collect::<Result<Vec<AddColumn>>>()?,
538        }),
539        AlterTableOperation::ModifyColumnType {
540            column_name,
541            target_type,
542        } => {
543            let target_type =
544                sql_data_type_to_concrete_data_type(&target_type).context(ParseSqlSnafu)?;
545            let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
546                .map(|w| w.to_parts())
547                .context(ColumnDataTypeSnafu)?;
548            if is_interval_type(&target_type) {
549                return NotSupportedSnafu {
550                    feat: "Modify column type to interval type",
551                }
552                .fail();
553            }
554            AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
555                modify_column_types: vec![ModifyColumnType {
556                    column_name: column_name.value,
557                    target_type: target_type as i32,
558                    target_type_extension,
559                }],
560            })
561        }
562        AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
563            drop_columns: vec![DropColumn {
564                name: name.value.to_string(),
565            }],
566        }),
567        AlterTableOperation::RenameTable { new_table_name } => {
568            AlterTableKind::RenameTable(RenameTable {
569                new_table_name: new_table_name.to_string(),
570            })
571        }
572        AlterTableOperation::SetTableOptions { options } => {
573            AlterTableKind::SetTableOptions(SetTableOptions {
574                table_options: options.into_iter().map(Into::into).collect(),
575            })
576        }
577        AlterTableOperation::UnsetTableOptions { keys } => {
578            AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
579        }
580        AlterTableOperation::SetIndex { options } => {
581            let option = match options {
582                sql::statements::alter::SetIndexOperation::Fulltext {
583                    column_name,
584                    options,
585                } => SetIndex {
586                    options: Some(set_index::Options::Fulltext(SetFulltext {
587                        column_name: column_name.value,
588                        enable: options.enable,
589                        analyzer: match options.analyzer {
590                            FulltextAnalyzer::English => Analyzer::English.into(),
591                            FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
592                        },
593                        case_sensitive: options.case_sensitive,
594                        backend: match options.backend {
595                            FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
596                            FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
597                        },
598                        granularity: options.granularity as u64,
599                        false_positive_rate: options.false_positive_rate(),
600                    })),
601                },
602                sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
603                    options: Some(set_index::Options::Inverted(SetInverted {
604                        column_name: column_name.value,
605                    })),
606                },
607                sql::statements::alter::SetIndexOperation::Skipping {
608                    column_name,
609                    options,
610                } => SetIndex {
611                    options: Some(set_index::Options::Skipping(SetSkipping {
612                        column_name: column_name.value,
613                        enable: true,
614                        granularity: options.granularity as u64,
615                        false_positive_rate: options.false_positive_rate(),
616                        skipping_index_type: match options.index_type {
617                            SkippingIndexType::BloomFilter => {
618                                PbSkippingIndexType::BloomFilter.into()
619                            }
620                        },
621                    })),
622                },
623            };
624            AlterTableKind::SetIndexes(SetIndexes {
625                set_indexes: vec![option],
626            })
627        }
628        AlterTableOperation::UnsetIndex { options } => {
629            let option = match options {
630                sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
631                    UnsetIndex {
632                        options: Some(unset_index::Options::Fulltext(UnsetFulltext {
633                            column_name: column_name.value,
634                        })),
635                    }
636                }
637                sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
638                    UnsetIndex {
639                        options: Some(unset_index::Options::Inverted(UnsetInverted {
640                            column_name: column_name.value,
641                        })),
642                    }
643                }
644                sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
645                    UnsetIndex {
646                        options: Some(unset_index::Options::Skipping(UnsetSkipping {
647                            column_name: column_name.value,
648                        })),
649                    }
650                }
651            };
652
653            AlterTableKind::UnsetIndexes(UnsetIndexes {
654                unset_indexes: vec![option],
655            })
656        }
657        AlterTableOperation::DropDefaults { columns } => {
658            AlterTableKind::DropDefaults(DropDefaults {
659                drop_defaults: columns
660                    .into_iter()
661                    .map(|col| {
662                        let column_name = col.0.to_string();
663                        Ok(api::v1::DropDefault { column_name })
664                    })
665                    .collect::<Result<Vec<_>>>()?,
666            })
667        }
668        AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
669            set_defaults: defaults
670                .into_iter()
671                .map(|col| {
672                    let column_name = col.column_name.to_string();
673                    let default_constraint = serde_json::to_string(&col.default_constraint)
674                        .context(EncodeJsonSnafu)?
675                        .into_bytes();
676                    Ok(api::v1::SetDefault {
677                        column_name,
678                        default_constraint,
679                    })
680                })
681                .collect::<Result<Vec<_>>>()?,
682        }),
683    };
684
685    Ok(AlterTableExpr {
686        catalog_name,
687        schema_name,
688        table_name,
689        kind: Some(kind),
690    })
691}
692
693/// Try to cast the `[AlterDatabase]` statement into gRPC `[AlterDatabaseExpr]`.
694pub fn to_alter_database_expr(
695    alter_database: AlterDatabase,
696    query_ctx: &QueryContextRef,
697) -> Result<AlterDatabaseExpr> {
698    let catalog = query_ctx.current_catalog();
699    let schema = alter_database.database_name;
700
701    let kind = match alter_database.alter_operation {
702        AlterDatabaseOperation::SetDatabaseOption { options } => {
703            let options = options.into_iter().map(Into::into).collect();
704            AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
705                set_database_options: options,
706            })
707        }
708        AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
709            AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
710        }
711    };
712
713    Ok(AlterDatabaseExpr {
714        catalog_name: catalog.to_string(),
715        schema_name: schema.to_string(),
716        kind: Some(kind),
717    })
718}
719
720/// Try to cast the `[CreateViewExpr]` statement into gRPC `[CreateViewExpr]`.
721pub fn to_create_view_expr(
722    stmt: CreateView,
723    logical_plan: Vec<u8>,
724    table_names: Vec<TableName>,
725    columns: Vec<String>,
726    plan_columns: Vec<String>,
727    definition: String,
728    query_ctx: QueryContextRef,
729) -> Result<CreateViewExpr> {
730    let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
731        .map_err(BoxedError::new)
732        .context(ExternalSnafu)?;
733
734    let expr = CreateViewExpr {
735        catalog_name,
736        schema_name,
737        view_name,
738        logical_plan,
739        create_if_not_exists: stmt.if_not_exists,
740        or_replace: stmt.or_replace,
741        table_names,
742        columns,
743        plan_columns,
744        definition,
745    };
746
747    Ok(expr)
748}
749
750pub fn to_create_flow_task_expr(
751    create_flow: CreateFlow,
752    query_ctx: &QueryContextRef,
753) -> Result<CreateFlowExpr> {
754    // retrieve sink table name
755    let sink_table_ref =
756        object_name_to_table_reference(create_flow.sink_table_name.clone().into(), true)
757            .with_context(|_| ConvertIdentifierSnafu {
758                ident: create_flow.sink_table_name.to_string(),
759            })?;
760    let catalog = sink_table_ref
761        .catalog()
762        .unwrap_or(query_ctx.current_catalog())
763        .to_string();
764    let schema = sink_table_ref
765        .schema()
766        .map(|s| s.to_owned())
767        .unwrap_or(query_ctx.current_schema());
768
769    let sink_table_name = TableName {
770        catalog_name: catalog,
771        schema_name: schema,
772        table_name: sink_table_ref.table().to_string(),
773    };
774
775    let source_table_names = extract_tables_from_query(&create_flow.query)
776        .map(|name| {
777            let reference = object_name_to_table_reference(name.clone().into(), true)
778                .with_context(|_| ConvertIdentifierSnafu {
779                    ident: name.to_string(),
780                })?;
781            let catalog = reference
782                .catalog()
783                .unwrap_or(query_ctx.current_catalog())
784                .to_string();
785            let schema = reference
786                .schema()
787                .map(|s| s.to_string())
788                .unwrap_or(query_ctx.current_schema());
789
790            let table_name = TableName {
791                catalog_name: catalog,
792                schema_name: schema,
793                table_name: reference.table().to_string(),
794            };
795            Ok(table_name)
796        })
797        .collect::<Result<Vec<_>>>()?;
798
799    Ok(CreateFlowExpr {
800        catalog_name: query_ctx.current_catalog().to_string(),
801        flow_name: sanitize_flow_name(create_flow.flow_name)?,
802        source_table_names,
803        sink_table_name: Some(sink_table_name),
804        or_replace: create_flow.or_replace,
805        create_if_not_exists: create_flow.if_not_exists,
806        expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
807        comment: create_flow.comment.unwrap_or_default(),
808        sql: create_flow.query.to_string(),
809        flow_options: HashMap::new(),
810    })
811}
812
813/// sanitize the flow name, remove possible quotes
814fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
815    ensure!(
816        flow_name.0.len() == 1,
817        InvalidFlowNameSnafu {
818            name: flow_name.to_string(),
819        }
820    );
821    // safety: we've checked flow_name.0 has exactly one element.
822    Ok(flow_name.0.swap_remove(0).value)
823}
824
#[cfg(test)]
mod tests {
    use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
    use datatypes::value::Value;
    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;
    use store_api::storage::ColumnDefaultConstraint;

    use super::*;

    /// Parses `sql` with the GreptimeDB dialect and returns the last statement
    /// produced. Panics if parsing fails or yields no statements.
    fn parse_stmt(sql: &str) -> Statement {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap()
            .pop()
            .unwrap()
    }

    /// Renders a [`TableName`] as `catalog.schema.table` for compact assertions.
    fn to_dot_sep(name: TableName) -> String {
        format!("{}.{}.{}", name.catalog_name, name.schema_name, name.table_name)
    }

    /// For a TQL flow, no source table names are expected to be extracted. The
    /// unqualified sink resolves to `greptime.public` under the default context.
    #[test]
    fn test_create_flow_tql_expr() {
        let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
        let Statement::CreateFlow(create_flow) = parse_stmt(sql) else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        assert_eq!("calc_reqs", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.public.cnt_reqs",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert!(expr.source_table_names.is_empty());
        assert_eq!(
            r#"TQL EVAL (0, 15, '5s') count_values("status_code", http_requests)"#,
            expr.sql
        );
    }

    /// Covers table-name resolution and flow-name validation for SQL flows. The
    /// cases are unqualified names, schema-qualified names with a quoted flow
    /// name, and a rejected multi-part flow name.
    #[test]
    fn test_create_flow_expr() {
        // Unqualified sink and source tables resolve to `greptime.public`.
        let sql = r"
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
    DISTINCT number as dis
FROM
    distinct_basic;";
        let Statement::CreateFlow(create_flow) = parse_stmt(sql) else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        assert_eq!("test_distinct_basic", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.public.out_distinct_basic",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.public.distinct_basic",
            to_dot_sep(expr.source_table_names[0].clone())
        );
        assert_eq!(
            r"SELECT
    DISTINCT number as dis
FROM
    distinct_basic",
            expr.sql
        );

        // A backtick-quoted flow name comes back unquoted. Explicit schemas are
        // kept, while the catalog comes from the query context.
        let sql = r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let Statement::CreateFlow(create_flow) = parse_stmt(sql) else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        assert_eq!("task_2", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.schema_1.table_1",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.schema_2.table_2",
            to_dot_sep(expr.source_table_names[0].clone())
        );
        assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);

        // A flow name made of more than one part is rejected.
        let sql = r"
CREATE FLOW abc.`task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let Statement::CreateFlow(create_flow) = parse_stmt(sql) else {
            unreachable!()
        };
        let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());

        assert!(res.is_err());
        assert!(res
            .unwrap_err()
            .to_string()
            .contains("Invalid flow name: abc.`task_2`"));
    }

    /// Table options from `WITH (...)` are carried into the expression. The
    /// `write_buffer_size` value `1024KB` is rendered as `1.0MiB`.
    #[test]
    fn test_create_to_expr() {
        let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
        let Statement::CreateTable(create_table) = parse_stmt(sql) else {
            unreachable!()
        };
        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
        assert_eq!("3days", expr.table_options.get("ttl").unwrap());
        assert_eq!(
            "1.0MiB",
            expr.table_options.get("write_buffer_size").unwrap()
        );
    }

    /// Each of these schema definitions must parse but be rejected by `create_to_expr`.
    #[test]
    fn test_invalid_create_to_expr() {
        let cases = [
            // duplicate column declaration
            "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
            // duplicate primary key
            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
            // time index is primary key
            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));"
        ];

        for sql in cases {
            let Statement::CreateTable(create_table) = parse_stmt(sql) else {
                unreachable!()
            };
            create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
        }
    }

    /// A string default on a timestamp column is interpreted in the query
    /// context's timezone.
    #[test]
    fn test_create_to_expr_with_default_timestamp_value() {
        let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
        let Statement::CreateTable(create_table) = parse_stmt(sql) else {
            unreachable!()
        };

        // query context with system timezone UTC.
        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
        let ts_column = &expr.column_defs[1];
        let constraint = assert_ts_column(ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
        );

        // query context with timezone `+08:00`
        let ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("+08:00").unwrap())
            .build()
            .into();
        let expr = create_to_expr(&create_table, &ctx).unwrap();
        let ts_column = &expr.column_defs[1];
        let constraint = assert_ts_column(ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
        );
    }

    /// Asserts that `ts_column` is a `TimestampMillisecond` column named `ts`
    /// with a non-empty default constraint, and returns the decoded constraint.
    fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
        assert_eq!("ts", ts_column.name);
        assert_eq!(
            ColumnDataType::TimestampMillisecond as i32,
            ts_column.data_type
        );
        assert!(!ts_column.default_constraint.is_empty());

        ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
    }

    /// Covers `ALTER DATABASE ... SET/UNSET` options, plus `ALTER TABLE ... ADD
    /// COLUMN` with a timestamp default under UTC and `+08:00` contexts.
    #[test]
    fn test_to_alter_expr() {
        let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
        let Statement::AlterDatabase(alter_database) = parse_stmt(sql) else {
            unreachable!()
        };

        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
            set_database_options,
        }) = kind
        else {
            unreachable!()
        };

        assert_eq!(2, set_database_options.len());
        assert_eq!("key1", set_database_options[0].key);
        assert_eq!("value1", set_database_options[0].value);
        assert_eq!("key2", set_database_options[1].key);
        assert_eq!("value2", set_database_options[1].value);

        let sql = "ALTER DATABASE greptime UNSET key1, key2;";
        let Statement::AlterDatabase(alter_database) = parse_stmt(sql) else {
            unreachable!()
        };

        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
            unreachable!()
        };

        assert_eq!(2, keys.len());
        assert!(keys.contains(&"key1".to_string()));
        assert!(keys.contains(&"key2".to_string()));

        let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
        let Statement::AlterTable(alter_table) = parse_stmt(sql) else {
            unreachable!()
        };

        // query context with system timezone UTC.
        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
            unreachable!()
        };

        assert_eq!(1, add_columns.len());
        let ts_column = add_columns[0].column_def.clone().unwrap();
        let constraint = assert_ts_column(&ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
        );

        // query context with timezone `+08:00`
        let ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("+08:00").unwrap())
            .build()
            .into();
        let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
        let kind = expr.kind.unwrap();

        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
            unreachable!()
        };

        assert_eq!(1, add_columns.len());
        let ts_column = add_columns[0].column_def.clone().unwrap();
        let constraint = assert_ts_column(&ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
        );
    }

    /// `MODIFY COLUMN ... STRING` becomes a single `ModifyColumnType` with a
    /// `String` target type and no type extension.
    #[test]
    fn test_to_alter_modify_column_type_expr() {
        let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
        let Statement::AlterTable(alter_table) = parse_stmt(sql) else {
            unreachable!()
        };

        // query context with system timezone UTC.
        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
            modify_column_types,
        }) = kind
        else {
            unreachable!()
        };

        assert_eq!(1, modify_column_types.len());
        let modify_column_type = &modify_column_types[0];

        assert_eq!("mem_usage", modify_column_type.column_name);
        assert_eq!(
            ColumnDataType::String as i32,
            modify_column_type.target_type
        );
        assert!(modify_column_type.target_type_extension.is_none());
    }

    /// Two fixed table names in `greptime.public`, used as view dependencies.
    fn new_test_table_names() -> Vec<TableName> {
        vec![
            TableName {
                catalog_name: "greptime".to_string(),
                schema_name: "public".to_string(),
                table_name: "a_table".to_string(),
            },
            TableName {
                catalog_name: "greptime".to_string(),
                schema_name: "public".to_string(),
                table_name: "b_table".to_string(),
            },
        ]
    }

    /// A plain `CREATE VIEW` resolves to the context's catalog and schema and
    /// passes the supplied plan, table names, columns and definition through.
    #[test]
    fn test_to_create_view_expr() {
        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
        let Statement::CreateView(stmt) = parse_stmt(sql) else {
            unreachable!()
        };

        let logical_plan = vec![1, 2, 3];
        let table_names = new_test_table_names();
        let columns = vec!["a".to_string()];
        let plan_columns = vec!["number".to_string()];

        let expr = to_create_view_expr(
            stmt,
            logical_plan.clone(),
            table_names.clone(),
            columns.clone(),
            plan_columns.clone(),
            sql.to_string(),
            QueryContext::arc(),
        )
        .unwrap();

        assert_eq!("greptime", expr.catalog_name);
        assert_eq!("public", expr.schema_name);
        assert_eq!("test", expr.view_name);
        assert!(!expr.create_if_not_exists);
        assert!(!expr.or_replace);
        assert_eq!(logical_plan, expr.logical_plan);
        assert_eq!(table_names, expr.table_names);
        assert_eq!(sql, expr.definition);
        assert_eq!(columns, expr.columns);
        assert_eq!(plan_columns, expr.plan_columns);
    }

    /// `OR REPLACE`, `IF NOT EXISTS` and a schema-qualified view name are all
    /// reflected in the resulting expression.
    #[test]
    fn test_to_create_view_expr_complex() {
        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
        let Statement::CreateView(stmt) = parse_stmt(sql) else {
            unreachable!()
        };

        let logical_plan = vec![1, 2, 3];
        let table_names = new_test_table_names();
        let columns = vec!["a".to_string()];
        let plan_columns = vec!["number".to_string()];

        let expr = to_create_view_expr(
            stmt,
            logical_plan.clone(),
            table_names.clone(),
            columns.clone(),
            plan_columns.clone(),
            sql.to_string(),
            QueryContext::arc(),
        )
        .unwrap();

        assert_eq!("greptime", expr.catalog_name);
        assert_eq!("test", expr.schema_name);
        assert_eq!("test_view", expr.view_name);
        assert!(expr.create_if_not_exists);
        assert!(expr.or_replace);
        assert_eq!(logical_plan, expr.logical_plan);
        assert_eq!(table_names, expr.table_names);
        assert_eq!(sql, expr.definition);
        assert_eq!(columns, expr.columns);
        assert_eq!(plan_columns, expr.plan_columns);
    }
}