// operator/expr_helper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
24use api::v1::{
25    AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
26    ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
27    DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
28    ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetDefaults, SetFulltext,
29    SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30    SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31    UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions, set_index,
32    unset_index,
33};
34use common_error::ext::BoxedError;
35use common_grpc_expr::util::ColumnExpr;
36use common_time::Timezone;
37use datafusion::sql::planner::object_name_to_table_reference;
38use datatypes::schema::{
39    COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
40    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
41    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
42    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
43    ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
44    SkippingIndexType,
45};
46use file_engine::FileOptions;
47use query::sql::{
48    check_file_to_table_schema_compatibility, file_column_schemas_to_table,
49    infer_file_table_schema, prepare_file_table_files,
50};
51use session::context::QueryContextRef;
52use session::table_name::table_idents_to_full_name;
53use snafu::{OptionExt, ResultExt, ensure};
54use sql::ast::{
55    ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
56};
57use sql::dialect::GreptimeDbDialect;
58use sql::parser::ParserContext;
59use sql::statements::alter::{
60    AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
61};
62use sql::statements::create::{
63    Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
64    CreateView, TableConstraint,
65};
66use sql::statements::{
67    OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
68    sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
69};
70use sql::util::extract_tables_from_query;
71use store_api::mito_engine_options::{COMPACTION_OVERRIDE, COMPACTION_TYPE};
72use table::requests::{FILE_TABLE_META_KEY, TableOptions};
73use table::table_reference::TableReference;
74#[cfg(feature = "enterprise")]
75pub use trigger::to_create_trigger_task_expr;
76
77use crate::error::{
78    BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
79    ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
80    IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
81    InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
82    PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
83};
84
85pub fn create_table_expr_by_column_schemas(
86    table_name: &TableReference<'_>,
87    column_schemas: &[api::v1::ColumnSchema],
88    engine: &str,
89    desc: Option<&str>,
90) -> Result<CreateTableExpr> {
91    let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
92    let expr = common_grpc_expr::util::build_create_table_expr(
93        None,
94        table_name,
95        column_exprs,
96        engine,
97        desc.unwrap_or("Created on insertion"),
98    )
99    .context(BuildCreateExprOnInsertionSnafu)?;
100
101    validate_create_expr(&expr)?;
102    Ok(expr)
103}
104
105pub fn extract_add_columns_expr(
106    schema: &Schema,
107    column_exprs: Vec<ColumnExpr>,
108) -> Result<Option<AddColumns>> {
109    let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
110        .context(FindNewColumnsOnInsertionSnafu)?;
111    if let Some(add_columns) = &add_columns {
112        validate_add_columns_expr(add_columns)?;
113    }
114    Ok(add_columns)
115}
116
// When the `CREATE EXTERNAL TABLE` statement is in expanded form, like
// ```sql
// CREATE EXTERNAL TABLE IF NOT EXISTS city (
//   host string,
//   ts timestamp,
//   cpu float64,
118//   memory float64,
119//   TIME INDEX (ts),
120//   PRIMARY KEY(host)
121// ) WITH (location='/var/data/city.csv', format='csv');
122// ```
123// The user needs to specify the TIME INDEX column. If there is no suitable
124// column in the file to use as TIME INDEX, an additional placeholder column
125// needs to be created as the TIME INDEX, and a `DEFAULT <value>` constraint
126// should be added.
127//
128//
129// When the `CREATE EXTERNAL TABLE` statement is in inferred form, like
130// ```sql
131// CREATE EXTERNAL TABLE IF NOT EXISTS city WITH (location='/var/data/city.csv',format='csv');
132// ```
133// 1. If the TIME INDEX column can be inferred from metadata, use that column
134//    as the TIME INDEX. Otherwise,
135// 2. If a column named `greptime_timestamp` exists (with the requirement that
136//    the column is with type TIMESTAMP, otherwise an error is thrown), use
137//    that column as the TIME INDEX. Otherwise,
138// 3. Automatically create the `greptime_timestamp` column and add a `DEFAULT 0`
139//    constraint.
pub(crate) async fn create_external_expr(
    create: CreateExternalTable,
    query_ctx: &QueryContextRef,
) -> Result<CreateTableExpr> {
    // Expand the (possibly partial) table identifier into
    // (catalog, schema, table) using the session context.
    let (catalog_name, schema_name, table_name) =
        table_idents_to_full_name(&create.name, query_ctx)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    let mut table_options = create.options.into_map();

    // Locate the external files referenced by the WITH options.
    let (object_store, files) = prepare_file_table_files(&table_options)
        .await
        .context(PrepareFileTableSnafu)?;

    // Infer the files' native column schemas; used both for schema
    // compatibility checking and (in inferred form) as the table schema.
    let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
        .await
        .context(InferFileTableSchemaSnafu)?
        .column_schemas()
        .to_vec();

    let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
        // expanded form: the statement declares columns/constraints explicitly
        let time_index = find_time_index(&create.constraints)?;
        let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
        let column_schemas =
            columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
        (time_index, primary_keys, column_schemas)
    } else {
        // inferred form: derive the table schema and time index from the files;
        // no primary keys in this case
        let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
        let primary_keys = vec![];
        (time_index, primary_keys, column_schemas)
    };

    // The declared/derived table schema must be compatible with what the
    // files actually contain.
    check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
        .context(SchemaIncompatibleSnafu)?;

    // Persist the file list and file schema as a JSON-encoded table option so
    // the file engine can reopen the files later.
    let meta = FileOptions {
        files,
        file_column_schemas,
    };
    table_options.insert(
        FILE_TABLE_META_KEY.to_string(),
        serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
    );

    let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
    let expr = CreateTableExpr {
        catalog_name,
        schema_name,
        table_name,
        desc: String::default(),
        column_defs,
        time_index,
        primary_keys,
        create_if_not_exists: create.if_not_exists,
        table_options,
        table_id: None,
        engine: create.engine.clone(),
    };

    Ok(expr)
}
204
205/// Convert `CreateTable` statement to [`CreateTableExpr`] gRPC request.
206pub fn create_to_expr(
207    create: &CreateTable,
208    query_ctx: &QueryContextRef,
209) -> Result<CreateTableExpr> {
210    let (catalog_name, schema_name, table_name) =
211        table_idents_to_full_name(&create.name, query_ctx)
212            .map_err(BoxedError::new)
213            .context(ExternalSnafu)?;
214
215    let time_index = find_time_index(&create.constraints)?;
216    let table_options = HashMap::from(
217        &TableOptions::try_from_iter(create.options.to_str_map())
218            .context(UnrecognizedTableOptionSnafu)?,
219    );
220
221    let mut table_options = table_options;
222    if table_options.contains_key(COMPACTION_TYPE) {
223        table_options.insert(COMPACTION_OVERRIDE.to_string(), "true".to_string());
224    }
225
226    let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
227
228    let expr = CreateTableExpr {
229        catalog_name,
230        schema_name,
231        table_name,
232        desc: String::default(),
233        column_defs: columns_to_expr(
234            &create.columns,
235            &time_index,
236            &primary_keys,
237            Some(&query_ctx.timezone()),
238        )?,
239        time_index,
240        primary_keys,
241        create_if_not_exists: create.if_not_exists,
242        table_options,
243        table_id: None,
244        engine: create.engine.clone(),
245    };
246
247    validate_create_expr(&expr)?;
248    Ok(expr)
249}
250
/// Convert gRPC's [`CreateTableExpr`] back to a `CreateTable` statement.
/// You can use `create_table_expr_by_column_schemas` to create a `CreateTableExpr` from column schemas.
///
/// # Parameters
///
/// * `expr` - The `CreateTableExpr` to convert
/// * `quote_style` - Optional quote style for identifiers (defaults to a MySQL-style backtick)
pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
    let quote_style = quote_style.unwrap_or('`');

    // Convert table name into a quoted single-part object name.
    let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
        sql::ast::Ident::with_quote(quote_style, &expr.table_name),
    )]);

    // Convert columns
    let mut columns = Vec::with_capacity(expr.column_defs.len());
    for column_def in &expr.column_defs {
        // Rebuild the typed column schema from the gRPC column definition;
        // fails when the definition carries invalid type/option data.
        let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
            column: &column_def.name,
        })?;

        let mut options = Vec::new();

        // Add NULL/NOT NULL constraint
        if column_def.is_nullable {
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::Null,
            });
        } else {
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::NotNull,
            });
        }

        // Add DEFAULT constraint if present
        if let Some(default_constraint) = column_schema.default_constraint() {
            let expr = match default_constraint {
                // Literal default, e.g. `DEFAULT 0`.
                ColumnDefaultConstraint::Value(v) => {
                    Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
                }
                // Function default, e.g. `DEFAULT now()`; the stored function
                // text is re-parsed back into a SQL expression.
                ColumnDefaultConstraint::Function(func_expr) => {
                    ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
                        .context(ParseSqlSnafu)?
                }
            };
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::Default(expr),
            });
        }

        // Add COMMENT if present
        if !column_def.comment.is_empty() {
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::Comment(column_def.comment.clone()),
            });
        }

        // Note: We don't add inline PRIMARY KEY options here,
        // we'll handle all primary keys as constraints instead for consistency

        // Handle column extensions (fulltext, inverted index, skipping index)
        let mut extensions = ColumnExtensions::default();

        // Add fulltext index options if present (only when the index is enabled)
        if let Ok(Some(opt)) = column_schema.fulltext_options()
            && opt.enable
        {
            let mut map = HashMap::from([
                (
                    COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
                    opt.analyzer.to_string(),
                ),
                (
                    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
                    opt.case_sensitive.to_string(),
                ),
                (
                    COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
                    opt.backend.to_string(),
                ),
            ]);
            // Granularity and false-positive rate only apply to the bloom backend.
            if opt.backend == FulltextBackend::Bloom {
                map.insert(
                    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
                    opt.granularity.to_string(),
                );
                map.insert(
                    COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
                    opt.false_positive_rate().to_string(),
                );
            }
            extensions.fulltext_index_options = Some(map.into());
        }

        // Add skipping index options if present
        if let Ok(Some(opt)) = column_schema.skipping_index_options() {
            let map = HashMap::from([
                (
                    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
                    opt.granularity.to_string(),
                ),
                (
                    COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
                    opt.false_positive_rate().to_string(),
                ),
                (
                    COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
                    opt.index_type.to_string(),
                ),
            ]);
            extensions.skipping_index_options = Some(map.into());
        }

        // Add inverted index options if present; the option map itself is empty —
        // the presence of the extension marks the column as inverted-indexed.
        if column_schema.is_inverted_indexed() {
            extensions.inverted_index_options = Some(HashMap::new().into());
        }

        let sql_column = SqlColumn {
            column_def: ColumnDef {
                name: Ident::with_quote(quote_style, &column_def.name),
                data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
                    .context(ParseSqlSnafu)?,
                options,
            },
            extensions,
        };

        columns.push(sql_column);
    }

    // Convert constraints
    let mut constraints = Vec::new();

    // Add TIME INDEX constraint
    constraints.push(TableConstraint::TimeIndex {
        column: Ident::with_quote(quote_style, &expr.time_index),
    });

    // Add PRIMARY KEY constraint (always add as constraint for consistency)
    if !expr.primary_keys.is_empty() {
        let primary_key_columns: Vec<Ident> = expr
            .primary_keys
            .iter()
            .map(|pk| Ident::with_quote(quote_style, pk))
            .collect();

        constraints.push(TableConstraint::PrimaryKey {
            columns: primary_key_columns,
        });
    }

    // Convert table options
    let mut options = OptionMap::default();
    for (key, value) in &expr.table_options {
        options.insert(key.clone(), value.clone());
    }

    Ok(CreateTable {
        if_not_exists: expr.create_if_not_exists,
        // table_id of 0 means "unassigned" when the expr carries no id.
        table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
        name: table_name,
        columns,
        engine: expr.engine.clone(),
        constraints,
        options,
        partitions: None,
    })
}
425
426/// Validate the [`CreateTableExpr`] request.
427pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
428    // construct column list
429    let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
430    for (idx, column) in create.column_defs.iter().enumerate() {
431        if let Some(indices) = column_to_indices.get(&column.name) {
432            return InvalidSqlSnafu {
433                err_msg: format!(
434                    "column name `{}` is duplicated at index {} and {}",
435                    column.name, indices, idx
436                ),
437            }
438            .fail();
439        }
440        column_to_indices.insert(&column.name, idx);
441    }
442
443    // verify time_index exists
444    let _ = column_to_indices
445        .get(&create.time_index)
446        .with_context(|| InvalidSqlSnafu {
447            err_msg: format!(
448                "column name `{}` is not found in column list",
449                create.time_index
450            ),
451        })?;
452
453    // verify primary_key exists
454    for pk in &create.primary_keys {
455        let _ = column_to_indices
456            .get(&pk)
457            .with_context(|| InvalidSqlSnafu {
458                err_msg: format!("column name `{}` is not found in column list", pk),
459            })?;
460    }
461
462    // construct primary_key set
463    let mut pk_set = HashSet::new();
464    for pk in &create.primary_keys {
465        if !pk_set.insert(pk) {
466            return InvalidSqlSnafu {
467                err_msg: format!("column name `{}` is duplicated in primary keys", pk),
468            }
469            .fail();
470        }
471    }
472
473    // verify time index is not primary key
474    if pk_set.contains(&create.time_index) {
475        return InvalidSqlSnafu {
476            err_msg: format!(
477                "column name `{}` is both primary key and time index",
478                create.time_index
479            ),
480        }
481        .fail();
482    }
483
484    for column in &create.column_defs {
485        // verify do not contain interval type column issue #3235
486        if is_interval_type(&column.data_type()) {
487            return InvalidSqlSnafu {
488                err_msg: format!(
489                    "column name `{}` is interval type, which is not supported",
490                    column.name
491                ),
492            }
493            .fail();
494        }
495        // verify do not contain datetime type column issue #5489
496        if is_date_time_type(&column.data_type()) {
497            return InvalidSqlSnafu {
498                err_msg: format!(
499                    "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
500                    column.name
501                ),
502            }
503            .fail();
504        }
505    }
506    Ok(())
507}
508
509fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
510    for add_column in &add_columns.add_columns {
511        let Some(column_def) = &add_column.column_def else {
512            continue;
513        };
514        if is_date_time_type(&column_def.data_type()) {
515            return InvalidSqlSnafu {
516                    err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
517                }
518                .fail();
519        }
520        if is_interval_type(&column_def.data_type()) {
521            return InvalidSqlSnafu {
522                err_msg: format!(
523                    "column name `{}` is interval type, which is not supported",
524                    column_def.name
525                ),
526            }
527            .fail();
528        }
529    }
530    Ok(())
531}
532
533fn is_date_time_type(data_type: &ColumnDataType) -> bool {
534    matches!(data_type, ColumnDataType::Datetime)
535}
536
537fn is_interval_type(data_type: &ColumnDataType) -> bool {
538    matches!(
539        data_type,
540        ColumnDataType::IntervalYearMonth
541            | ColumnDataType::IntervalDayTime
542            | ColumnDataType::IntervalMonthDayNano
543    )
544}
545
546fn find_primary_keys(
547    columns: &[SqlColumn],
548    constraints: &[TableConstraint],
549) -> Result<Vec<String>> {
550    let columns_pk = columns
551        .iter()
552        .filter_map(|x| {
553            if x.options()
554                .iter()
555                .any(|o| matches!(o.option, ColumnOption::PrimaryKey(_)))
556            {
557                Some(x.name().value.clone())
558            } else {
559                None
560            }
561        })
562        .collect::<Vec<String>>();
563
564    ensure!(
565        columns_pk.len() <= 1,
566        IllegalPrimaryKeysDefSnafu {
567            msg: "not allowed to inline multiple primary keys in columns options"
568        }
569    );
570
571    let constraints_pk = constraints
572        .iter()
573        .filter_map(|constraint| match constraint {
574            TableConstraint::PrimaryKey { columns, .. } => {
575                Some(columns.iter().map(|ident| ident.value.clone()))
576            }
577            _ => None,
578        })
579        .flatten()
580        .collect::<Vec<String>>();
581
582    ensure!(
583        columns_pk.is_empty() || constraints_pk.is_empty(),
584        IllegalPrimaryKeysDefSnafu {
585            msg: "found definitions of primary keys in multiple places"
586        }
587    );
588
589    let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
590    primary_keys.extend(columns_pk);
591    primary_keys.extend(constraints_pk);
592    Ok(primary_keys)
593}
594
595pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
596    let time_index = constraints
597        .iter()
598        .filter_map(|constraint| match constraint {
599            TableConstraint::TimeIndex { column, .. } => Some(&column.value),
600            _ => None,
601        })
602        .collect::<Vec<&String>>();
603    ensure!(
604        time_index.len() == 1,
605        InvalidSqlSnafu {
606            err_msg: "must have one and only one TimeIndex columns",
607        }
608    );
609    Ok(time_index[0].clone())
610}
611
612fn columns_to_expr(
613    column_defs: &[SqlColumn],
614    time_index: &str,
615    primary_keys: &[String],
616    timezone: Option<&Timezone>,
617) -> Result<Vec<api::v1::ColumnDef>> {
618    let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
619    column_schemas_to_defs(column_schemas, primary_keys)
620}
621
622fn columns_to_column_schemas(
623    columns: &[SqlColumn],
624    time_index: &str,
625    timezone: Option<&Timezone>,
626) -> Result<Vec<ColumnSchema>> {
627    columns
628        .iter()
629        .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
630        .collect::<Result<Vec<ColumnSchema>>>()
631}
632
633// TODO(weny): refactor this function to use `try_as_column_def`
634pub fn column_schemas_to_defs(
635    column_schemas: Vec<ColumnSchema>,
636    primary_keys: &[String],
637) -> Result<Vec<api::v1::ColumnDef>> {
638    let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
639        .iter()
640        .map(|c| {
641            ColumnDataTypeWrapper::try_from(c.data_type.clone())
642                .map(|w| w.to_parts())
643                .context(ColumnDataTypeSnafu)
644        })
645        .collect::<Result<Vec<_>>>()?;
646
647    column_schemas
648        .iter()
649        .zip(column_datatypes)
650        .map(|(schema, datatype)| {
651            let semantic_type = if schema.is_time_index() {
652                SemanticType::Timestamp
653            } else if primary_keys.contains(&schema.name) {
654                SemanticType::Tag
655            } else {
656                SemanticType::Field
657            } as i32;
658            let comment = schema
659                .metadata()
660                .get(COMMENT_KEY)
661                .cloned()
662                .unwrap_or_default();
663
664            Ok(api::v1::ColumnDef {
665                name: schema.name.clone(),
666                data_type: datatype.0 as i32,
667                is_nullable: schema.is_nullable(),
668                default_constraint: match schema.default_constraint() {
669                    None => vec![],
670                    Some(v) => {
671                        v.clone()
672                            .try_into()
673                            .context(ConvertColumnDefaultConstraintSnafu {
674                                column_name: &schema.name,
675                            })?
676                    }
677                },
678                semantic_type,
679                comment,
680                datatype_extension: datatype.1,
681                options: options_from_column_schema(schema),
682            })
683        })
684        .collect()
685}
686
/// An in-process request to repartition a table, extracted from an
/// `ALTER TABLE ... REPARTITION` statement by `to_repartition_request`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RepartitionRequest {
    /// Catalog of the target table.
    pub catalog_name: String,
    /// Schema (database) of the target table.
    pub schema_name: String,
    /// Name of the table to repartition.
    pub table_name: String,
    /// Partition expressions taken from the operation's `from` side
    /// (presumably the current partition rules — confirm with the parser).
    pub from_exprs: Vec<Expr>,
    /// Partition expressions taken from the operation's `into` side
    /// (the desired partition rules).
    pub into_exprs: Vec<Expr>,
    /// Options carried on the ALTER statement.
    pub options: OptionMap,
}
696
697pub(crate) fn to_repartition_request(
698    alter_table: AlterTable,
699    query_ctx: &QueryContextRef,
700) -> Result<RepartitionRequest> {
701    let AlterTable {
702        table_name,
703        alter_operation,
704        options,
705    } = alter_table;
706
707    let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&table_name, query_ctx)
708        .map_err(BoxedError::new)
709        .context(ExternalSnafu)?;
710
711    let AlterTableOperation::Repartition { operation } = alter_operation else {
712        return InvalidSqlSnafu {
713            err_msg: "expected REPARTITION operation",
714        }
715        .fail();
716    };
717
718    Ok(RepartitionRequest {
719        catalog_name,
720        schema_name,
721        table_name,
722        from_exprs: operation.from_exprs,
723        into_exprs: operation.into_exprs,
724        options,
725    })
726}
727
728/// Converts a SQL alter table statement into a gRPC alter table expression.
729pub(crate) fn to_alter_table_expr(
730    alter_table: AlterTable,
731    query_ctx: &QueryContextRef,
732) -> Result<AlterTableExpr> {
733    let (catalog_name, schema_name, table_name) =
734        table_idents_to_full_name(alter_table.table_name(), query_ctx)
735            .map_err(BoxedError::new)
736            .context(ExternalSnafu)?;
737
738    let kind = match alter_table.alter_operation {
739        AlterTableOperation::AddConstraint(_) => {
740            return NotSupportedSnafu {
741                feat: "ADD CONSTRAINT",
742            }
743            .fail();
744        }
745        AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
746            add_columns: add_columns
747                .into_iter()
748                .map(|add_column| {
749                    let column_def = sql_column_def_to_grpc_column_def(
750                        &add_column.column_def,
751                        Some(&query_ctx.timezone()),
752                    )
753                    .map_err(BoxedError::new)
754                    .context(ExternalSnafu)?;
755                    if is_interval_type(&column_def.data_type()) {
756                        return NotSupportedSnafu {
757                            feat: "Add column with interval type",
758                        }
759                        .fail();
760                    }
761                    Ok(AddColumn {
762                        column_def: Some(column_def),
763                        location: add_column.location.as_ref().map(From::from),
764                        add_if_not_exists: add_column.add_if_not_exists,
765                    })
766                })
767                .collect::<Result<Vec<AddColumn>>>()?,
768        }),
769        AlterTableOperation::ModifyColumnType {
770            column_name,
771            target_type,
772        } => {
773            let target_type =
774                sql_data_type_to_concrete_data_type(&target_type, &Default::default())
775                    .context(ParseSqlSnafu)?;
776            let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
777                .map(|w| w.to_parts())
778                .context(ColumnDataTypeSnafu)?;
779            if is_interval_type(&target_type) {
780                return NotSupportedSnafu {
781                    feat: "Modify column type to interval type",
782                }
783                .fail();
784            }
785            AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
786                modify_column_types: vec![ModifyColumnType {
787                    column_name: column_name.value,
788                    target_type: target_type as i32,
789                    target_type_extension,
790                }],
791            })
792        }
793        AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
794            drop_columns: vec![DropColumn {
795                name: name.value.clone(),
796            }],
797        }),
798        AlterTableOperation::RenameTable { new_table_name } => {
799            AlterTableKind::RenameTable(RenameTable {
800                new_table_name: new_table_name.clone(),
801            })
802        }
803        AlterTableOperation::SetTableOptions { options } => {
804            AlterTableKind::SetTableOptions(SetTableOptions {
805                table_options: options.into_iter().map(Into::into).collect(),
806            })
807        }
808        AlterTableOperation::UnsetTableOptions { keys } => {
809            AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
810        }
811        AlterTableOperation::Repartition { .. } => {
812            return NotSupportedSnafu {
813                feat: "ALTER TABLE ... REPARTITION",
814            }
815            .fail();
816        }
817        AlterTableOperation::SetIndex { options } => {
818            let option = match options {
819                sql::statements::alter::SetIndexOperation::Fulltext {
820                    column_name,
821                    options,
822                } => SetIndex {
823                    options: Some(set_index::Options::Fulltext(SetFulltext {
824                        column_name: column_name.value,
825                        enable: options.enable,
826                        analyzer: match options.analyzer {
827                            FulltextAnalyzer::English => Analyzer::English.into(),
828                            FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
829                        },
830                        case_sensitive: options.case_sensitive,
831                        backend: match options.backend {
832                            FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
833                            FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
834                        },
835                        granularity: options.granularity as u64,
836                        false_positive_rate: options.false_positive_rate(),
837                    })),
838                },
839                sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
840                    options: Some(set_index::Options::Inverted(SetInverted {
841                        column_name: column_name.value,
842                    })),
843                },
844                sql::statements::alter::SetIndexOperation::Skipping {
845                    column_name,
846                    options,
847                } => SetIndex {
848                    options: Some(set_index::Options::Skipping(SetSkipping {
849                        column_name: column_name.value,
850                        enable: true,
851                        granularity: options.granularity as u64,
852                        false_positive_rate: options.false_positive_rate(),
853                        skipping_index_type: match options.index_type {
854                            SkippingIndexType::BloomFilter => {
855                                PbSkippingIndexType::BloomFilter.into()
856                            }
857                        },
858                    })),
859                },
860            };
861            AlterTableKind::SetIndexes(SetIndexes {
862                set_indexes: vec![option],
863            })
864        }
865        AlterTableOperation::UnsetIndex { options } => {
866            let option = match options {
867                sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
868                    UnsetIndex {
869                        options: Some(unset_index::Options::Fulltext(UnsetFulltext {
870                            column_name: column_name.value,
871                        })),
872                    }
873                }
874                sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
875                    UnsetIndex {
876                        options: Some(unset_index::Options::Inverted(UnsetInverted {
877                            column_name: column_name.value,
878                        })),
879                    }
880                }
881                sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
882                    UnsetIndex {
883                        options: Some(unset_index::Options::Skipping(UnsetSkipping {
884                            column_name: column_name.value,
885                        })),
886                    }
887                }
888            };
889
890            AlterTableKind::UnsetIndexes(UnsetIndexes {
891                unset_indexes: vec![option],
892            })
893        }
894        AlterTableOperation::DropDefaults { columns } => {
895            AlterTableKind::DropDefaults(DropDefaults {
896                drop_defaults: columns
897                    .into_iter()
898                    .map(|col| {
899                        let column_name = col.0.to_string();
900                        Ok(api::v1::DropDefault { column_name })
901                    })
902                    .collect::<Result<Vec<_>>>()?,
903            })
904        }
905        AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
906            set_defaults: defaults
907                .into_iter()
908                .map(|col| {
909                    let column_name = col.column_name.to_string();
910                    let default_constraint = serde_json::to_string(&col.default_constraint)
911                        .context(EncodeJsonSnafu)?
912                        .into_bytes();
913                    Ok(api::v1::SetDefault {
914                        column_name,
915                        default_constraint,
916                    })
917                })
918                .collect::<Result<Vec<_>>>()?,
919        }),
920    };
921
922    Ok(AlterTableExpr {
923        catalog_name,
924        schema_name,
925        table_name,
926        kind: Some(kind),
927    })
928}
929
930/// Try to cast the `[AlterDatabase]` statement into gRPC `[AlterDatabaseExpr]`.
931pub fn to_alter_database_expr(
932    alter_database: AlterDatabase,
933    query_ctx: &QueryContextRef,
934) -> Result<AlterDatabaseExpr> {
935    let catalog = query_ctx.current_catalog();
936    let schema = alter_database.database_name;
937
938    let kind = match alter_database.alter_operation {
939        AlterDatabaseOperation::SetDatabaseOption { options } => {
940            let options = options.into_iter().map(Into::into).collect();
941            AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
942                set_database_options: options,
943            })
944        }
945        AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
946            AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
947        }
948    };
949
950    Ok(AlterDatabaseExpr {
951        catalog_name: catalog.to_string(),
952        schema_name: schema.to_string(),
953        kind: Some(kind),
954    })
955}
956
957/// Try to cast the `[CreateViewExpr]` statement into gRPC `[CreateViewExpr]`.
958pub fn to_create_view_expr(
959    stmt: CreateView,
960    logical_plan: Vec<u8>,
961    table_names: Vec<TableName>,
962    columns: Vec<String>,
963    plan_columns: Vec<String>,
964    definition: String,
965    query_ctx: QueryContextRef,
966) -> Result<CreateViewExpr> {
967    let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
968        .map_err(BoxedError::new)
969        .context(ExternalSnafu)?;
970
971    let expr = CreateViewExpr {
972        catalog_name,
973        schema_name,
974        view_name,
975        logical_plan,
976        create_if_not_exists: stmt.if_not_exists,
977        or_replace: stmt.or_replace,
978        table_names,
979        columns,
980        plan_columns,
981        definition,
982    };
983
984    Ok(expr)
985}
986
987pub fn to_create_flow_task_expr(
988    create_flow: CreateFlow,
989    query_ctx: &QueryContextRef,
990) -> Result<CreateFlowExpr> {
991    // retrieve sink table name
992    let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone(), true)
993        .with_context(|_| ConvertIdentifierSnafu {
994            ident: create_flow.sink_table_name.to_string(),
995        })?;
996    let catalog = sink_table_ref
997        .catalog()
998        .unwrap_or(query_ctx.current_catalog())
999        .to_string();
1000    let schema = sink_table_ref
1001        .schema()
1002        .map(|s| s.to_owned())
1003        .unwrap_or(query_ctx.current_schema());
1004
1005    let sink_table_name = TableName {
1006        catalog_name: catalog,
1007        schema_name: schema,
1008        table_name: sink_table_ref.table().to_string(),
1009    };
1010
1011    let source_table_names = extract_tables_from_query(&create_flow.query)
1012        .map(|name| {
1013            let reference =
1014                object_name_to_table_reference(name.clone(), true).with_context(|_| {
1015                    ConvertIdentifierSnafu {
1016                        ident: name.to_string(),
1017                    }
1018                })?;
1019            let catalog = reference
1020                .catalog()
1021                .unwrap_or(query_ctx.current_catalog())
1022                .to_string();
1023            let schema = reference
1024                .schema()
1025                .map(|s| s.to_string())
1026                .unwrap_or(query_ctx.current_schema());
1027
1028            let table_name = TableName {
1029                catalog_name: catalog,
1030                schema_name: schema,
1031                table_name: reference.table().to_string(),
1032            };
1033            Ok(table_name)
1034        })
1035        .collect::<Result<Vec<_>>>()?;
1036
1037    let eval_interval = create_flow.eval_interval;
1038
1039    Ok(CreateFlowExpr {
1040        catalog_name: query_ctx.current_catalog().to_string(),
1041        flow_name: sanitize_flow_name(create_flow.flow_name)?,
1042        source_table_names,
1043        sink_table_name: Some(sink_table_name),
1044        or_replace: create_flow.or_replace,
1045        create_if_not_exists: create_flow.if_not_exists,
1046        expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
1047        eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
1048        comment: create_flow.comment.unwrap_or_default(),
1049        sql: create_flow.query.to_string(),
1050        flow_options: Default::default(),
1051    })
1052}
1053
1054/// sanitize the flow name, remove possible quotes
1055fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
1056    ensure!(
1057        flow_name.0.len() == 1,
1058        InvalidFlowNameSnafu {
1059            name: flow_name.to_string(),
1060        }
1061    );
1062    // safety: we've checked flow_name.0 has exactly one element.
1063    Ok(flow_name.0.swap_remove(0).to_string_unquoted())
1064}
1065
1066#[cfg(test)]
1067mod tests {
1068    use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
1069    use datatypes::value::Value;
1070    use session::context::{QueryContext, QueryContextBuilder};
1071    use sql::dialect::GreptimeDbDialect;
1072    use sql::parser::{ParseOptions, ParserContext};
1073    use sql::statements::statement::Statement;
1074    use store_api::storage::ColumnDefaultConstraint;
1075
1076    use super::*;
1077
    /// TQL-based flows: invalid TQL EVAL bounds fail at parse time, while
    /// now()-relative TQL flows convert into a `CreateFlowExpr` whose sink,
    /// source, and SQL text are fully resolved.
    #[test]
    fn test_create_flow_tql_expr() {
        // Plain numeric (start, end) bounds are expected to be rejected by
        // the parser for CREATE FLOW (asserted below).
        let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());

        assert!(
            stmt.is_err(),
            "Expected error for invalid TQL EVAL parameters: {:#?}",
            stmt
        );

        // A now()-relative range parses successfully.
        let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        // Renders a TableName as `catalog.schema.table` for compact asserts.
        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        assert_eq!("calc_reqs", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        // Unqualified sink/source names are resolved against the default
        // catalog/schema (greptime.public).
        assert_eq!(
            "greptime.public.cnt_reqs",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.public.http_requests",
            to_dot_sep(expr.source_table_names[0].clone())
        );
        // The stored SQL is the flow body without the trailing semicolon.
        assert_eq!(
            r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#,
            expr.sql
        );

        // A `__schema__` matcher in the selector changes the schema of the
        // resolved source table (asserted to be greptime_private below).
        let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__schema__="greptime_private"});"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();
        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.greptime_private.http_requests",
            to_dot_sep(expr.source_table_names[0].clone())
        );

        // `__database__` produces the same resolution as `__schema__`.
        let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__database__="greptime_private"});"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();
        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.greptime_private.http_requests",
            to_dot_sep(expr.source_table_names[0].clone())
        );
    }
