Skip to main content

operator/
expr_helper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
24use api::v1::{
25    AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
26    ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
27    DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
28    ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetDefaults, SetFulltext,
29    SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30    SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31    UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions, set_index,
32    unset_index,
33};
34use common_error::ext::BoxedError;
35use common_grpc_expr::util::ColumnExpr;
36use common_time::Timezone;
37use datafusion::sql::planner::object_name_to_table_reference;
38use datatypes::schema::{
39    COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
40    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
41    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
42    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
43    ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
44    SkippingIndexType,
45};
46use file_engine::FileOptions;
47use query::sql::{
48    check_file_to_table_schema_compatibility, file_column_schemas_to_table,
49    infer_file_table_schema, prepare_file_table_files,
50};
51use session::context::QueryContextRef;
52use session::table_name::table_idents_to_full_name;
53use snafu::{OptionExt, ResultExt, ensure};
54use sql::ast::{
55    ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
56};
57use sql::dialect::GreptimeDbDialect;
58use sql::parser::ParserContext;
59use sql::statements::alter::{
60    AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
61};
62use sql::statements::create::{
63    Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
64    CreateView, TableConstraint,
65};
66use sql::statements::{
67    OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
68    sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
69};
70use sql::util::extract_tables_from_query;
71use store_api::mito_engine_options::{COMPACTION_OVERRIDE, COMPACTION_TYPE};
72use table::requests::{FILE_TABLE_META_KEY, TableOptions};
73use table::table_reference::TableReference;
74#[cfg(feature = "enterprise")]
75pub use trigger::to_create_trigger_task_expr;
76
77use crate::error::{
78    BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
79    ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
80    IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
81    InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
82    PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
83};
84
85pub fn create_table_expr_by_column_schemas(
86    table_name: &TableReference<'_>,
87    column_schemas: &[api::v1::ColumnSchema],
88    engine: &str,
89    desc: Option<&str>,
90) -> Result<CreateTableExpr> {
91    let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
92    let expr = common_grpc_expr::util::build_create_table_expr(
93        None,
94        table_name,
95        column_exprs,
96        engine,
97        desc.unwrap_or("Created on insertion"),
98    )
99    .context(BuildCreateExprOnInsertionSnafu)?;
100
101    validate_create_expr(&expr)?;
102    Ok(expr)
103}
104
105pub fn extract_add_columns_expr(
106    schema: &Schema,
107    column_exprs: Vec<ColumnExpr>,
108) -> Result<Option<AddColumns>> {
109    let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
110        .context(FindNewColumnsOnInsertionSnafu)?;
111    if let Some(add_columns) = &add_columns {
112        validate_add_columns_expr(add_columns)?;
113    }
114    Ok(add_columns)
115}
116
// For the `CREATE EXTERNAL TABLE` statement in expanded form, like
// ```sql
// CREATE EXTERNAL TABLE city (
//   host string,
//   ts timestamp,
//   cpu float64,
//   memory float64,
//   TIME INDEX (ts),
//   PRIMARY KEY(host)
// ) WITH (location='/var/data/city.csv', format='csv');
// ```
123// The user needs to specify the TIME INDEX column. If there is no suitable
124// column in the file to use as TIME INDEX, an additional placeholder column
125// needs to be created as the TIME INDEX, and a `DEFAULT <value>` constraint
126// should be added.
127//
128//
129// When the `CREATE EXTERNAL TABLE` statement is in inferred form, like
130// ```sql
131// CREATE EXTERNAL TABLE IF NOT EXISTS city WITH (location='/var/data/city.csv',format='csv');
132// ```
133// 1. If the TIME INDEX column can be inferred from metadata, use that column
134//    as the TIME INDEX. Otherwise,
135// 2. If a column named `greptime_timestamp` exists (with the requirement that
136//    the column is with type TIMESTAMP, otherwise an error is thrown), use
137//    that column as the TIME INDEX. Otherwise,
138// 3. Automatically create the `greptime_timestamp` column and add a `DEFAULT 0`
139//    constraint.
pub(crate) async fn create_external_expr(
    create: CreateExternalTable,
    query_ctx: &QueryContextRef,
) -> Result<CreateTableExpr> {
    // Resolve the (possibly partially qualified) table name against the
    // session's current catalog/schema.
    let (catalog_name, schema_name, table_name) =
        table_idents_to_full_name(&create.name, query_ctx)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    let mut table_options = create.options.into_map();

    // Locate the backing files described by the WITH options (location, format, ...).
    let (object_store, files) = prepare_file_table_files(&table_options)
        .await
        .context(PrepareFileTableSnafu)?;

    // Infer the physical schema from the files themselves.
    let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
        .await
        .context(InferFileTableSchemaSnafu)?
        .column_schemas()
        .to_vec();

    let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
        // expanded form: the statement declares columns and constraints explicitly
        let time_index = find_time_index(&create.constraints)?;
        let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
        let column_schemas =
            columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
        (time_index, primary_keys, column_schemas)
    } else {
        // inferred form: derive schema and time index from the files; no primary keys
        let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
        let primary_keys = vec![];
        (time_index, primary_keys, column_schemas)
    };

    // The declared (or derived) table schema must be readable from the files.
    check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
        .context(SchemaIncompatibleSnafu)?;

    // Persist the file list and file schema as a JSON-encoded table option so
    // the file engine can reconstruct them later.
    let meta = FileOptions {
        files,
        file_column_schemas,
    };
    table_options.insert(
        FILE_TABLE_META_KEY.to_string(),
        serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
    );

    let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
    let expr = CreateTableExpr {
        catalog_name,
        schema_name,
        table_name,
        desc: String::default(),
        column_defs,
        time_index,
        primary_keys,
        create_if_not_exists: create.if_not_exists,
        table_options,
        table_id: None,
        engine: create.engine.clone(),
    };

    Ok(expr)
}
204
205/// Convert `CreateTable` statement to [`CreateTableExpr`] gRPC request.
206pub fn create_to_expr(
207    create: &CreateTable,
208    query_ctx: &QueryContextRef,
209) -> Result<CreateTableExpr> {
210    let (catalog_name, schema_name, table_name) =
211        table_idents_to_full_name(&create.name, query_ctx)
212            .map_err(BoxedError::new)
213            .context(ExternalSnafu)?;
214
215    let time_index = find_time_index(&create.constraints)?;
216    let table_options = HashMap::from(
217        &TableOptions::try_from_iter(create.options.to_str_map())
218            .context(UnrecognizedTableOptionSnafu)?,
219    );
220
221    let mut table_options = table_options;
222    if table_options.contains_key(COMPACTION_TYPE) {
223        table_options.insert(COMPACTION_OVERRIDE.to_string(), "true".to_string());
224    }
225
226    let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
227
228    let expr = CreateTableExpr {
229        catalog_name,
230        schema_name,
231        table_name,
232        desc: String::default(),
233        column_defs: columns_to_expr(
234            &create.columns,
235            &time_index,
236            &primary_keys,
237            Some(&query_ctx.timezone()),
238        )?,
239        time_index,
240        primary_keys,
241        create_if_not_exists: create.if_not_exists,
242        table_options,
243        table_id: None,
244        engine: create.engine.clone(),
245    };
246
247    validate_create_expr(&expr)?;
248    Ok(expr)
249}
250
/// Convert gRPC's [`CreateTableExpr`] back to `CreateTable` statement.
/// You can use `create_table_expr_by_column_schemas` to create a `CreateTableExpr` from column schemas.
///
/// # Parameters
///
/// * `expr` - The `CreateTableExpr` to convert
/// * `quote_style` - Optional quote style for identifiers (defaults to MySQL style ` backtick)
pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
    let quote_style = quote_style.unwrap_or('`');

    // Convert table name.
    // NOTE(review): only `expr.table_name` is rendered here; `catalog_name`
    // and `schema_name` are dropped — confirm callers expect an unqualified name.
    let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
        sql::ast::Ident::with_quote(quote_style, &expr.table_name),
    )]);

    // Convert columns
    let mut columns = Vec::with_capacity(expr.column_defs.len());
    for column_def in &expr.column_defs {
        let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
            column: &column_def.name,
        })?;

        let mut options = Vec::new();

        // Add NULL/NOT NULL constraint
        if column_def.is_nullable {
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::Null,
            });
        } else {
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::NotNull,
            });
        }

        // Add DEFAULT constraint if present. A value default becomes a SQL
        // literal; a function default is re-parsed back into an expression.
        if let Some(default_constraint) = column_schema.default_constraint() {
            let expr = match default_constraint {
                ColumnDefaultConstraint::Value(v) => {
                    Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
                }
                ColumnDefaultConstraint::Function(func_expr) => {
                    ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
                        .context(ParseSqlSnafu)?
                }
            };
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::Default(expr),
            });
        }

        // Add COMMENT if present
        if !column_def.comment.is_empty() {
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::Comment(column_def.comment.clone()),
            });
        }

        // Note: We don't add inline PRIMARY KEY options here,
        // we'll handle all primary keys as constraints instead for consistency

        // Handle column extensions (fulltext, inverted index, skipping index)
        let mut extensions = ColumnExtensions::default();

        // Add fulltext index options if present and enabled.
        if let Ok(Some(opt)) = column_schema.fulltext_options()
            && opt.enable
        {
            let mut map = HashMap::from([
                (
                    COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
                    opt.analyzer.to_string(),
                ),
                (
                    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
                    opt.case_sensitive.to_string(),
                ),
                (
                    COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
                    opt.backend.to_string(),
                ),
            ]);
            // Granularity and false-positive rate only apply to the bloom backend.
            if opt.backend == FulltextBackend::Bloom {
                map.insert(
                    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
                    opt.granularity.to_string(),
                );
                map.insert(
                    COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
                    opt.false_positive_rate().to_string(),
                );
            }
            extensions.fulltext_index_options = Some(map.into());
        }

        // Add skipping index options if present
        if let Ok(Some(opt)) = column_schema.skipping_index_options() {
            let map = HashMap::from([
                (
                    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
                    opt.granularity.to_string(),
                ),
                (
                    COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
                    opt.false_positive_rate().to_string(),
                ),
                (
                    COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
                    opt.index_type.to_string(),
                ),
            ]);
            extensions.skipping_index_options = Some(map.into());
        }

        // Add inverted index options if present (no parameters, so an empty map).
        if column_schema.is_inverted_indexed() {
            extensions.inverted_index_options = Some(HashMap::new().into());
        }

        let sql_column = SqlColumn {
            column_def: ColumnDef {
                name: Ident::with_quote(quote_style, &column_def.name),
                data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
                    .context(ParseSqlSnafu)?,
                options,
            },
            extensions,
        };

        columns.push(sql_column);
    }

    // Convert constraints
    let mut constraints = Vec::new();

    // Add TIME INDEX constraint
    constraints.push(TableConstraint::TimeIndex {
        column: Ident::with_quote(quote_style, &expr.time_index),
    });

    // Add PRIMARY KEY constraint (always add as constraint for consistency)
    if !expr.primary_keys.is_empty() {
        let primary_key_columns: Vec<Ident> = expr
            .primary_keys
            .iter()
            .map(|pk| Ident::with_quote(quote_style, pk))
            .collect();

        constraints.push(TableConstraint::PrimaryKey {
            columns: primary_key_columns,
        });
    }

    // Convert table options
    let mut options = OptionMap::default();
    for (key, value) in &expr.table_options {
        options.insert(key.clone(), value.clone());
    }

    Ok(CreateTable {
        if_not_exists: expr.create_if_not_exists,
        // A missing table id is rendered as 0.
        table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
        name: table_name,
        columns,
        engine: expr.engine.clone(),
        constraints,
        options,
        partitions: None,
    })
}
425
426/// Validate the [`CreateTableExpr`] request.
427pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
428    // construct column list
429    let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
430    for (idx, column) in create.column_defs.iter().enumerate() {
431        if let Some(indices) = column_to_indices.get(&column.name) {
432            return InvalidSqlSnafu {
433                err_msg: format!(
434                    "column name `{}` is duplicated at index {} and {}",
435                    column.name, indices, idx
436                ),
437            }
438            .fail();
439        }
440        column_to_indices.insert(&column.name, idx);
441    }
442
443    // verify time_index exists
444    let _ = column_to_indices
445        .get(&create.time_index)
446        .with_context(|| InvalidSqlSnafu {
447            err_msg: format!(
448                "column name `{}` is not found in column list",
449                create.time_index
450            ),
451        })?;
452
453    // verify primary_key exists
454    for pk in &create.primary_keys {
455        let _ = column_to_indices
456            .get(&pk)
457            .with_context(|| InvalidSqlSnafu {
458                err_msg: format!("column name `{}` is not found in column list", pk),
459            })?;
460    }
461
462    // construct primary_key set
463    let mut pk_set = HashSet::new();
464    for pk in &create.primary_keys {
465        if !pk_set.insert(pk) {
466            return InvalidSqlSnafu {
467                err_msg: format!("column name `{}` is duplicated in primary keys", pk),
468            }
469            .fail();
470        }
471    }
472
473    // verify time index is not primary key
474    if pk_set.contains(&create.time_index) {
475        return InvalidSqlSnafu {
476            err_msg: format!(
477                "column name `{}` is both primary key and time index",
478                create.time_index
479            ),
480        }
481        .fail();
482    }
483
484    for column in &create.column_defs {
485        // verify do not contain interval type column issue #3235
486        if is_interval_type(&column.data_type()) {
487            return InvalidSqlSnafu {
488                err_msg: format!(
489                    "column name `{}` is interval type, which is not supported",
490                    column.name
491                ),
492            }
493            .fail();
494        }
495        // verify do not contain datetime type column issue #5489
496        if is_date_time_type(&column.data_type()) {
497            return InvalidSqlSnafu {
498                err_msg: format!(
499                    "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
500                    column.name
501                ),
502            }
503            .fail();
504        }
505    }
506    Ok(())
507}
508
509fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
510    for add_column in &add_columns.add_columns {
511        let Some(column_def) = &add_column.column_def else {
512            continue;
513        };
514        if is_date_time_type(&column_def.data_type()) {
515            return InvalidSqlSnafu {
516                    err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
517                }
518                .fail();
519        }
520        if is_interval_type(&column_def.data_type()) {
521            return InvalidSqlSnafu {
522                err_msg: format!(
523                    "column name `{}` is interval type, which is not supported",
524                    column_def.name
525                ),
526            }
527            .fail();
528        }
529    }
530    Ok(())
531}
532
533fn is_date_time_type(data_type: &ColumnDataType) -> bool {
534    matches!(data_type, ColumnDataType::Datetime)
535}
536
537fn is_interval_type(data_type: &ColumnDataType) -> bool {
538    matches!(
539        data_type,
540        ColumnDataType::IntervalYearMonth
541            | ColumnDataType::IntervalDayTime
542            | ColumnDataType::IntervalMonthDayNano
543    )
544}
545
546fn find_primary_keys(
547    columns: &[SqlColumn],
548    constraints: &[TableConstraint],
549) -> Result<Vec<String>> {
550    let columns_pk = columns
551        .iter()
552        .filter_map(|x| {
553            if x.options()
554                .iter()
555                .any(|o| matches!(o.option, ColumnOption::PrimaryKey(_)))
556            {
557                Some(x.name().value.clone())
558            } else {
559                None
560            }
561        })
562        .collect::<Vec<String>>();
563
564    ensure!(
565        columns_pk.len() <= 1,
566        IllegalPrimaryKeysDefSnafu {
567            msg: "not allowed to inline multiple primary keys in columns options"
568        }
569    );
570
571    let constraints_pk = constraints
572        .iter()
573        .filter_map(|constraint| match constraint {
574            TableConstraint::PrimaryKey { columns, .. } => {
575                Some(columns.iter().map(|ident| ident.value.clone()))
576            }
577            _ => None,
578        })
579        .flatten()
580        .collect::<Vec<String>>();
581
582    ensure!(
583        columns_pk.is_empty() || constraints_pk.is_empty(),
584        IllegalPrimaryKeysDefSnafu {
585            msg: "found definitions of primary keys in multiple places"
586        }
587    );
588
589    let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
590    primary_keys.extend(columns_pk);
591    primary_keys.extend(constraints_pk);
592    Ok(primary_keys)
593}
594
595pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
596    let time_index = constraints
597        .iter()
598        .filter_map(|constraint| match constraint {
599            TableConstraint::TimeIndex { column, .. } => Some(&column.value),
600            _ => None,
601        })
602        .collect::<Vec<&String>>();
603    ensure!(
604        time_index.len() == 1,
605        InvalidSqlSnafu {
606            err_msg: "must have one and only one TimeIndex columns",
607        }
608    );
609    Ok(time_index[0].clone())
610}
611
612fn columns_to_expr(
613    column_defs: &[SqlColumn],
614    time_index: &str,
615    primary_keys: &[String],
616    timezone: Option<&Timezone>,
617) -> Result<Vec<api::v1::ColumnDef>> {
618    let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
619    column_schemas_to_defs(column_schemas, primary_keys)
620}
621
622fn columns_to_column_schemas(
623    columns: &[SqlColumn],
624    time_index: &str,
625    timezone: Option<&Timezone>,
626) -> Result<Vec<ColumnSchema>> {
627    columns
628        .iter()
629        .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
630        .collect::<Result<Vec<ColumnSchema>>>()
631}
632
// TODO(weny): refactor this function to use `try_as_column_def`
/// Converts [`ColumnSchema`]s into gRPC [`api::v1::ColumnDef`]s.
///
/// A column's semantic type is `Timestamp` when it is the time index, `Tag`
/// when it appears in `primary_keys`, and `Field` otherwise.
pub fn column_schemas_to_defs(
    column_schemas: Vec<ColumnSchema>,
    primary_keys: &[String],
) -> Result<Vec<api::v1::ColumnDef>> {
    // Resolve every concrete data type to its wire (type, extension) pair
    // first, so a single conversion failure aborts before any defs are built.
    let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
        .iter()
        .map(|c| {
            ColumnDataTypeWrapper::try_from(c.data_type.clone())
                .map(|w| w.to_parts())
                .context(ColumnDataTypeSnafu)
        })
        .collect::<Result<Vec<_>>>()?;

    column_schemas
        .iter()
        .zip(column_datatypes)
        .map(|(schema, datatype)| {
            let semantic_type = if schema.is_time_index() {
                SemanticType::Timestamp
            } else if primary_keys.contains(&schema.name) {
                SemanticType::Tag
            } else {
                SemanticType::Field
            } as i32;
            // The column comment is carried in the schema metadata, if any.
            let comment = schema
                .metadata()
                .get(COMMENT_KEY)
                .cloned()
                .unwrap_or_default();

            Ok(api::v1::ColumnDef {
                name: schema.name.clone(),
                data_type: datatype.0 as i32,
                is_nullable: schema.is_nullable(),
                // Serialized default constraint; empty bytes mean "no default".
                default_constraint: match schema.default_constraint() {
                    None => vec![],
                    Some(v) => {
                        v.clone()
                            .try_into()
                            .context(ConvertColumnDefaultConstraintSnafu {
                                column_name: &schema.name,
                            })?
                    }
                },
                semantic_type,
                comment,
                datatype_extension: datatype.1,
                options: options_from_column_schema(schema),
            })
        })
        .collect()
}
686
/// A parsed `ALTER TABLE ... REPARTITION` request with the table name
/// resolved to its fully qualified form.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RepartitionRequest {
    /// Catalog the table belongs to.
    pub catalog_name: String,
    /// Schema (database) the table belongs to.
    pub schema_name: String,
    /// Unqualified table name.
    pub table_name: String,
    /// Partition expressions describing the current (source) partitioning.
    pub from_exprs: Vec<Expr>,
    /// Partition expressions describing the target partitioning.
    pub into_exprs: Vec<Expr>,
    /// Options attached to the ALTER statement.
    pub options: OptionMap,
}
696
697pub(crate) fn to_repartition_request(
698    alter_table: AlterTable,
699    query_ctx: &QueryContextRef,
700) -> Result<RepartitionRequest> {
701    let AlterTable {
702        table_name,
703        alter_operation,
704        options,
705    } = alter_table;
706
707    let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&table_name, query_ctx)
708        .map_err(BoxedError::new)
709        .context(ExternalSnafu)?;
710
711    let AlterTableOperation::Repartition { operation } = alter_operation else {
712        return InvalidSqlSnafu {
713            err_msg: "expected REPARTITION operation",
714        }
715        .fail();
716    };
717
718    Ok(RepartitionRequest {
719        catalog_name,
720        schema_name,
721        table_name,
722        from_exprs: operation.from_exprs,
723        into_exprs: operation.into_exprs,
724        options,
725    })
726}
727
728/// Converts a SQL alter table statement into a gRPC alter table expression.
729pub(crate) fn to_alter_table_expr(
730    alter_table: AlterTable,
731    query_ctx: &QueryContextRef,
732) -> Result<AlterTableExpr> {
733    let (catalog_name, schema_name, table_name) =
734        table_idents_to_full_name(alter_table.table_name(), query_ctx)
735            .map_err(BoxedError::new)
736            .context(ExternalSnafu)?;
737
738    let kind = match alter_table.alter_operation {
739        AlterTableOperation::AddConstraint(_) => {
740            return NotSupportedSnafu {
741                feat: "ADD CONSTRAINT",
742            }
743            .fail();
744        }
745        AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
746            add_columns: add_columns
747                .into_iter()
748                .map(|add_column| {
749                    let column_def = sql_column_def_to_grpc_column_def(
750                        &add_column.column_def,
751                        Some(&query_ctx.timezone()),
752                    )
753                    .map_err(BoxedError::new)
754                    .context(ExternalSnafu)?;
755                    if is_interval_type(&column_def.data_type()) {
756                        return NotSupportedSnafu {
757                            feat: "Add column with interval type",
758                        }
759                        .fail();
760                    }
761                    Ok(AddColumn {
762                        column_def: Some(column_def),
763                        location: add_column.location.as_ref().map(From::from),
764                        add_if_not_exists: add_column.add_if_not_exists,
765                    })
766                })
767                .collect::<Result<Vec<AddColumn>>>()?,
768        }),
769        AlterTableOperation::ModifyColumnType {
770            column_name,
771            target_type,
772        } => {
773            let target_type =
774                sql_data_type_to_concrete_data_type(&target_type, &Default::default())
775                    .context(ParseSqlSnafu)?;
776            let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
777                .map(|w| w.to_parts())
778                .context(ColumnDataTypeSnafu)?;
779            if is_interval_type(&target_type) {
780                return NotSupportedSnafu {
781                    feat: "Modify column type to interval type",
782                }
783                .fail();
784            }
785            AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
786                modify_column_types: vec![ModifyColumnType {
787                    column_name: column_name.value,
788                    target_type: target_type as i32,
789                    target_type_extension,
790                }],
791            })
792        }
793        AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
794            drop_columns: vec![DropColumn {
795                name: name.value.clone(),
796            }],
797        }),
798        AlterTableOperation::RenameTable { new_table_name } => {
799            AlterTableKind::RenameTable(RenameTable {
800                new_table_name: new_table_name.clone(),
801            })
802        }
803        AlterTableOperation::SetTableOptions { options } => {
804            AlterTableKind::SetTableOptions(SetTableOptions {
805                table_options: options.into_iter().map(Into::into).collect(),
806            })
807        }
808        AlterTableOperation::UnsetTableOptions { keys } => {
809            AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
810        }
811        AlterTableOperation::Repartition { .. } => {
812            return NotSupportedSnafu {
813                feat: "ALTER TABLE ... REPARTITION",
814            }
815            .fail();
816        }
817        AlterTableOperation::SetIndex { options } => {
818            let option = match options {
819                sql::statements::alter::SetIndexOperation::Fulltext {
820                    column_name,
821                    options,
822                } => SetIndex {
823                    options: Some(set_index::Options::Fulltext(SetFulltext {
824                        column_name: column_name.value,
825                        enable: options.enable,
826                        analyzer: match options.analyzer {
827                            FulltextAnalyzer::English => Analyzer::English.into(),
828                            FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
829                        },
830                        case_sensitive: options.case_sensitive,
831                        backend: match options.backend {
832                            FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
833                            FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
834                        },
835                        granularity: options.granularity as u64,
836                        false_positive_rate: options.false_positive_rate(),
837                    })),
838                },
839                sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
840                    options: Some(set_index::Options::Inverted(SetInverted {
841                        column_name: column_name.value,
842                    })),
843                },
844                sql::statements::alter::SetIndexOperation::Skipping {
845                    column_name,
846                    options,
847                } => SetIndex {
848                    options: Some(set_index::Options::Skipping(SetSkipping {
849                        column_name: column_name.value,
850                        enable: true,
851                        granularity: options.granularity as u64,
852                        false_positive_rate: options.false_positive_rate(),
853                        skipping_index_type: match options.index_type {
854                            SkippingIndexType::BloomFilter => {
855                                PbSkippingIndexType::BloomFilter.into()
856                            }
857                        },
858                    })),
859                },
860            };
861            AlterTableKind::SetIndexes(SetIndexes {
862                set_indexes: vec![option],
863            })
864        }
865        AlterTableOperation::UnsetIndex { options } => {
866            let option = match options {
867                sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
868                    UnsetIndex {
869                        options: Some(unset_index::Options::Fulltext(UnsetFulltext {
870                            column_name: column_name.value,
871                        })),
872                    }
873                }
874                sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
875                    UnsetIndex {
876                        options: Some(unset_index::Options::Inverted(UnsetInverted {
877                            column_name: column_name.value,
878                        })),
879                    }
880                }
881                sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
882                    UnsetIndex {
883                        options: Some(unset_index::Options::Skipping(UnsetSkipping {
884                            column_name: column_name.value,
885                        })),
886                    }
887                }
888            };
889
890            AlterTableKind::UnsetIndexes(UnsetIndexes {
891                unset_indexes: vec![option],
892            })
893        }
894        AlterTableOperation::DropDefaults { columns } => {
895            AlterTableKind::DropDefaults(DropDefaults {
896                drop_defaults: columns
897                    .into_iter()
898                    .map(|col| {
899                        let column_name = col.0.to_string();
900                        Ok(api::v1::DropDefault { column_name })
901                    })
902                    .collect::<Result<Vec<_>>>()?,
903            })
904        }
905        AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
906            set_defaults: defaults
907                .into_iter()
908                .map(|col| {
909                    let column_name = col.column_name.to_string();
910                    let default_constraint = serde_json::to_string(&col.default_constraint)
911                        .context(EncodeJsonSnafu)?
912                        .into_bytes();
913                    Ok(api::v1::SetDefault {
914                        column_name,
915                        default_constraint,
916                    })
917                })
918                .collect::<Result<Vec<_>>>()?,
919        }),
920    };
921
922    Ok(AlterTableExpr {
923        catalog_name,
924        schema_name,
925        table_name,
926        kind: Some(kind),
927    })
928}
929
930/// Try to cast the `[AlterDatabase]` statement into gRPC `[AlterDatabaseExpr]`.
931pub fn to_alter_database_expr(
932    alter_database: AlterDatabase,
933    query_ctx: &QueryContextRef,
934) -> Result<AlterDatabaseExpr> {
935    let catalog = query_ctx.current_catalog();
936    let schema = alter_database.database_name;
937
938    let kind = match alter_database.alter_operation {
939        AlterDatabaseOperation::SetDatabaseOption { options } => {
940            let options = options.into_iter().map(Into::into).collect();
941            AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
942                set_database_options: options,
943            })
944        }
945        AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
946            AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
947        }
948    };
949
950    Ok(AlterDatabaseExpr {
951        catalog_name: catalog.to_string(),
952        schema_name: schema.to_string(),
953        kind: Some(kind),
954    })
955}
956
957/// Try to cast the `[CreateViewExpr]` statement into gRPC `[CreateViewExpr]`.
958pub fn to_create_view_expr(
959    stmt: CreateView,
960    logical_plan: Vec<u8>,
961    table_names: Vec<TableName>,
962    columns: Vec<String>,
963    plan_columns: Vec<String>,
964    definition: String,
965    query_ctx: QueryContextRef,
966) -> Result<CreateViewExpr> {
967    let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
968        .map_err(BoxedError::new)
969        .context(ExternalSnafu)?;
970
971    let expr = CreateViewExpr {
972        catalog_name,
973        schema_name,
974        view_name,
975        logical_plan,
976        create_if_not_exists: stmt.if_not_exists,
977        or_replace: stmt.or_replace,
978        table_names,
979        columns,
980        plan_columns,
981        definition,
982    };
983
984    Ok(expr)
985}
986
987pub fn to_create_flow_task_expr(
988    create_flow: CreateFlow,
989    query_ctx: &QueryContextRef,
990) -> Result<CreateFlowExpr> {
991    // retrieve sink table name
992    let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone(), true)
993        .with_context(|_| ConvertIdentifierSnafu {
994            ident: create_flow.sink_table_name.to_string(),
995        })?;
996    let catalog = sink_table_ref
997        .catalog()
998        .unwrap_or(query_ctx.current_catalog())
999        .to_string();
1000    let schema = sink_table_ref
1001        .schema()
1002        .map(|s| s.to_owned())
1003        .unwrap_or(query_ctx.current_schema());
1004
1005    let sink_table_name = TableName {
1006        catalog_name: catalog,
1007        schema_name: schema,
1008        table_name: sink_table_ref.table().to_string(),
1009    };
1010
1011    let source_table_names = extract_tables_from_query(&create_flow.query)
1012        .map(|name| {
1013            let reference =
1014                object_name_to_table_reference(name.clone(), true).with_context(|_| {
1015                    ConvertIdentifierSnafu {
1016                        ident: name.to_string(),
1017                    }
1018                })?;
1019            let catalog = reference
1020                .catalog()
1021                .unwrap_or(query_ctx.current_catalog())
1022                .to_string();
1023            let schema = reference
1024                .schema()
1025                .map(|s| s.to_string())
1026                .unwrap_or(query_ctx.current_schema());
1027
1028            let table_name = TableName {
1029                catalog_name: catalog,
1030                schema_name: schema,
1031                table_name: reference.table().to_string(),
1032            };
1033            Ok(table_name)
1034        })
1035        .collect::<Result<Vec<_>>>()?;
1036
1037    let eval_interval = create_flow.eval_interval;
1038
1039    Ok(CreateFlowExpr {
1040        catalog_name: query_ctx.current_catalog().to_string(),
1041        flow_name: sanitize_flow_name(create_flow.flow_name)?,
1042        source_table_names,
1043        sink_table_name: Some(sink_table_name),
1044        or_replace: create_flow.or_replace,
1045        create_if_not_exists: create_flow.if_not_exists,
1046        expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
1047        eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
1048        comment: create_flow.comment.unwrap_or_default(),
1049        sql: create_flow.query.to_string(),
1050        flow_options: stringify_flow_options(create_flow.flow_options)?,
1051    })
1052}
1053
1054fn stringify_flow_options(flow_options: OptionMap) -> Result<HashMap<String, String>> {
1055    let options_len = flow_options.len();
1056    let flow_options = flow_options.into_map();
1057    ensure!(
1058        flow_options.len() == options_len,
1059        InvalidSqlSnafu {
1060            err_msg: "flow options only support scalar string-compatible values".to_string(),
1061        }
1062    );
1063    Ok(flow_options)
1064}
1065
1066/// sanitize the flow name, remove possible quotes
1067fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
1068    ensure!(
1069        flow_name.0.len() == 1,
1070        InvalidFlowNameSnafu {
1071            name: flow_name.to_string(),
1072        }
1073    );
1074    // safety: we've checked flow_name.0 has exactly one element.
1075    Ok(flow_name.0.swap_remove(0).to_string_unquoted())
1076}
1077
1078#[cfg(test)]
1079mod tests {
1080    use std::collections::HashMap;
1081
1082    use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
1083    use datatypes::value::Value;
1084    use session::context::{QueryContext, QueryContextBuilder};
1085    use sql::dialect::GreptimeDbDialect;
1086    use sql::parser::{ParseOptions, ParserContext};
1087    use sql::statements::statement::Statement;
1088    use store_api::storage::ColumnDefaultConstraint;
1089
1090    use super::*;
1091
    // Verifies conversion of `CREATE FLOW ... AS TQL EVAL ...` statements into
    // `CreateFlowExpr`, including source-table extraction from PromQL matchers.
    #[test]
    fn test_create_flow_tql_expr() {
        // TQL EVAL with plain numeric (non-timestamp) range bounds must be
        // rejected at parse time.
        let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());

        assert!(
            stmt.is_err(),
            "Expected error for invalid TQL EVAL parameters: {:#?}",
            stmt
        );

        // A valid TQL EVAL range: unqualified names resolve to the current
        // catalog/schema (`greptime`/`public`).
        let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        // Render a `TableName` as `catalog.schema.table` for assertions.
        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        assert_eq!("calc_reqs", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.public.cnt_reqs",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.public.http_requests",
            to_dot_sep(expr.source_table_names[0].clone())
        );
        // The stored SQL is the flow query without the trailing semicolon.
        assert_eq!(
            r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#,
            expr.sql
        );

        // A `__schema__` matcher overrides the schema of the source table.
        let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__schema__="greptime_private"});"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();
        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.greptime_private.http_requests",
            to_dot_sep(expr.source_table_names[0].clone())
        );

        // `__database__` behaves the same as `__schema__` here.
        let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__database__="greptime_private"});"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();
        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.greptime_private.http_requests",
            to_dot_sep(expr.source_table_names[0].clone())
        );
    }
