sql/parsers/
create_parser.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::HashMap;
19
20use arrow_buffer::IntervalMonthDayNano;
21use common_catalog::consts::default_engine;
22use datafusion_common::ScalarValue;
23use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
24use datatypes::data_type::ConcreteDataType;
25use itertools::Itertools;
26use snafu::{OptionExt, ResultExt, ensure};
27use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Expr};
28use sqlparser::dialect::keywords::Keyword;
29use sqlparser::keywords::ALL_KEYWORDS;
30use sqlparser::parser::IsOptional::Mandatory;
31use sqlparser::parser::{Parser, ParserError};
32use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
33use table::requests::{validate_database_option, validate_table_option};
34
35use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
36use crate::error::{
37    self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu,
38    InvalidSqlSnafu, InvalidTableOptionSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
39    SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
40};
41use crate::parser::{FLOW, ParserContext};
42use crate::parsers::tql_parser;
43use crate::parsers::utils::{
44    self, validate_column_fulltext_create_option, validate_column_skipping_index_create_option,
45};
46use crate::statements::create::{
47    Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
48    CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
49};
50use crate::statements::statement::Statement;
51use crate::statements::transform::type_alias::get_data_type_by_alias_name;
52use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
53use crate::util::{location_to_index, parse_option_string};
54
// Keyword spellings that this parser matches by text.

/// `ENGINE` keyword text. NOTE(review): not referenced in the visible part of
/// this file; presumably consumed by the table-engine clause parser — confirm.
pub const ENGINE: &str = "ENGINE";
/// `MAXVALUE` keyword text. NOTE(review): not referenced in the visible part of
/// this file — confirm where it is used.
pub const MAXVALUE: &str = "MAXVALUE";
/// `SINK` word of `CREATE FLOW ... SINK TO <table>`; compared ASCII
/// case-insensitively in `parse_create_flow`.
pub const SINK: &str = "SINK";
/// First word of the optional `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const EXPIRE: &str = "EXPIRE";
/// Second word of the optional `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const AFTER: &str = "AFTER";
/// `INVERTED` keyword text. NOTE(review): not referenced in the visible part of
/// this file; presumably used for an inverted-index column option — confirm.
pub const INVERTED: &str = "INVERTED";
/// First word of the `SKIPPING INDEX` column option; compared ASCII
/// case-insensitively in `parse_column_extensions`.
pub const SKIPPING: &str = "SKIPPING";