1160
1161    #[test]
1162    fn test_create_flow_tql_cte_source_tables() {
1163        let sql = r#"
1164CREATE FLOW calc_cte
1165SINK TO metric_cte_sink
1166EVAL INTERVAL '1m'
1167AS
1168WITH tql(ts, the_value) AS (
1169  TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte
1170)
1171SELECT * FROM tql;
1172"#;
1173
1174        let stmt =
1175            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1176                .unwrap()
1177                .pop()
1178                .unwrap();
1179
1180        let Statement::CreateFlow(create_flow) = stmt else {
1181            unreachable!()
1182        };
1183        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1184
1185        let to_dot_sep =
1186            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1187        assert_eq!(1, expr.source_table_names.len());
1188        assert_eq!(
1189            "greptime.public.metric_cte",
1190            to_dot_sep(expr.source_table_names[0].clone())
1191        );
1192    }
1193
1194    #[test]
1195    fn test_create_flow_tql_cte_source_tables_quoted_cte_name() {
1196        let sql = r#"
1197CREATE FLOW calc_cte
1198SINK TO metric_cte_sink
1199EVAL INTERVAL '1m'
1200AS
1201WITH "TQL"(ts, the_value) AS (
1202  TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte
1203)
1204SELECT * FROM "TQL";
1205"#;
1206
1207        let stmt =
1208            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1209                .unwrap()
1210                .pop()
1211                .unwrap();
1212
1213        let Statement::CreateFlow(create_flow) = stmt else {
1214            unreachable!()
1215        };
1216        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1217
1218        let to_dot_sep =
1219            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1220        assert_eq!(1, expr.source_table_names.len());
1221        assert_eq!(
1222            "greptime.public.metric_cte",
1223            to_dot_sep(expr.source_table_names[0].clone())
1224        );
1225    }
1226
    /// The CTE alias and the table referenced inside the TQL body share the
    /// same name ("tql"); the flow should still report exactly one source
    /// table resolved to `greptime.public.tql`.
    #[test]
    fn test_create_flow_tql_cte_source_tables_same_name() {
        let sql = r#"
CREATE FLOW calc_cte
SINK TO metric_cte_sink
EVAL INTERVAL '1m'
AS
WITH tql(ts, the_value) AS (
  TQL EVAL (now() - '1m'::interval, now(), '5s') tql
)
SELECT * FROM tql;
"#;

        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        // Renders a TableName as `catalog.schema.table` for compact asserts.
        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.public.tql",
            to_dot_sep(expr.source_table_names[0].clone())
        );
    }