1174
    // A TQL query wrapped in a CTE: the flow's source table is the table
    // referenced inside the CTE body, not the CTE name itself.
    #[test]
    fn test_create_flow_tql_cte_source_tables() {
        let sql = r#"
CREATE FLOW calc_cte
SINK TO metric_cte_sink
EVAL INTERVAL '1m'
AS
WITH tql(ts, the_value) AS (
  TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte
)
SELECT * FROM tql;
"#;

        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        // Render a `TableName` as `catalog.schema.table` for assertions.
        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.public.metric_cte",
            to_dot_sep(expr.source_table_names[0].clone())
        );
    }
1207
    // Quoting the CTE name (here `"TQL"`, which collides with the TQL keyword
    // when unquoted) must not affect source-table extraction: the source is
    // still the table inside the CTE body.
    #[test]
    fn test_create_flow_tql_cte_source_tables_quoted_cte_name() {
        let sql = r#"
CREATE FLOW calc_cte
SINK TO metric_cte_sink
EVAL INTERVAL '1m'
AS
WITH "TQL"(ts, the_value) AS (
  TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte
)
SELECT * FROM "TQL";
"#;

        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        // Render a `TableName` as `catalog.schema.table` for assertions.
        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.public.metric_cte",
            to_dot_sep(expr.source_table_names[0].clone())
        );
    }
