sql/parsers/
create_parser.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod json;
16#[cfg(feature = "enterprise")]
17pub mod trigger;
18
19use std::collections::HashMap;
20
21use arrow_buffer::IntervalMonthDayNano;
22use common_catalog::consts::default_engine;
23use datafusion_common::ScalarValue;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
25use datatypes::data_type::ConcreteDataType;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28use sqlparser::ast::{
29    ColumnOption, ColumnOptionDef, DataType, Expr, KeyOrIndexDisplay, NullsDistinctOption,
30    PrimaryKeyConstraint, UniqueConstraint,
31};
32use sqlparser::dialect::keywords::Keyword;
33use sqlparser::keywords::ALL_KEYWORDS;
34use sqlparser::parser::IsOptional::Mandatory;
35use sqlparser::parser::{Parser, ParserError};
36use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
37use table::requests::validate_database_option;
38
39use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
40use crate::error::{
41    self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidFlowQuerySnafu,
42    InvalidIntervalSnafu, InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
43    SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
44};
45use crate::parser::{FLOW, ParserContext};
46use crate::parsers::tql_parser;
47use crate::parsers::utils::{
48    self, parse_with_options, validate_column_fulltext_create_option,
49    validate_column_skipping_index_create_option, validate_column_vector_index_create_option,
50};
51use crate::statements::create::{
52    Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
53    CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
54};
55use crate::statements::statement::Statement;
56use crate::statements::transform::type_alias::get_data_type_by_alias_name;
57use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
58use crate::util::{OptionValue, location_to_index, parse_option_string};
59
/// The `ENGINE` keyword text.
// NOTE(review): presumably matched by the table-engine clause parser; its use is not in this part of the file.
pub const ENGINE: &str = "ENGINE";
/// The `MAXVALUE` keyword text.
// NOTE(review): usage is not visible in this part of the file.
pub const MAXVALUE: &str = "MAXVALUE";
/// The `SINK` word of `CREATE FLOW ... SINK TO ...`, compared case-insensitively.
pub const SINK: &str = "SINK";
/// The `EXPIRE` word of the `EXPIRE AFTER <interval>` flow clause, compared case-insensitively.
pub const EXPIRE: &str = "EXPIRE";
/// The `AFTER` word of the `EXPIRE AFTER <interval>` flow clause, compared case-insensitively.
pub const AFTER: &str = "AFTER";
/// The `INVERTED` keyword text.
// NOTE(review): presumably used for inverted index column options; verify against its users.
pub const INVERTED: &str = "INVERTED";
/// The `SKIPPING` word of the `SKIPPING INDEX` column option, compared case-insensitively.
pub const SKIPPING: &str = "SKIPPING";
/// The `VECTOR` keyword text.
// NOTE(review): presumably used for vector types or vector index options; verify against its users.
pub const VECTOR: &str = "VECTOR";

