operator/expr_helper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
24use api::v1::{
25    AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
26    ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
27    DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
28    ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetDefaults, SetFulltext,
29    SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30    SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31    UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions, set_index,
32    unset_index,
33};
34use common_error::ext::BoxedError;
35use common_grpc_expr::util::ColumnExpr;
36use common_time::Timezone;
37use datafusion::sql::planner::object_name_to_table_reference;
38use datatypes::schema::{
39    COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
40    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
41    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
42    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
43    ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
44    SkippingIndexType,
45};
46use file_engine::FileOptions;
47use query::sql::{
48    check_file_to_table_schema_compatibility, file_column_schemas_to_table,
49    infer_file_table_schema, prepare_file_table_files,
50};
51use session::context::QueryContextRef;
52use session::table_name::table_idents_to_full_name;
53use snafu::{OptionExt, ResultExt, ensure};
54use sql::ast::{
55    ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
56};
57use sql::dialect::GreptimeDbDialect;
58use sql::parser::ParserContext;
59use sql::statements::alter::{
60    AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
61};
62use sql::statements::create::{
63    Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
64    CreateView, TableConstraint,
65};
66use sql::statements::{
67    OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
68    sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
69};
70use sql::util::extract_tables_from_query;
71use table::requests::{FILE_TABLE_META_KEY, TableOptions};
72use table::table_reference::TableReference;
73#[cfg(feature = "enterprise")]
74pub use trigger::to_create_trigger_task_expr;
75
76use crate::error::{
77    BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
78    ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
79    IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
80    InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
81    PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
82};
83
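/// Builds a [`CreateTableExpr`] from gRPC column schemas for the given table,
/// defaulting the table description to "Created on insertion" when `desc` is
/// `None`, and validates the resulting expression.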
84pub fn create_table_expr_by_column_schemas(
85    table_name: &TableReference<'_>,
86    column_schemas: &[api::v1::ColumnSchema],
87    engine: &str,
88    desc: Option<&str>,
89) -> Result<CreateTableExpr> {
90    let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
91    let expr = common_grpc_expr::util::build_create_table_expr(
92        None,
93        table_name,
94        column_exprs,
95        engine,
96        desc.unwrap_or("Created on insertion"),
97    )
98    .context(BuildCreateExprOnInsertionSnafu)?;
99
100    validate_create_expr(&expr)?;
101    Ok(expr)
102}
103
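/// Extracts the columns in `column_exprs` that are not yet present in `schema`
/// into an [`AddColumns`] expression and validates the new columns. Returns
/// `None` when there is nothing to add.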
104pub fn extract_add_columns_expr(
105    schema: &Schema,
106    column_exprs: Vec<ColumnExpr>,
107) -> Result<Option<AddColumns>> {
108    let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
109        .context(FindNewColumnsOnInsertionSnafu)?;
110    if let Some(add_columns) = &add_columns {
111        validate_add_columns_expr(add_columns)?;
112    }
113    Ok(add_columns)
114}
115
// Convert the `CREATE EXTERNAL TABLE` statement to a `CreateTableExpr`.
//
// When the `CREATE EXTERNAL TABLE` statement is in expanded form, like
// ```sql
// CREATE EXTERNAL TABLE city (
//   host string,
//   ts timestamp,
//   cpu float64,
//   memory float64,
//   TIME INDEX (ts),
//   PRIMARY KEY(host)
// ) WITH (location='/var/data/city.csv', format='csv');
// ```
// the user needs to specify the TIME INDEX column. If there is no suitable
// column in the file to use as the TIME INDEX, an additional placeholder column
// needs to be created as the TIME INDEX, and a `DEFAULT <value>` constraint
// should be added.
//
// When the `CREATE EXTERNAL TABLE` statement is in inferred form, like
// ```sql
// CREATE EXTERNAL TABLE IF NOT EXISTS city WITH (location='/var/data/city.csv',format='csv');
// ```
// 1. If the TIME INDEX column can be inferred from metadata, use that column
//    as the TIME INDEX. Otherwise,
// 2. If a column named `greptime_timestamp` exists (it must have type TIMESTAMP,
//    otherwise an error is thrown), use that column as the TIME INDEX. Otherwise,
// 3. Automatically create the `greptime_timestamp` column and add a `DEFAULT 0`
//    constraint.
139pub(crate) async fn create_external_expr(
140    create: CreateExternalTable,
141    query_ctx: &QueryContextRef,
142) -> Result<CreateTableExpr> {
143    let (catalog_name, schema_name, table_name) =
144        table_idents_to_full_name(&create.name, query_ctx)
145            .map_err(BoxedError::new)
146            .context(ExternalSnafu)?;
147
148    let mut table_options = create.options.into_map();
149
150    let (object_store, files) = prepare_file_table_files(&table_options)
151        .await
152        .context(PrepareFileTableSnafu)?;
153
154    let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
155        .await
156        .context(InferFileTableSchemaSnafu)?
157        .column_schemas;
158
159    let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
160        // expanded form
161        let time_index = find_time_index(&create.constraints)?;
162        let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
163        let column_schemas =
164            columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
165        (time_index, primary_keys, column_schemas)
166    } else {
167        // inferred form
168        let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
169        let primary_keys = vec![];
170        (time_index, primary_keys, column_schemas)
171    };
172
173    check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
174        .context(SchemaIncompatibleSnafu)?;
175
176    let meta = FileOptions {
177        files,
178        file_column_schemas,
179    };
180    table_options.insert(
181        FILE_TABLE_META_KEY.to_string(),
182        serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
183    );
184
185    let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
186    let expr = CreateTableExpr {
187        catalog_name,
188        schema_name,
189        table_name,
190        desc: String::default(),
191        column_defs,
192        time_index,
193        primary_keys,
194        create_if_not_exists: create.if_not_exists,
195        table_options,
196        table_id: None,
197        engine: create.engine.clone(),
198    };
199
200    Ok(expr)
201}
202
/// Converts a `CreateTable` statement into a [`CreateTableExpr`] gRPC request.
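///
/// A minimal usage sketch (mirroring `test_create_to_expr` below), assuming a
/// parsed `CREATE TABLE` statement and the default query context:
///
/// ```ignore
/// let sql = "CREATE TABLE monitor (host STRING, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY(host)) ENGINE=mito;";
/// let stmt =
///     ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
///         .unwrap()
///         .pop()
///         .unwrap();
/// let Statement::CreateTable(create_table) = stmt else { unreachable!() };
/// let expr = create_to_expr(&create_table, &QueryContext::arc())?;
/// ```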
204pub fn create_to_expr(
205    create: &CreateTable,
206    query_ctx: &QueryContextRef,
207) -> Result<CreateTableExpr> {
208    let (catalog_name, schema_name, table_name) =
209        table_idents_to_full_name(&create.name, query_ctx)
210            .map_err(BoxedError::new)
211            .context(ExternalSnafu)?;
212
213    let time_index = find_time_index(&create.constraints)?;
214    let table_options = HashMap::from(
215        &TableOptions::try_from_iter(create.options.to_str_map())
216            .context(UnrecognizedTableOptionSnafu)?,
217    );
218
219    let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
220
221    let expr = CreateTableExpr {
222        catalog_name,
223        schema_name,
224        table_name,
225        desc: String::default(),
226        column_defs: columns_to_expr(
227            &create.columns,
228            &time_index,
229            &primary_keys,
230            Some(&query_ctx.timezone()),
231        )?,
232        time_index,
233        primary_keys,
234        create_if_not_exists: create.if_not_exists,
235        table_options,
236        table_id: None,
237        engine: create.engine.clone(),
238    };
239
240    validate_create_expr(&expr)?;
241    Ok(expr)
242}
243
/// Converts a gRPC [`CreateTableExpr`] back into a `CreateTable` statement.
/// You can use `create_table_expr_by_column_schemas` to create a `CreateTableExpr` from column schemas.
///
/// # Parameters
///
/// * `expr` - The `CreateTableExpr` to convert
/// * `quote_style` - Optional quote style for identifiers (defaults to the MySQL-style backtick `` ` ``)
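///
/// A round-trip sketch (see `test_expr_to_create` in the tests below): convert a
/// parsed `CreateTable` into a [`CreateTableExpr`] and back, then render it as SQL:
///
/// ```ignore
/// let expr = create_to_expr(&original_create, &QueryContext::arc())?;
/// let create_table = expr_to_create(&expr, Some('`'))?;
/// let new_sql = format!("{:#}", create_table);
/// ```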
251pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
252    let quote_style = quote_style.unwrap_or('`');
253
254    // Convert table name
255    let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
256        sql::ast::Ident::with_quote(quote_style, &expr.table_name),
257    )]);
258
259    // Convert columns
260    let mut columns = Vec::with_capacity(expr.column_defs.len());
261    for column_def in &expr.column_defs {
262        let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
263            column: &column_def.name,
264        })?;
265
266        let mut options = Vec::new();
267
268        // Add NULL/NOT NULL constraint
269        if column_def.is_nullable {
270            options.push(ColumnOptionDef {
271                name: None,
272                option: ColumnOption::Null,
273            });
274        } else {
275            options.push(ColumnOptionDef {
276                name: None,
277                option: ColumnOption::NotNull,
278            });
279        }
280
281        // Add DEFAULT constraint if present
282        if let Some(default_constraint) = column_schema.default_constraint() {
283            let expr = match default_constraint {
284                ColumnDefaultConstraint::Value(v) => {
285                    Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
286                }
287                ColumnDefaultConstraint::Function(func_expr) => {
288                    ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
289                        .context(ParseSqlSnafu)?
290                }
291            };
292            options.push(ColumnOptionDef {
293                name: None,
294                option: ColumnOption::Default(expr),
295            });
296        }
297
298        // Add COMMENT if present
299        if !column_def.comment.is_empty() {
300            options.push(ColumnOptionDef {
301                name: None,
302                option: ColumnOption::Comment(column_def.comment.clone()),
303            });
304        }
305
        // Note: we don't add inline PRIMARY KEY options here; all primary keys
        // are handled as table constraints instead, for consistency.
308
309        // Handle column extensions (fulltext, inverted index, skipping index)
310        let mut extensions = ColumnExtensions::default();
311
312        // Add fulltext index options if present
313        if let Ok(Some(opt)) = column_schema.fulltext_options()
314            && opt.enable
315        {
316            let mut map = HashMap::from([
317                (
318                    COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
319                    opt.analyzer.to_string(),
320                ),
321                (
322                    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
323                    opt.case_sensitive.to_string(),
324                ),
325                (
326                    COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
327                    opt.backend.to_string(),
328                ),
329            ]);
330            if opt.backend == FulltextBackend::Bloom {
331                map.insert(
332                    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
333                    opt.granularity.to_string(),
334                );
335                map.insert(
336                    COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
337                    opt.false_positive_rate().to_string(),
338                );
339            }
340            extensions.fulltext_index_options = Some(map.into());
341        }
342
343        // Add skipping index options if present
344        if let Ok(Some(opt)) = column_schema.skipping_index_options() {
345            let map = HashMap::from([
346                (
347                    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
348                    opt.granularity.to_string(),
349                ),
350                (
351                    COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
352                    opt.false_positive_rate().to_string(),
353                ),
354                (
355                    COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
356                    opt.index_type.to_string(),
357                ),
358            ]);
359            extensions.skipping_index_options = Some(map.into());
360        }
361
362        // Add inverted index options if present
363        if column_schema.is_inverted_indexed() {
364            extensions.inverted_index_options = Some(HashMap::new().into());
365        }
366
367        let sql_column = SqlColumn {
368            column_def: ColumnDef {
369                name: Ident::with_quote(quote_style, &column_def.name),
370                data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
371                    .context(ParseSqlSnafu)?,
372                options,
373            },
374            extensions,
375        };
376
377        columns.push(sql_column);
378    }
379
380    // Convert constraints
381    let mut constraints = Vec::new();
382
383    // Add TIME INDEX constraint
384    constraints.push(TableConstraint::TimeIndex {
385        column: Ident::with_quote(quote_style, &expr.time_index),
386    });
387
388    // Add PRIMARY KEY constraint (always add as constraint for consistency)
389    if !expr.primary_keys.is_empty() {
390        let primary_key_columns: Vec<Ident> = expr
391            .primary_keys
392            .iter()
393            .map(|pk| Ident::with_quote(quote_style, pk))
394            .collect();
395
396        constraints.push(TableConstraint::PrimaryKey {
397            columns: primary_key_columns,
398        });
399    }
400
401    // Convert table options
402    let mut options = OptionMap::default();
403    for (key, value) in &expr.table_options {
404        options.insert(key.clone(), value.clone());
405    }
406
407    Ok(CreateTable {
408        if_not_exists: expr.create_if_not_exists,
409        table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
410        name: table_name,
411        columns,
412        engine: expr.engine.clone(),
413        constraints,
414        options,
415        partitions: None,
416    })
417}
418
419/// Validate the [`CreateTableExpr`] request.
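///
/// Checks that column names are unique, that the time index and every primary
/// key refer to declared columns, that primary keys are not duplicated, that
/// the time index is not also a primary key, and that no column uses an
/// interval or datetime type.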
420pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
421    // construct column list
422    let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
423    for (idx, column) in create.column_defs.iter().enumerate() {
424        if let Some(indices) = column_to_indices.get(&column.name) {
425            return InvalidSqlSnafu {
426                err_msg: format!(
427                    "column name `{}` is duplicated at index {} and {}",
428                    column.name, indices, idx
429                ),
430            }
431            .fail();
432        }
433        column_to_indices.insert(&column.name, idx);
434    }
435
436    // verify time_index exists
437    let _ = column_to_indices
438        .get(&create.time_index)
439        .with_context(|| InvalidSqlSnafu {
440            err_msg: format!(
441                "column name `{}` is not found in column list",
442                create.time_index
443            ),
444        })?;
445
446    // verify primary_key exists
447    for pk in &create.primary_keys {
448        let _ = column_to_indices
449            .get(&pk)
450            .with_context(|| InvalidSqlSnafu {
451                err_msg: format!("column name `{}` is not found in column list", pk),
452            })?;
453    }
454
455    // construct primary_key set
456    let mut pk_set = HashSet::new();
457    for pk in &create.primary_keys {
458        if !pk_set.insert(pk) {
459            return InvalidSqlSnafu {
460                err_msg: format!("column name `{}` is duplicated in primary keys", pk),
461            }
462            .fail();
463        }
464    }
465
466    // verify time index is not primary key
467    if pk_set.contains(&create.time_index) {
468        return InvalidSqlSnafu {
469            err_msg: format!(
470                "column name `{}` is both primary key and time index",
471                create.time_index
472            ),
473        }
474        .fail();
475    }
476
477    for column in &create.column_defs {
        // verify the table does not contain interval-type columns (issue #3235)
479        if is_interval_type(&column.data_type()) {
480            return InvalidSqlSnafu {
481                err_msg: format!(
482                    "column name `{}` is interval type, which is not supported",
483                    column.name
484                ),
485            }
486            .fail();
487        }
        // verify the table does not contain datetime-type columns (issue #5489)
489        if is_date_time_type(&column.data_type()) {
490            return InvalidSqlSnafu {
491                err_msg: format!(
492                    "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
493                    column.name
494                ),
495            }
496            .fail();
497        }
498    }
499    Ok(())
500}
501
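/// Rejects added columns that use the unsupported datetime or interval types.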
502fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
503    for add_column in &add_columns.add_columns {
504        let Some(column_def) = &add_column.column_def else {
505            continue;
506        };
507        if is_date_time_type(&column_def.data_type()) {
            return InvalidSqlSnafu {
                err_msg: format!(
                    "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
                    column_def.name
                ),
            }
            .fail();
512        }
513        if is_interval_type(&column_def.data_type()) {
514            return InvalidSqlSnafu {
515                err_msg: format!(
516                    "column name `{}` is interval type, which is not supported",
517                    column_def.name
518                ),
519            }
520            .fail();
521        }
522    }
523    Ok(())
524}
525
526fn is_date_time_type(data_type: &ColumnDataType) -> bool {
527    matches!(data_type, ColumnDataType::Datetime)
528}
529
530fn is_interval_type(data_type: &ColumnDataType) -> bool {
531    matches!(
532        data_type,
533        ColumnDataType::IntervalYearMonth
534            | ColumnDataType::IntervalDayTime
535            | ColumnDataType::IntervalMonthDayNano
536    )
537}
538
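/// Collects primary key columns from inline column options and `PRIMARY KEY`
/// table constraints, rejecting multiple inline primary keys as well as primary
/// keys defined in more than one place.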
539fn find_primary_keys(
540    columns: &[SqlColumn],
541    constraints: &[TableConstraint],
542) -> Result<Vec<String>> {
543    let columns_pk = columns
544        .iter()
545        .filter_map(|x| {
546            if x.options().iter().any(|o| {
547                matches!(
548                    o.option,
549                    ColumnOption::Unique {
550                        is_primary: true,
551                        ..
552                    }
553                )
554            }) {
555                Some(x.name().value.clone())
556            } else {
557                None
558            }
559        })
560        .collect::<Vec<String>>();
561
562    ensure!(
563        columns_pk.len() <= 1,
564        IllegalPrimaryKeysDefSnafu {
565            msg: "not allowed to inline multiple primary keys in columns options"
566        }
567    );
568
569    let constraints_pk = constraints
570        .iter()
571        .filter_map(|constraint| match constraint {
572            TableConstraint::PrimaryKey { columns, .. } => {
573                Some(columns.iter().map(|ident| ident.value.clone()))
574            }
575            _ => None,
576        })
577        .flatten()
578        .collect::<Vec<String>>();
579
580    ensure!(
581        columns_pk.is_empty() || constraints_pk.is_empty(),
582        IllegalPrimaryKeysDefSnafu {
583            msg: "found definitions of primary keys in multiple places"
584        }
585    );
586
587    let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
588    primary_keys.extend(columns_pk);
589    primary_keys.extend(constraints_pk);
590    Ok(primary_keys)
591}
592
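/// Finds the single `TIME INDEX` column among the table constraints, returning
/// an error unless exactly one is defined.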
593pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
594    let time_index = constraints
595        .iter()
596        .filter_map(|constraint| match constraint {
597            TableConstraint::TimeIndex { column, .. } => Some(&column.value),
598            _ => None,
599        })
600        .collect::<Vec<&String>>();
601    ensure!(
602        time_index.len() == 1,
603        InvalidSqlSnafu {
            err_msg: "must have one and only one TIME INDEX column",
605        }
606    );
607    Ok(time_index[0].clone())
608}
609
610fn columns_to_expr(
611    column_defs: &[SqlColumn],
612    time_index: &str,
613    primary_keys: &[String],
614    timezone: Option<&Timezone>,
615) -> Result<Vec<api::v1::ColumnDef>> {
616    let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
617    column_schemas_to_defs(column_schemas, primary_keys)
618}
619
620fn columns_to_column_schemas(
621    columns: &[SqlColumn],
622    time_index: &str,
623    timezone: Option<&Timezone>,
624) -> Result<Vec<ColumnSchema>> {
625    columns
626        .iter()
627        .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
628        .collect::<Result<Vec<ColumnSchema>>>()
629}
630
631// TODO(weny): refactor this function to use `try_as_column_def`
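/// Converts [`ColumnSchema`]s into gRPC [`api::v1::ColumnDef`]s, assigning the
/// semantic type: `Timestamp` for the time index, `Tag` for primary key
/// columns, and `Field` for everything else.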
632pub fn column_schemas_to_defs(
633    column_schemas: Vec<ColumnSchema>,
634    primary_keys: &[String],
635) -> Result<Vec<api::v1::ColumnDef>> {
636    let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
637        .iter()
638        .map(|c| {
639            ColumnDataTypeWrapper::try_from(c.data_type.clone())
640                .map(|w| w.to_parts())
641                .context(ColumnDataTypeSnafu)
642        })
643        .collect::<Result<Vec<_>>>()?;
644
645    column_schemas
646        .iter()
647        .zip(column_datatypes)
648        .map(|(schema, datatype)| {
649            let semantic_type = if schema.is_time_index() {
650                SemanticType::Timestamp
651            } else if primary_keys.contains(&schema.name) {
652                SemanticType::Tag
653            } else {
654                SemanticType::Field
655            } as i32;
656            let comment = schema
657                .metadata()
658                .get(COMMENT_KEY)
659                .cloned()
660                .unwrap_or_default();
661
662            Ok(api::v1::ColumnDef {
663                name: schema.name.clone(),
664                data_type: datatype.0 as i32,
665                is_nullable: schema.is_nullable(),
666                default_constraint: match schema.default_constraint() {
667                    None => vec![],
668                    Some(v) => {
669                        v.clone()
670                            .try_into()
671                            .context(ConvertColumnDefaultConstraintSnafu {
672                                column_name: &schema.name,
673                            })?
674                    }
675                },
676                semantic_type,
677                comment,
678                datatype_extension: datatype.1,
679                options: options_from_column_schema(schema),
680            })
681        })
682        .collect()
683}
684
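/// Intermediate representation of an `ALTER TABLE ... REPARTITION` statement:
/// the fully qualified table name plus the partition rule expressions to
/// repartition from and into.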
685#[derive(Debug, Clone, PartialEq, Eq)]
686pub struct RepartitionRequest {
687    pub catalog_name: String,
688    pub schema_name: String,
689    pub table_name: String,
690    pub from_exprs: Vec<Expr>,
691    pub into_exprs: Vec<Expr>,
692}
693
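/// Converts an `ALTER TABLE ... REPARTITION` statement into a
/// [`RepartitionRequest`], resolving the fully qualified table name from the
/// query context. Returns an error if the alter operation is not a repartition.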
694pub(crate) fn to_repartition_request(
695    alter_table: AlterTable,
696    query_ctx: &QueryContextRef,
697) -> Result<RepartitionRequest> {
698    let (catalog_name, schema_name, table_name) =
699        table_idents_to_full_name(alter_table.table_name(), query_ctx)
700            .map_err(BoxedError::new)
701            .context(ExternalSnafu)?;
702
703    let AlterTableOperation::Repartition { operation } = alter_table.alter_operation else {
704        return InvalidSqlSnafu {
705            err_msg: "expected REPARTITION operation",
706        }
707        .fail();
708    };
709
710    Ok(RepartitionRequest {
711        catalog_name,
712        schema_name,
713        table_name,
714        from_exprs: operation.from_exprs,
715        into_exprs: operation.into_exprs,
716    })
717}
718
719/// Converts a SQL alter table statement into a gRPC alter table expression.
720pub(crate) fn to_alter_table_expr(
721    alter_table: AlterTable,
722    query_ctx: &QueryContextRef,
723) -> Result<AlterTableExpr> {
724    let (catalog_name, schema_name, table_name) =
725        table_idents_to_full_name(alter_table.table_name(), query_ctx)
726            .map_err(BoxedError::new)
727            .context(ExternalSnafu)?;
728
729    let kind = match alter_table.alter_operation {
730        AlterTableOperation::AddConstraint(_) => {
731            return NotSupportedSnafu {
732                feat: "ADD CONSTRAINT",
733            }
734            .fail();
735        }
736        AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
737            add_columns: add_columns
738                .into_iter()
739                .map(|add_column| {
740                    let column_def = sql_column_def_to_grpc_column_def(
741                        &add_column.column_def,
742                        Some(&query_ctx.timezone()),
743                    )
744                    .map_err(BoxedError::new)
745                    .context(ExternalSnafu)?;
746                    if is_interval_type(&column_def.data_type()) {
747                        return NotSupportedSnafu {
748                            feat: "Add column with interval type",
749                        }
750                        .fail();
751                    }
752                    Ok(AddColumn {
753                        column_def: Some(column_def),
754                        location: add_column.location.as_ref().map(From::from),
755                        add_if_not_exists: add_column.add_if_not_exists,
756                    })
757                })
758                .collect::<Result<Vec<AddColumn>>>()?,
759        }),
760        AlterTableOperation::ModifyColumnType {
761            column_name,
762            target_type,
763        } => {
764            let target_type =
765                sql_data_type_to_concrete_data_type(&target_type).context(ParseSqlSnafu)?;
766            let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
767                .map(|w| w.to_parts())
768                .context(ColumnDataTypeSnafu)?;
769            if is_interval_type(&target_type) {
770                return NotSupportedSnafu {
771                    feat: "Modify column type to interval type",
772                }
773                .fail();
774            }
775            AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
776                modify_column_types: vec![ModifyColumnType {
777                    column_name: column_name.value,
778                    target_type: target_type as i32,
779                    target_type_extension,
780                }],
781            })
782        }
783        AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
784            drop_columns: vec![DropColumn {
785                name: name.value.clone(),
786            }],
787        }),
788        AlterTableOperation::RenameTable { new_table_name } => {
789            AlterTableKind::RenameTable(RenameTable {
790                new_table_name: new_table_name.clone(),
791            })
792        }
793        AlterTableOperation::SetTableOptions { options } => {
794            AlterTableKind::SetTableOptions(SetTableOptions {
795                table_options: options.into_iter().map(Into::into).collect(),
796            })
797        }
798        AlterTableOperation::UnsetTableOptions { keys } => {
799            AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
800        }
801        AlterTableOperation::Repartition { .. } => {
802            return NotSupportedSnafu {
803                feat: "ALTER TABLE ... REPARTITION",
804            }
805            .fail();
806        }
807        AlterTableOperation::SetIndex { options } => {
808            let option = match options {
809                sql::statements::alter::SetIndexOperation::Fulltext {
810                    column_name,
811                    options,
812                } => SetIndex {
813                    options: Some(set_index::Options::Fulltext(SetFulltext {
814                        column_name: column_name.value,
815                        enable: options.enable,
816                        analyzer: match options.analyzer {
817                            FulltextAnalyzer::English => Analyzer::English.into(),
818                            FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
819                        },
820                        case_sensitive: options.case_sensitive,
821                        backend: match options.backend {
822                            FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
823                            FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
824                        },
825                        granularity: options.granularity as u64,
826                        false_positive_rate: options.false_positive_rate(),
827                    })),
828                },
829                sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
830                    options: Some(set_index::Options::Inverted(SetInverted {
831                        column_name: column_name.value,
832                    })),
833                },
834                sql::statements::alter::SetIndexOperation::Skipping {
835                    column_name,
836                    options,
837                } => SetIndex {
838                    options: Some(set_index::Options::Skipping(SetSkipping {
839                        column_name: column_name.value,
840                        enable: true,
841                        granularity: options.granularity as u64,
842                        false_positive_rate: options.false_positive_rate(),
843                        skipping_index_type: match options.index_type {
844                            SkippingIndexType::BloomFilter => {
845                                PbSkippingIndexType::BloomFilter.into()
846                            }
847                        },
848                    })),
849                },
850            };
851            AlterTableKind::SetIndexes(SetIndexes {
852                set_indexes: vec![option],
853            })
854        }
855        AlterTableOperation::UnsetIndex { options } => {
856            let option = match options {
857                sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
858                    UnsetIndex {
859                        options: Some(unset_index::Options::Fulltext(UnsetFulltext {
860                            column_name: column_name.value,
861                        })),
862                    }
863                }
864                sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
865                    UnsetIndex {
866                        options: Some(unset_index::Options::Inverted(UnsetInverted {
867                            column_name: column_name.value,
868                        })),
869                    }
870                }
871                sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
872                    UnsetIndex {
873                        options: Some(unset_index::Options::Skipping(UnsetSkipping {
874                            column_name: column_name.value,
875                        })),
876                    }
877                }
878            };
879
880            AlterTableKind::UnsetIndexes(UnsetIndexes {
881                unset_indexes: vec![option],
882            })
883        }
884        AlterTableOperation::DropDefaults { columns } => {
885            AlterTableKind::DropDefaults(DropDefaults {
886                drop_defaults: columns
887                    .into_iter()
888                    .map(|col| {
889                        let column_name = col.0.to_string();
890                        Ok(api::v1::DropDefault { column_name })
891                    })
892                    .collect::<Result<Vec<_>>>()?,
893            })
894        }
895        AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
896            set_defaults: defaults
897                .into_iter()
898                .map(|col| {
899                    let column_name = col.column_name.to_string();
900                    let default_constraint = serde_json::to_string(&col.default_constraint)
901                        .context(EncodeJsonSnafu)?
902                        .into_bytes();
903                    Ok(api::v1::SetDefault {
904                        column_name,
905                        default_constraint,
906                    })
907                })
908                .collect::<Result<Vec<_>>>()?,
909        }),
910    };
911
912    Ok(AlterTableExpr {
913        catalog_name,
914        schema_name,
915        table_name,
916        kind: Some(kind),
917    })
918}
919
/// Converts the [`AlterDatabase`] statement into a gRPC [`AlterDatabaseExpr`].
921pub fn to_alter_database_expr(
922    alter_database: AlterDatabase,
923    query_ctx: &QueryContextRef,
924) -> Result<AlterDatabaseExpr> {
925    let catalog = query_ctx.current_catalog();
926    let schema = alter_database.database_name;
927
928    let kind = match alter_database.alter_operation {
929        AlterDatabaseOperation::SetDatabaseOption { options } => {
930            let options = options.into_iter().map(Into::into).collect();
931            AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
932                set_database_options: options,
933            })
934        }
935        AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
936            AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
937        }
938    };
939
940    Ok(AlterDatabaseExpr {
941        catalog_name: catalog.to_string(),
942        schema_name: schema.to_string(),
943        kind: Some(kind),
944    })
945}
946
/// Converts the [`CreateView`] statement into a gRPC [`CreateViewExpr`].
948pub fn to_create_view_expr(
949    stmt: CreateView,
950    logical_plan: Vec<u8>,
951    table_names: Vec<TableName>,
952    columns: Vec<String>,
953    plan_columns: Vec<String>,
954    definition: String,
955    query_ctx: QueryContextRef,
956) -> Result<CreateViewExpr> {
957    let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
958        .map_err(BoxedError::new)
959        .context(ExternalSnafu)?;
960
961    let expr = CreateViewExpr {
962        catalog_name,
963        schema_name,
964        view_name,
965        logical_plan,
966        create_if_not_exists: stmt.if_not_exists,
967        or_replace: stmt.or_replace,
968        table_names,
969        columns,
970        plan_columns,
971        definition,
972    };
973
974    Ok(expr)
975}
976
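/// Converts a `CREATE FLOW` statement into a gRPC [`CreateFlowExpr`], resolving
/// the sink and source table names against the current catalog and schema and
/// sanitizing the flow name.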
977pub fn to_create_flow_task_expr(
978    create_flow: CreateFlow,
979    query_ctx: &QueryContextRef,
980) -> Result<CreateFlowExpr> {
981    // retrieve sink table name
982    let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone(), true)
983        .with_context(|_| ConvertIdentifierSnafu {
984            ident: create_flow.sink_table_name.to_string(),
985        })?;
986    let catalog = sink_table_ref
987        .catalog()
988        .unwrap_or(query_ctx.current_catalog())
989        .to_string();
990    let schema = sink_table_ref
991        .schema()
992        .map(|s| s.to_owned())
993        .unwrap_or(query_ctx.current_schema());
994
995    let sink_table_name = TableName {
996        catalog_name: catalog,
997        schema_name: schema,
998        table_name: sink_table_ref.table().to_string(),
999    };
1000
1001    let source_table_names = extract_tables_from_query(&create_flow.query)
1002        .map(|name| {
1003            let reference =
1004                object_name_to_table_reference(name.clone(), true).with_context(|_| {
1005                    ConvertIdentifierSnafu {
1006                        ident: name.to_string(),
1007                    }
1008                })?;
1009            let catalog = reference
1010                .catalog()
1011                .unwrap_or(query_ctx.current_catalog())
1012                .to_string();
1013            let schema = reference
1014                .schema()
1015                .map(|s| s.to_string())
1016                .unwrap_or(query_ctx.current_schema());
1017
1018            let table_name = TableName {
1019                catalog_name: catalog,
1020                schema_name: schema,
1021                table_name: reference.table().to_string(),
1022            };
1023            Ok(table_name)
1024        })
1025        .collect::<Result<Vec<_>>>()?;
1026
1027    let eval_interval = create_flow.eval_interval;
1028
1029    Ok(CreateFlowExpr {
1030        catalog_name: query_ctx.current_catalog().to_string(),
1031        flow_name: sanitize_flow_name(create_flow.flow_name)?,
1032        source_table_names,
1033        sink_table_name: Some(sink_table_name),
1034        or_replace: create_flow.or_replace,
1035        create_if_not_exists: create_flow.if_not_exists,
1036        expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
1037        eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
1038        comment: create_flow.comment.unwrap_or_default(),
1039        sql: create_flow.query.to_string(),
1040        flow_options: Default::default(),
1041    })
1042}
1043
/// Sanitizes the flow name, removing possible quotes.
1045fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
1046    ensure!(
1047        flow_name.0.len() == 1,
1048        InvalidFlowNameSnafu {
1049            name: flow_name.to_string(),
1050        }
1051    );
1052    // safety: we've checked flow_name.0 has exactly one element.
1053    Ok(flow_name.0.swap_remove(0).to_string_unquoted())
1054}
1055
1056#[cfg(test)]
1057mod tests {
1058    use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
1059    use datatypes::value::Value;
1060    use session::context::{QueryContext, QueryContextBuilder};
1061    use sql::dialect::GreptimeDbDialect;
1062    use sql::parser::{ParseOptions, ParserContext};
1063    use sql::statements::statement::Statement;
1064    use store_api::storage::ColumnDefaultConstraint;
1065
1066    use super::*;
1067
1068    #[test]
1069    fn test_create_flow_tql_expr() {
1070        let sql = r#"
1071CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1072TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
1073        let stmt =
1074            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1075
1076        assert!(
1077            stmt.is_err(),
1078            "Expected error for invalid TQL EVAL parameters: {:#?}",
1079            stmt
1080        );
1081
1082        let sql = r#"
1083CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1084TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#;
1085        let stmt =
1086            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1087                .unwrap()
1088                .pop()
1089                .unwrap();
1090
1091        let Statement::CreateFlow(create_flow) = stmt else {
1092            unreachable!()
1093        };
1094        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1095
1096        let to_dot_sep =
1097            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1098        assert_eq!("calc_reqs", expr.flow_name);
1099        assert_eq!("greptime", expr.catalog_name);
1100        assert_eq!(
1101            "greptime.public.cnt_reqs",
1102            expr.sink_table_name.map(to_dot_sep).unwrap()
1103        );
1104        assert!(expr.source_table_names.is_empty());
1105        assert_eq!(
1106            r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#,
1107            expr.sql
1108        );
1109    }
1110
1111    #[test]
1112    fn test_create_flow_expr() {
1113        let sql = r"
1114CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
1115SELECT
1116    DISTINCT number as dis
1117FROM
1118    distinct_basic;";
1119        let stmt =
1120            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1121                .unwrap()
1122                .pop()
1123                .unwrap();
1124
1125        let Statement::CreateFlow(create_flow) = stmt else {
1126            unreachable!()
1127        };
1128        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1129
1130        let to_dot_sep =
1131            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1132        assert_eq!("test_distinct_basic", expr.flow_name);
1133        assert_eq!("greptime", expr.catalog_name);
1134        assert_eq!(
1135            "greptime.public.out_distinct_basic",
1136            expr.sink_table_name.map(to_dot_sep).unwrap()
1137        );
1138        assert_eq!(1, expr.source_table_names.len());
1139        assert_eq!(
1140            "greptime.public.distinct_basic",
1141            to_dot_sep(expr.source_table_names[0].clone())
1142        );
1143        assert_eq!(
1144            r"SELECT
1145    DISTINCT number as dis
1146FROM
1147    distinct_basic",
1148            expr.sql
1149        );
1150
1151        let sql = r"
1152CREATE FLOW `task_2`
1153SINK TO schema_1.table_1
1154AS
1155SELECT max(c1), min(c2) FROM schema_2.table_2;";
1156        let stmt =
1157            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1158                .unwrap()
1159                .pop()
1160                .unwrap();
1161
1162        let Statement::CreateFlow(create_flow) = stmt else {
1163            unreachable!()
1164        };
1165        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1166
1167        let to_dot_sep =
1168            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1169        assert_eq!("task_2", expr.flow_name);
1170        assert_eq!("greptime", expr.catalog_name);
1171        assert_eq!(
1172            "greptime.schema_1.table_1",
1173            expr.sink_table_name.map(to_dot_sep).unwrap()
1174        );
1175        assert_eq!(1, expr.source_table_names.len());
1176        assert_eq!(
1177            "greptime.schema_2.table_2",
1178            to_dot_sep(expr.source_table_names[0].clone())
1179        );
1180        assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
1181
1182        let sql = r"
1183CREATE FLOW abc.`task_2`
1184SINK TO schema_1.table_1
1185AS
1186SELECT max(c1), min(c2) FROM schema_2.table_2;";
1187        let stmt =
1188            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1189                .unwrap()
1190                .pop()
1191                .unwrap();
1192
1193        let Statement::CreateFlow(create_flow) = stmt else {
1194            unreachable!()
1195        };
1196        let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
1197
1198        assert!(res.is_err());
1199        assert!(
1200            res.unwrap_err()
1201                .to_string()
1202                .contains("Invalid flow name: abc.`task_2`")
1203        );
1204    }
1205
1206    #[test]
1207    fn test_create_to_expr() {
1208        let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
1209        let stmt =
1210            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1211                .unwrap()
1212                .pop()
1213                .unwrap();
1214
1215        let Statement::CreateTable(create_table) = stmt else {
1216            unreachable!()
1217        };
1218        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1219        assert_eq!("3days", expr.table_options.get("ttl").unwrap());
1220        assert_eq!(
1221            "1.0MiB",
1222            expr.table_options.get("write_buffer_size").unwrap()
1223        );
1224    }
1225
1226    #[test]
1227    fn test_invalid_create_to_expr() {
1228        let cases = [
1229            // duplicate column declaration
1230            "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
1231            // duplicate primary key
1232            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
1233            // time index is primary key
1234            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));",
1235        ];
1236
1237        for sql in cases {
1238            let stmt = ParserContext::create_with_dialect(
1239                sql,
1240                &GreptimeDbDialect {},
1241                ParseOptions::default(),
1242            )
1243            .unwrap()
1244            .pop()
1245            .unwrap();
1246            let Statement::CreateTable(create_table) = stmt else {
1247                unreachable!()
1248            };
1249            create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
1250        }
1251    }
1252
1253    #[test]
1254    fn test_create_to_expr_with_default_timestamp_value() {
1255        let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
1256        let stmt =
1257            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1258                .unwrap()
1259                .pop()
1260                .unwrap();
1261
1262        let Statement::CreateTable(create_table) = stmt else {
1263            unreachable!()
1264        };
1265
1266        // query context with system timezone UTC.
1267        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1268        let ts_column = &expr.column_defs[1];
1269        let constraint = assert_ts_column(ts_column);
1270        assert!(
1271            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1272                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1273        );
1274
1275        // query context with timezone `+08:00`
1276        let ctx = QueryContextBuilder::default()
1277            .timezone(Timezone::from_tz_string("+08:00").unwrap())
1278            .build()
1279            .into();
1280        let expr = create_to_expr(&create_table, &ctx).unwrap();
1281        let ts_column = &expr.column_defs[1];
1282        let constraint = assert_ts_column(ts_column);
1283        assert!(
1284            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1285                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1286        );
1287    }
1288
1289    fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
1290        assert_eq!("ts", ts_column.name);
1291        assert_eq!(
1292            ColumnDataType::TimestampMillisecond as i32,
1293            ts_column.data_type
1294        );
1295        assert!(!ts_column.default_constraint.is_empty());
1296
1297        ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
1298    }
1299
1300    #[test]
1301    fn test_to_alter_expr() {
1302        let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
1303        let stmt =
1304            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1305                .unwrap()
1306                .pop()
1307                .unwrap();
1308
1309        let Statement::AlterDatabase(alter_database) = stmt else {
1310            unreachable!()
1311        };
1312
1313        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1314        let kind = expr.kind.unwrap();
1315
1316        let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
1317            set_database_options,
1318        }) = kind
1319        else {
1320            unreachable!()
1321        };
1322
1323        assert_eq!(2, set_database_options.len());
1324        assert_eq!("key1", set_database_options[0].key);
1325        assert_eq!("value1", set_database_options[0].value);
1326        assert_eq!("key2", set_database_options[1].key);
1327        assert_eq!("value2", set_database_options[1].value);
1328
1329        let sql = "ALTER DATABASE greptime UNSET key1, key2;";
1330        let stmt =
1331            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1332                .unwrap()
1333                .pop()
1334                .unwrap();
1335
1336        let Statement::AlterDatabase(alter_database) = stmt else {
1337            unreachable!()
1338        };
1339
1340        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1341        let kind = expr.kind.unwrap();
1342
1343        let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
1344            unreachable!()
1345        };
1346
1347        assert_eq!(2, keys.len());
1348        assert!(keys.contains(&"key1".to_string()));
1349        assert!(keys.contains(&"key2".to_string()));
1350
1351        let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
1352        let stmt =
1353            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1354                .unwrap()
1355                .pop()
1356                .unwrap();
1357
1358        let Statement::AlterTable(alter_table) = stmt else {
1359            unreachable!()
1360        };
1361
1362        // query context with system timezone UTC.
1363        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1364        let kind = expr.kind.unwrap();
1365
1366        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1367            unreachable!()
1368        };
1369
1370        assert_eq!(1, add_columns.len());
1371        let ts_column = add_columns[0].column_def.clone().unwrap();
1372        let constraint = assert_ts_column(&ts_column);
1373        assert!(
1374            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1375                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1376        );
1377
1378        //
1379        // query context with timezone `+08:00`
1380        let ctx = QueryContextBuilder::default()
1381            .timezone(Timezone::from_tz_string("+08:00").unwrap())
1382            .build()
1383            .into();
1384        let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
1385        let kind = expr.kind.unwrap();
1386
1387        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1388            unreachable!()
1389        };
1390
1391        assert_eq!(1, add_columns.len());
1392        let ts_column = add_columns[0].column_def.clone().unwrap();
1393        let constraint = assert_ts_column(&ts_column);
1394        assert!(
1395            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1396                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1397        );
1398    }
1399
1400    #[test]
1401    fn test_to_alter_modify_column_type_expr() {
1402        let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
1403        let stmt =
1404            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1405                .unwrap()
1406                .pop()
1407                .unwrap();
1408
1409        let Statement::AlterTable(alter_table) = stmt else {
1410            unreachable!()
1411        };
1412
1413        // query context with system timezone UTC.
1414        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1415        let kind = expr.kind.unwrap();
1416
1417        let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
1418            modify_column_types,
1419        }) = kind
1420        else {
1421            unreachable!()
1422        };
1423
1424        assert_eq!(1, modify_column_types.len());
1425        let modify_column_type = &modify_column_types[0];
1426
1427        assert_eq!("mem_usage", modify_column_type.column_name);
1428        assert_eq!(
1429            ColumnDataType::String as i32,
1430            modify_column_type.target_type
1431        );
1432        assert!(modify_column_type.target_type_extension.is_none());
1433    }
1434
1435    #[test]
1436    fn test_to_repartition_request() {
1437        let sql = r#"
1438ALTER TABLE metrics REPARTITION (
1439  device_id < 100
1440) INTO (
1441  device_id < 100 AND area < 'South',
1442  device_id < 100 AND area >= 'South'
1443);"#;
1444        let stmt =
1445            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1446                .unwrap()
1447                .pop()
1448                .unwrap();
1449
1450        let Statement::AlterTable(alter_table) = stmt else {
1451            unreachable!()
1452        };
1453
1454        let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
1455        assert_eq!("greptime", request.catalog_name);
1456        assert_eq!("public", request.schema_name);
1457        assert_eq!("metrics", request.table_name);
1458        assert_eq!(
1459            request
1460                .from_exprs
1461                .into_iter()
1462                .map(|x| x.to_string())
1463                .collect::<Vec<_>>(),
1464            vec!["device_id < 100".to_string()]
1465        );
1466        assert_eq!(
1467            request
1468                .into_exprs
1469                .into_iter()
1470                .map(|x| x.to_string())
1471                .collect::<Vec<_>>(),
1472            vec![
1473                "device_id < 100 AND area < 'South'".to_string(),
1474                "device_id < 100 AND area >= 'South'".to_string()
1475            ]
1476        );
1477    }
1478
1479    fn new_test_table_names() -> Vec<TableName> {
1480        vec![
1481            TableName {
1482                catalog_name: "greptime".to_string(),
1483                schema_name: "public".to_string(),
1484                table_name: "a_table".to_string(),
1485            },
1486            TableName {
1487                catalog_name: "greptime".to_string(),
1488                schema_name: "public".to_string(),
1489                table_name: "b_table".to_string(),
1490            },
1491        ]
1492    }
1493
1494    #[test]
1495    fn test_to_create_view_expr() {
1496        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1497        let stmt =
1498            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1499                .unwrap()
1500                .pop()
1501                .unwrap();
1502
1503        let Statement::CreateView(stmt) = stmt else {
1504            unreachable!()
1505        };
1506
1507        let logical_plan = vec![1, 2, 3];
1508        let table_names = new_test_table_names();
1509        let columns = vec!["a".to_string()];
1510        let plan_columns = vec!["number".to_string()];
1511
1512        let expr = to_create_view_expr(
1513            stmt,
1514            logical_plan.clone(),
1515            table_names.clone(),
1516            columns.clone(),
1517            plan_columns.clone(),
1518            sql.to_string(),
1519            QueryContext::arc(),
1520        )
1521        .unwrap();
1522
1523        assert_eq!("greptime", expr.catalog_name);
1524        assert_eq!("public", expr.schema_name);
1525        assert_eq!("test", expr.view_name);
1526        assert!(!expr.create_if_not_exists);
1527        assert!(!expr.or_replace);
1528        assert_eq!(logical_plan, expr.logical_plan);
1529        assert_eq!(table_names, expr.table_names);
1530        assert_eq!(sql, expr.definition);
1531        assert_eq!(columns, expr.columns);
1532        assert_eq!(plan_columns, expr.plan_columns);
1533    }
1534
1535    #[test]
1536    fn test_to_create_view_expr_complex() {
1537        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
1538        let stmt =
1539            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1540                .unwrap()
1541                .pop()
1542                .unwrap();
1543
1544        let Statement::CreateView(stmt) = stmt else {
1545            unreachable!()
1546        };
1547
1548        let logical_plan = vec![1, 2, 3];
1549        let table_names = new_test_table_names();
1550        let columns = vec!["a".to_string()];
1551        let plan_columns = vec!["number".to_string()];
1552
1553        let expr = to_create_view_expr(
1554            stmt,
1555            logical_plan.clone(),
1556            table_names.clone(),
1557            columns.clone(),
1558            plan_columns.clone(),
1559            sql.to_string(),
1560            QueryContext::arc(),
1561        )
1562        .unwrap();
1563
1564        assert_eq!("greptime", expr.catalog_name);
1565        assert_eq!("test", expr.schema_name);
1566        assert_eq!("test_view", expr.view_name);
1567        assert!(expr.create_if_not_exists);
1568        assert!(expr.or_replace);
1569        assert_eq!(logical_plan, expr.logical_plan);
1570        assert_eq!(table_names, expr.table_names);
1571        assert_eq!(sql, expr.definition);
1572        assert_eq!(columns, expr.columns);
1573        assert_eq!(plan_columns, expr.plan_columns);
1574    }
1575
1576    #[test]
1577    fn test_expr_to_create() {
1578        let sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
1579  `timestamp` TIMESTAMP(9) NOT NULL,
1580  `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
1581  `username` STRING NULL,
1582  `http_method` STRING NULL INVERTED INDEX,
1583  `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
1584  `protocol` STRING NULL,
1585  `status_code` INT NULL INVERTED INDEX,
1586  `response_size` BIGINT NULL,
1587  `message` STRING NULL,
1588  TIME INDEX (`timestamp`),
1589  PRIMARY KEY (`username`, `status_code`)
1590)
1591ENGINE=mito
1592WITH(
1593  append_mode = 'true'
1594)"#;
1595        let stmt =
1596            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1597                .unwrap()
1598                .pop()
1599                .unwrap();
1600
1601        let Statement::CreateTable(original_create) = stmt else {
1602            unreachable!()
1603        };
1604
1605        // Convert CreateTable -> CreateTableExpr -> CreateTable
1606        let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();
1607
1608        let create_table = expr_to_create(&expr, Some('`')).unwrap();
1609        let new_sql = format!("{:#}", create_table);
1610        assert_eq!(sql, new_sql);
1611    }
1612}