sql/parsers/
create_parser.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod json;
16#[cfg(feature = "enterprise")]
17pub mod trigger;
18
19use std::collections::HashMap;
20
21use arrow_buffer::IntervalMonthDayNano;
22use common_catalog::consts::default_engine;
23use datafusion_common::ScalarValue;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
25use datatypes::data_type::ConcreteDataType;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Expr};
29use sqlparser::dialect::keywords::Keyword;
30use sqlparser::keywords::ALL_KEYWORDS;
31use sqlparser::parser::IsOptional::Mandatory;
32use sqlparser::parser::{Parser, ParserError};
33use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
34use table::requests::{validate_database_option, validate_table_option};
35
36use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
37use crate::error::{
38    self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu,
39    InvalidSqlSnafu, InvalidTableOptionSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
40    SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
41};
42use crate::parser::{FLOW, ParserContext};
43use crate::parsers::tql_parser;
44use crate::parsers::utils::{
45    self, validate_column_fulltext_create_option, validate_column_skipping_index_create_option,
46};
47use crate::statements::create::{
48    Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
49    CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
50};
51use crate::statements::statement::Statement;
52use crate::statements::transform::type_alias::get_data_type_by_alias_name;
53use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
54use crate::util::{OptionValue, location_to_index, parse_option_string};
55
/// Text of the `ENGINE` word.
/// NOTE(review): presumably matched when parsing the table engine clause; its use is not visible here.
pub const ENGINE: &str = "ENGINE";
/// Text of the `MAXVALUE` word.
/// NOTE(review): presumably used by partition parsing/validation; its use is not visible here.
pub const MAXVALUE: &str = "MAXVALUE";
/// Word expected after the flow name in `CREATE FLOW <name> SINK TO <table>`; matched case-insensitively.
pub const SINK: &str = "SINK";
/// First word of the optional `EXPIRE AFTER <interval>` clause of `CREATE FLOW`; matched case-insensitively.
pub const EXPIRE: &str = "EXPIRE";
/// Second word of the optional `EXPIRE AFTER <interval>` clause of `CREATE FLOW`; matched case-insensitively.
pub const AFTER: &str = "AFTER";
/// Text of the `INVERTED` word.
/// NOTE(review): presumably introduces an inverted index column option; its use is not visible here.
pub const INVERTED: &str = "INVERTED";
/// Word introducing a `SKIPPING INDEX` column option; matched case-insensitively.
pub const SKIPPING: &str = "SKIPPING";