/// The textual form of a parsed interval expression (the expression rendered back to a string).
pub type RawIntervalExpr = String;
70
71/// Parses create [table] statement
72impl<'a> ParserContext<'a> {
73    pub(crate) fn parse_create(&mut self) -> Result<Statement> {
74        match self.parser.peek_token().token {
75            Token::Word(w) => match w.keyword {
76                Keyword::TABLE => self.parse_create_table(),
77
78                Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
79
80                Keyword::EXTERNAL => self.parse_create_external_table(),
81
82                Keyword::OR => {
83                    let _ = self.parser.next_token();
84                    self.parser
85                        .expect_keyword(Keyword::REPLACE)
86                        .context(SyntaxSnafu)?;
87                    match self.parser.next_token().token {
88                        Token::Word(w) => match w.keyword {
89                            Keyword::VIEW => self.parse_create_view(true),
90                            Keyword::NoKeyword => {
91                                let uppercase = w.value.to_uppercase();
92                                match uppercase.as_str() {
93                                    FLOW => self.parse_create_flow(true),
94                                    _ => self.unsupported(w.to_string()),
95                                }
96                            }
97                            _ => self.unsupported(w.to_string()),
98                        },
99                        _ => self.unsupported(w.to_string()),
100                    }
101                }
102
103                Keyword::VIEW => {
104                    let _ = self.parser.next_token();
105                    self.parse_create_view(false)
106                }
107
108                #[cfg(feature = "enterprise")]
109                Keyword::TRIGGER => {
110                    let _ = self.parser.next_token();
111                    self.parse_create_trigger()
112                }
113
114                Keyword::NoKeyword => {
115                    let _ = self.parser.next_token();
116                    let uppercase = w.value.to_uppercase();
117                    match uppercase.as_str() {
118                        FLOW => self.parse_create_flow(false),
119                        _ => self.unsupported(w.to_string()),
120                    }
121                }
122                _ => self.unsupported(w.to_string()),
123            },
124            unexpected => self.unsupported(unexpected.to_string()),
125        }
126    }
127
128    /// Parse `CREAVE VIEW` statement.
129    fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
130        let if_not_exists = self.parse_if_not_exist()?;
131        let view_name = self.intern_parse_table_name()?;
132
133        let columns = self.parse_view_columns()?;
134
135        self.parser
136            .expect_keyword(Keyword::AS)
137            .context(SyntaxSnafu)?;
138
139        let query = self.parse_query()?;
140
141        Ok(Statement::CreateView(CreateView {
142            name: view_name,
143            columns,
144            or_replace,
145            query: Box::new(query),
146            if_not_exists,
147        }))
148    }
149
150    fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
151        let mut columns = vec![];
152        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
153            return Ok(columns);
154        }
155
156        loop {
157            let name = self.parse_column_name().context(SyntaxSnafu)?;
158
159            columns.push(name);
160
161            let comma = self.parser.consume_token(&Token::Comma);
162            if self.parser.consume_token(&Token::RParen) {
163                // allow a trailing comma, even though it's not in standard
164                break;
165            } else if !comma {
166                return self.expected("',' or ')' after column name", self.parser.peek_token());
167            }
168        }
169
170        Ok(columns)
171    }
172
173    fn parse_create_external_table(&mut self) -> Result<Statement> {
174        let _ = self.parser.next_token();
175        self.parser
176            .expect_keyword(Keyword::TABLE)
177            .context(SyntaxSnafu)?;
178        let if_not_exists = self.parse_if_not_exist()?;
179        let table_name = self.intern_parse_table_name()?;
180        let (columns, constraints) = self.parse_columns()?;
181        if !columns.is_empty() {
182            validate_time_index(&columns, &constraints)?;
183        }
184
185        let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
186        let options = self.parse_create_table_options()?;
187        Ok(Statement::CreateExternalTable(CreateExternalTable {
188            name: table_name,
189            columns,
190            constraints,
191            options,
192            if_not_exists,
193            engine,
194        }))
195    }
196
197    fn parse_create_database(&mut self) -> Result<Statement> {
198        let _ = self.parser.next_token();
199        let if_not_exists = self.parse_if_not_exist()?;
200        let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
201            expected: "a database name",
202            actual: self.peek_token_as_string(),
203        })?;
204        let database_name = Self::canonicalize_object_name(database_name)?;
205
206        let options = self
207            .parser
208            .parse_options(Keyword::WITH)
209            .context(SyntaxSnafu)?
210            .into_iter()
211            .map(parse_option_string)
212            .collect::<Result<HashMap<String, OptionValue>>>()?;
213
214        for key in options.keys() {
215            ensure!(
216                validate_database_option(key),
217                InvalidDatabaseOptionSnafu { key: key.clone() }
218            );
219        }
220        if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string())
221            && append_mode == "true"
222            && options.contains_key("merge_mode")
223        {
224            return InvalidDatabaseOptionSnafu {
225                key: "merge_mode".to_string(),
226            }
227            .fail();
228        }
229
230        Ok(Statement::CreateDatabase(CreateDatabase {
231            name: database_name,
232            if_not_exists,
233            options: OptionMap::new(options),
234        }))
235    }
236
237    fn parse_create_table(&mut self) -> Result<Statement> {
238        let _ = self.parser.next_token();
239
240        let if_not_exists = self.parse_if_not_exist()?;
241
242        let table_name = self.intern_parse_table_name()?;
243
244        if self.parser.parse_keyword(Keyword::LIKE) {
245            let source_name = self.intern_parse_table_name()?;
246
247            return Ok(Statement::CreateTableLike(CreateTableLike {
248                table_name,
249                source_name,
250            }));
251        }
252
253        let (columns, constraints) = self.parse_columns()?;
254        validate_time_index(&columns, &constraints)?;
255
256        let partitions = self.parse_partitions()?;
257        if let Some(partitions) = &partitions {
258            validate_partitions(&columns, partitions)?;
259        }
260
261        let engine = self.parse_table_engine(default_engine())?;
262        let options = self.parse_create_table_options()?;
263        let create_table = CreateTable {
264            if_not_exists,
265            name: table_name,
266            columns,
267            engine,
268            constraints,
269            options,
270            table_id: 0, // table id is assigned by catalog manager
271            partitions,
272        };
273
274        Ok(Statement::CreateTable(create_table))
275    }
276
277    /// "CREATE FLOW" clause
278    fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
279        let if_not_exists = self.parse_if_not_exist()?;
280
281        let flow_name = self.intern_parse_table_name()?;
282
283        // make `SINK` case in-sensitive
284        if let Token::Word(word) = self.parser.peek_token().token
285            && word.value.eq_ignore_ascii_case(SINK)
286        {
287            self.parser.next_token();
288        } else {
289            Err(ParserError::ParserError(
290                "Expect `SINK` keyword".to_string(),
291            ))
292            .context(SyntaxSnafu)?
293        }
294        self.parser
295            .expect_keyword(Keyword::TO)
296            .context(SyntaxSnafu)?;
297
298        let output_table_name = self.intern_parse_table_name()?;
299
300        let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
301            && w1.value.eq_ignore_ascii_case(EXPIRE)
302        {
303            self.parser.next_token();
304            if let Token::Word(w2) = &self.parser.peek_token().token
305                && w2.value.eq_ignore_ascii_case(AFTER)
306            {
307                self.parser.next_token();
308                Some(self.parse_interval_no_month("EXPIRE AFTER")?)
309            } else {
310                None
311            }
312        } else {
313            None
314        };
315
316        let eval_interval = if self
317            .parser
318            .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
319        {
320            Some(self.parse_interval_no_month("EVAL INTERVAL")?)
321        } else {
322            None
323        };
324
325        let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
326            match self.parser.next_token() {
327                TokenWithSpan {
328                    token: Token::SingleQuotedString(value, ..),
329                    ..
330                } => Some(value),
331                unexpected => {
332                    return self
333                        .parser
334                        .expected("string", unexpected)
335                        .context(SyntaxSnafu);
336                }
337            }
338        } else {
339            None
340        };
341
342        self.parser
343            .expect_keyword(Keyword::AS)
344            .context(SyntaxSnafu)?;
345
346        let query = Box::new(self.parse_flow_sql_or_tql(true)?);
347
348        Ok(Statement::CreateFlow(CreateFlow {
349            flow_name,
350            sink_table_name: output_table_name,
351            or_replace,
352            if_not_exists,
353            expire_after,
354            eval_interval,
355            comment,
356            query,
357        }))
358    }
359
360    fn parse_flow_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
361        let start_loc = self.parser.peek_token().span.start;
362        let start_index = location_to_index(self.sql, &start_loc);
363
364        let starts_with_with = matches!(
365            self.parser.peek_token().token,
366            Token::Word(w) if w.keyword == Keyword::WITH
367        );
368
369        // only accept sql or tql
370        let query = match self.parser.peek_token().token {
371            Token::Word(w) => match w.keyword {
372                Keyword::SELECT => self.parse_query(),
373                Keyword::WITH => self.parse_with_tql_with_now(require_now_expr),
374                Keyword::NoKeyword
375                    if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
376                {
377                    self.parse_tql(require_now_expr)
378                }
379
380                _ => self.unsupported(self.peek_token_as_string()),
381            },
382            _ => self.unsupported(self.peek_token_as_string()),
383        }?;
384
385        if starts_with_with {
386            let Statement::Query(query) = &query else {
387                return InvalidFlowQuerySnafu {
388                    reason: "Expect a query after WITH".to_string(),
389                }
390                .fail();
391            };
392
393            if !utils::is_simple_tql_cte_query(query) {
394                return InvalidFlowQuerySnafu {
395                    reason: "WITH is only supported for the simplest TQL CTE in CREATE FLOW"
396                        .to_string(),
397                }
398                .fail();
399            }
400        }
401
402        let end_token = self.parser.peek_token();
403
404        let raw_query = if end_token == Token::EOF {
405            &self.sql[start_index..]
406        } else {
407            let end_loc = end_token.span.end;
408            let end_index = location_to_index(self.sql, &end_loc);
409            &self.sql[start_index..end_index.min(self.sql.len())]
410        };
411        let raw_query = raw_query.trim_end_matches(";");
412
413        let query = SqlOrTql::try_from_statement(query, raw_query)?;
414        Ok(query)
415    }
416
417    /// Parse the interval expr to duration in seconds.
418    fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
419        let interval = self.parse_interval_month_day_nano()?.0;
420        if interval.months != 0 {
421            return InvalidIntervalSnafu {
422                reason: format!("Interval with months is not allowed in {context}"),
423            }
424            .fail();
425        }
426        Ok(
427            interval.nanoseconds / 1_000_000_000
428                + interval.days as i64 * 60 * 60 * 24
429                + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, // 1 month=365.25/12=30.44 days
430                                                                       // this is to keep the same as https://docs.rs/humantime/latest/humantime/fn.parse_duration.html
431                                                                       // which we use in database to parse i.e. ttl interval and many other intervals
432        )
433    }
434
435    /// Parse interval expr to [`IntervalMonthDayNano`].
436    fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
437        let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
438        let raw_interval_expr = interval_expr.to_string();
439        let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
440            .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
441            .ok()
442            .with_context(|| InvalidIntervalSnafu {
443                reason: format!("cannot cast {} to interval type", interval_expr),
444            })?;
445        if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
446            Ok((interval, raw_interval_expr))
447        } else {
448            unreachable!()
449        }
450    }
451
452    fn parse_if_not_exist(&mut self) -> Result<bool> {
453        match self.parser.peek_token().token {
454            Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
455            _ => {}
456        }
457
458        if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
459            return self
460                .parser
461                .expect_keyword(Keyword::EXISTS)
462                .map(|_| true)
463                .context(UnexpectedSnafu {
464                    expected: "EXISTS",
465                    actual: self.peek_token_as_string(),
466                });
467        }
468
469        if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
470            return UnsupportedSnafu { keyword: "EXISTS" }.fail();
471        }
472
473        Ok(false)
474    }
475
476    fn parse_create_table_options(&mut self) -> Result<OptionMap> {
477        parse_with_options(&mut self.parser)
478    }
479
480    /// "PARTITION ON COLUMNS (...)" clause
481    fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
482        if !self.parser.parse_keyword(Keyword::PARTITION) {
483            return Ok(None);
484        }
485        self.parser
486            .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
487            .context(error::UnexpectedSnafu {
488                expected: "ON, COLUMNS",
489                actual: self.peek_token_as_string(),
490            })?;
491
492        let raw_column_list = self
493            .parser
494            .parse_parenthesized_column_list(Mandatory, false)
495            .context(error::SyntaxSnafu)?;
496        let column_list = raw_column_list
497            .into_iter()
498            .map(Self::canonicalize_identifier)
499            .collect();
500
501        let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
502
503        Ok(Some(Partitions { column_list, exprs }))
504    }
505
506    fn parse_partition_entry(&mut self) -> Result<Expr> {
507        self.parser.parse_expr().context(error::SyntaxSnafu)
508    }
509
510    /// Parse a comma-separated list wrapped by "()", and of which all items accepted by `F`
511    fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
512    where
513        F: FnMut(&mut ParserContext<'a>) -> Result<T>,
514    {
515        self.parser
516            .expect_token(&Token::LParen)
517            .context(error::UnexpectedSnafu {
518                expected: "(",
519                actual: self.peek_token_as_string(),
520            })?;
521
522        let mut values = vec![];
523        while self.parser.peek_token() != Token::RParen {
524            values.push(f(self)?);
525            if !self.parser.consume_token(&Token::Comma) {
526                break;
527            }
528        }
529
530        self.parser
531            .expect_token(&Token::RParen)
532            .context(error::UnexpectedSnafu {
533                expected: ")",
534                actual: self.peek_token_as_string(),
535            })?;
536
537        Ok(values)
538    }
539
540    /// Parse the columns and constraints.
541    fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
542        let mut columns = vec![];
543        let mut constraints = vec![];
544        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
545            return Ok((columns, constraints));
546        }
547
548        loop {
549            if let Some(constraint) = self.parse_optional_table_constraint()? {
550                constraints.push(constraint);
551            } else if let Token::Word(_) = self.parser.peek_token().token {
552                self.parse_column(&mut columns, &mut constraints)?;
553            } else {
554                return self.expected(
555                    "column name or constraint definition",
556                    self.parser.peek_token(),
557                );
558            }
559            let comma = self.parser.consume_token(&Token::Comma);
560            if self.parser.consume_token(&Token::RParen) {
561                // allow a trailing comma, even though it's not in standard
562                break;
563            } else if !comma {
564                return self.expected(
565                    "',' or ')' after column definition",
566                    self.parser.peek_token(),
567                );
568            }
569        }
570
571        Ok((columns, constraints))
572    }
573
574    fn parse_column(
575        &mut self,
576        columns: &mut Vec<Column>,
577        constraints: &mut Vec<TableConstraint>,
578    ) -> Result<()> {
579        let mut column = self.parse_column_def()?;
580
581        let mut time_index_opt_idx = None;
582        for (index, opt) in column.options().iter().enumerate() {
583            if let ColumnOption::DialectSpecific(tokens) = &opt.option
584                && matches!(
585                    &tokens[..],
586                    [
587                        Token::Word(Word {
588                            keyword: Keyword::TIME,
589                            ..
590                        }),
591                        Token::Word(Word {
592                            keyword: Keyword::INDEX,
593                            ..
594                        })
595                    ]
596                )
597            {
598                ensure!(
599                    time_index_opt_idx.is_none(),
600                    InvalidColumnOptionSnafu {
601                        name: column.name().to_string(),
602                        msg: "duplicated time index",
603                    }
604                );
605                time_index_opt_idx = Some(index);
606
607                let constraint = TableConstraint::TimeIndex {
608                    column: Ident::new(column.name().value.clone()),
609                };
610                constraints.push(constraint);
611            }
612        }
613
614        if let Some(index) = time_index_opt_idx {
615            ensure!(
616                !column.options().contains(&ColumnOptionDef {
617                    option: ColumnOption::Null,
618                    name: None,
619                }),
620                InvalidColumnOptionSnafu {
621                    name: column.name().to_string(),
622                    msg: "time index column can't be null",
623                }
624            );
625
626            // The timestamp type may be an alias type, we have to retrieve the actual type.
627            let data_type = get_unalias_type(column.data_type());
628            ensure!(
629                matches!(data_type, DataType::Timestamp(_, _)),
630                InvalidColumnOptionSnafu {
631                    name: column.name().to_string(),
632                    msg: "time index column data type should be timestamp",
633                }
634            );
635
636            let not_null_opt = ColumnOptionDef {
637                option: ColumnOption::NotNull,
638                name: None,
639            };
640
641            if !column.options().contains(&not_null_opt) {
642                column.mut_options().push(not_null_opt);
643            }
644
645            let _ = column.mut_options().remove(index);
646        }
647
648        columns.push(column);
649
650        Ok(())
651    }
652
653    /// Parse the column name and check if it's valid.
654    fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
655        let name = self.parser.parse_identifier()?;
656        if name.quote_style.is_none() &&
657        // "ALL_KEYWORDS" are sorted.
658            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
659        {
660            return Err(ParserError::ParserError(format!(
661                "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
662                &name.value
663            )));
664        }
665
666        Ok(name)
667    }
668
669    pub fn parse_column_def(&mut self) -> Result<Column> {
670        let name = self.parse_column_name().context(SyntaxSnafu)?;
671        let parser = &mut self.parser;
672
673        ensure!(
674            !(name.quote_style.is_none() &&
675            // "ALL_KEYWORDS" are sorted.
676            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
677            InvalidSqlSnafu {
678                msg: format!(
679                    "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
680                    &name.value
681                ),
682            }
683        );
684
685        let mut extensions = ColumnExtensions::default();
686
687        let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
688        // Must immediately parse the JSON datatype format because it is closely after the "JSON"
689        // datatype, like this: "JSON(format = ...)".
690        if matches!(data_type, DataType::JSON) {
691            extensions.json_datatype_options = json::parse_json_datatype_options(parser)?;
692        }
693
694        let mut options = vec![];
695        loop {
696            if parser.parse_keyword(Keyword::CONSTRAINT) {
697                let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
698                if let Some(option) = Self::parse_optional_column_option(parser)? {
699                    options.push(ColumnOptionDef { name, option });
700                } else {
701                    return parser
702                        .expected(
703                            "constraint details after CONSTRAINT <name>",
704                            parser.peek_token(),
705                        )
706                        .context(SyntaxSnafu);
707                }
708            } else if let Some(option) = Self::parse_optional_column_option(parser)? {
709                options.push(ColumnOptionDef { name: None, option });
710            } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
711                break;
712            };
713        }
714
715        Ok(Column {
716            column_def: ColumnDef {
717                name: Self::canonicalize_identifier(name),
718                data_type,
719                options,
720            },
721            extensions,
722        })
723    }
724
725    fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
726        if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
727            Ok(Some(ColumnOption::CharacterSet(
728                parser.parse_object_name(false).context(SyntaxSnafu)?,
729            )))
730        } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
731            Ok(Some(ColumnOption::NotNull))
732        } else if parser.parse_keywords(&[Keyword::COMMENT]) {
733            match parser.next_token() {
734                TokenWithSpan {
735                    token: Token::SingleQuotedString(value, ..),
736                    ..
737                } => Ok(Some(ColumnOption::Comment(value))),
738                unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
739            }
740        } else if parser.parse_keyword(Keyword::NULL) {
741            Ok(Some(ColumnOption::Null))
742        } else if parser.parse_keyword(Keyword::DEFAULT) {
743            Ok(Some(ColumnOption::Default(
744                parser.parse_expr().context(SyntaxSnafu)?,
745            )))
746        } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
747            Ok(Some(ColumnOption::PrimaryKey(PrimaryKeyConstraint {
748                name: None,
749                index_name: None,
750                index_type: None,
751                columns: vec![],
752                index_options: vec![],
753                characteristics: None,
754            })))
755        } else if parser.parse_keyword(Keyword::UNIQUE) {
756            Ok(Some(ColumnOption::Unique(UniqueConstraint {
757                name: None,
758                index_name: None,
759                index_type_display: KeyOrIndexDisplay::None,
760                index_type: None,
761                columns: vec![],
762                index_options: vec![],
763                characteristics: None,
764                nulls_distinct: NullsDistinctOption::None,
765            })))
766        } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
767            // Use a DialectSpecific option for time index
768            Ok(Some(ColumnOption::DialectSpecific(vec![
769                Token::Word(Word {
770                    value: "TIME".to_string(),
771                    quote_style: None,
772                    keyword: Keyword::TIME,
773                }),
774                Token::Word(Word {
775                    value: "INDEX".to_string(),
776                    quote_style: None,
777                    keyword: Keyword::INDEX,
778                }),
779            ])))
780        } else {
781            Ok(None)
782        }
783    }
784
785    /// Parse a column option extensions.
786    ///
787    /// This function will handle:
788    /// - Vector type
789    /// - Indexes
790    fn parse_column_extensions(
791        parser: &mut Parser<'_>,
792        column_name: &Ident,
793        column_type: &DataType,
794        column_extensions: &mut ColumnExtensions,
795    ) -> Result<bool> {
796        if let DataType::Custom(name, tokens) = column_type
797            && name.0.len() == 1
798            && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
799        {
800            ensure!(
801                tokens.len() == 1,
802                InvalidColumnOptionSnafu {
803                    name: column_name.to_string(),
804                    msg: "VECTOR type should have dimension",
805                }
806            );
807
808            let dimension =
809                tokens[0]
810                    .parse::<u32>()
811                    .ok()
812                    .with_context(|| InvalidColumnOptionSnafu {
813                        name: column_name.to_string(),
814                        msg: "dimension should be a positive integer",
815                    })?;
816
817            let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
818            column_extensions.vector_options = Some(options);
819        }
820
821        // parse index options in column definition
822        let mut is_index_declared = false;
823
824        // skipping index
825        if let Token::Word(word) = parser.peek_token().token
826            && word.value.eq_ignore_ascii_case(SKIPPING)
827        {
828            parser.next_token();
829            // Consume `INDEX` keyword
830            ensure!(
831                parser.parse_keyword(Keyword::INDEX),
832                InvalidColumnOptionSnafu {
833                    name: column_name.to_string(),
834                    msg: "expect INDEX after SKIPPING keyword",
835                }
836            );
837            ensure!(
838                column_extensions.skipping_index_options.is_none(),
839                InvalidColumnOptionSnafu {
840                    name: column_name.to_string(),
841                    msg: "duplicated SKIPPING index option",
842                }
843            );
844
845            let options = parser
846                .parse_options(Keyword::WITH)
847                .context(error::SyntaxSnafu)?
848                .into_iter()
849                .map(parse_option_string)
850                .collect::<Result<Vec<_>>>()?;
851
852            for (key, _) in options.iter() {
853                ensure!(
854                    validate_column_skipping_index_create_option(key),
855                    InvalidColumnOptionSnafu {
856                        name: column_name.to_string(),
857                        msg: format!("invalid SKIPPING INDEX option: {key}"),
858                    }
859                );
860            }
861
862            let options = OptionMap::new(options);
863            column_extensions.skipping_index_options = Some(options);
864            is_index_declared |= true;
865        }
866
867        // fulltext index
868        if parser.parse_keyword(Keyword::FULLTEXT) {
869            // Consume `INDEX` keyword
870            ensure!(
871                parser.parse_keyword(Keyword::INDEX),
872                InvalidColumnOptionSnafu {
873                    name: column_name.to_string(),
874                    msg: "expect INDEX after FULLTEXT keyword",
875                }
876            );
877
878            ensure!(
879                column_extensions.fulltext_index_options.is_none(),
880                InvalidColumnOptionSnafu {
881                    name: column_name.to_string(),
882                    msg: "duplicated FULLTEXT INDEX option",
883                }
884            );
885
886            let column_type = get_unalias_type(column_type);
887            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
888            ensure!(
889                data_type == ConcreteDataType::string_datatype(),
890                InvalidColumnOptionSnafu {
891                    name: column_name.to_string(),
892                    msg: "FULLTEXT index only supports string type",
893                }
894            );
895
896            let options = parser
897                .parse_options(Keyword::WITH)
898                .context(error::SyntaxSnafu)?
899                .into_iter()
900                .map(parse_option_string)
901                .collect::<Result<Vec<_>>>()?;
902
903            for (key, _) in options.iter() {
904                ensure!(
905                    validate_column_fulltext_create_option(key),
906                    InvalidColumnOptionSnafu {
907                        name: column_name.to_string(),
908                        msg: format!("invalid FULLTEXT INDEX option: {key}"),
909                    }
910                );
911            }
912
913            let options = OptionMap::new(options);
914            column_extensions.fulltext_index_options = Some(options);
915            is_index_declared |= true;
916        }
917
918        // inverted index
919        if let Token::Word(word) = parser.peek_token().token
920            && word.value.eq_ignore_ascii_case(INVERTED)
921        {
922            parser.next_token();
923            // Consume `INDEX` keyword
924            ensure!(
925                parser.parse_keyword(Keyword::INDEX),
926                InvalidColumnOptionSnafu {
927                    name: column_name.to_string(),
928                    msg: "expect INDEX after INVERTED keyword",
929                }
930            );
931
932            ensure!(
933                column_extensions.inverted_index_options.is_none(),
934                InvalidColumnOptionSnafu {
935                    name: column_name.to_string(),
936                    msg: "duplicated INVERTED index option",
937                }
938            );
939
940            // inverted index doesn't have options, skipping `WITH`
941            // try cache `WITH` and throw error
942            let with_token = parser.peek_token();
943            ensure!(
944                with_token.token
945                    != Token::Word(Word {
946                        value: "WITH".to_string(),
947                        keyword: Keyword::WITH,
948                        quote_style: None,
949                    }),
950                InvalidColumnOptionSnafu {
951                    name: column_name.to_string(),
952                    msg: "INVERTED index doesn't support options",
953                }
954            );
955
956            column_extensions.inverted_index_options = Some(OptionMap::default());
957            is_index_declared |= true;
958        }
959
960        // vector index
961        if let Token::Word(word) = parser.peek_token().token
962            && word.value.eq_ignore_ascii_case(VECTOR)
963        {
964            parser.next_token();
965            // Consume `INDEX` keyword
966            ensure!(
967                parser.parse_keyword(Keyword::INDEX),
968                InvalidColumnOptionSnafu {
969                    name: column_name.to_string(),
970                    msg: "expect INDEX after VECTOR keyword",
971                }
972            );
973
974            ensure!(
975                column_extensions.vector_index_options.is_none(),
976                InvalidColumnOptionSnafu {
977                    name: column_name.to_string(),
978                    msg: "duplicated VECTOR INDEX option",
979                }
980            );
981
982            // Check that column is a vector type
983            let column_type = get_unalias_type(column_type);
984            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
985            ensure!(
986                matches!(data_type, ConcreteDataType::Vector(_)),
987                InvalidColumnOptionSnafu {
988                    name: column_name.to_string(),
989                    msg: "VECTOR INDEX only supports Vector type columns",
990                }
991            );
992
993            let options = parser
994                .parse_options(Keyword::WITH)
995                .context(error::SyntaxSnafu)?
996                .into_iter()
997                .map(parse_option_string)
998                .collect::<Result<Vec<_>>>()?;
999
1000            for (key, _) in options.iter() {
1001                ensure!(
1002                    validate_column_vector_index_create_option(key),
1003                    InvalidColumnOptionSnafu {
1004                        name: column_name.to_string(),
1005                        msg: format!("invalid VECTOR INDEX option: {key}"),
1006                    }
1007                );
1008            }
1009
1010            let options = OptionMap::new(options);
1011            column_extensions.vector_index_options = Some(options);
1012            is_index_declared |= true;
1013        }
1014
1015        Ok(is_index_declared)
1016    }
1017
1018    fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
1019        match self.parser.next_token() {
1020            TokenWithSpan {
1021                token: Token::Word(w),
1022                ..
1023            } if w.keyword == Keyword::PRIMARY => {
1024                self.parser
1025                    .expect_keyword(Keyword::KEY)
1026                    .context(error::UnexpectedSnafu {
1027                        expected: "KEY",
1028                        actual: self.peek_token_as_string(),
1029                    })?;
1030                let raw_columns = self
1031                    .parser
1032                    .parse_parenthesized_column_list(Mandatory, false)
1033                    .context(error::SyntaxSnafu)?;
1034                let columns = raw_columns
1035                    .into_iter()
1036                    .map(Self::canonicalize_identifier)
1037                    .collect();
1038                Ok(Some(TableConstraint::PrimaryKey { columns }))
1039            }
1040            TokenWithSpan {
1041                token: Token::Word(w),
1042                ..
1043            } if w.keyword == Keyword::TIME => {
1044                self.parser
1045                    .expect_keyword(Keyword::INDEX)
1046                    .context(error::UnexpectedSnafu {
1047                        expected: "INDEX",
1048                        actual: self.peek_token_as_string(),
1049                    })?;
1050
1051                let raw_columns = self
1052                    .parser
1053                    .parse_parenthesized_column_list(Mandatory, false)
1054                    .context(error::SyntaxSnafu)?;
1055                let mut columns = raw_columns
1056                    .into_iter()
1057                    .map(Self::canonicalize_identifier)
1058                    .collect::<Vec<_>>();
1059
1060                ensure!(
1061                    columns.len() == 1,
1062                    InvalidTimeIndexSnafu {
1063                        msg: "it should contain only one column in time index",
1064                    }
1065                );
1066
1067                Ok(Some(TableConstraint::TimeIndex {
1068                    column: columns.pop().unwrap(),
1069                }))
1070            }
1071            _ => {
1072                self.parser.prev_token();
1073                Ok(None)
1074            }
1075        }
1076    }
1077
1078    /// Parses the set of valid formats
1079    fn parse_table_engine(&mut self, default: &str) -> Result<String> {
1080        if !self.consume_token(ENGINE) {
1081            return Ok(default.to_string());
1082        }
1083
1084        self.parser
1085            .expect_token(&Token::Eq)
1086            .context(error::UnexpectedSnafu {
1087                expected: "=",
1088                actual: self.peek_token_as_string(),
1089            })?;
1090
1091        let token = self.parser.next_token();
1092        if let Token::Word(w) = token.token {
1093            Ok(w.value)
1094        } else {
1095            self.expected("'Engine' is missing", token)
1096        }
1097    }
1098}
1099
1100fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1101    let time_index_constraints: Vec<_> = constraints
1102        .iter()
1103        .filter_map(|c| match c {
1104            TableConstraint::TimeIndex { column } => Some(column),
1105            _ => None,
1106        })
1107        .unique()
1108        .collect();
1109
1110    ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1111    ensure!(
1112        time_index_constraints.len() == 1,
1113        InvalidTimeIndexSnafu {
1114            msg: format!(
1115                "expected only one time index constraint but actual {}",
1116                time_index_constraints.len()
1117            ),
1118        }
1119    );
1120
1121    // It's safe to use time_index_constraints[0][0],
1122    // we already check the bound above.
1123    let time_index_column_ident = &time_index_constraints[0];
1124    let time_index_column = columns
1125        .iter()
1126        .find(|c| c.name().value == *time_index_column_ident.value)
1127        .with_context(|| InvalidTimeIndexSnafu {
1128            msg: format!(
1129                "time index column {} not found in columns",
1130                time_index_column_ident
1131            ),
1132        })?;
1133
1134    let time_index_data_type = get_unalias_type(time_index_column.data_type());
1135    ensure!(
1136        matches!(time_index_data_type, DataType::Timestamp(_, _)),
1137        InvalidColumnOptionSnafu {
1138            name: time_index_column.name().to_string(),
1139            msg: "time index column data type should be timestamp",
1140        }
1141    );
1142
1143    Ok(())
1144}
1145
1146fn get_unalias_type(data_type: &DataType) -> DataType {
1147    match data_type {
1148        DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1149            if let Some(real_type) =
1150                get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1151            {
1152                real_type
1153            } else {
1154                data_type.clone()
1155            }
1156        }
1157        _ => data_type.clone(),
1158    }
1159}
1160
1161fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1162    let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1163
1164    ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1165
1166    Ok(())
1167}
1168
1169/// Ensure all exprs are binary expr and all the columns are defined in the column list.
1170fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1171    for expr in exprs {
1172        // The first level must be binary expr
1173        if let Expr::BinaryOp { left, op: _, right } = expr {
1174            ensure_one_expr(left, columns)?;
1175            ensure_one_expr(right, columns)?;
1176        } else {
1177            return error::InvalidSqlSnafu {
1178                msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1179            }
1180            .fail();
1181        }
1182    }
1183    Ok(())
1184}
1185
1186/// Check if the expr is a binary expr, an ident or a literal value.
1187/// If is ident, then check it is in the column list.
1188/// This recursive function is intended to be used by [ensure_exprs_are_binary].
1189fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1190    match expr {
1191        Expr::BinaryOp { left, op: _, right } => {
1192            ensure_one_expr(left, columns)?;
1193            ensure_one_expr(right, columns)?;
1194            Ok(())
1195        }
1196        Expr::Identifier(ident) => {
1197            let column_name = &ident.value;
1198            ensure!(
1199                columns.iter().any(|c| &c.name().value == column_name),
1200                error::InvalidSqlSnafu {
1201                    msg: format!(
1202                        "Column {:?} in rule expr is not referenced in PARTITION ON",
1203                        column_name
1204                    ),
1205                }
1206            );
1207            Ok(())
1208        }
1209        Expr::Value(_) => Ok(()),
1210        Expr::UnaryOp { expr, .. } => {
1211            ensure_one_expr(expr, columns)?;
1212            Ok(())
1213        }
1214        _ => error::InvalidSqlSnafu {
1215            msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1216        }
1217        .fail(),
1218    }
1219}
1220
1221/// Ensure that all columns used in "PARTITION ON COLUMNS" are defined in create table.
1222fn ensure_partition_columns_defined<'a>(
1223    columns: &'a [Column],
1224    partitions: &'a Partitions,
1225) -> Result<Vec<&'a Column>> {
1226    partitions
1227        .column_list
1228        .iter()
1229        .map(|x| {
1230            let x = ParserContext::canonicalize_identifier(x.clone());
1231            // Normally the columns in "create table" won't be too many,
1232            // a linear search to find the target every time is fine.
1233            columns
1234                .iter()
1235                .find(|c| *c.name().value == x.value)
1236                .context(error::InvalidSqlSnafu {
1237                    msg: format!("Partition column {:?} not defined", x.value),
1238                })
1239        })
1240        .collect::<Result<Vec<&Column>>>()
1241}
1242
1243#[cfg(test)]
1244mod tests {
1245    use std::assert_matches::assert_matches;
1246    use std::collections::HashMap;
1247
1248    use common_catalog::consts::FILE_ENGINE;
1249    use common_error::ext::ErrorExt;
1250    use sqlparser::ast::ColumnOption::NotNull;
1251    use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1252    use sqlparser::dialect::GenericDialect;
1253    use sqlparser::tokenizer::Tokenizer;
1254
1255    use super::*;
1256    use crate::dialect::GreptimeDbDialect;
1257    use crate::parser::ParseOptions;
1258
1259    #[test]
1260    fn test_parse_create_table_like() {
1261        let sql = "CREATE TABLE t1 LIKE t2";
1262        let stmts =
1263            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1264                .unwrap();
1265
1266        assert_eq!(1, stmts.len());
1267        match &stmts[0] {
1268            Statement::CreateTableLike(c) => {
1269                assert_eq!(c.table_name.to_string(), "t1");
1270                assert_eq!(c.source_name.to_string(), "t2");
1271            }
1272            _ => unreachable!(),
1273        }
1274    }
1275
1276    #[test]
1277    fn test_validate_external_table_options() {
1278        let sql = "CREATE EXTERNAL TABLE city (
1279            host string,
1280            ts timestamp,
1281            cpu float64 default 0,
1282            memory float64,
1283            TIME INDEX (ts),
1284            PRIMARY KEY(ts, host)
1285        ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1286
1287        let result =
1288            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1289        assert!(matches!(
1290            result,
1291            Err(error::Error::InvalidTableOption { .. })
1292        ));
1293    }
1294
1295    #[test]
1296    fn test_parse_create_external_table() {
1297        struct Test<'a> {
1298            sql: &'a str,
1299            expected_table_name: &'a str,
1300            expected_options: HashMap<&'a str, &'a str>,
1301            expected_engine: &'a str,
1302            expected_if_not_exist: bool,
1303        }
1304
1305        let tests = [
1306            Test {
1307                sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1308                expected_table_name: "city",
1309                expected_options: HashMap::from([
1310                    ("location", "/var/data/city.csv"),
1311                    ("format", "csv"),
1312                ]),
1313                expected_engine: FILE_ENGINE,
1314                expected_if_not_exist: false,
1315            },
1316            Test {
1317                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1318                expected_table_name: "city",
1319                expected_options: HashMap::from([
1320                    ("location", "/var/data/city.csv"),
1321                    ("format", "csv"),
1322                ]),
1323                expected_engine: "foo",
1324                expected_if_not_exist: true,
1325            },
1326            Test {
1327                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1328                expected_table_name: "city",
1329                expected_options: HashMap::from([
1330                    ("location", "/var/data/city.csv"),
1331                    ("format", "csv"),
1332                    ("compaction.type", "bar"),
1333                ]),
1334                expected_engine: "foo",
1335                expected_if_not_exist: true,
1336            },
1337        ];
1338
1339        for test in tests {
1340            let stmts = ParserContext::create_with_dialect(
1341                test.sql,
1342                &GreptimeDbDialect {},
1343                ParseOptions::default(),
1344            )
1345            .unwrap();
1346            assert_eq!(1, stmts.len());
1347            match &stmts[0] {
1348                Statement::CreateExternalTable(c) => {
1349                    assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1350                    assert_eq!(c.options.to_str_map(), test.expected_options);
1351                    assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1352                    assert_eq!(c.engine, test.expected_engine);
1353                }
1354                _ => unreachable!(),
1355            }
1356        }
1357    }
1358
1359    #[test]
1360    fn test_parse_create_external_table_with_schema() {
1361        let sql = "CREATE EXTERNAL TABLE city (
1362            host string,
1363            ts timestamp,
1364            cpu float32 default 0,
1365            memory float64,
1366            TIME INDEX (ts),
1367            PRIMARY KEY(ts, host),
1368        ) with(location='/var/data/city.csv',format='csv');";
1369
1370        let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]);
1371
1372        let stmts =
1373            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1374                .unwrap();
1375        assert_eq!(1, stmts.len());
1376        match &stmts[0] {
1377            Statement::CreateExternalTable(c) => {
1378                assert_eq!(c.name.to_string(), "city");
1379                assert_eq!(c.options.to_str_map(), options);
1380
1381                let columns = &c.columns;
1382                assert_column_def(&columns[0].column_def, "host", "STRING");
1383                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1384                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1385                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1386
1387                let constraints = &c.constraints;
1388                assert_eq!(
1389                    &constraints[0],
1390                    &TableConstraint::TimeIndex {
1391                        column: Ident::new("ts"),
1392                    }
1393                );
1394                assert_eq!(
1395                    &constraints[1],
1396                    &TableConstraint::PrimaryKey {
1397                        columns: vec![Ident::new("ts"), Ident::new("host")]
1398                    }
1399                );
1400            }
1401            _ => unreachable!(),
1402        }
1403    }
1404
1405    #[test]
1406    fn test_parse_create_database() {
1407        let sql = "create database";
1408        let result =
1409            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1410        assert!(
1411            result
1412                .unwrap_err()
1413                .to_string()
1414                .contains("Unexpected token while parsing SQL statement")
1415        );
1416
1417        let sql = "create database prometheus";
1418        let stmts =
1419            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1420                .unwrap();
1421
1422        assert_eq!(1, stmts.len());
1423        match &stmts[0] {
1424            Statement::CreateDatabase(c) => {
1425                assert_eq!(c.name.to_string(), "prometheus");
1426                assert!(!c.if_not_exists);
1427            }
1428            _ => unreachable!(),
1429        }
1430
1431        let sql = "create database if not exists prometheus";
1432        let stmts =
1433            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1434                .unwrap();
1435
1436        assert_eq!(1, stmts.len());
1437        match &stmts[0] {
1438            Statement::CreateDatabase(c) => {
1439                assert_eq!(c.name.to_string(), "prometheus");
1440                assert!(c.if_not_exists);
1441            }
1442            _ => unreachable!(),
1443        }
1444
1445        let sql = "CREATE DATABASE `fOo`";
1446        let result =
1447            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1448        let stmts = result.unwrap();
1449        match &stmts.last().unwrap() {
1450            Statement::CreateDatabase(c) => {
1451                assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1452                assert!(!c.if_not_exists);
1453            }
1454            _ => unreachable!(),
1455        }
1456
1457        let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1458        let result =
1459            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1460        let stmts = result.unwrap();
1461        match &stmts[0] {
1462            Statement::CreateDatabase(c) => {
1463                assert_eq!(c.name.to_string(), "prometheus");
1464                assert!(!c.if_not_exists);
1465                assert_eq!(c.options.get("ttl").unwrap(), "1h");
1466            }
1467            _ => unreachable!(),
1468        }
1469    }
1470
1471    #[test]
1472    fn test_parse_create_flow_more_testcases() {
1473        use pretty_assertions::assert_eq;
1474        fn parse_create_flow(sql: &str) -> CreateFlow {
1475            let stmts = ParserContext::create_with_dialect(
1476                sql,
1477                &GreptimeDbDialect {},
1478                ParseOptions::default(),
1479            )
1480            .unwrap();
1481            assert_eq!(1, stmts.len());
1482            match &stmts[0] {
1483                Statement::CreateFlow(c) => c.clone(),
1484                _ => unreachable!(),
1485            }
1486        }
1487        struct CreateFlowWoutQuery {
1488            /// Flow name
1489            pub flow_name: ObjectName,
1490            /// Output (sink) table name
1491            pub sink_table_name: ObjectName,
1492            /// Whether to replace existing task
1493            pub or_replace: bool,
1494            /// Create if not exist
1495            pub if_not_exists: bool,
1496            /// `EXPIRE AFTER`
1497            /// Duration in second as `i64`
1498            pub expire_after: Option<i64>,
1499            /// Comment string
1500            pub comment: Option<String>,
1501        }
1502        let testcases = vec![
1503            (
1504                r"
1505CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1506SINK TO schema_1.table_1
1507EXPIRE AFTER INTERVAL '5 minutes'
1508COMMENT 'test comment'
1509AS
1510SELECT max(c1), min(c2) FROM schema_2.table_2;",
1511                CreateFlowWoutQuery {
1512                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1513                    sink_table_name: ObjectName::from(vec![
1514                        Ident::new("schema_1"),
1515                        Ident::new("table_1"),
1516                    ]),
1517                    or_replace: true,
1518                    if_not_exists: true,
1519                    expire_after: Some(300),
1520                    comment: Some("test comment".to_string()),
1521                },
1522            ),
1523            (
1524                r"
1525CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1526SINK TO schema_1.table_1
1527EXPIRE AFTER INTERVAL '300 s'
1528COMMENT 'test comment'
1529AS
1530SELECT max(c1), min(c2) FROM schema_2.table_2;",
1531                CreateFlowWoutQuery {
1532                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1533                    sink_table_name: ObjectName::from(vec![
1534                        Ident::new("schema_1"),
1535                        Ident::new("table_1"),
1536                    ]),
1537                    or_replace: true,
1538                    if_not_exists: true,
1539                    expire_after: Some(300),
1540                    comment: Some("test comment".to_string()),
1541                },
1542            ),
1543            (
1544                r"
1545CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1546SINK TO schema_1.table_1
1547EXPIRE AFTER '5 minutes'
1548COMMENT 'test comment'
1549AS
1550SELECT max(c1), min(c2) FROM schema_2.table_2;",
1551                CreateFlowWoutQuery {
1552                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1553                    sink_table_name: ObjectName::from(vec![
1554                        Ident::new("schema_1"),
1555                        Ident::new("table_1"),
1556                    ]),
1557                    or_replace: true,
1558                    if_not_exists: true,
1559                    expire_after: Some(300),
1560                    comment: Some("test comment".to_string()),
1561                },
1562            ),
1563            (
1564                r"
1565CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1566SINK TO schema_1.table_1
1567EXPIRE AFTER '300 s'
1568COMMENT 'test comment'
1569AS
1570SELECT max(c1), min(c2) FROM schema_2.table_2;",
1571                CreateFlowWoutQuery {
1572                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1573                    sink_table_name: ObjectName::from(vec![
1574                        Ident::new("schema_1"),
1575                        Ident::new("table_1"),
1576                    ]),
1577                    or_replace: true,
1578                    if_not_exists: true,
1579                    expire_after: Some(300),
1580                    comment: Some("test comment".to_string()),
1581                },
1582            ),
1583            (
1584                r"
1585CREATE FLOW `task_2`
1586SINK TO schema_1.table_1
1587EXPIRE AFTER '2 days 1h 2 min'
1588AS
1589SELECT max(c1), min(c2) FROM schema_2.table_2;",
1590                CreateFlowWoutQuery {
1591                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1592                    sink_table_name: ObjectName::from(vec![
1593                        Ident::new("schema_1"),
1594                        Ident::new("table_1"),
1595                    ]),
1596                    or_replace: false,
1597                    if_not_exists: false,
1598                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1599                    comment: None,
1600                },
1601            ),
1602            (
1603                r"
1604create flow `task_3`
1605sink to schema_1.table_1
1606expire after '10 minutes'
1607as
1608select max(c1), min(c2) from schema_2.table_2;",
1609                CreateFlowWoutQuery {
1610                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
1611                    sink_table_name: ObjectName::from(vec![
1612                        Ident::new("schema_1"),
1613                        Ident::new("table_1"),
1614                    ]),
1615                    or_replace: false,
1616                    if_not_exists: false,
1617                    expire_after: Some(600), // 10 minutes in seconds
1618                    comment: None,
1619                },
1620            ),
1621            (
1622                r"
1623create or replace flow if not exists task_4
1624sink to schema_1.table_1
1625expire after interval '2 hours'
1626comment 'lowercase test'
1627as
1628select max(c1), min(c2) from schema_2.table_2;",
1629                CreateFlowWoutQuery {
1630                    flow_name: ObjectName::from(vec![Ident::new("task_4")]),
1631                    sink_table_name: ObjectName::from(vec![
1632                        Ident::new("schema_1"),
1633                        Ident::new("table_1"),
1634                    ]),
1635                    or_replace: true,
1636                    if_not_exists: true,
1637                    expire_after: Some(7200), // 2 hours in seconds
1638                    comment: Some("lowercase test".to_string()),
1639                },
1640            ),
1641        ];
1642
1643        for (sql, expected) in testcases {
1644            let create_task = parse_create_flow(sql);
1645
1646            let expected = CreateFlow {
1647                flow_name: expected.flow_name,
1648                sink_table_name: expected.sink_table_name,
1649                or_replace: expected.or_replace,
1650                if_not_exists: expected.if_not_exists,
1651                expire_after: expected.expire_after,
1652                eval_interval: None,
1653                comment: expected.comment,
1654                // ignore query parse result
1655                query: create_task.query.clone(),
1656            };
1657
1658            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1659            let show_create = create_task.to_string();
1660            let recreated = parse_create_flow(&show_create);
1661            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1662        }
1663    }
1664
1665    #[test]
1666    fn test_parse_create_flow() {
1667        use pretty_assertions::assert_eq;
1668        fn parse_create_flow(sql: &str) -> CreateFlow {
1669            let stmts = ParserContext::create_with_dialect(
1670                sql,
1671                &GreptimeDbDialect {},
1672                ParseOptions::default(),
1673            )
1674            .unwrap();
1675            assert_eq!(1, stmts.len());
1676            match &stmts[0] {
1677                Statement::CreateFlow(c) => c.clone(),
1678                _ => panic!("{:?}", stmts[0]),
1679            }
1680        }
1681        struct CreateFlowWoutQuery {
1682            /// Flow name
1683            pub flow_name: ObjectName,
1684            /// Output (sink) table name
1685            pub sink_table_name: ObjectName,
1686            /// Whether to replace existing task
1687            pub or_replace: bool,
1688            /// Create if not exist
1689            pub if_not_exists: bool,
1690            /// `EXPIRE AFTER`
1691            /// Duration in second as `i64`
1692            pub expire_after: Option<i64>,
1693            /// Duration for flow evaluation interval
1694            /// Duration in seconds as `i64`
1695            /// If not set, flow will be evaluated based on time window size and other args.
1696            pub eval_interval: Option<i64>,
1697            /// Comment string
1698            pub comment: Option<String>,
1699        }
1700
1701        // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
1702        let testcases = vec![
1703            (
1704                r"
1705CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1706SINK TO schema_1.table_1
1707EXPIRE AFTER INTERVAL '5 minutes'
1708COMMENT 'test comment'
1709AS
1710SELECT max(c1), min(c2) FROM schema_2.table_2;",
1711                CreateFlowWoutQuery {
1712                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1713                    sink_table_name: ObjectName(vec![
1714                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1715                        ObjectNamePart::Identifier(Ident::new("table_1")),
1716                    ]),
1717                    or_replace: true,
1718                    if_not_exists: true,
1719                    expire_after: Some(300),
1720                    eval_interval: None,
1721                    comment: Some("test comment".to_string()),
1722                },
1723            ),
1724            (
1725                r"
1726CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1727SINK TO schema_1.table_1
1728EXPIRE AFTER INTERVAL '300 s'
1729COMMENT 'test comment'
1730AS
1731SELECT max(c1), min(c2) FROM schema_2.table_2;",
1732                CreateFlowWoutQuery {
1733                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1734                    sink_table_name: ObjectName(vec![
1735                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1736                        ObjectNamePart::Identifier(Ident::new("table_1")),
1737                    ]),
1738                    or_replace: true,
1739                    if_not_exists: true,
1740                    expire_after: Some(300),
1741                    eval_interval: None,
1742                    comment: Some("test comment".to_string()),
1743                },
1744            ),
1745            (
1746                r"
1747CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1748SINK TO schema_1.table_1
1749EXPIRE AFTER '5 minutes'
1750EVAL INTERVAL '10 seconds'
1751COMMENT 'test comment'
1752AS
1753SELECT max(c1), min(c2) FROM schema_2.table_2;",
1754                CreateFlowWoutQuery {
1755                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1756                    sink_table_name: ObjectName(vec![
1757                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1758                        ObjectNamePart::Identifier(Ident::new("table_1")),
1759                    ]),
1760                    or_replace: true,
1761                    if_not_exists: true,
1762                    expire_after: Some(300),
1763                    eval_interval: Some(10),
1764                    comment: Some("test comment".to_string()),
1765                },
1766            ),
1767            (
1768                r"
1769CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1770SINK TO schema_1.table_1
1771EXPIRE AFTER '5 minutes'
1772EVAL INTERVAL INTERVAL '10 seconds'
1773COMMENT 'test comment'
1774AS
1775SELECT max(c1), min(c2) FROM schema_2.table_2;",
1776                CreateFlowWoutQuery {
1777                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1778                    sink_table_name: ObjectName(vec![
1779                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1780                        ObjectNamePart::Identifier(Ident::new("table_1")),
1781                    ]),
1782                    or_replace: true,
1783                    if_not_exists: true,
1784                    expire_after: Some(300),
1785                    eval_interval: Some(10),
1786                    comment: Some("test comment".to_string()),
1787                },
1788            ),
1789            (
1790                r"
1791CREATE FLOW `task_2`
1792SINK TO schema_1.table_1
1793EXPIRE AFTER '2 days 1h 2 min'
1794AS
1795SELECT max(c1), min(c2) FROM schema_2.table_2;",
1796                CreateFlowWoutQuery {
1797                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1798                        '`', "task_2",
1799                    ))]),
1800                    sink_table_name: ObjectName(vec![
1801                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1802                        ObjectNamePart::Identifier(Ident::new("table_1")),
1803                    ]),
1804                    or_replace: false,
1805                    if_not_exists: false,
1806                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1807                    eval_interval: None,
1808                    comment: None,
1809                },
1810            ),
1811        ];
1812
1813        for (sql, expected) in testcases {
1814            let create_task = parse_create_flow(sql);
1815
1816            let expected = CreateFlow {
1817                flow_name: expected.flow_name,
1818                sink_table_name: expected.sink_table_name,
1819                or_replace: expected.or_replace,
1820                if_not_exists: expected.if_not_exists,
1821                expire_after: expected.expire_after,
1822                eval_interval: expected.eval_interval,
1823                comment: expected.comment,
1824                // ignore query parse result
1825                query: create_task.query.clone(),
1826            };
1827
1828            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1829            let show_create = create_task.to_string();
1830            let recreated = parse_create_flow(&show_create);
1831            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1832        }
1833    }
1834
1835    #[test]
1836    fn test_parse_create_flow_with_tql_cte_query() {
1837        let sql = r#"
1838CREATE FLOW calc_reqs_cte
1839SINK TO cnt_reqs_cte
1840EVAL INTERVAL '1m'
1841AS
1842WITH tql(the_timestamp, the_value) AS (
1843    TQL EVAL (now() - '1m'::interval, now(), '5s') metric
1844)
1845SELECT * FROM tql;
1846"#;
1847
1848        let stmts =
1849            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1850                .unwrap();
1851        assert_eq!(1, stmts.len());
1852        let Statement::CreateFlow(create_flow) = &stmts[0] else {
1853            panic!("unexpected stmt: {:?}", stmts[0]);
1854        };
1855
1856        let query = create_flow.query.to_string();
1857        assert!(query.to_uppercase().contains("WITH"));
1858        assert!(query.to_uppercase().contains("TQL EVAL"));
1859    }
1860
1861    #[test]
1862    fn test_parse_create_flow_with_sql_cte_is_unsupported() {
1863        let sql = r#"
1864CREATE FLOW f
1865SINK TO s
1866AS
1867WITH cte AS (SELECT 1) SELECT * FROM cte;
1868"#;
1869
1870        let err =
1871            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1872                .unwrap_err();
1873        let msg = err.to_string();
1874        assert!(msg.to_uppercase().contains("WITH"), "err: {msg}");
1875    }
1876
1877    #[test]
1878    fn test_parse_create_flow_with_tql_cte_requires_now_expr() {
1879        let sql = r#"
1880CREATE FLOW f
1881SINK TO s
1882EVAL INTERVAL '1m'
1883AS
1884WITH tql(ts, val) AS (
1885    TQL EVAL (0, 15, '5s') metric
1886)
1887SELECT * FROM tql;
1888"#;
1889
1890        let err =
1891            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1892                .unwrap_err();
1893
1894        let msg = format!("{err:?}");
1895        assert!(
1896            msg.contains("Expected expression containing `now()`"),
1897            "unexpected err: {msg}"
1898        );
1899    }
1900
1901    #[test]
1902    fn test_parse_create_flow_with_tql_cte_non_select_star_is_unsupported() {
1903        let sql = r#"
1904CREATE FLOW f
1905SINK TO s
1906AS
1907WITH tql(ts, val) AS (
1908    TQL EVAL (now() - '1m'::interval, now(), '5s') metric
1909)
1910SELECT ts FROM tql;
1911"#;
1912
1913        let err =
1914            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1915                .unwrap_err();
1916        assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
1917    }
1918
1919    #[test]
1920    fn test_parse_create_flow_with_tql_cte_filter_is_unsupported() {
1921        let sql = r#"
1922CREATE FLOW f
1923SINK TO s
1924AS
1925WITH tql(ts, val) AS (
1926    TQL EVAL (now() - '1m'::interval, now(), '5s') metric
1927)
1928SELECT * FROM tql WHERE ts > 0;
1929"#;
1930
1931        let err =
1932            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1933                .unwrap_err();
1934        assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
1935    }
1936
1937    #[test]
1938    fn test_parse_create_flow_with_mixed_sql_tql_cte_is_unsupported() {
1939        let sql = r#"
1940CREATE FLOW f
1941SINK TO s
1942AS
1943WITH s1 AS (SELECT 1),
1944     tql(ts, val) AS (TQL EVAL (now() - '1m'::interval, now(), '5s') metric)
1945SELECT * FROM tql;
1946"#;
1947
1948        let err =
1949            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1950                .unwrap_err();
1951        assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
1952    }
1953
1954    #[test]
1955    fn test_create_flow_no_month() {
1956        let sql = r"
1957CREATE FLOW `task_2`
1958SINK TO schema_1.table_1
1959EXPIRE AFTER '1 month 2 days 1h 2 min'
1960AS
1961SELECT max(c1), min(c2) FROM schema_2.table_2;";
1962        let stmts =
1963            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1964
1965        assert!(
1966            stmts.is_err()
1967                && stmts
1968                    .unwrap_err()
1969                    .to_string()
1970                    .contains("Interval with months is not allowed")
1971        );
1972    }
1973
1974    #[test]
1975    fn test_validate_create() {
1976        let sql = r"
1977CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
1978PARTITION ON COLUMNS(c, a) (
1979    a < 10,
1980    a > 10 AND a < 20,
1981    a > 20 AND c < 100,
1982    a > 20 AND c >= 100
1983)
1984ENGINE=mito";
1985        let result =
1986            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1987        let _ = result.unwrap();
1988
1989        let sql = r"
1990CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1991PARTITION ON COLUMNS(x) ()
1992ENGINE=mito";
1993        let result =
1994            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1995        assert!(
1996            result
1997                .unwrap_err()
1998                .to_string()
1999                .contains("Partition column \"x\" not defined")
2000        );
2001    }
2002
2003    #[test]
2004    fn test_parse_create_table_with_partitions() {
2005        let sql = r"
2006CREATE TABLE monitor (
2007  host_id    INT,
2008  idc        STRING,
2009  ts         TIMESTAMP,
2010  cpu        DOUBLE DEFAULT 0,
2011  memory     DOUBLE,
2012  TIME INDEX (ts),
2013  PRIMARY KEY (host),
2014)
2015PARTITION ON COLUMNS(idc, host_id) (
2016  idc <= 'hz' AND host_id < 1000,
2017  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2018  idc > 'sh' AND host_id < 3000,
2019  idc > 'sh' AND host_id >= 3000,
2020)
2021ENGINE=mito";
2022        let result =
2023            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2024                .unwrap();
2025        assert_eq!(result.len(), 1);
2026        match &result[0] {
2027            Statement::CreateTable(c) => {
2028                assert!(c.partitions.is_some());
2029
2030                let partitions = c.partitions.as_ref().unwrap();
2031                let column_list = partitions
2032                    .column_list
2033                    .iter()
2034                    .map(|x| &x.value)
2035                    .collect::<Vec<&String>>();
2036                assert_eq!(column_list, vec!["idc", "host_id"]);
2037
2038                let exprs = &partitions.exprs;
2039
2040                assert_eq!(
2041                    exprs[0],
2042                    Expr::BinaryOp {
2043                        left: Box::new(Expr::BinaryOp {
2044                            left: Box::new(Expr::Identifier("idc".into())),
2045                            op: BinaryOperator::LtEq,
2046                            right: Box::new(Expr::Value(
2047                                Value::SingleQuotedString("hz".to_string()).into()
2048                            ))
2049                        }),
2050                        op: BinaryOperator::And,
2051                        right: Box::new(Expr::BinaryOp {
2052                            left: Box::new(Expr::Identifier("host_id".into())),
2053                            op: BinaryOperator::Lt,
2054                            right: Box::new(Expr::Value(
2055                                Value::Number("1000".to_string(), false).into()
2056                            ))
2057                        })
2058                    }
2059                );
2060                assert_eq!(
2061                    exprs[1],
2062                    Expr::BinaryOp {
2063                        left: Box::new(Expr::BinaryOp {
2064                            left: Box::new(Expr::BinaryOp {
2065                                left: Box::new(Expr::Identifier("idc".into())),
2066                                op: BinaryOperator::Gt,
2067                                right: Box::new(Expr::Value(
2068                                    Value::SingleQuotedString("hz".to_string()).into()
2069                                ))
2070                            }),
2071                            op: BinaryOperator::And,
2072                            right: Box::new(Expr::BinaryOp {
2073                                left: Box::new(Expr::Identifier("idc".into())),
2074                                op: BinaryOperator::LtEq,
2075                                right: Box::new(Expr::Value(
2076                                    Value::SingleQuotedString("sh".to_string()).into()
2077                                ))
2078                            })
2079                        }),
2080                        op: BinaryOperator::And,
2081                        right: Box::new(Expr::BinaryOp {
2082                            left: Box::new(Expr::Identifier("host_id".into())),
2083                            op: BinaryOperator::Lt,
2084                            right: Box::new(Expr::Value(
2085                                Value::Number("2000".to_string(), false).into()
2086                            ))
2087                        })
2088                    }
2089                );
2090                assert_eq!(
2091                    exprs[2],
2092                    Expr::BinaryOp {
2093                        left: Box::new(Expr::BinaryOp {
2094                            left: Box::new(Expr::Identifier("idc".into())),
2095                            op: BinaryOperator::Gt,
2096                            right: Box::new(Expr::Value(
2097                                Value::SingleQuotedString("sh".to_string()).into()
2098                            ))
2099                        }),
2100                        op: BinaryOperator::And,
2101                        right: Box::new(Expr::BinaryOp {
2102                            left: Box::new(Expr::Identifier("host_id".into())),
2103                            op: BinaryOperator::Lt,
2104                            right: Box::new(Expr::Value(
2105                                Value::Number("3000".to_string(), false).into()
2106                            ))
2107                        })
2108                    }
2109                );
2110                assert_eq!(
2111                    exprs[3],
2112                    Expr::BinaryOp {
2113                        left: Box::new(Expr::BinaryOp {
2114                            left: Box::new(Expr::Identifier("idc".into())),
2115                            op: BinaryOperator::Gt,
2116                            right: Box::new(Expr::Value(
2117                                Value::SingleQuotedString("sh".to_string()).into()
2118                            ))
2119                        }),
2120                        op: BinaryOperator::And,
2121                        right: Box::new(Expr::BinaryOp {
2122                            left: Box::new(Expr::Identifier("host_id".into())),
2123                            op: BinaryOperator::GtEq,
2124                            right: Box::new(Expr::Value(
2125                                Value::Number("3000".to_string(), false).into()
2126                            ))
2127                        })
2128                    }
2129                );
2130            }
2131            _ => unreachable!(),
2132        }
2133    }
2134
2135    #[test]
2136    fn test_parse_create_table_with_quoted_partitions() {
2137        let sql = r"
2138CREATE TABLE monitor (
2139  `host_id`    INT,
2140  idc        STRING,
2141  ts         TIMESTAMP,
2142  cpu        DOUBLE DEFAULT 0,
2143  memory     DOUBLE,
2144  TIME INDEX (ts),
2145  PRIMARY KEY (host),
2146)
2147PARTITION ON COLUMNS(IdC, host_id) (
2148  idc <= 'hz' AND host_id < 1000,
2149  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2150  idc > 'sh' AND host_id < 3000,
2151  idc > 'sh' AND host_id >= 3000,
2152)";
2153        let result =
2154            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2155                .unwrap();
2156        assert_eq!(result.len(), 1);
2157    }
2158
2159    #[test]
2160    fn test_parse_create_table_with_timestamp_index() {
2161        let sql1 = r"
2162CREATE TABLE monitor (
2163  host_id    INT,
2164  idc        STRING,
2165  ts         TIMESTAMP TIME INDEX,
2166  cpu        DOUBLE DEFAULT 0,
2167  memory     DOUBLE,
2168  PRIMARY KEY (host),
2169)
2170ENGINE=mito";
2171        let result1 = ParserContext::create_with_dialect(
2172            sql1,
2173            &GreptimeDbDialect {},
2174            ParseOptions::default(),
2175        )
2176        .unwrap();
2177
2178        if let Statement::CreateTable(c) = &result1[0] {
2179            assert_eq!(c.constraints.len(), 2);
2180            let tc = c.constraints[0].clone();
2181            match tc {
2182                TableConstraint::TimeIndex { column } => {
2183                    assert_eq!(&column.value, "ts");
2184                }
2185                _ => panic!("should be time index constraint"),
2186            };
2187        } else {
2188            panic!("should be create_table statement");
2189        }
2190
2191        // `TIME INDEX` should be in front of `PRIMARY KEY`
2192        // in order to equal the `TIMESTAMP TIME INDEX` constraint options vector
2193        let sql2 = r"
2194CREATE TABLE monitor (
2195  host_id    INT,
2196  idc        STRING,
2197  ts         TIMESTAMP NOT NULL,
2198  cpu        DOUBLE DEFAULT 0,
2199  memory     DOUBLE,
2200  TIME INDEX (ts),
2201  PRIMARY KEY (host),
2202)
2203ENGINE=mito";
2204        let result2 = ParserContext::create_with_dialect(
2205            sql2,
2206            &GreptimeDbDialect {},
2207            ParseOptions::default(),
2208        )
2209        .unwrap();
2210
2211        assert_eq!(result1, result2);
2212
2213        // TIMESTAMP can be NULL which is not equal to above
2214        let sql3 = r"
2215CREATE TABLE monitor (
2216  host_id    INT,
2217  idc        STRING,
2218  ts         TIMESTAMP,
2219  cpu        DOUBLE DEFAULT 0,
2220  memory     DOUBLE,
2221  TIME INDEX (ts),
2222  PRIMARY KEY (host),
2223)
2224ENGINE=mito";
2225
2226        let result3 = ParserContext::create_with_dialect(
2227            sql3,
2228            &GreptimeDbDialect {},
2229            ParseOptions::default(),
2230        )
2231        .unwrap();
2232
2233        assert_ne!(result1, result3);
2234
2235        // BIGINT can't be time index any more
2236        let sql1 = r"
2237CREATE TABLE monitor (
2238  host_id    INT,
2239  idc        STRING,
2240  b          bigint TIME INDEX,
2241  cpu        DOUBLE DEFAULT 0,
2242  memory     DOUBLE,
2243  PRIMARY KEY (host),
2244)
2245ENGINE=mito";
2246        let result1 = ParserContext::create_with_dialect(
2247            sql1,
2248            &GreptimeDbDialect {},
2249            ParseOptions::default(),
2250        );
2251
2252        assert!(
2253            result1
2254                .unwrap_err()
2255                .to_string()
2256                .contains("time index column data type should be timestamp")
2257        );
2258    }
2259
2260    #[test]
2261    fn test_parse_create_table_with_timestamp_index_not_null() {
2262        let sql = r"
2263CREATE TABLE monitor (
2264  host_id    INT,
2265  idc        STRING,
2266  ts         TIMESTAMP TIME INDEX,
2267  cpu        DOUBLE DEFAULT 0,
2268  memory     DOUBLE,
2269  TIME INDEX (ts),
2270  PRIMARY KEY (host),
2271)
2272ENGINE=mito";
2273        let result =
2274            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2275                .unwrap();
2276
2277        assert_eq!(result.len(), 1);
2278        if let Statement::CreateTable(c) = &result[0] {
2279            let ts = c.columns[2].clone();
2280            assert_eq!(ts.name().to_string(), "ts");
2281            assert_eq!(ts.options()[0].option, NotNull);
2282        } else {
2283            panic!("should be create table statement");
2284        }
2285
2286        let sql1 = r"
2287CREATE TABLE monitor (
2288  host_id    INT,
2289  idc        STRING,
2290  ts         TIMESTAMP NOT NULL TIME INDEX,
2291  cpu        DOUBLE DEFAULT 0,
2292  memory     DOUBLE,
2293  TIME INDEX (ts),
2294  PRIMARY KEY (host),
2295)
2296ENGINE=mito";
2297
2298        let result1 = ParserContext::create_with_dialect(
2299            sql1,
2300            &GreptimeDbDialect {},
2301            ParseOptions::default(),
2302        )
2303        .unwrap();
2304        assert_eq!(result, result1);
2305
2306        let sql2 = r"
2307CREATE TABLE monitor (
2308  host_id    INT,
2309  idc        STRING,
2310  ts         TIMESTAMP TIME INDEX NOT NULL,
2311  cpu        DOUBLE DEFAULT 0,
2312  memory     DOUBLE,
2313  TIME INDEX (ts),
2314  PRIMARY KEY (host),
2315)
2316ENGINE=mito";
2317
2318        let result2 = ParserContext::create_with_dialect(
2319            sql2,
2320            &GreptimeDbDialect {},
2321            ParseOptions::default(),
2322        )
2323        .unwrap();
2324        assert_eq!(result, result2);
2325
2326        let sql3 = r"
2327CREATE TABLE monitor (
2328  host_id    INT,
2329  idc        STRING,
2330  ts         TIMESTAMP TIME INDEX NULL NOT,
2331  cpu        DOUBLE DEFAULT 0,
2332  memory     DOUBLE,
2333  TIME INDEX (ts),
2334  PRIMARY KEY (host),
2335)
2336ENGINE=mito";
2337
2338        let result3 = ParserContext::create_with_dialect(
2339            sql3,
2340            &GreptimeDbDialect {},
2341            ParseOptions::default(),
2342        );
2343        assert!(result3.is_err());
2344
2345        let sql4 = r"
2346CREATE TABLE monitor (
2347  host_id    INT,
2348  idc        STRING,
2349  ts         TIMESTAMP TIME INDEX NOT NULL NULL,
2350  cpu        DOUBLE DEFAULT 0,
2351  memory     DOUBLE,
2352  TIME INDEX (ts),
2353  PRIMARY KEY (host),
2354)
2355ENGINE=mito";
2356
2357        let result4 = ParserContext::create_with_dialect(
2358            sql4,
2359            &GreptimeDbDialect {},
2360            ParseOptions::default(),
2361        );
2362        assert!(result4.is_err());
2363
2364        let sql = r"
2365CREATE TABLE monitor (
2366  host_id    INT,
2367  idc        STRING,
2368  ts         TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2369  cpu        DOUBLE DEFAULT 0,
2370  memory     DOUBLE,
2371  TIME INDEX (ts),
2372  PRIMARY KEY (host),
2373)
2374ENGINE=mito";
2375
2376        let result =
2377            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2378                .unwrap();
2379
2380        if let Statement::CreateTable(c) = &result[0] {
2381            let tc = c.constraints[0].clone();
2382            match tc {
2383                TableConstraint::TimeIndex { column } => {
2384                    assert_eq!(&column.value, "ts");
2385                }
2386                _ => panic!("should be time index constraint"),
2387            }
2388            let ts = c.columns[2].clone();
2389            assert_eq!(ts.name().to_string(), "ts");
2390            assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2391            assert_eq!(ts.options()[1].option, NotNull);
2392        } else {
2393            unreachable!("should be create table statement");
2394        }
2395    }
2396
2397    #[test]
2398    fn test_parse_partitions_with_error_syntax() {
2399        let sql = r"
2400CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2401PARTITION COLUMNS(c, a) (
2402    a < 10,
2403    a > 10 AND a < 20,
2404    a > 20 AND c < 100,
2405    a > 20 AND c >= 100
2406)
2407ENGINE=mito";
2408        let result =
2409            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2410        assert!(
2411            result
2412                .unwrap_err()
2413                .output_msg()
2414                .contains("sql parser error: Expected: ON, found: COLUMNS")
2415        );
2416    }
2417
2418    #[test]
2419    fn test_parse_partitions_without_rule() {
2420        let sql = r"
2421CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2422PARTITION ON COLUMNS(c, a) ()
2423ENGINE=mito";
2424        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2425            .unwrap();
2426    }
2427
2428    #[test]
2429    fn test_parse_partitions_unreferenced_column() {
2430        let sql = r"
2431CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2432PARTITION ON COLUMNS(c, a) (
2433    b = 'foo'
2434)
2435ENGINE=mito";
2436        let result =
2437            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2438        assert_eq!(
2439            result.unwrap_err().output_msg(),
2440            "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2441        );
2442    }
2443
2444    #[test]
2445    fn test_parse_partitions_not_binary_expr() {
2446        let sql = r"
2447CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2448PARTITION ON COLUMNS(c, a) (
2449    b
2450)
2451ENGINE=mito";
2452        let result =
2453            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2454        assert_eq!(
2455            result.unwrap_err().output_msg(),
2456            r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2457        );
2458    }
2459
2460    fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2461        assert_eq!(column.name.to_string(), name);
2462        assert_eq!(column.data_type.to_string(), data_type);
2463    }
2464
2465    #[test]
2466    pub fn test_parse_create_table() {
2467        let sql = r"create table demo(
2468                             host string,
2469                             ts timestamp,
2470                             cpu float32 default 0,
2471                             memory float64,
2472                             TIME INDEX (ts),
2473                             PRIMARY KEY(ts, host),
2474                             ) engine=mito
2475                             with(ttl='10s');
2476         ";
2477        let result =
2478            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2479                .unwrap();
2480        assert_eq!(1, result.len());
2481        match &result[0] {
2482            Statement::CreateTable(c) => {
2483                assert!(!c.if_not_exists);
2484                assert_eq!("demo", c.name.to_string());
2485                assert_eq!("mito", c.engine);
2486                assert_eq!(4, c.columns.len());
2487                let columns = &c.columns;
2488                assert_column_def(&columns[0].column_def, "host", "STRING");
2489                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2490                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2491                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2492
2493                let constraints = &c.constraints;
2494                assert_eq!(
2495                    &constraints[0],
2496                    &TableConstraint::TimeIndex {
2497                        column: Ident::new("ts"),
2498                    }
2499                );
2500                assert_eq!(
2501                    &constraints[1],
2502                    &TableConstraint::PrimaryKey {
2503                        columns: vec![Ident::new("ts"), Ident::new("host")]
2504                    }
2505                );
2506                // inverted index is merged into column options
2507                assert_eq!(1, c.options.len());
2508                assert_eq!(
2509                    [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2510                    c.options.to_str_map()
2511                );
2512            }
2513            _ => unreachable!(),
2514        }
2515    }
2516
2517    #[test]
2518    fn test_invalid_index_keys() {
2519        let sql = r"create table demo(
2520                             host string,
2521                             ts int64,
2522                             cpu float64 default 0,
2523                             memory float64,
2524                             TIME INDEX (ts, host),
2525                             PRIMARY KEY(ts, host)) engine=mito;
2526         ";
2527        let result =
2528            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2529        assert!(result.is_err());
2530        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2531    }
2532
2533    #[test]
2534    fn test_duplicated_time_index() {
2535        let sql = r"create table demo(
2536                             host string,
2537                             ts timestamp time index,
2538                             t timestamp time index,
2539                             cpu float64 default 0,
2540                             memory float64,
2541                             TIME INDEX (ts, host),
2542                             PRIMARY KEY(ts, host)) engine=mito;
2543         ";
2544        let result =
2545            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2546        assert!(result.is_err());
2547        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2548
2549        let sql = r"create table demo(
2550                             host string,
2551                             ts timestamp time index,
2552                             cpu float64 default 0,
2553                             t timestamp,
2554                             memory float64,
2555                             TIME INDEX (t),
2556                             PRIMARY KEY(ts, host)) engine=mito;
2557         ";
2558        let result =
2559            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2560        assert!(result.is_err());
2561        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2562    }
2563
2564    #[test]
2565    fn test_invalid_column_name() {
2566        let sql = "create table foo(user string, i timestamp time index)";
2567        let result =
2568            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2569        let err = result.unwrap_err().output_msg();
2570        assert!(err.contains("Cannot use keyword 'user' as column name"));
2571
2572        // If column name is quoted, it's valid even same with keyword.
2573        let sql = r#"
2574            create table foo("user" string, i timestamp time index)
2575        "#;
2576        let result =
2577            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2578        let _ = result.unwrap();
2579    }
2580
2581    #[test]
2582    fn test_incorrect_default_value_issue_3479() {
2583        let sql = r#"CREATE TABLE `ExcePTuRi`(
2584non TIMESTAMP(6) TIME INDEX,
2585`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2586)"#;
2587        let result =
2588            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2589                .unwrap();
2590        assert_eq!(1, result.len());
2591        match &result[0] {
2592            Statement::CreateTable(c) => {
2593                assert_eq!(
2594                    "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2595                    c.columns[1].to_string()
2596                );
2597            }
2598            _ => unreachable!(),
2599        }
2600    }
2601
2602    #[test]
2603    fn test_parse_create_view() {
2604        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
2605
2606        let result =
2607            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2608                .unwrap();
2609        match &result[0] {
2610            Statement::CreateView(c) => {
2611                assert_eq!(c.to_string(), sql);
2612                assert!(!c.or_replace);
2613                assert!(!c.if_not_exists);
2614                assert_eq!("test", c.name.to_string());
2615            }
2616            _ => unreachable!(),
2617        }
2618
2619        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
2620
2621        let result =
2622            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2623                .unwrap();
2624        match &result[0] {
2625            Statement::CreateView(c) => {
2626                assert_eq!(c.to_string(), sql);
2627                assert!(c.or_replace);
2628                assert!(c.if_not_exists);
2629                assert_eq!("test", c.name.to_string());
2630            }
2631            _ => unreachable!(),
2632        }
2633    }
2634
2635    #[test]
2636    fn test_parse_create_view_invalid_query() {
2637        let sql = "CREATE VIEW test AS DELETE from demo";
2638        let result =
2639            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2640        assert!(result.is_ok_and(|x| x.len() == 1));
2641    }
2642
2643    #[test]
2644    fn test_parse_create_table_fulltext_options() {
2645        let sql1 = r"
2646CREATE TABLE log (
2647    ts TIMESTAMP TIME INDEX,
2648    msg TEXT FULLTEXT INDEX,
2649)";
2650        let result1 = ParserContext::create_with_dialect(
2651            sql1,
2652            &GreptimeDbDialect {},
2653            ParseOptions::default(),
2654        )
2655        .unwrap();
2656
2657        if let Statement::CreateTable(c) = &result1[0] {
2658            c.columns.iter().for_each(|col| {
2659                if col.name().value == "msg" {
2660                    assert!(
2661                        col.extensions
2662                            .fulltext_index_options
2663                            .as_ref()
2664                            .unwrap()
2665                            .is_empty()
2666                    );
2667                }
2668            });
2669        } else {
2670            panic!("should be create_table statement");
2671        }
2672
2673        let sql2 = r"
2674CREATE TABLE log (
2675    ts TIMESTAMP TIME INDEX,
2676    msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2677)";
2678        let result2 = ParserContext::create_with_dialect(
2679            sql2,
2680            &GreptimeDbDialect {},
2681            ParseOptions::default(),
2682        )
2683        .unwrap();
2684
2685        if let Statement::CreateTable(c) = &result2[0] {
2686            c.columns.iter().for_each(|col| {
2687                if col.name().value == "msg" {
2688                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2689                    assert_eq!(options.len(), 2);
2690                    assert_eq!(options.get("analyzer").unwrap(), "English");
2691                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2692                }
2693            });
2694        } else {
2695            panic!("should be create_table statement");
2696        }
2697
2698        let sql3 = r"
2699CREATE TABLE log (
2700    ts TIMESTAMP TIME INDEX,
2701    msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2702    msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2703)";
2704        let result3 = ParserContext::create_with_dialect(
2705            sql3,
2706            &GreptimeDbDialect {},
2707            ParseOptions::default(),
2708        )
2709        .unwrap();
2710
2711        if let Statement::CreateTable(c) = &result3[0] {
2712            c.columns.iter().for_each(|col| {
2713                if col.name().value == "msg1" {
2714                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2715                    assert_eq!(options.len(), 2);
2716                    assert_eq!(options.get("analyzer").unwrap(), "English");
2717                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2718                } else if col.name().value == "msg2" {
2719                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2720                    assert_eq!(options.len(), 2);
2721                    assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2722                    assert_eq!(options.get("case_sensitive").unwrap(), "true");
2723                }
2724            });
2725        } else {
2726            panic!("should be create_table statement");
2727        }
2728    }
2729
2730    #[test]
2731    fn test_parse_create_table_fulltext_options_invalid_type() {
2732        let sql = r"
2733CREATE TABLE log (
2734    ts TIMESTAMP TIME INDEX,
2735    msg INT FULLTEXT INDEX,
2736)";
2737        let result =
2738            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2739        assert!(result.is_err());
2740        assert!(
2741            result
2742                .unwrap_err()
2743                .to_string()
2744                .contains("FULLTEXT index only supports string type")
2745        );
2746    }
2747
2748    #[test]
2749    fn test_parse_create_table_fulltext_options_duplicate() {
2750        let sql = r"
2751CREATE TABLE log (
2752    ts TIMESTAMP TIME INDEX,
2753    msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
2754)";
2755        let result =
2756            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2757        assert!(result.is_err());
2758        assert!(
2759            result
2760                .unwrap_err()
2761                .to_string()
2762                .contains("duplicated FULLTEXT INDEX option")
2763        );
2764    }
2765
2766    #[test]
2767    fn test_parse_create_table_fulltext_options_invalid_option() {
2768        let sql = r"
2769CREATE TABLE log (
2770    ts TIMESTAMP TIME INDEX,
2771    msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
2772)";
2773        let result =
2774            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2775        assert!(result.is_err());
2776        assert!(
2777            result
2778                .unwrap_err()
2779                .to_string()
2780                .contains("invalid FULLTEXT INDEX option")
2781        );
2782    }
2783
2784    #[test]
2785    fn test_parse_create_table_skip_options() {
2786        let sql = r"
2787CREATE TABLE log (
2788    ts TIMESTAMP TIME INDEX,
2789    msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
2790)";
2791        let result =
2792            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2793                .unwrap();
2794
2795        if let Statement::CreateTable(c) = &result[0] {
2796            c.columns.iter().for_each(|col| {
2797                if col.name().value == "msg" {
2798                    assert!(
2799                        !col.extensions
2800                            .skipping_index_options
2801                            .as_ref()
2802                            .unwrap()
2803                            .is_empty()
2804                    );
2805                }
2806            });
2807        } else {
2808            panic!("should be create_table statement");
2809        }
2810
2811        let sql = r"
2812        CREATE TABLE log (
2813            ts TIMESTAMP TIME INDEX,
2814            msg INT SKIPPING INDEX,
2815        )";
2816        let result =
2817            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2818                .unwrap();
2819
2820        if let Statement::CreateTable(c) = &result[0] {
2821            c.columns.iter().for_each(|col| {
2822                if col.name().value == "msg" {
2823                    assert!(
2824                        col.extensions
2825                            .skipping_index_options
2826                            .as_ref()
2827                            .unwrap()
2828                            .is_empty()
2829                    );
2830                }
2831            });
2832        } else {
2833            panic!("should be create_table statement");
2834        }
2835    }
2836
2837    #[test]
2838    fn test_parse_create_view_with_columns() {
2839        let sql = "CREATE VIEW test () AS SELECT * FROM NUMBERS";
2840        let result =
2841            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2842                .unwrap();
2843
2844        match &result[0] {
2845            Statement::CreateView(c) => {
2846                assert_eq!(c.to_string(), "CREATE VIEW test AS SELECT * FROM NUMBERS");
2847                assert!(!c.or_replace);
2848                assert!(!c.if_not_exists);
2849                assert_eq!("test", c.name.to_string());
2850            }
2851            _ => unreachable!(),
2852        }
2853        assert_eq!(
2854            "CREATE VIEW test AS SELECT * FROM NUMBERS",
2855            result[0].to_string()
2856        );
2857
2858        let sql = "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS";
2859        let result =
2860            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2861                .unwrap();
2862
2863        match &result[0] {
2864            Statement::CreateView(c) => {
2865                assert_eq!(c.to_string(), sql);
2866                assert!(!c.or_replace);
2867                assert!(!c.if_not_exists);
2868                assert_eq!("test", c.name.to_string());
2869            }
2870            _ => unreachable!(),
2871        }
2872        assert_eq!(sql, result[0].to_string());
2873
2874        let sql = "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS";
2875        let result =
2876            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2877                .unwrap();
2878
2879        match &result[0] {
2880            Statement::CreateView(c) => {
2881                assert_eq!(c.to_string(), sql);
2882                assert!(!c.or_replace);
2883                assert!(!c.if_not_exists);
2884                assert_eq!("test", c.name.to_string());
2885            }
2886            _ => unreachable!(),
2887        }
2888        assert_eq!(sql, result[0].to_string());
2889
2890        // Some invalid syntax cases
2891        let sql = "CREATE VIEW test (n1 AS select * from demo";
2892        let result =
2893            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2894        assert!(result.is_err());
2895
2896        let sql = "CREATE VIEW test (n1, AS select * from demo";
2897        let result =
2898            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2899        assert!(result.is_err());
2900
2901        let sql = "CREATE VIEW test n1,n2) AS select * from demo";
2902        let result =
2903            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2904        assert!(result.is_err());
2905
2906        let sql = "CREATE VIEW test (1) AS select * from demo";
2907        let result =
2908            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2909        assert!(result.is_err());
2910
2911        // keyword
2912        let sql = "CREATE VIEW test (n1, select) AS select * from demo";
2913        let result =
2914            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2915        assert!(result.is_err());
2916    }
2917
2918    #[test]
2919    fn test_parse_column_extensions_vector() {
2920        // Test that vector options are parsed from data_type (no additional SQL needed)
2921        let sql = "";
2922        let dialect = GenericDialect {};
2923        let mut tokenizer = Tokenizer::new(&dialect, sql);
2924        let tokens = tokenizer.tokenize().unwrap();
2925        let mut parser = Parser::new(&dialect).with_tokens(tokens);
2926        let name = Ident::new("vec_col");
2927        let data_type =
2928            DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
2929        let mut extensions = ColumnExtensions::default();
2930
2931        let result =
2932            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2933        assert!(result.is_ok());
2934        assert!(extensions.vector_options.is_some());
2935        let vector_options = extensions.vector_options.unwrap();
2936        assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some("128"));
2937    }
2938
2939    #[test]
2940    fn test_parse_column_extensions_vector_invalid() {
2941        // Test that vector with no dimension fails
2942        let sql = "";
2943        let dialect = GenericDialect {};
2944        let mut tokenizer = Tokenizer::new(&dialect, sql);
2945        let tokens = tokenizer.tokenize().unwrap();
2946        let mut parser = Parser::new(&dialect).with_tokens(tokens);
2947        let name = Ident::new("vec_col");
2948        let data_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
2949        let mut extensions = ColumnExtensions::default();
2950
2951        let result =
2952            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2953        assert!(result.is_err());
2954    }
2955
2956    #[test]
2957    fn test_parse_column_extensions_indices() {
2958        // Test skipping index
2959        {
2960            let sql = "SKIPPING INDEX";
2961            let dialect = GenericDialect {};
2962            let mut tokenizer = Tokenizer::new(&dialect, sql);
2963            let tokens = tokenizer.tokenize().unwrap();
2964            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2965            let name = Ident::new("col");
2966            let data_type = DataType::String(None);
2967            let mut extensions = ColumnExtensions::default();
2968            let result = ParserContext::parse_column_extensions(
2969                &mut parser,
2970                &name,
2971                &data_type,
2972                &mut extensions,
2973            );
2974            assert!(result.is_ok());
2975            assert!(extensions.skipping_index_options.is_some());
2976        }
2977
2978        // Test fulltext index with options
2979        {
2980            let sql = "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')";
2981            let dialect = GenericDialect {};
2982            let mut tokenizer = Tokenizer::new(&dialect, sql);
2983            let tokens = tokenizer.tokenize().unwrap();
2984            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2985            let name = Ident::new("text_col");
2986            let data_type = DataType::String(None);
2987            let mut extensions = ColumnExtensions::default();
2988            let result = ParserContext::parse_column_extensions(
2989                &mut parser,
2990                &name,
2991                &data_type,
2992                &mut extensions,
2993            );
2994            assert!(result.unwrap());
2995            assert!(extensions.fulltext_index_options.is_some());
2996            let fulltext_options = extensions.fulltext_index_options.unwrap();
2997            assert_eq!(fulltext_options.get("analyzer"), Some("English"));
2998            assert_eq!(fulltext_options.get("case_sensitive"), Some("true"));
2999        }
3000
3001        // Test fulltext index with invalid type (should fail)
3002        {
3003            let sql = "FULLTEXT INDEX WITH (analyzer = 'English')";
3004            let dialect = GenericDialect {};
3005            let mut tokenizer = Tokenizer::new(&dialect, sql);
3006            let tokens = tokenizer.tokenize().unwrap();
3007            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3008            let name = Ident::new("num_col");
3009            let data_type = DataType::Int(None); // Non-string type
3010            let mut extensions = ColumnExtensions::default();
3011            let result = ParserContext::parse_column_extensions(
3012                &mut parser,
3013                &name,
3014                &data_type,
3015                &mut extensions,
3016            );
3017            assert!(result.is_err());
3018            assert!(
3019                result
3020                    .unwrap_err()
3021                    .to_string()
3022                    .contains("FULLTEXT index only supports string type")
3023            );
3024        }
3025
3026        // Test fulltext index with invalid option (won't fail, the parser doesn't check the option's content)
3027        {
3028            let sql = "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')";
3029            let dialect = GenericDialect {};
3030            let mut tokenizer = Tokenizer::new(&dialect, sql);
3031            let tokens = tokenizer.tokenize().unwrap();
3032            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3033            let name = Ident::new("text_col");
3034            let data_type = DataType::String(None);
3035            let mut extensions = ColumnExtensions::default();
3036            let result = ParserContext::parse_column_extensions(
3037                &mut parser,
3038                &name,
3039                &data_type,
3040                &mut extensions,
3041            );
3042            assert!(result.unwrap());
3043        }
3044
3045        // Test inverted index
3046        {
3047            let sql = "INVERTED INDEX";
3048            let dialect = GenericDialect {};
3049            let mut tokenizer = Tokenizer::new(&dialect, sql);
3050            let tokens = tokenizer.tokenize().unwrap();
3051            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3052            let name = Ident::new("col");
3053            let data_type = DataType::String(None);
3054            let mut extensions = ColumnExtensions::default();
3055            let result = ParserContext::parse_column_extensions(
3056                &mut parser,
3057                &name,
3058                &data_type,
3059                &mut extensions,
3060            );
3061            assert!(result.is_ok());
3062            assert!(extensions.inverted_index_options.is_some());
3063        }
3064
3065        // Test inverted index with options (should fail)
3066        {
3067            let sql = "INVERTED INDEX WITH (analyzer = 'English')";
3068            let dialect = GenericDialect {};
3069            let mut tokenizer = Tokenizer::new(&dialect, sql);
3070            let tokens = tokenizer.tokenize().unwrap();
3071            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3072            let name = Ident::new("col");
3073            let data_type = DataType::String(None);
3074            let mut extensions = ColumnExtensions::default();
3075            let result = ParserContext::parse_column_extensions(
3076                &mut parser,
3077                &name,
3078                &data_type,
3079                &mut extensions,
3080            );
3081            assert!(result.is_err());
3082            assert!(
3083                result
3084                    .unwrap_err()
3085                    .to_string()
3086                    .contains("INVERTED index doesn't support options")
3087            );
3088        }
3089
3090        // Test multiple indices
3091        {
3092            let sql = "SKIPPING INDEX FULLTEXT INDEX";
3093            let dialect = GenericDialect {};
3094            let mut tokenizer = Tokenizer::new(&dialect, sql);
3095            let tokens = tokenizer.tokenize().unwrap();
3096            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3097            let name = Ident::new("col");
3098            let data_type = DataType::String(None);
3099            let mut extensions = ColumnExtensions::default();
3100            let result = ParserContext::parse_column_extensions(
3101                &mut parser,
3102                &name,
3103                &data_type,
3104                &mut extensions,
3105            );
3106            assert!(result.unwrap());
3107            assert!(extensions.skipping_index_options.is_some());
3108            assert!(extensions.fulltext_index_options.is_some());
3109        }
3110    }
3111
3112    #[test]
3113    fn test_parse_interval_cast() {
3114        let s = "select '10s'::INTERVAL";
3115        let stmts =
3116            ParserContext::create_with_dialect(s, &GreptimeDbDialect {}, ParseOptions::default())
3117                .unwrap();
3118        assert_eq!("SELECT '10 seconds'::INTERVAL", &stmts[0].to_string());
3119    }
3120
3121    #[test]
3122    fn test_parse_create_table_vector_index_options() {
3123        // Test basic vector index
3124        let sql = r"
3125CREATE TABLE vectors (
3126    ts TIMESTAMP TIME INDEX,
3127    vec VECTOR(128) VECTOR INDEX,
3128)";
3129        let result =
3130            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
3131                .unwrap();
3132
3133        if let Statement::CreateTable(c) = &result[0] {
3134            c.columns.iter().for_each(|col| {
3135                if col.name().value == "vec" {
3136                    assert!(
3137                        col.extensions
3138                            .vector_index_options
3139                            .as_ref()
3140                            .unwrap()
3141                            .is_empty()
3142                    );
3143                }
3144            });
3145        } else {
3146            panic!("should be create_table statement");
3147        }
3148
3149        // Test vector index with options
3150        let sql = r"
3151CREATE TABLE vectors (
3152    ts TIMESTAMP TIME INDEX,
3153    vec VECTOR(128) VECTOR INDEX WITH (metric='cosine', connectivity='32', expansion_add='256', expansion_search='128')
3154)";
3155        let result =
3156            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
3157                .unwrap();
3158
3159        if let Statement::CreateTable(c) = &result[0] {
3160            c.columns.iter().for_each(|col| {
3161                if col.name().value == "vec" {
3162                    let options = col.extensions.vector_index_options.as_ref().unwrap();
3163                    assert_eq!(options.len(), 4);
3164                    assert_eq!(options.get("metric").unwrap(), "cosine");
3165                    assert_eq!(options.get("connectivity").unwrap(), "32");
3166                    assert_eq!(options.get("expansion_add").unwrap(), "256");
3167                    assert_eq!(options.get("expansion_search").unwrap(), "128");
3168                }
3169            });
3170        } else {
3171            panic!("should be create_table statement");
3172        }
3173    }
3174
3175    #[test]
3176    fn test_parse_create_table_vector_index_invalid_type() {
3177        // Test vector index on non-vector type (should fail)
3178        let sql = r"
3179CREATE TABLE vectors (
3180    ts TIMESTAMP TIME INDEX,
3181    col INT VECTOR INDEX,
3182)";
3183        let result =
3184            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3185        assert!(result.is_err());
3186        assert!(
3187            result
3188                .unwrap_err()
3189                .to_string()
3190                .contains("VECTOR INDEX only supports Vector type columns")
3191        );
3192    }
3193
3194    #[test]
3195    fn test_parse_create_table_vector_index_duplicate() {
3196        // Test duplicate vector index (should fail)
3197        let sql = r"
3198CREATE TABLE vectors (
3199    ts TIMESTAMP TIME INDEX,
3200    vec VECTOR(128) VECTOR INDEX VECTOR INDEX,
3201)";
3202        let result =
3203            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3204        assert!(result.is_err());
3205        assert!(
3206            result
3207                .unwrap_err()
3208                .to_string()
3209                .contains("duplicated VECTOR INDEX option")
3210        );
3211    }
3212
3213    #[test]
3214    fn test_parse_create_table_vector_index_invalid_option() {
3215        // Test invalid option key (should fail)
3216        let sql = r"
3217CREATE TABLE vectors (
3218    ts TIMESTAMP TIME INDEX,
3219    vec VECTOR(128) VECTOR INDEX WITH (metric='l2sq', invalid_option='foo')
3220)";
3221        let result =
3222            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3223        assert!(result.is_err());
3224        assert!(
3225            result
3226                .unwrap_err()
3227                .to_string()
3228                .contains("invalid VECTOR INDEX option")
3229        );
3230    }
3231
3232    #[test]
3233    fn test_parse_column_extensions_vector_index() {
3234        // Test vector index on vector type
3235        {
3236            let sql = "VECTOR INDEX WITH (metric = 'l2sq')";
3237            let dialect = GenericDialect {};
3238            let mut tokenizer = Tokenizer::new(&dialect, sql);
3239            let tokens = tokenizer.tokenize().unwrap();
3240            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3241            let name = Ident::new("vec_col");
3242            let data_type =
3243                DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
3244            // First, parse the vector type to set vector_options
3245            let mut extensions = ColumnExtensions {
3246                vector_options: Some(OptionMap::from([(
3247                    VECTOR_OPT_DIM.to_string(),
3248                    "128".to_string(),
3249                )])),
3250                ..Default::default()
3251            };
3252
3253            let result = ParserContext::parse_column_extensions(
3254                &mut parser,
3255                &name,
3256                &data_type,
3257                &mut extensions,
3258            );
3259            assert!(result.is_ok());
3260            assert!(extensions.vector_index_options.is_some());
3261            let vi_options = extensions.vector_index_options.unwrap();
3262            assert_eq!(vi_options.get("metric"), Some("l2sq"));
3263        }
3264
3265        // Test vector index on non-vector type (should fail)
3266        {
3267            let sql = "VECTOR INDEX";
3268            let dialect = GenericDialect {};
3269            let mut tokenizer = Tokenizer::new(&dialect, sql);
3270            let tokens = tokenizer.tokenize().unwrap();
3271            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3272            let name = Ident::new("num_col");
3273            let data_type = DataType::Int(None); // Non-vector type
3274            let mut extensions = ColumnExtensions::default();
3275            let result = ParserContext::parse_column_extensions(
3276                &mut parser,
3277                &name,
3278                &data_type,
3279                &mut extensions,
3280            );
3281            assert!(result.is_err());
3282            assert!(
3283                result
3284                    .unwrap_err()
3285                    .to_string()
3286                    .contains("VECTOR INDEX only supports Vector type columns")
3287            );
3288        }
3289    }
3290}