/// Textual rendering of an interval expression, as produced by `to_string()`
/// on the parsed expression in `parse_interval_month_day_nano`.
pub type RawIntervalExpr = String;
64
/// Parsers for `CREATE` statements: tables, external tables, databases, views and flows.
66impl<'a> ParserContext<'a> {
67    pub(crate) fn parse_create(&mut self) -> Result<Statement> {
68        match self.parser.peek_token().token {
69            Token::Word(w) => match w.keyword {
70                Keyword::TABLE => self.parse_create_table(),
71
72                Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
73
74                Keyword::EXTERNAL => self.parse_create_external_table(),
75
76                Keyword::OR => {
77                    let _ = self.parser.next_token();
78                    self.parser
79                        .expect_keyword(Keyword::REPLACE)
80                        .context(SyntaxSnafu)?;
81                    match self.parser.next_token().token {
82                        Token::Word(w) => match w.keyword {
83                            Keyword::VIEW => self.parse_create_view(true),
84                            Keyword::NoKeyword => {
85                                let uppercase = w.value.to_uppercase();
86                                match uppercase.as_str() {
87                                    FLOW => self.parse_create_flow(true),
88                                    _ => self.unsupported(w.to_string()),
89                                }
90                            }
91                            _ => self.unsupported(w.to_string()),
92                        },
93                        _ => self.unsupported(w.to_string()),
94                    }
95                }
96
97                Keyword::VIEW => {
98                    let _ = self.parser.next_token();
99                    self.parse_create_view(false)
100                }
101
102                #[cfg(feature = "enterprise")]
103                Keyword::TRIGGER => {
104                    let _ = self.parser.next_token();
105                    self.parse_create_trigger()
106                }
107
108                Keyword::NoKeyword => {
109                    let _ = self.parser.next_token();
110                    let uppercase = w.value.to_uppercase();
111                    match uppercase.as_str() {
112                        FLOW => self.parse_create_flow(false),
113                        _ => self.unsupported(w.to_string()),
114                    }
115                }
116                _ => self.unsupported(w.to_string()),
117            },
118            unexpected => self.unsupported(unexpected.to_string()),
119        }
120    }
121
122    /// Parse `CREAVE VIEW` statement.
123    fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
124        let if_not_exists = self.parse_if_not_exist()?;
125        let view_name = self.intern_parse_table_name()?;
126
127        let columns = self.parse_view_columns()?;
128
129        self.parser
130            .expect_keyword(Keyword::AS)
131            .context(SyntaxSnafu)?;
132
133        let query = self.parse_query()?;
134
135        Ok(Statement::CreateView(CreateView {
136            name: view_name,
137            columns,
138            or_replace,
139            query: Box::new(query),
140            if_not_exists,
141        }))
142    }
143
144    fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
145        let mut columns = vec![];
146        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
147            return Ok(columns);
148        }
149
150        loop {
151            let name = self.parse_column_name().context(SyntaxSnafu)?;
152
153            columns.push(name);
154
155            let comma = self.parser.consume_token(&Token::Comma);
156            if self.parser.consume_token(&Token::RParen) {
157                // allow a trailing comma, even though it's not in standard
158                break;
159            } else if !comma {
160                return self.expected("',' or ')' after column name", self.parser.peek_token());
161            }
162        }
163
164        Ok(columns)
165    }
166
167    fn parse_create_external_table(&mut self) -> Result<Statement> {
168        let _ = self.parser.next_token();
169        self.parser
170            .expect_keyword(Keyword::TABLE)
171            .context(SyntaxSnafu)?;
172        let if_not_exists = self.parse_if_not_exist()?;
173        let table_name = self.intern_parse_table_name()?;
174        let (columns, constraints) = self.parse_columns()?;
175        if !columns.is_empty() {
176            validate_time_index(&columns, &constraints)?;
177        }
178
179        let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
180        let options = self.parse_create_table_options()?;
181        Ok(Statement::CreateExternalTable(CreateExternalTable {
182            name: table_name,
183            columns,
184            constraints,
185            options,
186            if_not_exists,
187            engine,
188        }))
189    }
190
191    fn parse_create_database(&mut self) -> Result<Statement> {
192        let _ = self.parser.next_token();
193        let if_not_exists = self.parse_if_not_exist()?;
194        let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
195            expected: "a database name",
196            actual: self.peek_token_as_string(),
197        })?;
198        let database_name = Self::canonicalize_object_name(database_name);
199
200        let options = self
201            .parser
202            .parse_options(Keyword::WITH)
203            .context(SyntaxSnafu)?
204            .into_iter()
205            .map(parse_option_string)
206            .collect::<Result<HashMap<String, String>>>()?;
207
208        for key in options.keys() {
209            ensure!(
210                validate_database_option(key),
211                InvalidDatabaseOptionSnafu {
212                    key: key.to_string()
213                }
214            );
215        }
216        if let Some(append_mode) = options.get("append_mode")
217            && append_mode == "true"
218            && options.contains_key("merge_mode")
219        {
220            return InvalidDatabaseOptionSnafu {
221                key: "merge_mode".to_string(),
222            }
223            .fail();
224        }
225
226        Ok(Statement::CreateDatabase(CreateDatabase {
227            name: database_name,
228            if_not_exists,
229            options: options.into(),
230        }))
231    }
232
233    fn parse_create_table(&mut self) -> Result<Statement> {
234        let _ = self.parser.next_token();
235
236        let if_not_exists = self.parse_if_not_exist()?;
237
238        let table_name = self.intern_parse_table_name()?;
239
240        if self.parser.parse_keyword(Keyword::LIKE) {
241            let source_name = self.intern_parse_table_name()?;
242
243            return Ok(Statement::CreateTableLike(CreateTableLike {
244                table_name,
245                source_name,
246            }));
247        }
248
249        let (columns, constraints) = self.parse_columns()?;
250        validate_time_index(&columns, &constraints)?;
251
252        let partitions = self.parse_partitions()?;
253        if let Some(partitions) = &partitions {
254            validate_partitions(&columns, partitions)?;
255        }
256
257        let engine = self.parse_table_engine(default_engine())?;
258        let options = self.parse_create_table_options()?;
259        let create_table = CreateTable {
260            if_not_exists,
261            name: table_name,
262            columns,
263            engine,
264            constraints,
265            options,
266            table_id: 0, // table id is assigned by catalog manager
267            partitions,
268        };
269
270        Ok(Statement::CreateTable(create_table))
271    }
272
273    /// "CREATE FLOW" clause
274    fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
275        let if_not_exists = self.parse_if_not_exist()?;
276
277        let flow_name = self.intern_parse_table_name()?;
278
279        // make `SINK` case in-sensitive
280        if let Token::Word(word) = self.parser.peek_token().token
281            && word.value.eq_ignore_ascii_case(SINK)
282        {
283            self.parser.next_token();
284        } else {
285            Err(ParserError::ParserError(
286                "Expect `SINK` keyword".to_string(),
287            ))
288            .context(SyntaxSnafu)?
289        }
290        self.parser
291            .expect_keyword(Keyword::TO)
292            .context(SyntaxSnafu)?;
293
294        let output_table_name = self.intern_parse_table_name()?;
295
296        let expire_after = if self
297            .parser
298            .consume_tokens(&[Token::make_keyword(EXPIRE), Token::make_keyword(AFTER)])
299        {
300            Some(self.parse_interval_no_month("EXPIRE AFTER")?)
301        } else {
302            None
303        };
304
305        let eval_interval = if self
306            .parser
307            .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
308        {
309            Some(self.parse_interval_no_month("EVAL INTERVAL")?)
310        } else {
311            None
312        };
313
314        let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
315            match self.parser.next_token() {
316                TokenWithSpan {
317                    token: Token::SingleQuotedString(value, ..),
318                    ..
319                } => Some(value),
320                unexpected => {
321                    return self
322                        .parser
323                        .expected("string", unexpected)
324                        .context(SyntaxSnafu);
325                }
326            }
327        } else {
328            None
329        };
330
331        self.parser
332            .expect_keyword(Keyword::AS)
333            .context(SyntaxSnafu)?;
334
335        let query = Box::new(self.parse_sql_or_tql(true)?);
336
337        Ok(Statement::CreateFlow(CreateFlow {
338            flow_name,
339            sink_table_name: output_table_name,
340            or_replace,
341            if_not_exists,
342            expire_after,
343            eval_interval,
344            comment,
345            query,
346        }))
347    }
348
349    fn parse_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
350        let start_loc = self.parser.peek_token().span.start;
351        let start_index = location_to_index(self.sql, &start_loc);
352
353        // only accept sql or tql
354        let query = match self.parser.peek_token().token {
355            Token::Word(w) => match w.keyword {
356                Keyword::SELECT => self.parse_query(),
357                Keyword::NoKeyword
358                    if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
359                {
360                    self.parse_tql(require_now_expr)
361                }
362
363                _ => self.unsupported(self.peek_token_as_string()),
364            },
365            _ => self.unsupported(self.peek_token_as_string()),
366        }?;
367
368        let end_token = self.parser.peek_token();
369
370        let raw_query = if end_token == Token::EOF {
371            &self.sql[start_index..]
372        } else {
373            let end_loc = end_token.span.end;
374            let end_index = location_to_index(self.sql, &end_loc);
375            &self.sql[start_index..end_index.min(self.sql.len())]
376        };
377        let raw_query = raw_query.trim_end_matches(";");
378        let query = SqlOrTql::try_from_statement(query, raw_query)?;
379        Ok(query)
380    }
381
382    /// Parse the interval expr to duration in seconds.
383    fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
384        let interval = self.parse_interval_month_day_nano()?.0;
385        if interval.months != 0 {
386            return InvalidIntervalSnafu {
387                reason: format!("Interval with months is not allowed in {context}"),
388            }
389            .fail();
390        }
391        Ok(
392            interval.nanoseconds / 1_000_000_000
393                + interval.days as i64 * 60 * 60 * 24
394                + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, // 1 month=365.25/12=30.44 days
395                                                                       // this is to keep the same as https://docs.rs/humantime/latest/humantime/fn.parse_duration.html
396                                                                       // which we use in database to parse i.e. ttl interval and many other intervals
397        )
398    }
399
400    /// Parse interval expr to [`IntervalMonthDayNano`].
401    fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
402        let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
403        let raw_interval_expr = interval_expr.to_string();
404        let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
405            .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
406            .ok()
407            .with_context(|| InvalidIntervalSnafu {
408                reason: format!("cannot cast {} to interval type", interval_expr),
409            })?;
410        if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
411            Ok((interval, raw_interval_expr))
412        } else {
413            unreachable!()
414        }
415    }
416
417    fn parse_if_not_exist(&mut self) -> Result<bool> {
418        match self.parser.peek_token().token {
419            Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
420            _ => {}
421        }
422
423        if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
424            return self
425                .parser
426                .expect_keyword(Keyword::EXISTS)
427                .map(|_| true)
428                .context(UnexpectedSnafu {
429                    expected: "EXISTS",
430                    actual: self.peek_token_as_string(),
431                });
432        }
433
434        if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
435            return UnsupportedSnafu { keyword: "EXISTS" }.fail();
436        }
437
438        Ok(false)
439    }
440
441    fn parse_create_table_options(&mut self) -> Result<OptionMap> {
442        let options = self
443            .parser
444            .parse_options(Keyword::WITH)
445            .context(SyntaxSnafu)?
446            .into_iter()
447            .map(parse_option_string)
448            .collect::<Result<HashMap<String, String>>>()?;
449        for key in options.keys() {
450            ensure!(validate_table_option(key), InvalidTableOptionSnafu { key });
451        }
452        Ok(options.into())
453    }
454
455    /// "PARTITION ON COLUMNS (...)" clause
456    fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
457        if !self.parser.parse_keyword(Keyword::PARTITION) {
458            return Ok(None);
459        }
460        self.parser
461            .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
462            .context(error::UnexpectedSnafu {
463                expected: "ON, COLUMNS",
464                actual: self.peek_token_as_string(),
465            })?;
466
467        let raw_column_list = self
468            .parser
469            .parse_parenthesized_column_list(Mandatory, false)
470            .context(error::SyntaxSnafu)?;
471        let column_list = raw_column_list
472            .into_iter()
473            .map(Self::canonicalize_identifier)
474            .collect();
475
476        let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
477
478        Ok(Some(Partitions { column_list, exprs }))
479    }
480
481    fn parse_partition_entry(&mut self) -> Result<Expr> {
482        self.parser.parse_expr().context(error::SyntaxSnafu)
483    }
484
485    /// Parse a comma-separated list wrapped by "()", and of which all items accepted by `F`
486    fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
487    where
488        F: FnMut(&mut ParserContext<'a>) -> Result<T>,
489    {
490        self.parser
491            .expect_token(&Token::LParen)
492            .context(error::UnexpectedSnafu {
493                expected: "(",
494                actual: self.peek_token_as_string(),
495            })?;
496
497        let mut values = vec![];
498        while self.parser.peek_token() != Token::RParen {
499            values.push(f(self)?);
500            if !self.parser.consume_token(&Token::Comma) {
501                break;
502            }
503        }
504
505        self.parser
506            .expect_token(&Token::RParen)
507            .context(error::UnexpectedSnafu {
508                expected: ")",
509                actual: self.peek_token_as_string(),
510            })?;
511
512        Ok(values)
513    }
514
515    /// Parse the columns and constraints.
516    fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
517        let mut columns = vec![];
518        let mut constraints = vec![];
519        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
520            return Ok((columns, constraints));
521        }
522
523        loop {
524            if let Some(constraint) = self.parse_optional_table_constraint()? {
525                constraints.push(constraint);
526            } else if let Token::Word(_) = self.parser.peek_token().token {
527                self.parse_column(&mut columns, &mut constraints)?;
528            } else {
529                return self.expected(
530                    "column name or constraint definition",
531                    self.parser.peek_token(),
532                );
533            }
534            let comma = self.parser.consume_token(&Token::Comma);
535            if self.parser.consume_token(&Token::RParen) {
536                // allow a trailing comma, even though it's not in standard
537                break;
538            } else if !comma {
539                return self.expected(
540                    "',' or ')' after column definition",
541                    self.parser.peek_token(),
542                );
543            }
544        }
545
546        Ok((columns, constraints))
547    }
548
549    fn parse_column(
550        &mut self,
551        columns: &mut Vec<Column>,
552        constraints: &mut Vec<TableConstraint>,
553    ) -> Result<()> {
554        let mut column = self.parse_column_def()?;
555
556        let mut time_index_opt_idx = None;
557        for (index, opt) in column.options().iter().enumerate() {
558            if let ColumnOption::DialectSpecific(tokens) = &opt.option
559                && matches!(
560                    &tokens[..],
561                    [
562                        Token::Word(Word {
563                            keyword: Keyword::TIME,
564                            ..
565                        }),
566                        Token::Word(Word {
567                            keyword: Keyword::INDEX,
568                            ..
569                        })
570                    ]
571                )
572            {
573                ensure!(
574                    time_index_opt_idx.is_none(),
575                    InvalidColumnOptionSnafu {
576                        name: column.name().to_string(),
577                        msg: "duplicated time index",
578                    }
579                );
580                time_index_opt_idx = Some(index);
581
582                let constraint = TableConstraint::TimeIndex {
583                    column: Ident::new(column.name().value.clone()),
584                };
585                constraints.push(constraint);
586            }
587        }
588
589        if let Some(index) = time_index_opt_idx {
590            ensure!(
591                !column.options().contains(&ColumnOptionDef {
592                    option: ColumnOption::Null,
593                    name: None,
594                }),
595                InvalidColumnOptionSnafu {
596                    name: column.name().to_string(),
597                    msg: "time index column can't be null",
598                }
599            );
600
601            // The timestamp type may be an alias type, we have to retrieve the actual type.
602            let data_type = get_unalias_type(column.data_type());
603            ensure!(
604                matches!(data_type, DataType::Timestamp(_, _)),
605                InvalidColumnOptionSnafu {
606                    name: column.name().to_string(),
607                    msg: "time index column data type should be timestamp",
608                }
609            );
610
611            let not_null_opt = ColumnOptionDef {
612                option: ColumnOption::NotNull,
613                name: None,
614            };
615
616            if !column.options().contains(&not_null_opt) {
617                column.mut_options().push(not_null_opt);
618            }
619
620            let _ = column.mut_options().remove(index);
621        }
622
623        columns.push(column);
624
625        Ok(())
626    }
627
628    /// Parse the column name and check if it's valid.
629    fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
630        let name = self.parser.parse_identifier()?;
631        if name.quote_style.is_none() &&
632        // "ALL_KEYWORDS" are sorted.
633            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
634        {
635            return Err(ParserError::ParserError(format!(
636                "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
637                &name.value
638            )));
639        }
640
641        Ok(name)
642    }
643
644    pub fn parse_column_def(&mut self) -> Result<Column> {
645        let name = self.parse_column_name().context(SyntaxSnafu)?;
646        let parser = &mut self.parser;
647
648        ensure!(
649            !(name.quote_style.is_none() &&
650            // "ALL_KEYWORDS" are sorted.
651            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
652            InvalidSqlSnafu {
653                msg: format!(
654                    "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
655                    &name.value
656                ),
657            }
658        );
659
660        let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
661        let mut options = vec![];
662        let mut extensions = ColumnExtensions::default();
663        loop {
664            if parser.parse_keyword(Keyword::CONSTRAINT) {
665                let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
666                if let Some(option) = Self::parse_optional_column_option(parser)? {
667                    options.push(ColumnOptionDef { name, option });
668                } else {
669                    return parser
670                        .expected(
671                            "constraint details after CONSTRAINT <name>",
672                            parser.peek_token(),
673                        )
674                        .context(SyntaxSnafu);
675                }
676            } else if let Some(option) = Self::parse_optional_column_option(parser)? {
677                options.push(ColumnOptionDef { name: None, option });
678            } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
679                break;
680            };
681        }
682
683        Ok(Column {
684            column_def: ColumnDef {
685                name: Self::canonicalize_identifier(name),
686                data_type,
687                options,
688            },
689            extensions,
690        })
691    }
692
693    fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
694        if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
695            Ok(Some(ColumnOption::CharacterSet(
696                parser.parse_object_name(false).context(SyntaxSnafu)?,
697            )))
698        } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
699            Ok(Some(ColumnOption::NotNull))
700        } else if parser.parse_keywords(&[Keyword::COMMENT]) {
701            match parser.next_token() {
702                TokenWithSpan {
703                    token: Token::SingleQuotedString(value, ..),
704                    ..
705                } => Ok(Some(ColumnOption::Comment(value))),
706                unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
707            }
708        } else if parser.parse_keyword(Keyword::NULL) {
709            Ok(Some(ColumnOption::Null))
710        } else if parser.parse_keyword(Keyword::DEFAULT) {
711            Ok(Some(ColumnOption::Default(
712                parser.parse_expr().context(SyntaxSnafu)?,
713            )))
714        } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
715            Ok(Some(ColumnOption::Unique {
716                is_primary: true,
717                characteristics: None,
718            }))
719        } else if parser.parse_keyword(Keyword::UNIQUE) {
720            Ok(Some(ColumnOption::Unique {
721                is_primary: false,
722                characteristics: None,
723            }))
724        } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
725            // Use a DialectSpecific option for time index
726            Ok(Some(ColumnOption::DialectSpecific(vec![
727                Token::Word(Word {
728                    value: "TIME".to_string(),
729                    quote_style: None,
730                    keyword: Keyword::TIME,
731                }),
732                Token::Word(Word {
733                    value: "INDEX".to_string(),
734                    quote_style: None,
735                    keyword: Keyword::INDEX,
736                }),
737            ])))
738        } else {
739            Ok(None)
740        }
741    }
742
743    /// Parse a column option extensions.
744    ///
745    /// This function will handle:
746    /// - Vector type
747    /// - Indexes
748    fn parse_column_extensions(
749        parser: &mut Parser<'_>,
750        column_name: &Ident,
751        column_type: &DataType,
752        column_extensions: &mut ColumnExtensions,
753    ) -> Result<bool> {
754        if let DataType::Custom(name, tokens) = column_type
755            && name.0.len() == 1
756            && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
757        {
758            ensure!(
759                tokens.len() == 1,
760                InvalidColumnOptionSnafu {
761                    name: column_name.to_string(),
762                    msg: "VECTOR type should have dimension",
763                }
764            );
765
766            let dimension =
767                tokens[0]
768                    .parse::<u32>()
769                    .ok()
770                    .with_context(|| InvalidColumnOptionSnafu {
771                        name: column_name.to_string(),
772                        msg: "dimension should be a positive integer",
773                    })?;
774
775            let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
776            column_extensions.vector_options = Some(options);
777        }
778
779        // parse index options in column definition
780        let mut is_index_declared = false;
781
782        // skipping index
783        if let Token::Word(word) = parser.peek_token().token
784            && word.value.eq_ignore_ascii_case(SKIPPING)
785        {
786            parser.next_token();
787            // Consume `INDEX` keyword
788            ensure!(
789                parser.parse_keyword(Keyword::INDEX),
790                InvalidColumnOptionSnafu {
791                    name: column_name.to_string(),
792                    msg: "expect INDEX after SKIPPING keyword",
793                }
794            );
795            ensure!(
796                column_extensions.skipping_index_options.is_none(),
797                InvalidColumnOptionSnafu {
798                    name: column_name.to_string(),
799                    msg: "duplicated SKIPPING index option",
800                }
801            );
802
803            let options = parser
804                .parse_options(Keyword::WITH)
805                .context(error::SyntaxSnafu)?
806                .into_iter()
807                .map(parse_option_string)
808                .collect::<Result<HashMap<String, String>>>()?;
809
810            for key in options.keys() {
811                ensure!(
812                    validate_column_skipping_index_create_option(key),
813                    InvalidColumnOptionSnafu {
814                        name: column_name.to_string(),
815                        msg: format!("invalid SKIPPING INDEX option: {key}"),
816                    }
817                );
818            }
819
820            column_extensions.skipping_index_options = Some(options.into());
821            is_index_declared |= true;
822        }
823
824        // fulltext index
825        if parser.parse_keyword(Keyword::FULLTEXT) {
826            // Consume `INDEX` keyword
827            ensure!(
828                parser.parse_keyword(Keyword::INDEX),
829                InvalidColumnOptionSnafu {
830                    name: column_name.to_string(),
831                    msg: "expect INDEX after FULLTEXT keyword",
832                }
833            );
834
835            ensure!(
836                column_extensions.fulltext_index_options.is_none(),
837                InvalidColumnOptionSnafu {
838                    name: column_name.to_string(),
839                    msg: "duplicated FULLTEXT INDEX option",
840                }
841            );
842
843            let column_type = get_unalias_type(column_type);
844            let data_type = sql_data_type_to_concrete_data_type(&column_type)?;
845            ensure!(
846                data_type == ConcreteDataType::string_datatype(),
847                InvalidColumnOptionSnafu {
848                    name: column_name.to_string(),
849                    msg: "FULLTEXT index only supports string type",
850                }
851            );
852
853            let options = parser
854                .parse_options(Keyword::WITH)
855                .context(error::SyntaxSnafu)?
856                .into_iter()
857                .map(parse_option_string)
858                .collect::<Result<HashMap<String, String>>>()?;
859
860            for key in options.keys() {
861                ensure!(
862                    validate_column_fulltext_create_option(key),
863                    InvalidColumnOptionSnafu {
864                        name: column_name.to_string(),
865                        msg: format!("invalid FULLTEXT INDEX option: {key}"),
866                    }
867                );
868            }
869
870            column_extensions.fulltext_index_options = Some(options.into());
871            is_index_declared |= true;
872        }
873
874        // inverted index
875        if let Token::Word(word) = parser.peek_token().token
876            && word.value.eq_ignore_ascii_case(INVERTED)
877        {
878            parser.next_token();
879            // Consume `INDEX` keyword
880            ensure!(
881                parser.parse_keyword(Keyword::INDEX),
882                InvalidColumnOptionSnafu {
883                    name: column_name.to_string(),
884                    msg: "expect INDEX after INVERTED keyword",
885                }
886            );
887
888            ensure!(
889                column_extensions.inverted_index_options.is_none(),
890                InvalidColumnOptionSnafu {
891                    name: column_name.to_string(),
892                    msg: "duplicated INVERTED index option",
893                }
894            );
895
896            // inverted index doesn't have options, skipping `WITH`
897            // try cache `WITH` and throw error
898            let with_token = parser.peek_token();
899            ensure!(
900                with_token.token
901                    != Token::Word(Word {
902                        value: "WITH".to_string(),
903                        keyword: Keyword::WITH,
904                        quote_style: None,
905                    }),
906                InvalidColumnOptionSnafu {
907                    name: column_name.to_string(),
908                    msg: "INVERTED index doesn't support options",
909                }
910            );
911
912            column_extensions.inverted_index_options = Some(OptionMap::default());
913            is_index_declared |= true;
914        }
915
916        Ok(is_index_declared)
917    }
918
919    fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
920        match self.parser.next_token() {
921            TokenWithSpan {
922                token: Token::Word(w),
923                ..
924            } if w.keyword == Keyword::PRIMARY => {
925                self.parser
926                    .expect_keyword(Keyword::KEY)
927                    .context(error::UnexpectedSnafu {
928                        expected: "KEY",
929                        actual: self.peek_token_as_string(),
930                    })?;
931                let raw_columns = self
932                    .parser
933                    .parse_parenthesized_column_list(Mandatory, false)
934                    .context(error::SyntaxSnafu)?;
935                let columns = raw_columns
936                    .into_iter()
937                    .map(Self::canonicalize_identifier)
938                    .collect();
939                Ok(Some(TableConstraint::PrimaryKey { columns }))
940            }
941            TokenWithSpan {
942                token: Token::Word(w),
943                ..
944            } if w.keyword == Keyword::TIME => {
945                self.parser
946                    .expect_keyword(Keyword::INDEX)
947                    .context(error::UnexpectedSnafu {
948                        expected: "INDEX",
949                        actual: self.peek_token_as_string(),
950                    })?;
951
952                let raw_columns = self
953                    .parser
954                    .parse_parenthesized_column_list(Mandatory, false)
955                    .context(error::SyntaxSnafu)?;
956                let mut columns = raw_columns
957                    .into_iter()
958                    .map(Self::canonicalize_identifier)
959                    .collect::<Vec<_>>();
960
961                ensure!(
962                    columns.len() == 1,
963                    InvalidTimeIndexSnafu {
964                        msg: "it should contain only one column in time index",
965                    }
966                );
967
968                Ok(Some(TableConstraint::TimeIndex {
969                    column: columns.pop().unwrap(),
970                }))
971            }
972            _ => {
973                self.parser.prev_token();
974                Ok(None)
975            }
976        }
977    }
978
979    /// Parses the set of valid formats
980    fn parse_table_engine(&mut self, default: &str) -> Result<String> {
981        if !self.consume_token(ENGINE) {
982            return Ok(default.to_string());
983        }
984
985        self.parser
986            .expect_token(&Token::Eq)
987            .context(error::UnexpectedSnafu {
988                expected: "=",
989                actual: self.peek_token_as_string(),
990            })?;
991
992        let token = self.parser.next_token();
993        if let Token::Word(w) = token.token {
994            Ok(w.value)
995        } else {
996            self.expected("'Engine' is missing", token)
997        }
998    }
999}
1000
1001fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1002    let time_index_constraints: Vec<_> = constraints
1003        .iter()
1004        .filter_map(|c| match c {
1005            TableConstraint::TimeIndex { column } => Some(column),
1006            _ => None,
1007        })
1008        .unique()
1009        .collect();
1010
1011    ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1012    ensure!(
1013        time_index_constraints.len() == 1,
1014        InvalidTimeIndexSnafu {
1015            msg: format!(
1016                "expected only one time index constraint but actual {}",
1017                time_index_constraints.len()
1018            ),
1019        }
1020    );
1021
1022    // It's safe to use time_index_constraints[0][0],
1023    // we already check the bound above.
1024    let time_index_column_ident = &time_index_constraints[0];
1025    let time_index_column = columns
1026        .iter()
1027        .find(|c| c.name().value == *time_index_column_ident.value)
1028        .with_context(|| InvalidTimeIndexSnafu {
1029            msg: format!(
1030                "time index column {} not found in columns",
1031                time_index_column_ident
1032            ),
1033        })?;
1034
1035    let time_index_data_type = get_unalias_type(time_index_column.data_type());
1036    ensure!(
1037        matches!(time_index_data_type, DataType::Timestamp(_, _)),
1038        InvalidColumnOptionSnafu {
1039            name: time_index_column.name().to_string(),
1040            msg: "time index column data type should be timestamp",
1041        }
1042    );
1043
1044    Ok(())
1045}
1046
1047fn get_unalias_type(data_type: &DataType) -> DataType {
1048    match data_type {
1049        DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1050            if let Some(real_type) =
1051                get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1052            {
1053                real_type
1054            } else {
1055                data_type.clone()
1056            }
1057        }
1058        _ => data_type.clone(),
1059    }
1060}
1061
1062fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1063    let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1064
1065    ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1066
1067    Ok(())
1068}
1069
1070/// Ensure all exprs are binary expr and all the columns are defined in the column list.
1071fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1072    for expr in exprs {
1073        // The first level must be binary expr
1074        if let Expr::BinaryOp { left, op: _, right } = expr {
1075            ensure_one_expr(left, columns)?;
1076            ensure_one_expr(right, columns)?;
1077        } else {
1078            return error::InvalidSqlSnafu {
1079                msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1080            }
1081            .fail();
1082        }
1083    }
1084    Ok(())
1085}
1086
1087/// Check if the expr is a binary expr, an ident or a literal value.
1088/// If is ident, then check it is in the column list.
1089/// This recursive function is intended to be used by [ensure_exprs_are_binary].
1090fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1091    match expr {
1092        Expr::BinaryOp { left, op: _, right } => {
1093            ensure_one_expr(left, columns)?;
1094            ensure_one_expr(right, columns)?;
1095            Ok(())
1096        }
1097        Expr::Identifier(ident) => {
1098            let column_name = &ident.value;
1099            ensure!(
1100                columns.iter().any(|c| &c.name().value == column_name),
1101                error::InvalidSqlSnafu {
1102                    msg: format!(
1103                        "Column {:?} in rule expr is not referenced in PARTITION ON",
1104                        column_name
1105                    ),
1106                }
1107            );
1108            Ok(())
1109        }
1110        Expr::Value(_) => Ok(()),
1111        Expr::UnaryOp { expr, .. } => {
1112            ensure_one_expr(expr, columns)?;
1113            Ok(())
1114        }
1115        _ => error::InvalidSqlSnafu {
1116            msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1117        }
1118        .fail(),
1119    }
1120}
1121
1122/// Ensure that all columns used in "PARTITION ON COLUMNS" are defined in create table.
1123fn ensure_partition_columns_defined<'a>(
1124    columns: &'a [Column],
1125    partitions: &'a Partitions,
1126) -> Result<Vec<&'a Column>> {
1127    partitions
1128        .column_list
1129        .iter()
1130        .map(|x| {
1131            let x = ParserContext::canonicalize_identifier(x.clone());
1132            // Normally the columns in "create table" won't be too many,
1133            // a linear search to find the target every time is fine.
1134            columns
1135                .iter()
1136                .find(|c| *c.name().value == x.value)
1137                .context(error::InvalidSqlSnafu {
1138                    msg: format!("Partition column {:?} not defined", x.value),
1139                })
1140        })
1141        .collect::<Result<Vec<&Column>>>()
1142}
1143
1144#[cfg(test)]
1145mod tests {
1146    use std::assert_matches::assert_matches;
1147    use std::collections::HashMap;
1148
1149    use common_catalog::consts::FILE_ENGINE;
1150    use common_error::ext::ErrorExt;
1151    use sqlparser::ast::ColumnOption::NotNull;
1152    use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1153    use sqlparser::dialect::GenericDialect;
1154    use sqlparser::tokenizer::Tokenizer;
1155
1156    use super::*;
1157    use crate::dialect::GreptimeDbDialect;
1158    use crate::parser::ParseOptions;
1159
1160    #[test]
1161    fn test_parse_create_table_like() {
1162        let sql = "CREATE TABLE t1 LIKE t2";
1163        let stmts =
1164            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1165                .unwrap();
1166
1167        assert_eq!(1, stmts.len());
1168        match &stmts[0] {
1169            Statement::CreateTableLike(c) => {
1170                assert_eq!(c.table_name.to_string(), "t1");
1171                assert_eq!(c.source_name.to_string(), "t2");
1172            }
1173            _ => unreachable!(),
1174        }
1175    }
1176
1177    #[test]
1178    fn test_validate_external_table_options() {
1179        let sql = "CREATE EXTERNAL TABLE city (
1180            host string,
1181            ts timestamp,
1182            cpu float64 default 0,
1183            memory float64,
1184            TIME INDEX (ts),
1185            PRIMARY KEY(ts, host)
1186        ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1187
1188        let result =
1189            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1190        assert!(matches!(
1191            result,
1192            Err(error::Error::InvalidTableOption { .. })
1193        ));
1194    }
1195
1196    #[test]
1197    fn test_parse_create_external_table() {
1198        struct Test<'a> {
1199            sql: &'a str,
1200            expected_table_name: &'a str,
1201            expected_options: HashMap<String, String>,
1202            expected_engine: &'a str,
1203            expected_if_not_exist: bool,
1204        }
1205
1206        let tests = [
1207            Test {
1208                sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1209                expected_table_name: "city",
1210                expected_options: HashMap::from([
1211                    ("location".to_string(), "/var/data/city.csv".to_string()),
1212                    ("format".to_string(), "csv".to_string()),
1213                ]),
1214                expected_engine: FILE_ENGINE,
1215                expected_if_not_exist: false,
1216            },
1217            Test {
1218                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1219                expected_table_name: "city",
1220                expected_options: HashMap::from([
1221                    ("location".to_string(), "/var/data/city.csv".to_string()),
1222                    ("format".to_string(), "csv".to_string()),
1223                ]),
1224                expected_engine: "foo",
1225                expected_if_not_exist: true,
1226            },
1227            Test {
1228                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1229                expected_table_name: "city",
1230                expected_options: HashMap::from([
1231                    ("location".to_string(), "/var/data/city.csv".to_string()),
1232                    ("format".to_string(), "csv".to_string()),
1233                    ("compaction.type".to_string(), "bar".to_string()),
1234                ]),
1235                expected_engine: "foo",
1236                expected_if_not_exist: true,
1237            },
1238        ];
1239
1240        for test in tests {
1241            let stmts = ParserContext::create_with_dialect(
1242                test.sql,
1243                &GreptimeDbDialect {},
1244                ParseOptions::default(),
1245            )
1246            .unwrap();
1247            assert_eq!(1, stmts.len());
1248            match &stmts[0] {
1249                Statement::CreateExternalTable(c) => {
1250                    assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1251                    assert_eq!(c.options, test.expected_options.into());
1252                    assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1253                    assert_eq!(c.engine, test.expected_engine);
1254                }
1255                _ => unreachable!(),
1256            }
1257        }
1258    }
1259
1260    #[test]
1261    fn test_parse_create_external_table_with_schema() {
1262        let sql = "CREATE EXTERNAL TABLE city (
1263            host string,
1264            ts timestamp,
1265            cpu float32 default 0,
1266            memory float64,
1267            TIME INDEX (ts),
1268            PRIMARY KEY(ts, host),
1269        ) with(location='/var/data/city.csv',format='csv');";
1270
1271        let options = HashMap::from([
1272            ("location".to_string(), "/var/data/city.csv".to_string()),
1273            ("format".to_string(), "csv".to_string()),
1274        ]);
1275
1276        let stmts =
1277            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1278                .unwrap();
1279        assert_eq!(1, stmts.len());
1280        match &stmts[0] {
1281            Statement::CreateExternalTable(c) => {
1282                assert_eq!(c.name.to_string(), "city");
1283                assert_eq!(c.options, options.into());
1284
1285                let columns = &c.columns;
1286                assert_column_def(&columns[0].column_def, "host", "STRING");
1287                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1288                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1289                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1290
1291                let constraints = &c.constraints;
1292                assert_eq!(
1293                    &constraints[0],
1294                    &TableConstraint::TimeIndex {
1295                        column: Ident::new("ts"),
1296                    }
1297                );
1298                assert_eq!(
1299                    &constraints[1],
1300                    &TableConstraint::PrimaryKey {
1301                        columns: vec![Ident::new("ts"), Ident::new("host")]
1302                    }
1303                );
1304            }
1305            _ => unreachable!(),
1306        }
1307    }
1308
1309    #[test]
1310    fn test_parse_create_database() {
1311        let sql = "create database";
1312        let result =
1313            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1314        assert!(
1315            result
1316                .unwrap_err()
1317                .to_string()
1318                .contains("Unexpected token while parsing SQL statement")
1319        );
1320
1321        let sql = "create database prometheus";
1322        let stmts =
1323            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1324                .unwrap();
1325
1326        assert_eq!(1, stmts.len());
1327        match &stmts[0] {
1328            Statement::CreateDatabase(c) => {
1329                assert_eq!(c.name.to_string(), "prometheus");
1330                assert!(!c.if_not_exists);
1331            }
1332            _ => unreachable!(),
1333        }
1334
1335        let sql = "create database if not exists prometheus";
1336        let stmts =
1337            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1338                .unwrap();
1339
1340        assert_eq!(1, stmts.len());
1341        match &stmts[0] {
1342            Statement::CreateDatabase(c) => {
1343                assert_eq!(c.name.to_string(), "prometheus");
1344                assert!(c.if_not_exists);
1345            }
1346            _ => unreachable!(),
1347        }
1348
1349        let sql = "CREATE DATABASE `fOo`";
1350        let result =
1351            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1352        let stmts = result.unwrap();
1353        match &stmts.last().unwrap() {
1354            Statement::CreateDatabase(c) => {
1355                assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1356                assert!(!c.if_not_exists);
1357            }
1358            _ => unreachable!(),
1359        }
1360
1361        let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1362        let result =
1363            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1364        let stmts = result.unwrap();
1365        match &stmts[0] {
1366            Statement::CreateDatabase(c) => {
1367                assert_eq!(c.name.to_string(), "prometheus");
1368                assert!(!c.if_not_exists);
1369                assert_eq!(c.options.get("ttl").unwrap(), "1h");
1370            }
1371            _ => unreachable!(),
1372        }
1373    }
1374
1375    #[test]
1376    fn test_parse_create_flow_more_testcases() {
1377        use pretty_assertions::assert_eq;
1378        fn parse_create_flow(sql: &str) -> CreateFlow {
1379            let stmts = ParserContext::create_with_dialect(
1380                sql,
1381                &GreptimeDbDialect {},
1382                ParseOptions::default(),
1383            )
1384            .unwrap();
1385            assert_eq!(1, stmts.len());
1386            match &stmts[0] {
1387                Statement::CreateFlow(c) => c.clone(),
1388                _ => unreachable!(),
1389            }
1390        }
1391        struct CreateFlowWoutQuery {
1392            /// Flow name
1393            pub flow_name: ObjectName,
1394            /// Output (sink) table name
1395            pub sink_table_name: ObjectName,
1396            /// Whether to replace existing task
1397            pub or_replace: bool,
1398            /// Create if not exist
1399            pub if_not_exists: bool,
1400            /// `EXPIRE AFTER`
1401            /// Duration in second as `i64`
1402            pub expire_after: Option<i64>,
1403            /// Comment string
1404            pub comment: Option<String>,
1405        }
1406        let testcases = vec![
1407            (
1408                r"
1409CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1410SINK TO schema_1.table_1
1411EXPIRE AFTER INTERVAL '5 minutes'
1412COMMENT 'test comment'
1413AS
1414SELECT max(c1), min(c2) FROM schema_2.table_2;",
1415                CreateFlowWoutQuery {
1416                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1417                    sink_table_name: ObjectName::from(vec![
1418                        Ident::new("schema_1"),
1419                        Ident::new("table_1"),
1420                    ]),
1421                    or_replace: true,
1422                    if_not_exists: true,
1423                    expire_after: Some(300),
1424                    comment: Some("test comment".to_string()),
1425                },
1426            ),
1427            (
1428                r"
1429CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1430SINK TO schema_1.table_1
1431EXPIRE AFTER INTERVAL '300 s'
1432COMMENT 'test comment'
1433AS
1434SELECT max(c1), min(c2) FROM schema_2.table_2;",
1435                CreateFlowWoutQuery {
1436                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1437                    sink_table_name: ObjectName::from(vec![
1438                        Ident::new("schema_1"),
1439                        Ident::new("table_1"),
1440                    ]),
1441                    or_replace: true,
1442                    if_not_exists: true,
1443                    expire_after: Some(300),
1444                    comment: Some("test comment".to_string()),
1445                },
1446            ),
1447            (
1448                r"
1449CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1450SINK TO schema_1.table_1
1451EXPIRE AFTER '5 minutes'
1452COMMENT 'test comment'
1453AS
1454SELECT max(c1), min(c2) FROM schema_2.table_2;",
1455                CreateFlowWoutQuery {
1456                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1457                    sink_table_name: ObjectName::from(vec![
1458                        Ident::new("schema_1"),
1459                        Ident::new("table_1"),
1460                    ]),
1461                    or_replace: true,
1462                    if_not_exists: true,
1463                    expire_after: Some(300),
1464                    comment: Some("test comment".to_string()),
1465                },
1466            ),
1467            (
1468                r"
1469CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1470SINK TO schema_1.table_1
1471EXPIRE AFTER '300 s'
1472COMMENT 'test comment'
1473AS
1474SELECT max(c1), min(c2) FROM schema_2.table_2;",
1475                CreateFlowWoutQuery {
1476                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1477                    sink_table_name: ObjectName::from(vec![
1478                        Ident::new("schema_1"),
1479                        Ident::new("table_1"),
1480                    ]),
1481                    or_replace: true,
1482                    if_not_exists: true,
1483                    expire_after: Some(300),
1484                    comment: Some("test comment".to_string()),
1485                },
1486            ),
1487            (
1488                r"
1489CREATE FLOW `task_2`
1490SINK TO schema_1.table_1
1491EXPIRE AFTER '2 days 1h 2 min'
1492AS
1493SELECT max(c1), min(c2) FROM schema_2.table_2;",
1494                CreateFlowWoutQuery {
1495                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1496                    sink_table_name: ObjectName::from(vec![
1497                        Ident::new("schema_1"),
1498                        Ident::new("table_1"),
1499                    ]),
1500                    or_replace: false,
1501                    if_not_exists: false,
1502                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1503                    comment: None,
1504                },
1505            ),
1506        ];
1507
1508        for (sql, expected) in testcases {
1509            let create_task = parse_create_flow(sql);
1510
1511            let expected = CreateFlow {
1512                flow_name: expected.flow_name,
1513                sink_table_name: expected.sink_table_name,
1514                or_replace: expected.or_replace,
1515                if_not_exists: expected.if_not_exists,
1516                expire_after: expected.expire_after,
1517                eval_interval: None,
1518                comment: expected.comment,
1519                // ignore query parse result
1520                query: create_task.query.clone(),
1521            };
1522
1523            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1524            let show_create = create_task.to_string();
1525            let recreated = parse_create_flow(&show_create);
1526            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1527        }
1528    }
1529
1530    #[test]
1531    fn test_parse_create_flow() {
1532        use pretty_assertions::assert_eq;
1533        fn parse_create_flow(sql: &str) -> CreateFlow {
1534            let stmts = ParserContext::create_with_dialect(
1535                sql,
1536                &GreptimeDbDialect {},
1537                ParseOptions::default(),
1538            )
1539            .unwrap();
1540            assert_eq!(1, stmts.len());
1541            match &stmts[0] {
1542                Statement::CreateFlow(c) => c.clone(),
1543                _ => panic!("{:?}", stmts[0]),
1544            }
1545        }
1546        struct CreateFlowWoutQuery {
1547            /// Flow name
1548            pub flow_name: ObjectName,
1549            /// Output (sink) table name
1550            pub sink_table_name: ObjectName,
1551            /// Whether to replace existing task
1552            pub or_replace: bool,
1553            /// Create if not exist
1554            pub if_not_exists: bool,
1555            /// `EXPIRE AFTER`
1556            /// Duration in second as `i64`
1557            pub expire_after: Option<i64>,
1558            /// Duration for flow evaluation interval
1559            /// Duration in seconds as `i64`
1560            /// If not set, flow will be evaluated based on time window size and other args.
1561            pub eval_interval: Option<i64>,
1562            /// Comment string
1563            pub comment: Option<String>,
1564        }
1565
1566        // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
1567        let testcases = vec![
1568            (
1569                r"
1570CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1571SINK TO schema_1.table_1
1572EXPIRE AFTER INTERVAL '5 minutes'
1573COMMENT 'test comment'
1574AS
1575SELECT max(c1), min(c2) FROM schema_2.table_2;",
1576                CreateFlowWoutQuery {
1577                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1578                    sink_table_name: ObjectName(vec![
1579                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1580                        ObjectNamePart::Identifier(Ident::new("table_1")),
1581                    ]),
1582                    or_replace: true,
1583                    if_not_exists: true,
1584                    expire_after: Some(300),
1585                    eval_interval: None,
1586                    comment: Some("test comment".to_string()),
1587                },
1588            ),
1589            (
1590                r"
1591CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1592SINK TO schema_1.table_1
1593EXPIRE AFTER INTERVAL '300 s'
1594COMMENT 'test comment'
1595AS
1596SELECT max(c1), min(c2) FROM schema_2.table_2;",
1597                CreateFlowWoutQuery {
1598                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1599                    sink_table_name: ObjectName(vec![
1600                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1601                        ObjectNamePart::Identifier(Ident::new("table_1")),
1602                    ]),
1603                    or_replace: true,
1604                    if_not_exists: true,
1605                    expire_after: Some(300),
1606                    eval_interval: None,
1607                    comment: Some("test comment".to_string()),
1608                },
1609            ),
1610            (
1611                r"
1612CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1613SINK TO schema_1.table_1
1614EXPIRE AFTER '5 minutes'
1615EVAL INTERVAL '10 seconds'
1616COMMENT 'test comment'
1617AS
1618SELECT max(c1), min(c2) FROM schema_2.table_2;",
1619                CreateFlowWoutQuery {
1620                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1621                    sink_table_name: ObjectName(vec![
1622                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1623                        ObjectNamePart::Identifier(Ident::new("table_1")),
1624                    ]),
1625                    or_replace: true,
1626                    if_not_exists: true,
1627                    expire_after: Some(300),
1628                    eval_interval: Some(10),
1629                    comment: Some("test comment".to_string()),
1630                },
1631            ),
1632            (
1633                r"
1634CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1635SINK TO schema_1.table_1
1636EXPIRE AFTER '5 minutes'
1637EVAL INTERVAL INTERVAL '10 seconds'
1638COMMENT 'test comment'
1639AS
1640SELECT max(c1), min(c2) FROM schema_2.table_2;",
1641                CreateFlowWoutQuery {
1642                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1643                    sink_table_name: ObjectName(vec![
1644                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1645                        ObjectNamePart::Identifier(Ident::new("table_1")),
1646                    ]),
1647                    or_replace: true,
1648                    if_not_exists: true,
1649                    expire_after: Some(300),
1650                    eval_interval: Some(10),
1651                    comment: Some("test comment".to_string()),
1652                },
1653            ),
1654            (
1655                r"
1656CREATE FLOW `task_2`
1657SINK TO schema_1.table_1
1658EXPIRE AFTER '2 days 1h 2 min'
1659AS
1660SELECT max(c1), min(c2) FROM schema_2.table_2;",
1661                CreateFlowWoutQuery {
1662                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1663                        '`', "task_2",
1664                    ))]),
1665                    sink_table_name: ObjectName(vec![
1666                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1667                        ObjectNamePart::Identifier(Ident::new("table_1")),
1668                    ]),
1669                    or_replace: false,
1670                    if_not_exists: false,
1671                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1672                    eval_interval: None,
1673                    comment: None,
1674                },
1675            ),
1676        ];
1677
1678        for (sql, expected) in testcases {
1679            let create_task = parse_create_flow(sql);
1680
1681            let expected = CreateFlow {
1682                flow_name: expected.flow_name,
1683                sink_table_name: expected.sink_table_name,
1684                or_replace: expected.or_replace,
1685                if_not_exists: expected.if_not_exists,
1686                expire_after: expected.expire_after,
1687                eval_interval: expected.eval_interval,
1688                comment: expected.comment,
1689                // ignore query parse result
1690                query: create_task.query.clone(),
1691            };
1692
1693            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1694            let show_create = create_task.to_string();
1695            let recreated = parse_create_flow(&show_create);
1696            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1697        }
1698    }
1699
1700    #[test]
1701    fn test_create_flow_no_month() {
1702        let sql = r"
1703CREATE FLOW `task_2`
1704SINK TO schema_1.table_1
1705EXPIRE AFTER '1 month 2 days 1h 2 min'
1706AS
1707SELECT max(c1), min(c2) FROM schema_2.table_2;";
1708        let stmts =
1709            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1710
1711        assert!(
1712            stmts.is_err()
1713                && stmts
1714                    .unwrap_err()
1715                    .to_string()
1716                    .contains("Interval with months is not allowed")
1717        );
1718    }
1719
1720    #[test]
1721    fn test_validate_create() {
1722        let sql = r"
1723CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
1724PARTITION ON COLUMNS(c, a) (
1725    a < 10,
1726    a > 10 AND a < 20,
1727    a > 20 AND c < 100,
1728    a > 20 AND c >= 100
1729)
1730ENGINE=mito";
1731        let result =
1732            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1733        let _ = result.unwrap();
1734
1735        let sql = r"
1736CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1737PARTITION ON COLUMNS(x) ()
1738ENGINE=mito";
1739        let result =
1740            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1741        assert!(
1742            result
1743                .unwrap_err()
1744                .to_string()
1745                .contains("Partition column \"x\" not defined")
1746        );
1747    }
1748
1749    #[test]
1750    fn test_parse_create_table_with_partitions() {
1751        let sql = r"
1752CREATE TABLE monitor (
1753  host_id    INT,
1754  idc        STRING,
1755  ts         TIMESTAMP,
1756  cpu        DOUBLE DEFAULT 0,
1757  memory     DOUBLE,
1758  TIME INDEX (ts),
1759  PRIMARY KEY (host),
1760)
1761PARTITION ON COLUMNS(idc, host_id) (
1762  idc <= 'hz' AND host_id < 1000,
1763  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1764  idc > 'sh' AND host_id < 3000,
1765  idc > 'sh' AND host_id >= 3000,
1766)
1767ENGINE=mito";
1768        let result =
1769            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1770                .unwrap();
1771        assert_eq!(result.len(), 1);
1772        match &result[0] {
1773            Statement::CreateTable(c) => {
1774                assert!(c.partitions.is_some());
1775
1776                let partitions = c.partitions.as_ref().unwrap();
1777                let column_list = partitions
1778                    .column_list
1779                    .iter()
1780                    .map(|x| &x.value)
1781                    .collect::<Vec<&String>>();
1782                assert_eq!(column_list, vec!["idc", "host_id"]);
1783
1784                let exprs = &partitions.exprs;
1785
1786                assert_eq!(
1787                    exprs[0],
1788                    Expr::BinaryOp {
1789                        left: Box::new(Expr::BinaryOp {
1790                            left: Box::new(Expr::Identifier("idc".into())),
1791                            op: BinaryOperator::LtEq,
1792                            right: Box::new(Expr::Value(
1793                                Value::SingleQuotedString("hz".to_string()).into()
1794                            ))
1795                        }),
1796                        op: BinaryOperator::And,
1797                        right: Box::new(Expr::BinaryOp {
1798                            left: Box::new(Expr::Identifier("host_id".into())),
1799                            op: BinaryOperator::Lt,
1800                            right: Box::new(Expr::Value(
1801                                Value::Number("1000".to_string(), false).into()
1802                            ))
1803                        })
1804                    }
1805                );
1806                assert_eq!(
1807                    exprs[1],
1808                    Expr::BinaryOp {
1809                        left: Box::new(Expr::BinaryOp {
1810                            left: Box::new(Expr::BinaryOp {
1811                                left: Box::new(Expr::Identifier("idc".into())),
1812                                op: BinaryOperator::Gt,
1813                                right: Box::new(Expr::Value(
1814                                    Value::SingleQuotedString("hz".to_string()).into()
1815                                ))
1816                            }),
1817                            op: BinaryOperator::And,
1818                            right: Box::new(Expr::BinaryOp {
1819                                left: Box::new(Expr::Identifier("idc".into())),
1820                                op: BinaryOperator::LtEq,
1821                                right: Box::new(Expr::Value(
1822                                    Value::SingleQuotedString("sh".to_string()).into()
1823                                ))
1824                            })
1825                        }),
1826                        op: BinaryOperator::And,
1827                        right: Box::new(Expr::BinaryOp {
1828                            left: Box::new(Expr::Identifier("host_id".into())),
1829                            op: BinaryOperator::Lt,
1830                            right: Box::new(Expr::Value(
1831                                Value::Number("2000".to_string(), false).into()
1832                            ))
1833                        })
1834                    }
1835                );
1836                assert_eq!(
1837                    exprs[2],
1838                    Expr::BinaryOp {
1839                        left: Box::new(Expr::BinaryOp {
1840                            left: Box::new(Expr::Identifier("idc".into())),
1841                            op: BinaryOperator::Gt,
1842                            right: Box::new(Expr::Value(
1843                                Value::SingleQuotedString("sh".to_string()).into()
1844                            ))
1845                        }),
1846                        op: BinaryOperator::And,
1847                        right: Box::new(Expr::BinaryOp {
1848                            left: Box::new(Expr::Identifier("host_id".into())),
1849                            op: BinaryOperator::Lt,
1850                            right: Box::new(Expr::Value(
1851                                Value::Number("3000".to_string(), false).into()
1852                            ))
1853                        })
1854                    }
1855                );
1856                assert_eq!(
1857                    exprs[3],
1858                    Expr::BinaryOp {
1859                        left: Box::new(Expr::BinaryOp {
1860                            left: Box::new(Expr::Identifier("idc".into())),
1861                            op: BinaryOperator::Gt,
1862                            right: Box::new(Expr::Value(
1863                                Value::SingleQuotedString("sh".to_string()).into()
1864                            ))
1865                        }),
1866                        op: BinaryOperator::And,
1867                        right: Box::new(Expr::BinaryOp {
1868                            left: Box::new(Expr::Identifier("host_id".into())),
1869                            op: BinaryOperator::GtEq,
1870                            right: Box::new(Expr::Value(
1871                                Value::Number("3000".to_string(), false).into()
1872                            ))
1873                        })
1874                    }
1875                );
1876            }
1877            _ => unreachable!(),
1878        }
1879    }
1880
1881    #[test]
1882    fn test_parse_create_table_with_quoted_partitions() {
1883        let sql = r"
1884CREATE TABLE monitor (
1885  `host_id`    INT,
1886  idc        STRING,
1887  ts         TIMESTAMP,
1888  cpu        DOUBLE DEFAULT 0,
1889  memory     DOUBLE,
1890  TIME INDEX (ts),
1891  PRIMARY KEY (host),
1892)
1893PARTITION ON COLUMNS(IdC, host_id) (
1894  idc <= 'hz' AND host_id < 1000,
1895  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1896  idc > 'sh' AND host_id < 3000,
1897  idc > 'sh' AND host_id >= 3000,
1898)";
1899        let result =
1900            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1901                .unwrap();
1902        assert_eq!(result.len(), 1);
1903    }
1904
1905    #[test]
1906    fn test_parse_create_table_with_timestamp_index() {
1907        let sql1 = r"
1908CREATE TABLE monitor (
1909  host_id    INT,
1910  idc        STRING,
1911  ts         TIMESTAMP TIME INDEX,
1912  cpu        DOUBLE DEFAULT 0,
1913  memory     DOUBLE,
1914  PRIMARY KEY (host),
1915)
1916ENGINE=mito";
1917        let result1 = ParserContext::create_with_dialect(
1918            sql1,
1919            &GreptimeDbDialect {},
1920            ParseOptions::default(),
1921        )
1922        .unwrap();
1923
1924        if let Statement::CreateTable(c) = &result1[0] {
1925            assert_eq!(c.constraints.len(), 2);
1926            let tc = c.constraints[0].clone();
1927            match tc {
1928                TableConstraint::TimeIndex { column } => {
1929                    assert_eq!(&column.value, "ts");
1930                }
1931                _ => panic!("should be time index constraint"),
1932            };
1933        } else {
1934            panic!("should be create_table statement");
1935        }
1936
1937        // `TIME INDEX` should be in front of `PRIMARY KEY`
1938        // in order to equal the `TIMESTAMP TIME INDEX` constraint options vector
1939        let sql2 = r"
1940CREATE TABLE monitor (
1941  host_id    INT,
1942  idc        STRING,
1943  ts         TIMESTAMP NOT NULL,
1944  cpu        DOUBLE DEFAULT 0,
1945  memory     DOUBLE,
1946  TIME INDEX (ts),
1947  PRIMARY KEY (host),
1948)
1949ENGINE=mito";
1950        let result2 = ParserContext::create_with_dialect(
1951            sql2,
1952            &GreptimeDbDialect {},
1953            ParseOptions::default(),
1954        )
1955        .unwrap();
1956
1957        assert_eq!(result1, result2);
1958
1959        // TIMESTAMP can be NULL which is not equal to above
1960        let sql3 = r"
1961CREATE TABLE monitor (
1962  host_id    INT,
1963  idc        STRING,
1964  ts         TIMESTAMP,
1965  cpu        DOUBLE DEFAULT 0,
1966  memory     DOUBLE,
1967  TIME INDEX (ts),
1968  PRIMARY KEY (host),
1969)
1970ENGINE=mito";
1971
1972        let result3 = ParserContext::create_with_dialect(
1973            sql3,
1974            &GreptimeDbDialect {},
1975            ParseOptions::default(),
1976        )
1977        .unwrap();
1978
1979        assert_ne!(result1, result3);
1980
1981        // BIGINT can't be time index any more
1982        let sql1 = r"
1983CREATE TABLE monitor (
1984  host_id    INT,
1985  idc        STRING,
1986  b          bigint TIME INDEX,
1987  cpu        DOUBLE DEFAULT 0,
1988  memory     DOUBLE,
1989  PRIMARY KEY (host),
1990)
1991ENGINE=mito";
1992        let result1 = ParserContext::create_with_dialect(
1993            sql1,
1994            &GreptimeDbDialect {},
1995            ParseOptions::default(),
1996        );
1997
1998        assert!(
1999            result1
2000                .unwrap_err()
2001                .to_string()
2002                .contains("time index column data type should be timestamp")
2003        );
2004    }
2005
2006    #[test]
2007    fn test_parse_create_table_with_timestamp_index_not_null() {
2008        let sql = r"
2009CREATE TABLE monitor (
2010  host_id    INT,
2011  idc        STRING,
2012  ts         TIMESTAMP TIME INDEX,
2013  cpu        DOUBLE DEFAULT 0,
2014  memory     DOUBLE,
2015  TIME INDEX (ts),
2016  PRIMARY KEY (host),
2017)
2018ENGINE=mito";
2019        let result =
2020            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2021                .unwrap();
2022
2023        assert_eq!(result.len(), 1);
2024        if let Statement::CreateTable(c) = &result[0] {
2025            let ts = c.columns[2].clone();
2026            assert_eq!(ts.name().to_string(), "ts");
2027            assert_eq!(ts.options()[0].option, NotNull);
2028        } else {
2029            panic!("should be create table statement");
2030        }
2031
2032        let sql1 = r"
2033CREATE TABLE monitor (
2034  host_id    INT,
2035  idc        STRING,
2036  ts         TIMESTAMP NOT NULL TIME INDEX,
2037  cpu        DOUBLE DEFAULT 0,
2038  memory     DOUBLE,
2039  TIME INDEX (ts),
2040  PRIMARY KEY (host),
2041)
2042ENGINE=mito";
2043
2044        let result1 = ParserContext::create_with_dialect(
2045            sql1,
2046            &GreptimeDbDialect {},
2047            ParseOptions::default(),
2048        )
2049        .unwrap();
2050        assert_eq!(result, result1);
2051
2052        let sql2 = r"
2053CREATE TABLE monitor (
2054  host_id    INT,
2055  idc        STRING,
2056  ts         TIMESTAMP TIME INDEX NOT NULL,
2057  cpu        DOUBLE DEFAULT 0,
2058  memory     DOUBLE,
2059  TIME INDEX (ts),
2060  PRIMARY KEY (host),
2061)
2062ENGINE=mito";
2063
2064        let result2 = ParserContext::create_with_dialect(
2065            sql2,
2066            &GreptimeDbDialect {},
2067            ParseOptions::default(),
2068        )
2069        .unwrap();
2070        assert_eq!(result, result2);
2071
2072        let sql3 = r"
2073CREATE TABLE monitor (
2074  host_id    INT,
2075  idc        STRING,
2076  ts         TIMESTAMP TIME INDEX NULL NOT,
2077  cpu        DOUBLE DEFAULT 0,
2078  memory     DOUBLE,
2079  TIME INDEX (ts),
2080  PRIMARY KEY (host),
2081)
2082ENGINE=mito";
2083
2084        let result3 = ParserContext::create_with_dialect(
2085            sql3,
2086            &GreptimeDbDialect {},
2087            ParseOptions::default(),
2088        );
2089        assert!(result3.is_err());
2090
2091        let sql4 = r"
2092CREATE TABLE monitor (
2093  host_id    INT,
2094  idc        STRING,
2095  ts         TIMESTAMP TIME INDEX NOT NULL NULL,
2096  cpu        DOUBLE DEFAULT 0,
2097  memory     DOUBLE,
2098  TIME INDEX (ts),
2099  PRIMARY KEY (host),
2100)
2101ENGINE=mito";
2102
2103        let result4 = ParserContext::create_with_dialect(
2104            sql4,
2105            &GreptimeDbDialect {},
2106            ParseOptions::default(),
2107        );
2108        assert!(result4.is_err());
2109
2110        let sql = r"
2111CREATE TABLE monitor (
2112  host_id    INT,
2113  idc        STRING,
2114  ts         TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2115  cpu        DOUBLE DEFAULT 0,
2116  memory     DOUBLE,
2117  TIME INDEX (ts),
2118  PRIMARY KEY (host),
2119)
2120ENGINE=mito";
2121
2122        let result =
2123            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2124                .unwrap();
2125
2126        if let Statement::CreateTable(c) = &result[0] {
2127            let tc = c.constraints[0].clone();
2128            match tc {
2129                TableConstraint::TimeIndex { column } => {
2130                    assert_eq!(&column.value, "ts");
2131                }
2132                _ => panic!("should be time index constraint"),
2133            }
2134            let ts = c.columns[2].clone();
2135            assert_eq!(ts.name().to_string(), "ts");
2136            assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2137            assert_eq!(ts.options()[1].option, NotNull);
2138        } else {
2139            unreachable!("should be create table statement");
2140        }
2141    }
2142
2143    #[test]
2144    fn test_parse_partitions_with_error_syntax() {
2145        let sql = r"
2146CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2147PARTITION COLUMNS(c, a) (
2148    a < 10,
2149    a > 10 AND a < 20,
2150    a > 20 AND c < 100,
2151    a > 20 AND c >= 100
2152)
2153ENGINE=mito";
2154        let result =
2155            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2156        assert!(
2157            result
2158                .unwrap_err()
2159                .output_msg()
2160                .contains("sql parser error: Expected: ON, found: COLUMNS")
2161        );
2162    }
2163
2164    #[test]
2165    fn test_parse_partitions_without_rule() {
2166        let sql = r"
2167CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2168PARTITION ON COLUMNS(c, a) ()
2169ENGINE=mito";
2170        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2171            .unwrap();
2172    }
2173
2174    #[test]
2175    fn test_parse_partitions_unreferenced_column() {
2176        let sql = r"
2177CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2178PARTITION ON COLUMNS(c, a) (
2179    b = 'foo'
2180)
2181ENGINE=mito";
2182        let result =
2183            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2184        assert_eq!(
2185            result.unwrap_err().output_msg(),
2186            "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2187        );
2188    }
2189
2190    #[test]
2191    fn test_parse_partitions_not_binary_expr() {
2192        let sql = r"
2193CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2194PARTITION ON COLUMNS(c, a) (
2195    b
2196)
2197ENGINE=mito";
2198        let result =
2199            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2200        assert_eq!(
2201            result.unwrap_err().output_msg(),
2202            r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2203        );
2204    }
2205
2206    fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2207        assert_eq!(column.name.to_string(), name);
2208        assert_eq!(column.data_type.to_string(), data_type);
2209    }
2210
2211    #[test]
2212    pub fn test_parse_create_table() {
2213        let sql = r"create table demo(
2214                             host string,
2215                             ts timestamp,
2216                             cpu float32 default 0,
2217                             memory float64,
2218                             TIME INDEX (ts),
2219                             PRIMARY KEY(ts, host),
2220                             ) engine=mito
2221                             with(ttl='10s');
2222         ";
2223        let result =
2224            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2225                .unwrap();
2226        assert_eq!(1, result.len());
2227        match &result[0] {
2228            Statement::CreateTable(c) => {
2229                assert!(!c.if_not_exists);
2230                assert_eq!("demo", c.name.to_string());
2231                assert_eq!("mito", c.engine);
2232                assert_eq!(4, c.columns.len());
2233                let columns = &c.columns;
2234                assert_column_def(&columns[0].column_def, "host", "STRING");
2235                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2236                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2237                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2238
2239                let constraints = &c.constraints;
2240                assert_eq!(
2241                    &constraints[0],
2242                    &TableConstraint::TimeIndex {
2243                        column: Ident::new("ts"),
2244                    }
2245                );
2246                assert_eq!(
2247                    &constraints[1],
2248                    &TableConstraint::PrimaryKey {
2249                        columns: vec![Ident::new("ts"), Ident::new("host")]
2250                    }
2251                );
2252                // inverted index is merged into column options
2253                assert_eq!(1, c.options.len());
2254                assert_eq!(
2255                    [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2256                    c.options.to_str_map()
2257                );
2258            }
2259            _ => unreachable!(),
2260        }
2261    }
2262
2263    #[test]
2264    fn test_invalid_index_keys() {
2265        let sql = r"create table demo(
2266                             host string,
2267                             ts int64,
2268                             cpu float64 default 0,
2269                             memory float64,
2270                             TIME INDEX (ts, host),
2271                             PRIMARY KEY(ts, host)) engine=mito;
2272         ";
2273        let result =
2274            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2275        assert!(result.is_err());
2276        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2277    }
2278
2279    #[test]
2280    fn test_duplicated_time_index() {
2281        let sql = r"create table demo(
2282                             host string,
2283                             ts timestamp time index,
2284                             t timestamp time index,
2285                             cpu float64 default 0,
2286                             memory float64,
2287                             TIME INDEX (ts, host),
2288                             PRIMARY KEY(ts, host)) engine=mito;
2289         ";
2290        let result =
2291            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2292        assert!(result.is_err());
2293        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2294
2295        let sql = r"create table demo(
2296                             host string,
2297                             ts timestamp time index,
2298                             cpu float64 default 0,
2299                             t timestamp,
2300                             memory float64,
2301                             TIME INDEX (t),
2302                             PRIMARY KEY(ts, host)) engine=mito;
2303         ";
2304        let result =
2305            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2306        assert!(result.is_err());
2307        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2308    }
2309
2310    #[test]
2311    fn test_invalid_column_name() {
2312        let sql = "create table foo(user string, i timestamp time index)";
2313        let result =
2314            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2315        let err = result.unwrap_err().output_msg();
2316        assert!(err.contains("Cannot use keyword 'user' as column name"));
2317
2318        // If column name is quoted, it's valid even same with keyword.
2319        let sql = r#"
2320            create table foo("user" string, i timestamp time index)
2321        "#;
2322        let result =
2323            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2324        let _ = result.unwrap();
2325    }
2326
2327    #[test]
2328    fn test_incorrect_default_value_issue_3479() {
2329        let sql = r#"CREATE TABLE `ExcePTuRi`(
2330non TIMESTAMP(6) TIME INDEX,
2331`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2332)"#;
2333        let result =
2334            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2335                .unwrap();
2336        assert_eq!(1, result.len());
2337        match &result[0] {
2338            Statement::CreateTable(c) => {
2339                assert_eq!(
2340                    "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2341                    c.columns[1].to_string()
2342                );
2343            }
2344            _ => unreachable!(),
2345        }
2346    }
2347
2348    #[test]
2349    fn test_parse_create_view() {
2350        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
2351
2352        let result =
2353            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2354                .unwrap();
2355        match &result[0] {
2356            Statement::CreateView(c) => {
2357                assert_eq!(c.to_string(), sql);
2358                assert!(!c.or_replace);
2359                assert!(!c.if_not_exists);
2360                assert_eq!("test", c.name.to_string());
2361            }
2362            _ => unreachable!(),
2363        }
2364
2365        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
2366
2367        let result =
2368            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2369                .unwrap();
2370        match &result[0] {
2371            Statement::CreateView(c) => {
2372                assert_eq!(c.to_string(), sql);
2373                assert!(c.or_replace);
2374                assert!(c.if_not_exists);
2375                assert_eq!("test", c.name.to_string());
2376            }
2377            _ => unreachable!(),
2378        }
2379    }
2380
2381    #[test]
2382    fn test_parse_create_view_invalid_query() {
2383        let sql = "CREATE VIEW test AS DELETE from demo";
2384        let result =
2385            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2386        assert!(result.is_err());
2387        assert_matches!(result, Err(crate::error::Error::Syntax { .. }));
2388    }
2389
2390    #[test]
2391    fn test_parse_create_table_fulltext_options() {
2392        let sql1 = r"
2393CREATE TABLE log (
2394    ts TIMESTAMP TIME INDEX,
2395    msg TEXT FULLTEXT INDEX,
2396)";
2397        let result1 = ParserContext::create_with_dialect(
2398            sql1,
2399            &GreptimeDbDialect {},
2400            ParseOptions::default(),
2401        )
2402        .unwrap();
2403
2404        if let Statement::CreateTable(c) = &result1[0] {
2405            c.columns.iter().for_each(|col| {
2406                if col.name().value == "msg" {
2407                    assert!(
2408                        col.extensions
2409                            .fulltext_index_options
2410                            .as_ref()
2411                            .unwrap()
2412                            .is_empty()
2413                    );
2414                }
2415            });
2416        } else {
2417            panic!("should be create_table statement");
2418        }
2419
2420        let sql2 = r"
2421CREATE TABLE log (
2422    ts TIMESTAMP TIME INDEX,
2423    msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2424)";
2425        let result2 = ParserContext::create_with_dialect(
2426            sql2,
2427            &GreptimeDbDialect {},
2428            ParseOptions::default(),
2429        )
2430        .unwrap();
2431
2432        if let Statement::CreateTable(c) = &result2[0] {
2433            c.columns.iter().for_each(|col| {
2434                if col.name().value == "msg" {
2435                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2436                    assert_eq!(options.len(), 2);
2437                    assert_eq!(options.get("analyzer").unwrap(), "English");
2438                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2439                }
2440            });
2441        } else {
2442            panic!("should be create_table statement");
2443        }
2444
2445        let sql3 = r"
2446CREATE TABLE log (
2447    ts TIMESTAMP TIME INDEX,
2448    msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2449    msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2450)";
2451        let result3 = ParserContext::create_with_dialect(
2452            sql3,
2453            &GreptimeDbDialect {},
2454            ParseOptions::default(),
2455        )
2456        .unwrap();
2457
2458        if let Statement::CreateTable(c) = &result3[0] {
2459            c.columns.iter().for_each(|col| {
2460                if col.name().value == "msg1" {
2461                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2462                    assert_eq!(options.len(), 2);
2463                    assert_eq!(options.get("analyzer").unwrap(), "English");
2464                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2465                } else if col.name().value == "msg2" {
2466                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2467                    assert_eq!(options.len(), 2);
2468                    assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2469                    assert_eq!(options.get("case_sensitive").unwrap(), "true");
2470                }
2471            });
2472        } else {
2473            panic!("should be create_table statement");
2474        }
2475    }
2476
/// A `FULLTEXT INDEX` declared on an `INT` column must make parsing fail with
/// a message stating that only string types are supported.
#[test]
fn test_parse_create_table_fulltext_options_invalid_type() {
    let ddl = r"
CREATE TABLE log (
    ts TIMESTAMP TIME INDEX,
    msg INT FULLTEXT INDEX,
)";
    // `unwrap_err` panics if the statement is unexpectedly accepted.
    let message =
        ParserContext::create_with_dialect(ddl, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap_err()
            .to_string();
    assert!(message.contains("FULLTEXT index only supports string type"));
}
2494
/// The `msg` column below repeats both the `analyzer` key and the whole
/// `FULLTEXT INDEX` clause; parsing is expected to fail with a message
/// containing "duplicated FULLTEXT INDEX option".
#[test]
fn test_parse_create_table_fulltext_options_duplicate() {
    let ddl = r"
CREATE TABLE log (
    ts TIMESTAMP TIME INDEX,
    msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
)";
    // `unwrap_err` panics if the statement is unexpectedly accepted.
    let message =
        ParserContext::create_with_dialect(ddl, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap_err()
            .to_string();
    assert!(message.contains("duplicated FULLTEXT INDEX option"));
}
2512
/// An unknown key (`invalid_option`) inside `FULLTEXT INDEX WITH (...)` must
/// make parsing fail with a message containing "invalid FULLTEXT INDEX option".
#[test]
fn test_parse_create_table_fulltext_options_invalid_option() {
    let ddl = r"
CREATE TABLE log (
    ts TIMESTAMP TIME INDEX,
    msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
)";
    // `unwrap_err` panics if the statement is unexpectedly accepted.
    let message =
        ParserContext::create_with_dialect(ddl, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap_err()
            .to_string();
    assert!(message.contains("invalid FULLTEXT INDEX option"));
}
2530
/// Checks how `SKIPPING INDEX` surfaces on the parsed `msg` column: with a
/// `WITH (...)` clause the recorded options are non-empty, while a bare
/// `SKIPPING INDEX` still records options that are present but empty.
#[test]
fn test_parse_create_table_skip_options() {
    // Parses `sql`, locates column `msg` and compares the emptiness of its
    // skipping-index options with `expect_empty`. The column lookup panics when
    // `msg` is absent, so the check can never pass vacuously.
    let assert_msg_skipping_options = |sql: &str, expect_empty: bool| {
        let stmts =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap();

        match &stmts[0] {
            Statement::CreateTable(c) => {
                let msg = c
                    .columns
                    .iter()
                    .find(|col| col.name().value == "msg")
                    .expect("column `msg` should be present");
                let options = msg.extensions.skipping_index_options.as_ref().unwrap();
                assert_eq!(options.is_empty(), expect_empty);
            }
            _ => panic!("should be create_table statement"),
        }
    };

    // Explicit options must be kept.
    let sql = r"
CREATE TABLE log (
    ts TIMESTAMP TIME INDEX,
    msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
)";
    assert_msg_skipping_options(sql, false);

    // Without a WITH clause the options exist but hold no entries.
    let sql = r"
        CREATE TABLE log (
            ts TIMESTAMP TIME INDEX,
            msg INT SKIPPING INDEX,
        )";
    assert_msg_skipping_options(sql, true);
}
2583
/// Parses `CREATE VIEW` statements that carry an explicit column list, checks
/// the SQL they render back to, and verifies that malformed column lists are
/// rejected.
#[test]
fn test_parse_create_view_with_columns() {
    // Pairs of (input SQL, SQL expected from `Display`). The empty column list
    // is not rendered; the non-empty lists are rendered exactly as written.
    let accepted = [
        (
            "CREATE VIEW test () AS SELECT * FROM NUMBERS",
            "CREATE VIEW test AS SELECT * FROM NUMBERS",
        ),
        (
            "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS",
            "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS",
        ),
        (
            "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS",
            "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS",
        ),
    ];
    for (sql, expected) in accepted {
        let stmts =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap();

        match &stmts[0] {
            Statement::CreateView(view) => {
                assert_eq!(view.to_string(), expected);
                assert!(!view.or_replace);
                assert!(!view.if_not_exists);
                assert_eq!("test", view.name.to_string());
            }
            _ => unreachable!(),
        }
        assert_eq!(expected, stmts[0].to_string());
    }

    let rejected = [
        // Some invalid syntax cases
        "CREATE VIEW test (n1 AS select * from demo",
        "CREATE VIEW test (n1, AS select * from demo",
        "CREATE VIEW test n1,n2) AS select * from demo",
        "CREATE VIEW test (1) AS select * from demo",
        // keyword
        "CREATE VIEW test (n1, select) AS select * from demo",
    ];
    for sql in rejected {
        let outcome =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
        assert!(outcome.is_err());
    }
}
2664
/// A custom `VECTOR` type carrying one argument must populate
/// `vector_options`, with the dimension stored under `VECTOR_OPT_DIM`.
#[test]
fn test_parse_column_extensions_vector() {
    let dialect = GenericDialect {};
    let tokens = Tokenizer::new(&dialect, "VECTOR(128)").tokenize().unwrap();
    let mut parser = Parser::new(&dialect).with_tokens(tokens);

    let column = Ident::new("vec_col");
    let vector_type =
        DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
    let mut extensions = ColumnExtensions::default();

    let outcome = ParserContext::parse_column_extensions(
        &mut parser,
        &column,
        &vector_type,
        &mut extensions,
    );
    assert!(outcome.is_ok());

    assert!(extensions.vector_options.is_some());
    let vector_options = extensions.vector_options.unwrap();
    assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some(&"128".to_string()));
}
2684
/// A custom `VECTOR` type with an empty argument list must be rejected by
/// `ParserContext::parse_column_extensions`.
#[test]
fn test_parse_column_extensions_vector_invalid() {
    let dialect = GenericDialect {};
    let tokens = Tokenizer::new(&dialect, "VECTOR()").tokenize().unwrap();
    let mut parser = Parser::new(&dialect).with_tokens(tokens);

    let column = Ident::new("vec_col");
    // The type carries no arguments at all.
    let argless_vector = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
    let mut extensions = ColumnExtensions::default();

    let outcome = ParserContext::parse_column_extensions(
        &mut parser,
        &column,
        &argless_vector,
        &mut extensions,
    );
    assert!(outcome.is_err());
}
2700
/// Exercises `ParserContext::parse_column_extensions` on the index clauses
/// (`SKIPPING`, `FULLTEXT`, `INVERTED`) that can follow a column type.
#[test]
fn test_parse_column_extensions_indices() {
    // Tokenizes `sql` with the generic dialect, runs the extension parser for a
    // column with the given name and type, and returns the parse outcome
    // together with the extensions the parser filled in.
    let parse_extensions = |sql: &str, column: &str, data_type: DataType| {
        let dialect = GenericDialect {};
        let tokens = Tokenizer::new(&dialect, sql).tokenize().unwrap();
        let mut parser = Parser::new(&dialect).with_tokens(tokens);
        let name = Ident::new(column);
        let mut extensions = ColumnExtensions::default();
        let outcome = ParserContext::parse_column_extensions(
            &mut parser,
            &name,
            &data_type,
            &mut extensions,
        );
        (outcome, extensions)
    };

    // Test skipping index
    {
        let (outcome, extensions) =
            parse_extensions("SKIPPING INDEX", "col", DataType::String(None));
        assert!(outcome.is_ok());
        assert!(extensions.skipping_index_options.is_some());
    }

    // Test fulltext index with options
    {
        let (outcome, extensions) = parse_extensions(
            "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')",
            "text_col",
            DataType::String(None),
        );
        assert!(outcome.unwrap());
        assert!(extensions.fulltext_index_options.is_some());
        let fulltext_options = extensions.fulltext_index_options.unwrap();
        assert_eq!(
            fulltext_options.get("analyzer"),
            Some(&"English".to_string())
        );
        assert_eq!(
            fulltext_options.get("case_sensitive"),
            Some(&"true".to_string())
        );
    }

    // Test fulltext index with invalid type (should fail)
    {
        let (outcome, _extensions) = parse_extensions(
            "FULLTEXT INDEX WITH (analyzer = 'English')",
            "num_col",
            DataType::Int(None), // Non-string type
        );
        assert!(outcome.is_err());
        assert!(
            outcome
                .unwrap_err()
                .to_string()
                .contains("FULLTEXT index only supports string type")
        );
    }

    // Test fulltext index with invalid option (won't fail, the parser doesn't check the option's content)
    {
        let (outcome, _extensions) = parse_extensions(
            "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')",
            "text_col",
            DataType::String(None),
        );
        assert!(outcome.unwrap());
    }

    // Test inverted index
    {
        let (outcome, extensions) =
            parse_extensions("INVERTED INDEX", "col", DataType::String(None));
        assert!(outcome.is_ok());
        assert!(extensions.inverted_index_options.is_some());
    }

    // Test inverted index with options (should fail)
    {
        let (outcome, _extensions) = parse_extensions(
            "INVERTED INDEX WITH (analyzer = 'English')",
            "col",
            DataType::String(None),
        );
        assert!(outcome.is_err());
        assert!(
            outcome
                .unwrap_err()
                .to_string()
                .contains("INVERTED index doesn't support options")
        );
    }

    // Test multiple indices
    {
        let (outcome, extensions) = parse_extensions(
            "SKIPPING INDEX FULLTEXT INDEX",
            "col",
            DataType::String(None),
        );
        assert!(outcome.unwrap());
        assert!(extensions.skipping_index_options.is_some());
        assert!(extensions.fulltext_index_options.is_some());
    }
}
2862
/// A `'10s'` literal cast to `INTERVAL` is expected to render back as
/// `'10 seconds'` when the parsed statement is displayed.
#[test]
fn test_parse_interval_cast() {
    let parsed = ParserContext::create_with_dialect(
        "select '10s'::INTERVAL",
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();
    let rendered = parsed[0].to_string();
    assert_eq!("SELECT '10 seconds'::INTERVAL", rendered);
}
2871}