operator/
expr_helper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use api::helper::ColumnDataTypeWrapper;
18use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
19use api::v1::alter_table_expr::Kind as AlterTableKind;
20use api::v1::column_def::options_from_column_schema;
21use api::v1::{
22    set_index, unset_index, AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer,
23    ColumnDataType, ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
24    DropColumn, DropColumns, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
25    ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetFulltext, SetIndex,
26    SetInverted, SetSkipping, SetTableOptions, SkippingIndexType as PbSkippingIndexType, TableName,
27    UnsetDatabaseOptions, UnsetFulltext, UnsetIndex, UnsetInverted, UnsetSkipping,
28    UnsetTableOptions,
29};
30use common_error::ext::BoxedError;
31use common_grpc_expr::util::ColumnExpr;
32use common_time::Timezone;
33use datafusion::sql::planner::object_name_to_table_reference;
34use datatypes::schema::{
35    ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SkippingIndexType, COMMENT_KEY,
36};
37use file_engine::FileOptions;
38use query::sql::{
39    check_file_to_table_schema_compatibility, file_column_schemas_to_table,
40    infer_file_table_schema, prepare_file_table_files,
41};
42use session::context::QueryContextRef;
43use session::table_name::table_idents_to_full_name;
44use snafu::{ensure, OptionExt, ResultExt};
45use sql::ast::{ColumnOption, ObjectName};
46use sql::statements::alter::{
47    AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
48};
49use sql::statements::create::{
50    Column as SqlColumn, CreateExternalTable, CreateFlow, CreateTable, CreateView, TableConstraint,
51};
52use sql::statements::{
53    column_to_schema, sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type,
54};
55use sql::util::extract_tables_from_query;
56use table::requests::{TableOptions, FILE_TABLE_META_KEY};
57use table::table_reference::TableReference;
58
59use crate::error::{
60    BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
61    ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
62    IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidFlowNameSnafu, InvalidSqlSnafu,
63    NotSupportedSnafu, ParseSqlSnafu, PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu,
64    UnrecognizedTableOptionSnafu,
65};
66
67pub fn create_table_expr_by_column_schemas(
68    table_name: &TableReference<'_>,
69    column_schemas: &[api::v1::ColumnSchema],
70    engine: &str,
71    desc: Option<&str>,
72) -> Result<CreateTableExpr> {
73    let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
74    let expr = common_grpc_expr::util::build_create_table_expr(
75        None,
76        table_name,
77        column_exprs,
78        engine,
79        desc.unwrap_or("Created on insertion"),
80    )
81    .context(BuildCreateExprOnInsertionSnafu)?;
82
83    validate_create_expr(&expr)?;
84    Ok(expr)
85}
86
87pub(crate) fn extract_add_columns_expr(
88    schema: &Schema,
89    column_exprs: Vec<ColumnExpr>,
90) -> Result<Option<AddColumns>> {
91    let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
92        .context(FindNewColumnsOnInsertionSnafu)?;
93    if let Some(add_columns) = &add_columns {
94        validate_add_columns_expr(add_columns)?;
95    }
96    Ok(add_columns)
97}
98
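// When the `CREATE EXTERNAL TABLE` statement is in expanded form, like
// ```sql
// CREATE EXTERNAL TABLE city (
//   host string,
//   ts timestamp,
//   cpu float64,
//   memory float64,
//   TIME INDEX (ts),
//   PRIMARY KEY(host)
// ) WITH (location='/var/data/city.csv', format='csv');
// ```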
105// The user needs to specify the TIME INDEX column. If there is no suitable
106// column in the file to use as TIME INDEX, an additional placeholder column
107// needs to be created as the TIME INDEX, and a `DEFAULT <value>` constraint
108// should be added.
109//
110//
111// When the `CREATE EXTERNAL TABLE` statement is in inferred form, like
112// ```sql
113// CREATE EXTERNAL TABLE IF NOT EXISTS city WITH (location='/var/data/city.csv',format='csv');
114// ```
115// 1. If the TIME INDEX column can be inferred from metadata, use that column
116//    as the TIME INDEX. Otherwise,
117// 2. If a column named `greptime_timestamp` exists (with the requirement that
118//    the column is with type TIMESTAMP, otherwise an error is thrown), use
119//    that column as the TIME INDEX. Otherwise,
120// 3. Automatically create the `greptime_timestamp` column and add a `DEFAULT 0`
121//    constraint.
122pub(crate) async fn create_external_expr(
123    create: CreateExternalTable,
124    query_ctx: &QueryContextRef,
125) -> Result<CreateTableExpr> {
126    let (catalog_name, schema_name, table_name) =
127        table_idents_to_full_name(&create.name, query_ctx)
128            .map_err(BoxedError::new)
129            .context(ExternalSnafu)?;
130
131    let mut table_options = create.options.into_map();
132
133    let (object_store, files) = prepare_file_table_files(&table_options)
134        .await
135        .context(PrepareFileTableSnafu)?;
136
137    let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
138        .await
139        .context(InferFileTableSchemaSnafu)?
140        .column_schemas;
141
142    let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
143        // expanded form
144        let time_index = find_time_index(&create.constraints)?;
145        let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
146        let column_schemas =
147            columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
148        (time_index, primary_keys, column_schemas)
149    } else {
150        // inferred form
151        let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
152        let primary_keys = vec![];
153        (time_index, primary_keys, column_schemas)
154    };
155
156    check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
157        .context(SchemaIncompatibleSnafu)?;
158
159    let meta = FileOptions {
160        files,
161        file_column_schemas,
162    };
163    table_options.insert(
164        FILE_TABLE_META_KEY.to_string(),
165        serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
166    );
167
168    let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
169    let expr = CreateTableExpr {
170        catalog_name,
171        schema_name,
172        table_name,
173        desc: String::default(),
174        column_defs,
175        time_index,
176        primary_keys,
177        create_if_not_exists: create.if_not_exists,
178        table_options,
179        table_id: None,
180        engine: create.engine.to_string(),
181    };
182
183    Ok(expr)
184}
185
186/// Convert `CreateTable` statement to [`CreateTableExpr`] gRPC request.
187pub fn create_to_expr(
188    create: &CreateTable,
189    query_ctx: &QueryContextRef,
190) -> Result<CreateTableExpr> {
191    let (catalog_name, schema_name, table_name) =
192        table_idents_to_full_name(&create.name, query_ctx)
193            .map_err(BoxedError::new)
194            .context(ExternalSnafu)?;
195
196    let time_index = find_time_index(&create.constraints)?;
197    let table_options = HashMap::from(
198        &TableOptions::try_from_iter(create.options.to_str_map())
199            .context(UnrecognizedTableOptionSnafu)?,
200    );
201
202    let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
203
204    let expr = CreateTableExpr {
205        catalog_name,
206        schema_name,
207        table_name,
208        desc: String::default(),
209        column_defs: columns_to_expr(
210            &create.columns,
211            &time_index,
212            &primary_keys,
213            Some(&query_ctx.timezone()),
214        )?,
215        time_index,
216        primary_keys,
217        create_if_not_exists: create.if_not_exists,
218        table_options,
219        table_id: None,
220        engine: create.engine.to_string(),
221    };
222
223    validate_create_expr(&expr)?;
224    Ok(expr)
225}
226
227/// Validate the [`CreateTableExpr`] request.
228pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
229    // construct column list
230    let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
231    for (idx, column) in create.column_defs.iter().enumerate() {
232        if let Some(indices) = column_to_indices.get(&column.name) {
233            return InvalidSqlSnafu {
234                err_msg: format!(
235                    "column name `{}` is duplicated at index {} and {}",
236                    column.name, indices, idx
237                ),
238            }
239            .fail();
240        }
241        column_to_indices.insert(&column.name, idx);
242    }
243
244    // verify time_index exists
245    let _ = column_to_indices
246        .get(&create.time_index)
247        .with_context(|| InvalidSqlSnafu {
248            err_msg: format!(
249                "column name `{}` is not found in column list",
250                create.time_index
251            ),
252        })?;
253
254    // verify primary_key exists
255    for pk in &create.primary_keys {
256        let _ = column_to_indices
257            .get(&pk)
258            .with_context(|| InvalidSqlSnafu {
259                err_msg: format!("column name `{}` is not found in column list", pk),
260            })?;
261    }
262
263    // construct primary_key set
264    let mut pk_set = HashSet::new();
265    for pk in &create.primary_keys {
266        if !pk_set.insert(pk) {
267            return InvalidSqlSnafu {
268                err_msg: format!("column name `{}` is duplicated in primary keys", pk),
269            }
270            .fail();
271        }
272    }
273
274    // verify time index is not primary key
275    if pk_set.contains(&create.time_index) {
276        return InvalidSqlSnafu {
277            err_msg: format!(
278                "column name `{}` is both primary key and time index",
279                create.time_index
280            ),
281        }
282        .fail();
283    }
284
285    for column in &create.column_defs {
286        // verify do not contain interval type column issue #3235
287        if is_interval_type(&column.data_type()) {
288            return InvalidSqlSnafu {
289                err_msg: format!(
290                    "column name `{}` is interval type, which is not supported",
291                    column.name
292                ),
293            }
294            .fail();
295        }
296        // verify do not contain datetime type column issue #5489
297        if is_date_time_type(&column.data_type()) {
298            return InvalidSqlSnafu {
299                err_msg: format!(
300                    "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
301                    column.name
302                ),
303            }
304            .fail();
305        }
306    }
307    Ok(())
308}
309
310fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
311    for add_column in &add_columns.add_columns {
312        let Some(column_def) = &add_column.column_def else {
313            continue;
314        };
315        if is_date_time_type(&column_def.data_type()) {
316            return InvalidSqlSnafu {
317                    err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
318                }
319                .fail();
320        }
321        if is_interval_type(&column_def.data_type()) {
322            return InvalidSqlSnafu {
323                err_msg: format!(
324                    "column name `{}` is interval type, which is not supported",
325                    column_def.name
326                ),
327            }
328            .fail();
329        }
330    }
331    Ok(())
332}
333
334fn is_date_time_type(data_type: &ColumnDataType) -> bool {
335    matches!(data_type, ColumnDataType::Datetime)
336}
337
338fn is_interval_type(data_type: &ColumnDataType) -> bool {
339    matches!(
340        data_type,
341        ColumnDataType::IntervalYearMonth
342            | ColumnDataType::IntervalDayTime
343            | ColumnDataType::IntervalMonthDayNano
344    )
345}
346
347fn find_primary_keys(
348    columns: &[SqlColumn],
349    constraints: &[TableConstraint],
350) -> Result<Vec<String>> {
351    let columns_pk = columns
352        .iter()
353        .filter_map(|x| {
354            if x.options().iter().any(|o| {
355                matches!(
356                    o.option,
357                    ColumnOption::Unique {
358                        is_primary: true,
359                        ..
360                    }
361                )
362            }) {
363                Some(x.name().value.clone())
364            } else {
365                None
366            }
367        })
368        .collect::<Vec<String>>();
369
370    ensure!(
371        columns_pk.len() <= 1,
372        IllegalPrimaryKeysDefSnafu {
373            msg: "not allowed to inline multiple primary keys in columns options"
374        }
375    );
376
377    let constraints_pk = constraints
378        .iter()
379        .filter_map(|constraint| match constraint {
380            TableConstraint::PrimaryKey { columns, .. } => {
381                Some(columns.iter().map(|ident| ident.value.clone()))
382            }
383            _ => None,
384        })
385        .flatten()
386        .collect::<Vec<String>>();
387
388    ensure!(
389        columns_pk.is_empty() || constraints_pk.is_empty(),
390        IllegalPrimaryKeysDefSnafu {
391            msg: "found definitions of primary keys in multiple places"
392        }
393    );
394
395    let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
396    primary_keys.extend(columns_pk);
397    primary_keys.extend(constraints_pk);
398    Ok(primary_keys)
399}
400
401pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
402    let time_index = constraints
403        .iter()
404        .filter_map(|constraint| match constraint {
405            TableConstraint::TimeIndex { column, .. } => Some(&column.value),
406            _ => None,
407        })
408        .collect::<Vec<&String>>();
409    ensure!(
410        time_index.len() == 1,
411        InvalidSqlSnafu {
412            err_msg: "must have one and only one TimeIndex columns",
413        }
414    );
415    Ok(time_index.first().unwrap().to_string())
416}
417
418fn columns_to_expr(
419    column_defs: &[SqlColumn],
420    time_index: &str,
421    primary_keys: &[String],
422    timezone: Option<&Timezone>,
423) -> Result<Vec<api::v1::ColumnDef>> {
424    let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
425    column_schemas_to_defs(column_schemas, primary_keys)
426}
427
428fn columns_to_column_schemas(
429    columns: &[SqlColumn],
430    time_index: &str,
431    timezone: Option<&Timezone>,
432) -> Result<Vec<ColumnSchema>> {
433    columns
434        .iter()
435        .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
436        .collect::<Result<Vec<ColumnSchema>>>()
437}
438
439pub fn column_schemas_to_defs(
440    column_schemas: Vec<ColumnSchema>,
441    primary_keys: &[String],
442) -> Result<Vec<api::v1::ColumnDef>> {
443    let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
444        .iter()
445        .map(|c| {
446            ColumnDataTypeWrapper::try_from(c.data_type.clone())
447                .map(|w| w.to_parts())
448                .context(ColumnDataTypeSnafu)
449        })
450        .collect::<Result<Vec<_>>>()?;
451
452    column_schemas
453        .iter()
454        .zip(column_datatypes)
455        .map(|(schema, datatype)| {
456            let semantic_type = if schema.is_time_index() {
457                SemanticType::Timestamp
458            } else if primary_keys.contains(&schema.name) {
459                SemanticType::Tag
460            } else {
461                SemanticType::Field
462            } as i32;
463            let comment = schema
464                .metadata()
465                .get(COMMENT_KEY)
466                .cloned()
467                .unwrap_or_default();
468
469            Ok(api::v1::ColumnDef {
470                name: schema.name.clone(),
471                data_type: datatype.0 as i32,
472                is_nullable: schema.is_nullable(),
473                default_constraint: match schema.default_constraint() {
474                    None => vec![],
475                    Some(v) => {
476                        v.clone()
477                            .try_into()
478                            .context(ConvertColumnDefaultConstraintSnafu {
479                                column_name: &schema.name,
480                            })?
481                    }
482                },
483                semantic_type,
484                comment,
485                datatype_extension: datatype.1,
486                options: options_from_column_schema(schema),
487            })
488        })
489        .collect()
490}
491
492/// Converts a SQL alter table statement into a gRPC alter table expression.
493pub(crate) fn to_alter_table_expr(
494    alter_table: AlterTable,
495    query_ctx: &QueryContextRef,
496) -> Result<AlterTableExpr> {
497    let (catalog_name, schema_name, table_name) =
498        table_idents_to_full_name(alter_table.table_name(), query_ctx)
499            .map_err(BoxedError::new)
500            .context(ExternalSnafu)?;
501
502    let kind = match alter_table.alter_operation {
503        AlterTableOperation::AddConstraint(_) => {
504            return NotSupportedSnafu {
505                feat: "ADD CONSTRAINT",
506            }
507            .fail();
508        }
509        AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
510            add_columns: add_columns
511                .into_iter()
512                .map(|add_column| {
513                    let column_def = sql_column_def_to_grpc_column_def(
514                        &add_column.column_def,
515                        Some(&query_ctx.timezone()),
516                    )
517                    .map_err(BoxedError::new)
518                    .context(ExternalSnafu)?;
519                    if is_interval_type(&column_def.data_type()) {
520                        return NotSupportedSnafu {
521                            feat: "Add column with interval type",
522                        }
523                        .fail();
524                    }
525                    Ok(AddColumn {
526                        column_def: Some(column_def),
527                        location: add_column.location.as_ref().map(From::from),
528                        add_if_not_exists: add_column.add_if_not_exists,
529                    })
530                })
531                .collect::<Result<Vec<AddColumn>>>()?,
532        }),
533        AlterTableOperation::ModifyColumnType {
534            column_name,
535            target_type,
536        } => {
537            let target_type =
538                sql_data_type_to_concrete_data_type(&target_type).context(ParseSqlSnafu)?;
539            let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
540                .map(|w| w.to_parts())
541                .context(ColumnDataTypeSnafu)?;
542            if is_interval_type(&target_type) {
543                return NotSupportedSnafu {
544                    feat: "Modify column type to interval type",
545                }
546                .fail();
547            }
548            AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
549                modify_column_types: vec![ModifyColumnType {
550                    column_name: column_name.value,
551                    target_type: target_type as i32,
552                    target_type_extension,
553                }],
554            })
555        }
556        AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
557            drop_columns: vec![DropColumn {
558                name: name.value.to_string(),
559            }],
560        }),
561        AlterTableOperation::RenameTable { new_table_name } => {
562            AlterTableKind::RenameTable(RenameTable {
563                new_table_name: new_table_name.to_string(),
564            })
565        }
566        AlterTableOperation::SetTableOptions { options } => {
567            AlterTableKind::SetTableOptions(SetTableOptions {
568                table_options: options.into_iter().map(Into::into).collect(),
569            })
570        }
571        AlterTableOperation::UnsetTableOptions { keys } => {
572            AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
573        }
574        AlterTableOperation::SetIndex { options } => AlterTableKind::SetIndex(match options {
575            sql::statements::alter::SetIndexOperation::Fulltext {
576                column_name,
577                options,
578            } => SetIndex {
579                options: Some(set_index::Options::Fulltext(SetFulltext {
580                    column_name: column_name.value,
581                    enable: options.enable,
582                    analyzer: match options.analyzer {
583                        FulltextAnalyzer::English => Analyzer::English.into(),
584                        FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
585                    },
586                    case_sensitive: options.case_sensitive,
587                    backend: match options.backend {
588                        FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
589                        FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
590                    },
591                })),
592            },
593            sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
594                options: Some(set_index::Options::Inverted(SetInverted {
595                    column_name: column_name.value,
596                })),
597            },
598            sql::statements::alter::SetIndexOperation::Skipping {
599                column_name,
600                options,
601            } => SetIndex {
602                options: Some(set_index::Options::Skipping(SetSkipping {
603                    column_name: column_name.value,
604                    enable: true,
605                    granularity: options.granularity as u64,
606                    skipping_index_type: match options.index_type {
607                        SkippingIndexType::BloomFilter => PbSkippingIndexType::BloomFilter.into(),
608                    },
609                })),
610            },
611        }),
612        AlterTableOperation::UnsetIndex { options } => AlterTableKind::UnsetIndex(match options {
613            sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => UnsetIndex {
614                options: Some(unset_index::Options::Fulltext(UnsetFulltext {
615                    column_name: column_name.value,
616                })),
617            },
618            sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => UnsetIndex {
619                options: Some(unset_index::Options::Inverted(UnsetInverted {
620                    column_name: column_name.value,
621                })),
622            },
623            sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => UnsetIndex {
624                options: Some(unset_index::Options::Skipping(UnsetSkipping {
625                    column_name: column_name.value,
626                })),
627            },
628        }),
629    };
630
631    Ok(AlterTableExpr {
632        catalog_name,
633        schema_name,
634        table_name,
635        kind: Some(kind),
636    })
637}
638
639/// Try to cast the `[AlterDatabase]` statement into gRPC `[AlterDatabaseExpr]`.
640pub fn to_alter_database_expr(
641    alter_database: AlterDatabase,
642    query_ctx: &QueryContextRef,
643) -> Result<AlterDatabaseExpr> {
644    let catalog = query_ctx.current_catalog();
645    let schema = alter_database.database_name;
646
647    let kind = match alter_database.alter_operation {
648        AlterDatabaseOperation::SetDatabaseOption { options } => {
649            let options = options.into_iter().map(Into::into).collect();
650            AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
651                set_database_options: options,
652            })
653        }
654        AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
655            AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
656        }
657    };
658
659    Ok(AlterDatabaseExpr {
660        catalog_name: catalog.to_string(),
661        schema_name: schema.to_string(),
662        kind: Some(kind),
663    })
664}
665
666/// Try to cast the `[CreateViewExpr]` statement into gRPC `[CreateViewExpr]`.
667pub fn to_create_view_expr(
668    stmt: CreateView,
669    logical_plan: Vec<u8>,
670    table_names: Vec<TableName>,
671    columns: Vec<String>,
672    plan_columns: Vec<String>,
673    definition: String,
674    query_ctx: QueryContextRef,
675) -> Result<CreateViewExpr> {
676    let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
677        .map_err(BoxedError::new)
678        .context(ExternalSnafu)?;
679
680    let expr = CreateViewExpr {
681        catalog_name,
682        schema_name,
683        view_name,
684        logical_plan,
685        create_if_not_exists: stmt.if_not_exists,
686        or_replace: stmt.or_replace,
687        table_names,
688        columns,
689        plan_columns,
690        definition,
691    };
692
693    Ok(expr)
694}
695
696pub fn to_create_flow_task_expr(
697    create_flow: CreateFlow,
698    query_ctx: &QueryContextRef,
699) -> Result<CreateFlowExpr> {
700    // retrieve sink table name
701    let sink_table_ref =
702        object_name_to_table_reference(create_flow.sink_table_name.clone().into(), true)
703            .with_context(|_| ConvertIdentifierSnafu {
704                ident: create_flow.sink_table_name.to_string(),
705            })?;
706    let catalog = sink_table_ref
707        .catalog()
708        .unwrap_or(query_ctx.current_catalog())
709        .to_string();
710    let schema = sink_table_ref
711        .schema()
712        .map(|s| s.to_owned())
713        .unwrap_or(query_ctx.current_schema());
714
715    let sink_table_name = TableName {
716        catalog_name: catalog,
717        schema_name: schema,
718        table_name: sink_table_ref.table().to_string(),
719    };
720
721    let source_table_names = extract_tables_from_query(&create_flow.query)
722        .map(|name| {
723            let reference = object_name_to_table_reference(name.clone().into(), true)
724                .with_context(|_| ConvertIdentifierSnafu {
725                    ident: name.to_string(),
726                })?;
727            let catalog = reference
728                .catalog()
729                .unwrap_or(query_ctx.current_catalog())
730                .to_string();
731            let schema = reference
732                .schema()
733                .map(|s| s.to_string())
734                .unwrap_or(query_ctx.current_schema());
735
736            let table_name = TableName {
737                catalog_name: catalog,
738                schema_name: schema,
739                table_name: reference.table().to_string(),
740            };
741            Ok(table_name)
742        })
743        .collect::<Result<Vec<_>>>()?;
744
745    Ok(CreateFlowExpr {
746        catalog_name: query_ctx.current_catalog().to_string(),
747        flow_name: sanitize_flow_name(create_flow.flow_name)?,
748        source_table_names,
749        sink_table_name: Some(sink_table_name),
750        or_replace: create_flow.or_replace,
751        create_if_not_exists: create_flow.if_not_exists,
752        expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
753        comment: create_flow.comment.unwrap_or_default(),
754        sql: create_flow.query.to_string(),
755        flow_options: HashMap::new(),
756    })
757}
758
759/// sanitize the flow name, remove possible quotes
760fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
761    ensure!(
762        flow_name.0.len() == 1,
763        InvalidFlowNameSnafu {
764            name: flow_name.to_string(),
765        }
766    );
767    // safety: we've checked flow_name.0 has exactly one element.
768    Ok(flow_name.0.swap_remove(0).value)
769}
770
771#[cfg(test)]
772mod tests {
773    use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
774    use datatypes::value::Value;
775    use session::context::{QueryContext, QueryContextBuilder};
776    use sql::dialect::GreptimeDbDialect;
777    use sql::parser::{ParseOptions, ParserContext};
778    use sql::statements::statement::Statement;
779    use store_api::storage::ColumnDefaultConstraint;
780
781    use super::*;
782
783    #[test]
784    fn test_create_flow_expr() {
785        let sql = r"
786CREATE FLOW `task_2`
787SINK TO schema_1.table_1
788AS
789SELECT max(c1), min(c2) FROM schema_2.table_2;";
790        let stmt =
791            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
792                .unwrap()
793                .pop()
794                .unwrap();
795
796        let Statement::CreateFlow(create_flow) = stmt else {
797            unreachable!()
798        };
799        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
800
801        let to_dot_sep =
802            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
803        assert_eq!("task_2", expr.flow_name);
804        assert_eq!("greptime", expr.catalog_name);
805        assert_eq!(
806            "greptime.schema_1.table_1",
807            expr.sink_table_name.map(to_dot_sep).unwrap()
808        );
809        assert_eq!(1, expr.source_table_names.len());
810        assert_eq!(
811            "greptime.schema_2.table_2",
812            to_dot_sep(expr.source_table_names[0].clone())
813        );
814        assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
815
816        let sql = r"
817CREATE FLOW abc.`task_2`
818SINK TO schema_1.table_1
819AS
820SELECT max(c1), min(c2) FROM schema_2.table_2;";
821        let stmt =
822            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
823                .unwrap()
824                .pop()
825                .unwrap();
826
827        let Statement::CreateFlow(create_flow) = stmt else {
828            unreachable!()
829        };
830        let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
831
832        assert!(res.is_err());
833        assert!(res
834            .unwrap_err()
835            .to_string()
836            .contains("Invalid flow name: abc.`task_2`"));
837    }
838
839    #[test]
840    fn test_create_to_expr() {
841        let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
842        let stmt =
843            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
844                .unwrap()
845                .pop()
846                .unwrap();
847
848        let Statement::CreateTable(create_table) = stmt else {
849            unreachable!()
850        };
851        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
852        assert_eq!("3days", expr.table_options.get("ttl").unwrap());
853        assert_eq!(
854            "1.0MiB",
855            expr.table_options.get("write_buffer_size").unwrap()
856        );
857    }
858
859    #[test]
860    fn test_invalid_create_to_expr() {
861        let cases = [
862            // duplicate column declaration
863            "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
864            // duplicate primary key
865            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
866            // time index is primary key
867            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));"
868        ];
869
870        for sql in cases {
871            let stmt = ParserContext::create_with_dialect(
872                sql,
873                &GreptimeDbDialect {},
874                ParseOptions::default(),
875            )
876            .unwrap()
877            .pop()
878            .unwrap();
879            let Statement::CreateTable(create_table) = stmt else {
880                unreachable!()
881            };
882            create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
883        }
884    }
885
886    #[test]
887    fn test_create_to_expr_with_default_timestamp_value() {
888        let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
889        let stmt =
890            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
891                .unwrap()
892                .pop()
893                .unwrap();
894
895        let Statement::CreateTable(create_table) = stmt else {
896            unreachable!()
897        };
898
899        // query context with system timezone UTC.
900        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
901        let ts_column = &expr.column_defs[1];
902        let constraint = assert_ts_column(ts_column);
903        assert!(
904            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
905                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
906        );
907
908        // query context with timezone `+08:00`
909        let ctx = QueryContextBuilder::default()
910            .timezone(Timezone::from_tz_string("+08:00").unwrap())
911            .build()
912            .into();
913        let expr = create_to_expr(&create_table, &ctx).unwrap();
914        let ts_column = &expr.column_defs[1];
915        let constraint = assert_ts_column(ts_column);
916        assert!(
917            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
918                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
919        );
920    }
921
922    fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
923        assert_eq!("ts", ts_column.name);
924        assert_eq!(
925            ColumnDataType::TimestampMillisecond as i32,
926            ts_column.data_type
927        );
928        assert!(!ts_column.default_constraint.is_empty());
929
930        ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
931    }
932
933    #[test]
934    fn test_to_alter_expr() {
935        let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
936        let stmt =
937            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
938                .unwrap()
939                .pop()
940                .unwrap();
941
942        let Statement::AlterDatabase(alter_database) = stmt else {
943            unreachable!()
944        };
945
946        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
947        let kind = expr.kind.unwrap();
948
949        let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
950            set_database_options,
951        }) = kind
952        else {
953            unreachable!()
954        };
955
956        assert_eq!(2, set_database_options.len());
957        assert_eq!("key1", set_database_options[0].key);
958        assert_eq!("value1", set_database_options[0].value);
959        assert_eq!("key2", set_database_options[1].key);
960        assert_eq!("value2", set_database_options[1].value);
961
962        let sql = "ALTER DATABASE greptime UNSET key1, key2;";
963        let stmt =
964            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
965                .unwrap()
966                .pop()
967                .unwrap();
968
969        let Statement::AlterDatabase(alter_database) = stmt else {
970            unreachable!()
971        };
972
973        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
974        let kind = expr.kind.unwrap();
975
976        let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
977            unreachable!()
978        };
979
980        assert_eq!(2, keys.len());
981        assert!(keys.contains(&"key1".to_string()));
982        assert!(keys.contains(&"key2".to_string()));
983
984        let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
985        let stmt =
986            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
987                .unwrap()
988                .pop()
989                .unwrap();
990
991        let Statement::AlterTable(alter_table) = stmt else {
992            unreachable!()
993        };
994
995        // query context with system timezone UTC.
996        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
997        let kind = expr.kind.unwrap();
998
999        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1000            unreachable!()
1001        };
1002
1003        assert_eq!(1, add_columns.len());
1004        let ts_column = add_columns[0].column_def.clone().unwrap();
1005        let constraint = assert_ts_column(&ts_column);
1006        assert!(
1007            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1008                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1009        );
1010
1011        //
1012        // query context with timezone `+08:00`
1013        let ctx = QueryContextBuilder::default()
1014            .timezone(Timezone::from_tz_string("+08:00").unwrap())
1015            .build()
1016            .into();
1017        let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
1018        let kind = expr.kind.unwrap();
1019
1020        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1021            unreachable!()
1022        };
1023
1024        assert_eq!(1, add_columns.len());
1025        let ts_column = add_columns[0].column_def.clone().unwrap();
1026        let constraint = assert_ts_column(&ts_column);
1027        assert!(
1028            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1029                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1030        );
1031    }
1032
1033    #[test]
1034    fn test_to_alter_modify_column_type_expr() {
1035        let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
1036        let stmt =
1037            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1038                .unwrap()
1039                .pop()
1040                .unwrap();
1041
1042        let Statement::AlterTable(alter_table) = stmt else {
1043            unreachable!()
1044        };
1045
1046        // query context with system timezone UTC.
1047        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1048        let kind = expr.kind.unwrap();
1049
1050        let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
1051            modify_column_types,
1052        }) = kind
1053        else {
1054            unreachable!()
1055        };
1056
1057        assert_eq!(1, modify_column_types.len());
1058        let modify_column_type = &modify_column_types[0];
1059
1060        assert_eq!("mem_usage", modify_column_type.column_name);
1061        assert_eq!(
1062            ColumnDataType::String as i32,
1063            modify_column_type.target_type
1064        );
1065        assert!(modify_column_type.target_type_extension.is_none());
1066    }
1067
1068    fn new_test_table_names() -> Vec<TableName> {
1069        vec![
1070            TableName {
1071                catalog_name: "greptime".to_string(),
1072                schema_name: "public".to_string(),
1073                table_name: "a_table".to_string(),
1074            },
1075            TableName {
1076                catalog_name: "greptime".to_string(),
1077                schema_name: "public".to_string(),
1078                table_name: "b_table".to_string(),
1079            },
1080        ]
1081    }
1082
1083    #[test]
1084    fn test_to_create_view_expr() {
1085        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1086        let stmt =
1087            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1088                .unwrap()
1089                .pop()
1090                .unwrap();
1091
1092        let Statement::CreateView(stmt) = stmt else {
1093            unreachable!()
1094        };
1095
1096        let logical_plan = vec![1, 2, 3];
1097        let table_names = new_test_table_names();
1098        let columns = vec!["a".to_string()];
1099        let plan_columns = vec!["number".to_string()];
1100
1101        let expr = to_create_view_expr(
1102            stmt,
1103            logical_plan.clone(),
1104            table_names.clone(),
1105            columns.clone(),
1106            plan_columns.clone(),
1107            sql.to_string(),
1108            QueryContext::arc(),
1109        )
1110        .unwrap();
1111
1112        assert_eq!("greptime", expr.catalog_name);
1113        assert_eq!("public", expr.schema_name);
1114        assert_eq!("test", expr.view_name);
1115        assert!(!expr.create_if_not_exists);
1116        assert!(!expr.or_replace);
1117        assert_eq!(logical_plan, expr.logical_plan);
1118        assert_eq!(table_names, expr.table_names);
1119        assert_eq!(sql, expr.definition);
1120        assert_eq!(columns, expr.columns);
1121        assert_eq!(plan_columns, expr.plan_columns);
1122    }
1123
1124    #[test]
1125    fn test_to_create_view_expr_complex() {
1126        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
1127        let stmt =
1128            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1129                .unwrap()
1130                .pop()
1131                .unwrap();
1132
1133        let Statement::CreateView(stmt) = stmt else {
1134            unreachable!()
1135        };
1136
1137        let logical_plan = vec![1, 2, 3];
1138        let table_names = new_test_table_names();
1139        let columns = vec!["a".to_string()];
1140        let plan_columns = vec!["number".to_string()];
1141
1142        let expr = to_create_view_expr(
1143            stmt,
1144            logical_plan.clone(),
1145            table_names.clone(),
1146            columns.clone(),
1147            plan_columns.clone(),
1148            sql.to_string(),
1149            QueryContext::arc(),
1150        )
1151        .unwrap();
1152
1153        assert_eq!("greptime", expr.catalog_name);
1154        assert_eq!("test", expr.schema_name);
1155        assert_eq!("test_view", expr.view_name);
1156        assert!(expr.create_if_not_exists);
1157        assert!(expr.or_replace);
1158        assert_eq!(logical_plan, expr.logical_plan);
1159        assert_eq!(table_names, expr.table_names);
1160        assert_eq!(sql, expr.definition);
1161        assert_eq!(columns, expr.columns);
1162        assert_eq!(plan_columns, expr.plan_columns);
1163    }
1164}