1240
    // A CTE whose name equals the table referenced inside it (`tql`) still
    // yields exactly one source table with that name — no duplication and no
    // accidental shadowing.
    #[test]
    fn test_create_flow_tql_cte_source_tables_same_name() {
        let sql = r#"
CREATE FLOW calc_cte
SINK TO metric_cte_sink
EVAL INTERVAL '1m'
AS
WITH tql(ts, the_value) AS (
  TQL EVAL (now() - '1m'::interval, now(), '5s') tql
)
SELECT * FROM tql;
"#;

        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        // Render a `TableName` as `catalog.schema.table` for assertions.
        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.public.tql",
            to_dot_sep(expr.source_table_names[0].clone())
        );
    }
1273
    // End-to-end checks for SQL-based `CREATE FLOW` conversion: name/catalog
    // resolution, sink/source table qualification, SQL round-trip, and
    // `WITH (...)` flow-option handling, including invalid inputs.
    #[test]
    fn test_create_flow_expr() {
        // Unqualified names resolve to the current catalog/schema
        // (`greptime`/`public`).
        let sql = r"
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
    DISTINCT number as dis
FROM
    distinct_basic;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        // Render a `TableName` as `catalog.schema.table` for assertions.
        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        assert_eq!("test_distinct_basic", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.public.out_distinct_basic",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.public.distinct_basic",
            to_dot_sep(expr.source_table_names[0].clone())
        );
        // The stored SQL is the query body without the trailing semicolon.
        assert_eq!(
            r"SELECT
    DISTINCT number as dis
FROM
    distinct_basic",
            expr.sql
        );

        // Backquoted flow names are unquoted; schema-qualified sink/source
        // tables keep their schemas.
        let sql = r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        assert_eq!("task_2", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.schema_1.table_1",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.schema_2.table_2",
            to_dot_sep(expr.source_table_names[0].clone())
        );
        assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
        assert!(expr.flow_options.is_empty());

        // String-valued WITH options are carried through verbatim.
        let sql = r"
CREATE FLOW task_3
SINK TO schema_1.table_1
WITH (defer_on_missing_source = 'true', foo = 'bar')
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
        assert_eq!(
            expr.flow_options,
            HashMap::from([
                ("defer_on_missing_source".to_string(), "true".to_string()),
                ("foo".to_string(), "bar".to_string()),
            ])
        );

        // Bare scalar (unquoted boolean) option values are stringified.
        let sql = r"
CREATE FLOW task_4
SINK TO schema_1.table_1
WITH (defer_on_missing_source = true)
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
        assert_eq!(
            expr.flow_options,
            HashMap::from([("defer_on_missing_source".to_string(), "true".to_string(),)])
        );

        // Non-scalar option values (here, an array) are rejected by
        // `stringify_flow_options`.
        let sql = r"
CREATE FLOW task_5
SINK TO schema_1.table_1
WITH (defer_on_missing_source = [true])
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
        assert!(res.is_err());
        assert!(
            res.unwrap_err()
                .to_string()
                .contains("flow options only support scalar string-compatible values")
        );

        // Multi-part flow names are invalid (see `sanitize_flow_name`).
        let sql = r"
CREATE FLOW abc.`task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());

        assert!(res.is_err());
        assert!(
            res.unwrap_err()
                .to_string()
                .contains("Invalid flow name: abc.`task_2`")
        );
    }
1437
1438    #[test]
1439    fn test_create_to_expr() {
1440        let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
1441        let stmt =
1442            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1443                .unwrap()
1444                .pop()
1445                .unwrap();
1446
1447        let Statement::CreateTable(create_table) = stmt else {
1448            unreachable!()
1449        };
1450        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1451        assert_eq!("3days", expr.table_options.get("ttl").unwrap());
1452        assert_eq!(
1453            "1.0MiB",
1454            expr.table_options.get("write_buffer_size").unwrap()
1455        );
1456    }
1457
1458    #[test]
1459    fn test_invalid_create_to_expr() {
1460        let cases = [
1461            // duplicate column declaration
1462            "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
1463            // duplicate primary key
1464            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
1465            // time index is primary key
1466            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));",
1467        ];
1468
1469        for sql in cases {
1470            let stmt = ParserContext::create_with_dialect(
1471                sql,
1472                &GreptimeDbDialect {},
1473                ParseOptions::default(),
1474            )
1475            .unwrap()
1476            .pop()
1477            .unwrap();
1478            let Statement::CreateTable(create_table) = stmt else {
1479                unreachable!()
1480            };
1481            create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
1482        }
1483    }
1484
    // Default timestamp literals in `CREATE TABLE` are interpreted in the
    // query context's timezone before being encoded into the column default.
    #[test]
    fn test_create_to_expr_with_default_timestamp_value() {
        let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateTable(create_table) = stmt else {
            unreachable!()
        };

        // query context with system timezone UTC: the literal is taken as-is.
        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
        let ts_column = &expr.column_defs[1];
        let constraint = assert_ts_column(ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
        );

        // query context with timezone `+08:00`: the same literal shifts by
        // -8 hours when normalized to UTC.
        let ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("+08:00").unwrap())
            .build()
            .into();
        let expr = create_to_expr(&create_table, &ctx).unwrap();
        let ts_column = &expr.column_defs[1];
        let constraint = assert_ts_column(ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
        );
    }
1520
1521    fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
1522        assert_eq!("ts", ts_column.name);
1523        assert_eq!(
1524            ColumnDataType::TimestampMillisecond as i32,
1525            ts_column.data_type
1526        );
1527        assert!(!ts_column.default_constraint.is_empty());
1528
1529        ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
1530    }
1531
    // Covers `ALTER DATABASE SET/UNSET` conversion and `ALTER TABLE ADD
    // COLUMN` with a default timestamp, including timezone-aware defaults.
    #[test]
    fn test_to_alter_expr() {
        // ALTER DATABASE ... SET: each key/value pair becomes one option.
        let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterDatabase(alter_database) = stmt else {
            unreachable!()
        };

        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
            set_database_options,
        }) = kind
        else {
            unreachable!()
        };

        // Options preserve the statement order.
        assert_eq!(2, set_database_options.len());
        assert_eq!("key1", set_database_options[0].key);
        assert_eq!("value1", set_database_options[0].value);
        assert_eq!("key2", set_database_options[1].key);
        assert_eq!("value2", set_database_options[1].value);

        // ALTER DATABASE ... UNSET: keys are forwarded as-is.
        let sql = "ALTER DATABASE greptime UNSET key1, key2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterDatabase(alter_database) = stmt else {
            unreachable!()
        };

        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
            unreachable!()
        };

        assert_eq!(2, keys.len());
        assert!(keys.contains(&"key1".to_string()));
        assert!(keys.contains(&"key2".to_string()));

        // ALTER TABLE ... ADD COLUMN with a default timestamp literal.
        let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterTable(alter_table) = stmt else {
            unreachable!()
        };

        // query context with system timezone UTC: literal is taken as-is.
        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
            unreachable!()
        };

        assert_eq!(1, add_columns.len());
        let ts_column = add_columns[0].column_def.clone().unwrap();
        let constraint = assert_ts_column(&ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
        );

        // query context with timezone `+08:00`: the same default literal
        // shifts by -8 hours when normalized to UTC.
        let ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("+08:00").unwrap())
            .build()
            .into();
        let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
        let kind = expr.kind.unwrap();

        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
            unreachable!()
        };

        assert_eq!(1, add_columns.len());
        let ts_column = add_columns[0].column_def.clone().unwrap();
        let constraint = assert_ts_column(&ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
        );
    }
