operator/
expr_helper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
24use api::v1::{
25    AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
26    ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
27    DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
28    ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetDefaults, SetFulltext,
29    SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30    SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31    UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions, set_index,
32    unset_index,
33};
34use common_error::ext::BoxedError;
35use common_grpc_expr::util::ColumnExpr;
36use common_time::Timezone;
37use datafusion::sql::planner::object_name_to_table_reference;
38use datatypes::schema::{
39    COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
40    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
41    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
42    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
43    ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
44    SkippingIndexType,
45};
46use file_engine::FileOptions;
47use query::sql::{
48    check_file_to_table_schema_compatibility, file_column_schemas_to_table,
49    infer_file_table_schema, prepare_file_table_files,
50};
51use session::context::QueryContextRef;
52use session::table_name::table_idents_to_full_name;
53use snafu::{OptionExt, ResultExt, ensure};
54use sql::ast::{
55    ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
56};
57use sql::dialect::GreptimeDbDialect;
58use sql::parser::ParserContext;
59use sql::statements::alter::{
60    AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
61};
62use sql::statements::create::{
63    Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
64    CreateView, TableConstraint,
65};
66use sql::statements::{
67    OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
68    sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
69};
70use sql::util::extract_tables_from_query;
71use table::requests::{FILE_TABLE_META_KEY, TableOptions};
72use table::table_reference::TableReference;
73#[cfg(feature = "enterprise")]
74pub use trigger::to_create_trigger_task_expr;
75
76use crate::error::{
77    BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
78    ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
79    IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
80    InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
81    PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
82};
83
84pub fn create_table_expr_by_column_schemas(
85    table_name: &TableReference<'_>,
86    column_schemas: &[api::v1::ColumnSchema],
87    engine: &str,
88    desc: Option<&str>,
89) -> Result<CreateTableExpr> {
90    let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
91    let expr = common_grpc_expr::util::build_create_table_expr(
92        None,
93        table_name,
94        column_exprs,
95        engine,
96        desc.unwrap_or("Created on insertion"),
97    )
98    .context(BuildCreateExprOnInsertionSnafu)?;
99
100    validate_create_expr(&expr)?;
101    Ok(expr)
102}
103
104pub fn extract_add_columns_expr(
105    schema: &Schema,
106    column_exprs: Vec<ColumnExpr>,
107) -> Result<Option<AddColumns>> {
108    let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
109        .context(FindNewColumnsOnInsertionSnafu)?;
110    if let Some(add_columns) = &add_columns {
111        validate_add_columns_expr(add_columns)?;
112    }
113    Ok(add_columns)
114}
115
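// When the `CREATE EXTERNAL TABLE` statement is in expanded form, like
// ```sql
// CREATE EXTERNAL TABLE city (
//   host string,
//   ts timestamp,
//   cpu float64,
//   memory float64,
//   TIME INDEX (ts),
//   PRIMARY KEY(host)
// ) WITH (location='/var/data/city.csv', format='csv');
// ```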
122// The user needs to specify the TIME INDEX column. If there is no suitable
123// column in the file to use as TIME INDEX, an additional placeholder column
124// needs to be created as the TIME INDEX, and a `DEFAULT <value>` constraint
125// should be added.
126//
127//
128// When the `CREATE EXTERNAL TABLE` statement is in inferred form, like
129// ```sql
130// CREATE EXTERNAL TABLE IF NOT EXISTS city WITH (location='/var/data/city.csv',format='csv');
131// ```
132// 1. If the TIME INDEX column can be inferred from metadata, use that column
133//    as the TIME INDEX. Otherwise,
134// 2. If a column named `greptime_timestamp` exists (with the requirement that
135//    the column is with type TIMESTAMP, otherwise an error is thrown), use
136//    that column as the TIME INDEX. Otherwise,
137// 3. Automatically create the `greptime_timestamp` column and add a `DEFAULT 0`
138//    constraint.
139pub(crate) async fn create_external_expr(
140    create: CreateExternalTable,
141    query_ctx: &QueryContextRef,
142) -> Result<CreateTableExpr> {
143    let (catalog_name, schema_name, table_name) =
144        table_idents_to_full_name(&create.name, query_ctx)
145            .map_err(BoxedError::new)
146            .context(ExternalSnafu)?;
147
148    let mut table_options = create.options.into_map();
149
150    let (object_store, files) = prepare_file_table_files(&table_options)
151        .await
152        .context(PrepareFileTableSnafu)?;
153
154    let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
155        .await
156        .context(InferFileTableSchemaSnafu)?
157        .column_schemas;
158
159    let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
160        // expanded form
161        let time_index = find_time_index(&create.constraints)?;
162        let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
163        let column_schemas =
164            columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
165        (time_index, primary_keys, column_schemas)
166    } else {
167        // inferred form
168        let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
169        let primary_keys = vec![];
170        (time_index, primary_keys, column_schemas)
171    };
172
173    check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
174        .context(SchemaIncompatibleSnafu)?;
175
176    let meta = FileOptions {
177        files,
178        file_column_schemas,
179    };
180    table_options.insert(
181        FILE_TABLE_META_KEY.to_string(),
182        serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
183    );
184
185    let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
186    let expr = CreateTableExpr {
187        catalog_name,
188        schema_name,
189        table_name,
190        desc: String::default(),
191        column_defs,
192        time_index,
193        primary_keys,
194        create_if_not_exists: create.if_not_exists,
195        table_options,
196        table_id: None,
197        engine: create.engine.to_string(),
198    };
199
200    Ok(expr)
201}
202
203/// Convert `CreateTable` statement to [`CreateTableExpr`] gRPC request.
204pub fn create_to_expr(
205    create: &CreateTable,
206    query_ctx: &QueryContextRef,
207) -> Result<CreateTableExpr> {
208    let (catalog_name, schema_name, table_name) =
209        table_idents_to_full_name(&create.name, query_ctx)
210            .map_err(BoxedError::new)
211            .context(ExternalSnafu)?;
212
213    let time_index = find_time_index(&create.constraints)?;
214    let table_options = HashMap::from(
215        &TableOptions::try_from_iter(create.options.to_str_map())
216            .context(UnrecognizedTableOptionSnafu)?,
217    );
218
219    let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
220
221    let expr = CreateTableExpr {
222        catalog_name,
223        schema_name,
224        table_name,
225        desc: String::default(),
226        column_defs: columns_to_expr(
227            &create.columns,
228            &time_index,
229            &primary_keys,
230            Some(&query_ctx.timezone()),
231        )?,
232        time_index,
233        primary_keys,
234        create_if_not_exists: create.if_not_exists,
235        table_options,
236        table_id: None,
237        engine: create.engine.to_string(),
238    };
239
240    validate_create_expr(&expr)?;
241    Ok(expr)
242}
243
244/// Convert gRPC's [`CreateTableExpr`] back to `CreateTable` statement.
245/// You can use `create_table_expr_by_column_schemas` to create a `CreateTableExpr` from column schemas.
246///
247/// # Parameters
248///
249/// * `expr` - The `CreateTableExpr` to convert
250/// * `quote_style` - Optional quote style for identifiers (defaults to MySQL style ` backtick)
251pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
252    let quote_style = quote_style.unwrap_or('`');
253
254    // Convert table name
255    let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
256        sql::ast::Ident::with_quote(quote_style, &expr.table_name),
257    )]);
258
259    // Convert columns
260    let mut columns = Vec::with_capacity(expr.column_defs.len());
261    for column_def in &expr.column_defs {
262        let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
263            column: &column_def.name,
264        })?;
265
266        let mut options = Vec::new();
267
268        // Add NULL/NOT NULL constraint
269        if column_def.is_nullable {
270            options.push(ColumnOptionDef {
271                name: None,
272                option: ColumnOption::Null,
273            });
274        } else {
275            options.push(ColumnOptionDef {
276                name: None,
277                option: ColumnOption::NotNull,
278            });
279        }
280
281        // Add DEFAULT constraint if present
282        if let Some(default_constraint) = column_schema.default_constraint() {
283            let expr = match default_constraint {
284                ColumnDefaultConstraint::Value(v) => {
285                    Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
286                }
287                ColumnDefaultConstraint::Function(func_expr) => {
288                    ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
289                        .context(ParseSqlSnafu)?
290                }
291            };
292            options.push(ColumnOptionDef {
293                name: None,
294                option: ColumnOption::Default(expr),
295            });
296        }
297
298        // Add COMMENT if present
299        if !column_def.comment.is_empty() {
300            options.push(ColumnOptionDef {
301                name: None,
302                option: ColumnOption::Comment(column_def.comment.clone()),
303            });
304        }
305
306        // Note: We don't add inline PRIMARY KEY options here,
307        // we'll handle all primary keys as constraints instead for consistency
308
309        // Handle column extensions (fulltext, inverted index, skipping index)
310        let mut extensions = ColumnExtensions::default();
311
312        // Add fulltext index options if present
313        if let Ok(Some(opt)) = column_schema.fulltext_options()
314            && opt.enable
315        {
316            let mut map = HashMap::from([
317                (
318                    COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
319                    opt.analyzer.to_string(),
320                ),
321                (
322                    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
323                    opt.case_sensitive.to_string(),
324                ),
325                (
326                    COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
327                    opt.backend.to_string(),
328                ),
329            ]);
330            if opt.backend == FulltextBackend::Bloom {
331                map.insert(
332                    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
333                    opt.granularity.to_string(),
334                );
335                map.insert(
336                    COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
337                    opt.false_positive_rate().to_string(),
338                );
339            }
340            extensions.fulltext_index_options = Some(map.into());
341        }
342
343        // Add skipping index options if present
344        if let Ok(Some(opt)) = column_schema.skipping_index_options() {
345            let map = HashMap::from([
346                (
347                    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
348                    opt.granularity.to_string(),
349                ),
350                (
351                    COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
352                    opt.false_positive_rate().to_string(),
353                ),
354                (
355                    COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
356                    opt.index_type.to_string(),
357                ),
358            ]);
359            extensions.skipping_index_options = Some(map.into());
360        }
361
362        // Add inverted index options if present
363        if column_schema.is_inverted_indexed() {
364            extensions.inverted_index_options = Some(HashMap::new().into());
365        }
366
367        let sql_column = SqlColumn {
368            column_def: ColumnDef {
369                name: Ident::with_quote(quote_style, &column_def.name),
370                data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
371                    .context(ParseSqlSnafu)?,
372                options,
373            },
374            extensions,
375        };
376
377        columns.push(sql_column);
378    }
379
380    // Convert constraints
381    let mut constraints = Vec::new();
382
383    // Add TIME INDEX constraint
384    constraints.push(TableConstraint::TimeIndex {
385        column: Ident::with_quote(quote_style, &expr.time_index),
386    });
387
388    // Add PRIMARY KEY constraint (always add as constraint for consistency)
389    if !expr.primary_keys.is_empty() {
390        let primary_key_columns: Vec<Ident> = expr
391            .primary_keys
392            .iter()
393            .map(|pk| Ident::with_quote(quote_style, pk))
394            .collect();
395
396        constraints.push(TableConstraint::PrimaryKey {
397            columns: primary_key_columns,
398        });
399    }
400
401    // Convert table options
402    let mut options = OptionMap::default();
403    for (key, value) in &expr.table_options {
404        options.insert(key.clone(), value.clone());
405    }
406
407    Ok(CreateTable {
408        if_not_exists: expr.create_if_not_exists,
409        table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
410        name: table_name,
411        columns,
412        engine: expr.engine.clone(),
413        constraints,
414        options,
415        partitions: None,
416    })
417}
418
419/// Validate the [`CreateTableExpr`] request.
420pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
421    // construct column list
422    let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
423    for (idx, column) in create.column_defs.iter().enumerate() {
424        if let Some(indices) = column_to_indices.get(&column.name) {
425            return InvalidSqlSnafu {
426                err_msg: format!(
427                    "column name `{}` is duplicated at index {} and {}",
428                    column.name, indices, idx
429                ),
430            }
431            .fail();
432        }
433        column_to_indices.insert(&column.name, idx);
434    }
435
436    // verify time_index exists
437    let _ = column_to_indices
438        .get(&create.time_index)
439        .with_context(|| InvalidSqlSnafu {
440            err_msg: format!(
441                "column name `{}` is not found in column list",
442                create.time_index
443            ),
444        })?;
445
446    // verify primary_key exists
447    for pk in &create.primary_keys {
448        let _ = column_to_indices
449            .get(&pk)
450            .with_context(|| InvalidSqlSnafu {
451                err_msg: format!("column name `{}` is not found in column list", pk),
452            })?;
453    }
454
455    // construct primary_key set
456    let mut pk_set = HashSet::new();
457    for pk in &create.primary_keys {
458        if !pk_set.insert(pk) {
459            return InvalidSqlSnafu {
460                err_msg: format!("column name `{}` is duplicated in primary keys", pk),
461            }
462            .fail();
463        }
464    }
465
466    // verify time index is not primary key
467    if pk_set.contains(&create.time_index) {
468        return InvalidSqlSnafu {
469            err_msg: format!(
470                "column name `{}` is both primary key and time index",
471                create.time_index
472            ),
473        }
474        .fail();
475    }
476
477    for column in &create.column_defs {
478        // verify do not contain interval type column issue #3235
479        if is_interval_type(&column.data_type()) {
480            return InvalidSqlSnafu {
481                err_msg: format!(
482                    "column name `{}` is interval type, which is not supported",
483                    column.name
484                ),
485            }
486            .fail();
487        }
488        // verify do not contain datetime type column issue #5489
489        if is_date_time_type(&column.data_type()) {
490            return InvalidSqlSnafu {
491                err_msg: format!(
492                    "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
493                    column.name
494                ),
495            }
496            .fail();
497        }
498    }
499    Ok(())
500}
501
502fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
503    for add_column in &add_columns.add_columns {
504        let Some(column_def) = &add_column.column_def else {
505            continue;
506        };
507        if is_date_time_type(&column_def.data_type()) {
508            return InvalidSqlSnafu {
509                    err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
510                }
511                .fail();
512        }
513        if is_interval_type(&column_def.data_type()) {
514            return InvalidSqlSnafu {
515                err_msg: format!(
516                    "column name `{}` is interval type, which is not supported",
517                    column_def.name
518                ),
519            }
520            .fail();
521        }
522    }
523    Ok(())
524}
525
526fn is_date_time_type(data_type: &ColumnDataType) -> bool {
527    matches!(data_type, ColumnDataType::Datetime)
528}
529
530fn is_interval_type(data_type: &ColumnDataType) -> bool {
531    matches!(
532        data_type,
533        ColumnDataType::IntervalYearMonth
534            | ColumnDataType::IntervalDayTime
535            | ColumnDataType::IntervalMonthDayNano
536    )
537}
538
539fn find_primary_keys(
540    columns: &[SqlColumn],
541    constraints: &[TableConstraint],
542) -> Result<Vec<String>> {
543    let columns_pk = columns
544        .iter()
545        .filter_map(|x| {
546            if x.options().iter().any(|o| {
547                matches!(
548                    o.option,
549                    ColumnOption::Unique {
550                        is_primary: true,
551                        ..
552                    }
553                )
554            }) {
555                Some(x.name().value.clone())
556            } else {
557                None
558            }
559        })
560        .collect::<Vec<String>>();
561
562    ensure!(
563        columns_pk.len() <= 1,
564        IllegalPrimaryKeysDefSnafu {
565            msg: "not allowed to inline multiple primary keys in columns options"
566        }
567    );
568
569    let constraints_pk = constraints
570        .iter()
571        .filter_map(|constraint| match constraint {
572            TableConstraint::PrimaryKey { columns, .. } => {
573                Some(columns.iter().map(|ident| ident.value.clone()))
574            }
575            _ => None,
576        })
577        .flatten()
578        .collect::<Vec<String>>();
579
580    ensure!(
581        columns_pk.is_empty() || constraints_pk.is_empty(),
582        IllegalPrimaryKeysDefSnafu {
583            msg: "found definitions of primary keys in multiple places"
584        }
585    );
586
587    let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
588    primary_keys.extend(columns_pk);
589    primary_keys.extend(constraints_pk);
590    Ok(primary_keys)
591}
592
593pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
594    let time_index = constraints
595        .iter()
596        .filter_map(|constraint| match constraint {
597            TableConstraint::TimeIndex { column, .. } => Some(&column.value),
598            _ => None,
599        })
600        .collect::<Vec<&String>>();
601    ensure!(
602        time_index.len() == 1,
603        InvalidSqlSnafu {
604            err_msg: "must have one and only one TimeIndex columns",
605        }
606    );
607    Ok(time_index.first().unwrap().to_string())
608}
609
610fn columns_to_expr(
611    column_defs: &[SqlColumn],
612    time_index: &str,
613    primary_keys: &[String],
614    timezone: Option<&Timezone>,
615) -> Result<Vec<api::v1::ColumnDef>> {
616    let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
617    column_schemas_to_defs(column_schemas, primary_keys)
618}
619
620fn columns_to_column_schemas(
621    columns: &[SqlColumn],
622    time_index: &str,
623    timezone: Option<&Timezone>,
624) -> Result<Vec<ColumnSchema>> {
625    columns
626        .iter()
627        .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
628        .collect::<Result<Vec<ColumnSchema>>>()
629}
630
631// TODO(weny): refactor this function to use `try_as_column_def`
632pub fn column_schemas_to_defs(
633    column_schemas: Vec<ColumnSchema>,
634    primary_keys: &[String],
635) -> Result<Vec<api::v1::ColumnDef>> {
636    let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
637        .iter()
638        .map(|c| {
639            ColumnDataTypeWrapper::try_from(c.data_type.clone())
640                .map(|w| w.to_parts())
641                .context(ColumnDataTypeSnafu)
642        })
643        .collect::<Result<Vec<_>>>()?;
644
645    column_schemas
646        .iter()
647        .zip(column_datatypes)
648        .map(|(schema, datatype)| {
649            let semantic_type = if schema.is_time_index() {
650                SemanticType::Timestamp
651            } else if primary_keys.contains(&schema.name) {
652                SemanticType::Tag
653            } else {
654                SemanticType::Field
655            } as i32;
656            let comment = schema
657                .metadata()
658                .get(COMMENT_KEY)
659                .cloned()
660                .unwrap_or_default();
661
662            Ok(api::v1::ColumnDef {
663                name: schema.name.clone(),
664                data_type: datatype.0 as i32,
665                is_nullable: schema.is_nullable(),
666                default_constraint: match schema.default_constraint() {
667                    None => vec![],
668                    Some(v) => {
669                        v.clone()
670                            .try_into()
671                            .context(ConvertColumnDefaultConstraintSnafu {
672                                column_name: &schema.name,
673                            })?
674                    }
675                },
676                semantic_type,
677                comment,
678                datatype_extension: datatype.1,
679                options: options_from_column_schema(schema),
680            })
681        })
682        .collect()
683}
684
685/// Converts a SQL alter table statement into a gRPC alter table expression.
686pub(crate) fn to_alter_table_expr(
687    alter_table: AlterTable,
688    query_ctx: &QueryContextRef,
689) -> Result<AlterTableExpr> {
690    let (catalog_name, schema_name, table_name) =
691        table_idents_to_full_name(alter_table.table_name(), query_ctx)
692            .map_err(BoxedError::new)
693            .context(ExternalSnafu)?;
694
695    let kind = match alter_table.alter_operation {
696        AlterTableOperation::AddConstraint(_) => {
697            return NotSupportedSnafu {
698                feat: "ADD CONSTRAINT",
699            }
700            .fail();
701        }
702        AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
703            add_columns: add_columns
704                .into_iter()
705                .map(|add_column| {
706                    let column_def = sql_column_def_to_grpc_column_def(
707                        &add_column.column_def,
708                        Some(&query_ctx.timezone()),
709                    )
710                    .map_err(BoxedError::new)
711                    .context(ExternalSnafu)?;
712                    if is_interval_type(&column_def.data_type()) {
713                        return NotSupportedSnafu {
714                            feat: "Add column with interval type",
715                        }
716                        .fail();
717                    }
718                    Ok(AddColumn {
719                        column_def: Some(column_def),
720                        location: add_column.location.as_ref().map(From::from),
721                        add_if_not_exists: add_column.add_if_not_exists,
722                    })
723                })
724                .collect::<Result<Vec<AddColumn>>>()?,
725        }),
726        AlterTableOperation::ModifyColumnType {
727            column_name,
728            target_type,
729        } => {
730            let target_type =
731                sql_data_type_to_concrete_data_type(&target_type).context(ParseSqlSnafu)?;
732            let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
733                .map(|w| w.to_parts())
734                .context(ColumnDataTypeSnafu)?;
735            if is_interval_type(&target_type) {
736                return NotSupportedSnafu {
737                    feat: "Modify column type to interval type",
738                }
739                .fail();
740            }
741            AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
742                modify_column_types: vec![ModifyColumnType {
743                    column_name: column_name.value,
744                    target_type: target_type as i32,
745                    target_type_extension,
746                }],
747            })
748        }
749        AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
750            drop_columns: vec![DropColumn {
751                name: name.value.to_string(),
752            }],
753        }),
754        AlterTableOperation::RenameTable { new_table_name } => {
755            AlterTableKind::RenameTable(RenameTable {
756                new_table_name: new_table_name.to_string(),
757            })
758        }
759        AlterTableOperation::SetTableOptions { options } => {
760            AlterTableKind::SetTableOptions(SetTableOptions {
761                table_options: options.into_iter().map(Into::into).collect(),
762            })
763        }
764        AlterTableOperation::UnsetTableOptions { keys } => {
765            AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
766        }
767        AlterTableOperation::SetIndex { options } => {
768            let option = match options {
769                sql::statements::alter::SetIndexOperation::Fulltext {
770                    column_name,
771                    options,
772                } => SetIndex {
773                    options: Some(set_index::Options::Fulltext(SetFulltext {
774                        column_name: column_name.value,
775                        enable: options.enable,
776                        analyzer: match options.analyzer {
777                            FulltextAnalyzer::English => Analyzer::English.into(),
778                            FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
779                        },
780                        case_sensitive: options.case_sensitive,
781                        backend: match options.backend {
782                            FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
783                            FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
784                        },
785                        granularity: options.granularity as u64,
786                        false_positive_rate: options.false_positive_rate(),
787                    })),
788                },
789                sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
790                    options: Some(set_index::Options::Inverted(SetInverted {
791                        column_name: column_name.value,
792                    })),
793                },
794                sql::statements::alter::SetIndexOperation::Skipping {
795                    column_name,
796                    options,
797                } => SetIndex {
798                    options: Some(set_index::Options::Skipping(SetSkipping {
799                        column_name: column_name.value,
800                        enable: true,
801                        granularity: options.granularity as u64,
802                        false_positive_rate: options.false_positive_rate(),
803                        skipping_index_type: match options.index_type {
804                            SkippingIndexType::BloomFilter => {
805                                PbSkippingIndexType::BloomFilter.into()
806                            }
807                        },
808                    })),
809                },
810            };
811            AlterTableKind::SetIndexes(SetIndexes {
812                set_indexes: vec![option],
813            })
814        }
815        AlterTableOperation::UnsetIndex { options } => {
816            let option = match options {
817                sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
818                    UnsetIndex {
819                        options: Some(unset_index::Options::Fulltext(UnsetFulltext {
820                            column_name: column_name.value,
821                        })),
822                    }
823                }
824                sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
825                    UnsetIndex {
826                        options: Some(unset_index::Options::Inverted(UnsetInverted {
827                            column_name: column_name.value,
828                        })),
829                    }
830                }
831                sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
832                    UnsetIndex {
833                        options: Some(unset_index::Options::Skipping(UnsetSkipping {
834                            column_name: column_name.value,
835                        })),
836                    }
837                }
838            };
839
840            AlterTableKind::UnsetIndexes(UnsetIndexes {
841                unset_indexes: vec![option],
842            })
843        }
844        AlterTableOperation::DropDefaults { columns } => {
845            AlterTableKind::DropDefaults(DropDefaults {
846                drop_defaults: columns
847                    .into_iter()
848                    .map(|col| {
849                        let column_name = col.0.to_string();
850                        Ok(api::v1::DropDefault { column_name })
851                    })
852                    .collect::<Result<Vec<_>>>()?,
853            })
854        }
855        AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
856            set_defaults: defaults
857                .into_iter()
858                .map(|col| {
859                    let column_name = col.column_name.to_string();
860                    let default_constraint = serde_json::to_string(&col.default_constraint)
861                        .context(EncodeJsonSnafu)?
862                        .into_bytes();
863                    Ok(api::v1::SetDefault {
864                        column_name,
865                        default_constraint,
866                    })
867                })
868                .collect::<Result<Vec<_>>>()?,
869        }),
870    };
871
872    Ok(AlterTableExpr {
873        catalog_name,
874        schema_name,
875        table_name,
876        kind: Some(kind),
877    })
878}
879
880/// Try to cast the `[AlterDatabase]` statement into gRPC `[AlterDatabaseExpr]`.
881pub fn to_alter_database_expr(
882    alter_database: AlterDatabase,
883    query_ctx: &QueryContextRef,
884) -> Result<AlterDatabaseExpr> {
885    let catalog = query_ctx.current_catalog();
886    let schema = alter_database.database_name;
887
888    let kind = match alter_database.alter_operation {
889        AlterDatabaseOperation::SetDatabaseOption { options } => {
890            let options = options.into_iter().map(Into::into).collect();
891            AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
892                set_database_options: options,
893            })
894        }
895        AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
896            AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
897        }
898    };
899
900    Ok(AlterDatabaseExpr {
901        catalog_name: catalog.to_string(),
902        schema_name: schema.to_string(),
903        kind: Some(kind),
904    })
905}
906
907/// Try to cast the `[CreateViewExpr]` statement into gRPC `[CreateViewExpr]`.
908pub fn to_create_view_expr(
909    stmt: CreateView,
910    logical_plan: Vec<u8>,
911    table_names: Vec<TableName>,
912    columns: Vec<String>,
913    plan_columns: Vec<String>,
914    definition: String,
915    query_ctx: QueryContextRef,
916) -> Result<CreateViewExpr> {
917    let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
918        .map_err(BoxedError::new)
919        .context(ExternalSnafu)?;
920
921    let expr = CreateViewExpr {
922        catalog_name,
923        schema_name,
924        view_name,
925        logical_plan,
926        create_if_not_exists: stmt.if_not_exists,
927        or_replace: stmt.or_replace,
928        table_names,
929        columns,
930        plan_columns,
931        definition,
932    };
933
934    Ok(expr)
935}
936
937pub fn to_create_flow_task_expr(
938    create_flow: CreateFlow,
939    query_ctx: &QueryContextRef,
940) -> Result<CreateFlowExpr> {
941    // retrieve sink table name
942    let sink_table_ref =
943        object_name_to_table_reference(create_flow.sink_table_name.clone().into(), true)
944            .with_context(|_| ConvertIdentifierSnafu {
945                ident: create_flow.sink_table_name.to_string(),
946            })?;
947    let catalog = sink_table_ref
948        .catalog()
949        .unwrap_or(query_ctx.current_catalog())
950        .to_string();
951    let schema = sink_table_ref
952        .schema()
953        .map(|s| s.to_owned())
954        .unwrap_or(query_ctx.current_schema());
955
956    let sink_table_name = TableName {
957        catalog_name: catalog,
958        schema_name: schema,
959        table_name: sink_table_ref.table().to_string(),
960    };
961
962    let source_table_names = extract_tables_from_query(&create_flow.query)
963        .map(|name| {
964            let reference = object_name_to_table_reference(name.clone().into(), true)
965                .with_context(|_| ConvertIdentifierSnafu {
966                    ident: name.to_string(),
967                })?;
968            let catalog = reference
969                .catalog()
970                .unwrap_or(query_ctx.current_catalog())
971                .to_string();
972            let schema = reference
973                .schema()
974                .map(|s| s.to_string())
975                .unwrap_or(query_ctx.current_schema());
976
977            let table_name = TableName {
978                catalog_name: catalog,
979                schema_name: schema,
980                table_name: reference.table().to_string(),
981            };
982            Ok(table_name)
983        })
984        .collect::<Result<Vec<_>>>()?;
985
986    let eval_interval = create_flow.eval_interval;
987
988    Ok(CreateFlowExpr {
989        catalog_name: query_ctx.current_catalog().to_string(),
990        flow_name: sanitize_flow_name(create_flow.flow_name)?,
991        source_table_names,
992        sink_table_name: Some(sink_table_name),
993        or_replace: create_flow.or_replace,
994        create_if_not_exists: create_flow.if_not_exists,
995        expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
996        eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
997        comment: create_flow.comment.unwrap_or_default(),
998        sql: create_flow.query.to_string(),
999        flow_options: Default::default(),
1000    })
1001}
1002
1003/// sanitize the flow name, remove possible quotes
1004fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
1005    ensure!(
1006        flow_name.0.len() == 1,
1007        InvalidFlowNameSnafu {
1008            name: flow_name.to_string(),
1009        }
1010    );
1011    // safety: we've checked flow_name.0 has exactly one element.
1012    Ok(flow_name.0.swap_remove(0).to_string_unquoted())
1013}
1014
1015#[cfg(test)]
1016mod tests {
1017    use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
1018    use datatypes::value::Value;
1019    use session::context::{QueryContext, QueryContextBuilder};
1020    use sql::dialect::GreptimeDbDialect;
1021    use sql::parser::{ParseOptions, ParserContext};
1022    use sql::statements::statement::Statement;
1023    use store_api::storage::ColumnDefaultConstraint;
1024
1025    use super::*;
1026
1027    #[test]
1028    fn test_create_flow_tql_expr() {
1029        let sql = r#"
1030CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1031TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
1032        let stmt =
1033            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1034
1035        assert!(
1036            stmt.is_err(),
1037            "Expected error for invalid TQL EVAL parameters: {:#?}",
1038            stmt
1039        );
1040
1041        let sql = r#"
1042CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1043TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#;
1044        let stmt =
1045            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1046                .unwrap()
1047                .pop()
1048                .unwrap();
1049
1050        let Statement::CreateFlow(create_flow) = stmt else {
1051            unreachable!()
1052        };
1053        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1054
1055        let to_dot_sep =
1056            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1057        assert_eq!("calc_reqs", expr.flow_name);
1058        assert_eq!("greptime", expr.catalog_name);
1059        assert_eq!(
1060            "greptime.public.cnt_reqs",
1061            expr.sink_table_name.map(to_dot_sep).unwrap()
1062        );
1063        assert!(expr.source_table_names.is_empty());
1064        assert_eq!(
1065            r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#,
1066            expr.sql
1067        );
1068    }
1069
1070    #[test]
1071    fn test_create_flow_expr() {
1072        let sql = r"
1073CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
1074SELECT
1075    DISTINCT number as dis
1076FROM
1077    distinct_basic;";
1078        let stmt =
1079            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1080                .unwrap()
1081                .pop()
1082                .unwrap();
1083
1084        let Statement::CreateFlow(create_flow) = stmt else {
1085            unreachable!()
1086        };
1087        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1088
1089        let to_dot_sep =
1090            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1091        assert_eq!("test_distinct_basic", expr.flow_name);
1092        assert_eq!("greptime", expr.catalog_name);
1093        assert_eq!(
1094            "greptime.public.out_distinct_basic",
1095            expr.sink_table_name.map(to_dot_sep).unwrap()
1096        );
1097        assert_eq!(1, expr.source_table_names.len());
1098        assert_eq!(
1099            "greptime.public.distinct_basic",
1100            to_dot_sep(expr.source_table_names[0].clone())
1101        );
1102        assert_eq!(
1103            r"SELECT
1104    DISTINCT number as dis
1105FROM
1106    distinct_basic",
1107            expr.sql
1108        );
1109
1110        let sql = r"
1111CREATE FLOW `task_2`
1112SINK TO schema_1.table_1
1113AS
1114SELECT max(c1), min(c2) FROM schema_2.table_2;";
1115        let stmt =
1116            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1117                .unwrap()
1118                .pop()
1119                .unwrap();
1120
1121        let Statement::CreateFlow(create_flow) = stmt else {
1122            unreachable!()
1123        };
1124        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1125
1126        let to_dot_sep =
1127            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1128        assert_eq!("task_2", expr.flow_name);
1129        assert_eq!("greptime", expr.catalog_name);
1130        assert_eq!(
1131            "greptime.schema_1.table_1",
1132            expr.sink_table_name.map(to_dot_sep).unwrap()
1133        );
1134        assert_eq!(1, expr.source_table_names.len());
1135        assert_eq!(
1136            "greptime.schema_2.table_2",
1137            to_dot_sep(expr.source_table_names[0].clone())
1138        );
1139        assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
1140
1141        let sql = r"
1142CREATE FLOW abc.`task_2`
1143SINK TO schema_1.table_1
1144AS
1145SELECT max(c1), min(c2) FROM schema_2.table_2;";
1146        let stmt =
1147            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1148                .unwrap()
1149                .pop()
1150                .unwrap();
1151
1152        let Statement::CreateFlow(create_flow) = stmt else {
1153            unreachable!()
1154        };
1155        let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
1156
1157        assert!(res.is_err());
1158        assert!(
1159            res.unwrap_err()
1160                .to_string()
1161                .contains("Invalid flow name: abc.`task_2`")
1162        );
1163    }
1164
1165    #[test]
1166    fn test_create_to_expr() {
1167        let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
1168        let stmt =
1169            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1170                .unwrap()
1171                .pop()
1172                .unwrap();
1173
1174        let Statement::CreateTable(create_table) = stmt else {
1175            unreachable!()
1176        };
1177        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1178        assert_eq!("3days", expr.table_options.get("ttl").unwrap());
1179        assert_eq!(
1180            "1.0MiB",
1181            expr.table_options.get("write_buffer_size").unwrap()
1182        );
1183    }
1184
1185    #[test]
1186    fn test_invalid_create_to_expr() {
1187        let cases = [
1188            // duplicate column declaration
1189            "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
1190            // duplicate primary key
1191            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
1192            // time index is primary key
1193            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));",
1194        ];
1195
1196        for sql in cases {
1197            let stmt = ParserContext::create_with_dialect(
1198                sql,
1199                &GreptimeDbDialect {},
1200                ParseOptions::default(),
1201            )
1202            .unwrap()
1203            .pop()
1204            .unwrap();
1205            let Statement::CreateTable(create_table) = stmt else {
1206                unreachable!()
1207            };
1208            create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
1209        }
1210    }
1211
1212    #[test]
1213    fn test_create_to_expr_with_default_timestamp_value() {
1214        let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
1215        let stmt =
1216            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1217                .unwrap()
1218                .pop()
1219                .unwrap();
1220
1221        let Statement::CreateTable(create_table) = stmt else {
1222            unreachable!()
1223        };
1224
1225        // query context with system timezone UTC.
1226        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1227        let ts_column = &expr.column_defs[1];
1228        let constraint = assert_ts_column(ts_column);
1229        assert!(
1230            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1231                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1232        );
1233
1234        // query context with timezone `+08:00`
1235        let ctx = QueryContextBuilder::default()
1236            .timezone(Timezone::from_tz_string("+08:00").unwrap())
1237            .build()
1238            .into();
1239        let expr = create_to_expr(&create_table, &ctx).unwrap();
1240        let ts_column = &expr.column_defs[1];
1241        let constraint = assert_ts_column(ts_column);
1242        assert!(
1243            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1244                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1245        );
1246    }
1247
1248    fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
1249        assert_eq!("ts", ts_column.name);
1250        assert_eq!(
1251            ColumnDataType::TimestampMillisecond as i32,
1252            ts_column.data_type
1253        );
1254        assert!(!ts_column.default_constraint.is_empty());
1255
1256        ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
1257    }
1258
1259    #[test]
1260    fn test_to_alter_expr() {
1261        let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
1262        let stmt =
1263            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1264                .unwrap()
1265                .pop()
1266                .unwrap();
1267
1268        let Statement::AlterDatabase(alter_database) = stmt else {
1269            unreachable!()
1270        };
1271
1272        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1273        let kind = expr.kind.unwrap();
1274
1275        let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
1276            set_database_options,
1277        }) = kind
1278        else {
1279            unreachable!()
1280        };
1281
1282        assert_eq!(2, set_database_options.len());
1283        assert_eq!("key1", set_database_options[0].key);
1284        assert_eq!("value1", set_database_options[0].value);
1285        assert_eq!("key2", set_database_options[1].key);
1286        assert_eq!("value2", set_database_options[1].value);
1287
1288        let sql = "ALTER DATABASE greptime UNSET key1, key2;";
1289        let stmt =
1290            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1291                .unwrap()
1292                .pop()
1293                .unwrap();
1294
1295        let Statement::AlterDatabase(alter_database) = stmt else {
1296            unreachable!()
1297        };
1298
1299        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1300        let kind = expr.kind.unwrap();
1301
1302        let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
1303            unreachable!()
1304        };
1305
1306        assert_eq!(2, keys.len());
1307        assert!(keys.contains(&"key1".to_string()));
1308        assert!(keys.contains(&"key2".to_string()));
1309
1310        let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
1311        let stmt =
1312            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1313                .unwrap()
1314                .pop()
1315                .unwrap();
1316
1317        let Statement::AlterTable(alter_table) = stmt else {
1318            unreachable!()
1319        };
1320
1321        // query context with system timezone UTC.
1322        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1323        let kind = expr.kind.unwrap();
1324
1325        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1326            unreachable!()
1327        };
1328
1329        assert_eq!(1, add_columns.len());
1330        let ts_column = add_columns[0].column_def.clone().unwrap();
1331        let constraint = assert_ts_column(&ts_column);
1332        assert!(
1333            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1334                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1335        );
1336
1337        //
1338        // query context with timezone `+08:00`
1339        let ctx = QueryContextBuilder::default()
1340            .timezone(Timezone::from_tz_string("+08:00").unwrap())
1341            .build()
1342            .into();
1343        let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
1344        let kind = expr.kind.unwrap();
1345
1346        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1347            unreachable!()
1348        };
1349
1350        assert_eq!(1, add_columns.len());
1351        let ts_column = add_columns[0].column_def.clone().unwrap();
1352        let constraint = assert_ts_column(&ts_column);
1353        assert!(
1354            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1355                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1356        );
1357    }
1358
1359    #[test]
1360    fn test_to_alter_modify_column_type_expr() {
1361        let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
1362        let stmt =
1363            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1364                .unwrap()
1365                .pop()
1366                .unwrap();
1367
1368        let Statement::AlterTable(alter_table) = stmt else {
1369            unreachable!()
1370        };
1371
1372        // query context with system timezone UTC.
1373        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1374        let kind = expr.kind.unwrap();
1375
1376        let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
1377            modify_column_types,
1378        }) = kind
1379        else {
1380            unreachable!()
1381        };
1382
1383        assert_eq!(1, modify_column_types.len());
1384        let modify_column_type = &modify_column_types[0];
1385
1386        assert_eq!("mem_usage", modify_column_type.column_name);
1387        assert_eq!(
1388            ColumnDataType::String as i32,
1389            modify_column_type.target_type
1390        );
1391        assert!(modify_column_type.target_type_extension.is_none());
1392    }
1393
1394    fn new_test_table_names() -> Vec<TableName> {
1395        vec![
1396            TableName {
1397                catalog_name: "greptime".to_string(),
1398                schema_name: "public".to_string(),
1399                table_name: "a_table".to_string(),
1400            },
1401            TableName {
1402                catalog_name: "greptime".to_string(),
1403                schema_name: "public".to_string(),
1404                table_name: "b_table".to_string(),
1405            },
1406        ]
1407    }
1408
1409    #[test]
1410    fn test_to_create_view_expr() {
1411        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1412        let stmt =
1413            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1414                .unwrap()
1415                .pop()
1416                .unwrap();
1417
1418        let Statement::CreateView(stmt) = stmt else {
1419            unreachable!()
1420        };
1421
1422        let logical_plan = vec![1, 2, 3];
1423        let table_names = new_test_table_names();
1424        let columns = vec!["a".to_string()];
1425        let plan_columns = vec!["number".to_string()];
1426
1427        let expr = to_create_view_expr(
1428            stmt,
1429            logical_plan.clone(),
1430            table_names.clone(),
1431            columns.clone(),
1432            plan_columns.clone(),
1433            sql.to_string(),
1434            QueryContext::arc(),
1435        )
1436        .unwrap();
1437
1438        assert_eq!("greptime", expr.catalog_name);
1439        assert_eq!("public", expr.schema_name);
1440        assert_eq!("test", expr.view_name);
1441        assert!(!expr.create_if_not_exists);
1442        assert!(!expr.or_replace);
1443        assert_eq!(logical_plan, expr.logical_plan);
1444        assert_eq!(table_names, expr.table_names);
1445        assert_eq!(sql, expr.definition);
1446        assert_eq!(columns, expr.columns);
1447        assert_eq!(plan_columns, expr.plan_columns);
1448    }
1449
1450    #[test]
1451    fn test_to_create_view_expr_complex() {
1452        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
1453        let stmt =
1454            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1455                .unwrap()
1456                .pop()
1457                .unwrap();
1458
1459        let Statement::CreateView(stmt) = stmt else {
1460            unreachable!()
1461        };
1462
1463        let logical_plan = vec![1, 2, 3];
1464        let table_names = new_test_table_names();
1465        let columns = vec!["a".to_string()];
1466        let plan_columns = vec!["number".to_string()];
1467
1468        let expr = to_create_view_expr(
1469            stmt,
1470            logical_plan.clone(),
1471            table_names.clone(),
1472            columns.clone(),
1473            plan_columns.clone(),
1474            sql.to_string(),
1475            QueryContext::arc(),
1476        )
1477        .unwrap();
1478
1479        assert_eq!("greptime", expr.catalog_name);
1480        assert_eq!("test", expr.schema_name);
1481        assert_eq!("test_view", expr.view_name);
1482        assert!(expr.create_if_not_exists);
1483        assert!(expr.or_replace);
1484        assert_eq!(logical_plan, expr.logical_plan);
1485        assert_eq!(table_names, expr.table_names);
1486        assert_eq!(sql, expr.definition);
1487        assert_eq!(columns, expr.columns);
1488        assert_eq!(plan_columns, expr.plan_columns);
1489    }
1490
1491    #[test]
1492    fn test_expr_to_create() {
1493        let sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
1494  `timestamp` TIMESTAMP(9) NOT NULL,
1495  `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
1496  `username` STRING NULL,
1497  `http_method` STRING NULL INVERTED INDEX,
1498  `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
1499  `protocol` STRING NULL,
1500  `status_code` INT NULL INVERTED INDEX,
1501  `response_size` BIGINT NULL,
1502  `message` STRING NULL,
1503  TIME INDEX (`timestamp`),
1504  PRIMARY KEY (`username`, `status_code`)
1505)
1506ENGINE=mito
1507WITH(
1508  append_mode = 'true'
1509)"#;
1510        let stmt =
1511            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1512                .unwrap()
1513                .pop()
1514                .unwrap();
1515
1516        let Statement::CreateTable(original_create) = stmt else {
1517            unreachable!()
1518        };
1519
1520        // Convert CreateTable -> CreateTableExpr -> CreateTable
1521        let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();
1522
1523        let create_table = expr_to_create(&expr, Some('`')).unwrap();
1524        let new_sql = format!("{:#}", create_table);
1525        assert_eq!(sql, new_sql);
1526    }
1527}