sql/parsers/
create_parser.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod json;
16#[cfg(feature = "enterprise")]
17pub mod trigger;
18
19use std::collections::HashMap;
20
21use arrow_buffer::IntervalMonthDayNano;
22use common_catalog::consts::default_engine;
23use datafusion_common::ScalarValue;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
25use datatypes::data_type::ConcreteDataType;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Expr};
29use sqlparser::dialect::keywords::Keyword;
30use sqlparser::keywords::ALL_KEYWORDS;
31use sqlparser::parser::IsOptional::Mandatory;
32use sqlparser::parser::{Parser, ParserError};
33use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
34use table::requests::{validate_database_option, validate_table_option};
35
36use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
37use crate::error::{
38    self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu,
39    InvalidSqlSnafu, InvalidTableOptionSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
40    SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
41};
42use crate::parser::{FLOW, ParserContext};
43use crate::parsers::tql_parser;
44use crate::parsers::utils::{
45    self, validate_column_fulltext_create_option, validate_column_skipping_index_create_option,
46};
47use crate::statements::create::{
48    Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
49    CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
50};
51use crate::statements::statement::Statement;
52use crate::statements::transform::type_alias::get_data_type_by_alias_name;
53use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
54use crate::util::{OptionValue, location_to_index, parse_option_string};
55
56pub const ENGINE: &str = "ENGINE";
57pub const MAXVALUE: &str = "MAXVALUE";
58pub const SINK: &str = "SINK";
59pub const EXPIRE: &str = "EXPIRE";
60pub const AFTER: &str = "AFTER";
61pub const INVERTED: &str = "INVERTED";
62pub const SKIPPING: &str = "SKIPPING";
63
64pub type RawIntervalExpr = String;
65
66/// Parses create [table] statement
67impl<'a> ParserContext<'a> {
68    pub(crate) fn parse_create(&mut self) -> Result<Statement> {
69        match self.parser.peek_token().token {
70            Token::Word(w) => match w.keyword {
71                Keyword::TABLE => self.parse_create_table(),
72
73                Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
74
75                Keyword::EXTERNAL => self.parse_create_external_table(),
76
77                Keyword::OR => {
78                    let _ = self.parser.next_token();
79                    self.parser
80                        .expect_keyword(Keyword::REPLACE)
81                        .context(SyntaxSnafu)?;
82                    match self.parser.next_token().token {
83                        Token::Word(w) => match w.keyword {
84                            Keyword::VIEW => self.parse_create_view(true),
85                            Keyword::NoKeyword => {
86                                let uppercase = w.value.to_uppercase();
87                                match uppercase.as_str() {
88                                    FLOW => self.parse_create_flow(true),
89                                    _ => self.unsupported(w.to_string()),
90                                }
91                            }
92                            _ => self.unsupported(w.to_string()),
93                        },
94                        _ => self.unsupported(w.to_string()),
95                    }
96                }
97
98                Keyword::VIEW => {
99                    let _ = self.parser.next_token();
100                    self.parse_create_view(false)
101                }
102
103                #[cfg(feature = "enterprise")]
104                Keyword::TRIGGER => {
105                    let _ = self.parser.next_token();
106                    self.parse_create_trigger()
107                }
108
109                Keyword::NoKeyword => {
110                    let _ = self.parser.next_token();
111                    let uppercase = w.value.to_uppercase();
112                    match uppercase.as_str() {
113                        FLOW => self.parse_create_flow(false),
114                        _ => self.unsupported(w.to_string()),
115                    }
116                }
117                _ => self.unsupported(w.to_string()),
118            },
119            unexpected => self.unsupported(unexpected.to_string()),
120        }
121    }
122
123    /// Parse `CREAVE VIEW` statement.
124    fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
125        let if_not_exists = self.parse_if_not_exist()?;
126        let view_name = self.intern_parse_table_name()?;
127
128        let columns = self.parse_view_columns()?;
129
130        self.parser
131            .expect_keyword(Keyword::AS)
132            .context(SyntaxSnafu)?;
133
134        let query = self.parse_query()?;
135
136        Ok(Statement::CreateView(CreateView {
137            name: view_name,
138            columns,
139            or_replace,
140            query: Box::new(query),
141            if_not_exists,
142        }))
143    }
144
145    fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
146        let mut columns = vec![];
147        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
148            return Ok(columns);
149        }
150
151        loop {
152            let name = self.parse_column_name().context(SyntaxSnafu)?;
153
154            columns.push(name);
155
156            let comma = self.parser.consume_token(&Token::Comma);
157            if self.parser.consume_token(&Token::RParen) {
158                // allow a trailing comma, even though it's not in standard
159                break;
160            } else if !comma {
161                return self.expected("',' or ')' after column name", self.parser.peek_token());
162            }
163        }
164
165        Ok(columns)
166    }
167
168    fn parse_create_external_table(&mut self) -> Result<Statement> {
169        let _ = self.parser.next_token();
170        self.parser
171            .expect_keyword(Keyword::TABLE)
172            .context(SyntaxSnafu)?;
173        let if_not_exists = self.parse_if_not_exist()?;
174        let table_name = self.intern_parse_table_name()?;
175        let (columns, constraints) = self.parse_columns()?;
176        if !columns.is_empty() {
177            validate_time_index(&columns, &constraints)?;
178        }
179
180        let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
181        let options = self.parse_create_table_options()?;
182        Ok(Statement::CreateExternalTable(CreateExternalTable {
183            name: table_name,
184            columns,
185            constraints,
186            options,
187            if_not_exists,
188            engine,
189        }))
190    }
191
192    fn parse_create_database(&mut self) -> Result<Statement> {
193        let _ = self.parser.next_token();
194        let if_not_exists = self.parse_if_not_exist()?;
195        let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
196            expected: "a database name",
197            actual: self.peek_token_as_string(),
198        })?;
199        let database_name = Self::canonicalize_object_name(database_name)?;
200
201        let options = self
202            .parser
203            .parse_options(Keyword::WITH)
204            .context(SyntaxSnafu)?
205            .into_iter()
206            .map(parse_option_string)
207            .collect::<Result<HashMap<String, OptionValue>>>()?;
208
209        for key in options.keys() {
210            ensure!(
211                validate_database_option(key),
212                InvalidDatabaseOptionSnafu { key: key.clone() }
213            );
214        }
215        if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string())
216            && append_mode == "true"
217            && options.contains_key("merge_mode")
218        {
219            return InvalidDatabaseOptionSnafu {
220                key: "merge_mode".to_string(),
221            }
222            .fail();
223        }
224
225        Ok(Statement::CreateDatabase(CreateDatabase {
226            name: database_name,
227            if_not_exists,
228            options: OptionMap::new(options),
229        }))
230    }
231
232    fn parse_create_table(&mut self) -> Result<Statement> {
233        let _ = self.parser.next_token();
234
235        let if_not_exists = self.parse_if_not_exist()?;
236
237        let table_name = self.intern_parse_table_name()?;
238
239        if self.parser.parse_keyword(Keyword::LIKE) {
240            let source_name = self.intern_parse_table_name()?;
241
242            return Ok(Statement::CreateTableLike(CreateTableLike {
243                table_name,
244                source_name,
245            }));
246        }
247
248        let (columns, constraints) = self.parse_columns()?;
249        validate_time_index(&columns, &constraints)?;
250
251        let partitions = self.parse_partitions()?;
252        if let Some(partitions) = &partitions {
253            validate_partitions(&columns, partitions)?;
254        }
255
256        let engine = self.parse_table_engine(default_engine())?;
257        let options = self.parse_create_table_options()?;
258        let create_table = CreateTable {
259            if_not_exists,
260            name: table_name,
261            columns,
262            engine,
263            constraints,
264            options,
265            table_id: 0, // table id is assigned by catalog manager
266            partitions,
267        };
268
269        Ok(Statement::CreateTable(create_table))
270    }
271
272    /// "CREATE FLOW" clause
273    fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
274        let if_not_exists = self.parse_if_not_exist()?;
275
276        let flow_name = self.intern_parse_table_name()?;
277
278        // make `SINK` case in-sensitive
279        if let Token::Word(word) = self.parser.peek_token().token
280            && word.value.eq_ignore_ascii_case(SINK)
281        {
282            self.parser.next_token();
283        } else {
284            Err(ParserError::ParserError(
285                "Expect `SINK` keyword".to_string(),
286            ))
287            .context(SyntaxSnafu)?
288        }
289        self.parser
290            .expect_keyword(Keyword::TO)
291            .context(SyntaxSnafu)?;
292
293        let output_table_name = self.intern_parse_table_name()?;
294
295        let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
296            && w1.value.eq_ignore_ascii_case(EXPIRE)
297        {
298            self.parser.next_token();
299            if let Token::Word(w2) = &self.parser.peek_token().token
300                && w2.value.eq_ignore_ascii_case(AFTER)
301            {
302                self.parser.next_token();
303                Some(self.parse_interval_no_month("EXPIRE AFTER")?)
304            } else {
305                None
306            }
307        } else {
308            None
309        };
310
311        let eval_interval = if self
312            .parser
313            .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
314        {
315            Some(self.parse_interval_no_month("EVAL INTERVAL")?)
316        } else {
317            None
318        };
319
320        let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
321            match self.parser.next_token() {
322                TokenWithSpan {
323                    token: Token::SingleQuotedString(value, ..),
324                    ..
325                } => Some(value),
326                unexpected => {
327                    return self
328                        .parser
329                        .expected("string", unexpected)
330                        .context(SyntaxSnafu);
331                }
332            }
333        } else {
334            None
335        };
336
337        self.parser
338            .expect_keyword(Keyword::AS)
339            .context(SyntaxSnafu)?;
340
341        let query = Box::new(self.parse_sql_or_tql(true)?);
342
343        Ok(Statement::CreateFlow(CreateFlow {
344            flow_name,
345            sink_table_name: output_table_name,
346            or_replace,
347            if_not_exists,
348            expire_after,
349            eval_interval,
350            comment,
351            query,
352        }))
353    }
354
355    fn parse_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
356        let start_loc = self.parser.peek_token().span.start;
357        let start_index = location_to_index(self.sql, &start_loc);
358
359        // only accept sql or tql
360        let query = match self.parser.peek_token().token {
361            Token::Word(w) => match w.keyword {
362                Keyword::SELECT => self.parse_query(),
363                Keyword::NoKeyword
364                    if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
365                {
366                    self.parse_tql(require_now_expr)
367                }
368
369                _ => self.unsupported(self.peek_token_as_string()),
370            },
371            _ => self.unsupported(self.peek_token_as_string()),
372        }?;
373
374        let end_token = self.parser.peek_token();
375
376        let raw_query = if end_token == Token::EOF {
377            &self.sql[start_index..]
378        } else {
379            let end_loc = end_token.span.end;
380            let end_index = location_to_index(self.sql, &end_loc);
381            &self.sql[start_index..end_index.min(self.sql.len())]
382        };
383        let raw_query = raw_query.trim_end_matches(";");
384        let query = SqlOrTql::try_from_statement(query, raw_query)?;
385        Ok(query)
386    }
387
388    /// Parse the interval expr to duration in seconds.
389    fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
390        let interval = self.parse_interval_month_day_nano()?.0;
391        if interval.months != 0 {
392            return InvalidIntervalSnafu {
393                reason: format!("Interval with months is not allowed in {context}"),
394            }
395            .fail();
396        }
397        Ok(
398            interval.nanoseconds / 1_000_000_000
399                + interval.days as i64 * 60 * 60 * 24
400                + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, // 1 month=365.25/12=30.44 days
401                                                                       // this is to keep the same as https://docs.rs/humantime/latest/humantime/fn.parse_duration.html
402                                                                       // which we use in database to parse i.e. ttl interval and many other intervals
403        )
404    }
405
406    /// Parse interval expr to [`IntervalMonthDayNano`].
407    fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
408        let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
409        let raw_interval_expr = interval_expr.to_string();
410        let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
411            .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
412            .ok()
413            .with_context(|| InvalidIntervalSnafu {
414                reason: format!("cannot cast {} to interval type", interval_expr),
415            })?;
416        if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
417            Ok((interval, raw_interval_expr))
418        } else {
419            unreachable!()
420        }
421    }
422
423    fn parse_if_not_exist(&mut self) -> Result<bool> {
424        match self.parser.peek_token().token {
425            Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
426            _ => {}
427        }
428
429        if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
430            return self
431                .parser
432                .expect_keyword(Keyword::EXISTS)
433                .map(|_| true)
434                .context(UnexpectedSnafu {
435                    expected: "EXISTS",
436                    actual: self.peek_token_as_string(),
437                });
438        }
439
440        if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
441            return UnsupportedSnafu { keyword: "EXISTS" }.fail();
442        }
443
444        Ok(false)
445    }
446
447    fn parse_create_table_options(&mut self) -> Result<OptionMap> {
448        let options = self
449            .parser
450            .parse_options(Keyword::WITH)
451            .context(SyntaxSnafu)?
452            .into_iter()
453            .map(parse_option_string)
454            .collect::<Result<HashMap<String, OptionValue>>>()?;
455        for key in options.keys() {
456            ensure!(validate_table_option(key), InvalidTableOptionSnafu { key });
457        }
458        Ok(OptionMap::new(options))
459    }
460
461    /// "PARTITION ON COLUMNS (...)" clause
462    fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
463        if !self.parser.parse_keyword(Keyword::PARTITION) {
464            return Ok(None);
465        }
466        self.parser
467            .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
468            .context(error::UnexpectedSnafu {
469                expected: "ON, COLUMNS",
470                actual: self.peek_token_as_string(),
471            })?;
472
473        let raw_column_list = self
474            .parser
475            .parse_parenthesized_column_list(Mandatory, false)
476            .context(error::SyntaxSnafu)?;
477        let column_list = raw_column_list
478            .into_iter()
479            .map(Self::canonicalize_identifier)
480            .collect();
481
482        let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
483
484        Ok(Some(Partitions { column_list, exprs }))
485    }
486
487    fn parse_partition_entry(&mut self) -> Result<Expr> {
488        self.parser.parse_expr().context(error::SyntaxSnafu)
489    }
490
491    /// Parse a comma-separated list wrapped by "()", and of which all items accepted by `F`
492    fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
493    where
494        F: FnMut(&mut ParserContext<'a>) -> Result<T>,
495    {
496        self.parser
497            .expect_token(&Token::LParen)
498            .context(error::UnexpectedSnafu {
499                expected: "(",
500                actual: self.peek_token_as_string(),
501            })?;
502
503        let mut values = vec![];
504        while self.parser.peek_token() != Token::RParen {
505            values.push(f(self)?);
506            if !self.parser.consume_token(&Token::Comma) {
507                break;
508            }
509        }
510
511        self.parser
512            .expect_token(&Token::RParen)
513            .context(error::UnexpectedSnafu {
514                expected: ")",
515                actual: self.peek_token_as_string(),
516            })?;
517
518        Ok(values)
519    }
520
521    /// Parse the columns and constraints.
522    fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
523        let mut columns = vec![];
524        let mut constraints = vec![];
525        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
526            return Ok((columns, constraints));
527        }
528
529        loop {
530            if let Some(constraint) = self.parse_optional_table_constraint()? {
531                constraints.push(constraint);
532            } else if let Token::Word(_) = self.parser.peek_token().token {
533                self.parse_column(&mut columns, &mut constraints)?;
534            } else {
535                return self.expected(
536                    "column name or constraint definition",
537                    self.parser.peek_token(),
538                );
539            }
540            let comma = self.parser.consume_token(&Token::Comma);
541            if self.parser.consume_token(&Token::RParen) {
542                // allow a trailing comma, even though it's not in standard
543                break;
544            } else if !comma {
545                return self.expected(
546                    "',' or ')' after column definition",
547                    self.parser.peek_token(),
548                );
549            }
550        }
551
552        Ok((columns, constraints))
553    }
554
555    fn parse_column(
556        &mut self,
557        columns: &mut Vec<Column>,
558        constraints: &mut Vec<TableConstraint>,
559    ) -> Result<()> {
560        let mut column = self.parse_column_def()?;
561
562        let mut time_index_opt_idx = None;
563        for (index, opt) in column.options().iter().enumerate() {
564            if let ColumnOption::DialectSpecific(tokens) = &opt.option
565                && matches!(
566                    &tokens[..],
567                    [
568                        Token::Word(Word {
569                            keyword: Keyword::TIME,
570                            ..
571                        }),
572                        Token::Word(Word {
573                            keyword: Keyword::INDEX,
574                            ..
575                        })
576                    ]
577                )
578            {
579                ensure!(
580                    time_index_opt_idx.is_none(),
581                    InvalidColumnOptionSnafu {
582                        name: column.name().to_string(),
583                        msg: "duplicated time index",
584                    }
585                );
586                time_index_opt_idx = Some(index);
587
588                let constraint = TableConstraint::TimeIndex {
589                    column: Ident::new(column.name().value.clone()),
590                };
591                constraints.push(constraint);
592            }
593        }
594
595        if let Some(index) = time_index_opt_idx {
596            ensure!(
597                !column.options().contains(&ColumnOptionDef {
598                    option: ColumnOption::Null,
599                    name: None,
600                }),
601                InvalidColumnOptionSnafu {
602                    name: column.name().to_string(),
603                    msg: "time index column can't be null",
604                }
605            );
606
607            // The timestamp type may be an alias type, we have to retrieve the actual type.
608            let data_type = get_unalias_type(column.data_type());
609            ensure!(
610                matches!(data_type, DataType::Timestamp(_, _)),
611                InvalidColumnOptionSnafu {
612                    name: column.name().to_string(),
613                    msg: "time index column data type should be timestamp",
614                }
615            );
616
617            let not_null_opt = ColumnOptionDef {
618                option: ColumnOption::NotNull,
619                name: None,
620            };
621
622            if !column.options().contains(&not_null_opt) {
623                column.mut_options().push(not_null_opt);
624            }
625
626            let _ = column.mut_options().remove(index);
627        }
628
629        columns.push(column);
630
631        Ok(())
632    }
633
634    /// Parse the column name and check if it's valid.
635    fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
636        let name = self.parser.parse_identifier()?;
637        if name.quote_style.is_none() &&
638        // "ALL_KEYWORDS" are sorted.
639            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
640        {
641            return Err(ParserError::ParserError(format!(
642                "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
643                &name.value
644            )));
645        }
646
647        Ok(name)
648    }
649
650    pub fn parse_column_def(&mut self) -> Result<Column> {
651        let name = self.parse_column_name().context(SyntaxSnafu)?;
652        let parser = &mut self.parser;
653
654        ensure!(
655            !(name.quote_style.is_none() &&
656            // "ALL_KEYWORDS" are sorted.
657            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
658            InvalidSqlSnafu {
659                msg: format!(
660                    "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
661                    &name.value
662                ),
663            }
664        );
665
666        let mut extensions = ColumnExtensions::default();
667
668        let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
669        // Must immediately parse the JSON datatype format because it is closely after the "JSON"
670        // datatype, like this: "JSON(format = ...)".
671        if matches!(data_type, DataType::JSON) {
672            let options = json::parse_json_datatype_options(parser)?;
673            extensions.json_datatype_options = Some(options);
674        }
675
676        let mut options = vec![];
677        loop {
678            if parser.parse_keyword(Keyword::CONSTRAINT) {
679                let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
680                if let Some(option) = Self::parse_optional_column_option(parser)? {
681                    options.push(ColumnOptionDef { name, option });
682                } else {
683                    return parser
684                        .expected(
685                            "constraint details after CONSTRAINT <name>",
686                            parser.peek_token(),
687                        )
688                        .context(SyntaxSnafu);
689                }
690            } else if let Some(option) = Self::parse_optional_column_option(parser)? {
691                options.push(ColumnOptionDef { name: None, option });
692            } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
693                break;
694            };
695        }
696
697        Ok(Column {
698            column_def: ColumnDef {
699                name: Self::canonicalize_identifier(name),
700                data_type,
701                options,
702            },
703            extensions,
704        })
705    }
706
707    fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
708        if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
709            Ok(Some(ColumnOption::CharacterSet(
710                parser.parse_object_name(false).context(SyntaxSnafu)?,
711            )))
712        } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
713            Ok(Some(ColumnOption::NotNull))
714        } else if parser.parse_keywords(&[Keyword::COMMENT]) {
715            match parser.next_token() {
716                TokenWithSpan {
717                    token: Token::SingleQuotedString(value, ..),
718                    ..
719                } => Ok(Some(ColumnOption::Comment(value))),
720                unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
721            }
722        } else if parser.parse_keyword(Keyword::NULL) {
723            Ok(Some(ColumnOption::Null))
724        } else if parser.parse_keyword(Keyword::DEFAULT) {
725            Ok(Some(ColumnOption::Default(
726                parser.parse_expr().context(SyntaxSnafu)?,
727            )))
728        } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
729            Ok(Some(ColumnOption::Unique {
730                is_primary: true,
731                characteristics: None,
732            }))
733        } else if parser.parse_keyword(Keyword::UNIQUE) {
734            Ok(Some(ColumnOption::Unique {
735                is_primary: false,
736                characteristics: None,
737            }))
738        } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
739            // Use a DialectSpecific option for time index
740            Ok(Some(ColumnOption::DialectSpecific(vec![
741                Token::Word(Word {
742                    value: "TIME".to_string(),
743                    quote_style: None,
744                    keyword: Keyword::TIME,
745                }),
746                Token::Word(Word {
747                    value: "INDEX".to_string(),
748                    quote_style: None,
749                    keyword: Keyword::INDEX,
750                }),
751            ])))
752        } else {
753            Ok(None)
754        }
755    }
756
757    /// Parse a column option extensions.
758    ///
759    /// This function will handle:
760    /// - Vector type
761    /// - Indexes
762    fn parse_column_extensions(
763        parser: &mut Parser<'_>,
764        column_name: &Ident,
765        column_type: &DataType,
766        column_extensions: &mut ColumnExtensions,
767    ) -> Result<bool> {
768        if let DataType::Custom(name, tokens) = column_type
769            && name.0.len() == 1
770            && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
771        {
772            ensure!(
773                tokens.len() == 1,
774                InvalidColumnOptionSnafu {
775                    name: column_name.to_string(),
776                    msg: "VECTOR type should have dimension",
777                }
778            );
779
780            let dimension =
781                tokens[0]
782                    .parse::<u32>()
783                    .ok()
784                    .with_context(|| InvalidColumnOptionSnafu {
785                        name: column_name.to_string(),
786                        msg: "dimension should be a positive integer",
787                    })?;
788
789            let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
790            column_extensions.vector_options = Some(options);
791        }
792
793        // parse index options in column definition
794        let mut is_index_declared = false;
795
796        // skipping index
797        if let Token::Word(word) = parser.peek_token().token
798            && word.value.eq_ignore_ascii_case(SKIPPING)
799        {
800            parser.next_token();
801            // Consume `INDEX` keyword
802            ensure!(
803                parser.parse_keyword(Keyword::INDEX),
804                InvalidColumnOptionSnafu {
805                    name: column_name.to_string(),
806                    msg: "expect INDEX after SKIPPING keyword",
807                }
808            );
809            ensure!(
810                column_extensions.skipping_index_options.is_none(),
811                InvalidColumnOptionSnafu {
812                    name: column_name.to_string(),
813                    msg: "duplicated SKIPPING index option",
814                }
815            );
816
817            let options = parser
818                .parse_options(Keyword::WITH)
819                .context(error::SyntaxSnafu)?
820                .into_iter()
821                .map(parse_option_string)
822                .collect::<Result<Vec<_>>>()?;
823
824            for (key, _) in options.iter() {
825                ensure!(
826                    validate_column_skipping_index_create_option(key),
827                    InvalidColumnOptionSnafu {
828                        name: column_name.to_string(),
829                        msg: format!("invalid SKIPPING INDEX option: {key}"),
830                    }
831                );
832            }
833
834            let options = OptionMap::new(options);
835            column_extensions.skipping_index_options = Some(options);
836            is_index_declared |= true;
837        }
838
839        // fulltext index
840        if parser.parse_keyword(Keyword::FULLTEXT) {
841            // Consume `INDEX` keyword
842            ensure!(
843                parser.parse_keyword(Keyword::INDEX),
844                InvalidColumnOptionSnafu {
845                    name: column_name.to_string(),
846                    msg: "expect INDEX after FULLTEXT keyword",
847                }
848            );
849
850            ensure!(
851                column_extensions.fulltext_index_options.is_none(),
852                InvalidColumnOptionSnafu {
853                    name: column_name.to_string(),
854                    msg: "duplicated FULLTEXT INDEX option",
855                }
856            );
857
858            let column_type = get_unalias_type(column_type);
859            let data_type = sql_data_type_to_concrete_data_type(&column_type)?;
860            ensure!(
861                data_type == ConcreteDataType::string_datatype(),
862                InvalidColumnOptionSnafu {
863                    name: column_name.to_string(),
864                    msg: "FULLTEXT index only supports string type",
865                }
866            );
867
868            let options = parser
869                .parse_options(Keyword::WITH)
870                .context(error::SyntaxSnafu)?
871                .into_iter()
872                .map(parse_option_string)
873                .collect::<Result<Vec<_>>>()?;
874
875            for (key, _) in options.iter() {
876                ensure!(
877                    validate_column_fulltext_create_option(key),
878                    InvalidColumnOptionSnafu {
879                        name: column_name.to_string(),
880                        msg: format!("invalid FULLTEXT INDEX option: {key}"),
881                    }
882                );
883            }
884
885            let options = OptionMap::new(options);
886            column_extensions.fulltext_index_options = Some(options);
887            is_index_declared |= true;
888        }
889
890        // inverted index
891        if let Token::Word(word) = parser.peek_token().token
892            && word.value.eq_ignore_ascii_case(INVERTED)
893        {
894            parser.next_token();
895            // Consume `INDEX` keyword
896            ensure!(
897                parser.parse_keyword(Keyword::INDEX),
898                InvalidColumnOptionSnafu {
899                    name: column_name.to_string(),
900                    msg: "expect INDEX after INVERTED keyword",
901                }
902            );
903
904            ensure!(
905                column_extensions.inverted_index_options.is_none(),
906                InvalidColumnOptionSnafu {
907                    name: column_name.to_string(),
908                    msg: "duplicated INVERTED index option",
909                }
910            );
911
912            // inverted index doesn't have options, skipping `WITH`
913            // try cache `WITH` and throw error
914            let with_token = parser.peek_token();
915            ensure!(
916                with_token.token
917                    != Token::Word(Word {
918                        value: "WITH".to_string(),
919                        keyword: Keyword::WITH,
920                        quote_style: None,
921                    }),
922                InvalidColumnOptionSnafu {
923                    name: column_name.to_string(),
924                    msg: "INVERTED index doesn't support options",
925                }
926            );
927
928            column_extensions.inverted_index_options = Some(OptionMap::default());
929            is_index_declared |= true;
930        }
931
932        Ok(is_index_declared)
933    }
934
935    fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
936        match self.parser.next_token() {
937            TokenWithSpan {
938                token: Token::Word(w),
939                ..
940            } if w.keyword == Keyword::PRIMARY => {
941                self.parser
942                    .expect_keyword(Keyword::KEY)
943                    .context(error::UnexpectedSnafu {
944                        expected: "KEY",
945                        actual: self.peek_token_as_string(),
946                    })?;
947                let raw_columns = self
948                    .parser
949                    .parse_parenthesized_column_list(Mandatory, false)
950                    .context(error::SyntaxSnafu)?;
951                let columns = raw_columns
952                    .into_iter()
953                    .map(Self::canonicalize_identifier)
954                    .collect();
955                Ok(Some(TableConstraint::PrimaryKey { columns }))
956            }
957            TokenWithSpan {
958                token: Token::Word(w),
959                ..
960            } if w.keyword == Keyword::TIME => {
961                self.parser
962                    .expect_keyword(Keyword::INDEX)
963                    .context(error::UnexpectedSnafu {
964                        expected: "INDEX",
965                        actual: self.peek_token_as_string(),
966                    })?;
967
968                let raw_columns = self
969                    .parser
970                    .parse_parenthesized_column_list(Mandatory, false)
971                    .context(error::SyntaxSnafu)?;
972                let mut columns = raw_columns
973                    .into_iter()
974                    .map(Self::canonicalize_identifier)
975                    .collect::<Vec<_>>();
976
977                ensure!(
978                    columns.len() == 1,
979                    InvalidTimeIndexSnafu {
980                        msg: "it should contain only one column in time index",
981                    }
982                );
983
984                Ok(Some(TableConstraint::TimeIndex {
985                    column: columns.pop().unwrap(),
986                }))
987            }
988            _ => {
989                self.parser.prev_token();
990                Ok(None)
991            }
992        }
993    }
994
995    /// Parses the set of valid formats
996    fn parse_table_engine(&mut self, default: &str) -> Result<String> {
997        if !self.consume_token(ENGINE) {
998            return Ok(default.to_string());
999        }
1000
1001        self.parser
1002            .expect_token(&Token::Eq)
1003            .context(error::UnexpectedSnafu {
1004                expected: "=",
1005                actual: self.peek_token_as_string(),
1006            })?;
1007
1008        let token = self.parser.next_token();
1009        if let Token::Word(w) = token.token {
1010            Ok(w.value)
1011        } else {
1012            self.expected("'Engine' is missing", token)
1013        }
1014    }
1015}
1016
1017fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1018    let time_index_constraints: Vec<_> = constraints
1019        .iter()
1020        .filter_map(|c| match c {
1021            TableConstraint::TimeIndex { column } => Some(column),
1022            _ => None,
1023        })
1024        .unique()
1025        .collect();
1026
1027    ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1028    ensure!(
1029        time_index_constraints.len() == 1,
1030        InvalidTimeIndexSnafu {
1031            msg: format!(
1032                "expected only one time index constraint but actual {}",
1033                time_index_constraints.len()
1034            ),
1035        }
1036    );
1037
1038    // It's safe to use time_index_constraints[0][0],
1039    // we already check the bound above.
1040    let time_index_column_ident = &time_index_constraints[0];
1041    let time_index_column = columns
1042        .iter()
1043        .find(|c| c.name().value == *time_index_column_ident.value)
1044        .with_context(|| InvalidTimeIndexSnafu {
1045            msg: format!(
1046                "time index column {} not found in columns",
1047                time_index_column_ident
1048            ),
1049        })?;
1050
1051    let time_index_data_type = get_unalias_type(time_index_column.data_type());
1052    ensure!(
1053        matches!(time_index_data_type, DataType::Timestamp(_, _)),
1054        InvalidColumnOptionSnafu {
1055            name: time_index_column.name().to_string(),
1056            msg: "time index column data type should be timestamp",
1057        }
1058    );
1059
1060    Ok(())
1061}
1062
1063fn get_unalias_type(data_type: &DataType) -> DataType {
1064    match data_type {
1065        DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1066            if let Some(real_type) =
1067                get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1068            {
1069                real_type
1070            } else {
1071                data_type.clone()
1072            }
1073        }
1074        _ => data_type.clone(),
1075    }
1076}
1077
1078fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1079    let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1080
1081    ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1082
1083    Ok(())
1084}
1085
1086/// Ensure all exprs are binary expr and all the columns are defined in the column list.
1087fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1088    for expr in exprs {
1089        // The first level must be binary expr
1090        if let Expr::BinaryOp { left, op: _, right } = expr {
1091            ensure_one_expr(left, columns)?;
1092            ensure_one_expr(right, columns)?;
1093        } else {
1094            return error::InvalidSqlSnafu {
1095                msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1096            }
1097            .fail();
1098        }
1099    }
1100    Ok(())
1101}
1102
1103/// Check if the expr is a binary expr, an ident or a literal value.
1104/// If is ident, then check it is in the column list.
1105/// This recursive function is intended to be used by [ensure_exprs_are_binary].
1106fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1107    match expr {
1108        Expr::BinaryOp { left, op: _, right } => {
1109            ensure_one_expr(left, columns)?;
1110            ensure_one_expr(right, columns)?;
1111            Ok(())
1112        }
1113        Expr::Identifier(ident) => {
1114            let column_name = &ident.value;
1115            ensure!(
1116                columns.iter().any(|c| &c.name().value == column_name),
1117                error::InvalidSqlSnafu {
1118                    msg: format!(
1119                        "Column {:?} in rule expr is not referenced in PARTITION ON",
1120                        column_name
1121                    ),
1122                }
1123            );
1124            Ok(())
1125        }
1126        Expr::Value(_) => Ok(()),
1127        Expr::UnaryOp { expr, .. } => {
1128            ensure_one_expr(expr, columns)?;
1129            Ok(())
1130        }
1131        _ => error::InvalidSqlSnafu {
1132            msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1133        }
1134        .fail(),
1135    }
1136}
1137
1138/// Ensure that all columns used in "PARTITION ON COLUMNS" are defined in create table.
1139fn ensure_partition_columns_defined<'a>(
1140    columns: &'a [Column],
1141    partitions: &'a Partitions,
1142) -> Result<Vec<&'a Column>> {
1143    partitions
1144        .column_list
1145        .iter()
1146        .map(|x| {
1147            let x = ParserContext::canonicalize_identifier(x.clone());
1148            // Normally the columns in "create table" won't be too many,
1149            // a linear search to find the target every time is fine.
1150            columns
1151                .iter()
1152                .find(|c| *c.name().value == x.value)
1153                .context(error::InvalidSqlSnafu {
1154                    msg: format!("Partition column {:?} not defined", x.value),
1155                })
1156        })
1157        .collect::<Result<Vec<&Column>>>()
1158}
1159
1160#[cfg(test)]
1161mod tests {
1162    use std::assert_matches::assert_matches;
1163    use std::collections::HashMap;
1164
1165    use common_catalog::consts::FILE_ENGINE;
1166    use common_error::ext::ErrorExt;
1167    use sqlparser::ast::ColumnOption::NotNull;
1168    use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1169    use sqlparser::dialect::GenericDialect;
1170    use sqlparser::tokenizer::Tokenizer;
1171
1172    use super::*;
1173    use crate::dialect::GreptimeDbDialect;
1174    use crate::parser::ParseOptions;
1175
1176    #[test]
1177    fn test_parse_create_table_like() {
1178        let sql = "CREATE TABLE t1 LIKE t2";
1179        let stmts =
1180            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1181                .unwrap();
1182
1183        assert_eq!(1, stmts.len());
1184        match &stmts[0] {
1185            Statement::CreateTableLike(c) => {
1186                assert_eq!(c.table_name.to_string(), "t1");
1187                assert_eq!(c.source_name.to_string(), "t2");
1188            }
1189            _ => unreachable!(),
1190        }
1191    }
1192
1193    #[test]
1194    fn test_validate_external_table_options() {
1195        let sql = "CREATE EXTERNAL TABLE city (
1196            host string,
1197            ts timestamp,
1198            cpu float64 default 0,
1199            memory float64,
1200            TIME INDEX (ts),
1201            PRIMARY KEY(ts, host)
1202        ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1203
1204        let result =
1205            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1206        assert!(matches!(
1207            result,
1208            Err(error::Error::InvalidTableOption { .. })
1209        ));
1210    }
1211
1212    #[test]
1213    fn test_parse_create_external_table() {
1214        struct Test<'a> {
1215            sql: &'a str,
1216            expected_table_name: &'a str,
1217            expected_options: HashMap<&'a str, &'a str>,
1218            expected_engine: &'a str,
1219            expected_if_not_exist: bool,
1220        }
1221
1222        let tests = [
1223            Test {
1224                sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1225                expected_table_name: "city",
1226                expected_options: HashMap::from([
1227                    ("location", "/var/data/city.csv"),
1228                    ("format", "csv"),
1229                ]),
1230                expected_engine: FILE_ENGINE,
1231                expected_if_not_exist: false,
1232            },
1233            Test {
1234                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1235                expected_table_name: "city",
1236                expected_options: HashMap::from([
1237                    ("location", "/var/data/city.csv"),
1238                    ("format", "csv"),
1239                ]),
1240                expected_engine: "foo",
1241                expected_if_not_exist: true,
1242            },
1243            Test {
1244                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1245                expected_table_name: "city",
1246                expected_options: HashMap::from([
1247                    ("location", "/var/data/city.csv"),
1248                    ("format", "csv"),
1249                    ("compaction.type", "bar"),
1250                ]),
1251                expected_engine: "foo",
1252                expected_if_not_exist: true,
1253            },
1254        ];
1255
1256        for test in tests {
1257            let stmts = ParserContext::create_with_dialect(
1258                test.sql,
1259                &GreptimeDbDialect {},
1260                ParseOptions::default(),
1261            )
1262            .unwrap();
1263            assert_eq!(1, stmts.len());
1264            match &stmts[0] {
1265                Statement::CreateExternalTable(c) => {
1266                    assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1267                    assert_eq!(c.options.to_str_map(), test.expected_options);
1268                    assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1269                    assert_eq!(c.engine, test.expected_engine);
1270                }
1271                _ => unreachable!(),
1272            }
1273        }
1274    }
1275
1276    #[test]
1277    fn test_parse_create_external_table_with_schema() {
1278        let sql = "CREATE EXTERNAL TABLE city (
1279            host string,
1280            ts timestamp,
1281            cpu float32 default 0,
1282            memory float64,
1283            TIME INDEX (ts),
1284            PRIMARY KEY(ts, host),
1285        ) with(location='/var/data/city.csv',format='csv');";
1286
1287        let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]);
1288
1289        let stmts =
1290            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1291                .unwrap();
1292        assert_eq!(1, stmts.len());
1293        match &stmts[0] {
1294            Statement::CreateExternalTable(c) => {
1295                assert_eq!(c.name.to_string(), "city");
1296                assert_eq!(c.options.to_str_map(), options);
1297
1298                let columns = &c.columns;
1299                assert_column_def(&columns[0].column_def, "host", "STRING");
1300                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1301                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1302                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1303
1304                let constraints = &c.constraints;
1305                assert_eq!(
1306                    &constraints[0],
1307                    &TableConstraint::TimeIndex {
1308                        column: Ident::new("ts"),
1309                    }
1310                );
1311                assert_eq!(
1312                    &constraints[1],
1313                    &TableConstraint::PrimaryKey {
1314                        columns: vec![Ident::new("ts"), Ident::new("host")]
1315                    }
1316                );
1317            }
1318            _ => unreachable!(),
1319        }
1320    }
1321
1322    #[test]
1323    fn test_parse_create_database() {
1324        let sql = "create database";
1325        let result =
1326            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1327        assert!(
1328            result
1329                .unwrap_err()
1330                .to_string()
1331                .contains("Unexpected token while parsing SQL statement")
1332        );
1333
1334        let sql = "create database prometheus";
1335        let stmts =
1336            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1337                .unwrap();
1338
1339        assert_eq!(1, stmts.len());
1340        match &stmts[0] {
1341            Statement::CreateDatabase(c) => {
1342                assert_eq!(c.name.to_string(), "prometheus");
1343                assert!(!c.if_not_exists);
1344            }
1345            _ => unreachable!(),
1346        }
1347
1348        let sql = "create database if not exists prometheus";
1349        let stmts =
1350            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1351                .unwrap();
1352
1353        assert_eq!(1, stmts.len());
1354        match &stmts[0] {
1355            Statement::CreateDatabase(c) => {
1356                assert_eq!(c.name.to_string(), "prometheus");
1357                assert!(c.if_not_exists);
1358            }
1359            _ => unreachable!(),
1360        }
1361
1362        let sql = "CREATE DATABASE `fOo`";
1363        let result =
1364            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1365        let stmts = result.unwrap();
1366        match &stmts.last().unwrap() {
1367            Statement::CreateDatabase(c) => {
1368                assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1369                assert!(!c.if_not_exists);
1370            }
1371            _ => unreachable!(),
1372        }
1373
1374        let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1375        let result =
1376            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1377        let stmts = result.unwrap();
1378        match &stmts[0] {
1379            Statement::CreateDatabase(c) => {
1380                assert_eq!(c.name.to_string(), "prometheus");
1381                assert!(!c.if_not_exists);
1382                assert_eq!(c.options.get("ttl").unwrap(), "1h");
1383            }
1384            _ => unreachable!(),
1385        }
1386    }
1387
1388    #[test]
1389    fn test_parse_create_flow_more_testcases() {
1390        use pretty_assertions::assert_eq;
1391        fn parse_create_flow(sql: &str) -> CreateFlow {
1392            let stmts = ParserContext::create_with_dialect(
1393                sql,
1394                &GreptimeDbDialect {},
1395                ParseOptions::default(),
1396            )
1397            .unwrap();
1398            assert_eq!(1, stmts.len());
1399            match &stmts[0] {
1400                Statement::CreateFlow(c) => c.clone(),
1401                _ => unreachable!(),
1402            }
1403        }
1404        struct CreateFlowWoutQuery {
1405            /// Flow name
1406            pub flow_name: ObjectName,
1407            /// Output (sink) table name
1408            pub sink_table_name: ObjectName,
1409            /// Whether to replace existing task
1410            pub or_replace: bool,
1411            /// Create if not exist
1412            pub if_not_exists: bool,
1413            /// `EXPIRE AFTER`
1414            /// Duration in second as `i64`
1415            pub expire_after: Option<i64>,
1416            /// Comment string
1417            pub comment: Option<String>,
1418        }
1419        let testcases = vec![
1420            (
1421                r"
1422CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1423SINK TO schema_1.table_1
1424EXPIRE AFTER INTERVAL '5 minutes'
1425COMMENT 'test comment'
1426AS
1427SELECT max(c1), min(c2) FROM schema_2.table_2;",
1428                CreateFlowWoutQuery {
1429                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1430                    sink_table_name: ObjectName::from(vec![
1431                        Ident::new("schema_1"),
1432                        Ident::new("table_1"),
1433                    ]),
1434                    or_replace: true,
1435                    if_not_exists: true,
1436                    expire_after: Some(300),
1437                    comment: Some("test comment".to_string()),
1438                },
1439            ),
1440            (
1441                r"
1442CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1443SINK TO schema_1.table_1
1444EXPIRE AFTER INTERVAL '300 s'
1445COMMENT 'test comment'
1446AS
1447SELECT max(c1), min(c2) FROM schema_2.table_2;",
1448                CreateFlowWoutQuery {
1449                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1450                    sink_table_name: ObjectName::from(vec![
1451                        Ident::new("schema_1"),
1452                        Ident::new("table_1"),
1453                    ]),
1454                    or_replace: true,
1455                    if_not_exists: true,
1456                    expire_after: Some(300),
1457                    comment: Some("test comment".to_string()),
1458                },
1459            ),
1460            (
1461                r"
1462CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1463SINK TO schema_1.table_1
1464EXPIRE AFTER '5 minutes'
1465COMMENT 'test comment'
1466AS
1467SELECT max(c1), min(c2) FROM schema_2.table_2;",
1468                CreateFlowWoutQuery {
1469                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1470                    sink_table_name: ObjectName::from(vec![
1471                        Ident::new("schema_1"),
1472                        Ident::new("table_1"),
1473                    ]),
1474                    or_replace: true,
1475                    if_not_exists: true,
1476                    expire_after: Some(300),
1477                    comment: Some("test comment".to_string()),
1478                },
1479            ),
1480            (
1481                r"
1482CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1483SINK TO schema_1.table_1
1484EXPIRE AFTER '300 s'
1485COMMENT 'test comment'
1486AS
1487SELECT max(c1), min(c2) FROM schema_2.table_2;",
1488                CreateFlowWoutQuery {
1489                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1490                    sink_table_name: ObjectName::from(vec![
1491                        Ident::new("schema_1"),
1492                        Ident::new("table_1"),
1493                    ]),
1494                    or_replace: true,
1495                    if_not_exists: true,
1496                    expire_after: Some(300),
1497                    comment: Some("test comment".to_string()),
1498                },
1499            ),
1500            (
1501                r"
1502CREATE FLOW `task_2`
1503SINK TO schema_1.table_1
1504EXPIRE AFTER '2 days 1h 2 min'
1505AS
1506SELECT max(c1), min(c2) FROM schema_2.table_2;",
1507                CreateFlowWoutQuery {
1508                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1509                    sink_table_name: ObjectName::from(vec![
1510                        Ident::new("schema_1"),
1511                        Ident::new("table_1"),
1512                    ]),
1513                    or_replace: false,
1514                    if_not_exists: false,
1515                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1516                    comment: None,
1517                },
1518            ),
1519            (
1520                r"
1521create flow `task_3`
1522sink to schema_1.table_1
1523expire after '10 minutes'
1524as
1525select max(c1), min(c2) from schema_2.table_2;",
1526                CreateFlowWoutQuery {
1527                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
1528                    sink_table_name: ObjectName::from(vec![
1529                        Ident::new("schema_1"),
1530                        Ident::new("table_1"),
1531                    ]),
1532                    or_replace: false,
1533                    if_not_exists: false,
1534                    expire_after: Some(600), // 10 minutes in seconds
1535                    comment: None,
1536                },
1537            ),
1538            (
1539                r"
1540create or replace flow if not exists task_4
1541sink to schema_1.table_1
1542expire after interval '2 hours'
1543comment 'lowercase test'
1544as
1545select max(c1), min(c2) from schema_2.table_2;",
1546                CreateFlowWoutQuery {
1547                    flow_name: ObjectName::from(vec![Ident::new("task_4")]),
1548                    sink_table_name: ObjectName::from(vec![
1549                        Ident::new("schema_1"),
1550                        Ident::new("table_1"),
1551                    ]),
1552                    or_replace: true,
1553                    if_not_exists: true,
1554                    expire_after: Some(7200), // 2 hours in seconds
1555                    comment: Some("lowercase test".to_string()),
1556                },
1557            ),
1558        ];
1559
1560        for (sql, expected) in testcases {
1561            let create_task = parse_create_flow(sql);
1562
1563            let expected = CreateFlow {
1564                flow_name: expected.flow_name,
1565                sink_table_name: expected.sink_table_name,
1566                or_replace: expected.or_replace,
1567                if_not_exists: expected.if_not_exists,
1568                expire_after: expected.expire_after,
1569                eval_interval: None,
1570                comment: expected.comment,
1571                // ignore query parse result
1572                query: create_task.query.clone(),
1573            };
1574
1575            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1576            let show_create = create_task.to_string();
1577            let recreated = parse_create_flow(&show_create);
1578            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1579        }
1580    }
1581
1582    #[test]
1583    fn test_parse_create_flow() {
1584        use pretty_assertions::assert_eq;
1585        fn parse_create_flow(sql: &str) -> CreateFlow {
1586            let stmts = ParserContext::create_with_dialect(
1587                sql,
1588                &GreptimeDbDialect {},
1589                ParseOptions::default(),
1590            )
1591            .unwrap();
1592            assert_eq!(1, stmts.len());
1593            match &stmts[0] {
1594                Statement::CreateFlow(c) => c.clone(),
1595                _ => panic!("{:?}", stmts[0]),
1596            }
1597        }
1598        struct CreateFlowWoutQuery {
1599            /// Flow name
1600            pub flow_name: ObjectName,
1601            /// Output (sink) table name
1602            pub sink_table_name: ObjectName,
1603            /// Whether to replace existing task
1604            pub or_replace: bool,
1605            /// Create if not exist
1606            pub if_not_exists: bool,
1607            /// `EXPIRE AFTER`
1608            /// Duration in second as `i64`
1609            pub expire_after: Option<i64>,
1610            /// Duration for flow evaluation interval
1611            /// Duration in seconds as `i64`
1612            /// If not set, flow will be evaluated based on time window size and other args.
1613            pub eval_interval: Option<i64>,
1614            /// Comment string
1615            pub comment: Option<String>,
1616        }
1617
1618        // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
1619        let testcases = vec![
1620            (
1621                r"
1622CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1623SINK TO schema_1.table_1
1624EXPIRE AFTER INTERVAL '5 minutes'
1625COMMENT 'test comment'
1626AS
1627SELECT max(c1), min(c2) FROM schema_2.table_2;",
1628                CreateFlowWoutQuery {
1629                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1630                    sink_table_name: ObjectName(vec![
1631                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1632                        ObjectNamePart::Identifier(Ident::new("table_1")),
1633                    ]),
1634                    or_replace: true,
1635                    if_not_exists: true,
1636                    expire_after: Some(300),
1637                    eval_interval: None,
1638                    comment: Some("test comment".to_string()),
1639                },
1640            ),
1641            (
1642                r"
1643CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1644SINK TO schema_1.table_1
1645EXPIRE AFTER INTERVAL '300 s'
1646COMMENT 'test comment'
1647AS
1648SELECT max(c1), min(c2) FROM schema_2.table_2;",
1649                CreateFlowWoutQuery {
1650                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1651                    sink_table_name: ObjectName(vec![
1652                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1653                        ObjectNamePart::Identifier(Ident::new("table_1")),
1654                    ]),
1655                    or_replace: true,
1656                    if_not_exists: true,
1657                    expire_after: Some(300),
1658                    eval_interval: None,
1659                    comment: Some("test comment".to_string()),
1660                },
1661            ),
1662            (
1663                r"
1664CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1665SINK TO schema_1.table_1
1666EXPIRE AFTER '5 minutes'
1667EVAL INTERVAL '10 seconds'
1668COMMENT 'test comment'
1669AS
1670SELECT max(c1), min(c2) FROM schema_2.table_2;",
1671                CreateFlowWoutQuery {
1672                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1673                    sink_table_name: ObjectName(vec![
1674                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1675                        ObjectNamePart::Identifier(Ident::new("table_1")),
1676                    ]),
1677                    or_replace: true,
1678                    if_not_exists: true,
1679                    expire_after: Some(300),
1680                    eval_interval: Some(10),
1681                    comment: Some("test comment".to_string()),
1682                },
1683            ),
1684            (
1685                r"
1686CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1687SINK TO schema_1.table_1
1688EXPIRE AFTER '5 minutes'
1689EVAL INTERVAL INTERVAL '10 seconds'
1690COMMENT 'test comment'
1691AS
1692SELECT max(c1), min(c2) FROM schema_2.table_2;",
1693                CreateFlowWoutQuery {
1694                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1695                    sink_table_name: ObjectName(vec![
1696                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1697                        ObjectNamePart::Identifier(Ident::new("table_1")),
1698                    ]),
1699                    or_replace: true,
1700                    if_not_exists: true,
1701                    expire_after: Some(300),
1702                    eval_interval: Some(10),
1703                    comment: Some("test comment".to_string()),
1704                },
1705            ),
1706            (
1707                r"
1708CREATE FLOW `task_2`
1709SINK TO schema_1.table_1
1710EXPIRE AFTER '2 days 1h 2 min'
1711AS
1712SELECT max(c1), min(c2) FROM schema_2.table_2;",
1713                CreateFlowWoutQuery {
1714                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1715                        '`', "task_2",
1716                    ))]),
1717                    sink_table_name: ObjectName(vec![
1718                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1719                        ObjectNamePart::Identifier(Ident::new("table_1")),
1720                    ]),
1721                    or_replace: false,
1722                    if_not_exists: false,
1723                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1724                    eval_interval: None,
1725                    comment: None,
1726                },
1727            ),
1728        ];
1729
1730        for (sql, expected) in testcases {
1731            let create_task = parse_create_flow(sql);
1732
1733            let expected = CreateFlow {
1734                flow_name: expected.flow_name,
1735                sink_table_name: expected.sink_table_name,
1736                or_replace: expected.or_replace,
1737                if_not_exists: expected.if_not_exists,
1738                expire_after: expected.expire_after,
1739                eval_interval: expected.eval_interval,
1740                comment: expected.comment,
1741                // ignore query parse result
1742                query: create_task.query.clone(),
1743            };
1744
1745            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1746            let show_create = create_task.to_string();
1747            let recreated = parse_create_flow(&show_create);
1748            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1749        }
1750    }
1751
1752    #[test]
1753    fn test_create_flow_no_month() {
1754        let sql = r"
1755CREATE FLOW `task_2`
1756SINK TO schema_1.table_1
1757EXPIRE AFTER '1 month 2 days 1h 2 min'
1758AS
1759SELECT max(c1), min(c2) FROM schema_2.table_2;";
1760        let stmts =
1761            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1762
1763        assert!(
1764            stmts.is_err()
1765                && stmts
1766                    .unwrap_err()
1767                    .to_string()
1768                    .contains("Interval with months is not allowed")
1769        );
1770    }
1771
1772    #[test]
1773    fn test_validate_create() {
1774        let sql = r"
1775CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
1776PARTITION ON COLUMNS(c, a) (
1777    a < 10,
1778    a > 10 AND a < 20,
1779    a > 20 AND c < 100,
1780    a > 20 AND c >= 100
1781)
1782ENGINE=mito";
1783        let result =
1784            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1785        let _ = result.unwrap();
1786
1787        let sql = r"
1788CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1789PARTITION ON COLUMNS(x) ()
1790ENGINE=mito";
1791        let result =
1792            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1793        assert!(
1794            result
1795                .unwrap_err()
1796                .to_string()
1797                .contains("Partition column \"x\" not defined")
1798        );
1799    }
1800
1801    #[test]
1802    fn test_parse_create_table_with_partitions() {
1803        let sql = r"
1804CREATE TABLE monitor (
1805  host_id    INT,
1806  idc        STRING,
1807  ts         TIMESTAMP,
1808  cpu        DOUBLE DEFAULT 0,
1809  memory     DOUBLE,
1810  TIME INDEX (ts),
1811  PRIMARY KEY (host),
1812)
1813PARTITION ON COLUMNS(idc, host_id) (
1814  idc <= 'hz' AND host_id < 1000,
1815  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1816  idc > 'sh' AND host_id < 3000,
1817  idc > 'sh' AND host_id >= 3000,
1818)
1819ENGINE=mito";
1820        let result =
1821            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1822                .unwrap();
1823        assert_eq!(result.len(), 1);
1824        match &result[0] {
1825            Statement::CreateTable(c) => {
1826                assert!(c.partitions.is_some());
1827
1828                let partitions = c.partitions.as_ref().unwrap();
1829                let column_list = partitions
1830                    .column_list
1831                    .iter()
1832                    .map(|x| &x.value)
1833                    .collect::<Vec<&String>>();
1834                assert_eq!(column_list, vec!["idc", "host_id"]);
1835
1836                let exprs = &partitions.exprs;
1837
1838                assert_eq!(
1839                    exprs[0],
1840                    Expr::BinaryOp {
1841                        left: Box::new(Expr::BinaryOp {
1842                            left: Box::new(Expr::Identifier("idc".into())),
1843                            op: BinaryOperator::LtEq,
1844                            right: Box::new(Expr::Value(
1845                                Value::SingleQuotedString("hz".to_string()).into()
1846                            ))
1847                        }),
1848                        op: BinaryOperator::And,
1849                        right: Box::new(Expr::BinaryOp {
1850                            left: Box::new(Expr::Identifier("host_id".into())),
1851                            op: BinaryOperator::Lt,
1852                            right: Box::new(Expr::Value(
1853                                Value::Number("1000".to_string(), false).into()
1854                            ))
1855                        })
1856                    }
1857                );
1858                assert_eq!(
1859                    exprs[1],
1860                    Expr::BinaryOp {
1861                        left: Box::new(Expr::BinaryOp {
1862                            left: Box::new(Expr::BinaryOp {
1863                                left: Box::new(Expr::Identifier("idc".into())),
1864                                op: BinaryOperator::Gt,
1865                                right: Box::new(Expr::Value(
1866                                    Value::SingleQuotedString("hz".to_string()).into()
1867                                ))
1868                            }),
1869                            op: BinaryOperator::And,
1870                            right: Box::new(Expr::BinaryOp {
1871                                left: Box::new(Expr::Identifier("idc".into())),
1872                                op: BinaryOperator::LtEq,
1873                                right: Box::new(Expr::Value(
1874                                    Value::SingleQuotedString("sh".to_string()).into()
1875                                ))
1876                            })
1877                        }),
1878                        op: BinaryOperator::And,
1879                        right: Box::new(Expr::BinaryOp {
1880                            left: Box::new(Expr::Identifier("host_id".into())),
1881                            op: BinaryOperator::Lt,
1882                            right: Box::new(Expr::Value(
1883                                Value::Number("2000".to_string(), false).into()
1884                            ))
1885                        })
1886                    }
1887                );
1888                assert_eq!(
1889                    exprs[2],
1890                    Expr::BinaryOp {
1891                        left: Box::new(Expr::BinaryOp {
1892                            left: Box::new(Expr::Identifier("idc".into())),
1893                            op: BinaryOperator::Gt,
1894                            right: Box::new(Expr::Value(
1895                                Value::SingleQuotedString("sh".to_string()).into()
1896                            ))
1897                        }),
1898                        op: BinaryOperator::And,
1899                        right: Box::new(Expr::BinaryOp {
1900                            left: Box::new(Expr::Identifier("host_id".into())),
1901                            op: BinaryOperator::Lt,
1902                            right: Box::new(Expr::Value(
1903                                Value::Number("3000".to_string(), false).into()
1904                            ))
1905                        })
1906                    }
1907                );
1908                assert_eq!(
1909                    exprs[3],
1910                    Expr::BinaryOp {
1911                        left: Box::new(Expr::BinaryOp {
1912                            left: Box::new(Expr::Identifier("idc".into())),
1913                            op: BinaryOperator::Gt,
1914                            right: Box::new(Expr::Value(
1915                                Value::SingleQuotedString("sh".to_string()).into()
1916                            ))
1917                        }),
1918                        op: BinaryOperator::And,
1919                        right: Box::new(Expr::BinaryOp {
1920                            left: Box::new(Expr::Identifier("host_id".into())),
1921                            op: BinaryOperator::GtEq,
1922                            right: Box::new(Expr::Value(
1923                                Value::Number("3000".to_string(), false).into()
1924                            ))
1925                        })
1926                    }
1927                );
1928            }
1929            _ => unreachable!(),
1930        }
1931    }
1932
1933    #[test]
1934    fn test_parse_create_table_with_quoted_partitions() {
1935        let sql = r"
1936CREATE TABLE monitor (
1937  `host_id`    INT,
1938  idc        STRING,
1939  ts         TIMESTAMP,
1940  cpu        DOUBLE DEFAULT 0,
1941  memory     DOUBLE,
1942  TIME INDEX (ts),
1943  PRIMARY KEY (host),
1944)
1945PARTITION ON COLUMNS(IdC, host_id) (
1946  idc <= 'hz' AND host_id < 1000,
1947  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1948  idc > 'sh' AND host_id < 3000,
1949  idc > 'sh' AND host_id >= 3000,
1950)";
1951        let result =
1952            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1953                .unwrap();
1954        assert_eq!(result.len(), 1);
1955    }
1956
1957    #[test]
1958    fn test_parse_create_table_with_timestamp_index() {
1959        let sql1 = r"
1960CREATE TABLE monitor (
1961  host_id    INT,
1962  idc        STRING,
1963  ts         TIMESTAMP TIME INDEX,
1964  cpu        DOUBLE DEFAULT 0,
1965  memory     DOUBLE,
1966  PRIMARY KEY (host),
1967)
1968ENGINE=mito";
1969        let result1 = ParserContext::create_with_dialect(
1970            sql1,
1971            &GreptimeDbDialect {},
1972            ParseOptions::default(),
1973        )
1974        .unwrap();
1975
1976        if let Statement::CreateTable(c) = &result1[0] {
1977            assert_eq!(c.constraints.len(), 2);
1978            let tc = c.constraints[0].clone();
1979            match tc {
1980                TableConstraint::TimeIndex { column } => {
1981                    assert_eq!(&column.value, "ts");
1982                }
1983                _ => panic!("should be time index constraint"),
1984            };
1985        } else {
1986            panic!("should be create_table statement");
1987        }
1988
1989        // `TIME INDEX` should be in front of `PRIMARY KEY`
1990        // in order to equal the `TIMESTAMP TIME INDEX` constraint options vector
1991        let sql2 = r"
1992CREATE TABLE monitor (
1993  host_id    INT,
1994  idc        STRING,
1995  ts         TIMESTAMP NOT NULL,
1996  cpu        DOUBLE DEFAULT 0,
1997  memory     DOUBLE,
1998  TIME INDEX (ts),
1999  PRIMARY KEY (host),
2000)
2001ENGINE=mito";
2002        let result2 = ParserContext::create_with_dialect(
2003            sql2,
2004            &GreptimeDbDialect {},
2005            ParseOptions::default(),
2006        )
2007        .unwrap();
2008
2009        assert_eq!(result1, result2);
2010
2011        // TIMESTAMP can be NULL which is not equal to above
2012        let sql3 = r"
2013CREATE TABLE monitor (
2014  host_id    INT,
2015  idc        STRING,
2016  ts         TIMESTAMP,
2017  cpu        DOUBLE DEFAULT 0,
2018  memory     DOUBLE,
2019  TIME INDEX (ts),
2020  PRIMARY KEY (host),
2021)
2022ENGINE=mito";
2023
2024        let result3 = ParserContext::create_with_dialect(
2025            sql3,
2026            &GreptimeDbDialect {},
2027            ParseOptions::default(),
2028        )
2029        .unwrap();
2030
2031        assert_ne!(result1, result3);
2032
2033        // BIGINT can't be time index any more
2034        let sql1 = r"
2035CREATE TABLE monitor (
2036  host_id    INT,
2037  idc        STRING,
2038  b          bigint TIME INDEX,
2039  cpu        DOUBLE DEFAULT 0,
2040  memory     DOUBLE,
2041  PRIMARY KEY (host),
2042)
2043ENGINE=mito";
2044        let result1 = ParserContext::create_with_dialect(
2045            sql1,
2046            &GreptimeDbDialect {},
2047            ParseOptions::default(),
2048        );
2049
2050        assert!(
2051            result1
2052                .unwrap_err()
2053                .to_string()
2054                .contains("time index column data type should be timestamp")
2055        );
2056    }
2057
2058    #[test]
2059    fn test_parse_create_table_with_timestamp_index_not_null() {
2060        let sql = r"
2061CREATE TABLE monitor (
2062  host_id    INT,
2063  idc        STRING,
2064  ts         TIMESTAMP TIME INDEX,
2065  cpu        DOUBLE DEFAULT 0,
2066  memory     DOUBLE,
2067  TIME INDEX (ts),
2068  PRIMARY KEY (host),
2069)
2070ENGINE=mito";
2071        let result =
2072            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2073                .unwrap();
2074
2075        assert_eq!(result.len(), 1);
2076        if let Statement::CreateTable(c) = &result[0] {
2077            let ts = c.columns[2].clone();
2078            assert_eq!(ts.name().to_string(), "ts");
2079            assert_eq!(ts.options()[0].option, NotNull);
2080        } else {
2081            panic!("should be create table statement");
2082        }
2083
2084        let sql1 = r"
2085CREATE TABLE monitor (
2086  host_id    INT,
2087  idc        STRING,
2088  ts         TIMESTAMP NOT NULL TIME INDEX,
2089  cpu        DOUBLE DEFAULT 0,
2090  memory     DOUBLE,
2091  TIME INDEX (ts),
2092  PRIMARY KEY (host),
2093)
2094ENGINE=mito";
2095
2096        let result1 = ParserContext::create_with_dialect(
2097            sql1,
2098            &GreptimeDbDialect {},
2099            ParseOptions::default(),
2100        )
2101        .unwrap();
2102        assert_eq!(result, result1);
2103
2104        let sql2 = r"
2105CREATE TABLE monitor (
2106  host_id    INT,
2107  idc        STRING,
2108  ts         TIMESTAMP TIME INDEX NOT NULL,
2109  cpu        DOUBLE DEFAULT 0,
2110  memory     DOUBLE,
2111  TIME INDEX (ts),
2112  PRIMARY KEY (host),
2113)
2114ENGINE=mito";
2115
2116        let result2 = ParserContext::create_with_dialect(
2117            sql2,
2118            &GreptimeDbDialect {},
2119            ParseOptions::default(),
2120        )
2121        .unwrap();
2122        assert_eq!(result, result2);
2123
2124        let sql3 = r"
2125CREATE TABLE monitor (
2126  host_id    INT,
2127  idc        STRING,
2128  ts         TIMESTAMP TIME INDEX NULL NOT,
2129  cpu        DOUBLE DEFAULT 0,
2130  memory     DOUBLE,
2131  TIME INDEX (ts),
2132  PRIMARY KEY (host),
2133)
2134ENGINE=mito";
2135
2136        let result3 = ParserContext::create_with_dialect(
2137            sql3,
2138            &GreptimeDbDialect {},
2139            ParseOptions::default(),
2140        );
2141        assert!(result3.is_err());
2142
2143        let sql4 = r"
2144CREATE TABLE monitor (
2145  host_id    INT,
2146  idc        STRING,
2147  ts         TIMESTAMP TIME INDEX NOT NULL NULL,
2148  cpu        DOUBLE DEFAULT 0,
2149  memory     DOUBLE,
2150  TIME INDEX (ts),
2151  PRIMARY KEY (host),
2152)
2153ENGINE=mito";
2154
2155        let result4 = ParserContext::create_with_dialect(
2156            sql4,
2157            &GreptimeDbDialect {},
2158            ParseOptions::default(),
2159        );
2160        assert!(result4.is_err());
2161
2162        let sql = r"
2163CREATE TABLE monitor (
2164  host_id    INT,
2165  idc        STRING,
2166  ts         TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2167  cpu        DOUBLE DEFAULT 0,
2168  memory     DOUBLE,
2169  TIME INDEX (ts),
2170  PRIMARY KEY (host),
2171)
2172ENGINE=mito";
2173
2174        let result =
2175            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2176                .unwrap();
2177
2178        if let Statement::CreateTable(c) = &result[0] {
2179            let tc = c.constraints[0].clone();
2180            match tc {
2181                TableConstraint::TimeIndex { column } => {
2182                    assert_eq!(&column.value, "ts");
2183                }
2184                _ => panic!("should be time index constraint"),
2185            }
2186            let ts = c.columns[2].clone();
2187            assert_eq!(ts.name().to_string(), "ts");
2188            assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2189            assert_eq!(ts.options()[1].option, NotNull);
2190        } else {
2191            unreachable!("should be create table statement");
2192        }
2193    }
2194
2195    #[test]
2196    fn test_parse_partitions_with_error_syntax() {
2197        let sql = r"
2198CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2199PARTITION COLUMNS(c, a) (
2200    a < 10,
2201    a > 10 AND a < 20,
2202    a > 20 AND c < 100,
2203    a > 20 AND c >= 100
2204)
2205ENGINE=mito";
2206        let result =
2207            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2208        assert!(
2209            result
2210                .unwrap_err()
2211                .output_msg()
2212                .contains("sql parser error: Expected: ON, found: COLUMNS")
2213        );
2214    }
2215
2216    #[test]
2217    fn test_parse_partitions_without_rule() {
2218        let sql = r"
2219CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2220PARTITION ON COLUMNS(c, a) ()
2221ENGINE=mito";
2222        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2223            .unwrap();
2224    }
2225
2226    #[test]
2227    fn test_parse_partitions_unreferenced_column() {
2228        let sql = r"
2229CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2230PARTITION ON COLUMNS(c, a) (
2231    b = 'foo'
2232)
2233ENGINE=mito";
2234        let result =
2235            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2236        assert_eq!(
2237            result.unwrap_err().output_msg(),
2238            "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2239        );
2240    }
2241
2242    #[test]
2243    fn test_parse_partitions_not_binary_expr() {
2244        let sql = r"
2245CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2246PARTITION ON COLUMNS(c, a) (
2247    b
2248)
2249ENGINE=mito";
2250        let result =
2251            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2252        assert_eq!(
2253            result.unwrap_err().output_msg(),
2254            r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2255        );
2256    }
2257
2258    fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2259        assert_eq!(column.name.to_string(), name);
2260        assert_eq!(column.data_type.to_string(), data_type);
2261    }
2262
2263    #[test]
2264    pub fn test_parse_create_table() {
2265        let sql = r"create table demo(
2266                             host string,
2267                             ts timestamp,
2268                             cpu float32 default 0,
2269                             memory float64,
2270                             TIME INDEX (ts),
2271                             PRIMARY KEY(ts, host),
2272                             ) engine=mito
2273                             with(ttl='10s');
2274         ";
2275        let result =
2276            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2277                .unwrap();
2278        assert_eq!(1, result.len());
2279        match &result[0] {
2280            Statement::CreateTable(c) => {
2281                assert!(!c.if_not_exists);
2282                assert_eq!("demo", c.name.to_string());
2283                assert_eq!("mito", c.engine);
2284                assert_eq!(4, c.columns.len());
2285                let columns = &c.columns;
2286                assert_column_def(&columns[0].column_def, "host", "STRING");
2287                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2288                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2289                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2290
2291                let constraints = &c.constraints;
2292                assert_eq!(
2293                    &constraints[0],
2294                    &TableConstraint::TimeIndex {
2295                        column: Ident::new("ts"),
2296                    }
2297                );
2298                assert_eq!(
2299                    &constraints[1],
2300                    &TableConstraint::PrimaryKey {
2301                        columns: vec![Ident::new("ts"), Ident::new("host")]
2302                    }
2303                );
2304                // inverted index is merged into column options
2305                assert_eq!(1, c.options.len());
2306                assert_eq!(
2307                    [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2308                    c.options.to_str_map()
2309                );
2310            }
2311            _ => unreachable!(),
2312        }
2313    }
2314
2315    #[test]
2316    fn test_invalid_index_keys() {
2317        let sql = r"create table demo(
2318                             host string,
2319                             ts int64,
2320                             cpu float64 default 0,
2321                             memory float64,
2322                             TIME INDEX (ts, host),
2323                             PRIMARY KEY(ts, host)) engine=mito;
2324         ";
2325        let result =
2326            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2327        assert!(result.is_err());
2328        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2329    }
2330
2331    #[test]
2332    fn test_duplicated_time_index() {
2333        let sql = r"create table demo(
2334                             host string,
2335                             ts timestamp time index,
2336                             t timestamp time index,
2337                             cpu float64 default 0,
2338                             memory float64,
2339                             TIME INDEX (ts, host),
2340                             PRIMARY KEY(ts, host)) engine=mito;
2341         ";
2342        let result =
2343            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2344        assert!(result.is_err());
2345        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2346
2347        let sql = r"create table demo(
2348                             host string,
2349                             ts timestamp time index,
2350                             cpu float64 default 0,
2351                             t timestamp,
2352                             memory float64,
2353                             TIME INDEX (t),
2354                             PRIMARY KEY(ts, host)) engine=mito;
2355         ";
2356        let result =
2357            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2358        assert!(result.is_err());
2359        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2360    }
2361
2362    #[test]
2363    fn test_invalid_column_name() {
2364        let sql = "create table foo(user string, i timestamp time index)";
2365        let result =
2366            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2367        let err = result.unwrap_err().output_msg();
2368        assert!(err.contains("Cannot use keyword 'user' as column name"));
2369
2370        // If column name is quoted, it's valid even same with keyword.
2371        let sql = r#"
2372            create table foo("user" string, i timestamp time index)
2373        "#;
2374        let result =
2375            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2376        let _ = result.unwrap();
2377    }
2378
2379    #[test]
2380    fn test_incorrect_default_value_issue_3479() {
2381        let sql = r#"CREATE TABLE `ExcePTuRi`(
2382non TIMESTAMP(6) TIME INDEX,
2383`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2384)"#;
2385        let result =
2386            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2387                .unwrap();
2388        assert_eq!(1, result.len());
2389        match &result[0] {
2390            Statement::CreateTable(c) => {
2391                assert_eq!(
2392                    "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2393                    c.columns[1].to_string()
2394                );
2395            }
2396            _ => unreachable!(),
2397        }
2398    }
2399
2400    #[test]
2401    fn test_parse_create_view() {
2402        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
2403
2404        let result =
2405            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2406                .unwrap();
2407        match &result[0] {
2408            Statement::CreateView(c) => {
2409                assert_eq!(c.to_string(), sql);
2410                assert!(!c.or_replace);
2411                assert!(!c.if_not_exists);
2412                assert_eq!("test", c.name.to_string());
2413            }
2414            _ => unreachable!(),
2415        }
2416
2417        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
2418
2419        let result =
2420            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2421                .unwrap();
2422        match &result[0] {
2423            Statement::CreateView(c) => {
2424                assert_eq!(c.to_string(), sql);
2425                assert!(c.or_replace);
2426                assert!(c.if_not_exists);
2427                assert_eq!("test", c.name.to_string());
2428            }
2429            _ => unreachable!(),
2430        }
2431    }
2432
2433    #[test]
2434    fn test_parse_create_view_invalid_query() {
2435        let sql = "CREATE VIEW test AS DELETE from demo";
2436        let result =
2437            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2438        assert!(result.is_ok_and(|x| x.len() == 1));
2439    }
2440
2441    #[test]
2442    fn test_parse_create_table_fulltext_options() {
2443        let sql1 = r"
2444CREATE TABLE log (
2445    ts TIMESTAMP TIME INDEX,
2446    msg TEXT FULLTEXT INDEX,
2447)";
2448        let result1 = ParserContext::create_with_dialect(
2449            sql1,
2450            &GreptimeDbDialect {},
2451            ParseOptions::default(),
2452        )
2453        .unwrap();
2454
2455        if let Statement::CreateTable(c) = &result1[0] {
2456            c.columns.iter().for_each(|col| {
2457                if col.name().value == "msg" {
2458                    assert!(
2459                        col.extensions
2460                            .fulltext_index_options
2461                            .as_ref()
2462                            .unwrap()
2463                            .is_empty()
2464                    );
2465                }
2466            });
2467        } else {
2468            panic!("should be create_table statement");
2469        }
2470
2471        let sql2 = r"
2472CREATE TABLE log (
2473    ts TIMESTAMP TIME INDEX,
2474    msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2475)";
2476        let result2 = ParserContext::create_with_dialect(
2477            sql2,
2478            &GreptimeDbDialect {},
2479            ParseOptions::default(),
2480        )
2481        .unwrap();
2482
2483        if let Statement::CreateTable(c) = &result2[0] {
2484            c.columns.iter().for_each(|col| {
2485                if col.name().value == "msg" {
2486                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2487                    assert_eq!(options.len(), 2);
2488                    assert_eq!(options.get("analyzer").unwrap(), "English");
2489                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2490                }
2491            });
2492        } else {
2493            panic!("should be create_table statement");
2494        }
2495
2496        let sql3 = r"
2497CREATE TABLE log (
2498    ts TIMESTAMP TIME INDEX,
2499    msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2500    msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2501)";
2502        let result3 = ParserContext::create_with_dialect(
2503            sql3,
2504            &GreptimeDbDialect {},
2505            ParseOptions::default(),
2506        )
2507        .unwrap();
2508
2509        if let Statement::CreateTable(c) = &result3[0] {
2510            c.columns.iter().for_each(|col| {
2511                if col.name().value == "msg1" {
2512                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2513                    assert_eq!(options.len(), 2);
2514                    assert_eq!(options.get("analyzer").unwrap(), "English");
2515                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2516                } else if col.name().value == "msg2" {
2517                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2518                    assert_eq!(options.len(), 2);
2519                    assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2520                    assert_eq!(options.get("case_sensitive").unwrap(), "true");
2521                }
2522            });
2523        } else {
2524            panic!("should be create_table statement");
2525        }
2526    }
2527
2528    #[test]
2529    fn test_parse_create_table_fulltext_options_invalid_type() {
2530        let sql = r"
2531CREATE TABLE log (
2532    ts TIMESTAMP TIME INDEX,
2533    msg INT FULLTEXT INDEX,
2534)";
2535        let result =
2536            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2537        assert!(result.is_err());
2538        assert!(
2539            result
2540                .unwrap_err()
2541                .to_string()
2542                .contains("FULLTEXT index only supports string type")
2543        );
2544    }
2545
2546    #[test]
2547    fn test_parse_create_table_fulltext_options_duplicate() {
2548        let sql = r"
2549CREATE TABLE log (
2550    ts TIMESTAMP TIME INDEX,
2551    msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
2552)";
2553        let result =
2554            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2555        assert!(result.is_err());
2556        assert!(
2557            result
2558                .unwrap_err()
2559                .to_string()
2560                .contains("duplicated FULLTEXT INDEX option")
2561        );
2562    }
2563
2564    #[test]
2565    fn test_parse_create_table_fulltext_options_invalid_option() {
2566        let sql = r"
2567CREATE TABLE log (
2568    ts TIMESTAMP TIME INDEX,
2569    msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
2570)";
2571        let result =
2572            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2573        assert!(result.is_err());
2574        assert!(
2575            result
2576                .unwrap_err()
2577                .to_string()
2578                .contains("invalid FULLTEXT INDEX option")
2579        );
2580    }
2581
2582    #[test]
2583    fn test_parse_create_table_skip_options() {
2584        let sql = r"
2585CREATE TABLE log (
2586    ts TIMESTAMP TIME INDEX,
2587    msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
2588)";
2589        let result =
2590            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2591                .unwrap();
2592
2593        if let Statement::CreateTable(c) = &result[0] {
2594            c.columns.iter().for_each(|col| {
2595                if col.name().value == "msg" {
2596                    assert!(
2597                        !col.extensions
2598                            .skipping_index_options
2599                            .as_ref()
2600                            .unwrap()
2601                            .is_empty()
2602                    );
2603                }
2604            });
2605        } else {
2606            panic!("should be create_table statement");
2607        }
2608
2609        let sql = r"
2610        CREATE TABLE log (
2611            ts TIMESTAMP TIME INDEX,
2612            msg INT SKIPPING INDEX,
2613        )";
2614        let result =
2615            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2616                .unwrap();
2617
2618        if let Statement::CreateTable(c) = &result[0] {
2619            c.columns.iter().for_each(|col| {
2620                if col.name().value == "msg" {
2621                    assert!(
2622                        col.extensions
2623                            .skipping_index_options
2624                            .as_ref()
2625                            .unwrap()
2626                            .is_empty()
2627                    );
2628                }
2629            });
2630        } else {
2631            panic!("should be create_table statement");
2632        }
2633    }
2634
2635    #[test]
2636    fn test_parse_create_view_with_columns() {
2637        let sql = "CREATE VIEW test () AS SELECT * FROM NUMBERS";
2638        let result =
2639            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2640                .unwrap();
2641
2642        match &result[0] {
2643            Statement::CreateView(c) => {
2644                assert_eq!(c.to_string(), "CREATE VIEW test AS SELECT * FROM NUMBERS");
2645                assert!(!c.or_replace);
2646                assert!(!c.if_not_exists);
2647                assert_eq!("test", c.name.to_string());
2648            }
2649            _ => unreachable!(),
2650        }
2651        assert_eq!(
2652            "CREATE VIEW test AS SELECT * FROM NUMBERS",
2653            result[0].to_string()
2654        );
2655
2656        let sql = "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS";
2657        let result =
2658            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2659                .unwrap();
2660
2661        match &result[0] {
2662            Statement::CreateView(c) => {
2663                assert_eq!(c.to_string(), sql);
2664                assert!(!c.or_replace);
2665                assert!(!c.if_not_exists);
2666                assert_eq!("test", c.name.to_string());
2667            }
2668            _ => unreachable!(),
2669        }
2670        assert_eq!(sql, result[0].to_string());
2671
2672        let sql = "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS";
2673        let result =
2674            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2675                .unwrap();
2676
2677        match &result[0] {
2678            Statement::CreateView(c) => {
2679                assert_eq!(c.to_string(), sql);
2680                assert!(!c.or_replace);
2681                assert!(!c.if_not_exists);
2682                assert_eq!("test", c.name.to_string());
2683            }
2684            _ => unreachable!(),
2685        }
2686        assert_eq!(sql, result[0].to_string());
2687
2688        // Some invalid syntax cases
2689        let sql = "CREATE VIEW test (n1 AS select * from demo";
2690        let result =
2691            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2692        assert!(result.is_err());
2693
2694        let sql = "CREATE VIEW test (n1, AS select * from demo";
2695        let result =
2696            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2697        assert!(result.is_err());
2698
2699        let sql = "CREATE VIEW test n1,n2) AS select * from demo";
2700        let result =
2701            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2702        assert!(result.is_err());
2703
2704        let sql = "CREATE VIEW test (1) AS select * from demo";
2705        let result =
2706            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2707        assert!(result.is_err());
2708
2709        // keyword
2710        let sql = "CREATE VIEW test (n1, select) AS select * from demo";
2711        let result =
2712            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2713        assert!(result.is_err());
2714    }
2715
2716    #[test]
2717    fn test_parse_column_extensions_vector() {
2718        let sql = "VECTOR(128)";
2719        let dialect = GenericDialect {};
2720        let mut tokenizer = Tokenizer::new(&dialect, sql);
2721        let tokens = tokenizer.tokenize().unwrap();
2722        let mut parser = Parser::new(&dialect).with_tokens(tokens);
2723        let name = Ident::new("vec_col");
2724        let data_type =
2725            DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
2726        let mut extensions = ColumnExtensions::default();
2727
2728        let result =
2729            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2730        assert!(result.is_ok());
2731        assert!(extensions.vector_options.is_some());
2732        let vector_options = extensions.vector_options.unwrap();
2733        assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some("128"));
2734    }
2735
2736    #[test]
2737    fn test_parse_column_extensions_vector_invalid() {
2738        let sql = "VECTOR()";
2739        let dialect = GenericDialect {};
2740        let mut tokenizer = Tokenizer::new(&dialect, sql);
2741        let tokens = tokenizer.tokenize().unwrap();
2742        let mut parser = Parser::new(&dialect).with_tokens(tokens);
2743        let name = Ident::new("vec_col");
2744        let data_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
2745        let mut extensions = ColumnExtensions::default();
2746
2747        let result =
2748            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2749        assert!(result.is_err());
2750    }
2751
2752    #[test]
2753    fn test_parse_column_extensions_indices() {
2754        // Test skipping index
2755        {
2756            let sql = "SKIPPING INDEX";
2757            let dialect = GenericDialect {};
2758            let mut tokenizer = Tokenizer::new(&dialect, sql);
2759            let tokens = tokenizer.tokenize().unwrap();
2760            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2761            let name = Ident::new("col");
2762            let data_type = DataType::String(None);
2763            let mut extensions = ColumnExtensions::default();
2764            let result = ParserContext::parse_column_extensions(
2765                &mut parser,
2766                &name,
2767                &data_type,
2768                &mut extensions,
2769            );
2770            assert!(result.is_ok());
2771            assert!(extensions.skipping_index_options.is_some());
2772        }
2773
2774        // Test fulltext index with options
2775        {
2776            let sql = "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')";
2777            let dialect = GenericDialect {};
2778            let mut tokenizer = Tokenizer::new(&dialect, sql);
2779            let tokens = tokenizer.tokenize().unwrap();
2780            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2781            let name = Ident::new("text_col");
2782            let data_type = DataType::String(None);
2783            let mut extensions = ColumnExtensions::default();
2784            let result = ParserContext::parse_column_extensions(
2785                &mut parser,
2786                &name,
2787                &data_type,
2788                &mut extensions,
2789            );
2790            assert!(result.unwrap());
2791            assert!(extensions.fulltext_index_options.is_some());
2792            let fulltext_options = extensions.fulltext_index_options.unwrap();
2793            assert_eq!(fulltext_options.get("analyzer"), Some("English"));
2794            assert_eq!(fulltext_options.get("case_sensitive"), Some("true"));
2795        }
2796
2797        // Test fulltext index with invalid type (should fail)
2798        {
2799            let sql = "FULLTEXT INDEX WITH (analyzer = 'English')";
2800            let dialect = GenericDialect {};
2801            let mut tokenizer = Tokenizer::new(&dialect, sql);
2802            let tokens = tokenizer.tokenize().unwrap();
2803            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2804            let name = Ident::new("num_col");
2805            let data_type = DataType::Int(None); // Non-string type
2806            let mut extensions = ColumnExtensions::default();
2807            let result = ParserContext::parse_column_extensions(
2808                &mut parser,
2809                &name,
2810                &data_type,
2811                &mut extensions,
2812            );
2813            assert!(result.is_err());
2814            assert!(
2815                result
2816                    .unwrap_err()
2817                    .to_string()
2818                    .contains("FULLTEXT index only supports string type")
2819            );
2820        }
2821
2822        // Test fulltext index with invalid option (won't fail, the parser doesn't check the option's content)
2823        {
2824            let sql = "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')";
2825            let dialect = GenericDialect {};
2826            let mut tokenizer = Tokenizer::new(&dialect, sql);
2827            let tokens = tokenizer.tokenize().unwrap();
2828            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2829            let name = Ident::new("text_col");
2830            let data_type = DataType::String(None);
2831            let mut extensions = ColumnExtensions::default();
2832            let result = ParserContext::parse_column_extensions(
2833                &mut parser,
2834                &name,
2835                &data_type,
2836                &mut extensions,
2837            );
2838            assert!(result.unwrap());
2839        }
2840
2841        // Test inverted index
2842        {
2843            let sql = "INVERTED INDEX";
2844            let dialect = GenericDialect {};
2845            let mut tokenizer = Tokenizer::new(&dialect, sql);
2846            let tokens = tokenizer.tokenize().unwrap();
2847            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2848            let name = Ident::new("col");
2849            let data_type = DataType::String(None);
2850            let mut extensions = ColumnExtensions::default();
2851            let result = ParserContext::parse_column_extensions(
2852                &mut parser,
2853                &name,
2854                &data_type,
2855                &mut extensions,
2856            );
2857            assert!(result.is_ok());
2858            assert!(extensions.inverted_index_options.is_some());
2859        }
2860
2861        // Test inverted index with options (should fail)
2862        {
2863            let sql = "INVERTED INDEX WITH (analyzer = 'English')";
2864            let dialect = GenericDialect {};
2865            let mut tokenizer = Tokenizer::new(&dialect, sql);
2866            let tokens = tokenizer.tokenize().unwrap();
2867            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2868            let name = Ident::new("col");
2869            let data_type = DataType::String(None);
2870            let mut extensions = ColumnExtensions::default();
2871            let result = ParserContext::parse_column_extensions(
2872                &mut parser,
2873                &name,
2874                &data_type,
2875                &mut extensions,
2876            );
2877            assert!(result.is_err());
2878            assert!(
2879                result
2880                    .unwrap_err()
2881                    .to_string()
2882                    .contains("INVERTED index doesn't support options")
2883            );
2884        }
2885
2886        // Test multiple indices
2887        {
2888            let sql = "SKIPPING INDEX FULLTEXT INDEX";
2889            let dialect = GenericDialect {};
2890            let mut tokenizer = Tokenizer::new(&dialect, sql);
2891            let tokens = tokenizer.tokenize().unwrap();
2892            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2893            let name = Ident::new("col");
2894            let data_type = DataType::String(None);
2895            let mut extensions = ColumnExtensions::default();
2896            let result = ParserContext::parse_column_extensions(
2897                &mut parser,
2898                &name,
2899                &data_type,
2900                &mut extensions,
2901            );
2902            assert!(result.unwrap());
2903            assert!(extensions.skipping_index_options.is_some());
2904            assert!(extensions.fulltext_index_options.is_some());
2905        }
2906    }
2907
2908    #[test]
2909    fn test_parse_interval_cast() {
2910        let s = "select '10s'::INTERVAL";
2911        let stmts =
2912            ParserContext::create_with_dialect(s, &GreptimeDbDialect {}, ParseOptions::default())
2913                .unwrap();
2914        assert_eq!("SELECT '10 seconds'::INTERVAL", &stmts[0].to_string());
2915    }
2916}