1631
    // `ALTER TABLE ... MODIFY COLUMN <col> <type>` maps to
    // `AlterTableKind::ModifyColumnTypes` with one entry per column.
    #[test]
    fn test_to_alter_modify_column_type_expr() {
        let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterTable(alter_table) = stmt else {
            unreachable!()
        };

        // query context with system timezone UTC.
        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
            modify_column_types,
        }) = kind
        else {
            unreachable!()
        };

        assert_eq!(1, modify_column_types.len());
        let modify_column_type = &modify_column_types[0];

        assert_eq!("mem_usage", modify_column_type.column_name);
        assert_eq!(
            ColumnDataType::String as i32,
            modify_column_type.target_type
        );
        // A plain STRING target carries no type extension metadata.
        assert!(modify_column_type.target_type_extension.is_none());
    }
1666
1667    #[test]
1668    fn test_to_repartition_request() {
1669        let sql = r#"
1670ALTER TABLE metrics REPARTITION (
1671  device_id < 100
1672) INTO (
1673  device_id < 100 AND area < 'South',
1674  device_id < 100 AND area >= 'South'
1675);"#;
1676        let stmt =
1677            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1678                .unwrap()
1679                .pop()
1680                .unwrap();
1681
1682        let Statement::AlterTable(alter_table) = stmt else {
1683            unreachable!()
1684        };
1685
1686        let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
1687        assert_eq!("greptime", request.catalog_name);
1688        assert_eq!("public", request.schema_name);
1689        assert_eq!("metrics", request.table_name);
1690        assert_eq!(
1691            request
1692                .from_exprs
1693                .into_iter()
1694                .map(|x| x.to_string())
1695                .collect::<Vec<_>>(),
1696            vec!["device_id < 100".to_string()]
1697        );
1698        assert_eq!(
1699            request
1700                .into_exprs
1701                .into_iter()
1702                .map(|x| x.to_string())
1703                .collect::<Vec<_>>(),
1704            vec![
1705                "device_id < 100 AND area < 'South'".to_string(),
1706                "device_id < 100 AND area >= 'South'".to_string()
1707            ]
1708        );
1709    }
1710
1711    fn new_test_table_names() -> Vec<TableName> {
1712        vec![
1713            TableName {
1714                catalog_name: "greptime".to_string(),
1715                schema_name: "public".to_string(),
1716                table_name: "a_table".to_string(),
1717            },
1718            TableName {
1719                catalog_name: "greptime".to_string(),
1720                schema_name: "public".to_string(),
1721                table_name: "b_table".to_string(),
1722            },
1723        ]
1724    }
1725
1726    #[test]
1727    fn test_to_create_view_expr() {
1728        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1729        let stmt =
1730            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1731                .unwrap()
1732                .pop()
1733                .unwrap();
1734
1735        let Statement::CreateView(stmt) = stmt else {
1736            unreachable!()
1737        };
1738
1739        let logical_plan = vec![1, 2, 3];
1740        let table_names = new_test_table_names();
1741        let columns = vec!["a".to_string()];
1742        let plan_columns = vec!["number".to_string()];
1743
1744        let expr = to_create_view_expr(
1745            stmt,
1746            logical_plan.clone(),
1747            table_names.clone(),
1748            columns.clone(),
1749            plan_columns.clone(),
1750            sql.to_string(),
1751            QueryContext::arc(),
1752        )
1753        .unwrap();
1754
1755        assert_eq!("greptime", expr.catalog_name);
1756        assert_eq!("public", expr.schema_name);
1757        assert_eq!("test", expr.view_name);
1758        assert!(!expr.create_if_not_exists);
1759        assert!(!expr.or_replace);
1760        assert_eq!(logical_plan, expr.logical_plan);
1761        assert_eq!(table_names, expr.table_names);
1762        assert_eq!(sql, expr.definition);
1763        assert_eq!(columns, expr.columns);
1764        assert_eq!(plan_columns, expr.plan_columns);
1765    }
1766
1767    #[test]
1768    fn test_to_create_view_expr_complex() {
1769        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
1770        let stmt =
1771            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1772                .unwrap()
1773                .pop()
1774                .unwrap();
1775
1776        let Statement::CreateView(stmt) = stmt else {
1777            unreachable!()
1778        };
1779
1780        let logical_plan = vec![1, 2, 3];
1781        let table_names = new_test_table_names();
1782        let columns = vec!["a".to_string()];
1783        let plan_columns = vec!["number".to_string()];
1784
1785        let expr = to_create_view_expr(
1786            stmt,
1787            logical_plan.clone(),
1788            table_names.clone(),
1789            columns.clone(),
1790            plan_columns.clone(),
1791            sql.to_string(),
1792            QueryContext::arc(),
1793        )
1794        .unwrap();
1795
1796        assert_eq!("greptime", expr.catalog_name);
1797        assert_eq!("test", expr.schema_name);
1798        assert_eq!("test_view", expr.view_name);
1799        assert!(expr.create_if_not_exists);
1800        assert!(expr.or_replace);
1801        assert_eq!(logical_plan, expr.logical_plan);
1802        assert_eq!(table_names, expr.table_names);
1803        assert_eq!(sql, expr.definition);
1804        assert_eq!(columns, expr.columns);
1805        assert_eq!(plan_columns, expr.plan_columns);
1806    }
1807
1808    #[test]
1809    fn test_expr_to_create() {
1810        let sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
1811  `timestamp` TIMESTAMP(9) NOT NULL,
1812  `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
1813  `username` STRING NULL,
1814  `http_method` STRING NULL INVERTED INDEX,
1815  `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
1816  `protocol` STRING NULL,
1817  `status_code` INT NULL INVERTED INDEX,
1818  `response_size` BIGINT NULL,
1819  `message` STRING NULL,
1820  TIME INDEX (`timestamp`),
1821  PRIMARY KEY (`username`, `status_code`)
1822)
1823ENGINE=mito
1824WITH(
1825  append_mode = 'true'
1826)"#;
1827        let stmt =
1828            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1829                .unwrap()
1830                .pop()
1831                .unwrap();
1832
1833        let Statement::CreateTable(original_create) = stmt else {
1834            unreachable!()
1835        };
1836
1837        // Convert CreateTable -> CreateTableExpr -> CreateTable
1838        let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();
1839
1840        let create_table = expr_to_create(&expr, Some('`')).unwrap();
1841        let new_sql = format!("{:#}", create_table);
1842        assert_eq!(sql, new_sql);
1843    }
1844}