operator/expr_helper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
24use api::v1::{
25    AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
26    ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
27    DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
28    ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetDefaults, SetFulltext,
29    SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30    SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31    UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions, set_index,
32    unset_index,
33};
34use common_error::ext::BoxedError;
35use common_grpc_expr::util::ColumnExpr;
36use common_time::Timezone;
37use datafusion::sql::planner::object_name_to_table_reference;
38use datatypes::schema::{
39    COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
40    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
41    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
42    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
43    ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
44    SkippingIndexType,
45};
46use file_engine::FileOptions;
47use query::sql::{
48    check_file_to_table_schema_compatibility, file_column_schemas_to_table,
49    infer_file_table_schema, prepare_file_table_files,
50};
51use session::context::QueryContextRef;
52use session::table_name::table_idents_to_full_name;
53use snafu::{OptionExt, ResultExt, ensure};
54use sql::ast::{
55    ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
56};
57use sql::dialect::GreptimeDbDialect;
58use sql::parser::ParserContext;
59use sql::statements::alter::{
60    AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
61};
62use sql::statements::create::{
63    Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
64    CreateView, TableConstraint,
65};
66use sql::statements::{
67    OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
68    sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
69};
70use sql::util::extract_tables_from_query;
71use table::requests::{FILE_TABLE_META_KEY, TableOptions};
72use table::table_reference::TableReference;
73#[cfg(feature = "enterprise")]
74pub use trigger::to_create_trigger_task_expr;
75
76use crate::error::{
77    BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
78    ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
79    IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
80    InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
81    PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
82};
83
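/// Builds a [`CreateTableExpr`] from gRPC [`api::v1::ColumnSchema`]s using the
/// given table name, engine, and optional description (defaulting to
/// "Created on insertion"), then validates the resulting expression.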
84pub fn create_table_expr_by_column_schemas(
85    table_name: &TableReference<'_>,
86    column_schemas: &[api::v1::ColumnSchema],
87    engine: &str,
88    desc: Option<&str>,
89) -> Result<CreateTableExpr> {
90    let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
91    let expr = common_grpc_expr::util::build_create_table_expr(
92        None,
93        table_name,
94        column_exprs,
95        engine,
96        desc.unwrap_or("Created on insertion"),
97    )
98    .context(BuildCreateExprOnInsertionSnafu)?;
99
100    validate_create_expr(&expr)?;
101    Ok(expr)
102}
103
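/// Extracts the columns in `column_exprs` that are missing from `schema` and
/// returns them as an [`AddColumns`] expression, or `None` when there is
/// nothing to add. The result is validated before being returned.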
104pub fn extract_add_columns_expr(
105    schema: &Schema,
106    column_exprs: Vec<ColumnExpr>,
107) -> Result<Option<AddColumns>> {
108    let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
109        .context(FindNewColumnsOnInsertionSnafu)?;
110    if let Some(add_columns) = &add_columns {
111        validate_add_columns_expr(add_columns)?;
112    }
113    Ok(add_columns)
114}
115
// When the `CREATE EXTERNAL TABLE` statement is in expanded form, like
// ```sql
// CREATE EXTERNAL TABLE city (
//   host string,
//   ts timestamp,
116//   cpu float64,
117//   memory float64,
118//   TIME INDEX (ts),
119//   PRIMARY KEY(host)
120// ) WITH (location='/var/data/city.csv', format='csv');
121// ```
122// The user needs to specify the TIME INDEX column. If there is no suitable
123// column in the file to use as TIME INDEX, an additional placeholder column
124// needs to be created as the TIME INDEX, and a `DEFAULT <value>` constraint
125// should be added.
126//
127//
128// When the `CREATE EXTERNAL TABLE` statement is in inferred form, like
129// ```sql
130// CREATE EXTERNAL TABLE IF NOT EXISTS city WITH (location='/var/data/city.csv',format='csv');
131// ```
132// 1. If the TIME INDEX column can be inferred from metadata, use that column
133//    as the TIME INDEX. Otherwise,
134// 2. If a column named `greptime_timestamp` exists (with the requirement that
135//    the column is with type TIMESTAMP, otherwise an error is thrown), use
136//    that column as the TIME INDEX. Otherwise,
137// 3. Automatically create the `greptime_timestamp` column and add a `DEFAULT 0`
138//    constraint.
139pub(crate) async fn create_external_expr(
140    create: CreateExternalTable,
141    query_ctx: &QueryContextRef,
142) -> Result<CreateTableExpr> {
143    let (catalog_name, schema_name, table_name) =
144        table_idents_to_full_name(&create.name, query_ctx)
145            .map_err(BoxedError::new)
146            .context(ExternalSnafu)?;
147
148    let mut table_options = create.options.into_map();
149
150    let (object_store, files) = prepare_file_table_files(&table_options)
151        .await
152        .context(PrepareFileTableSnafu)?;
153
154    let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
155        .await
156        .context(InferFileTableSchemaSnafu)?
157        .column_schemas;
158
159    let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
160        // expanded form
161        let time_index = find_time_index(&create.constraints)?;
162        let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
163        let column_schemas =
164            columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
165        (time_index, primary_keys, column_schemas)
166    } else {
167        // inferred form
168        let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
169        let primary_keys = vec![];
170        (time_index, primary_keys, column_schemas)
171    };
172
173    check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
174        .context(SchemaIncompatibleSnafu)?;
175
176    let meta = FileOptions {
177        files,
178        file_column_schemas,
179    };
180    table_options.insert(
181        FILE_TABLE_META_KEY.to_string(),
182        serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
183    );
184
185    let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
186    let expr = CreateTableExpr {
187        catalog_name,
188        schema_name,
189        table_name,
190        desc: String::default(),
191        column_defs,
192        time_index,
193        primary_keys,
194        create_if_not_exists: create.if_not_exists,
195        table_options,
196        table_id: None,
197        engine: create.engine.clone(),
198    };
199
200    Ok(expr)
201}
202
203/// Converts a `CreateTable` statement into a [`CreateTableExpr`] gRPC request.
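///
/// # Example
///
/// A minimal sketch (not compiled as a doctest) mirroring the tests below; the
/// SQL and table name are illustrative only:
///
/// ```ignore
/// let sql = "CREATE TABLE monitor (host STRING, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY(host)) ENGINE=mito;";
/// let stmt =
///     ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
///         .unwrap()
///         .pop()
///         .unwrap();
/// let Statement::CreateTable(create_table) = stmt else {
///     unreachable!()
/// };
/// let expr = create_to_expr(&create_table, &QueryContext::arc())?;
/// assert_eq!("monitor", expr.table_name);
/// ```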
204pub fn create_to_expr(
205    create: &CreateTable,
206    query_ctx: &QueryContextRef,
207) -> Result<CreateTableExpr> {
208    let (catalog_name, schema_name, table_name) =
209        table_idents_to_full_name(&create.name, query_ctx)
210            .map_err(BoxedError::new)
211            .context(ExternalSnafu)?;
212
213    let time_index = find_time_index(&create.constraints)?;
214    let table_options = HashMap::from(
215        &TableOptions::try_from_iter(create.options.to_str_map())
216            .context(UnrecognizedTableOptionSnafu)?,
217    );
218
219    let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
220
221    let expr = CreateTableExpr {
222        catalog_name,
223        schema_name,
224        table_name,
225        desc: String::default(),
226        column_defs: columns_to_expr(
227            &create.columns,
228            &time_index,
229            &primary_keys,
230            Some(&query_ctx.timezone()),
231        )?,
232        time_index,
233        primary_keys,
234        create_if_not_exists: create.if_not_exists,
235        table_options,
236        table_id: None,
237        engine: create.engine.clone(),
238    };
239
240    validate_create_expr(&expr)?;
241    Ok(expr)
242}
243
244/// Converts a gRPC [`CreateTableExpr`] back into a `CreateTable` statement.
245/// You can use `create_table_expr_by_column_schemas` to create a `CreateTableExpr` from column schemas.
246///
247/// # Parameters
248///
249/// * `expr` - The `CreateTableExpr` to convert
250/// * `quote_style` - Optional quote style for identifiers (defaults to the MySQL-style backtick)
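///
/// # Example
///
/// A minimal sketch (not compiled as a doctest) of the round trip exercised in
/// the tests below; `expr` is assumed to be a valid [`CreateTableExpr`]:
///
/// ```ignore
/// let create_table = expr_to_create(&expr, Some('`'))?;
/// let sql = format!("{:#}", create_table);
/// ```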
251pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
252    let quote_style = quote_style.unwrap_or('`');
253
254    // Convert table name
255    let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
256        sql::ast::Ident::with_quote(quote_style, &expr.table_name),
257    )]);
258
259    // Convert columns
260    let mut columns = Vec::with_capacity(expr.column_defs.len());
261    for column_def in &expr.column_defs {
262        let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
263            column: &column_def.name,
264        })?;
265
266        let mut options = Vec::new();
267
268        // Add NULL/NOT NULL constraint
269        if column_def.is_nullable {
270            options.push(ColumnOptionDef {
271                name: None,
272                option: ColumnOption::Null,
273            });
274        } else {
275            options.push(ColumnOptionDef {
276                name: None,
277                option: ColumnOption::NotNull,
278            });
279        }
280
281        // Add DEFAULT constraint if present
282        if let Some(default_constraint) = column_schema.default_constraint() {
283            let expr = match default_constraint {
284                ColumnDefaultConstraint::Value(v) => {
285                    Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
286                }
287                ColumnDefaultConstraint::Function(func_expr) => {
288                    ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
289                        .context(ParseSqlSnafu)?
290                }
291            };
292            options.push(ColumnOptionDef {
293                name: None,
294                option: ColumnOption::Default(expr),
295            });
296        }
297
298        // Add COMMENT if present
299        if !column_def.comment.is_empty() {
300            options.push(ColumnOptionDef {
301                name: None,
302                option: ColumnOption::Comment(column_def.comment.clone()),
303            });
304        }
305
306        // Note: we don't add inline PRIMARY KEY options here;
307        // all primary keys are emitted as table constraints below for consistency.
308
309        // Handle column extensions (fulltext, inverted index, skipping index)
310        let mut extensions = ColumnExtensions::default();
311
312        // Add fulltext index options if present
313        if let Ok(Some(opt)) = column_schema.fulltext_options()
314            && opt.enable
315        {
316            let mut map = HashMap::from([
317                (
318                    COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
319                    opt.analyzer.to_string(),
320                ),
321                (
322                    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
323                    opt.case_sensitive.to_string(),
324                ),
325                (
326                    COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
327                    opt.backend.to_string(),
328                ),
329            ]);
330            if opt.backend == FulltextBackend::Bloom {
331                map.insert(
332                    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
333                    opt.granularity.to_string(),
334                );
335                map.insert(
336                    COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
337                    opt.false_positive_rate().to_string(),
338                );
339            }
340            extensions.fulltext_index_options = Some(map.into());
341        }
342
343        // Add skipping index options if present
344        if let Ok(Some(opt)) = column_schema.skipping_index_options() {
345            let map = HashMap::from([
346                (
347                    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
348                    opt.granularity.to_string(),
349                ),
350                (
351                    COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
352                    opt.false_positive_rate().to_string(),
353                ),
354                (
355                    COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
356                    opt.index_type.to_string(),
357                ),
358            ]);
359            extensions.skipping_index_options = Some(map.into());
360        }
361
362        // Add inverted index options if present
363        if column_schema.is_inverted_indexed() {
364            extensions.inverted_index_options = Some(HashMap::new().into());
365        }
366
367        let sql_column = SqlColumn {
368            column_def: ColumnDef {
369                name: Ident::with_quote(quote_style, &column_def.name),
370                data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
371                    .context(ParseSqlSnafu)?,
372                options,
373            },
374            extensions,
375        };
376
377        columns.push(sql_column);
378    }
379
380    // Convert constraints
381    let mut constraints = Vec::new();
382
383    // Add TIME INDEX constraint
384    constraints.push(TableConstraint::TimeIndex {
385        column: Ident::with_quote(quote_style, &expr.time_index),
386    });
387
388    // Add PRIMARY KEY constraint (always add as constraint for consistency)
389    if !expr.primary_keys.is_empty() {
390        let primary_key_columns: Vec<Ident> = expr
391            .primary_keys
392            .iter()
393            .map(|pk| Ident::with_quote(quote_style, pk))
394            .collect();
395
396        constraints.push(TableConstraint::PrimaryKey {
397            columns: primary_key_columns,
398        });
399    }
400
401    // Convert table options
402    let mut options = OptionMap::default();
403    for (key, value) in &expr.table_options {
404        options.insert(key.clone(), value.clone());
405    }
406
407    Ok(CreateTable {
408        if_not_exists: expr.create_if_not_exists,
409        table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
410        name: table_name,
411        columns,
412        engine: expr.engine.clone(),
413        constraints,
414        options,
415        partitions: None,
416    })
417}
418
419/// Validate the [`CreateTableExpr`] request.
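///
/// Checks that:
/// * column names are unique,
/// * the time index column exists and is not part of the primary key,
/// * every primary key column exists and is listed only once,
/// * no column uses the unsupported `Interval` or `Datetime` types.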
420pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
421    // construct column list
422    let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
423    for (idx, column) in create.column_defs.iter().enumerate() {
424        if let Some(indices) = column_to_indices.get(&column.name) {
425            return InvalidSqlSnafu {
426                err_msg: format!(
427                    "column name `{}` is duplicated at index {} and {}",
428                    column.name, indices, idx
429                ),
430            }
431            .fail();
432        }
433        column_to_indices.insert(&column.name, idx);
434    }
435
436    // verify time_index exists
437    let _ = column_to_indices
438        .get(&create.time_index)
439        .with_context(|| InvalidSqlSnafu {
440            err_msg: format!(
441                "column name `{}` is not found in column list",
442                create.time_index
443            ),
444        })?;
445
446    // verify primary_key exists
447    for pk in &create.primary_keys {
448        let _ = column_to_indices
449            .get(&pk)
450            .with_context(|| InvalidSqlSnafu {
451                err_msg: format!("column name `{}` is not found in column list", pk),
452            })?;
453    }
454
455    // construct primary_key set
456    let mut pk_set = HashSet::new();
457    for pk in &create.primary_keys {
458        if !pk_set.insert(pk) {
459            return InvalidSqlSnafu {
460                err_msg: format!("column name `{}` is duplicated in primary keys", pk),
461            }
462            .fail();
463        }
464    }
465
466    // verify time index is not primary key
467    if pk_set.contains(&create.time_index) {
468        return InvalidSqlSnafu {
469            err_msg: format!(
470                "column name `{}` is both primary key and time index",
471                create.time_index
472            ),
473        }
474        .fail();
475    }
476
477    for column in &create.column_defs {
478        // verify do not contain interval type column issue #3235
479        if is_interval_type(&column.data_type()) {
480            return InvalidSqlSnafu {
481                err_msg: format!(
482                    "column name `{}` is interval type, which is not supported",
483                    column.name
484                ),
485            }
486            .fail();
487        }
488        // verify do not contain datetime type column issue #5489
489        if is_date_time_type(&column.data_type()) {
490            return InvalidSqlSnafu {
491                err_msg: format!(
492                    "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
493                    column.name
494                ),
495            }
496            .fail();
497        }
498    }
499    Ok(())
500}
501
502fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
503    for add_column in &add_columns.add_columns {
504        let Some(column_def) = &add_column.column_def else {
505            continue;
506        };
507        if is_date_time_type(&column_def.data_type()) {
508            return InvalidSqlSnafu {
509                    err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
510                }
511                .fail();
512        }
513        if is_interval_type(&column_def.data_type()) {
514            return InvalidSqlSnafu {
515                err_msg: format!(
516                    "column name `{}` is interval type, which is not supported",
517                    column_def.name
518                ),
519            }
520            .fail();
521        }
522    }
523    Ok(())
524}
525
526fn is_date_time_type(data_type: &ColumnDataType) -> bool {
527    matches!(data_type, ColumnDataType::Datetime)
528}
529
530fn is_interval_type(data_type: &ColumnDataType) -> bool {
531    matches!(
532        data_type,
533        ColumnDataType::IntervalYearMonth
534            | ColumnDataType::IntervalDayTime
535            | ColumnDataType::IntervalMonthDayNano
536    )
537}
538
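/// Collects the primary key columns, either from inline `PRIMARY KEY` column
/// options or from a table-level `PRIMARY KEY` constraint; declaring them in
/// both places (or inlining more than one) is rejected.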
539fn find_primary_keys(
540    columns: &[SqlColumn],
541    constraints: &[TableConstraint],
542) -> Result<Vec<String>> {
543    let columns_pk = columns
544        .iter()
545        .filter_map(|x| {
546            if x.options().iter().any(|o| {
547                matches!(
548                    o.option,
549                    ColumnOption::Unique {
550                        is_primary: true,
551                        ..
552                    }
553                )
554            }) {
555                Some(x.name().value.clone())
556            } else {
557                None
558            }
559        })
560        .collect::<Vec<String>>();
561
562    ensure!(
563        columns_pk.len() <= 1,
564        IllegalPrimaryKeysDefSnafu {
565            msg: "not allowed to inline multiple primary keys in columns options"
566        }
567    );
568
569    let constraints_pk = constraints
570        .iter()
571        .filter_map(|constraint| match constraint {
572            TableConstraint::PrimaryKey { columns, .. } => {
573                Some(columns.iter().map(|ident| ident.value.clone()))
574            }
575            _ => None,
576        })
577        .flatten()
578        .collect::<Vec<String>>();
579
580    ensure!(
581        columns_pk.is_empty() || constraints_pk.is_empty(),
582        IllegalPrimaryKeysDefSnafu {
583            msg: "found definitions of primary keys in multiple places"
584        }
585    );
586
587    let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
588    primary_keys.extend(columns_pk);
589    primary_keys.extend(constraints_pk);
590    Ok(primary_keys)
591}
592
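/// Returns the name of the single `TIME INDEX` column declared in the table
/// constraints; exactly one such constraint is required.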
593pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
594    let time_index = constraints
595        .iter()
596        .filter_map(|constraint| match constraint {
597            TableConstraint::TimeIndex { column, .. } => Some(&column.value),
598            _ => None,
599        })
600        .collect::<Vec<&String>>();
601    ensure!(
602        time_index.len() == 1,
603        InvalidSqlSnafu {
604            err_msg: "must have one and only one TimeIndex columns",
605        }
606    );
607    Ok(time_index[0].clone())
608}
609
610fn columns_to_expr(
611    column_defs: &[SqlColumn],
612    time_index: &str,
613    primary_keys: &[String],
614    timezone: Option<&Timezone>,
615) -> Result<Vec<api::v1::ColumnDef>> {
616    let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
617    column_schemas_to_defs(column_schemas, primary_keys)
618}
619
620fn columns_to_column_schemas(
621    columns: &[SqlColumn],
622    time_index: &str,
623    timezone: Option<&Timezone>,
624) -> Result<Vec<ColumnSchema>> {
625    columns
626        .iter()
627        .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
628        .collect::<Result<Vec<ColumnSchema>>>()
629}
630
631// TODO(weny): refactor this function to use `try_as_column_def`
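/// Converts [`ColumnSchema`]s into gRPC [`api::v1::ColumnDef`]s, deriving each
/// column's semantic type (timestamp, tag, or field) from the time index flag
/// and the `primary_keys` list.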
632pub fn column_schemas_to_defs(
633    column_schemas: Vec<ColumnSchema>,
634    primary_keys: &[String],
635) -> Result<Vec<api::v1::ColumnDef>> {
636    let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
637        .iter()
638        .map(|c| {
639            ColumnDataTypeWrapper::try_from(c.data_type.clone())
640                .map(|w| w.to_parts())
641                .context(ColumnDataTypeSnafu)
642        })
643        .collect::<Result<Vec<_>>>()?;
644
645    column_schemas
646        .iter()
647        .zip(column_datatypes)
648        .map(|(schema, datatype)| {
649            let semantic_type = if schema.is_time_index() {
650                SemanticType::Timestamp
651            } else if primary_keys.contains(&schema.name) {
652                SemanticType::Tag
653            } else {
654                SemanticType::Field
655            } as i32;
656            let comment = schema
657                .metadata()
658                .get(COMMENT_KEY)
659                .cloned()
660                .unwrap_or_default();
661
662            Ok(api::v1::ColumnDef {
663                name: schema.name.clone(),
664                data_type: datatype.0 as i32,
665                is_nullable: schema.is_nullable(),
666                default_constraint: match schema.default_constraint() {
667                    None => vec![],
668                    Some(v) => {
669                        v.clone()
670                            .try_into()
671                            .context(ConvertColumnDefaultConstraintSnafu {
672                                column_name: &schema.name,
673                            })?
674                    }
675                },
676                semantic_type,
677                comment,
678                datatype_extension: datatype.1,
679                options: options_from_column_schema(schema),
680            })
681        })
682        .collect()
683}
684
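/// A parsed `ALTER TABLE ... REPARTITION` request: the fully qualified table
/// name plus the partition expressions to repartition from and into.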
685#[derive(Debug, Clone, PartialEq, Eq)]
686pub struct RepartitionRequest {
687    pub catalog_name: String,
688    pub schema_name: String,
689    pub table_name: String,
690    pub from_exprs: Vec<Expr>,
691    pub into_exprs: Vec<Expr>,
692}
693
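/// Converts an `ALTER TABLE ... REPARTITION` statement into a
/// [`RepartitionRequest`], resolving the table name against the query context.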
694pub(crate) fn to_repartition_request(
695    alter_table: AlterTable,
696    query_ctx: &QueryContextRef,
697) -> Result<RepartitionRequest> {
698    let (catalog_name, schema_name, table_name) =
699        table_idents_to_full_name(alter_table.table_name(), query_ctx)
700            .map_err(BoxedError::new)
701            .context(ExternalSnafu)?;
702
703    let AlterTableOperation::Repartition { operation } = alter_table.alter_operation else {
704        return InvalidSqlSnafu {
705            err_msg: "expected REPARTITION operation",
706        }
707        .fail();
708    };
709
710    Ok(RepartitionRequest {
711        catalog_name,
712        schema_name,
713        table_name,
714        from_exprs: operation.from_exprs,
715        into_exprs: operation.into_exprs,
716    })
717}
718
719/// Converts a SQL alter table statement into a gRPC alter table expression.
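///
/// # Example
///
/// A minimal sketch (not compiled as a doctest) mirroring the tests below; the
/// SQL is illustrative only:
///
/// ```ignore
/// let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
/// let stmt =
///     ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
///         .unwrap()
///         .pop()
///         .unwrap();
/// let Statement::AlterTable(alter_table) = stmt else {
///     unreachable!()
/// };
/// let expr = to_alter_table_expr(alter_table, &QueryContext::arc())?;
/// ```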
720pub(crate) fn to_alter_table_expr(
721    alter_table: AlterTable,
722    query_ctx: &QueryContextRef,
723) -> Result<AlterTableExpr> {
724    let (catalog_name, schema_name, table_name) =
725        table_idents_to_full_name(alter_table.table_name(), query_ctx)
726            .map_err(BoxedError::new)
727            .context(ExternalSnafu)?;
728
729    let kind = match alter_table.alter_operation {
730        AlterTableOperation::AddConstraint(_) => {
731            return NotSupportedSnafu {
732                feat: "ADD CONSTRAINT",
733            }
734            .fail();
735        }
736        AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
737            add_columns: add_columns
738                .into_iter()
739                .map(|add_column| {
740                    let column_def = sql_column_def_to_grpc_column_def(
741                        &add_column.column_def,
742                        Some(&query_ctx.timezone()),
743                    )
744                    .map_err(BoxedError::new)
745                    .context(ExternalSnafu)?;
746                    if is_interval_type(&column_def.data_type()) {
747                        return NotSupportedSnafu {
748                            feat: "Add column with interval type",
749                        }
750                        .fail();
751                    }
752                    Ok(AddColumn {
753                        column_def: Some(column_def),
754                        location: add_column.location.as_ref().map(From::from),
755                        add_if_not_exists: add_column.add_if_not_exists,
756                    })
757                })
758                .collect::<Result<Vec<AddColumn>>>()?,
759        }),
760        AlterTableOperation::ModifyColumnType {
761            column_name,
762            target_type,
763        } => {
764            let target_type =
765                sql_data_type_to_concrete_data_type(&target_type, &Default::default())
766                    .context(ParseSqlSnafu)?;
767            let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
768                .map(|w| w.to_parts())
769                .context(ColumnDataTypeSnafu)?;
770            if is_interval_type(&target_type) {
771                return NotSupportedSnafu {
772                    feat: "Modify column type to interval type",
773                }
774                .fail();
775            }
776            AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
777                modify_column_types: vec![ModifyColumnType {
778                    column_name: column_name.value,
779                    target_type: target_type as i32,
780                    target_type_extension,
781                }],
782            })
783        }
784        AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
785            drop_columns: vec![DropColumn {
786                name: name.value.clone(),
787            }],
788        }),
789        AlterTableOperation::RenameTable { new_table_name } => {
790            AlterTableKind::RenameTable(RenameTable {
791                new_table_name: new_table_name.clone(),
792            })
793        }
794        AlterTableOperation::SetTableOptions { options } => {
795            AlterTableKind::SetTableOptions(SetTableOptions {
796                table_options: options.into_iter().map(Into::into).collect(),
797            })
798        }
799        AlterTableOperation::UnsetTableOptions { keys } => {
800            AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
801        }
802        AlterTableOperation::Repartition { .. } => {
803            return NotSupportedSnafu {
804                feat: "ALTER TABLE ... REPARTITION",
805            }
806            .fail();
807        }
808        AlterTableOperation::SetIndex { options } => {
809            let option = match options {
810                sql::statements::alter::SetIndexOperation::Fulltext {
811                    column_name,
812                    options,
813                } => SetIndex {
814                    options: Some(set_index::Options::Fulltext(SetFulltext {
815                        column_name: column_name.value,
816                        enable: options.enable,
817                        analyzer: match options.analyzer {
818                            FulltextAnalyzer::English => Analyzer::English.into(),
819                            FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
820                        },
821                        case_sensitive: options.case_sensitive,
822                        backend: match options.backend {
823                            FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
824                            FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
825                        },
826                        granularity: options.granularity as u64,
827                        false_positive_rate: options.false_positive_rate(),
828                    })),
829                },
830                sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
831                    options: Some(set_index::Options::Inverted(SetInverted {
832                        column_name: column_name.value,
833                    })),
834                },
835                sql::statements::alter::SetIndexOperation::Skipping {
836                    column_name,
837                    options,
838                } => SetIndex {
839                    options: Some(set_index::Options::Skipping(SetSkipping {
840                        column_name: column_name.value,
841                        enable: true,
842                        granularity: options.granularity as u64,
843                        false_positive_rate: options.false_positive_rate(),
844                        skipping_index_type: match options.index_type {
845                            SkippingIndexType::BloomFilter => {
846                                PbSkippingIndexType::BloomFilter.into()
847                            }
848                        },
849                    })),
850                },
851            };
852            AlterTableKind::SetIndexes(SetIndexes {
853                set_indexes: vec![option],
854            })
855        }
856        AlterTableOperation::UnsetIndex { options } => {
857            let option = match options {
858                sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
859                    UnsetIndex {
860                        options: Some(unset_index::Options::Fulltext(UnsetFulltext {
861                            column_name: column_name.value,
862                        })),
863                    }
864                }
865                sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
866                    UnsetIndex {
867                        options: Some(unset_index::Options::Inverted(UnsetInverted {
868                            column_name: column_name.value,
869                        })),
870                    }
871                }
872                sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
873                    UnsetIndex {
874                        options: Some(unset_index::Options::Skipping(UnsetSkipping {
875                            column_name: column_name.value,
876                        })),
877                    }
878                }
879            };
880
881            AlterTableKind::UnsetIndexes(UnsetIndexes {
882                unset_indexes: vec![option],
883            })
884        }
885        AlterTableOperation::DropDefaults { columns } => {
886            AlterTableKind::DropDefaults(DropDefaults {
887                drop_defaults: columns
888                    .into_iter()
889                    .map(|col| {
890                        let column_name = col.0.to_string();
891                        Ok(api::v1::DropDefault { column_name })
892                    })
893                    .collect::<Result<Vec<_>>>()?,
894            })
895        }
896        AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
897            set_defaults: defaults
898                .into_iter()
899                .map(|col| {
900                    let column_name = col.column_name.to_string();
901                    let default_constraint = serde_json::to_string(&col.default_constraint)
902                        .context(EncodeJsonSnafu)?
903                        .into_bytes();
904                    Ok(api::v1::SetDefault {
905                        column_name,
906                        default_constraint,
907                    })
908                })
909                .collect::<Result<Vec<_>>>()?,
910        }),
911    };
912
913    Ok(AlterTableExpr {
914        catalog_name,
915        schema_name,
916        table_name,
917        kind: Some(kind),
918    })
919}
920
921/// Tries to cast the [`AlterDatabase`] statement into a gRPC [`AlterDatabaseExpr`].
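///
/// # Example
///
/// A minimal sketch (not compiled as a doctest) mirroring the tests below:
///
/// ```ignore
/// let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
/// let stmt =
///     ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
///         .unwrap()
///         .pop()
///         .unwrap();
/// let Statement::AlterDatabase(alter_database) = stmt else {
///     unreachable!()
/// };
/// let expr = to_alter_database_expr(alter_database, &QueryContext::arc())?;
/// ```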
922pub fn to_alter_database_expr(
923    alter_database: AlterDatabase,
924    query_ctx: &QueryContextRef,
925) -> Result<AlterDatabaseExpr> {
926    let catalog = query_ctx.current_catalog();
927    let schema = alter_database.database_name;
928
929    let kind = match alter_database.alter_operation {
930        AlterDatabaseOperation::SetDatabaseOption { options } => {
931            let options = options.into_iter().map(Into::into).collect();
932            AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
933                set_database_options: options,
934            })
935        }
936        AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
937            AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
938        }
939    };
940
941    Ok(AlterDatabaseExpr {
942        catalog_name: catalog.to_string(),
943        schema_name: schema.to_string(),
944        kind: Some(kind),
945    })
946}
947
948/// Tries to cast the [`CreateView`] statement into a gRPC [`CreateViewExpr`].
949pub fn to_create_view_expr(
950    stmt: CreateView,
951    logical_plan: Vec<u8>,
952    table_names: Vec<TableName>,
953    columns: Vec<String>,
954    plan_columns: Vec<String>,
955    definition: String,
956    query_ctx: QueryContextRef,
957) -> Result<CreateViewExpr> {
958    let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
959        .map_err(BoxedError::new)
960        .context(ExternalSnafu)?;
961
962    let expr = CreateViewExpr {
963        catalog_name,
964        schema_name,
965        view_name,
966        logical_plan,
967        create_if_not_exists: stmt.if_not_exists,
968        or_replace: stmt.or_replace,
969        table_names,
970        columns,
971        plan_columns,
972        definition,
973    };
974
975    Ok(expr)
976}
977
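/// Converts a `CREATE FLOW` statement into a gRPC [`CreateFlowExpr`], resolving
/// the sink table and the source tables referenced by the flow query against
/// the query context.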
978pub fn to_create_flow_task_expr(
979    create_flow: CreateFlow,
980    query_ctx: &QueryContextRef,
981) -> Result<CreateFlowExpr> {
982    // retrieve sink table name
983    let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone(), true)
984        .with_context(|_| ConvertIdentifierSnafu {
985            ident: create_flow.sink_table_name.to_string(),
986        })?;
987    let catalog = sink_table_ref
988        .catalog()
989        .unwrap_or(query_ctx.current_catalog())
990        .to_string();
991    let schema = sink_table_ref
992        .schema()
993        .map(|s| s.to_owned())
994        .unwrap_or(query_ctx.current_schema());
995
996    let sink_table_name = TableName {
997        catalog_name: catalog,
998        schema_name: schema,
999        table_name: sink_table_ref.table().to_string(),
1000    };
1001
1002    let source_table_names = extract_tables_from_query(&create_flow.query)
1003        .map(|name| {
1004            let reference =
1005                object_name_to_table_reference(name.clone(), true).with_context(|_| {
1006                    ConvertIdentifierSnafu {
1007                        ident: name.to_string(),
1008                    }
1009                })?;
1010            let catalog = reference
1011                .catalog()
1012                .unwrap_or(query_ctx.current_catalog())
1013                .to_string();
1014            let schema = reference
1015                .schema()
1016                .map(|s| s.to_string())
1017                .unwrap_or(query_ctx.current_schema());
1018
1019            let table_name = TableName {
1020                catalog_name: catalog,
1021                schema_name: schema,
1022                table_name: reference.table().to_string(),
1023            };
1024            Ok(table_name)
1025        })
1026        .collect::<Result<Vec<_>>>()?;
1027
1028    let eval_interval = create_flow.eval_interval;
1029
1030    Ok(CreateFlowExpr {
1031        catalog_name: query_ctx.current_catalog().to_string(),
1032        flow_name: sanitize_flow_name(create_flow.flow_name)?,
1033        source_table_names,
1034        sink_table_name: Some(sink_table_name),
1035        or_replace: create_flow.or_replace,
1036        create_if_not_exists: create_flow.if_not_exists,
1037        expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
1038        eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
1039        comment: create_flow.comment.unwrap_or_default(),
1040        sql: create_flow.query.to_string(),
1041        flow_options: Default::default(),
1042    })
1043}
1044
1045/// Sanitizes the flow name, removing possible quotes.
1046fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
1047    ensure!(
1048        flow_name.0.len() == 1,
1049        InvalidFlowNameSnafu {
1050            name: flow_name.to_string(),
1051        }
1052    );
1053    // safety: we've checked flow_name.0 has exactly one element.
1054    Ok(flow_name.0.swap_remove(0).to_string_unquoted())
1055}
1056
1057#[cfg(test)]
1058mod tests {
1059    use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
1060    use datatypes::value::Value;
1061    use session::context::{QueryContext, QueryContextBuilder};
1062    use sql::dialect::GreptimeDbDialect;
1063    use sql::parser::{ParseOptions, ParserContext};
1064    use sql::statements::statement::Statement;
1065    use store_api::storage::ColumnDefaultConstraint;
1066
1067    use super::*;
1068
1069    #[test]
1070    fn test_create_flow_tql_expr() {
1071        let sql = r#"
1072CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1073TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
1074        let stmt =
1075            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1076
1077        assert!(
1078            stmt.is_err(),
1079            "Expected error for invalid TQL EVAL parameters: {:#?}",
1080            stmt
1081        );
1082
1083        let sql = r#"
1084CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1085TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#;
1086        let stmt =
1087            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1088                .unwrap()
1089                .pop()
1090                .unwrap();
1091
1092        let Statement::CreateFlow(create_flow) = stmt else {
1093            unreachable!()
1094        };
1095        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1096
1097        let to_dot_sep =
1098            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1099        assert_eq!("calc_reqs", expr.flow_name);
1100        assert_eq!("greptime", expr.catalog_name);
1101        assert_eq!(
1102            "greptime.public.cnt_reqs",
1103            expr.sink_table_name.map(to_dot_sep).unwrap()
1104        );
1105        assert!(expr.source_table_names.is_empty());
1106        assert_eq!(
1107            r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#,
1108            expr.sql
1109        );
1110    }
1111
1112    #[test]
1113    fn test_create_flow_expr() {
1114        let sql = r"
1115CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
1116SELECT
1117    DISTINCT number as dis
1118FROM
1119    distinct_basic;";
1120        let stmt =
1121            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1122                .unwrap()
1123                .pop()
1124                .unwrap();
1125
1126        let Statement::CreateFlow(create_flow) = stmt else {
1127            unreachable!()
1128        };
1129        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1130
1131        let to_dot_sep =
1132            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1133        assert_eq!("test_distinct_basic", expr.flow_name);
1134        assert_eq!("greptime", expr.catalog_name);
1135        assert_eq!(
1136            "greptime.public.out_distinct_basic",
1137            expr.sink_table_name.map(to_dot_sep).unwrap()
1138        );
1139        assert_eq!(1, expr.source_table_names.len());
1140        assert_eq!(
1141            "greptime.public.distinct_basic",
1142            to_dot_sep(expr.source_table_names[0].clone())
1143        );
1144        assert_eq!(
1145            r"SELECT
1146    DISTINCT number as dis
1147FROM
1148    distinct_basic",
1149            expr.sql
1150        );
1151
1152        let sql = r"
1153CREATE FLOW `task_2`
1154SINK TO schema_1.table_1
1155AS
1156SELECT max(c1), min(c2) FROM schema_2.table_2;";
1157        let stmt =
1158            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1159                .unwrap()
1160                .pop()
1161                .unwrap();
1162
1163        let Statement::CreateFlow(create_flow) = stmt else {
1164            unreachable!()
1165        };
1166        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1167
1168        let to_dot_sep =
1169            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1170        assert_eq!("task_2", expr.flow_name);
1171        assert_eq!("greptime", expr.catalog_name);
1172        assert_eq!(
1173            "greptime.schema_1.table_1",
1174            expr.sink_table_name.map(to_dot_sep).unwrap()
1175        );
1176        assert_eq!(1, expr.source_table_names.len());
1177        assert_eq!(
1178            "greptime.schema_2.table_2",
1179            to_dot_sep(expr.source_table_names[0].clone())
1180        );
1181        assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
1182
1183        let sql = r"
1184CREATE FLOW abc.`task_2`
1185SINK TO schema_1.table_1
1186AS
1187SELECT max(c1), min(c2) FROM schema_2.table_2;";
1188        let stmt =
1189            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1190                .unwrap()
1191                .pop()
1192                .unwrap();
1193
1194        let Statement::CreateFlow(create_flow) = stmt else {
1195            unreachable!()
1196        };
1197        let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
1198
1199        assert!(res.is_err());
1200        assert!(
1201            res.unwrap_err()
1202                .to_string()
1203                .contains("Invalid flow name: abc.`task_2`")
1204        );
1205    }
1206
1207    #[test]
1208    fn test_create_to_expr() {
1209        let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
1210        let stmt =
1211            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1212                .unwrap()
1213                .pop()
1214                .unwrap();
1215
1216        let Statement::CreateTable(create_table) = stmt else {
1217            unreachable!()
1218        };
1219        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1220        assert_eq!("3days", expr.table_options.get("ttl").unwrap());
1221        assert_eq!(
1222            "1.0MiB",
1223            expr.table_options.get("write_buffer_size").unwrap()
1224        );
1225    }
1226
1227    #[test]
1228    fn test_invalid_create_to_expr() {
1229        let cases = [
1230            // duplicate column declaration
1231            "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
1232            // duplicate primary key
1233            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
1234            // time index is primary key
1235            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));",
1236        ];
1237
1238        for sql in cases {
1239            let stmt = ParserContext::create_with_dialect(
1240                sql,
1241                &GreptimeDbDialect {},
1242                ParseOptions::default(),
1243            )
1244            .unwrap()
1245            .pop()
1246            .unwrap();
1247            let Statement::CreateTable(create_table) = stmt else {
1248                unreachable!()
1249            };
1250            create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
1251        }
1252    }
1253
1254    #[test]
1255    fn test_create_to_expr_with_default_timestamp_value() {
1256        let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
1257        let stmt =
1258            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1259                .unwrap()
1260                .pop()
1261                .unwrap();
1262
1263        let Statement::CreateTable(create_table) = stmt else {
1264            unreachable!()
1265        };
1266
1267        // query context with system timezone UTC.
1268        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1269        let ts_column = &expr.column_defs[1];
1270        let constraint = assert_ts_column(ts_column);
1271        assert!(
1272            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1273                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1274        );
1275
1276        // query context with timezone `+08:00`
1277        let ctx = QueryContextBuilder::default()
1278            .timezone(Timezone::from_tz_string("+08:00").unwrap())
1279            .build()
1280            .into();
1281        let expr = create_to_expr(&create_table, &ctx).unwrap();
1282        let ts_column = &expr.column_defs[1];
1283        let constraint = assert_ts_column(ts_column);
1284        assert!(
1285            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1286                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1287        );
1288    }
1289
1290    fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
1291        assert_eq!("ts", ts_column.name);
1292        assert_eq!(
1293            ColumnDataType::TimestampMillisecond as i32,
1294            ts_column.data_type
1295        );
1296        assert!(!ts_column.default_constraint.is_empty());
1297
1298        ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
1299    }
1300
1301    #[test]
1302    fn test_to_alter_expr() {
1303        let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
1304        let stmt =
1305            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1306                .unwrap()
1307                .pop()
1308                .unwrap();
1309
1310        let Statement::AlterDatabase(alter_database) = stmt else {
1311            unreachable!()
1312        };
1313
1314        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1315        let kind = expr.kind.unwrap();
1316
1317        let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
1318            set_database_options,
1319        }) = kind
1320        else {
1321            unreachable!()
1322        };
1323
1324        assert_eq!(2, set_database_options.len());
1325        assert_eq!("key1", set_database_options[0].key);
1326        assert_eq!("value1", set_database_options[0].value);
1327        assert_eq!("key2", set_database_options[1].key);
1328        assert_eq!("value2", set_database_options[1].value);
1329
1330        let sql = "ALTER DATABASE greptime UNSET key1, key2;";
1331        let stmt =
1332            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1333                .unwrap()
1334                .pop()
1335                .unwrap();
1336
1337        let Statement::AlterDatabase(alter_database) = stmt else {
1338            unreachable!()
1339        };
1340
1341        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1342        let kind = expr.kind.unwrap();
1343
1344        let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
1345            unreachable!()
1346        };
1347
1348        assert_eq!(2, keys.len());
1349        assert!(keys.contains(&"key1".to_string()));
1350        assert!(keys.contains(&"key2".to_string()));
1351
1352        let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
1353        let stmt =
1354            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1355                .unwrap()
1356                .pop()
1357                .unwrap();
1358
1359        let Statement::AlterTable(alter_table) = stmt else {
1360            unreachable!()
1361        };
1362
1363        // query context with system timezone UTC.
1364        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1365        let kind = expr.kind.unwrap();
1366
1367        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1368            unreachable!()
1369        };
1370
1371        assert_eq!(1, add_columns.len());
1372        let ts_column = add_columns[0].column_def.clone().unwrap();
1373        let constraint = assert_ts_column(&ts_column);
1374        assert!(
1375            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1376                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1377        );
1378
1379        //
1380        // query context with timezone `+08:00`
1381        let ctx = QueryContextBuilder::default()
1382            .timezone(Timezone::from_tz_string("+08:00").unwrap())
1383            .build()
1384            .into();
1385        let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
1386        let kind = expr.kind.unwrap();
1387
1388        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1389            unreachable!()
1390        };
1391
1392        assert_eq!(1, add_columns.len());
1393        let ts_column = add_columns[0].column_def.clone().unwrap();
1394        let constraint = assert_ts_column(&ts_column);
1395        assert!(
1396            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1397                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1398        );
1399    }
1400
1401    #[test]
1402    fn test_to_alter_modify_column_type_expr() {
1403        let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
1404        let stmt =
1405            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1406                .unwrap()
1407                .pop()
1408                .unwrap();
1409
1410        let Statement::AlterTable(alter_table) = stmt else {
1411            unreachable!()
1412        };
1413
1414        // query context with system timezone UTC.
1415        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1416        let kind = expr.kind.unwrap();
1417
1418        let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
1419            modify_column_types,
1420        }) = kind
1421        else {
1422            unreachable!()
1423        };
1424
1425        assert_eq!(1, modify_column_types.len());
1426        let modify_column_type = &modify_column_types[0];
1427
1428        assert_eq!("mem_usage", modify_column_type.column_name);
1429        assert_eq!(
1430            ColumnDataType::String as i32,
1431            modify_column_type.target_type
1432        );
1433        assert!(modify_column_type.target_type_extension.is_none());
1434    }
1435
1436    #[test]
1437    fn test_to_repartition_request() {
1438        let sql = r#"
1439ALTER TABLE metrics REPARTITION (
1440  device_id < 100
1441) INTO (
1442  device_id < 100 AND area < 'South',
1443  device_id < 100 AND area >= 'South'
1444);"#;
1445        let stmt =
1446            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1447                .unwrap()
1448                .pop()
1449                .unwrap();
1450
1451        let Statement::AlterTable(alter_table) = stmt else {
1452            unreachable!()
1453        };
1454
1455        let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
1456        assert_eq!("greptime", request.catalog_name);
1457        assert_eq!("public", request.schema_name);
1458        assert_eq!("metrics", request.table_name);
1459        assert_eq!(
1460            request
1461                .from_exprs
1462                .into_iter()
1463                .map(|x| x.to_string())
1464                .collect::<Vec<_>>(),
1465            vec!["device_id < 100".to_string()]
1466        );
1467        assert_eq!(
1468            request
1469                .into_exprs
1470                .into_iter()
1471                .map(|x| x.to_string())
1472                .collect::<Vec<_>>(),
1473            vec![
1474                "device_id < 100 AND area < 'South'".to_string(),
1475                "device_id < 100 AND area >= 'South'".to_string()
1476            ]
1477        );
1478    }
1479
1480    fn new_test_table_names() -> Vec<TableName> {
1481        vec![
1482            TableName {
1483                catalog_name: "greptime".to_string(),
1484                schema_name: "public".to_string(),
1485                table_name: "a_table".to_string(),
1486            },
1487            TableName {
1488                catalog_name: "greptime".to_string(),
1489                schema_name: "public".to_string(),
1490                table_name: "b_table".to_string(),
1491            },
1492        ]
1493    }
1494
1495    #[test]
1496    fn test_to_create_view_expr() {
1497        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1498        let stmt =
1499            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1500                .unwrap()
1501                .pop()
1502                .unwrap();
1503
1504        let Statement::CreateView(stmt) = stmt else {
1505            unreachable!()
1506        };
1507
1508        let logical_plan = vec![1, 2, 3];
1509        let table_names = new_test_table_names();
1510        let columns = vec!["a".to_string()];
1511        let plan_columns = vec!["number".to_string()];
1512
1513        let expr = to_create_view_expr(
1514            stmt,
1515            logical_plan.clone(),
1516            table_names.clone(),
1517            columns.clone(),
1518            plan_columns.clone(),
1519            sql.to_string(),
1520            QueryContext::arc(),
1521        )
1522        .unwrap();
1523
1524        assert_eq!("greptime", expr.catalog_name);
1525        assert_eq!("public", expr.schema_name);
1526        assert_eq!("test", expr.view_name);
1527        assert!(!expr.create_if_not_exists);
1528        assert!(!expr.or_replace);
1529        assert_eq!(logical_plan, expr.logical_plan);
1530        assert_eq!(table_names, expr.table_names);
1531        assert_eq!(sql, expr.definition);
1532        assert_eq!(columns, expr.columns);
1533        assert_eq!(plan_columns, expr.plan_columns);
1534    }
1535
1536    #[test]
1537    fn test_to_create_view_expr_complex() {
1538        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
1539        let stmt =
1540            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1541                .unwrap()
1542                .pop()
1543                .unwrap();
1544
1545        let Statement::CreateView(stmt) = stmt else {
1546            unreachable!()
1547        };
1548
1549        let logical_plan = vec![1, 2, 3];
1550        let table_names = new_test_table_names();
1551        let columns = vec!["a".to_string()];
1552        let plan_columns = vec!["number".to_string()];
1553
1554        let expr = to_create_view_expr(
1555            stmt,
1556            logical_plan.clone(),
1557            table_names.clone(),
1558            columns.clone(),
1559            plan_columns.clone(),
1560            sql.to_string(),
1561            QueryContext::arc(),
1562        )
1563        .unwrap();
1564
1565        assert_eq!("greptime", expr.catalog_name);
1566        assert_eq!("test", expr.schema_name);
1567        assert_eq!("test_view", expr.view_name);
1568        assert!(expr.create_if_not_exists);
1569        assert!(expr.or_replace);
1570        assert_eq!(logical_plan, expr.logical_plan);
1571        assert_eq!(table_names, expr.table_names);
1572        assert_eq!(sql, expr.definition);
1573        assert_eq!(columns, expr.columns);
1574        assert_eq!(plan_columns, expr.plan_columns);
1575    }
1576
1577    #[test]
1578    fn test_expr_to_create() {
1579        let sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
1580  `timestamp` TIMESTAMP(9) NOT NULL,
1581  `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
1582  `username` STRING NULL,
1583  `http_method` STRING NULL INVERTED INDEX,
1584  `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
1585  `protocol` STRING NULL,
1586  `status_code` INT NULL INVERTED INDEX,
1587  `response_size` BIGINT NULL,
1588  `message` STRING NULL,
1589  TIME INDEX (`timestamp`),
1590  PRIMARY KEY (`username`, `status_code`)
1591)
1592ENGINE=mito
1593WITH(
1594  append_mode = 'true'
1595)"#;
1596        let stmt =
1597            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1598                .unwrap()
1599                .pop()
1600                .unwrap();
1601
1602        let Statement::CreateTable(original_create) = stmt else {
1603            unreachable!()
1604        };
1605
1606        // Convert CreateTable -> CreateTableExpr -> CreateTable
1607        let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();
1608
1609        let create_table = expr_to_create(&expr, Some('`')).unwrap();
1610        let new_sql = format!("{:#}", create_table);
1611        assert_eq!(sql, new_sql);
1612    }
1613}