Skip to main content

operator/
expr_helper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
24use api::v1::{
25    AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
26    ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
27    DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
28    ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetDefaults, SetFulltext,
29    SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30    SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31    UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions, set_index,
32    unset_index,
33};
34use common_error::ext::BoxedError;
35use common_grpc_expr::util::ColumnExpr;
36use common_time::Timezone;
37use datafusion::sql::planner::object_name_to_table_reference;
38use datatypes::schema::{
39    COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
40    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
41    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
42    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
43    ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
44    SkippingIndexType,
45};
46use file_engine::FileOptions;
47use query::sql::{
48    check_file_to_table_schema_compatibility, file_column_schemas_to_table,
49    infer_file_table_schema, prepare_file_table_files,
50};
51use session::context::QueryContextRef;
52use session::table_name::table_idents_to_full_name;
53use snafu::{OptionExt, ResultExt, ensure};
54use sql::ast::{
55    ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
56};
57use sql::dialect::GreptimeDbDialect;
58use sql::parser::ParserContext;
59use sql::statements::alter::{
60    AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
61};
62use sql::statements::create::{
63    Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
64    CreateView, TableConstraint,
65};
66use sql::statements::{
67    OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
68    sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
69};
70use sql::util::extract_tables_from_query;
71use store_api::mito_engine_options::{COMPACTION_OVERRIDE, COMPACTION_TYPE};
72use table::requests::{FILE_TABLE_META_KEY, TableOptions};
73use table::table_reference::TableReference;
74#[cfg(feature = "enterprise")]
75pub use trigger::to_create_trigger_task_expr;
76
77use crate::error::{
78    BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
79    ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
80    IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
81    InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
82    PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
83};
84
85pub fn create_table_expr_by_column_schemas(
86    table_name: &TableReference<'_>,
87    column_schemas: &[api::v1::ColumnSchema],
88    engine: &str,
89    desc: Option<&str>,
90) -> Result<CreateTableExpr> {
91    let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
92    let expr = common_grpc_expr::util::build_create_table_expr(
93        None,
94        table_name,
95        column_exprs,
96        engine,
97        desc.unwrap_or("Created on insertion"),
98    )
99    .context(BuildCreateExprOnInsertionSnafu)?;
100
101    validate_create_expr(&expr)?;
102    Ok(expr)
103}
104
105pub fn extract_add_columns_expr(
106    schema: &Schema,
107    column_exprs: Vec<ColumnExpr>,
108) -> Result<Option<AddColumns>> {
109    let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
110        .context(FindNewColumnsOnInsertionSnafu)?;
111    if let Some(add_columns) = &add_columns {
112        validate_add_columns_expr(add_columns)?;
113    }
114    Ok(add_columns)
115}
116
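// When the `CREATE EXTERNAL TABLE` statement is in expanded form, like
// ```sql
// CREATE EXTERNAL TABLE city (
//   host string,
//   ts timestamp,
//   cpu float64,
//   memory float64,
//   TIME INDEX (ts),
//   PRIMARY KEY(host)
// ) WITH (location='/var/data/city.csv', format='csv');
// ```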
123// The user needs to specify the TIME INDEX column. If there is no suitable
124// column in the file to use as TIME INDEX, an additional placeholder column
125// needs to be created as the TIME INDEX, and a `DEFAULT <value>` constraint
126// should be added.
127//
128//
129// When the `CREATE EXTERNAL TABLE` statement is in inferred form, like
130// ```sql
131// CREATE EXTERNAL TABLE IF NOT EXISTS city WITH (location='/var/data/city.csv',format='csv');
132// ```
133// 1. If the TIME INDEX column can be inferred from metadata, use that column
134//    as the TIME INDEX. Otherwise,
135// 2. If a column named `greptime_timestamp` exists (with the requirement that
136//    the column is with type TIMESTAMP, otherwise an error is thrown), use
137//    that column as the TIME INDEX. Otherwise,
138// 3. Automatically create the `greptime_timestamp` column and add a `DEFAULT 0`
139//    constraint.
140pub(crate) async fn create_external_expr(
141    create: CreateExternalTable,
142    query_ctx: &QueryContextRef,
143) -> Result<CreateTableExpr> {
144    let (catalog_name, schema_name, table_name) =
145        table_idents_to_full_name(&create.name, query_ctx)
146            .map_err(BoxedError::new)
147            .context(ExternalSnafu)?;
148
149    let mut table_options = create.options.into_map();
150
151    let (object_store, files) = prepare_file_table_files(&table_options)
152        .await
153        .context(PrepareFileTableSnafu)?;
154
155    let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
156        .await
157        .context(InferFileTableSchemaSnafu)?
158        .column_schemas()
159        .to_vec();
160
161    let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
162        // expanded form
163        let time_index = find_time_index(&create.constraints)?;
164        let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
165        let column_schemas =
166            columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
167        (time_index, primary_keys, column_schemas)
168    } else {
169        // inferred form
170        let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
171        let primary_keys = vec![];
172        (time_index, primary_keys, column_schemas)
173    };
174
175    check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
176        .context(SchemaIncompatibleSnafu)?;
177
178    let meta = FileOptions {
179        files,
180        file_column_schemas,
181    };
182    table_options.insert(
183        FILE_TABLE_META_KEY.to_string(),
184        serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
185    );
186
187    let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
188    let expr = CreateTableExpr {
189        catalog_name,
190        schema_name,
191        table_name,
192        desc: String::default(),
193        column_defs,
194        time_index,
195        primary_keys,
196        create_if_not_exists: create.if_not_exists,
197        table_options,
198        table_id: None,
199        engine: create.engine.clone(),
200    };
201
202    Ok(expr)
203}
204
205/// Convert `CreateTable` statement to [`CreateTableExpr`] gRPC request.
206pub fn create_to_expr(
207    create: &CreateTable,
208    query_ctx: &QueryContextRef,
209) -> Result<CreateTableExpr> {
210    let (catalog_name, schema_name, table_name) =
211        table_idents_to_full_name(&create.name, query_ctx)
212            .map_err(BoxedError::new)
213            .context(ExternalSnafu)?;
214
215    let time_index = find_time_index(&create.constraints)?;
216    let table_options = HashMap::from(
217        &TableOptions::try_from_iter(create.options.to_str_map())
218            .context(UnrecognizedTableOptionSnafu)?,
219    );
220
221    let mut table_options = table_options;
222    if table_options.contains_key(COMPACTION_TYPE) {
223        table_options.insert(COMPACTION_OVERRIDE.to_string(), "true".to_string());
224    }
225
226    let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
227
228    let expr = CreateTableExpr {
229        catalog_name,
230        schema_name,
231        table_name,
232        desc: String::default(),
233        column_defs: columns_to_expr(
234            &create.columns,
235            &time_index,
236            &primary_keys,
237            Some(&query_ctx.timezone()),
238        )?,
239        time_index,
240        primary_keys,
241        create_if_not_exists: create.if_not_exists,
242        table_options,
243        table_id: None,
244        engine: create.engine.clone(),
245    };
246
247    validate_create_expr(&expr)?;
248    Ok(expr)
249}
250
251/// Convert gRPC's [`CreateTableExpr`] back to `CreateTable` statement.
252/// You can use `create_table_expr_by_column_schemas` to create a `CreateTableExpr` from column schemas.
253///
254/// # Parameters
255///
256/// * `expr` - The `CreateTableExpr` to convert
257/// * `quote_style` - Optional quote style for identifiers (defaults to MySQL style ` backtick)
258pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
259    let quote_style = quote_style.unwrap_or('`');
260
261    // Convert table name
262    let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
263        sql::ast::Ident::with_quote(quote_style, &expr.table_name),
264    )]);
265
266    // Convert columns
267    let mut columns = Vec::with_capacity(expr.column_defs.len());
268    for column_def in &expr.column_defs {
269        let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
270            column: &column_def.name,
271        })?;
272
273        let mut options = Vec::new();
274
275        // Add NULL/NOT NULL constraint
276        if column_def.is_nullable {
277            options.push(ColumnOptionDef {
278                name: None,
279                option: ColumnOption::Null,
280            });
281        } else {
282            options.push(ColumnOptionDef {
283                name: None,
284                option: ColumnOption::NotNull,
285            });
286        }
287
288        // Add DEFAULT constraint if present
289        if let Some(default_constraint) = column_schema.default_constraint() {
290            let expr = match default_constraint {
291                ColumnDefaultConstraint::Value(v) => {
292                    Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
293                }
294                ColumnDefaultConstraint::Function(func_expr) => {
295                    ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
296                        .context(ParseSqlSnafu)?
297                }
298            };
299            options.push(ColumnOptionDef {
300                name: None,
301                option: ColumnOption::Default(expr),
302            });
303        }
304
305        // Add COMMENT if present
306        if !column_def.comment.is_empty() {
307            options.push(ColumnOptionDef {
308                name: None,
309                option: ColumnOption::Comment(column_def.comment.clone()),
310            });
311        }
312
313        // Note: We don't add inline PRIMARY KEY options here,
314        // we'll handle all primary keys as constraints instead for consistency
315
316        // Handle column extensions (fulltext, inverted index, skipping index)
317        let mut extensions = ColumnExtensions::default();
318
319        // Add fulltext index options if present
320        if let Ok(Some(opt)) = column_schema.fulltext_options()
321            && opt.enable
322        {
323            let mut map = HashMap::from([
324                (
325                    COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
326                    opt.analyzer.to_string(),
327                ),
328                (
329                    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
330                    opt.case_sensitive.to_string(),
331                ),
332                (
333                    COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
334                    opt.backend.to_string(),
335                ),
336            ]);
337            if opt.backend == FulltextBackend::Bloom {
338                map.insert(
339                    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
340                    opt.granularity.to_string(),
341                );
342                map.insert(
343                    COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
344                    opt.false_positive_rate().to_string(),
345                );
346            }
347            extensions.fulltext_index_options = Some(map.into());
348        }
349
350        // Add skipping index options if present
351        if let Ok(Some(opt)) = column_schema.skipping_index_options() {
352            let map = HashMap::from([
353                (
354                    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
355                    opt.granularity.to_string(),
356                ),
357                (
358                    COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
359                    opt.false_positive_rate().to_string(),
360                ),
361                (
362                    COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
363                    opt.index_type.to_string(),
364                ),
365            ]);
366            extensions.skipping_index_options = Some(map.into());
367        }
368
369        // Add inverted index options if present
370        if column_schema.is_inverted_indexed() {
371            extensions.inverted_index_options = Some(HashMap::new().into());
372        }
373
374        let sql_column = SqlColumn {
375            column_def: ColumnDef {
376                name: Ident::with_quote(quote_style, &column_def.name),
377                data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
378                    .context(ParseSqlSnafu)?,
379                options,
380            },
381            extensions,
382        };
383
384        columns.push(sql_column);
385    }
386
387    // Convert constraints
388    let mut constraints = Vec::new();
389
390    // Add TIME INDEX constraint
391    constraints.push(TableConstraint::TimeIndex {
392        column: Ident::with_quote(quote_style, &expr.time_index),
393    });
394
395    // Add PRIMARY KEY constraint (always add as constraint for consistency)
396    if !expr.primary_keys.is_empty() {
397        let primary_key_columns: Vec<Ident> = expr
398            .primary_keys
399            .iter()
400            .map(|pk| Ident::with_quote(quote_style, pk))
401            .collect();
402
403        constraints.push(TableConstraint::PrimaryKey {
404            columns: primary_key_columns,
405        });
406    }
407
408    // Convert table options
409    let mut options = OptionMap::default();
410    for (key, value) in &expr.table_options {
411        options.insert(key.clone(), value.clone());
412    }
413
414    Ok(CreateTable {
415        if_not_exists: expr.create_if_not_exists,
416        table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
417        name: table_name,
418        columns,
419        engine: expr.engine.clone(),
420        constraints,
421        options,
422        partitions: None,
423    })
424}
425
426/// Validate the [`CreateTableExpr`] request.
427pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
428    // construct column list
429    let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
430    for (idx, column) in create.column_defs.iter().enumerate() {
431        if let Some(indices) = column_to_indices.get(&column.name) {
432            return InvalidSqlSnafu {
433                err_msg: format!(
434                    "column name `{}` is duplicated at index {} and {}",
435                    column.name, indices, idx
436                ),
437            }
438            .fail();
439        }
440        column_to_indices.insert(&column.name, idx);
441    }
442
443    // verify time_index exists
444    let _ = column_to_indices
445        .get(&create.time_index)
446        .with_context(|| InvalidSqlSnafu {
447            err_msg: format!(
448                "column name `{}` is not found in column list",
449                create.time_index
450            ),
451        })?;
452
453    // verify primary_key exists
454    for pk in &create.primary_keys {
455        let _ = column_to_indices
456            .get(&pk)
457            .with_context(|| InvalidSqlSnafu {
458                err_msg: format!("column name `{}` is not found in column list", pk),
459            })?;
460    }
461
462    // construct primary_key set
463    let mut pk_set = HashSet::new();
464    for pk in &create.primary_keys {
465        if !pk_set.insert(pk) {
466            return InvalidSqlSnafu {
467                err_msg: format!("column name `{}` is duplicated in primary keys", pk),
468            }
469            .fail();
470        }
471    }
472
473    // verify time index is not primary key
474    if pk_set.contains(&create.time_index) {
475        return InvalidSqlSnafu {
476            err_msg: format!(
477                "column name `{}` is both primary key and time index",
478                create.time_index
479            ),
480        }
481        .fail();
482    }
483
484    for column in &create.column_defs {
485        // verify do not contain interval type column issue #3235
486        if is_interval_type(&column.data_type()) {
487            return InvalidSqlSnafu {
488                err_msg: format!(
489                    "column name `{}` is interval type, which is not supported",
490                    column.name
491                ),
492            }
493            .fail();
494        }
495        // verify do not contain datetime type column issue #5489
496        if is_date_time_type(&column.data_type()) {
497            return InvalidSqlSnafu {
498                err_msg: format!(
499                    "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
500                    column.name
501                ),
502            }
503            .fail();
504        }
505    }
506    Ok(())
507}
508
509fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
510    for add_column in &add_columns.add_columns {
511        let Some(column_def) = &add_column.column_def else {
512            continue;
513        };
514        if is_date_time_type(&column_def.data_type()) {
515            return InvalidSqlSnafu {
516                    err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
517                }
518                .fail();
519        }
520        if is_interval_type(&column_def.data_type()) {
521            return InvalidSqlSnafu {
522                err_msg: format!(
523                    "column name `{}` is interval type, which is not supported",
524                    column_def.name
525                ),
526            }
527            .fail();
528        }
529    }
530    Ok(())
531}
532
533fn is_date_time_type(data_type: &ColumnDataType) -> bool {
534    matches!(data_type, ColumnDataType::Datetime)
535}
536
537fn is_interval_type(data_type: &ColumnDataType) -> bool {
538    matches!(
539        data_type,
540        ColumnDataType::IntervalYearMonth
541            | ColumnDataType::IntervalDayTime
542            | ColumnDataType::IntervalMonthDayNano
543    )
544}
545
546fn find_primary_keys(
547    columns: &[SqlColumn],
548    constraints: &[TableConstraint],
549) -> Result<Vec<String>> {
550    let columns_pk = columns
551        .iter()
552        .filter_map(|x| {
553            if x.options()
554                .iter()
555                .any(|o| matches!(o.option, ColumnOption::PrimaryKey(_)))
556            {
557                Some(x.name().value.clone())
558            } else {
559                None
560            }
561        })
562        .collect::<Vec<String>>();
563
564    ensure!(
565        columns_pk.len() <= 1,
566        IllegalPrimaryKeysDefSnafu {
567            msg: "not allowed to inline multiple primary keys in columns options"
568        }
569    );
570
571    let constraints_pk = constraints
572        .iter()
573        .filter_map(|constraint| match constraint {
574            TableConstraint::PrimaryKey { columns, .. } => {
575                Some(columns.iter().map(|ident| ident.value.clone()))
576            }
577            _ => None,
578        })
579        .flatten()
580        .collect::<Vec<String>>();
581
582    ensure!(
583        columns_pk.is_empty() || constraints_pk.is_empty(),
584        IllegalPrimaryKeysDefSnafu {
585            msg: "found definitions of primary keys in multiple places"
586        }
587    );
588
589    let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
590    primary_keys.extend(columns_pk);
591    primary_keys.extend(constraints_pk);
592    Ok(primary_keys)
593}
594
595pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
596    let time_index = constraints
597        .iter()
598        .filter_map(|constraint| match constraint {
599            TableConstraint::TimeIndex { column, .. } => Some(&column.value),
600            _ => None,
601        })
602        .collect::<Vec<&String>>();
603    ensure!(
604        time_index.len() == 1,
605        InvalidSqlSnafu {
606            err_msg: "must have one and only one TimeIndex columns",
607        }
608    );
609    Ok(time_index[0].clone())
610}
611
612fn columns_to_expr(
613    column_defs: &[SqlColumn],
614    time_index: &str,
615    primary_keys: &[String],
616    timezone: Option<&Timezone>,
617) -> Result<Vec<api::v1::ColumnDef>> {
618    let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
619    column_schemas_to_defs(column_schemas, primary_keys)
620}
621
622fn columns_to_column_schemas(
623    columns: &[SqlColumn],
624    time_index: &str,
625    timezone: Option<&Timezone>,
626) -> Result<Vec<ColumnSchema>> {
627    columns
628        .iter()
629        .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
630        .collect::<Result<Vec<ColumnSchema>>>()
631}
632
633// TODO(weny): refactor this function to use `try_as_column_def`
634pub fn column_schemas_to_defs(
635    column_schemas: Vec<ColumnSchema>,
636    primary_keys: &[String],
637) -> Result<Vec<api::v1::ColumnDef>> {
638    let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
639        .iter()
640        .map(|c| {
641            ColumnDataTypeWrapper::try_from(c.data_type.clone())
642                .map(|w| w.to_parts())
643                .context(ColumnDataTypeSnafu)
644        })
645        .collect::<Result<Vec<_>>>()?;
646
647    column_schemas
648        .iter()
649        .zip(column_datatypes)
650        .map(|(schema, datatype)| {
651            let semantic_type = if schema.is_time_index() {
652                SemanticType::Timestamp
653            } else if primary_keys.contains(&schema.name) {
654                SemanticType::Tag
655            } else {
656                SemanticType::Field
657            } as i32;
658            let comment = schema
659                .metadata()
660                .get(COMMENT_KEY)
661                .cloned()
662                .unwrap_or_default();
663
664            Ok(api::v1::ColumnDef {
665                name: schema.name.clone(),
666                data_type: datatype.0 as i32,
667                is_nullable: schema.is_nullable(),
668                default_constraint: match schema.default_constraint() {
669                    None => vec![],
670                    Some(v) => {
671                        v.clone()
672                            .try_into()
673                            .context(ConvertColumnDefaultConstraintSnafu {
674                                column_name: &schema.name,
675                            })?
676                    }
677                },
678                semantic_type,
679                comment,
680                datatype_extension: datatype.1,
681                options: options_from_column_schema(schema),
682            })
683        })
684        .collect()
685}
686
687#[derive(Debug, Clone, PartialEq, Eq)]
688pub struct RepartitionRequest {
689    pub catalog_name: String,
690    pub schema_name: String,
691    pub table_name: String,
692    pub source: RepartitionSource,
693    pub into_exprs: Vec<Expr>,
694    pub options: OptionMap,
695}
696
697#[derive(Debug, Clone, PartialEq, Eq)]
698pub enum RepartitionSource {
699    Partitions {
700        from_exprs: Vec<Expr>,
701        target_partition_columns: Option<Vec<String>>,
702    },
703    Unpartitioned {
704        partition_columns: Vec<String>,
705    },
706}
707
708pub(crate) fn to_repartition_request(
709    alter_table: AlterTable,
710    query_ctx: &QueryContextRef,
711) -> Result<RepartitionRequest> {
712    let AlterTable {
713        table_name,
714        alter_operation,
715        options,
716    } = alter_table;
717
718    let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&table_name, query_ctx)
719        .map_err(BoxedError::new)
720        .context(ExternalSnafu)?;
721
722    let (source, into_exprs) = match alter_operation {
723        AlterTableOperation::Repartition { operation } => (
724            RepartitionSource::Partitions {
725                from_exprs: operation.from_exprs,
726                target_partition_columns: operation.partition_columns.map(|columns| {
727                    columns
728                        .into_iter()
729                        .map(|ident| ident.value)
730                        .collect::<Vec<_>>()
731                }),
732            },
733            operation.into_exprs,
734        ),
735        AlterTableOperation::Partition { partitions } => (
736            RepartitionSource::Unpartitioned {
737                partition_columns: partitions
738                    .column_list
739                    .into_iter()
740                    .map(|ident| ident.value)
741                    .collect(),
742            },
743            partitions.exprs,
744        ),
745        _ => {
746            return InvalidSqlSnafu {
747                err_msg: "expected REPARTITION or PARTITION operation",
748            }
749            .fail();
750        }
751    };
752
753    Ok(RepartitionRequest {
754        catalog_name,
755        schema_name,
756        table_name,
757        source,
758        into_exprs,
759        options,
760    })
761}
762
763/// Converts a SQL alter table statement into a gRPC alter table expression.
764pub(crate) fn to_alter_table_expr(
765    alter_table: AlterTable,
766    query_ctx: &QueryContextRef,
767) -> Result<AlterTableExpr> {
768    let (catalog_name, schema_name, table_name) =
769        table_idents_to_full_name(alter_table.table_name(), query_ctx)
770            .map_err(BoxedError::new)
771            .context(ExternalSnafu)?;
772
773    let kind = match alter_table.alter_operation {
774        AlterTableOperation::AddConstraint(_) => {
775            return NotSupportedSnafu {
776                feat: "ADD CONSTRAINT",
777            }
778            .fail();
779        }
780        AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
781            add_columns: add_columns
782                .into_iter()
783                .map(|add_column| {
784                    let column_def = sql_column_def_to_grpc_column_def(
785                        &add_column.column_def,
786                        Some(&query_ctx.timezone()),
787                    )
788                    .map_err(BoxedError::new)
789                    .context(ExternalSnafu)?;
790                    if is_interval_type(&column_def.data_type()) {
791                        return NotSupportedSnafu {
792                            feat: "Add column with interval type",
793                        }
794                        .fail();
795                    }
796                    Ok(AddColumn {
797                        column_def: Some(column_def),
798                        location: add_column.location.as_ref().map(From::from),
799                        add_if_not_exists: add_column.add_if_not_exists,
800                    })
801                })
802                .collect::<Result<Vec<AddColumn>>>()?,
803        }),
804        AlterTableOperation::ModifyColumnType {
805            column_name,
806            target_type,
807        } => {
808            let target_type =
809                sql_data_type_to_concrete_data_type(&target_type, &Default::default())
810                    .context(ParseSqlSnafu)?;
811            let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
812                .map(|w| w.to_parts())
813                .context(ColumnDataTypeSnafu)?;
814            if is_interval_type(&target_type) {
815                return NotSupportedSnafu {
816                    feat: "Modify column type to interval type",
817                }
818                .fail();
819            }
820            AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
821                modify_column_types: vec![ModifyColumnType {
822                    column_name: column_name.value,
823                    target_type: target_type as i32,
824                    target_type_extension,
825                }],
826            })
827        }
828        AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
829            drop_columns: vec![DropColumn {
830                name: name.value.clone(),
831            }],
832        }),
833        AlterTableOperation::RenameTable { new_table_name } => {
834            AlterTableKind::RenameTable(RenameTable {
835                new_table_name: new_table_name.clone(),
836            })
837        }
838        AlterTableOperation::SetTableOptions { options } => {
839            AlterTableKind::SetTableOptions(SetTableOptions {
840                table_options: options.into_iter().map(Into::into).collect(),
841            })
842        }
843        AlterTableOperation::UnsetTableOptions { keys } => {
844            AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
845        }
846        AlterTableOperation::Repartition { .. } => {
847            return NotSupportedSnafu {
848                feat: "ALTER TABLE ... REPARTITION",
849            }
850            .fail();
851        }
852        AlterTableOperation::Partition { .. } => {
853            return NotSupportedSnafu {
854                feat: "ALTER TABLE ... PARTITION ON COLUMNS",
855            }
856            .fail();
857        }
858        AlterTableOperation::SetIndex { options } => {
859            let option = match options {
860                sql::statements::alter::SetIndexOperation::Fulltext {
861                    column_name,
862                    options,
863                } => SetIndex {
864                    options: Some(set_index::Options::Fulltext(SetFulltext {
865                        column_name: column_name.value,
866                        enable: options.enable,
867                        analyzer: match options.analyzer {
868                            FulltextAnalyzer::English => Analyzer::English.into(),
869                            FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
870                        },
871                        case_sensitive: options.case_sensitive,
872                        backend: match options.backend {
873                            FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
874                            FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
875                        },
876                        granularity: options.granularity as u64,
877                        false_positive_rate: options.false_positive_rate(),
878                    })),
879                },
880                sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
881                    options: Some(set_index::Options::Inverted(SetInverted {
882                        column_name: column_name.value,
883                    })),
884                },
885                sql::statements::alter::SetIndexOperation::Skipping {
886                    column_name,
887                    options,
888                } => SetIndex {
889                    options: Some(set_index::Options::Skipping(SetSkipping {
890                        column_name: column_name.value,
891                        enable: true,
892                        granularity: options.granularity as u64,
893                        false_positive_rate: options.false_positive_rate(),
894                        skipping_index_type: match options.index_type {
895                            SkippingIndexType::BloomFilter => {
896                                PbSkippingIndexType::BloomFilter.into()
897                            }
898                        },
899                    })),
900                },
901            };
902            AlterTableKind::SetIndexes(SetIndexes {
903                set_indexes: vec![option],
904            })
905        }
906        AlterTableOperation::UnsetIndex { options } => {
907            let option = match options {
908                sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
909                    UnsetIndex {
910                        options: Some(unset_index::Options::Fulltext(UnsetFulltext {
911                            column_name: column_name.value,
912                        })),
913                    }
914                }
915                sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
916                    UnsetIndex {
917                        options: Some(unset_index::Options::Inverted(UnsetInverted {
918                            column_name: column_name.value,
919                        })),
920                    }
921                }
922                sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
923                    UnsetIndex {
924                        options: Some(unset_index::Options::Skipping(UnsetSkipping {
925                            column_name: column_name.value,
926                        })),
927                    }
928                }
929            };
930
931            AlterTableKind::UnsetIndexes(UnsetIndexes {
932                unset_indexes: vec![option],
933            })
934        }
935        AlterTableOperation::DropDefaults { columns } => {
936            AlterTableKind::DropDefaults(DropDefaults {
937                drop_defaults: columns
938                    .into_iter()
939                    .map(|col| {
940                        let column_name = col.0.to_string();
941                        Ok(api::v1::DropDefault { column_name })
942                    })
943                    .collect::<Result<Vec<_>>>()?,
944            })
945        }
946        AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
947            set_defaults: defaults
948                .into_iter()
949                .map(|col| {
950                    let column_name = col.column_name.to_string();
951                    let default_constraint = serde_json::to_string(&col.default_constraint)
952                        .context(EncodeJsonSnafu)?
953                        .into_bytes();
954                    Ok(api::v1::SetDefault {
955                        column_name,
956                        default_constraint,
957                    })
958                })
959                .collect::<Result<Vec<_>>>()?,
960        }),
961    };
962
963    Ok(AlterTableExpr {
964        catalog_name,
965        schema_name,
966        table_name,
967        kind: Some(kind),
968    })
969}
970
971/// Try to cast the `[AlterDatabase]` statement into gRPC `[AlterDatabaseExpr]`.
972pub fn to_alter_database_expr(
973    alter_database: AlterDatabase,
974    query_ctx: &QueryContextRef,
975) -> Result<AlterDatabaseExpr> {
976    let catalog = query_ctx.current_catalog();
977    let schema = alter_database.database_name;
978
979    let kind = match alter_database.alter_operation {
980        AlterDatabaseOperation::SetDatabaseOption { options } => {
981            let options = options.into_iter().map(Into::into).collect();
982            AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
983                set_database_options: options,
984            })
985        }
986        AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
987            AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
988        }
989    };
990
991    Ok(AlterDatabaseExpr {
992        catalog_name: catalog.to_string(),
993        schema_name: schema.to_string(),
994        kind: Some(kind),
995    })
996}
997
998/// Try to cast the `[CreateViewExpr]` statement into gRPC `[CreateViewExpr]`.
999pub fn to_create_view_expr(
1000    stmt: CreateView,
1001    logical_plan: Vec<u8>,
1002    table_names: Vec<TableName>,
1003    columns: Vec<String>,
1004    plan_columns: Vec<String>,
1005    definition: String,
1006    query_ctx: QueryContextRef,
1007) -> Result<CreateViewExpr> {
1008    let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
1009        .map_err(BoxedError::new)
1010        .context(ExternalSnafu)?;
1011
1012    let expr = CreateViewExpr {
1013        catalog_name,
1014        schema_name,
1015        view_name,
1016        logical_plan,
1017        create_if_not_exists: stmt.if_not_exists,
1018        or_replace: stmt.or_replace,
1019        table_names,
1020        columns,
1021        plan_columns,
1022        definition,
1023    };
1024
1025    Ok(expr)
1026}
1027
1028pub fn to_create_flow_task_expr(
1029    create_flow: CreateFlow,
1030    query_ctx: &QueryContextRef,
1031) -> Result<CreateFlowExpr> {
1032    // retrieve sink table name
1033    let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone(), true)
1034        .with_context(|_| ConvertIdentifierSnafu {
1035            ident: create_flow.sink_table_name.to_string(),
1036        })?;
1037    let catalog = sink_table_ref
1038        .catalog()
1039        .unwrap_or(query_ctx.current_catalog())
1040        .to_string();
1041    let schema = sink_table_ref
1042        .schema()
1043        .map(|s| s.to_owned())
1044        .unwrap_or(query_ctx.current_schema());
1045
1046    let sink_table_name = TableName {
1047        catalog_name: catalog,
1048        schema_name: schema,
1049        table_name: sink_table_ref.table().to_string(),
1050    };
1051
1052    let source_table_names = extract_tables_from_query(&create_flow.query)
1053        .map(|name| {
1054            let reference =
1055                object_name_to_table_reference(name.clone(), true).with_context(|_| {
1056                    ConvertIdentifierSnafu {
1057                        ident: name.to_string(),
1058                    }
1059                })?;
1060            let catalog = reference
1061                .catalog()
1062                .unwrap_or(query_ctx.current_catalog())
1063                .to_string();
1064            let schema = reference
1065                .schema()
1066                .map(|s| s.to_string())
1067                .unwrap_or(query_ctx.current_schema());
1068
1069            let table_name = TableName {
1070                catalog_name: catalog,
1071                schema_name: schema,
1072                table_name: reference.table().to_string(),
1073            };
1074            Ok(table_name)
1075        })
1076        .collect::<Result<Vec<_>>>()?;
1077
1078    let eval_interval = create_flow.eval_interval;
1079
1080    Ok(CreateFlowExpr {
1081        catalog_name: query_ctx.current_catalog().to_string(),
1082        flow_name: sanitize_flow_name(create_flow.flow_name)?,
1083        source_table_names,
1084        sink_table_name: Some(sink_table_name),
1085        or_replace: create_flow.or_replace,
1086        create_if_not_exists: create_flow.if_not_exists,
1087        expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
1088        eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
1089        comment: create_flow.comment.unwrap_or_default(),
1090        sql: create_flow.query.to_string(),
1091        flow_options: stringify_flow_options(create_flow.flow_options)?,
1092    })
1093}
1094
1095fn stringify_flow_options(flow_options: OptionMap) -> Result<HashMap<String, String>> {
1096    let options_len = flow_options.len();
1097    let flow_options = flow_options.into_map();
1098    ensure!(
1099        flow_options.len() == options_len,
1100        InvalidSqlSnafu {
1101            err_msg: "flow options only support scalar string-compatible values".to_string(),
1102        }
1103    );
1104    Ok(flow_options)
1105}
1106
1107/// sanitize the flow name, remove possible quotes
1108fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
1109    ensure!(
1110        flow_name.0.len() == 1,
1111        InvalidFlowNameSnafu {
1112            name: flow_name.to_string(),
1113        }
1114    );
1115    // safety: we've checked flow_name.0 has exactly one element.
1116    Ok(flow_name.0.swap_remove(0).to_string_unquoted())
1117}
1118
1119#[cfg(test)]
1120mod tests {
1121    use std::collections::HashMap;
1122
1123    use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
1124    use datatypes::value::Value;
1125    use session::context::{QueryContext, QueryContextBuilder};
1126    use sql::dialect::GreptimeDbDialect;
1127    use sql::parser::{ParseOptions, ParserContext};
1128    use sql::statements::statement::Statement;
1129    use store_api::storage::ColumnDefaultConstraint;
1130
1131    use super::*;
1132
1133    #[test]
1134    fn test_create_flow_tql_expr() {
1135        let sql = r#"
1136CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1137TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
1138        let stmt =
1139            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1140
1141        assert!(
1142            stmt.is_err(),
1143            "Expected error for invalid TQL EVAL parameters: {:#?}",
1144            stmt
1145        );
1146
1147        let sql = r#"
1148CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1149TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#;
1150        let stmt =
1151            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1152                .unwrap()
1153                .pop()
1154                .unwrap();
1155
1156        let Statement::CreateFlow(create_flow) = stmt else {
1157            unreachable!()
1158        };
1159        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1160
1161        let to_dot_sep =
1162            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1163        assert_eq!("calc_reqs", expr.flow_name);
1164        assert_eq!("greptime", expr.catalog_name);
1165        assert_eq!(
1166            "greptime.public.cnt_reqs",
1167            expr.sink_table_name.map(to_dot_sep).unwrap()
1168        );
1169        assert_eq!(1, expr.source_table_names.len());
1170        assert_eq!(
1171            "greptime.public.http_requests",
1172            to_dot_sep(expr.source_table_names[0].clone())
1173        );
1174        assert_eq!(
1175            r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#,
1176            expr.sql
1177        );
1178
1179        let sql = r#"
1180CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1181TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__schema__="greptime_private"});"#;
1182        let stmt =
1183            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1184                .unwrap()
1185                .pop()
1186                .unwrap();
1187        let Statement::CreateFlow(create_flow) = stmt else {
1188            unreachable!()
1189        };
1190        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1191        assert_eq!(1, expr.source_table_names.len());
1192        assert_eq!(
1193            "greptime.greptime_private.http_requests",
1194            to_dot_sep(expr.source_table_names[0].clone())
1195        );
1196
1197        let sql = r#"
1198CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1199TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__database__="greptime_private"});"#;
1200        let stmt =
1201            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1202                .unwrap()
1203                .pop()
1204                .unwrap();
1205        let Statement::CreateFlow(create_flow) = stmt else {
1206            unreachable!()
1207        };
1208        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1209        assert_eq!(1, expr.source_table_names.len());
1210        assert_eq!(
1211            "greptime.greptime_private.http_requests",
1212            to_dot_sep(expr.source_table_names[0].clone())
1213        );
1214    }
1215
1216    #[test]
1217    fn test_create_flow_tql_cte_source_tables() {
1218        let sql = r#"
1219CREATE FLOW calc_cte
1220SINK TO metric_cte_sink
1221EVAL INTERVAL '1m'
1222AS
1223WITH tql(ts, the_value) AS (
1224  TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte
1225)
1226SELECT * FROM tql;
1227"#;
1228
1229        let stmt =
1230            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1231                .unwrap()
1232                .pop()
1233                .unwrap();
1234
1235        let Statement::CreateFlow(create_flow) = stmt else {
1236            unreachable!()
1237        };
1238        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1239
1240        let to_dot_sep =
1241            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1242        assert_eq!(1, expr.source_table_names.len());
1243        assert_eq!(
1244            "greptime.public.metric_cte",
1245            to_dot_sep(expr.source_table_names[0].clone())
1246        );
1247    }
1248
1249    #[test]
1250    fn test_create_flow_tql_cte_source_tables_quoted_cte_name() {
1251        let sql = r#"
1252CREATE FLOW calc_cte
1253SINK TO metric_cte_sink
1254EVAL INTERVAL '1m'
1255AS
1256WITH "TQL"(ts, the_value) AS (
1257  TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte
1258)
1259SELECT * FROM "TQL";
1260"#;
1261
1262        let stmt =
1263            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1264                .unwrap()
1265                .pop()
1266                .unwrap();
1267
1268        let Statement::CreateFlow(create_flow) = stmt else {
1269            unreachable!()
1270        };
1271        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1272
1273        let to_dot_sep =
1274            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1275        assert_eq!(1, expr.source_table_names.len());
1276        assert_eq!(
1277            "greptime.public.metric_cte",
1278            to_dot_sep(expr.source_table_names[0].clone())
1279        );
1280    }
1281
1282    #[test]
1283    fn test_create_flow_tql_cte_source_tables_same_name() {
1284        let sql = r#"
1285CREATE FLOW calc_cte
1286SINK TO metric_cte_sink
1287EVAL INTERVAL '1m'
1288AS
1289WITH tql(ts, the_value) AS (
1290  TQL EVAL (now() - '1m'::interval, now(), '5s') tql
1291)
1292SELECT * FROM tql;
1293"#;
1294
1295        let stmt =
1296            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1297                .unwrap()
1298                .pop()
1299                .unwrap();
1300
1301        let Statement::CreateFlow(create_flow) = stmt else {
1302            unreachable!()
1303        };
1304        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1305
1306        let to_dot_sep =
1307            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1308        assert_eq!(1, expr.source_table_names.len());
1309        assert_eq!(
1310            "greptime.public.tql",
1311            to_dot_sep(expr.source_table_names[0].clone())
1312        );
1313    }
1314
1315    #[test]
1316    fn test_create_flow_expr() {
1317        let sql = r"
1318CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
1319SELECT
1320    DISTINCT number as dis
1321FROM
1322    distinct_basic;";
1323        let stmt =
1324            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1325                .unwrap()
1326                .pop()
1327                .unwrap();
1328
1329        let Statement::CreateFlow(create_flow) = stmt else {
1330            unreachable!()
1331        };
1332        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1333
1334        let to_dot_sep =
1335            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1336        assert_eq!("test_distinct_basic", expr.flow_name);
1337        assert_eq!("greptime", expr.catalog_name);
1338        assert_eq!(
1339            "greptime.public.out_distinct_basic",
1340            expr.sink_table_name.map(to_dot_sep).unwrap()
1341        );
1342        assert_eq!(1, expr.source_table_names.len());
1343        assert_eq!(
1344            "greptime.public.distinct_basic",
1345            to_dot_sep(expr.source_table_names[0].clone())
1346        );
1347        assert_eq!(
1348            r"SELECT
1349    DISTINCT number as dis
1350FROM
1351    distinct_basic",
1352            expr.sql
1353        );
1354
1355        let sql = r"
1356CREATE FLOW `task_2`
1357SINK TO schema_1.table_1
1358AS
1359SELECT max(c1), min(c2) FROM schema_2.table_2;";
1360        let stmt =
1361            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1362                .unwrap()
1363                .pop()
1364                .unwrap();
1365
1366        let Statement::CreateFlow(create_flow) = stmt else {
1367            unreachable!()
1368        };
1369        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1370
1371        let to_dot_sep =
1372            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1373        assert_eq!("task_2", expr.flow_name);
1374        assert_eq!("greptime", expr.catalog_name);
1375        assert_eq!(
1376            "greptime.schema_1.table_1",
1377            expr.sink_table_name.map(to_dot_sep).unwrap()
1378        );
1379        assert_eq!(1, expr.source_table_names.len());
1380        assert_eq!(
1381            "greptime.schema_2.table_2",
1382            to_dot_sep(expr.source_table_names[0].clone())
1383        );
1384        assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
1385        assert!(expr.flow_options.is_empty());
1386
1387        let sql = r"
1388CREATE FLOW task_3
1389SINK TO schema_1.table_1
1390WITH (defer_on_missing_source = 'true', foo = 'bar')
1391AS
1392SELECT max(c1), min(c2) FROM schema_2.table_2;";
1393        let stmt =
1394            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1395                .unwrap()
1396                .pop()
1397                .unwrap();
1398
1399        let Statement::CreateFlow(create_flow) = stmt else {
1400            unreachable!()
1401        };
1402        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1403        assert_eq!(
1404            expr.flow_options,
1405            HashMap::from([
1406                ("defer_on_missing_source".to_string(), "true".to_string()),
1407                ("foo".to_string(), "bar".to_string()),
1408            ])
1409        );
1410
1411        let sql = r"
1412CREATE FLOW task_4
1413SINK TO schema_1.table_1
1414WITH (defer_on_missing_source = true)
1415AS
1416SELECT max(c1), min(c2) FROM schema_2.table_2;";
1417        let stmt =
1418            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1419                .unwrap()
1420                .pop()
1421                .unwrap();
1422
1423        let Statement::CreateFlow(create_flow) = stmt else {
1424            unreachable!()
1425        };
1426        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1427        assert_eq!(
1428            expr.flow_options,
1429            HashMap::from([("defer_on_missing_source".to_string(), "true".to_string(),)])
1430        );
1431
1432        let sql = r"
1433CREATE FLOW task_5
1434SINK TO schema_1.table_1
1435WITH (defer_on_missing_source = [true])
1436AS
1437SELECT max(c1), min(c2) FROM schema_2.table_2;";
1438        let stmt =
1439            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1440                .unwrap()
1441                .pop()
1442                .unwrap();
1443
1444        let Statement::CreateFlow(create_flow) = stmt else {
1445            unreachable!()
1446        };
1447        let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
1448        assert!(res.is_err());
1449        assert!(
1450            res.unwrap_err()
1451                .to_string()
1452                .contains("flow options only support scalar string-compatible values")
1453        );
1454
1455        let sql = r"
1456CREATE FLOW abc.`task_2`
1457SINK TO schema_1.table_1
1458AS
1459SELECT max(c1), min(c2) FROM schema_2.table_2;";
1460        let stmt =
1461            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1462                .unwrap()
1463                .pop()
1464                .unwrap();
1465
1466        let Statement::CreateFlow(create_flow) = stmt else {
1467            unreachable!()
1468        };
1469        let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
1470
1471        assert!(res.is_err());
1472        assert!(
1473            res.unwrap_err()
1474                .to_string()
1475                .contains("Invalid flow name: abc.`task_2`")
1476        );
1477    }
1478
1479    #[test]
1480    fn test_create_to_expr() {
1481        let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
1482        let stmt =
1483            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1484                .unwrap()
1485                .pop()
1486                .unwrap();
1487
1488        let Statement::CreateTable(create_table) = stmt else {
1489            unreachable!()
1490        };
1491        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1492        assert_eq!("3days", expr.table_options.get("ttl").unwrap());
1493        assert_eq!(
1494            "1.0MiB",
1495            expr.table_options.get("write_buffer_size").unwrap()
1496        );
1497    }
1498
1499    #[test]
1500    fn test_invalid_create_to_expr() {
1501        let cases = [
1502            // duplicate column declaration
1503            "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
1504            // duplicate primary key
1505            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
1506            // time index is primary key
1507            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));",
1508        ];
1509
1510        for sql in cases {
1511            let stmt = ParserContext::create_with_dialect(
1512                sql,
1513                &GreptimeDbDialect {},
1514                ParseOptions::default(),
1515            )
1516            .unwrap()
1517            .pop()
1518            .unwrap();
1519            let Statement::CreateTable(create_table) = stmt else {
1520                unreachable!()
1521            };
1522            create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
1523        }
1524    }
1525
1526    #[test]
1527    fn test_create_to_expr_with_default_timestamp_value() {
1528        let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
1529        let stmt =
1530            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1531                .unwrap()
1532                .pop()
1533                .unwrap();
1534
1535        let Statement::CreateTable(create_table) = stmt else {
1536            unreachable!()
1537        };
1538
1539        // query context with system timezone UTC.
1540        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1541        let ts_column = &expr.column_defs[1];
1542        let constraint = assert_ts_column(ts_column);
1543        assert!(
1544            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1545                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1546        );
1547
1548        // query context with timezone `+08:00`
1549        let ctx = QueryContextBuilder::default()
1550            .timezone(Timezone::from_tz_string("+08:00").unwrap())
1551            .build()
1552            .into();
1553        let expr = create_to_expr(&create_table, &ctx).unwrap();
1554        let ts_column = &expr.column_defs[1];
1555        let constraint = assert_ts_column(ts_column);
1556        assert!(
1557            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1558                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1559        );
1560    }
1561
1562    fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
1563        assert_eq!("ts", ts_column.name);
1564        assert_eq!(
1565            ColumnDataType::TimestampMillisecond as i32,
1566            ts_column.data_type
1567        );
1568        assert!(!ts_column.default_constraint.is_empty());
1569
1570        ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
1571    }
1572
1573    #[test]
1574    fn test_to_alter_expr() {
1575        let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
1576        let stmt =
1577            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1578                .unwrap()
1579                .pop()
1580                .unwrap();
1581
1582        let Statement::AlterDatabase(alter_database) = stmt else {
1583            unreachable!()
1584        };
1585
1586        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1587        let kind = expr.kind.unwrap();
1588
1589        let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
1590            set_database_options,
1591        }) = kind
1592        else {
1593            unreachable!()
1594        };
1595
1596        assert_eq!(2, set_database_options.len());
1597        assert_eq!("key1", set_database_options[0].key);
1598        assert_eq!("value1", set_database_options[0].value);
1599        assert_eq!("key2", set_database_options[1].key);
1600        assert_eq!("value2", set_database_options[1].value);
1601
1602        let sql = "ALTER DATABASE greptime UNSET key1, key2;";
1603        let stmt =
1604            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1605                .unwrap()
1606                .pop()
1607                .unwrap();
1608
1609        let Statement::AlterDatabase(alter_database) = stmt else {
1610            unreachable!()
1611        };
1612
1613        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1614        let kind = expr.kind.unwrap();
1615
1616        let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
1617            unreachable!()
1618        };
1619
1620        assert_eq!(2, keys.len());
1621        assert!(keys.contains(&"key1".to_string()));
1622        assert!(keys.contains(&"key2".to_string()));
1623
1624        let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
1625        let stmt =
1626            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1627                .unwrap()
1628                .pop()
1629                .unwrap();
1630
1631        let Statement::AlterTable(alter_table) = stmt else {
1632            unreachable!()
1633        };
1634
1635        // query context with system timezone UTC.
1636        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1637        let kind = expr.kind.unwrap();
1638
1639        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1640            unreachable!()
1641        };
1642
1643        assert_eq!(1, add_columns.len());
1644        let ts_column = add_columns[0].column_def.clone().unwrap();
1645        let constraint = assert_ts_column(&ts_column);
1646        assert!(
1647            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1648                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1649        );
1650
1651        //
1652        // query context with timezone `+08:00`
1653        let ctx = QueryContextBuilder::default()
1654            .timezone(Timezone::from_tz_string("+08:00").unwrap())
1655            .build()
1656            .into();
1657        let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
1658        let kind = expr.kind.unwrap();
1659
1660        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1661            unreachable!()
1662        };
1663
1664        assert_eq!(1, add_columns.len());
1665        let ts_column = add_columns[0].column_def.clone().unwrap();
1666        let constraint = assert_ts_column(&ts_column);
1667        assert!(
1668            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1669                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1670        );
1671    }
1672
1673    #[test]
1674    fn test_to_alter_modify_column_type_expr() {
1675        let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
1676        let stmt =
1677            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1678                .unwrap()
1679                .pop()
1680                .unwrap();
1681
1682        let Statement::AlterTable(alter_table) = stmt else {
1683            unreachable!()
1684        };
1685
1686        // query context with system timezone UTC.
1687        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1688        let kind = expr.kind.unwrap();
1689
1690        let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
1691            modify_column_types,
1692        }) = kind
1693        else {
1694            unreachable!()
1695        };
1696
1697        assert_eq!(1, modify_column_types.len());
1698        let modify_column_type = &modify_column_types[0];
1699
1700        assert_eq!("mem_usage", modify_column_type.column_name);
1701        assert_eq!(
1702            ColumnDataType::String as i32,
1703            modify_column_type.target_type
1704        );
1705        assert!(modify_column_type.target_type_extension.is_none());
1706    }
1707
1708    #[test]
1709    fn test_to_repartition_request() {
1710        let sql = r#"
1711ALTER TABLE metrics REPARTITION (
1712  device_id < 100
1713) INTO (
1714  device_id < 100 AND area < 'South',
1715  device_id < 100 AND area >= 'South'
1716);"#;
1717        let stmt =
1718            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1719                .unwrap()
1720                .pop()
1721                .unwrap();
1722
1723        let Statement::AlterTable(alter_table) = stmt else {
1724            unreachable!()
1725        };
1726
1727        let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
1728        assert_eq!("greptime", request.catalog_name);
1729        assert_eq!("public", request.schema_name);
1730        assert_eq!("metrics", request.table_name);
1731        let RepartitionSource::Partitions {
1732            from_exprs,
1733            target_partition_columns,
1734        } = request.source
1735        else {
1736            unreachable!()
1737        };
1738        assert!(target_partition_columns.is_none());
1739        assert_eq!(
1740            from_exprs
1741                .into_iter()
1742                .map(|x| x.to_string())
1743                .collect::<Vec<_>>(),
1744            vec!["device_id < 100".to_string()]
1745        );
1746        assert_eq!(
1747            request
1748                .into_exprs
1749                .into_iter()
1750                .map(|x| x.to_string())
1751                .collect::<Vec<_>>(),
1752            vec![
1753                "device_id < 100 AND area < 'South'".to_string(),
1754                "device_id < 100 AND area >= 'South'".to_string()
1755            ]
1756        );
1757    }
1758
1759    #[test]
1760    fn test_to_repartition_request_with_target_partition_columns() {
1761        let sql = r#"
1762ALTER TABLE metrics REPARTITION (
1763  device_id < 100
1764) ON COLUMNS (device_id, area) INTO (
1765  device_id < 100 AND area < 'South',
1766  device_id < 100 AND area >= 'South'
1767);"#;
1768        let stmt =
1769            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1770                .unwrap()
1771                .pop()
1772                .unwrap();
1773
1774        let Statement::AlterTable(alter_table) = stmt else {
1775            unreachable!()
1776        };
1777
1778        let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
1779        let RepartitionSource::Partitions {
1780            target_partition_columns,
1781            ..
1782        } = request.source
1783        else {
1784            unreachable!()
1785        };
1786
1787        assert_eq!(
1788            target_partition_columns,
1789            Some(vec!["device_id".to_string(), "area".to_string()])
1790        );
1791    }
1792
1793    #[test]
1794    fn test_to_repartition_request_with_unpartitioned_source() {
1795        let sql = r#"
1796ALTER TABLE metrics PARTITION ON COLUMNS (device_id, area) (
1797  device_id < 100 AND area < 'South',
1798  device_id < 100 AND area >= 'South'
1799);"#;
1800        let stmt =
1801            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1802                .unwrap()
1803                .pop()
1804                .unwrap();
1805
1806        let Statement::AlterTable(alter_table) = stmt else {
1807            unreachable!()
1808        };
1809
1810        let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
1811        assert_eq!("greptime", request.catalog_name);
1812        assert_eq!("public", request.schema_name);
1813        assert_eq!("metrics", request.table_name);
1814        let RepartitionSource::Unpartitioned { partition_columns } = request.source else {
1815            unreachable!()
1816        };
1817        assert_eq!(partition_columns, vec!["device_id", "area"]);
1818        assert_eq!(
1819            request
1820                .into_exprs
1821                .into_iter()
1822                .map(|x| x.to_string())
1823                .collect::<Vec<_>>(),
1824            vec![
1825                "device_id < 100 AND area < 'South'".to_string(),
1826                "device_id < 100 AND area >= 'South'".to_string()
1827            ]
1828        );
1829    }
1830
1831    fn new_test_table_names() -> Vec<TableName> {
1832        vec![
1833            TableName {
1834                catalog_name: "greptime".to_string(),
1835                schema_name: "public".to_string(),
1836                table_name: "a_table".to_string(),
1837            },
1838            TableName {
1839                catalog_name: "greptime".to_string(),
1840                schema_name: "public".to_string(),
1841                table_name: "b_table".to_string(),
1842            },
1843        ]
1844    }
1845
1846    #[test]
1847    fn test_to_create_view_expr() {
1848        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1849        let stmt =
1850            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1851                .unwrap()
1852                .pop()
1853                .unwrap();
1854
1855        let Statement::CreateView(stmt) = stmt else {
1856            unreachable!()
1857        };
1858
1859        let logical_plan = vec![1, 2, 3];
1860        let table_names = new_test_table_names();
1861        let columns = vec!["a".to_string()];
1862        let plan_columns = vec!["number".to_string()];
1863
1864        let expr = to_create_view_expr(
1865            stmt,
1866            logical_plan.clone(),
1867            table_names.clone(),
1868            columns.clone(),
1869            plan_columns.clone(),
1870            sql.to_string(),
1871            QueryContext::arc(),
1872        )
1873        .unwrap();
1874
1875        assert_eq!("greptime", expr.catalog_name);
1876        assert_eq!("public", expr.schema_name);
1877        assert_eq!("test", expr.view_name);
1878        assert!(!expr.create_if_not_exists);
1879        assert!(!expr.or_replace);
1880        assert_eq!(logical_plan, expr.logical_plan);
1881        assert_eq!(table_names, expr.table_names);
1882        assert_eq!(sql, expr.definition);
1883        assert_eq!(columns, expr.columns);
1884        assert_eq!(plan_columns, expr.plan_columns);
1885    }
1886
1887    #[test]
1888    fn test_to_create_view_expr_complex() {
1889        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
1890        let stmt =
1891            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1892                .unwrap()
1893                .pop()
1894                .unwrap();
1895
1896        let Statement::CreateView(stmt) = stmt else {
1897            unreachable!()
1898        };
1899
1900        let logical_plan = vec![1, 2, 3];
1901        let table_names = new_test_table_names();
1902        let columns = vec!["a".to_string()];
1903        let plan_columns = vec!["number".to_string()];
1904
1905        let expr = to_create_view_expr(
1906            stmt,
1907            logical_plan.clone(),
1908            table_names.clone(),
1909            columns.clone(),
1910            plan_columns.clone(),
1911            sql.to_string(),
1912            QueryContext::arc(),
1913        )
1914        .unwrap();
1915
1916        assert_eq!("greptime", expr.catalog_name);
1917        assert_eq!("test", expr.schema_name);
1918        assert_eq!("test_view", expr.view_name);
1919        assert!(expr.create_if_not_exists);
1920        assert!(expr.or_replace);
1921        assert_eq!(logical_plan, expr.logical_plan);
1922        assert_eq!(table_names, expr.table_names);
1923        assert_eq!(sql, expr.definition);
1924        assert_eq!(columns, expr.columns);
1925        assert_eq!(plan_columns, expr.plan_columns);
1926    }
1927
1928    #[test]
1929    fn test_expr_to_create() {
1930        let sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
1931  `timestamp` TIMESTAMP(9) NOT NULL,
1932  `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
1933  `username` STRING NULL,
1934  `http_method` STRING NULL INVERTED INDEX,
1935  `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
1936  `protocol` STRING NULL,
1937  `status_code` INT NULL INVERTED INDEX,
1938  `response_size` BIGINT NULL,
1939  `message` STRING NULL,
1940  TIME INDEX (`timestamp`),
1941  PRIMARY KEY (`username`, `status_code`)
1942)
1943ENGINE=mito
1944WITH(
1945  append_mode = 'true'
1946)"#;
1947        let stmt =
1948            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1949                .unwrap()
1950                .pop()
1951                .unwrap();
1952
1953        let Statement::CreateTable(original_create) = stmt else {
1954            unreachable!()
1955        };
1956
1957        // Convert CreateTable -> CreateTableExpr -> CreateTable
1958        let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();
1959
1960        let create_table = expr_to_create(&expr, Some('`')).unwrap();
1961        let new_sql = format!("{:#}", create_table);
1962        assert_eq!(sql, new_sql);
1963    }
1964}