/// Textual form of an interval expression, as produced by calling `to_string()` on the parsed expression.
pub type RawIntervalExpr = String;
65
66/// Parses create [table] statement
67impl<'a> ParserContext<'a> {
68    pub(crate) fn parse_create(&mut self) -> Result<Statement> {
69        match self.parser.peek_token().token {
70            Token::Word(w) => match w.keyword {
71                Keyword::TABLE => self.parse_create_table(),
72
73                Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
74
75                Keyword::EXTERNAL => self.parse_create_external_table(),
76
77                Keyword::OR => {
78                    let _ = self.parser.next_token();
79                    self.parser
80                        .expect_keyword(Keyword::REPLACE)
81                        .context(SyntaxSnafu)?;
82                    match self.parser.next_token().token {
83                        Token::Word(w) => match w.keyword {
84                            Keyword::VIEW => self.parse_create_view(true),
85                            Keyword::NoKeyword => {
86                                let uppercase = w.value.to_uppercase();
87                                match uppercase.as_str() {
88                                    FLOW => self.parse_create_flow(true),
89                                    _ => self.unsupported(w.to_string()),
90                                }
91                            }
92                            _ => self.unsupported(w.to_string()),
93                        },
94                        _ => self.unsupported(w.to_string()),
95                    }
96                }
97
98                Keyword::VIEW => {
99                    let _ = self.parser.next_token();
100                    self.parse_create_view(false)
101                }
102
103                #[cfg(feature = "enterprise")]
104                Keyword::TRIGGER => {
105                    let _ = self.parser.next_token();
106                    self.parse_create_trigger()
107                }
108
109                Keyword::NoKeyword => {
110                    let _ = self.parser.next_token();
111                    let uppercase = w.value.to_uppercase();
112                    match uppercase.as_str() {
113                        FLOW => self.parse_create_flow(false),
114                        _ => self.unsupported(w.to_string()),
115                    }
116                }
117                _ => self.unsupported(w.to_string()),
118            },
119            unexpected => self.unsupported(unexpected.to_string()),
120        }
121    }
122
123    /// Parse `CREAVE VIEW` statement.
124    fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
125        let if_not_exists = self.parse_if_not_exist()?;
126        let view_name = self.intern_parse_table_name()?;
127
128        let columns = self.parse_view_columns()?;
129
130        self.parser
131            .expect_keyword(Keyword::AS)
132            .context(SyntaxSnafu)?;
133
134        let query = self.parse_query()?;
135
136        Ok(Statement::CreateView(CreateView {
137            name: view_name,
138            columns,
139            or_replace,
140            query: Box::new(query),
141            if_not_exists,
142        }))
143    }
144
145    fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
146        let mut columns = vec![];
147        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
148            return Ok(columns);
149        }
150
151        loop {
152            let name = self.parse_column_name().context(SyntaxSnafu)?;
153
154            columns.push(name);
155
156            let comma = self.parser.consume_token(&Token::Comma);
157            if self.parser.consume_token(&Token::RParen) {
158                // allow a trailing comma, even though it's not in standard
159                break;
160            } else if !comma {
161                return self.expected("',' or ')' after column name", self.parser.peek_token());
162            }
163        }
164
165        Ok(columns)
166    }
167
168    fn parse_create_external_table(&mut self) -> Result<Statement> {
169        let _ = self.parser.next_token();
170        self.parser
171            .expect_keyword(Keyword::TABLE)
172            .context(SyntaxSnafu)?;
173        let if_not_exists = self.parse_if_not_exist()?;
174        let table_name = self.intern_parse_table_name()?;
175        let (columns, constraints) = self.parse_columns()?;
176        if !columns.is_empty() {
177            validate_time_index(&columns, &constraints)?;
178        }
179
180        let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
181        let options = self.parse_create_table_options()?;
182        Ok(Statement::CreateExternalTable(CreateExternalTable {
183            name: table_name,
184            columns,
185            constraints,
186            options,
187            if_not_exists,
188            engine,
189        }))
190    }
191
192    fn parse_create_database(&mut self) -> Result<Statement> {
193        let _ = self.parser.next_token();
194        let if_not_exists = self.parse_if_not_exist()?;
195        let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
196            expected: "a database name",
197            actual: self.peek_token_as_string(),
198        })?;
199        let database_name = Self::canonicalize_object_name(database_name)?;
200
201        let options = self
202            .parser
203            .parse_options(Keyword::WITH)
204            .context(SyntaxSnafu)?
205            .into_iter()
206            .map(parse_option_string)
207            .collect::<Result<HashMap<String, OptionValue>>>()?;
208
209        for key in options.keys() {
210            ensure!(
211                validate_database_option(key),
212                InvalidDatabaseOptionSnafu { key: key.clone() }
213            );
214        }
215        if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string())
216            && append_mode == "true"
217            && options.contains_key("merge_mode")
218        {
219            return InvalidDatabaseOptionSnafu {
220                key: "merge_mode".to_string(),
221            }
222            .fail();
223        }
224
225        Ok(Statement::CreateDatabase(CreateDatabase {
226            name: database_name,
227            if_not_exists,
228            options: OptionMap::new(options),
229        }))
230    }
231
232    fn parse_create_table(&mut self) -> Result<Statement> {
233        let _ = self.parser.next_token();
234
235        let if_not_exists = self.parse_if_not_exist()?;
236
237        let table_name = self.intern_parse_table_name()?;
238
239        if self.parser.parse_keyword(Keyword::LIKE) {
240            let source_name = self.intern_parse_table_name()?;
241
242            return Ok(Statement::CreateTableLike(CreateTableLike {
243                table_name,
244                source_name,
245            }));
246        }
247
248        let (columns, constraints) = self.parse_columns()?;
249        validate_time_index(&columns, &constraints)?;
250
251        let partitions = self.parse_partitions()?;
252        if let Some(partitions) = &partitions {
253            validate_partitions(&columns, partitions)?;
254        }
255
256        let engine = self.parse_table_engine(default_engine())?;
257        let options = self.parse_create_table_options()?;
258        let create_table = CreateTable {
259            if_not_exists,
260            name: table_name,
261            columns,
262            engine,
263            constraints,
264            options,
265            table_id: 0, // table id is assigned by catalog manager
266            partitions,
267        };
268
269        Ok(Statement::CreateTable(create_table))
270    }
271
272    /// "CREATE FLOW" clause
273    fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
274        let if_not_exists = self.parse_if_not_exist()?;
275
276        let flow_name = self.intern_parse_table_name()?;
277
278        // make `SINK` case in-sensitive
279        if let Token::Word(word) = self.parser.peek_token().token
280            && word.value.eq_ignore_ascii_case(SINK)
281        {
282            self.parser.next_token();
283        } else {
284            Err(ParserError::ParserError(
285                "Expect `SINK` keyword".to_string(),
286            ))
287            .context(SyntaxSnafu)?
288        }
289        self.parser
290            .expect_keyword(Keyword::TO)
291            .context(SyntaxSnafu)?;
292
293        let output_table_name = self.intern_parse_table_name()?;
294
295        let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
296            && w1.value.eq_ignore_ascii_case(EXPIRE)
297        {
298            self.parser.next_token();
299            if let Token::Word(w2) = &self.parser.peek_token().token
300                && w2.value.eq_ignore_ascii_case(AFTER)
301            {
302                self.parser.next_token();
303                Some(self.parse_interval_no_month("EXPIRE AFTER")?)
304            } else {
305                None
306            }
307        } else {
308            None
309        };
310
311        let eval_interval = if self
312            .parser
313            .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
314        {
315            Some(self.parse_interval_no_month("EVAL INTERVAL")?)
316        } else {
317            None
318        };
319
320        let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
321            match self.parser.next_token() {
322                TokenWithSpan {
323                    token: Token::SingleQuotedString(value, ..),
324                    ..
325                } => Some(value),
326                unexpected => {
327                    return self
328                        .parser
329                        .expected("string", unexpected)
330                        .context(SyntaxSnafu);
331                }
332            }
333        } else {
334            None
335        };
336
337        self.parser
338            .expect_keyword(Keyword::AS)
339            .context(SyntaxSnafu)?;
340
341        let query = Box::new(self.parse_sql_or_tql(true)?);
342
343        Ok(Statement::CreateFlow(CreateFlow {
344            flow_name,
345            sink_table_name: output_table_name,
346            or_replace,
347            if_not_exists,
348            expire_after,
349            eval_interval,
350            comment,
351            query,
352        }))
353    }
354
355    fn parse_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
356        let start_loc = self.parser.peek_token().span.start;
357        let start_index = location_to_index(self.sql, &start_loc);
358
359        // only accept sql or tql
360        let query = match self.parser.peek_token().token {
361            Token::Word(w) => match w.keyword {
362                Keyword::SELECT => self.parse_query(),
363                Keyword::NoKeyword
364                    if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
365                {
366                    self.parse_tql(require_now_expr)
367                }
368
369                _ => self.unsupported(self.peek_token_as_string()),
370            },
371            _ => self.unsupported(self.peek_token_as_string()),
372        }?;
373
374        let end_token = self.parser.peek_token();
375
376        let raw_query = if end_token == Token::EOF {
377            &self.sql[start_index..]
378        } else {
379            let end_loc = end_token.span.end;
380            let end_index = location_to_index(self.sql, &end_loc);
381            &self.sql[start_index..end_index.min(self.sql.len())]
382        };
383        let raw_query = raw_query.trim_end_matches(";");
384        let query = SqlOrTql::try_from_statement(query, raw_query)?;
385        Ok(query)
386    }
387
388    /// Parse the interval expr to duration in seconds.
389    fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
390        let interval = self.parse_interval_month_day_nano()?.0;
391        if interval.months != 0 {
392            return InvalidIntervalSnafu {
393                reason: format!("Interval with months is not allowed in {context}"),
394            }
395            .fail();
396        }
397        Ok(
398            interval.nanoseconds / 1_000_000_000
399                + interval.days as i64 * 60 * 60 * 24
400                + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, // 1 month=365.25/12=30.44 days
401                                                                       // this is to keep the same as https://docs.rs/humantime/latest/humantime/fn.parse_duration.html
402                                                                       // which we use in database to parse i.e. ttl interval and many other intervals
403        )
404    }
405
406    /// Parse interval expr to [`IntervalMonthDayNano`].
407    fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
408        let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
409        let raw_interval_expr = interval_expr.to_string();
410        let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
411            .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
412            .ok()
413            .with_context(|| InvalidIntervalSnafu {
414                reason: format!("cannot cast {} to interval type", interval_expr),
415            })?;
416        if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
417            Ok((interval, raw_interval_expr))
418        } else {
419            unreachable!()
420        }
421    }
422
423    fn parse_if_not_exist(&mut self) -> Result<bool> {
424        match self.parser.peek_token().token {
425            Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
426            _ => {}
427        }
428
429        if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
430            return self
431                .parser
432                .expect_keyword(Keyword::EXISTS)
433                .map(|_| true)
434                .context(UnexpectedSnafu {
435                    expected: "EXISTS",
436                    actual: self.peek_token_as_string(),
437                });
438        }
439
440        if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
441            return UnsupportedSnafu { keyword: "EXISTS" }.fail();
442        }
443
444        Ok(false)
445    }
446
447    fn parse_create_table_options(&mut self) -> Result<OptionMap> {
448        let options = self
449            .parser
450            .parse_options(Keyword::WITH)
451            .context(SyntaxSnafu)?
452            .into_iter()
453            .map(parse_option_string)
454            .collect::<Result<HashMap<String, OptionValue>>>()?;
455        for key in options.keys() {
456            ensure!(validate_table_option(key), InvalidTableOptionSnafu { key });
457        }
458        Ok(OptionMap::new(options))
459    }
460
461    /// "PARTITION ON COLUMNS (...)" clause
462    fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
463        if !self.parser.parse_keyword(Keyword::PARTITION) {
464            return Ok(None);
465        }
466        self.parser
467            .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
468            .context(error::UnexpectedSnafu {
469                expected: "ON, COLUMNS",
470                actual: self.peek_token_as_string(),
471            })?;
472
473        let raw_column_list = self
474            .parser
475            .parse_parenthesized_column_list(Mandatory, false)
476            .context(error::SyntaxSnafu)?;
477        let column_list = raw_column_list
478            .into_iter()
479            .map(Self::canonicalize_identifier)
480            .collect();
481
482        let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
483
484        Ok(Some(Partitions { column_list, exprs }))
485    }
486
487    fn parse_partition_entry(&mut self) -> Result<Expr> {
488        self.parser.parse_expr().context(error::SyntaxSnafu)
489    }
490
491    /// Parse a comma-separated list wrapped by "()", and of which all items accepted by `F`
492    fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
493    where
494        F: FnMut(&mut ParserContext<'a>) -> Result<T>,
495    {
496        self.parser
497            .expect_token(&Token::LParen)
498            .context(error::UnexpectedSnafu {
499                expected: "(",
500                actual: self.peek_token_as_string(),
501            })?;
502
503        let mut values = vec![];
504        while self.parser.peek_token() != Token::RParen {
505            values.push(f(self)?);
506            if !self.parser.consume_token(&Token::Comma) {
507                break;
508            }
509        }
510
511        self.parser
512            .expect_token(&Token::RParen)
513            .context(error::UnexpectedSnafu {
514                expected: ")",
515                actual: self.peek_token_as_string(),
516            })?;
517
518        Ok(values)
519    }
520
521    /// Parse the columns and constraints.
522    fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
523        let mut columns = vec![];
524        let mut constraints = vec![];
525        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
526            return Ok((columns, constraints));
527        }
528
529        loop {
530            if let Some(constraint) = self.parse_optional_table_constraint()? {
531                constraints.push(constraint);
532            } else if let Token::Word(_) = self.parser.peek_token().token {
533                self.parse_column(&mut columns, &mut constraints)?;
534            } else {
535                return self.expected(
536                    "column name or constraint definition",
537                    self.parser.peek_token(),
538                );
539            }
540            let comma = self.parser.consume_token(&Token::Comma);
541            if self.parser.consume_token(&Token::RParen) {
542                // allow a trailing comma, even though it's not in standard
543                break;
544            } else if !comma {
545                return self.expected(
546                    "',' or ')' after column definition",
547                    self.parser.peek_token(),
548                );
549            }
550        }
551
552        Ok((columns, constraints))
553    }
554
555    fn parse_column(
556        &mut self,
557        columns: &mut Vec<Column>,
558        constraints: &mut Vec<TableConstraint>,
559    ) -> Result<()> {
560        let mut column = self.parse_column_def()?;
561
562        let mut time_index_opt_idx = None;
563        for (index, opt) in column.options().iter().enumerate() {
564            if let ColumnOption::DialectSpecific(tokens) = &opt.option
565                && matches!(
566                    &tokens[..],
567                    [
568                        Token::Word(Word {
569                            keyword: Keyword::TIME,
570                            ..
571                        }),
572                        Token::Word(Word {
573                            keyword: Keyword::INDEX,
574                            ..
575                        })
576                    ]
577                )
578            {
579                ensure!(
580                    time_index_opt_idx.is_none(),
581                    InvalidColumnOptionSnafu {
582                        name: column.name().to_string(),
583                        msg: "duplicated time index",
584                    }
585                );
586                time_index_opt_idx = Some(index);
587
588                let constraint = TableConstraint::TimeIndex {
589                    column: Ident::new(column.name().value.clone()),
590                };
591                constraints.push(constraint);
592            }
593        }
594
595        if let Some(index) = time_index_opt_idx {
596            ensure!(
597                !column.options().contains(&ColumnOptionDef {
598                    option: ColumnOption::Null,
599                    name: None,
600                }),
601                InvalidColumnOptionSnafu {
602                    name: column.name().to_string(),
603                    msg: "time index column can't be null",
604                }
605            );
606
607            // The timestamp type may be an alias type, we have to retrieve the actual type.
608            let data_type = get_unalias_type(column.data_type());
609            ensure!(
610                matches!(data_type, DataType::Timestamp(_, _)),
611                InvalidColumnOptionSnafu {
612                    name: column.name().to_string(),
613                    msg: "time index column data type should be timestamp",
614                }
615            );
616
617            let not_null_opt = ColumnOptionDef {
618                option: ColumnOption::NotNull,
619                name: None,
620            };
621
622            if !column.options().contains(&not_null_opt) {
623                column.mut_options().push(not_null_opt);
624            }
625
626            let _ = column.mut_options().remove(index);
627        }
628
629        columns.push(column);
630
631        Ok(())
632    }
633
634    /// Parse the column name and check if it's valid.
635    fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
636        let name = self.parser.parse_identifier()?;
637        if name.quote_style.is_none() &&
638        // "ALL_KEYWORDS" are sorted.
639            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
640        {
641            return Err(ParserError::ParserError(format!(
642                "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
643                &name.value
644            )));
645        }
646
647        Ok(name)
648    }
649
650    pub fn parse_column_def(&mut self) -> Result<Column> {
651        let name = self.parse_column_name().context(SyntaxSnafu)?;
652        let parser = &mut self.parser;
653
654        ensure!(
655            !(name.quote_style.is_none() &&
656            // "ALL_KEYWORDS" are sorted.
657            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
658            InvalidSqlSnafu {
659                msg: format!(
660                    "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
661                    &name.value
662                ),
663            }
664        );
665
666        let mut extensions = ColumnExtensions::default();
667
668        let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
669        // Must immediately parse the JSON datatype format because it is closely after the "JSON"
670        // datatype, like this: "JSON(format = ...)".
671        if matches!(data_type, DataType::JSON) {
672            extensions.json_datatype_options = json::parse_json_datatype_options(parser)?;
673        }
674
675        let mut options = vec![];
676        loop {
677            if parser.parse_keyword(Keyword::CONSTRAINT) {
678                let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
679                if let Some(option) = Self::parse_optional_column_option(parser)? {
680                    options.push(ColumnOptionDef { name, option });
681                } else {
682                    return parser
683                        .expected(
684                            "constraint details after CONSTRAINT <name>",
685                            parser.peek_token(),
686                        )
687                        .context(SyntaxSnafu);
688                }
689            } else if let Some(option) = Self::parse_optional_column_option(parser)? {
690                options.push(ColumnOptionDef { name: None, option });
691            } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
692                break;
693            };
694        }
695
696        Ok(Column {
697            column_def: ColumnDef {
698                name: Self::canonicalize_identifier(name),
699                data_type,
700                options,
701            },
702            extensions,
703        })
704    }
705
706    fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
707        if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
708            Ok(Some(ColumnOption::CharacterSet(
709                parser.parse_object_name(false).context(SyntaxSnafu)?,
710            )))
711        } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
712            Ok(Some(ColumnOption::NotNull))
713        } else if parser.parse_keywords(&[Keyword::COMMENT]) {
714            match parser.next_token() {
715                TokenWithSpan {
716                    token: Token::SingleQuotedString(value, ..),
717                    ..
718                } => Ok(Some(ColumnOption::Comment(value))),
719                unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
720            }
721        } else if parser.parse_keyword(Keyword::NULL) {
722            Ok(Some(ColumnOption::Null))
723        } else if parser.parse_keyword(Keyword::DEFAULT) {
724            Ok(Some(ColumnOption::Default(
725                parser.parse_expr().context(SyntaxSnafu)?,
726            )))
727        } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
728            Ok(Some(ColumnOption::Unique {
729                is_primary: true,
730                characteristics: None,
731            }))
732        } else if parser.parse_keyword(Keyword::UNIQUE) {
733            Ok(Some(ColumnOption::Unique {
734                is_primary: false,
735                characteristics: None,
736            }))
737        } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
738            // Use a DialectSpecific option for time index
739            Ok(Some(ColumnOption::DialectSpecific(vec![
740                Token::Word(Word {
741                    value: "TIME".to_string(),
742                    quote_style: None,
743                    keyword: Keyword::TIME,
744                }),
745                Token::Word(Word {
746                    value: "INDEX".to_string(),
747                    quote_style: None,
748                    keyword: Keyword::INDEX,
749                }),
750            ])))
751        } else {
752            Ok(None)
753        }
754    }
755
756    /// Parse a column option extensions.
757    ///
758    /// This function will handle:
759    /// - Vector type
760    /// - Indexes
761    fn parse_column_extensions(
762        parser: &mut Parser<'_>,
763        column_name: &Ident,
764        column_type: &DataType,
765        column_extensions: &mut ColumnExtensions,
766    ) -> Result<bool> {
767        if let DataType::Custom(name, tokens) = column_type
768            && name.0.len() == 1
769            && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
770        {
771            ensure!(
772                tokens.len() == 1,
773                InvalidColumnOptionSnafu {
774                    name: column_name.to_string(),
775                    msg: "VECTOR type should have dimension",
776                }
777            );
778
779            let dimension =
780                tokens[0]
781                    .parse::<u32>()
782                    .ok()
783                    .with_context(|| InvalidColumnOptionSnafu {
784                        name: column_name.to_string(),
785                        msg: "dimension should be a positive integer",
786                    })?;
787
788            let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
789            column_extensions.vector_options = Some(options);
790        }
791
792        // parse index options in column definition
793        let mut is_index_declared = false;
794
795        // skipping index
796        if let Token::Word(word) = parser.peek_token().token
797            && word.value.eq_ignore_ascii_case(SKIPPING)
798        {
799            parser.next_token();
800            // Consume `INDEX` keyword
801            ensure!(
802                parser.parse_keyword(Keyword::INDEX),
803                InvalidColumnOptionSnafu {
804                    name: column_name.to_string(),
805                    msg: "expect INDEX after SKIPPING keyword",
806                }
807            );
808            ensure!(
809                column_extensions.skipping_index_options.is_none(),
810                InvalidColumnOptionSnafu {
811                    name: column_name.to_string(),
812                    msg: "duplicated SKIPPING index option",
813                }
814            );
815
816            let options = parser
817                .parse_options(Keyword::WITH)
818                .context(error::SyntaxSnafu)?
819                .into_iter()
820                .map(parse_option_string)
821                .collect::<Result<Vec<_>>>()?;
822
823            for (key, _) in options.iter() {
824                ensure!(
825                    validate_column_skipping_index_create_option(key),
826                    InvalidColumnOptionSnafu {
827                        name: column_name.to_string(),
828                        msg: format!("invalid SKIPPING INDEX option: {key}"),
829                    }
830                );
831            }
832
833            let options = OptionMap::new(options);
834            column_extensions.skipping_index_options = Some(options);
835            is_index_declared |= true;
836        }
837
838        // fulltext index
839        if parser.parse_keyword(Keyword::FULLTEXT) {
840            // Consume `INDEX` keyword
841            ensure!(
842                parser.parse_keyword(Keyword::INDEX),
843                InvalidColumnOptionSnafu {
844                    name: column_name.to_string(),
845                    msg: "expect INDEX after FULLTEXT keyword",
846                }
847            );
848
849            ensure!(
850                column_extensions.fulltext_index_options.is_none(),
851                InvalidColumnOptionSnafu {
852                    name: column_name.to_string(),
853                    msg: "duplicated FULLTEXT INDEX option",
854                }
855            );
856
857            let column_type = get_unalias_type(column_type);
858            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
859            ensure!(
860                data_type == ConcreteDataType::string_datatype(),
861                InvalidColumnOptionSnafu {
862                    name: column_name.to_string(),
863                    msg: "FULLTEXT index only supports string type",
864                }
865            );
866
867            let options = parser
868                .parse_options(Keyword::WITH)
869                .context(error::SyntaxSnafu)?
870                .into_iter()
871                .map(parse_option_string)
872                .collect::<Result<Vec<_>>>()?;
873
874            for (key, _) in options.iter() {
875                ensure!(
876                    validate_column_fulltext_create_option(key),
877                    InvalidColumnOptionSnafu {
878                        name: column_name.to_string(),
879                        msg: format!("invalid FULLTEXT INDEX option: {key}"),
880                    }
881                );
882            }
883
884            let options = OptionMap::new(options);
885            column_extensions.fulltext_index_options = Some(options);
886            is_index_declared |= true;
887        }
888
889        // inverted index
890        if let Token::Word(word) = parser.peek_token().token
891            && word.value.eq_ignore_ascii_case(INVERTED)
892        {
893            parser.next_token();
894            // Consume `INDEX` keyword
895            ensure!(
896                parser.parse_keyword(Keyword::INDEX),
897                InvalidColumnOptionSnafu {
898                    name: column_name.to_string(),
899                    msg: "expect INDEX after INVERTED keyword",
900                }
901            );
902
903            ensure!(
904                column_extensions.inverted_index_options.is_none(),
905                InvalidColumnOptionSnafu {
906                    name: column_name.to_string(),
907                    msg: "duplicated INVERTED index option",
908                }
909            );
910
            // The inverted index takes no options, so there is no `WITH` clause to parse.
            // Still look for a `WITH` token here and reject it with an explicit error.
913            let with_token = parser.peek_token();
914            ensure!(
915                with_token.token
916                    != Token::Word(Word {
917                        value: "WITH".to_string(),
918                        keyword: Keyword::WITH,
919                        quote_style: None,
920                    }),
921                InvalidColumnOptionSnafu {
922                    name: column_name.to_string(),
923                    msg: "INVERTED index doesn't support options",
924                }
925            );
926
927            column_extensions.inverted_index_options = Some(OptionMap::default());
928            is_index_declared |= true;
929        }
930
931        Ok(is_index_declared)
932    }
933
934    fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
935        match self.parser.next_token() {
936            TokenWithSpan {
937                token: Token::Word(w),
938                ..
939            } if w.keyword == Keyword::PRIMARY => {
940                self.parser
941                    .expect_keyword(Keyword::KEY)
942                    .context(error::UnexpectedSnafu {
943                        expected: "KEY",
944                        actual: self.peek_token_as_string(),
945                    })?;
946                let raw_columns = self
947                    .parser
948                    .parse_parenthesized_column_list(Mandatory, false)
949                    .context(error::SyntaxSnafu)?;
950                let columns = raw_columns
951                    .into_iter()
952                    .map(Self::canonicalize_identifier)
953                    .collect();
954                Ok(Some(TableConstraint::PrimaryKey { columns }))
955            }
956            TokenWithSpan {
957                token: Token::Word(w),
958                ..
959            } if w.keyword == Keyword::TIME => {
960                self.parser
961                    .expect_keyword(Keyword::INDEX)
962                    .context(error::UnexpectedSnafu {
963                        expected: "INDEX",
964                        actual: self.peek_token_as_string(),
965                    })?;
966
967                let raw_columns = self
968                    .parser
969                    .parse_parenthesized_column_list(Mandatory, false)
970                    .context(error::SyntaxSnafu)?;
971                let mut columns = raw_columns
972                    .into_iter()
973                    .map(Self::canonicalize_identifier)
974                    .collect::<Vec<_>>();
975
976                ensure!(
977                    columns.len() == 1,
978                    InvalidTimeIndexSnafu {
979                        msg: "it should contain only one column in time index",
980                    }
981                );
982
983                Ok(Some(TableConstraint::TimeIndex {
984                    column: columns.pop().unwrap(),
985                }))
986            }
987            _ => {
988                self.parser.prev_token();
989                Ok(None)
990            }
991        }
992    }
993
994    /// Parses the set of valid formats
995    fn parse_table_engine(&mut self, default: &str) -> Result<String> {
996        if !self.consume_token(ENGINE) {
997            return Ok(default.to_string());
998        }
999
1000        self.parser
1001            .expect_token(&Token::Eq)
1002            .context(error::UnexpectedSnafu {
1003                expected: "=",
1004                actual: self.peek_token_as_string(),
1005            })?;
1006
1007        let token = self.parser.next_token();
1008        if let Token::Word(w) = token.token {
1009            Ok(w.value)
1010        } else {
1011            self.expected("'Engine' is missing", token)
1012        }
1013    }
1014}
1015
1016fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1017    let time_index_constraints: Vec<_> = constraints
1018        .iter()
1019        .filter_map(|c| match c {
1020            TableConstraint::TimeIndex { column } => Some(column),
1021            _ => None,
1022        })
1023        .unique()
1024        .collect();
1025
1026    ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1027    ensure!(
1028        time_index_constraints.len() == 1,
1029        InvalidTimeIndexSnafu {
1030            msg: format!(
1031                "expected only one time index constraint but actual {}",
1032                time_index_constraints.len()
1033            ),
1034        }
1035    );
1036
1037    // It's safe to use time_index_constraints[0][0],
1038    // we already check the bound above.
1039    let time_index_column_ident = &time_index_constraints[0];
1040    let time_index_column = columns
1041        .iter()
1042        .find(|c| c.name().value == *time_index_column_ident.value)
1043        .with_context(|| InvalidTimeIndexSnafu {
1044            msg: format!(
1045                "time index column {} not found in columns",
1046                time_index_column_ident
1047            ),
1048        })?;
1049
1050    let time_index_data_type = get_unalias_type(time_index_column.data_type());
1051    ensure!(
1052        matches!(time_index_data_type, DataType::Timestamp(_, _)),
1053        InvalidColumnOptionSnafu {
1054            name: time_index_column.name().to_string(),
1055            msg: "time index column data type should be timestamp",
1056        }
1057    );
1058
1059    Ok(())
1060}
1061
1062fn get_unalias_type(data_type: &DataType) -> DataType {
1063    match data_type {
1064        DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1065            if let Some(real_type) =
1066                get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1067            {
1068                real_type
1069            } else {
1070                data_type.clone()
1071            }
1072        }
1073        _ => data_type.clone(),
1074    }
1075}
1076
1077fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1078    let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1079
1080    ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1081
1082    Ok(())
1083}
1084
1085/// Ensure all exprs are binary expr and all the columns are defined in the column list.
1086fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1087    for expr in exprs {
1088        // The first level must be binary expr
1089        if let Expr::BinaryOp { left, op: _, right } = expr {
1090            ensure_one_expr(left, columns)?;
1091            ensure_one_expr(right, columns)?;
1092        } else {
1093            return error::InvalidSqlSnafu {
1094                msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1095            }
1096            .fail();
1097        }
1098    }
1099    Ok(())
1100}
1101
1102/// Check if the expr is a binary expr, an ident or a literal value.
1103/// If is ident, then check it is in the column list.
1104/// This recursive function is intended to be used by [ensure_exprs_are_binary].
1105fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1106    match expr {
1107        Expr::BinaryOp { left, op: _, right } => {
1108            ensure_one_expr(left, columns)?;
1109            ensure_one_expr(right, columns)?;
1110            Ok(())
1111        }
1112        Expr::Identifier(ident) => {
1113            let column_name = &ident.value;
1114            ensure!(
1115                columns.iter().any(|c| &c.name().value == column_name),
1116                error::InvalidSqlSnafu {
1117                    msg: format!(
1118                        "Column {:?} in rule expr is not referenced in PARTITION ON",
1119                        column_name
1120                    ),
1121                }
1122            );
1123            Ok(())
1124        }
1125        Expr::Value(_) => Ok(()),
1126        Expr::UnaryOp { expr, .. } => {
1127            ensure_one_expr(expr, columns)?;
1128            Ok(())
1129        }
1130        _ => error::InvalidSqlSnafu {
1131            msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1132        }
1133        .fail(),
1134    }
1135}
1136
1137/// Ensure that all columns used in "PARTITION ON COLUMNS" are defined in create table.
1138fn ensure_partition_columns_defined<'a>(
1139    columns: &'a [Column],
1140    partitions: &'a Partitions,
1141) -> Result<Vec<&'a Column>> {
1142    partitions
1143        .column_list
1144        .iter()
1145        .map(|x| {
1146            let x = ParserContext::canonicalize_identifier(x.clone());
1147            // Normally the columns in "create table" won't be too many,
1148            // a linear search to find the target every time is fine.
1149            columns
1150                .iter()
1151                .find(|c| *c.name().value == x.value)
1152                .context(error::InvalidSqlSnafu {
1153                    msg: format!("Partition column {:?} not defined", x.value),
1154                })
1155        })
1156        .collect::<Result<Vec<&Column>>>()
1157}
1158
1159#[cfg(test)]
1160mod tests {
1161    use std::assert_matches::assert_matches;
1162    use std::collections::HashMap;
1163
1164    use common_catalog::consts::FILE_ENGINE;
1165    use common_error::ext::ErrorExt;
1166    use sqlparser::ast::ColumnOption::NotNull;
1167    use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1168    use sqlparser::dialect::GenericDialect;
1169    use sqlparser::tokenizer::Tokenizer;
1170
1171    use super::*;
1172    use crate::dialect::GreptimeDbDialect;
1173    use crate::parser::ParseOptions;
1174
1175    #[test]
1176    fn test_parse_create_table_like() {
1177        let sql = "CREATE TABLE t1 LIKE t2";
1178        let stmts =
1179            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1180                .unwrap();
1181
1182        assert_eq!(1, stmts.len());
1183        match &stmts[0] {
1184            Statement::CreateTableLike(c) => {
1185                assert_eq!(c.table_name.to_string(), "t1");
1186                assert_eq!(c.source_name.to_string(), "t2");
1187            }
1188            _ => unreachable!(),
1189        }
1190    }
1191
1192    #[test]
1193    fn test_validate_external_table_options() {
1194        let sql = "CREATE EXTERNAL TABLE city (
1195            host string,
1196            ts timestamp,
1197            cpu float64 default 0,
1198            memory float64,
1199            TIME INDEX (ts),
1200            PRIMARY KEY(ts, host)
1201        ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1202
1203        let result =
1204            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1205        assert!(matches!(
1206            result,
1207            Err(error::Error::InvalidTableOption { .. })
1208        ));
1209    }
1210
1211    #[test]
1212    fn test_parse_create_external_table() {
1213        struct Test<'a> {
1214            sql: &'a str,
1215            expected_table_name: &'a str,
1216            expected_options: HashMap<&'a str, &'a str>,
1217            expected_engine: &'a str,
1218            expected_if_not_exist: bool,
1219        }
1220
1221        let tests = [
1222            Test {
1223                sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1224                expected_table_name: "city",
1225                expected_options: HashMap::from([
1226                    ("location", "/var/data/city.csv"),
1227                    ("format", "csv"),
1228                ]),
1229                expected_engine: FILE_ENGINE,
1230                expected_if_not_exist: false,
1231            },
1232            Test {
1233                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1234                expected_table_name: "city",
1235                expected_options: HashMap::from([
1236                    ("location", "/var/data/city.csv"),
1237                    ("format", "csv"),
1238                ]),
1239                expected_engine: "foo",
1240                expected_if_not_exist: true,
1241            },
1242            Test {
1243                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1244                expected_table_name: "city",
1245                expected_options: HashMap::from([
1246                    ("location", "/var/data/city.csv"),
1247                    ("format", "csv"),
1248                    ("compaction.type", "bar"),
1249                ]),
1250                expected_engine: "foo",
1251                expected_if_not_exist: true,
1252            },
1253        ];
1254
1255        for test in tests {
1256            let stmts = ParserContext::create_with_dialect(
1257                test.sql,
1258                &GreptimeDbDialect {},
1259                ParseOptions::default(),
1260            )
1261            .unwrap();
1262            assert_eq!(1, stmts.len());
1263            match &stmts[0] {
1264                Statement::CreateExternalTable(c) => {
1265                    assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1266                    assert_eq!(c.options.to_str_map(), test.expected_options);
1267                    assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1268                    assert_eq!(c.engine, test.expected_engine);
1269                }
1270                _ => unreachable!(),
1271            }
1272        }
1273    }
1274
1275    #[test]
1276    fn test_parse_create_external_table_with_schema() {
1277        let sql = "CREATE EXTERNAL TABLE city (
1278            host string,
1279            ts timestamp,
1280            cpu float32 default 0,
1281            memory float64,
1282            TIME INDEX (ts),
1283            PRIMARY KEY(ts, host),
1284        ) with(location='/var/data/city.csv',format='csv');";
1285
1286        let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]);
1287
1288        let stmts =
1289            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1290                .unwrap();
1291        assert_eq!(1, stmts.len());
1292        match &stmts[0] {
1293            Statement::CreateExternalTable(c) => {
1294                assert_eq!(c.name.to_string(), "city");
1295                assert_eq!(c.options.to_str_map(), options);
1296
1297                let columns = &c.columns;
1298                assert_column_def(&columns[0].column_def, "host", "STRING");
1299                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1300                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1301                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1302
1303                let constraints = &c.constraints;
1304                assert_eq!(
1305                    &constraints[0],
1306                    &TableConstraint::TimeIndex {
1307                        column: Ident::new("ts"),
1308                    }
1309                );
1310                assert_eq!(
1311                    &constraints[1],
1312                    &TableConstraint::PrimaryKey {
1313                        columns: vec![Ident::new("ts"), Ident::new("host")]
1314                    }
1315                );
1316            }
1317            _ => unreachable!(),
1318        }
1319    }
1320
1321    #[test]
1322    fn test_parse_create_database() {
1323        let sql = "create database";
1324        let result =
1325            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1326        assert!(
1327            result
1328                .unwrap_err()
1329                .to_string()
1330                .contains("Unexpected token while parsing SQL statement")
1331        );
1332
1333        let sql = "create database prometheus";
1334        let stmts =
1335            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1336                .unwrap();
1337
1338        assert_eq!(1, stmts.len());
1339        match &stmts[0] {
1340            Statement::CreateDatabase(c) => {
1341                assert_eq!(c.name.to_string(), "prometheus");
1342                assert!(!c.if_not_exists);
1343            }
1344            _ => unreachable!(),
1345        }
1346
1347        let sql = "create database if not exists prometheus";
1348        let stmts =
1349            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1350                .unwrap();
1351
1352        assert_eq!(1, stmts.len());
1353        match &stmts[0] {
1354            Statement::CreateDatabase(c) => {
1355                assert_eq!(c.name.to_string(), "prometheus");
1356                assert!(c.if_not_exists);
1357            }
1358            _ => unreachable!(),
1359        }
1360
1361        let sql = "CREATE DATABASE `fOo`";
1362        let result =
1363            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1364        let stmts = result.unwrap();
1365        match &stmts.last().unwrap() {
1366            Statement::CreateDatabase(c) => {
1367                assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1368                assert!(!c.if_not_exists);
1369            }
1370            _ => unreachable!(),
1371        }
1372
1373        let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1374        let result =
1375            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1376        let stmts = result.unwrap();
1377        match &stmts[0] {
1378            Statement::CreateDatabase(c) => {
1379                assert_eq!(c.name.to_string(), "prometheus");
1380                assert!(!c.if_not_exists);
1381                assert_eq!(c.options.get("ttl").unwrap(), "1h");
1382            }
1383            _ => unreachable!(),
1384        }
1385    }
1386
1387    #[test]
1388    fn test_parse_create_flow_more_testcases() {
1389        use pretty_assertions::assert_eq;
1390        fn parse_create_flow(sql: &str) -> CreateFlow {
1391            let stmts = ParserContext::create_with_dialect(
1392                sql,
1393                &GreptimeDbDialect {},
1394                ParseOptions::default(),
1395            )
1396            .unwrap();
1397            assert_eq!(1, stmts.len());
1398            match &stmts[0] {
1399                Statement::CreateFlow(c) => c.clone(),
1400                _ => unreachable!(),
1401            }
1402        }
1403        struct CreateFlowWoutQuery {
1404            /// Flow name
1405            pub flow_name: ObjectName,
1406            /// Output (sink) table name
1407            pub sink_table_name: ObjectName,
1408            /// Whether to replace existing task
1409            pub or_replace: bool,
1410            /// Create if not exist
1411            pub if_not_exists: bool,
1412            /// `EXPIRE AFTER`
1413            /// Duration in second as `i64`
1414            pub expire_after: Option<i64>,
1415            /// Comment string
1416            pub comment: Option<String>,
1417        }
1418        let testcases = vec![
1419            (
1420                r"
1421CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1422SINK TO schema_1.table_1
1423EXPIRE AFTER INTERVAL '5 minutes'
1424COMMENT 'test comment'
1425AS
1426SELECT max(c1), min(c2) FROM schema_2.table_2;",
1427                CreateFlowWoutQuery {
1428                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1429                    sink_table_name: ObjectName::from(vec![
1430                        Ident::new("schema_1"),
1431                        Ident::new("table_1"),
1432                    ]),
1433                    or_replace: true,
1434                    if_not_exists: true,
1435                    expire_after: Some(300),
1436                    comment: Some("test comment".to_string()),
1437                },
1438            ),
1439            (
1440                r"
1441CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1442SINK TO schema_1.table_1
1443EXPIRE AFTER INTERVAL '300 s'
1444COMMENT 'test comment'
1445AS
1446SELECT max(c1), min(c2) FROM schema_2.table_2;",
1447                CreateFlowWoutQuery {
1448                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1449                    sink_table_name: ObjectName::from(vec![
1450                        Ident::new("schema_1"),
1451                        Ident::new("table_1"),
1452                    ]),
1453                    or_replace: true,
1454                    if_not_exists: true,
1455                    expire_after: Some(300),
1456                    comment: Some("test comment".to_string()),
1457                },
1458            ),
1459            (
1460                r"
1461CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1462SINK TO schema_1.table_1
1463EXPIRE AFTER '5 minutes'
1464COMMENT 'test comment'
1465AS
1466SELECT max(c1), min(c2) FROM schema_2.table_2;",
1467                CreateFlowWoutQuery {
1468                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1469                    sink_table_name: ObjectName::from(vec![
1470                        Ident::new("schema_1"),
1471                        Ident::new("table_1"),
1472                    ]),
1473                    or_replace: true,
1474                    if_not_exists: true,
1475                    expire_after: Some(300),
1476                    comment: Some("test comment".to_string()),
1477                },
1478            ),
1479            (
1480                r"
1481CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1482SINK TO schema_1.table_1
1483EXPIRE AFTER '300 s'
1484COMMENT 'test comment'
1485AS
1486SELECT max(c1), min(c2) FROM schema_2.table_2;",
1487                CreateFlowWoutQuery {
1488                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1489                    sink_table_name: ObjectName::from(vec![
1490                        Ident::new("schema_1"),
1491                        Ident::new("table_1"),
1492                    ]),
1493                    or_replace: true,
1494                    if_not_exists: true,
1495                    expire_after: Some(300),
1496                    comment: Some("test comment".to_string()),
1497                },
1498            ),
1499            (
1500                r"
1501CREATE FLOW `task_2`
1502SINK TO schema_1.table_1
1503EXPIRE AFTER '2 days 1h 2 min'
1504AS
1505SELECT max(c1), min(c2) FROM schema_2.table_2;",
1506                CreateFlowWoutQuery {
1507                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1508                    sink_table_name: ObjectName::from(vec![
1509                        Ident::new("schema_1"),
1510                        Ident::new("table_1"),
1511                    ]),
1512                    or_replace: false,
1513                    if_not_exists: false,
1514                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1515                    comment: None,
1516                },
1517            ),
1518            (
1519                r"
1520create flow `task_3`
1521sink to schema_1.table_1
1522expire after '10 minutes'
1523as
1524select max(c1), min(c2) from schema_2.table_2;",
1525                CreateFlowWoutQuery {
1526                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
1527                    sink_table_name: ObjectName::from(vec![
1528                        Ident::new("schema_1"),
1529                        Ident::new("table_1"),
1530                    ]),
1531                    or_replace: false,
1532                    if_not_exists: false,
1533                    expire_after: Some(600), // 10 minutes in seconds
1534                    comment: None,
1535                },
1536            ),
1537            (
1538                r"
1539create or replace flow if not exists task_4
1540sink to schema_1.table_1
1541expire after interval '2 hours'
1542comment 'lowercase test'
1543as
1544select max(c1), min(c2) from schema_2.table_2;",
1545                CreateFlowWoutQuery {
1546                    flow_name: ObjectName::from(vec![Ident::new("task_4")]),
1547                    sink_table_name: ObjectName::from(vec![
1548                        Ident::new("schema_1"),
1549                        Ident::new("table_1"),
1550                    ]),
1551                    or_replace: true,
1552                    if_not_exists: true,
1553                    expire_after: Some(7200), // 2 hours in seconds
1554                    comment: Some("lowercase test".to_string()),
1555                },
1556            ),
1557        ];
1558
1559        for (sql, expected) in testcases {
1560            let create_task = parse_create_flow(sql);
1561
1562            let expected = CreateFlow {
1563                flow_name: expected.flow_name,
1564                sink_table_name: expected.sink_table_name,
1565                or_replace: expected.or_replace,
1566                if_not_exists: expected.if_not_exists,
1567                expire_after: expected.expire_after,
1568                eval_interval: None,
1569                comment: expected.comment,
1570                // ignore query parse result
1571                query: create_task.query.clone(),
1572            };
1573
1574            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1575            let show_create = create_task.to_string();
1576            let recreated = parse_create_flow(&show_create);
1577            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1578        }
1579    }
1580
1581    #[test]
1582    fn test_parse_create_flow() {
1583        use pretty_assertions::assert_eq;
1584        fn parse_create_flow(sql: &str) -> CreateFlow {
1585            let stmts = ParserContext::create_with_dialect(
1586                sql,
1587                &GreptimeDbDialect {},
1588                ParseOptions::default(),
1589            )
1590            .unwrap();
1591            assert_eq!(1, stmts.len());
1592            match &stmts[0] {
1593                Statement::CreateFlow(c) => c.clone(),
1594                _ => panic!("{:?}", stmts[0]),
1595            }
1596        }
1597        struct CreateFlowWoutQuery {
1598            /// Flow name
1599            pub flow_name: ObjectName,
1600            /// Output (sink) table name
1601            pub sink_table_name: ObjectName,
1602            /// Whether to replace existing task
1603            pub or_replace: bool,
1604            /// Create if not exist
1605            pub if_not_exists: bool,
1606            /// `EXPIRE AFTER`
1607            /// Duration in second as `i64`
1608            pub expire_after: Option<i64>,
1609            /// Duration for flow evaluation interval
1610            /// Duration in seconds as `i64`
1611            /// If not set, flow will be evaluated based on time window size and other args.
1612            pub eval_interval: Option<i64>,
1613            /// Comment string
1614            pub comment: Option<String>,
1615        }
1616
1617        // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
1618        let testcases = vec![
1619            (
1620                r"
1621CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1622SINK TO schema_1.table_1
1623EXPIRE AFTER INTERVAL '5 minutes'
1624COMMENT 'test comment'
1625AS
1626SELECT max(c1), min(c2) FROM schema_2.table_2;",
1627                CreateFlowWoutQuery {
1628                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1629                    sink_table_name: ObjectName(vec![
1630                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1631                        ObjectNamePart::Identifier(Ident::new("table_1")),
1632                    ]),
1633                    or_replace: true,
1634                    if_not_exists: true,
1635                    expire_after: Some(300),
1636                    eval_interval: None,
1637                    comment: Some("test comment".to_string()),
1638                },
1639            ),
1640            (
1641                r"
1642CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1643SINK TO schema_1.table_1
1644EXPIRE AFTER INTERVAL '300 s'
1645COMMENT 'test comment'
1646AS
1647SELECT max(c1), min(c2) FROM schema_2.table_2;",
1648                CreateFlowWoutQuery {
1649                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1650                    sink_table_name: ObjectName(vec![
1651                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1652                        ObjectNamePart::Identifier(Ident::new("table_1")),
1653                    ]),
1654                    or_replace: true,
1655                    if_not_exists: true,
1656                    expire_after: Some(300),
1657                    eval_interval: None,
1658                    comment: Some("test comment".to_string()),
1659                },
1660            ),
1661            (
1662                r"
1663CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1664SINK TO schema_1.table_1
1665EXPIRE AFTER '5 minutes'
1666EVAL INTERVAL '10 seconds'
1667COMMENT 'test comment'
1668AS
1669SELECT max(c1), min(c2) FROM schema_2.table_2;",
1670                CreateFlowWoutQuery {
1671                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1672                    sink_table_name: ObjectName(vec![
1673                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1674                        ObjectNamePart::Identifier(Ident::new("table_1")),
1675                    ]),
1676                    or_replace: true,
1677                    if_not_exists: true,
1678                    expire_after: Some(300),
1679                    eval_interval: Some(10),
1680                    comment: Some("test comment".to_string()),
1681                },
1682            ),
1683            (
1684                r"
1685CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1686SINK TO schema_1.table_1
1687EXPIRE AFTER '5 minutes'
1688EVAL INTERVAL INTERVAL '10 seconds'
1689COMMENT 'test comment'
1690AS
1691SELECT max(c1), min(c2) FROM schema_2.table_2;",
1692                CreateFlowWoutQuery {
1693                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1694                    sink_table_name: ObjectName(vec![
1695                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1696                        ObjectNamePart::Identifier(Ident::new("table_1")),
1697                    ]),
1698                    or_replace: true,
1699                    if_not_exists: true,
1700                    expire_after: Some(300),
1701                    eval_interval: Some(10),
1702                    comment: Some("test comment".to_string()),
1703                },
1704            ),
1705            (
1706                r"
1707CREATE FLOW `task_2`
1708SINK TO schema_1.table_1
1709EXPIRE AFTER '2 days 1h 2 min'
1710AS
1711SELECT max(c1), min(c2) FROM schema_2.table_2;",
1712                CreateFlowWoutQuery {
1713                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1714                        '`', "task_2",
1715                    ))]),
1716                    sink_table_name: ObjectName(vec![
1717                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1718                        ObjectNamePart::Identifier(Ident::new("table_1")),
1719                    ]),
1720                    or_replace: false,
1721                    if_not_exists: false,
1722                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1723                    eval_interval: None,
1724                    comment: None,
1725                },
1726            ),
1727        ];
1728
1729        for (sql, expected) in testcases {
1730            let create_task = parse_create_flow(sql);
1731
1732            let expected = CreateFlow {
1733                flow_name: expected.flow_name,
1734                sink_table_name: expected.sink_table_name,
1735                or_replace: expected.or_replace,
1736                if_not_exists: expected.if_not_exists,
1737                expire_after: expected.expire_after,
1738                eval_interval: expected.eval_interval,
1739                comment: expected.comment,
1740                // ignore query parse result
1741                query: create_task.query.clone(),
1742            };
1743
1744            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1745            let show_create = create_task.to_string();
1746            let recreated = parse_create_flow(&show_create);
1747            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1748        }
1749    }
1750
1751    #[test]
1752    fn test_create_flow_no_month() {
1753        let sql = r"
1754CREATE FLOW `task_2`
1755SINK TO schema_1.table_1
1756EXPIRE AFTER '1 month 2 days 1h 2 min'
1757AS
1758SELECT max(c1), min(c2) FROM schema_2.table_2;";
1759        let stmts =
1760            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1761
1762        assert!(
1763            stmts.is_err()
1764                && stmts
1765                    .unwrap_err()
1766                    .to_string()
1767                    .contains("Interval with months is not allowed")
1768        );
1769    }
1770
1771    #[test]
1772    fn test_validate_create() {
1773        let sql = r"
1774CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
1775PARTITION ON COLUMNS(c, a) (
1776    a < 10,
1777    a > 10 AND a < 20,
1778    a > 20 AND c < 100,
1779    a > 20 AND c >= 100
1780)
1781ENGINE=mito";
1782        let result =
1783            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1784        let _ = result.unwrap();
1785
1786        let sql = r"
1787CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1788PARTITION ON COLUMNS(x) ()
1789ENGINE=mito";
1790        let result =
1791            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1792        assert!(
1793            result
1794                .unwrap_err()
1795                .to_string()
1796                .contains("Partition column \"x\" not defined")
1797        );
1798    }
1799
1800    #[test]
1801    fn test_parse_create_table_with_partitions() {
1802        let sql = r"
1803CREATE TABLE monitor (
1804  host_id    INT,
1805  idc        STRING,
1806  ts         TIMESTAMP,
1807  cpu        DOUBLE DEFAULT 0,
1808  memory     DOUBLE,
1809  TIME INDEX (ts),
1810  PRIMARY KEY (host),
1811)
1812PARTITION ON COLUMNS(idc, host_id) (
1813  idc <= 'hz' AND host_id < 1000,
1814  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1815  idc > 'sh' AND host_id < 3000,
1816  idc > 'sh' AND host_id >= 3000,
1817)
1818ENGINE=mito";
1819        let result =
1820            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1821                .unwrap();
1822        assert_eq!(result.len(), 1);
1823        match &result[0] {
1824            Statement::CreateTable(c) => {
1825                assert!(c.partitions.is_some());
1826
1827                let partitions = c.partitions.as_ref().unwrap();
1828                let column_list = partitions
1829                    .column_list
1830                    .iter()
1831                    .map(|x| &x.value)
1832                    .collect::<Vec<&String>>();
1833                assert_eq!(column_list, vec!["idc", "host_id"]);
1834
1835                let exprs = &partitions.exprs;
1836
1837                assert_eq!(
1838                    exprs[0],
1839                    Expr::BinaryOp {
1840                        left: Box::new(Expr::BinaryOp {
1841                            left: Box::new(Expr::Identifier("idc".into())),
1842                            op: BinaryOperator::LtEq,
1843                            right: Box::new(Expr::Value(
1844                                Value::SingleQuotedString("hz".to_string()).into()
1845                            ))
1846                        }),
1847                        op: BinaryOperator::And,
1848                        right: Box::new(Expr::BinaryOp {
1849                            left: Box::new(Expr::Identifier("host_id".into())),
1850                            op: BinaryOperator::Lt,
1851                            right: Box::new(Expr::Value(
1852                                Value::Number("1000".to_string(), false).into()
1853                            ))
1854                        })
1855                    }
1856                );
1857                assert_eq!(
1858                    exprs[1],
1859                    Expr::BinaryOp {
1860                        left: Box::new(Expr::BinaryOp {
1861                            left: Box::new(Expr::BinaryOp {
1862                                left: Box::new(Expr::Identifier("idc".into())),
1863                                op: BinaryOperator::Gt,
1864                                right: Box::new(Expr::Value(
1865                                    Value::SingleQuotedString("hz".to_string()).into()
1866                                ))
1867                            }),
1868                            op: BinaryOperator::And,
1869                            right: Box::new(Expr::BinaryOp {
1870                                left: Box::new(Expr::Identifier("idc".into())),
1871                                op: BinaryOperator::LtEq,
1872                                right: Box::new(Expr::Value(
1873                                    Value::SingleQuotedString("sh".to_string()).into()
1874                                ))
1875                            })
1876                        }),
1877                        op: BinaryOperator::And,
1878                        right: Box::new(Expr::BinaryOp {
1879                            left: Box::new(Expr::Identifier("host_id".into())),
1880                            op: BinaryOperator::Lt,
1881                            right: Box::new(Expr::Value(
1882                                Value::Number("2000".to_string(), false).into()
1883                            ))
1884                        })
1885                    }
1886                );
1887                assert_eq!(
1888                    exprs[2],
1889                    Expr::BinaryOp {
1890                        left: Box::new(Expr::BinaryOp {
1891                            left: Box::new(Expr::Identifier("idc".into())),
1892                            op: BinaryOperator::Gt,
1893                            right: Box::new(Expr::Value(
1894                                Value::SingleQuotedString("sh".to_string()).into()
1895                            ))
1896                        }),
1897                        op: BinaryOperator::And,
1898                        right: Box::new(Expr::BinaryOp {
1899                            left: Box::new(Expr::Identifier("host_id".into())),
1900                            op: BinaryOperator::Lt,
1901                            right: Box::new(Expr::Value(
1902                                Value::Number("3000".to_string(), false).into()
1903                            ))
1904                        })
1905                    }
1906                );
1907                assert_eq!(
1908                    exprs[3],
1909                    Expr::BinaryOp {
1910                        left: Box::new(Expr::BinaryOp {
1911                            left: Box::new(Expr::Identifier("idc".into())),
1912                            op: BinaryOperator::Gt,
1913                            right: Box::new(Expr::Value(
1914                                Value::SingleQuotedString("sh".to_string()).into()
1915                            ))
1916                        }),
1917                        op: BinaryOperator::And,
1918                        right: Box::new(Expr::BinaryOp {
1919                            left: Box::new(Expr::Identifier("host_id".into())),
1920                            op: BinaryOperator::GtEq,
1921                            right: Box::new(Expr::Value(
1922                                Value::Number("3000".to_string(), false).into()
1923                            ))
1924                        })
1925                    }
1926                );
1927            }
1928            _ => unreachable!(),
1929        }
1930    }
1931
1932    #[test]
1933    fn test_parse_create_table_with_quoted_partitions() {
1934        let sql = r"
1935CREATE TABLE monitor (
1936  `host_id`    INT,
1937  idc        STRING,
1938  ts         TIMESTAMP,
1939  cpu        DOUBLE DEFAULT 0,
1940  memory     DOUBLE,
1941  TIME INDEX (ts),
1942  PRIMARY KEY (host),
1943)
1944PARTITION ON COLUMNS(IdC, host_id) (
1945  idc <= 'hz' AND host_id < 1000,
1946  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1947  idc > 'sh' AND host_id < 3000,
1948  idc > 'sh' AND host_id >= 3000,
1949)";
1950        let result =
1951            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1952                .unwrap();
1953        assert_eq!(result.len(), 1);
1954    }
1955
1956    #[test]
1957    fn test_parse_create_table_with_timestamp_index() {
1958        let sql1 = r"
1959CREATE TABLE monitor (
1960  host_id    INT,
1961  idc        STRING,
1962  ts         TIMESTAMP TIME INDEX,
1963  cpu        DOUBLE DEFAULT 0,
1964  memory     DOUBLE,
1965  PRIMARY KEY (host),
1966)
1967ENGINE=mito";
1968        let result1 = ParserContext::create_with_dialect(
1969            sql1,
1970            &GreptimeDbDialect {},
1971            ParseOptions::default(),
1972        )
1973        .unwrap();
1974
1975        if let Statement::CreateTable(c) = &result1[0] {
1976            assert_eq!(c.constraints.len(), 2);
1977            let tc = c.constraints[0].clone();
1978            match tc {
1979                TableConstraint::TimeIndex { column } => {
1980                    assert_eq!(&column.value, "ts");
1981                }
1982                _ => panic!("should be time index constraint"),
1983            };
1984        } else {
1985            panic!("should be create_table statement");
1986        }
1987
1988        // `TIME INDEX` should be in front of `PRIMARY KEY`
1989        // in order to equal the `TIMESTAMP TIME INDEX` constraint options vector
1990        let sql2 = r"
1991CREATE TABLE monitor (
1992  host_id    INT,
1993  idc        STRING,
1994  ts         TIMESTAMP NOT NULL,
1995  cpu        DOUBLE DEFAULT 0,
1996  memory     DOUBLE,
1997  TIME INDEX (ts),
1998  PRIMARY KEY (host),
1999)
2000ENGINE=mito";
2001        let result2 = ParserContext::create_with_dialect(
2002            sql2,
2003            &GreptimeDbDialect {},
2004            ParseOptions::default(),
2005        )
2006        .unwrap();
2007
2008        assert_eq!(result1, result2);
2009
2010        // TIMESTAMP can be NULL which is not equal to above
2011        let sql3 = r"
2012CREATE TABLE monitor (
2013  host_id    INT,
2014  idc        STRING,
2015  ts         TIMESTAMP,
2016  cpu        DOUBLE DEFAULT 0,
2017  memory     DOUBLE,
2018  TIME INDEX (ts),
2019  PRIMARY KEY (host),
2020)
2021ENGINE=mito";
2022
2023        let result3 = ParserContext::create_with_dialect(
2024            sql3,
2025            &GreptimeDbDialect {},
2026            ParseOptions::default(),
2027        )
2028        .unwrap();
2029
2030        assert_ne!(result1, result3);
2031
2032        // BIGINT can't be time index any more
2033        let sql1 = r"
2034CREATE TABLE monitor (
2035  host_id    INT,
2036  idc        STRING,
2037  b          bigint TIME INDEX,
2038  cpu        DOUBLE DEFAULT 0,
2039  memory     DOUBLE,
2040  PRIMARY KEY (host),
2041)
2042ENGINE=mito";
2043        let result1 = ParserContext::create_with_dialect(
2044            sql1,
2045            &GreptimeDbDialect {},
2046            ParseOptions::default(),
2047        );
2048
2049        assert!(
2050            result1
2051                .unwrap_err()
2052                .to_string()
2053                .contains("time index column data type should be timestamp")
2054        );
2055    }
2056
2057    #[test]
2058    fn test_parse_create_table_with_timestamp_index_not_null() {
2059        let sql = r"
2060CREATE TABLE monitor (
2061  host_id    INT,
2062  idc        STRING,
2063  ts         TIMESTAMP TIME INDEX,
2064  cpu        DOUBLE DEFAULT 0,
2065  memory     DOUBLE,
2066  TIME INDEX (ts),
2067  PRIMARY KEY (host),
2068)
2069ENGINE=mito";
2070        let result =
2071            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2072                .unwrap();
2073
2074        assert_eq!(result.len(), 1);
2075        if let Statement::CreateTable(c) = &result[0] {
2076            let ts = c.columns[2].clone();
2077            assert_eq!(ts.name().to_string(), "ts");
2078            assert_eq!(ts.options()[0].option, NotNull);
2079        } else {
2080            panic!("should be create table statement");
2081        }
2082
2083        let sql1 = r"
2084CREATE TABLE monitor (
2085  host_id    INT,
2086  idc        STRING,
2087  ts         TIMESTAMP NOT NULL TIME INDEX,
2088  cpu        DOUBLE DEFAULT 0,
2089  memory     DOUBLE,
2090  TIME INDEX (ts),
2091  PRIMARY KEY (host),
2092)
2093ENGINE=mito";
2094
2095        let result1 = ParserContext::create_with_dialect(
2096            sql1,
2097            &GreptimeDbDialect {},
2098            ParseOptions::default(),
2099        )
2100        .unwrap();
2101        assert_eq!(result, result1);
2102
2103        let sql2 = r"
2104CREATE TABLE monitor (
2105  host_id    INT,
2106  idc        STRING,
2107  ts         TIMESTAMP TIME INDEX NOT NULL,
2108  cpu        DOUBLE DEFAULT 0,
2109  memory     DOUBLE,
2110  TIME INDEX (ts),
2111  PRIMARY KEY (host),
2112)
2113ENGINE=mito";
2114
2115        let result2 = ParserContext::create_with_dialect(
2116            sql2,
2117            &GreptimeDbDialect {},
2118            ParseOptions::default(),
2119        )
2120        .unwrap();
2121        assert_eq!(result, result2);
2122
2123        let sql3 = r"
2124CREATE TABLE monitor (
2125  host_id    INT,
2126  idc        STRING,
2127  ts         TIMESTAMP TIME INDEX NULL NOT,
2128  cpu        DOUBLE DEFAULT 0,
2129  memory     DOUBLE,
2130  TIME INDEX (ts),
2131  PRIMARY KEY (host),
2132)
2133ENGINE=mito";
2134
2135        let result3 = ParserContext::create_with_dialect(
2136            sql3,
2137            &GreptimeDbDialect {},
2138            ParseOptions::default(),
2139        );
2140        assert!(result3.is_err());
2141
2142        let sql4 = r"
2143CREATE TABLE monitor (
2144  host_id    INT,
2145  idc        STRING,
2146  ts         TIMESTAMP TIME INDEX NOT NULL NULL,
2147  cpu        DOUBLE DEFAULT 0,
2148  memory     DOUBLE,
2149  TIME INDEX (ts),
2150  PRIMARY KEY (host),
2151)
2152ENGINE=mito";
2153
2154        let result4 = ParserContext::create_with_dialect(
2155            sql4,
2156            &GreptimeDbDialect {},
2157            ParseOptions::default(),
2158        );
2159        assert!(result4.is_err());
2160
2161        let sql = r"
2162CREATE TABLE monitor (
2163  host_id    INT,
2164  idc        STRING,
2165  ts         TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2166  cpu        DOUBLE DEFAULT 0,
2167  memory     DOUBLE,
2168  TIME INDEX (ts),
2169  PRIMARY KEY (host),
2170)
2171ENGINE=mito";
2172
2173        let result =
2174            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2175                .unwrap();
2176
2177        if let Statement::CreateTable(c) = &result[0] {
2178            let tc = c.constraints[0].clone();
2179            match tc {
2180                TableConstraint::TimeIndex { column } => {
2181                    assert_eq!(&column.value, "ts");
2182                }
2183                _ => panic!("should be time index constraint"),
2184            }
2185            let ts = c.columns[2].clone();
2186            assert_eq!(ts.name().to_string(), "ts");
2187            assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2188            assert_eq!(ts.options()[1].option, NotNull);
2189        } else {
2190            unreachable!("should be create table statement");
2191        }
2192    }
2193
2194    #[test]
2195    fn test_parse_partitions_with_error_syntax() {
2196        let sql = r"
2197CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2198PARTITION COLUMNS(c, a) (
2199    a < 10,
2200    a > 10 AND a < 20,
2201    a > 20 AND c < 100,
2202    a > 20 AND c >= 100
2203)
2204ENGINE=mito";
2205        let result =
2206            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2207        assert!(
2208            result
2209                .unwrap_err()
2210                .output_msg()
2211                .contains("sql parser error: Expected: ON, found: COLUMNS")
2212        );
2213    }
2214
2215    #[test]
2216    fn test_parse_partitions_without_rule() {
2217        let sql = r"
2218CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2219PARTITION ON COLUMNS(c, a) ()
2220ENGINE=mito";
2221        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2222            .unwrap();
2223    }
2224
2225    #[test]
2226    fn test_parse_partitions_unreferenced_column() {
2227        let sql = r"
2228CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2229PARTITION ON COLUMNS(c, a) (
2230    b = 'foo'
2231)
2232ENGINE=mito";
2233        let result =
2234            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2235        assert_eq!(
2236            result.unwrap_err().output_msg(),
2237            "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2238        );
2239    }
2240
2241    #[test]
2242    fn test_parse_partitions_not_binary_expr() {
2243        let sql = r"
2244CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2245PARTITION ON COLUMNS(c, a) (
2246    b
2247)
2248ENGINE=mito";
2249        let result =
2250            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2251        assert_eq!(
2252            result.unwrap_err().output_msg(),
2253            r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2254        );
2255    }
2256
2257    fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2258        assert_eq!(column.name.to_string(), name);
2259        assert_eq!(column.data_type.to_string(), data_type);
2260    }
2261
2262    #[test]
2263    pub fn test_parse_create_table() {
2264        let sql = r"create table demo(
2265                             host string,
2266                             ts timestamp,
2267                             cpu float32 default 0,
2268                             memory float64,
2269                             TIME INDEX (ts),
2270                             PRIMARY KEY(ts, host),
2271                             ) engine=mito
2272                             with(ttl='10s');
2273         ";
2274        let result =
2275            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2276                .unwrap();
2277        assert_eq!(1, result.len());
2278        match &result[0] {
2279            Statement::CreateTable(c) => {
2280                assert!(!c.if_not_exists);
2281                assert_eq!("demo", c.name.to_string());
2282                assert_eq!("mito", c.engine);
2283                assert_eq!(4, c.columns.len());
2284                let columns = &c.columns;
2285                assert_column_def(&columns[0].column_def, "host", "STRING");
2286                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2287                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2288                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2289
2290                let constraints = &c.constraints;
2291                assert_eq!(
2292                    &constraints[0],
2293                    &TableConstraint::TimeIndex {
2294                        column: Ident::new("ts"),
2295                    }
2296                );
2297                assert_eq!(
2298                    &constraints[1],
2299                    &TableConstraint::PrimaryKey {
2300                        columns: vec![Ident::new("ts"), Ident::new("host")]
2301                    }
2302                );
2303                // inverted index is merged into column options
2304                assert_eq!(1, c.options.len());
2305                assert_eq!(
2306                    [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2307                    c.options.to_str_map()
2308                );
2309            }
2310            _ => unreachable!(),
2311        }
2312    }
2313
2314    #[test]
2315    fn test_invalid_index_keys() {
2316        let sql = r"create table demo(
2317                             host string,
2318                             ts int64,
2319                             cpu float64 default 0,
2320                             memory float64,
2321                             TIME INDEX (ts, host),
2322                             PRIMARY KEY(ts, host)) engine=mito;
2323         ";
2324        let result =
2325            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2326        assert!(result.is_err());
2327        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2328    }
2329
2330    #[test]
2331    fn test_duplicated_time_index() {
2332        let sql = r"create table demo(
2333                             host string,
2334                             ts timestamp time index,
2335                             t timestamp time index,
2336                             cpu float64 default 0,
2337                             memory float64,
2338                             TIME INDEX (ts, host),
2339                             PRIMARY KEY(ts, host)) engine=mito;
2340         ";
2341        let result =
2342            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2343        assert!(result.is_err());
2344        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2345
2346        let sql = r"create table demo(
2347                             host string,
2348                             ts timestamp time index,
2349                             cpu float64 default 0,
2350                             t timestamp,
2351                             memory float64,
2352                             TIME INDEX (t),
2353                             PRIMARY KEY(ts, host)) engine=mito;
2354         ";
2355        let result =
2356            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2357        assert!(result.is_err());
2358        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2359    }
2360
2361    #[test]
2362    fn test_invalid_column_name() {
2363        let sql = "create table foo(user string, i timestamp time index)";
2364        let result =
2365            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2366        let err = result.unwrap_err().output_msg();
2367        assert!(err.contains("Cannot use keyword 'user' as column name"));
2368
2369        // If column name is quoted, it's valid even same with keyword.
2370        let sql = r#"
2371            create table foo("user" string, i timestamp time index)
2372        "#;
2373        let result =
2374            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2375        let _ = result.unwrap();
2376    }
2377
2378    #[test]
2379    fn test_incorrect_default_value_issue_3479() {
2380        let sql = r#"CREATE TABLE `ExcePTuRi`(
2381non TIMESTAMP(6) TIME INDEX,
2382`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2383)"#;
2384        let result =
2385            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2386                .unwrap();
2387        assert_eq!(1, result.len());
2388        match &result[0] {
2389            Statement::CreateTable(c) => {
2390                assert_eq!(
2391                    "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2392                    c.columns[1].to_string()
2393                );
2394            }
2395            _ => unreachable!(),
2396        }
2397    }
2398
2399    #[test]
2400    fn test_parse_create_view() {
2401        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
2402
2403        let result =
2404            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2405                .unwrap();
2406        match &result[0] {
2407            Statement::CreateView(c) => {
2408                assert_eq!(c.to_string(), sql);
2409                assert!(!c.or_replace);
2410                assert!(!c.if_not_exists);
2411                assert_eq!("test", c.name.to_string());
2412            }
2413            _ => unreachable!(),
2414        }
2415
2416        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
2417
2418        let result =
2419            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2420                .unwrap();
2421        match &result[0] {
2422            Statement::CreateView(c) => {
2423                assert_eq!(c.to_string(), sql);
2424                assert!(c.or_replace);
2425                assert!(c.if_not_exists);
2426                assert_eq!("test", c.name.to_string());
2427            }
2428            _ => unreachable!(),
2429        }
2430    }
2431
2432    #[test]
2433    fn test_parse_create_view_invalid_query() {
2434        let sql = "CREATE VIEW test AS DELETE from demo";
2435        let result =
2436            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2437        assert!(result.is_ok_and(|x| x.len() == 1));
2438    }
2439
2440    #[test]
2441    fn test_parse_create_table_fulltext_options() {
2442        let sql1 = r"
2443CREATE TABLE log (
2444    ts TIMESTAMP TIME INDEX,
2445    msg TEXT FULLTEXT INDEX,
2446)";
2447        let result1 = ParserContext::create_with_dialect(
2448            sql1,
2449            &GreptimeDbDialect {},
2450            ParseOptions::default(),
2451        )
2452        .unwrap();
2453
2454        if let Statement::CreateTable(c) = &result1[0] {
2455            c.columns.iter().for_each(|col| {
2456                if col.name().value == "msg" {
2457                    assert!(
2458                        col.extensions
2459                            .fulltext_index_options
2460                            .as_ref()
2461                            .unwrap()
2462                            .is_empty()
2463                    );
2464                }
2465            });
2466        } else {
2467            panic!("should be create_table statement");
2468        }
2469
2470        let sql2 = r"
2471CREATE TABLE log (
2472    ts TIMESTAMP TIME INDEX,
2473    msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2474)";
2475        let result2 = ParserContext::create_with_dialect(
2476            sql2,
2477            &GreptimeDbDialect {},
2478            ParseOptions::default(),
2479        )
2480        .unwrap();
2481
2482        if let Statement::CreateTable(c) = &result2[0] {
2483            c.columns.iter().for_each(|col| {
2484                if col.name().value == "msg" {
2485                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2486                    assert_eq!(options.len(), 2);
2487                    assert_eq!(options.get("analyzer").unwrap(), "English");
2488                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2489                }
2490            });
2491        } else {
2492            panic!("should be create_table statement");
2493        }
2494
2495        let sql3 = r"
2496CREATE TABLE log (
2497    ts TIMESTAMP TIME INDEX,
2498    msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2499    msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2500)";
2501        let result3 = ParserContext::create_with_dialect(
2502            sql3,
2503            &GreptimeDbDialect {},
2504            ParseOptions::default(),
2505        )
2506        .unwrap();
2507
2508        if let Statement::CreateTable(c) = &result3[0] {
2509            c.columns.iter().for_each(|col| {
2510                if col.name().value == "msg1" {
2511                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2512                    assert_eq!(options.len(), 2);
2513                    assert_eq!(options.get("analyzer").unwrap(), "English");
2514                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2515                } else if col.name().value == "msg2" {
2516                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2517                    assert_eq!(options.len(), 2);
2518                    assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2519                    assert_eq!(options.get("case_sensitive").unwrap(), "true");
2520                }
2521            });
2522        } else {
2523            panic!("should be create_table statement");
2524        }
2525    }
2526
2527    #[test]
2528    fn test_parse_create_table_fulltext_options_invalid_type() {
2529        let sql = r"
2530CREATE TABLE log (
2531    ts TIMESTAMP TIME INDEX,
2532    msg INT FULLTEXT INDEX,
2533)";
2534        let result =
2535            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2536        assert!(result.is_err());
2537        assert!(
2538            result
2539                .unwrap_err()
2540                .to_string()
2541                .contains("FULLTEXT index only supports string type")
2542        );
2543    }
2544
2545    #[test]
2546    fn test_parse_create_table_fulltext_options_duplicate() {
2547        let sql = r"
2548CREATE TABLE log (
2549    ts TIMESTAMP TIME INDEX,
2550    msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
2551)";
2552        let result =
2553            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2554        assert!(result.is_err());
2555        assert!(
2556            result
2557                .unwrap_err()
2558                .to_string()
2559                .contains("duplicated FULLTEXT INDEX option")
2560        );
2561    }
2562
2563    #[test]
2564    fn test_parse_create_table_fulltext_options_invalid_option() {
2565        let sql = r"
2566CREATE TABLE log (
2567    ts TIMESTAMP TIME INDEX,
2568    msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
2569)";
2570        let result =
2571            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2572        assert!(result.is_err());
2573        assert!(
2574            result
2575                .unwrap_err()
2576                .to_string()
2577                .contains("invalid FULLTEXT INDEX option")
2578        );
2579    }
2580
2581    #[test]
2582    fn test_parse_create_table_skip_options() {
2583        let sql = r"
2584CREATE TABLE log (
2585    ts TIMESTAMP TIME INDEX,
2586    msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
2587)";
2588        let result =
2589            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2590                .unwrap();
2591
2592        if let Statement::CreateTable(c) = &result[0] {
2593            c.columns.iter().for_each(|col| {
2594                if col.name().value == "msg" {
2595                    assert!(
2596                        !col.extensions
2597                            .skipping_index_options
2598                            .as_ref()
2599                            .unwrap()
2600                            .is_empty()
2601                    );
2602                }
2603            });
2604        } else {
2605            panic!("should be create_table statement");
2606        }
2607
2608        let sql = r"
2609        CREATE TABLE log (
2610            ts TIMESTAMP TIME INDEX,
2611            msg INT SKIPPING INDEX,
2612        )";
2613        let result =
2614            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2615                .unwrap();
2616
2617        if let Statement::CreateTable(c) = &result[0] {
2618            c.columns.iter().for_each(|col| {
2619                if col.name().value == "msg" {
2620                    assert!(
2621                        col.extensions
2622                            .skipping_index_options
2623                            .as_ref()
2624                            .unwrap()
2625                            .is_empty()
2626                    );
2627                }
2628            });
2629        } else {
2630            panic!("should be create_table statement");
2631        }
2632    }
2633
2634    #[test]
2635    fn test_parse_create_view_with_columns() {
2636        let sql = "CREATE VIEW test () AS SELECT * FROM NUMBERS";
2637        let result =
2638            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2639                .unwrap();
2640
2641        match &result[0] {
2642            Statement::CreateView(c) => {
2643                assert_eq!(c.to_string(), "CREATE VIEW test AS SELECT * FROM NUMBERS");
2644                assert!(!c.or_replace);
2645                assert!(!c.if_not_exists);
2646                assert_eq!("test", c.name.to_string());
2647            }
2648            _ => unreachable!(),
2649        }
2650        assert_eq!(
2651            "CREATE VIEW test AS SELECT * FROM NUMBERS",
2652            result[0].to_string()
2653        );
2654
2655        let sql = "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS";
2656        let result =
2657            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2658                .unwrap();
2659
2660        match &result[0] {
2661            Statement::CreateView(c) => {
2662                assert_eq!(c.to_string(), sql);
2663                assert!(!c.or_replace);
2664                assert!(!c.if_not_exists);
2665                assert_eq!("test", c.name.to_string());
2666            }
2667            _ => unreachable!(),
2668        }
2669        assert_eq!(sql, result[0].to_string());
2670
2671        let sql = "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS";
2672        let result =
2673            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2674                .unwrap();
2675
2676        match &result[0] {
2677            Statement::CreateView(c) => {
2678                assert_eq!(c.to_string(), sql);
2679                assert!(!c.or_replace);
2680                assert!(!c.if_not_exists);
2681                assert_eq!("test", c.name.to_string());
2682            }
2683            _ => unreachable!(),
2684        }
2685        assert_eq!(sql, result[0].to_string());
2686
2687        // Some invalid syntax cases
2688        let sql = "CREATE VIEW test (n1 AS select * from demo";
2689        let result =
2690            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2691        assert!(result.is_err());
2692
2693        let sql = "CREATE VIEW test (n1, AS select * from demo";
2694        let result =
2695            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2696        assert!(result.is_err());
2697
2698        let sql = "CREATE VIEW test n1,n2) AS select * from demo";
2699        let result =
2700            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2701        assert!(result.is_err());
2702
2703        let sql = "CREATE VIEW test (1) AS select * from demo";
2704        let result =
2705            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2706        assert!(result.is_err());
2707
2708        // keyword
2709        let sql = "CREATE VIEW test (n1, select) AS select * from demo";
2710        let result =
2711            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2712        assert!(result.is_err());
2713    }
2714
2715    #[test]
2716    fn test_parse_column_extensions_vector() {
2717        let sql = "VECTOR(128)";
2718        let dialect = GenericDialect {};
2719        let mut tokenizer = Tokenizer::new(&dialect, sql);
2720        let tokens = tokenizer.tokenize().unwrap();
2721        let mut parser = Parser::new(&dialect).with_tokens(tokens);
2722        let name = Ident::new("vec_col");
2723        let data_type =
2724            DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
2725        let mut extensions = ColumnExtensions::default();
2726
2727        let result =
2728            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2729        assert!(result.is_ok());
2730        assert!(extensions.vector_options.is_some());
2731        let vector_options = extensions.vector_options.unwrap();
2732        assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some("128"));
2733    }
2734
2735    #[test]
2736    fn test_parse_column_extensions_vector_invalid() {
2737        let sql = "VECTOR()";
2738        let dialect = GenericDialect {};
2739        let mut tokenizer = Tokenizer::new(&dialect, sql);
2740        let tokens = tokenizer.tokenize().unwrap();
2741        let mut parser = Parser::new(&dialect).with_tokens(tokens);
2742        let name = Ident::new("vec_col");
2743        let data_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
2744        let mut extensions = ColumnExtensions::default();
2745
2746        let result =
2747            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2748        assert!(result.is_err());
2749    }
2750
2751    #[test]
2752    fn test_parse_column_extensions_indices() {
2753        // Test skipping index
2754        {
2755            let sql = "SKIPPING INDEX";
2756            let dialect = GenericDialect {};
2757            let mut tokenizer = Tokenizer::new(&dialect, sql);
2758            let tokens = tokenizer.tokenize().unwrap();
2759            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2760            let name = Ident::new("col");
2761            let data_type = DataType::String(None);
2762            let mut extensions = ColumnExtensions::default();
2763            let result = ParserContext::parse_column_extensions(
2764                &mut parser,
2765                &name,
2766                &data_type,
2767                &mut extensions,
2768            );
2769            assert!(result.is_ok());
2770            assert!(extensions.skipping_index_options.is_some());
2771        }
2772
2773        // Test fulltext index with options
2774        {
2775            let sql = "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')";
2776            let dialect = GenericDialect {};
2777            let mut tokenizer = Tokenizer::new(&dialect, sql);
2778            let tokens = tokenizer.tokenize().unwrap();
2779            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2780            let name = Ident::new("text_col");
2781            let data_type = DataType::String(None);
2782            let mut extensions = ColumnExtensions::default();
2783            let result = ParserContext::parse_column_extensions(
2784                &mut parser,
2785                &name,
2786                &data_type,
2787                &mut extensions,
2788            );
2789            assert!(result.unwrap());
2790            assert!(extensions.fulltext_index_options.is_some());
2791            let fulltext_options = extensions.fulltext_index_options.unwrap();
2792            assert_eq!(fulltext_options.get("analyzer"), Some("English"));
2793            assert_eq!(fulltext_options.get("case_sensitive"), Some("true"));
2794        }
2795
2796        // Test fulltext index with invalid type (should fail)
2797        {
2798            let sql = "FULLTEXT INDEX WITH (analyzer = 'English')";
2799            let dialect = GenericDialect {};
2800            let mut tokenizer = Tokenizer::new(&dialect, sql);
2801            let tokens = tokenizer.tokenize().unwrap();
2802            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2803            let name = Ident::new("num_col");
2804            let data_type = DataType::Int(None); // Non-string type
2805            let mut extensions = ColumnExtensions::default();
2806            let result = ParserContext::parse_column_extensions(
2807                &mut parser,
2808                &name,
2809                &data_type,
2810                &mut extensions,
2811            );
2812            assert!(result.is_err());
2813            assert!(
2814                result
2815                    .unwrap_err()
2816                    .to_string()
2817                    .contains("FULLTEXT index only supports string type")
2818            );
2819        }
2820
2821        // Test fulltext index with invalid option (won't fail, the parser doesn't check the option's content)
2822        {
2823            let sql = "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')";
2824            let dialect = GenericDialect {};
2825            let mut tokenizer = Tokenizer::new(&dialect, sql);
2826            let tokens = tokenizer.tokenize().unwrap();
2827            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2828            let name = Ident::new("text_col");
2829            let data_type = DataType::String(None);
2830            let mut extensions = ColumnExtensions::default();
2831            let result = ParserContext::parse_column_extensions(
2832                &mut parser,
2833                &name,
2834                &data_type,
2835                &mut extensions,
2836            );
2837            assert!(result.unwrap());
2838        }
2839
2840        // Test inverted index
2841        {
2842            let sql = "INVERTED INDEX";
2843            let dialect = GenericDialect {};
2844            let mut tokenizer = Tokenizer::new(&dialect, sql);
2845            let tokens = tokenizer.tokenize().unwrap();
2846            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2847            let name = Ident::new("col");
2848            let data_type = DataType::String(None);
2849            let mut extensions = ColumnExtensions::default();
2850            let result = ParserContext::parse_column_extensions(
2851                &mut parser,
2852                &name,
2853                &data_type,
2854                &mut extensions,
2855            );
2856            assert!(result.is_ok());
2857            assert!(extensions.inverted_index_options.is_some());
2858        }
2859
2860        // Test inverted index with options (should fail)
2861        {
2862            let sql = "INVERTED INDEX WITH (analyzer = 'English')";
2863            let dialect = GenericDialect {};
2864            let mut tokenizer = Tokenizer::new(&dialect, sql);
2865            let tokens = tokenizer.tokenize().unwrap();
2866            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2867            let name = Ident::new("col");
2868            let data_type = DataType::String(None);
2869            let mut extensions = ColumnExtensions::default();
2870            let result = ParserContext::parse_column_extensions(
2871                &mut parser,
2872                &name,
2873                &data_type,
2874                &mut extensions,
2875            );
2876            assert!(result.is_err());
2877            assert!(
2878                result
2879                    .unwrap_err()
2880                    .to_string()
2881                    .contains("INVERTED index doesn't support options")
2882            );
2883        }
2884
2885        // Test multiple indices
2886        {
2887            let sql = "SKIPPING INDEX FULLTEXT INDEX";
2888            let dialect = GenericDialect {};
2889            let mut tokenizer = Tokenizer::new(&dialect, sql);
2890            let tokens = tokenizer.tokenize().unwrap();
2891            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2892            let name = Ident::new("col");
2893            let data_type = DataType::String(None);
2894            let mut extensions = ColumnExtensions::default();
2895            let result = ParserContext::parse_column_extensions(
2896                &mut parser,
2897                &name,
2898                &data_type,
2899                &mut extensions,
2900            );
2901            assert!(result.unwrap());
2902            assert!(extensions.skipping_index_options.is_some());
2903            assert!(extensions.fulltext_index_options.is_some());
2904        }
2905    }
2906
2907    #[test]
2908    fn test_parse_interval_cast() {
2909        let s = "select '10s'::INTERVAL";
2910        let stmts =
2911            ParserContext::create_with_dialect(s, &GreptimeDbDialect {}, ParseOptions::default())
2912                .unwrap();
2913        assert_eq!("SELECT '10 seconds'::INTERVAL", &stmts[0].to_string());
2914    }
2915}