1259
    /// SQL-based flows: name/sink/source resolution for unqualified and
    /// schema-qualified tables, plus rejection of a catalog-qualified flow
    /// name.
    #[test]
    fn test_create_flow_expr() {
        // Unqualified flow/sink/source names resolve to the default
        // catalog/schema (greptime.public).
        let sql = r"
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
    DISTINCT number as dis
FROM
    distinct_basic;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        // Renders a TableName as `catalog.schema.table` for compact asserts.
        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        assert_eq!("test_distinct_basic", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.public.out_distinct_basic",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.public.distinct_basic",
            to_dot_sep(expr.source_table_names[0].clone())
        );
        // The stored SQL is the flow body without the trailing semicolon,
        // with the original formatting preserved.
        assert_eq!(
            r"SELECT
    DISTINCT number as dis
FROM
    distinct_basic",
            expr.sql
        );

        // A backtick-quoted flow name is unquoted; schema-qualified sink and
        // source keep their own schemas under the default catalog.
        let sql = r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        assert_eq!("task_2", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.schema_1.table_1",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.schema_2.table_2",
            to_dot_sep(expr.source_table_names[0].clone())
        );
        assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);

        // A multi-part (catalog-qualified) flow name is rejected.
        let sql = r"
CREATE FLOW abc.`task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());

        assert!(res.is_err());
        assert!(
            res.unwrap_err()
                .to_string()
                .contains("Invalid flow name: abc.`task_2`")
        );
    }
1354
1355    #[test]
1356    fn test_create_to_expr() {
1357        let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
1358        let stmt =
1359            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1360                .unwrap()
1361                .pop()
1362                .unwrap();
1363
1364        let Statement::CreateTable(create_table) = stmt else {
1365            unreachable!()
1366        };
1367        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1368        assert_eq!("3days", expr.table_options.get("ttl").unwrap());
1369        assert_eq!(
1370            "1.0MiB",
1371            expr.table_options.get("write_buffer_size").unwrap()
1372        );
1373    }
1374
1375    #[test]
1376    fn test_invalid_create_to_expr() {
1377        let cases = [
1378            // duplicate column declaration
1379            "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
1380            // duplicate primary key
1381            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
1382            // time index is primary key
1383            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));",
1384        ];
1385
1386        for sql in cases {
1387            let stmt = ParserContext::create_with_dialect(
1388                sql,
1389                &GreptimeDbDialect {},
1390                ParseOptions::default(),
1391            )
1392            .unwrap()
1393            .pop()
1394            .unwrap();
1395            let Statement::CreateTable(create_table) = stmt else {
1396                unreachable!()
1397            };
1398            create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
1399        }
1400    }
1401
    /// A string default for a TIMESTAMP column is interpreted in the query
    /// context's timezone when converted to a `CreateTableExpr`.
    #[test]
    fn test_create_to_expr_with_default_timestamp_value() {
        let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateTable(create_table) = stmt else {
            unreachable!()
        };

        // query context with system timezone UTC: the literal is taken as-is.
        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
        let ts_column = &expr.column_defs[1];
        let constraint = assert_ts_column(ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
        );

        // query context with timezone `+08:00`: the same literal is local
        // time, so the stored UTC value shifts back 8 hours (previous day).
        let ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("+08:00").unwrap())
            .build()
            .into();
        let expr = create_to_expr(&create_table, &ctx).unwrap();
        let ts_column = &expr.column_defs[1];
        let constraint = assert_ts_column(ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
        );
    }
1437
1438    fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
1439        assert_eq!("ts", ts_column.name);
1440        assert_eq!(
1441            ColumnDataType::TimestampMillisecond as i32,
1442            ts_column.data_type
1443        );
1444        assert!(!ts_column.default_constraint.is_empty());
1445
1446        ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
1447    }
1448
    /// Covers three alter conversions: ALTER DATABASE SET, ALTER DATABASE
    /// UNSET, and ALTER TABLE ADD COLUMN with a timezone-sensitive default.
    #[test]
    fn test_to_alter_expr() {
        // ALTER DATABASE ... SET: options become SetDatabaseOptions pairs in
        // declaration order.
        let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterDatabase(alter_database) = stmt else {
            unreachable!()
        };

        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
            set_database_options,
        }) = kind
        else {
            unreachable!()
        };

        assert_eq!(2, set_database_options.len());
        assert_eq!("key1", set_database_options[0].key);
        assert_eq!("value1", set_database_options[0].value);
        assert_eq!("key2", set_database_options[1].key);
        assert_eq!("value2", set_database_options[1].value);

        // ALTER DATABASE ... UNSET: keys carry over into UnsetDatabaseOptions.
        let sql = "ALTER DATABASE greptime UNSET key1, key2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterDatabase(alter_database) = stmt else {
            unreachable!()
        };

        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
            unreachable!()
        };

        assert_eq!(2, keys.len());
        assert!(keys.contains(&"key1".to_string()));
        assert!(keys.contains(&"key2".to_string()));

        // ALTER TABLE ... ADD COLUMN with a string timestamp default: the
        // literal is interpreted in the query context's timezone.
        let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterTable(alter_table) = stmt else {
            unreachable!()
        };

        // query context with system timezone UTC.
        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
            unreachable!()
        };

        assert_eq!(1, add_columns.len());
        let ts_column = add_columns[0].column_def.clone().unwrap();
        let constraint = assert_ts_column(&ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
        );

        //
        // query context with timezone `+08:00`: the same literal is local
        // time, so the stored UTC value shifts back 8 hours.
        let ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("+08:00").unwrap())
            .build()
            .into();
        let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
        let kind = expr.kind.unwrap();

        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
            unreachable!()
        };

        assert_eq!(1, add_columns.len());
        let ts_column = add_columns[0].column_def.clone().unwrap();
        let constraint = assert_ts_column(&ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
        );
    }
1548
1549    #[test]
1550    fn test_to_alter_modify_column_type_expr() {
1551        let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
1552        let stmt =
1553            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1554                .unwrap()
1555                .pop()
1556                .unwrap();
1557
1558        let Statement::AlterTable(alter_table) = stmt else {
1559            unreachable!()
1560        };
1561
1562        // query context with system timezone UTC.
1563        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1564        let kind = expr.kind.unwrap();
1565
1566        let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
1567            modify_column_types,
1568        }) = kind
1569        else {
1570            unreachable!()
1571        };
1572
1573        assert_eq!(1, modify_column_types.len());
1574        let modify_column_type = &modify_column_types[0];
1575
1576        assert_eq!("mem_usage", modify_column_type.column_name);
1577        assert_eq!(
1578            ColumnDataType::String as i32,
1579            modify_column_type.target_type
1580        );
1581        assert!(modify_column_type.target_type_extension.is_none());
1582    }
1583
    /// `ALTER TABLE ... REPARTITION (...) INTO (...)` converts into a
    /// repartition request carrying the table identity plus the `from` and
    /// `into` partition expressions.
    #[test]
    fn test_to_repartition_request() {
        let sql = r#"
ALTER TABLE metrics REPARTITION (
  device_id < 100
) INTO (
  device_id < 100 AND area < 'South',
  device_id < 100 AND area >= 'South'
);"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterTable(alter_table) = stmt else {
            unreachable!()
        };

        let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
        // Unqualified table names resolve to the default catalog/schema.
        assert_eq!("greptime", request.catalog_name);
        assert_eq!("public", request.schema_name);
        assert_eq!("metrics", request.table_name);
        // Partition expressions round-trip back to their SQL text.
        assert_eq!(
            request
                .from_exprs
                .into_iter()
                .map(|x| x.to_string())
                .collect::<Vec<_>>(),
            vec!["device_id < 100".to_string()]
        );
        assert_eq!(
            request
                .into_exprs
                .into_iter()
                .map(|x| x.to_string())
                .collect::<Vec<_>>(),
            vec![
                "device_id < 100 AND area < 'South'".to_string(),
                "device_id < 100 AND area >= 'South'".to_string()
            ]
        );
    }
1627
1628    fn new_test_table_names() -> Vec<TableName> {
1629        vec![
1630            TableName {
1631                catalog_name: "greptime".to_string(),
1632                schema_name: "public".to_string(),
1633                table_name: "a_table".to_string(),
1634            },
1635            TableName {
1636                catalog_name: "greptime".to_string(),
1637                schema_name: "public".to_string(),
1638                table_name: "b_table".to_string(),
1639            },
1640        ]
1641    }
1642
1643    #[test]
1644    fn test_to_create_view_expr() {
1645        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1646        let stmt =
1647            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1648                .unwrap()
1649                .pop()
1650                .unwrap();
1651
1652        let Statement::CreateView(stmt) = stmt else {
1653            unreachable!()
1654        };
1655
1656        let logical_plan = vec![1, 2, 3];
1657        let table_names = new_test_table_names();
1658        let columns = vec!["a".to_string()];
1659        let plan_columns = vec!["number".to_string()];
1660
1661        let expr = to_create_view_expr(
1662            stmt,
1663            logical_plan.clone(),
1664            table_names.clone(),
1665            columns.clone(),
1666            plan_columns.clone(),
1667            sql.to_string(),
1668            QueryContext::arc(),
1669        )
1670        .unwrap();
1671
1672        assert_eq!("greptime", expr.catalog_name);
1673        assert_eq!("public", expr.schema_name);
1674        assert_eq!("test", expr.view_name);
1675        assert!(!expr.create_if_not_exists);
1676        assert!(!expr.or_replace);
1677        assert_eq!(logical_plan, expr.logical_plan);
1678        assert_eq!(table_names, expr.table_names);
1679        assert_eq!(sql, expr.definition);
1680        assert_eq!(columns, expr.columns);
1681        assert_eq!(plan_columns, expr.plan_columns);
1682    }
1683
1684    #[test]
1685    fn test_to_create_view_expr_complex() {
1686        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
1687        let stmt =
1688            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1689                .unwrap()
1690                .pop()
1691                .unwrap();
1692
1693        let Statement::CreateView(stmt) = stmt else {
1694            unreachable!()
1695        };
1696
1697        let logical_plan = vec![1, 2, 3];
1698        let table_names = new_test_table_names();
1699        let columns = vec!["a".to_string()];
1700        let plan_columns = vec!["number".to_string()];
1701
1702        let expr = to_create_view_expr(
1703            stmt,
1704            logical_plan.clone(),
1705            table_names.clone(),
1706            columns.clone(),
1707            plan_columns.clone(),
1708            sql.to_string(),
1709            QueryContext::arc(),
1710        )
1711        .unwrap();
1712
1713        assert_eq!("greptime", expr.catalog_name);
1714        assert_eq!("test", expr.schema_name);
1715        assert_eq!("test_view", expr.view_name);
1716        assert!(expr.create_if_not_exists);
1717        assert!(expr.or_replace);
1718        assert_eq!(logical_plan, expr.logical_plan);
1719        assert_eq!(table_names, expr.table_names);
1720        assert_eq!(sql, expr.definition);
1721        assert_eq!(columns, expr.columns);
1722        assert_eq!(plan_columns, expr.plan_columns);
1723    }
1724
1725    #[test]
1726    fn test_expr_to_create() {
1727        let sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
1728  `timestamp` TIMESTAMP(9) NOT NULL,
1729  `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
1730  `username` STRING NULL,
1731  `http_method` STRING NULL INVERTED INDEX,
1732  `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
1733  `protocol` STRING NULL,
1734  `status_code` INT NULL INVERTED INDEX,
1735  `response_size` BIGINT NULL,
1736  `message` STRING NULL,
1737  TIME INDEX (`timestamp`),
1738  PRIMARY KEY (`username`, `status_code`)
1739)
1740ENGINE=mito
1741WITH(
1742  append_mode = 'true'
1743)"#;
1744        let stmt =
1745            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1746                .unwrap()
1747                .pop()
1748                .unwrap();
1749
1750        let Statement::CreateTable(original_create) = stmt else {
1751            unreachable!()
1752        };
1753
1754        // Convert CreateTable -> CreateTableExpr -> CreateTable
1755        let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();
1756
1757        let create_table = expr_to_create(&expr, Some('`')).unwrap();
1758        let new_sql = format!("{:#}", create_table);
1759        assert_eq!(sql, new_sql);
1760    }
1761}