operator/
expr_helper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
24use api::v1::{
25    AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
26    ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
27    DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
28    ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetDefaults, SetFulltext,
29    SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30    SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31    UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions, set_index,
32    unset_index,
33};
34use common_error::ext::BoxedError;
35use common_grpc_expr::util::ColumnExpr;
36use common_time::Timezone;
37use datafusion::sql::planner::object_name_to_table_reference;
38use datatypes::schema::{
39    COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
40    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
41    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
42    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
43    ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
44    SkippingIndexType,
45};
46use file_engine::FileOptions;
47use query::sql::{
48    check_file_to_table_schema_compatibility, file_column_schemas_to_table,
49    infer_file_table_schema, prepare_file_table_files,
50};
51use session::context::QueryContextRef;
52use session::table_name::table_idents_to_full_name;
53use snafu::{OptionExt, ResultExt, ensure};
54use sql::ast::{
55    ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
56};
57use sql::dialect::GreptimeDbDialect;
58use sql::parser::ParserContext;
59use sql::statements::alter::{
60    AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
61};
62use sql::statements::create::{
63    Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
64    CreateView, TableConstraint,
65};
66use sql::statements::{
67    OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
68    sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
69};
70use sql::util::extract_tables_from_query;
71use store_api::mito_engine_options::{COMPACTION_OVERRIDE, COMPACTION_TYPE};
72use table::requests::{FILE_TABLE_META_KEY, TableOptions};
73use table::table_reference::TableReference;
74#[cfg(feature = "enterprise")]
75pub use trigger::to_create_trigger_task_expr;
76
77use crate::error::{
78    BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
79    ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
80    IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
81    InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
82    PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
83};
84
85pub fn create_table_expr_by_column_schemas(
86    table_name: &TableReference<'_>,
87    column_schemas: &[api::v1::ColumnSchema],
88    engine: &str,
89    desc: Option<&str>,
90) -> Result<CreateTableExpr> {
91    let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
92    let expr = common_grpc_expr::util::build_create_table_expr(
93        None,
94        table_name,
95        column_exprs,
96        engine,
97        desc.unwrap_or("Created on insertion"),
98    )
99    .context(BuildCreateExprOnInsertionSnafu)?;
100
101    validate_create_expr(&expr)?;
102    Ok(expr)
103}
104
105pub fn extract_add_columns_expr(
106    schema: &Schema,
107    column_exprs: Vec<ColumnExpr>,
108) -> Result<Option<AddColumns>> {
109    let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
110        .context(FindNewColumnsOnInsertionSnafu)?;
111    if let Some(add_columns) = &add_columns {
112        validate_add_columns_expr(add_columns)?;
113    }
114    Ok(add_columns)
115}
116
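// When the `CREATE EXTERNAL TABLE` statement is in expanded form, like
// ```sql
// CREATE EXTERNAL TABLE city (
//   host string,
//   ts timestamp,
//   cpu float64,
//   memory float64,
//   TIME INDEX (ts),
//   PRIMARY KEY(host)
// ) WITH (location='/var/data/city.csv', format='csv');
// ```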
123// The user needs to specify the TIME INDEX column. If there is no suitable
124// column in the file to use as TIME INDEX, an additional placeholder column
125// needs to be created as the TIME INDEX, and a `DEFAULT <value>` constraint
126// should be added.
127//
128//
129// When the `CREATE EXTERNAL TABLE` statement is in inferred form, like
130// ```sql
131// CREATE EXTERNAL TABLE IF NOT EXISTS city WITH (location='/var/data/city.csv',format='csv');
132// ```
133// 1. If the TIME INDEX column can be inferred from metadata, use that column
134//    as the TIME INDEX. Otherwise,
135// 2. If a column named `greptime_timestamp` exists (with the requirement that
136//    the column is with type TIMESTAMP, otherwise an error is thrown), use
137//    that column as the TIME INDEX. Otherwise,
138// 3. Automatically create the `greptime_timestamp` column and add a `DEFAULT 0`
139//    constraint.
140pub(crate) async fn create_external_expr(
141    create: CreateExternalTable,
142    query_ctx: &QueryContextRef,
143) -> Result<CreateTableExpr> {
144    let (catalog_name, schema_name, table_name) =
145        table_idents_to_full_name(&create.name, query_ctx)
146            .map_err(BoxedError::new)
147            .context(ExternalSnafu)?;
148
149    let mut table_options = create.options.into_map();
150
151    let (object_store, files) = prepare_file_table_files(&table_options)
152        .await
153        .context(PrepareFileTableSnafu)?;
154
155    let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
156        .await
157        .context(InferFileTableSchemaSnafu)?
158        .column_schemas()
159        .to_vec();
160
161    let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
162        // expanded form
163        let time_index = find_time_index(&create.constraints)?;
164        let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
165        let column_schemas =
166            columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
167        (time_index, primary_keys, column_schemas)
168    } else {
169        // inferred form
170        let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
171        let primary_keys = vec![];
172        (time_index, primary_keys, column_schemas)
173    };
174
175    check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
176        .context(SchemaIncompatibleSnafu)?;
177
178    let meta = FileOptions {
179        files,
180        file_column_schemas,
181    };
182    table_options.insert(
183        FILE_TABLE_META_KEY.to_string(),
184        serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
185    );
186
187    let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
188    let expr = CreateTableExpr {
189        catalog_name,
190        schema_name,
191        table_name,
192        desc: String::default(),
193        column_defs,
194        time_index,
195        primary_keys,
196        create_if_not_exists: create.if_not_exists,
197        table_options,
198        table_id: None,
199        engine: create.engine.clone(),
200    };
201
202    Ok(expr)
203}
204
205/// Convert `CreateTable` statement to [`CreateTableExpr`] gRPC request.
206pub fn create_to_expr(
207    create: &CreateTable,
208    query_ctx: &QueryContextRef,
209) -> Result<CreateTableExpr> {
210    let (catalog_name, schema_name, table_name) =
211        table_idents_to_full_name(&create.name, query_ctx)
212            .map_err(BoxedError::new)
213            .context(ExternalSnafu)?;
214
215    let time_index = find_time_index(&create.constraints)?;
216    let table_options = HashMap::from(
217        &TableOptions::try_from_iter(create.options.to_str_map())
218            .context(UnrecognizedTableOptionSnafu)?,
219    );
220
221    let mut table_options = table_options;
222    if table_options.contains_key(COMPACTION_TYPE) {
223        table_options.insert(COMPACTION_OVERRIDE.to_string(), "true".to_string());
224    }
225
226    let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
227
228    let expr = CreateTableExpr {
229        catalog_name,
230        schema_name,
231        table_name,
232        desc: String::default(),
233        column_defs: columns_to_expr(
234            &create.columns,
235            &time_index,
236            &primary_keys,
237            Some(&query_ctx.timezone()),
238        )?,
239        time_index,
240        primary_keys,
241        create_if_not_exists: create.if_not_exists,
242        table_options,
243        table_id: None,
244        engine: create.engine.clone(),
245    };
246
247    validate_create_expr(&expr)?;
248    Ok(expr)
249}
250
251/// Convert gRPC's [`CreateTableExpr`] back to `CreateTable` statement.
252/// You can use `create_table_expr_by_column_schemas` to create a `CreateTableExpr` from column schemas.
253///
254/// # Parameters
255///
256/// * `expr` - The `CreateTableExpr` to convert
257/// * `quote_style` - Optional quote style for identifiers (defaults to MySQL style ` backtick)
258pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
259    let quote_style = quote_style.unwrap_or('`');
260
261    // Convert table name
262    let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
263        sql::ast::Ident::with_quote(quote_style, &expr.table_name),
264    )]);
265
266    // Convert columns
267    let mut columns = Vec::with_capacity(expr.column_defs.len());
268    for column_def in &expr.column_defs {
269        let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
270            column: &column_def.name,
271        })?;
272
273        let mut options = Vec::new();
274
275        // Add NULL/NOT NULL constraint
276        if column_def.is_nullable {
277            options.push(ColumnOptionDef {
278                name: None,
279                option: ColumnOption::Null,
280            });
281        } else {
282            options.push(ColumnOptionDef {
283                name: None,
284                option: ColumnOption::NotNull,
285            });
286        }
287
288        // Add DEFAULT constraint if present
289        if let Some(default_constraint) = column_schema.default_constraint() {
290            let expr = match default_constraint {
291                ColumnDefaultConstraint::Value(v) => {
292                    Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
293                }
294                ColumnDefaultConstraint::Function(func_expr) => {
295                    ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
296                        .context(ParseSqlSnafu)?
297                }
298            };
299            options.push(ColumnOptionDef {
300                name: None,
301                option: ColumnOption::Default(expr),
302            });
303        }
304
305        // Add COMMENT if present
306        if !column_def.comment.is_empty() {
307            options.push(ColumnOptionDef {
308                name: None,
309                option: ColumnOption::Comment(column_def.comment.clone()),
310            });
311        }
312
313        // Note: We don't add inline PRIMARY KEY options here,
314        // we'll handle all primary keys as constraints instead for consistency
315
316        // Handle column extensions (fulltext, inverted index, skipping index)
317        let mut extensions = ColumnExtensions::default();
318
319        // Add fulltext index options if present
320        if let Ok(Some(opt)) = column_schema.fulltext_options()
321            && opt.enable
322        {
323            let mut map = HashMap::from([
324                (
325                    COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
326                    opt.analyzer.to_string(),
327                ),
328                (
329                    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
330                    opt.case_sensitive.to_string(),
331                ),
332                (
333                    COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
334                    opt.backend.to_string(),
335                ),
336            ]);
337            if opt.backend == FulltextBackend::Bloom {
338                map.insert(
339                    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
340                    opt.granularity.to_string(),
341                );
342                map.insert(
343                    COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
344                    opt.false_positive_rate().to_string(),
345                );
346            }
347            extensions.fulltext_index_options = Some(map.into());
348        }
349
350        // Add skipping index options if present
351        if let Ok(Some(opt)) = column_schema.skipping_index_options() {
352            let map = HashMap::from([
353                (
354                    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
355                    opt.granularity.to_string(),
356                ),
357                (
358                    COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
359                    opt.false_positive_rate().to_string(),
360                ),
361                (
362                    COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
363                    opt.index_type.to_string(),
364                ),
365            ]);
366            extensions.skipping_index_options = Some(map.into());
367        }
368
369        // Add inverted index options if present
370        if column_schema.is_inverted_indexed() {
371            extensions.inverted_index_options = Some(HashMap::new().into());
372        }
373
374        let sql_column = SqlColumn {
375            column_def: ColumnDef {
376                name: Ident::with_quote(quote_style, &column_def.name),
377                data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
378                    .context(ParseSqlSnafu)?,
379                options,
380            },
381            extensions,
382        };
383
384        columns.push(sql_column);
385    }
386
387    // Convert constraints
388    let mut constraints = Vec::new();
389
390    // Add TIME INDEX constraint
391    constraints.push(TableConstraint::TimeIndex {
392        column: Ident::with_quote(quote_style, &expr.time_index),
393    });
394
395    // Add PRIMARY KEY constraint (always add as constraint for consistency)
396    if !expr.primary_keys.is_empty() {
397        let primary_key_columns: Vec<Ident> = expr
398            .primary_keys
399            .iter()
400            .map(|pk| Ident::with_quote(quote_style, pk))
401            .collect();
402
403        constraints.push(TableConstraint::PrimaryKey {
404            columns: primary_key_columns,
405        });
406    }
407
408    // Convert table options
409    let mut options = OptionMap::default();
410    for (key, value) in &expr.table_options {
411        options.insert(key.clone(), value.clone());
412    }
413
414    Ok(CreateTable {
415        if_not_exists: expr.create_if_not_exists,
416        table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
417        name: table_name,
418        columns,
419        engine: expr.engine.clone(),
420        constraints,
421        options,
422        partitions: None,
423    })
424}
425
426/// Validate the [`CreateTableExpr`] request.
427pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
428    // construct column list
429    let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
430    for (idx, column) in create.column_defs.iter().enumerate() {
431        if let Some(indices) = column_to_indices.get(&column.name) {
432            return InvalidSqlSnafu {
433                err_msg: format!(
434                    "column name `{}` is duplicated at index {} and {}",
435                    column.name, indices, idx
436                ),
437            }
438            .fail();
439        }
440        column_to_indices.insert(&column.name, idx);
441    }
442
443    // verify time_index exists
444    let _ = column_to_indices
445        .get(&create.time_index)
446        .with_context(|| InvalidSqlSnafu {
447            err_msg: format!(
448                "column name `{}` is not found in column list",
449                create.time_index
450            ),
451        })?;
452
453    // verify primary_key exists
454    for pk in &create.primary_keys {
455        let _ = column_to_indices
456            .get(&pk)
457            .with_context(|| InvalidSqlSnafu {
458                err_msg: format!("column name `{}` is not found in column list", pk),
459            })?;
460    }
461
462    // construct primary_key set
463    let mut pk_set = HashSet::new();
464    for pk in &create.primary_keys {
465        if !pk_set.insert(pk) {
466            return InvalidSqlSnafu {
467                err_msg: format!("column name `{}` is duplicated in primary keys", pk),
468            }
469            .fail();
470        }
471    }
472
473    // verify time index is not primary key
474    if pk_set.contains(&create.time_index) {
475        return InvalidSqlSnafu {
476            err_msg: format!(
477                "column name `{}` is both primary key and time index",
478                create.time_index
479            ),
480        }
481        .fail();
482    }
483
484    for column in &create.column_defs {
485        // verify do not contain interval type column issue #3235
486        if is_interval_type(&column.data_type()) {
487            return InvalidSqlSnafu {
488                err_msg: format!(
489                    "column name `{}` is interval type, which is not supported",
490                    column.name
491                ),
492            }
493            .fail();
494        }
495        // verify do not contain datetime type column issue #5489
496        if is_date_time_type(&column.data_type()) {
497            return InvalidSqlSnafu {
498                err_msg: format!(
499                    "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
500                    column.name
501                ),
502            }
503            .fail();
504        }
505    }
506    Ok(())
507}
508
509fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
510    for add_column in &add_columns.add_columns {
511        let Some(column_def) = &add_column.column_def else {
512            continue;
513        };
514        if is_date_time_type(&column_def.data_type()) {
515            return InvalidSqlSnafu {
516                    err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
517                }
518                .fail();
519        }
520        if is_interval_type(&column_def.data_type()) {
521            return InvalidSqlSnafu {
522                err_msg: format!(
523                    "column name `{}` is interval type, which is not supported",
524                    column_def.name
525                ),
526            }
527            .fail();
528        }
529    }
530    Ok(())
531}
532
533fn is_date_time_type(data_type: &ColumnDataType) -> bool {
534    matches!(data_type, ColumnDataType::Datetime)
535}
536
537fn is_interval_type(data_type: &ColumnDataType) -> bool {
538    matches!(
539        data_type,
540        ColumnDataType::IntervalYearMonth
541            | ColumnDataType::IntervalDayTime
542            | ColumnDataType::IntervalMonthDayNano
543    )
544}
545
546fn find_primary_keys(
547    columns: &[SqlColumn],
548    constraints: &[TableConstraint],
549) -> Result<Vec<String>> {
550    let columns_pk = columns
551        .iter()
552        .filter_map(|x| {
553            if x.options().iter().any(|o| {
554                matches!(
555                    o.option,
556                    ColumnOption::Unique {
557                        is_primary: true,
558                        ..
559                    }
560                )
561            }) {
562                Some(x.name().value.clone())
563            } else {
564                None
565            }
566        })
567        .collect::<Vec<String>>();
568
569    ensure!(
570        columns_pk.len() <= 1,
571        IllegalPrimaryKeysDefSnafu {
572            msg: "not allowed to inline multiple primary keys in columns options"
573        }
574    );
575
576    let constraints_pk = constraints
577        .iter()
578        .filter_map(|constraint| match constraint {
579            TableConstraint::PrimaryKey { columns, .. } => {
580                Some(columns.iter().map(|ident| ident.value.clone()))
581            }
582            _ => None,
583        })
584        .flatten()
585        .collect::<Vec<String>>();
586
587    ensure!(
588        columns_pk.is_empty() || constraints_pk.is_empty(),
589        IllegalPrimaryKeysDefSnafu {
590            msg: "found definitions of primary keys in multiple places"
591        }
592    );
593
594    let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
595    primary_keys.extend(columns_pk);
596    primary_keys.extend(constraints_pk);
597    Ok(primary_keys)
598}
599
600pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
601    let time_index = constraints
602        .iter()
603        .filter_map(|constraint| match constraint {
604            TableConstraint::TimeIndex { column, .. } => Some(&column.value),
605            _ => None,
606        })
607        .collect::<Vec<&String>>();
608    ensure!(
609        time_index.len() == 1,
610        InvalidSqlSnafu {
611            err_msg: "must have one and only one TimeIndex columns",
612        }
613    );
614    Ok(time_index[0].clone())
615}
616
617fn columns_to_expr(
618    column_defs: &[SqlColumn],
619    time_index: &str,
620    primary_keys: &[String],
621    timezone: Option<&Timezone>,
622) -> Result<Vec<api::v1::ColumnDef>> {
623    let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
624    column_schemas_to_defs(column_schemas, primary_keys)
625}
626
627fn columns_to_column_schemas(
628    columns: &[SqlColumn],
629    time_index: &str,
630    timezone: Option<&Timezone>,
631) -> Result<Vec<ColumnSchema>> {
632    columns
633        .iter()
634        .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
635        .collect::<Result<Vec<ColumnSchema>>>()
636}
637
638// TODO(weny): refactor this function to use `try_as_column_def`
639pub fn column_schemas_to_defs(
640    column_schemas: Vec<ColumnSchema>,
641    primary_keys: &[String],
642) -> Result<Vec<api::v1::ColumnDef>> {
643    let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
644        .iter()
645        .map(|c| {
646            ColumnDataTypeWrapper::try_from(c.data_type.clone())
647                .map(|w| w.to_parts())
648                .context(ColumnDataTypeSnafu)
649        })
650        .collect::<Result<Vec<_>>>()?;
651
652    column_schemas
653        .iter()
654        .zip(column_datatypes)
655        .map(|(schema, datatype)| {
656            let semantic_type = if schema.is_time_index() {
657                SemanticType::Timestamp
658            } else if primary_keys.contains(&schema.name) {
659                SemanticType::Tag
660            } else {
661                SemanticType::Field
662            } as i32;
663            let comment = schema
664                .metadata()
665                .get(COMMENT_KEY)
666                .cloned()
667                .unwrap_or_default();
668
669            Ok(api::v1::ColumnDef {
670                name: schema.name.clone(),
671                data_type: datatype.0 as i32,
672                is_nullable: schema.is_nullable(),
673                default_constraint: match schema.default_constraint() {
674                    None => vec![],
675                    Some(v) => {
676                        v.clone()
677                            .try_into()
678                            .context(ConvertColumnDefaultConstraintSnafu {
679                                column_name: &schema.name,
680                            })?
681                    }
682                },
683                semantic_type,
684                comment,
685                datatype_extension: datatype.1,
686                options: options_from_column_schema(schema),
687            })
688        })
689        .collect()
690}
691
/// A resolved `ALTER TABLE ... REPARTITION` request.
///
/// Built by `to_repartition_request` from an `AlterTable` statement whose
/// operation is `AlterTableOperation::Repartition`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RepartitionRequest {
    /// Catalog part of the fully qualified table name.
    pub catalog_name: String,
    /// Schema part of the fully qualified table name.
    pub schema_name: String,
    /// Table part of the fully qualified table name.
    pub table_name: String,
    /// The `from_exprs` of the parsed repartition operation.
    /// NOTE(review): presumably the partition expressions being replaced — confirm against the parser.
    pub from_exprs: Vec<Expr>,
    /// The `into_exprs` of the parsed repartition operation.
    /// NOTE(review): presumably the partition expressions to create — confirm against the parser.
    pub into_exprs: Vec<Expr>,
    /// Options carried over unchanged from the `AlterTable` statement.
    pub options: OptionMap,
}
701
702pub(crate) fn to_repartition_request(
703    alter_table: AlterTable,
704    query_ctx: &QueryContextRef,
705) -> Result<RepartitionRequest> {
706    let AlterTable {
707        table_name,
708        alter_operation,
709        options,
710    } = alter_table;
711
712    let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&table_name, query_ctx)
713        .map_err(BoxedError::new)
714        .context(ExternalSnafu)?;
715
716    let AlterTableOperation::Repartition { operation } = alter_operation else {
717        return InvalidSqlSnafu {
718            err_msg: "expected REPARTITION operation",
719        }
720        .fail();
721    };
722
723    Ok(RepartitionRequest {
724        catalog_name,
725        schema_name,
726        table_name,
727        from_exprs: operation.from_exprs,
728        into_exprs: operation.into_exprs,
729        options,
730    })
731}
732
733/// Converts a SQL alter table statement into a gRPC alter table expression.
734pub(crate) fn to_alter_table_expr(
735    alter_table: AlterTable,
736    query_ctx: &QueryContextRef,
737) -> Result<AlterTableExpr> {
738    let (catalog_name, schema_name, table_name) =
739        table_idents_to_full_name(alter_table.table_name(), query_ctx)
740            .map_err(BoxedError::new)
741            .context(ExternalSnafu)?;
742
743    let kind = match alter_table.alter_operation {
744        AlterTableOperation::AddConstraint(_) => {
745            return NotSupportedSnafu {
746                feat: "ADD CONSTRAINT",
747            }
748            .fail();
749        }
750        AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
751            add_columns: add_columns
752                .into_iter()
753                .map(|add_column| {
754                    let column_def = sql_column_def_to_grpc_column_def(
755                        &add_column.column_def,
756                        Some(&query_ctx.timezone()),
757                    )
758                    .map_err(BoxedError::new)
759                    .context(ExternalSnafu)?;
760                    if is_interval_type(&column_def.data_type()) {
761                        return NotSupportedSnafu {
762                            feat: "Add column with interval type",
763                        }
764                        .fail();
765                    }
766                    Ok(AddColumn {
767                        column_def: Some(column_def),
768                        location: add_column.location.as_ref().map(From::from),
769                        add_if_not_exists: add_column.add_if_not_exists,
770                    })
771                })
772                .collect::<Result<Vec<AddColumn>>>()?,
773        }),
774        AlterTableOperation::ModifyColumnType {
775            column_name,
776            target_type,
777        } => {
778            let target_type =
779                sql_data_type_to_concrete_data_type(&target_type, &Default::default())
780                    .context(ParseSqlSnafu)?;
781            let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
782                .map(|w| w.to_parts())
783                .context(ColumnDataTypeSnafu)?;
784            if is_interval_type(&target_type) {
785                return NotSupportedSnafu {
786                    feat: "Modify column type to interval type",
787                }
788                .fail();
789            }
790            AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
791                modify_column_types: vec![ModifyColumnType {
792                    column_name: column_name.value,
793                    target_type: target_type as i32,
794                    target_type_extension,
795                }],
796            })
797        }
798        AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
799            drop_columns: vec![DropColumn {
800                name: name.value.clone(),
801            }],
802        }),
803        AlterTableOperation::RenameTable { new_table_name } => {
804            AlterTableKind::RenameTable(RenameTable {
805                new_table_name: new_table_name.clone(),
806            })
807        }
808        AlterTableOperation::SetTableOptions { options } => {
809            AlterTableKind::SetTableOptions(SetTableOptions {
810                table_options: options.into_iter().map(Into::into).collect(),
811            })
812        }
813        AlterTableOperation::UnsetTableOptions { keys } => {
814            AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
815        }
816        AlterTableOperation::Repartition { .. } => {
817            return NotSupportedSnafu {
818                feat: "ALTER TABLE ... REPARTITION",
819            }
820            .fail();
821        }
822        AlterTableOperation::SetIndex { options } => {
823            let option = match options {
824                sql::statements::alter::SetIndexOperation::Fulltext {
825                    column_name,
826                    options,
827                } => SetIndex {
828                    options: Some(set_index::Options::Fulltext(SetFulltext {
829                        column_name: column_name.value,
830                        enable: options.enable,
831                        analyzer: match options.analyzer {
832                            FulltextAnalyzer::English => Analyzer::English.into(),
833                            FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
834                        },
835                        case_sensitive: options.case_sensitive,
836                        backend: match options.backend {
837                            FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
838                            FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
839                        },
840                        granularity: options.granularity as u64,
841                        false_positive_rate: options.false_positive_rate(),
842                    })),
843                },
844                sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
845                    options: Some(set_index::Options::Inverted(SetInverted {
846                        column_name: column_name.value,
847                    })),
848                },
849                sql::statements::alter::SetIndexOperation::Skipping {
850                    column_name,
851                    options,
852                } => SetIndex {
853                    options: Some(set_index::Options::Skipping(SetSkipping {
854                        column_name: column_name.value,
855                        enable: true,
856                        granularity: options.granularity as u64,
857                        false_positive_rate: options.false_positive_rate(),
858                        skipping_index_type: match options.index_type {
859                            SkippingIndexType::BloomFilter => {
860                                PbSkippingIndexType::BloomFilter.into()
861                            }
862                        },
863                    })),
864                },
865            };
866            AlterTableKind::SetIndexes(SetIndexes {
867                set_indexes: vec![option],
868            })
869        }
870        AlterTableOperation::UnsetIndex { options } => {
871            let option = match options {
872                sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
873                    UnsetIndex {
874                        options: Some(unset_index::Options::Fulltext(UnsetFulltext {
875                            column_name: column_name.value,
876                        })),
877                    }
878                }
879                sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
880                    UnsetIndex {
881                        options: Some(unset_index::Options::Inverted(UnsetInverted {
882                            column_name: column_name.value,
883                        })),
884                    }
885                }
886                sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
887                    UnsetIndex {
888                        options: Some(unset_index::Options::Skipping(UnsetSkipping {
889                            column_name: column_name.value,
890                        })),
891                    }
892                }
893            };
894
895            AlterTableKind::UnsetIndexes(UnsetIndexes {
896                unset_indexes: vec![option],
897            })
898        }
899        AlterTableOperation::DropDefaults { columns } => {
900            AlterTableKind::DropDefaults(DropDefaults {
901                drop_defaults: columns
902                    .into_iter()
903                    .map(|col| {
904                        let column_name = col.0.to_string();
905                        Ok(api::v1::DropDefault { column_name })
906                    })
907                    .collect::<Result<Vec<_>>>()?,
908            })
909        }
910        AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
911            set_defaults: defaults
912                .into_iter()
913                .map(|col| {
914                    let column_name = col.column_name.to_string();
915                    let default_constraint = serde_json::to_string(&col.default_constraint)
916                        .context(EncodeJsonSnafu)?
917                        .into_bytes();
918                    Ok(api::v1::SetDefault {
919                        column_name,
920                        default_constraint,
921                    })
922                })
923                .collect::<Result<Vec<_>>>()?,
924        }),
925    };
926
927    Ok(AlterTableExpr {
928        catalog_name,
929        schema_name,
930        table_name,
931        kind: Some(kind),
932    })
933}
934
935/// Try to cast the `[AlterDatabase]` statement into gRPC `[AlterDatabaseExpr]`.
936pub fn to_alter_database_expr(
937    alter_database: AlterDatabase,
938    query_ctx: &QueryContextRef,
939) -> Result<AlterDatabaseExpr> {
940    let catalog = query_ctx.current_catalog();
941    let schema = alter_database.database_name;
942
943    let kind = match alter_database.alter_operation {
944        AlterDatabaseOperation::SetDatabaseOption { options } => {
945            let options = options.into_iter().map(Into::into).collect();
946            AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
947                set_database_options: options,
948            })
949        }
950        AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
951            AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
952        }
953    };
954
955    Ok(AlterDatabaseExpr {
956        catalog_name: catalog.to_string(),
957        schema_name: schema.to_string(),
958        kind: Some(kind),
959    })
960}
961
962/// Try to cast the `[CreateViewExpr]` statement into gRPC `[CreateViewExpr]`.
963pub fn to_create_view_expr(
964    stmt: CreateView,
965    logical_plan: Vec<u8>,
966    table_names: Vec<TableName>,
967    columns: Vec<String>,
968    plan_columns: Vec<String>,
969    definition: String,
970    query_ctx: QueryContextRef,
971) -> Result<CreateViewExpr> {
972    let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
973        .map_err(BoxedError::new)
974        .context(ExternalSnafu)?;
975
976    let expr = CreateViewExpr {
977        catalog_name,
978        schema_name,
979        view_name,
980        logical_plan,
981        create_if_not_exists: stmt.if_not_exists,
982        or_replace: stmt.or_replace,
983        table_names,
984        columns,
985        plan_columns,
986        definition,
987    };
988
989    Ok(expr)
990}
991
992pub fn to_create_flow_task_expr(
993    create_flow: CreateFlow,
994    query_ctx: &QueryContextRef,
995) -> Result<CreateFlowExpr> {
996    // retrieve sink table name
997    let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone(), true)
998        .with_context(|_| ConvertIdentifierSnafu {
999            ident: create_flow.sink_table_name.to_string(),
1000        })?;
1001    let catalog = sink_table_ref
1002        .catalog()
1003        .unwrap_or(query_ctx.current_catalog())
1004        .to_string();
1005    let schema = sink_table_ref
1006        .schema()
1007        .map(|s| s.to_owned())
1008        .unwrap_or(query_ctx.current_schema());
1009
1010    let sink_table_name = TableName {
1011        catalog_name: catalog,
1012        schema_name: schema,
1013        table_name: sink_table_ref.table().to_string(),
1014    };
1015
1016    let source_table_names = extract_tables_from_query(&create_flow.query)
1017        .map(|name| {
1018            let reference =
1019                object_name_to_table_reference(name.clone(), true).with_context(|_| {
1020                    ConvertIdentifierSnafu {
1021                        ident: name.to_string(),
1022                    }
1023                })?;
1024            let catalog = reference
1025                .catalog()
1026                .unwrap_or(query_ctx.current_catalog())
1027                .to_string();
1028            let schema = reference
1029                .schema()
1030                .map(|s| s.to_string())
1031                .unwrap_or(query_ctx.current_schema());
1032
1033            let table_name = TableName {
1034                catalog_name: catalog,
1035                schema_name: schema,
1036                table_name: reference.table().to_string(),
1037            };
1038            Ok(table_name)
1039        })
1040        .collect::<Result<Vec<_>>>()?;
1041
1042    let eval_interval = create_flow.eval_interval;
1043
1044    Ok(CreateFlowExpr {
1045        catalog_name: query_ctx.current_catalog().to_string(),
1046        flow_name: sanitize_flow_name(create_flow.flow_name)?,
1047        source_table_names,
1048        sink_table_name: Some(sink_table_name),
1049        or_replace: create_flow.or_replace,
1050        create_if_not_exists: create_flow.if_not_exists,
1051        expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
1052        eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
1053        comment: create_flow.comment.unwrap_or_default(),
1054        sql: create_flow.query.to_string(),
1055        flow_options: Default::default(),
1056    })
1057}
1058
1059/// sanitize the flow name, remove possible quotes
1060fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
1061    ensure!(
1062        flow_name.0.len() == 1,
1063        InvalidFlowNameSnafu {
1064            name: flow_name.to_string(),
1065        }
1066    );
1067    // safety: we've checked flow_name.0 has exactly one element.
1068    Ok(flow_name.0.swap_remove(0).to_string_unquoted())
1069}
1070
1071#[cfg(test)]
1072mod tests {
1073    use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
1074    use datatypes::value::Value;
1075    use session::context::{QueryContext, QueryContextBuilder};
1076    use sql::dialect::GreptimeDbDialect;
1077    use sql::parser::{ParseOptions, ParserContext};
1078    use sql::statements::statement::Statement;
1079    use store_api::storage::ColumnDefaultConstraint;
1080
1081    use super::*;
1082
1083    #[test]
1084    fn test_create_flow_tql_expr() {
1085        let sql = r#"
1086CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1087TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
1088        let stmt =
1089            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1090
1091        assert!(
1092            stmt.is_err(),
1093            "Expected error for invalid TQL EVAL parameters: {:#?}",
1094            stmt
1095        );
1096
1097        let sql = r#"
1098CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1099TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#;
1100        let stmt =
1101            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1102                .unwrap()
1103                .pop()
1104                .unwrap();
1105
1106        let Statement::CreateFlow(create_flow) = stmt else {
1107            unreachable!()
1108        };
1109        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1110
1111        let to_dot_sep =
1112            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1113        assert_eq!("calc_reqs", expr.flow_name);
1114        assert_eq!("greptime", expr.catalog_name);
1115        assert_eq!(
1116            "greptime.public.cnt_reqs",
1117            expr.sink_table_name.map(to_dot_sep).unwrap()
1118        );
1119        assert_eq!(1, expr.source_table_names.len());
1120        assert_eq!(
1121            "greptime.public.http_requests",
1122            to_dot_sep(expr.source_table_names[0].clone())
1123        );
1124        assert_eq!(
1125            r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#,
1126            expr.sql
1127        );
1128
1129        let sql = r#"
1130CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1131TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__schema__="greptime_private"});"#;
1132        let stmt =
1133            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1134                .unwrap()
1135                .pop()
1136                .unwrap();
1137        let Statement::CreateFlow(create_flow) = stmt else {
1138            unreachable!()
1139        };
1140        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1141        assert_eq!(1, expr.source_table_names.len());
1142        assert_eq!(
1143            "greptime.greptime_private.http_requests",
1144            to_dot_sep(expr.source_table_names[0].clone())
1145        );
1146
1147        let sql = r#"
1148CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1149TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__database__="greptime_private"});"#;
1150        let stmt =
1151            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1152                .unwrap()
1153                .pop()
1154                .unwrap();
1155        let Statement::CreateFlow(create_flow) = stmt else {
1156            unreachable!()
1157        };
1158        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1159        assert_eq!(1, expr.source_table_names.len());
1160        assert_eq!(
1161            "greptime.greptime_private.http_requests",
1162            to_dot_sep(expr.source_table_names[0].clone())
1163        );
1164    }
1165
1166    #[test]
1167    fn test_create_flow_expr() {
1168        let sql = r"
1169CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
1170SELECT
1171    DISTINCT number as dis
1172FROM
1173    distinct_basic;";
1174        let stmt =
1175            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1176                .unwrap()
1177                .pop()
1178                .unwrap();
1179
1180        let Statement::CreateFlow(create_flow) = stmt else {
1181            unreachable!()
1182        };
1183        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1184
1185        let to_dot_sep =
1186            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1187        assert_eq!("test_distinct_basic", expr.flow_name);
1188        assert_eq!("greptime", expr.catalog_name);
1189        assert_eq!(
1190            "greptime.public.out_distinct_basic",
1191            expr.sink_table_name.map(to_dot_sep).unwrap()
1192        );
1193        assert_eq!(1, expr.source_table_names.len());
1194        assert_eq!(
1195            "greptime.public.distinct_basic",
1196            to_dot_sep(expr.source_table_names[0].clone())
1197        );
1198        assert_eq!(
1199            r"SELECT
1200    DISTINCT number as dis
1201FROM
1202    distinct_basic",
1203            expr.sql
1204        );
1205
1206        let sql = r"
1207CREATE FLOW `task_2`
1208SINK TO schema_1.table_1
1209AS
1210SELECT max(c1), min(c2) FROM schema_2.table_2;";
1211        let stmt =
1212            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1213                .unwrap()
1214                .pop()
1215                .unwrap();
1216
1217        let Statement::CreateFlow(create_flow) = stmt else {
1218            unreachable!()
1219        };
1220        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1221
1222        let to_dot_sep =
1223            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1224        assert_eq!("task_2", expr.flow_name);
1225        assert_eq!("greptime", expr.catalog_name);
1226        assert_eq!(
1227            "greptime.schema_1.table_1",
1228            expr.sink_table_name.map(to_dot_sep).unwrap()
1229        );
1230        assert_eq!(1, expr.source_table_names.len());
1231        assert_eq!(
1232            "greptime.schema_2.table_2",
1233            to_dot_sep(expr.source_table_names[0].clone())
1234        );
1235        assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
1236
1237        let sql = r"
1238CREATE FLOW abc.`task_2`
1239SINK TO schema_1.table_1
1240AS
1241SELECT max(c1), min(c2) FROM schema_2.table_2;";
1242        let stmt =
1243            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1244                .unwrap()
1245                .pop()
1246                .unwrap();
1247
1248        let Statement::CreateFlow(create_flow) = stmt else {
1249            unreachable!()
1250        };
1251        let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
1252
1253        assert!(res.is_err());
1254        assert!(
1255            res.unwrap_err()
1256                .to_string()
1257                .contains("Invalid flow name: abc.`task_2`")
1258        );
1259    }
1260
1261    #[test]
1262    fn test_create_to_expr() {
1263        let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
1264        let stmt =
1265            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1266                .unwrap()
1267                .pop()
1268                .unwrap();
1269
1270        let Statement::CreateTable(create_table) = stmt else {
1271            unreachable!()
1272        };
1273        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1274        assert_eq!("3days", expr.table_options.get("ttl").unwrap());
1275        assert_eq!(
1276            "1.0MiB",
1277            expr.table_options.get("write_buffer_size").unwrap()
1278        );
1279    }
1280
1281    #[test]
1282    fn test_invalid_create_to_expr() {
1283        let cases = [
1284            // duplicate column declaration
1285            "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
1286            // duplicate primary key
1287            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
1288            // time index is primary key
1289            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));",
1290        ];
1291
1292        for sql in cases {
1293            let stmt = ParserContext::create_with_dialect(
1294                sql,
1295                &GreptimeDbDialect {},
1296                ParseOptions::default(),
1297            )
1298            .unwrap()
1299            .pop()
1300            .unwrap();
1301            let Statement::CreateTable(create_table) = stmt else {
1302                unreachable!()
1303            };
1304            create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
1305        }
1306    }
1307
1308    #[test]
1309    fn test_create_to_expr_with_default_timestamp_value() {
1310        let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
1311        let stmt =
1312            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1313                .unwrap()
1314                .pop()
1315                .unwrap();
1316
1317        let Statement::CreateTable(create_table) = stmt else {
1318            unreachable!()
1319        };
1320
1321        // query context with system timezone UTC.
1322        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1323        let ts_column = &expr.column_defs[1];
1324        let constraint = assert_ts_column(ts_column);
1325        assert!(
1326            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1327                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1328        );
1329
1330        // query context with timezone `+08:00`
1331        let ctx = QueryContextBuilder::default()
1332            .timezone(Timezone::from_tz_string("+08:00").unwrap())
1333            .build()
1334            .into();
1335        let expr = create_to_expr(&create_table, &ctx).unwrap();
1336        let ts_column = &expr.column_defs[1];
1337        let constraint = assert_ts_column(ts_column);
1338        assert!(
1339            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1340                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1341        );
1342    }
1343
1344    fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
1345        assert_eq!("ts", ts_column.name);
1346        assert_eq!(
1347            ColumnDataType::TimestampMillisecond as i32,
1348            ts_column.data_type
1349        );
1350        assert!(!ts_column.default_constraint.is_empty());
1351
1352        ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
1353    }
1354
1355    #[test]
1356    fn test_to_alter_expr() {
1357        let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
1358        let stmt =
1359            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1360                .unwrap()
1361                .pop()
1362                .unwrap();
1363
1364        let Statement::AlterDatabase(alter_database) = stmt else {
1365            unreachable!()
1366        };
1367
1368        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1369        let kind = expr.kind.unwrap();
1370
1371        let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
1372            set_database_options,
1373        }) = kind
1374        else {
1375            unreachable!()
1376        };
1377
1378        assert_eq!(2, set_database_options.len());
1379        assert_eq!("key1", set_database_options[0].key);
1380        assert_eq!("value1", set_database_options[0].value);
1381        assert_eq!("key2", set_database_options[1].key);
1382        assert_eq!("value2", set_database_options[1].value);
1383
1384        let sql = "ALTER DATABASE greptime UNSET key1, key2;";
1385        let stmt =
1386            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1387                .unwrap()
1388                .pop()
1389                .unwrap();
1390
1391        let Statement::AlterDatabase(alter_database) = stmt else {
1392            unreachable!()
1393        };
1394
1395        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1396        let kind = expr.kind.unwrap();
1397
1398        let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
1399            unreachable!()
1400        };
1401
1402        assert_eq!(2, keys.len());
1403        assert!(keys.contains(&"key1".to_string()));
1404        assert!(keys.contains(&"key2".to_string()));
1405
1406        let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
1407        let stmt =
1408            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1409                .unwrap()
1410                .pop()
1411                .unwrap();
1412
1413        let Statement::AlterTable(alter_table) = stmt else {
1414            unreachable!()
1415        };
1416
1417        // query context with system timezone UTC.
1418        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1419        let kind = expr.kind.unwrap();
1420
1421        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1422            unreachable!()
1423        };
1424
1425        assert_eq!(1, add_columns.len());
1426        let ts_column = add_columns[0].column_def.clone().unwrap();
1427        let constraint = assert_ts_column(&ts_column);
1428        assert!(
1429            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1430                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1431        );
1432
1433        //
1434        // query context with timezone `+08:00`
1435        let ctx = QueryContextBuilder::default()
1436            .timezone(Timezone::from_tz_string("+08:00").unwrap())
1437            .build()
1438            .into();
1439        let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
1440        let kind = expr.kind.unwrap();
1441
1442        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1443            unreachable!()
1444        };
1445
1446        assert_eq!(1, add_columns.len());
1447        let ts_column = add_columns[0].column_def.clone().unwrap();
1448        let constraint = assert_ts_column(&ts_column);
1449        assert!(
1450            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1451                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1452        );
1453    }
1454
1455    #[test]
1456    fn test_to_alter_modify_column_type_expr() {
1457        let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
1458        let stmt =
1459            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1460                .unwrap()
1461                .pop()
1462                .unwrap();
1463
1464        let Statement::AlterTable(alter_table) = stmt else {
1465            unreachable!()
1466        };
1467
1468        // query context with system timezone UTC.
1469        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1470        let kind = expr.kind.unwrap();
1471
1472        let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
1473            modify_column_types,
1474        }) = kind
1475        else {
1476            unreachable!()
1477        };
1478
1479        assert_eq!(1, modify_column_types.len());
1480        let modify_column_type = &modify_column_types[0];
1481
1482        assert_eq!("mem_usage", modify_column_type.column_name);
1483        assert_eq!(
1484            ColumnDataType::String as i32,
1485            modify_column_type.target_type
1486        );
1487        assert!(modify_column_type.target_type_extension.is_none());
1488    }
1489
1490    #[test]
1491    fn test_to_repartition_request() {
1492        let sql = r#"
1493ALTER TABLE metrics REPARTITION (
1494  device_id < 100
1495) INTO (
1496  device_id < 100 AND area < 'South',
1497  device_id < 100 AND area >= 'South'
1498);"#;
1499        let stmt =
1500            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1501                .unwrap()
1502                .pop()
1503                .unwrap();
1504
1505        let Statement::AlterTable(alter_table) = stmt else {
1506            unreachable!()
1507        };
1508
1509        let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
1510        assert_eq!("greptime", request.catalog_name);
1511        assert_eq!("public", request.schema_name);
1512        assert_eq!("metrics", request.table_name);
1513        assert_eq!(
1514            request
1515                .from_exprs
1516                .into_iter()
1517                .map(|x| x.to_string())
1518                .collect::<Vec<_>>(),
1519            vec!["device_id < 100".to_string()]
1520        );
1521        assert_eq!(
1522            request
1523                .into_exprs
1524                .into_iter()
1525                .map(|x| x.to_string())
1526                .collect::<Vec<_>>(),
1527            vec![
1528                "device_id < 100 AND area < 'South'".to_string(),
1529                "device_id < 100 AND area >= 'South'".to_string()
1530            ]
1531        );
1532    }
1533
1534    fn new_test_table_names() -> Vec<TableName> {
1535        vec![
1536            TableName {
1537                catalog_name: "greptime".to_string(),
1538                schema_name: "public".to_string(),
1539                table_name: "a_table".to_string(),
1540            },
1541            TableName {
1542                catalog_name: "greptime".to_string(),
1543                schema_name: "public".to_string(),
1544                table_name: "b_table".to_string(),
1545            },
1546        ]
1547    }
1548
1549    #[test]
1550    fn test_to_create_view_expr() {
1551        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1552        let stmt =
1553            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1554                .unwrap()
1555                .pop()
1556                .unwrap();
1557
1558        let Statement::CreateView(stmt) = stmt else {
1559            unreachable!()
1560        };
1561
1562        let logical_plan = vec![1, 2, 3];
1563        let table_names = new_test_table_names();
1564        let columns = vec!["a".to_string()];
1565        let plan_columns = vec!["number".to_string()];
1566
1567        let expr = to_create_view_expr(
1568            stmt,
1569            logical_plan.clone(),
1570            table_names.clone(),
1571            columns.clone(),
1572            plan_columns.clone(),
1573            sql.to_string(),
1574            QueryContext::arc(),
1575        )
1576        .unwrap();
1577
1578        assert_eq!("greptime", expr.catalog_name);
1579        assert_eq!("public", expr.schema_name);
1580        assert_eq!("test", expr.view_name);
1581        assert!(!expr.create_if_not_exists);
1582        assert!(!expr.or_replace);
1583        assert_eq!(logical_plan, expr.logical_plan);
1584        assert_eq!(table_names, expr.table_names);
1585        assert_eq!(sql, expr.definition);
1586        assert_eq!(columns, expr.columns);
1587        assert_eq!(plan_columns, expr.plan_columns);
1588    }
1589
1590    #[test]
1591    fn test_to_create_view_expr_complex() {
1592        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
1593        let stmt =
1594            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1595                .unwrap()
1596                .pop()
1597                .unwrap();
1598
1599        let Statement::CreateView(stmt) = stmt else {
1600            unreachable!()
1601        };
1602
1603        let logical_plan = vec![1, 2, 3];
1604        let table_names = new_test_table_names();
1605        let columns = vec!["a".to_string()];
1606        let plan_columns = vec!["number".to_string()];
1607
1608        let expr = to_create_view_expr(
1609            stmt,
1610            logical_plan.clone(),
1611            table_names.clone(),
1612            columns.clone(),
1613            plan_columns.clone(),
1614            sql.to_string(),
1615            QueryContext::arc(),
1616        )
1617        .unwrap();
1618
1619        assert_eq!("greptime", expr.catalog_name);
1620        assert_eq!("test", expr.schema_name);
1621        assert_eq!("test_view", expr.view_name);
1622        assert!(expr.create_if_not_exists);
1623        assert!(expr.or_replace);
1624        assert_eq!(logical_plan, expr.logical_plan);
1625        assert_eq!(table_names, expr.table_names);
1626        assert_eq!(sql, expr.definition);
1627        assert_eq!(columns, expr.columns);
1628        assert_eq!(plan_columns, expr.plan_columns);
1629    }
1630
1631    #[test]
1632    fn test_expr_to_create() {
1633        let sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
1634  `timestamp` TIMESTAMP(9) NOT NULL,
1635  `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
1636  `username` STRING NULL,
1637  `http_method` STRING NULL INVERTED INDEX,
1638  `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
1639  `protocol` STRING NULL,
1640  `status_code` INT NULL INVERTED INDEX,
1641  `response_size` BIGINT NULL,
1642  `message` STRING NULL,
1643  TIME INDEX (`timestamp`),
1644  PRIMARY KEY (`username`, `status_code`)
1645)
1646ENGINE=mito
1647WITH(
1648  append_mode = 'true'
1649)"#;
1650        let stmt =
1651            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1652                .unwrap()
1653                .pop()
1654                .unwrap();
1655
1656        let Statement::CreateTable(original_create) = stmt else {
1657            unreachable!()
1658        };
1659
1660        // Convert CreateTable -> CreateTableExpr -> CreateTable
1661        let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();
1662
1663        let create_table = expr_to_create(&expr, Some('`')).unwrap();
1664        let new_sql = format!("{:#}", create_table);
1665        assert_eq!(sql, new_sql);
1666    }
1667}