sql/parsers/
create_parser.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::HashMap;
19
20use arrow_buffer::IntervalMonthDayNano;
21use common_catalog::consts::default_engine;
22use datafusion_common::ScalarValue;
23use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
24use datatypes::data_type::ConcreteDataType;
25use itertools::Itertools;
26use snafu::{OptionExt, ResultExt, ensure};
27use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Expr};
28use sqlparser::dialect::keywords::Keyword;
29use sqlparser::keywords::ALL_KEYWORDS;
30use sqlparser::parser::IsOptional::Mandatory;
31use sqlparser::parser::{Parser, ParserError};
32use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
33use table::requests::{validate_database_option, validate_table_option};
34
35use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
36use crate::error::{
37    self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu,
38    InvalidSqlSnafu, InvalidTableOptionSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
39    SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
40};
41use crate::parser::{FLOW, ParserContext};
42use crate::parsers::tql_parser;
43use crate::parsers::utils::{
44    self, validate_column_fulltext_create_option, validate_column_skipping_index_create_option,
45};
46use crate::statements::create::{
47    Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
48    CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
49};
50use crate::statements::statement::Statement;
51use crate::statements::transform::type_alias::get_data_type_by_alias_name;
52use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
53use crate::util::{location_to_index, parse_option_string};
54
55pub const ENGINE: &str = "ENGINE";
56pub const MAXVALUE: &str = "MAXVALUE";
57pub const SINK: &str = "SINK";
58pub const EXPIRE: &str = "EXPIRE";
59pub const AFTER: &str = "AFTER";
60pub const INVERTED: &str = "INVERTED";
61pub const SKIPPING: &str = "SKIPPING";
62
63pub type RawIntervalExpr = String;
64
65/// Parses create [table] statement
66impl<'a> ParserContext<'a> {
67    pub(crate) fn parse_create(&mut self) -> Result<Statement> {
68        match self.parser.peek_token().token {
69            Token::Word(w) => match w.keyword {
70                Keyword::TABLE => self.parse_create_table(),
71
72                Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
73
74                Keyword::EXTERNAL => self.parse_create_external_table(),
75
76                Keyword::OR => {
77                    let _ = self.parser.next_token();
78                    self.parser
79                        .expect_keyword(Keyword::REPLACE)
80                        .context(SyntaxSnafu)?;
81                    match self.parser.next_token().token {
82                        Token::Word(w) => match w.keyword {
83                            Keyword::VIEW => self.parse_create_view(true),
84                            Keyword::NoKeyword => {
85                                let uppercase = w.value.to_uppercase();
86                                match uppercase.as_str() {
87                                    FLOW => self.parse_create_flow(true),
88                                    _ => self.unsupported(w.to_string()),
89                                }
90                            }
91                            _ => self.unsupported(w.to_string()),
92                        },
93                        _ => self.unsupported(w.to_string()),
94                    }
95                }
96
97                Keyword::VIEW => {
98                    let _ = self.parser.next_token();
99                    self.parse_create_view(false)
100                }
101
102                #[cfg(feature = "enterprise")]
103                Keyword::TRIGGER => {
104                    let _ = self.parser.next_token();
105                    self.parse_create_trigger()
106                }
107
108                Keyword::NoKeyword => {
109                    let _ = self.parser.next_token();
110                    let uppercase = w.value.to_uppercase();
111                    match uppercase.as_str() {
112                        FLOW => self.parse_create_flow(false),
113                        _ => self.unsupported(w.to_string()),
114                    }
115                }
116                _ => self.unsupported(w.to_string()),
117            },
118            unexpected => self.unsupported(unexpected.to_string()),
119        }
120    }
121
122    /// Parse `CREAVE VIEW` statement.
123    fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
124        let if_not_exists = self.parse_if_not_exist()?;
125        let view_name = self.intern_parse_table_name()?;
126
127        let columns = self.parse_view_columns()?;
128
129        self.parser
130            .expect_keyword(Keyword::AS)
131            .context(SyntaxSnafu)?;
132
133        let query = self.parse_query()?;
134
135        Ok(Statement::CreateView(CreateView {
136            name: view_name,
137            columns,
138            or_replace,
139            query: Box::new(query),
140            if_not_exists,
141        }))
142    }
143
144    fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
145        let mut columns = vec![];
146        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
147            return Ok(columns);
148        }
149
150        loop {
151            let name = self.parse_column_name().context(SyntaxSnafu)?;
152
153            columns.push(name);
154
155            let comma = self.parser.consume_token(&Token::Comma);
156            if self.parser.consume_token(&Token::RParen) {
157                // allow a trailing comma, even though it's not in standard
158                break;
159            } else if !comma {
160                return self.expected("',' or ')' after column name", self.parser.peek_token());
161            }
162        }
163
164        Ok(columns)
165    }
166
167    fn parse_create_external_table(&mut self) -> Result<Statement> {
168        let _ = self.parser.next_token();
169        self.parser
170            .expect_keyword(Keyword::TABLE)
171            .context(SyntaxSnafu)?;
172        let if_not_exists = self.parse_if_not_exist()?;
173        let table_name = self.intern_parse_table_name()?;
174        let (columns, constraints) = self.parse_columns()?;
175        if !columns.is_empty() {
176            validate_time_index(&columns, &constraints)?;
177        }
178
179        let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
180        let options = self.parse_create_table_options()?;
181        Ok(Statement::CreateExternalTable(CreateExternalTable {
182            name: table_name,
183            columns,
184            constraints,
185            options,
186            if_not_exists,
187            engine,
188        }))
189    }
190
191    fn parse_create_database(&mut self) -> Result<Statement> {
192        let _ = self.parser.next_token();
193        let if_not_exists = self.parse_if_not_exist()?;
194        let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
195            expected: "a database name",
196            actual: self.peek_token_as_string(),
197        })?;
198        let database_name = Self::canonicalize_object_name(database_name);
199
200        let options = self
201            .parser
202            .parse_options(Keyword::WITH)
203            .context(SyntaxSnafu)?
204            .into_iter()
205            .map(parse_option_string)
206            .collect::<Result<HashMap<String, String>>>()?;
207
208        for key in options.keys() {
209            ensure!(
210                validate_database_option(key),
211                InvalidDatabaseOptionSnafu { key: key.clone() }
212            );
213        }
214        if let Some(append_mode) = options.get("append_mode")
215            && append_mode == "true"
216            && options.contains_key("merge_mode")
217        {
218            return InvalidDatabaseOptionSnafu {
219                key: "merge_mode".to_string(),
220            }
221            .fail();
222        }
223
224        Ok(Statement::CreateDatabase(CreateDatabase {
225            name: database_name,
226            if_not_exists,
227            options: options.into(),
228        }))
229    }
230
231    fn parse_create_table(&mut self) -> Result<Statement> {
232        let _ = self.parser.next_token();
233
234        let if_not_exists = self.parse_if_not_exist()?;
235
236        let table_name = self.intern_parse_table_name()?;
237
238        if self.parser.parse_keyword(Keyword::LIKE) {
239            let source_name = self.intern_parse_table_name()?;
240
241            return Ok(Statement::CreateTableLike(CreateTableLike {
242                table_name,
243                source_name,
244            }));
245        }
246
247        let (columns, constraints) = self.parse_columns()?;
248        validate_time_index(&columns, &constraints)?;
249
250        let partitions = self.parse_partitions()?;
251        if let Some(partitions) = &partitions {
252            validate_partitions(&columns, partitions)?;
253        }
254
255        let engine = self.parse_table_engine(default_engine())?;
256        let options = self.parse_create_table_options()?;
257        let create_table = CreateTable {
258            if_not_exists,
259            name: table_name,
260            columns,
261            engine,
262            constraints,
263            options,
264            table_id: 0, // table id is assigned by catalog manager
265            partitions,
266        };
267
268        Ok(Statement::CreateTable(create_table))
269    }
270
271    /// "CREATE FLOW" clause
272    fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
273        let if_not_exists = self.parse_if_not_exist()?;
274
275        let flow_name = self.intern_parse_table_name()?;
276
277        // make `SINK` case in-sensitive
278        if let Token::Word(word) = self.parser.peek_token().token
279            && word.value.eq_ignore_ascii_case(SINK)
280        {
281            self.parser.next_token();
282        } else {
283            Err(ParserError::ParserError(
284                "Expect `SINK` keyword".to_string(),
285            ))
286            .context(SyntaxSnafu)?
287        }
288        self.parser
289            .expect_keyword(Keyword::TO)
290            .context(SyntaxSnafu)?;
291
292        let output_table_name = self.intern_parse_table_name()?;
293
294        let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
295            && w1.value.eq_ignore_ascii_case(EXPIRE)
296        {
297            self.parser.next_token();
298            if let Token::Word(w2) = &self.parser.peek_token().token
299                && w2.value.eq_ignore_ascii_case(AFTER)
300            {
301                self.parser.next_token();
302                Some(self.parse_interval_no_month("EXPIRE AFTER")?)
303            } else {
304                None
305            }
306        } else {
307            None
308        };
309
310        let eval_interval = if self
311            .parser
312            .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
313        {
314            Some(self.parse_interval_no_month("EVAL INTERVAL")?)
315        } else {
316            None
317        };
318
319        let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
320            match self.parser.next_token() {
321                TokenWithSpan {
322                    token: Token::SingleQuotedString(value, ..),
323                    ..
324                } => Some(value),
325                unexpected => {
326                    return self
327                        .parser
328                        .expected("string", unexpected)
329                        .context(SyntaxSnafu);
330                }
331            }
332        } else {
333            None
334        };
335
336        self.parser
337            .expect_keyword(Keyword::AS)
338            .context(SyntaxSnafu)?;
339
340        let query = Box::new(self.parse_sql_or_tql(true)?);
341
342        Ok(Statement::CreateFlow(CreateFlow {
343            flow_name,
344            sink_table_name: output_table_name,
345            or_replace,
346            if_not_exists,
347            expire_after,
348            eval_interval,
349            comment,
350            query,
351        }))
352    }
353
354    fn parse_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
355        let start_loc = self.parser.peek_token().span.start;
356        let start_index = location_to_index(self.sql, &start_loc);
357
358        // only accept sql or tql
359        let query = match self.parser.peek_token().token {
360            Token::Word(w) => match w.keyword {
361                Keyword::SELECT => self.parse_query(),
362                Keyword::NoKeyword
363                    if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
364                {
365                    self.parse_tql(require_now_expr)
366                }
367
368                _ => self.unsupported(self.peek_token_as_string()),
369            },
370            _ => self.unsupported(self.peek_token_as_string()),
371        }?;
372
373        let end_token = self.parser.peek_token();
374
375        let raw_query = if end_token == Token::EOF {
376            &self.sql[start_index..]
377        } else {
378            let end_loc = end_token.span.end;
379            let end_index = location_to_index(self.sql, &end_loc);
380            &self.sql[start_index..end_index.min(self.sql.len())]
381        };
382        let raw_query = raw_query.trim_end_matches(";");
383        let query = SqlOrTql::try_from_statement(query, raw_query)?;
384        Ok(query)
385    }
386
387    /// Parse the interval expr to duration in seconds.
388    fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
389        let interval = self.parse_interval_month_day_nano()?.0;
390        if interval.months != 0 {
391            return InvalidIntervalSnafu {
392                reason: format!("Interval with months is not allowed in {context}"),
393            }
394            .fail();
395        }
396        Ok(
397            interval.nanoseconds / 1_000_000_000
398                + interval.days as i64 * 60 * 60 * 24
399                + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, // 1 month=365.25/12=30.44 days
400                                                                       // this is to keep the same as https://docs.rs/humantime/latest/humantime/fn.parse_duration.html
401                                                                       // which we use in database to parse i.e. ttl interval and many other intervals
402        )
403    }
404
405    /// Parse interval expr to [`IntervalMonthDayNano`].
406    fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
407        let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
408        let raw_interval_expr = interval_expr.to_string();
409        let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
410            .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
411            .ok()
412            .with_context(|| InvalidIntervalSnafu {
413                reason: format!("cannot cast {} to interval type", interval_expr),
414            })?;
415        if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
416            Ok((interval, raw_interval_expr))
417        } else {
418            unreachable!()
419        }
420    }
421
422    fn parse_if_not_exist(&mut self) -> Result<bool> {
423        match self.parser.peek_token().token {
424            Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
425            _ => {}
426        }
427
428        if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
429            return self
430                .parser
431                .expect_keyword(Keyword::EXISTS)
432                .map(|_| true)
433                .context(UnexpectedSnafu {
434                    expected: "EXISTS",
435                    actual: self.peek_token_as_string(),
436                });
437        }
438
439        if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
440            return UnsupportedSnafu { keyword: "EXISTS" }.fail();
441        }
442
443        Ok(false)
444    }
445
446    fn parse_create_table_options(&mut self) -> Result<OptionMap> {
447        let options = self
448            .parser
449            .parse_options(Keyword::WITH)
450            .context(SyntaxSnafu)?
451            .into_iter()
452            .map(parse_option_string)
453            .collect::<Result<HashMap<String, String>>>()?;
454        for key in options.keys() {
455            ensure!(validate_table_option(key), InvalidTableOptionSnafu { key });
456        }
457        Ok(options.into())
458    }
459
460    /// "PARTITION ON COLUMNS (...)" clause
461    fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
462        if !self.parser.parse_keyword(Keyword::PARTITION) {
463            return Ok(None);
464        }
465        self.parser
466            .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
467            .context(error::UnexpectedSnafu {
468                expected: "ON, COLUMNS",
469                actual: self.peek_token_as_string(),
470            })?;
471
472        let raw_column_list = self
473            .parser
474            .parse_parenthesized_column_list(Mandatory, false)
475            .context(error::SyntaxSnafu)?;
476        let column_list = raw_column_list
477            .into_iter()
478            .map(Self::canonicalize_identifier)
479            .collect();
480
481        let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
482
483        Ok(Some(Partitions { column_list, exprs }))
484    }
485
486    fn parse_partition_entry(&mut self) -> Result<Expr> {
487        self.parser.parse_expr().context(error::SyntaxSnafu)
488    }
489
490    /// Parse a comma-separated list wrapped by "()", and of which all items accepted by `F`
491    fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
492    where
493        F: FnMut(&mut ParserContext<'a>) -> Result<T>,
494    {
495        self.parser
496            .expect_token(&Token::LParen)
497            .context(error::UnexpectedSnafu {
498                expected: "(",
499                actual: self.peek_token_as_string(),
500            })?;
501
502        let mut values = vec![];
503        while self.parser.peek_token() != Token::RParen {
504            values.push(f(self)?);
505            if !self.parser.consume_token(&Token::Comma) {
506                break;
507            }
508        }
509
510        self.parser
511            .expect_token(&Token::RParen)
512            .context(error::UnexpectedSnafu {
513                expected: ")",
514                actual: self.peek_token_as_string(),
515            })?;
516
517        Ok(values)
518    }
519
520    /// Parse the columns and constraints.
521    fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
522        let mut columns = vec![];
523        let mut constraints = vec![];
524        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
525            return Ok((columns, constraints));
526        }
527
528        loop {
529            if let Some(constraint) = self.parse_optional_table_constraint()? {
530                constraints.push(constraint);
531            } else if let Token::Word(_) = self.parser.peek_token().token {
532                self.parse_column(&mut columns, &mut constraints)?;
533            } else {
534                return self.expected(
535                    "column name or constraint definition",
536                    self.parser.peek_token(),
537                );
538            }
539            let comma = self.parser.consume_token(&Token::Comma);
540            if self.parser.consume_token(&Token::RParen) {
541                // allow a trailing comma, even though it's not in standard
542                break;
543            } else if !comma {
544                return self.expected(
545                    "',' or ')' after column definition",
546                    self.parser.peek_token(),
547                );
548            }
549        }
550
551        Ok((columns, constraints))
552    }
553
554    fn parse_column(
555        &mut self,
556        columns: &mut Vec<Column>,
557        constraints: &mut Vec<TableConstraint>,
558    ) -> Result<()> {
559        let mut column = self.parse_column_def()?;
560
561        let mut time_index_opt_idx = None;
562        for (index, opt) in column.options().iter().enumerate() {
563            if let ColumnOption::DialectSpecific(tokens) = &opt.option
564                && matches!(
565                    &tokens[..],
566                    [
567                        Token::Word(Word {
568                            keyword: Keyword::TIME,
569                            ..
570                        }),
571                        Token::Word(Word {
572                            keyword: Keyword::INDEX,
573                            ..
574                        })
575                    ]
576                )
577            {
578                ensure!(
579                    time_index_opt_idx.is_none(),
580                    InvalidColumnOptionSnafu {
581                        name: column.name().to_string(),
582                        msg: "duplicated time index",
583                    }
584                );
585                time_index_opt_idx = Some(index);
586
587                let constraint = TableConstraint::TimeIndex {
588                    column: Ident::new(column.name().value.clone()),
589                };
590                constraints.push(constraint);
591            }
592        }
593
594        if let Some(index) = time_index_opt_idx {
595            ensure!(
596                !column.options().contains(&ColumnOptionDef {
597                    option: ColumnOption::Null,
598                    name: None,
599                }),
600                InvalidColumnOptionSnafu {
601                    name: column.name().to_string(),
602                    msg: "time index column can't be null",
603                }
604            );
605
606            // The timestamp type may be an alias type, we have to retrieve the actual type.
607            let data_type = get_unalias_type(column.data_type());
608            ensure!(
609                matches!(data_type, DataType::Timestamp(_, _)),
610                InvalidColumnOptionSnafu {
611                    name: column.name().to_string(),
612                    msg: "time index column data type should be timestamp",
613                }
614            );
615
616            let not_null_opt = ColumnOptionDef {
617                option: ColumnOption::NotNull,
618                name: None,
619            };
620
621            if !column.options().contains(&not_null_opt) {
622                column.mut_options().push(not_null_opt);
623            }
624
625            let _ = column.mut_options().remove(index);
626        }
627
628        columns.push(column);
629
630        Ok(())
631    }
632
633    /// Parse the column name and check if it's valid.
634    fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
635        let name = self.parser.parse_identifier()?;
636        if name.quote_style.is_none() &&
637        // "ALL_KEYWORDS" are sorted.
638            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
639        {
640            return Err(ParserError::ParserError(format!(
641                "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
642                &name.value
643            )));
644        }
645
646        Ok(name)
647    }
648
649    pub fn parse_column_def(&mut self) -> Result<Column> {
650        let name = self.parse_column_name().context(SyntaxSnafu)?;
651        let parser = &mut self.parser;
652
653        ensure!(
654            !(name.quote_style.is_none() &&
655            // "ALL_KEYWORDS" are sorted.
656            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
657            InvalidSqlSnafu {
658                msg: format!(
659                    "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
660                    &name.value
661                ),
662            }
663        );
664
665        let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
666        let mut options = vec![];
667        let mut extensions = ColumnExtensions::default();
668        loop {
669            if parser.parse_keyword(Keyword::CONSTRAINT) {
670                let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
671                if let Some(option) = Self::parse_optional_column_option(parser)? {
672                    options.push(ColumnOptionDef { name, option });
673                } else {
674                    return parser
675                        .expected(
676                            "constraint details after CONSTRAINT <name>",
677                            parser.peek_token(),
678                        )
679                        .context(SyntaxSnafu);
680                }
681            } else if let Some(option) = Self::parse_optional_column_option(parser)? {
682                options.push(ColumnOptionDef { name: None, option });
683            } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
684                break;
685            };
686        }
687
688        Ok(Column {
689            column_def: ColumnDef {
690                name: Self::canonicalize_identifier(name),
691                data_type,
692                options,
693            },
694            extensions,
695        })
696    }
697
698    fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
699        if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
700            Ok(Some(ColumnOption::CharacterSet(
701                parser.parse_object_name(false).context(SyntaxSnafu)?,
702            )))
703        } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
704            Ok(Some(ColumnOption::NotNull))
705        } else if parser.parse_keywords(&[Keyword::COMMENT]) {
706            match parser.next_token() {
707                TokenWithSpan {
708                    token: Token::SingleQuotedString(value, ..),
709                    ..
710                } => Ok(Some(ColumnOption::Comment(value))),
711                unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
712            }
713        } else if parser.parse_keyword(Keyword::NULL) {
714            Ok(Some(ColumnOption::Null))
715        } else if parser.parse_keyword(Keyword::DEFAULT) {
716            Ok(Some(ColumnOption::Default(
717                parser.parse_expr().context(SyntaxSnafu)?,
718            )))
719        } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
720            Ok(Some(ColumnOption::Unique {
721                is_primary: true,
722                characteristics: None,
723            }))
724        } else if parser.parse_keyword(Keyword::UNIQUE) {
725            Ok(Some(ColumnOption::Unique {
726                is_primary: false,
727                characteristics: None,
728            }))
729        } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
730            // Use a DialectSpecific option for time index
731            Ok(Some(ColumnOption::DialectSpecific(vec![
732                Token::Word(Word {
733                    value: "TIME".to_string(),
734                    quote_style: None,
735                    keyword: Keyword::TIME,
736                }),
737                Token::Word(Word {
738                    value: "INDEX".to_string(),
739                    quote_style: None,
740                    keyword: Keyword::INDEX,
741                }),
742            ])))
743        } else {
744            Ok(None)
745        }
746    }
747
748    /// Parse a column option extensions.
749    ///
750    /// This function will handle:
751    /// - Vector type
752    /// - Indexes
753    fn parse_column_extensions(
754        parser: &mut Parser<'_>,
755        column_name: &Ident,
756        column_type: &DataType,
757        column_extensions: &mut ColumnExtensions,
758    ) -> Result<bool> {
759        if let DataType::Custom(name, tokens) = column_type
760            && name.0.len() == 1
761            && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
762        {
763            ensure!(
764                tokens.len() == 1,
765                InvalidColumnOptionSnafu {
766                    name: column_name.to_string(),
767                    msg: "VECTOR type should have dimension",
768                }
769            );
770
771            let dimension =
772                tokens[0]
773                    .parse::<u32>()
774                    .ok()
775                    .with_context(|| InvalidColumnOptionSnafu {
776                        name: column_name.to_string(),
777                        msg: "dimension should be a positive integer",
778                    })?;
779
780            let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
781            column_extensions.vector_options = Some(options);
782        }
783
784        // parse index options in column definition
785        let mut is_index_declared = false;
786
787        // skipping index
788        if let Token::Word(word) = parser.peek_token().token
789            && word.value.eq_ignore_ascii_case(SKIPPING)
790        {
791            parser.next_token();
792            // Consume `INDEX` keyword
793            ensure!(
794                parser.parse_keyword(Keyword::INDEX),
795                InvalidColumnOptionSnafu {
796                    name: column_name.to_string(),
797                    msg: "expect INDEX after SKIPPING keyword",
798                }
799            );
800            ensure!(
801                column_extensions.skipping_index_options.is_none(),
802                InvalidColumnOptionSnafu {
803                    name: column_name.to_string(),
804                    msg: "duplicated SKIPPING index option",
805                }
806            );
807
808            let options = parser
809                .parse_options(Keyword::WITH)
810                .context(error::SyntaxSnafu)?
811                .into_iter()
812                .map(parse_option_string)
813                .collect::<Result<HashMap<String, String>>>()?;
814
815            for key in options.keys() {
816                ensure!(
817                    validate_column_skipping_index_create_option(key),
818                    InvalidColumnOptionSnafu {
819                        name: column_name.to_string(),
820                        msg: format!("invalid SKIPPING INDEX option: {key}"),
821                    }
822                );
823            }
824
825            column_extensions.skipping_index_options = Some(options.into());
826            is_index_declared |= true;
827        }
828
829        // fulltext index
830        if parser.parse_keyword(Keyword::FULLTEXT) {
831            // Consume `INDEX` keyword
832            ensure!(
833                parser.parse_keyword(Keyword::INDEX),
834                InvalidColumnOptionSnafu {
835                    name: column_name.to_string(),
836                    msg: "expect INDEX after FULLTEXT keyword",
837                }
838            );
839
840            ensure!(
841                column_extensions.fulltext_index_options.is_none(),
842                InvalidColumnOptionSnafu {
843                    name: column_name.to_string(),
844                    msg: "duplicated FULLTEXT INDEX option",
845                }
846            );
847
848            let column_type = get_unalias_type(column_type);
849            let data_type = sql_data_type_to_concrete_data_type(&column_type)?;
850            ensure!(
851                data_type == ConcreteDataType::string_datatype(),
852                InvalidColumnOptionSnafu {
853                    name: column_name.to_string(),
854                    msg: "FULLTEXT index only supports string type",
855                }
856            );
857
858            let options = parser
859                .parse_options(Keyword::WITH)
860                .context(error::SyntaxSnafu)?
861                .into_iter()
862                .map(parse_option_string)
863                .collect::<Result<HashMap<String, String>>>()?;
864
865            for key in options.keys() {
866                ensure!(
867                    validate_column_fulltext_create_option(key),
868                    InvalidColumnOptionSnafu {
869                        name: column_name.to_string(),
870                        msg: format!("invalid FULLTEXT INDEX option: {key}"),
871                    }
872                );
873            }
874
875            column_extensions.fulltext_index_options = Some(options.into());
876            is_index_declared |= true;
877        }
878
879        // inverted index
880        if let Token::Word(word) = parser.peek_token().token
881            && word.value.eq_ignore_ascii_case(INVERTED)
882        {
883            parser.next_token();
884            // Consume `INDEX` keyword
885            ensure!(
886                parser.parse_keyword(Keyword::INDEX),
887                InvalidColumnOptionSnafu {
888                    name: column_name.to_string(),
889                    msg: "expect INDEX after INVERTED keyword",
890                }
891            );
892
893            ensure!(
894                column_extensions.inverted_index_options.is_none(),
895                InvalidColumnOptionSnafu {
896                    name: column_name.to_string(),
897                    msg: "duplicated INVERTED index option",
898                }
899            );
900
901            // inverted index doesn't have options, skipping `WITH`
902            // try cache `WITH` and throw error
903            let with_token = parser.peek_token();
904            ensure!(
905                with_token.token
906                    != Token::Word(Word {
907                        value: "WITH".to_string(),
908                        keyword: Keyword::WITH,
909                        quote_style: None,
910                    }),
911                InvalidColumnOptionSnafu {
912                    name: column_name.to_string(),
913                    msg: "INVERTED index doesn't support options",
914                }
915            );
916
917            column_extensions.inverted_index_options = Some(OptionMap::default());
918            is_index_declared |= true;
919        }
920
921        Ok(is_index_declared)
922    }
923
924    fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
925        match self.parser.next_token() {
926            TokenWithSpan {
927                token: Token::Word(w),
928                ..
929            } if w.keyword == Keyword::PRIMARY => {
930                self.parser
931                    .expect_keyword(Keyword::KEY)
932                    .context(error::UnexpectedSnafu {
933                        expected: "KEY",
934                        actual: self.peek_token_as_string(),
935                    })?;
936                let raw_columns = self
937                    .parser
938                    .parse_parenthesized_column_list(Mandatory, false)
939                    .context(error::SyntaxSnafu)?;
940                let columns = raw_columns
941                    .into_iter()
942                    .map(Self::canonicalize_identifier)
943                    .collect();
944                Ok(Some(TableConstraint::PrimaryKey { columns }))
945            }
946            TokenWithSpan {
947                token: Token::Word(w),
948                ..
949            } if w.keyword == Keyword::TIME => {
950                self.parser
951                    .expect_keyword(Keyword::INDEX)
952                    .context(error::UnexpectedSnafu {
953                        expected: "INDEX",
954                        actual: self.peek_token_as_string(),
955                    })?;
956
957                let raw_columns = self
958                    .parser
959                    .parse_parenthesized_column_list(Mandatory, false)
960                    .context(error::SyntaxSnafu)?;
961                let mut columns = raw_columns
962                    .into_iter()
963                    .map(Self::canonicalize_identifier)
964                    .collect::<Vec<_>>();
965
966                ensure!(
967                    columns.len() == 1,
968                    InvalidTimeIndexSnafu {
969                        msg: "it should contain only one column in time index",
970                    }
971                );
972
973                Ok(Some(TableConstraint::TimeIndex {
974                    column: columns.pop().unwrap(),
975                }))
976            }
977            _ => {
978                self.parser.prev_token();
979                Ok(None)
980            }
981        }
982    }
983
984    /// Parses the set of valid formats
985    fn parse_table_engine(&mut self, default: &str) -> Result<String> {
986        if !self.consume_token(ENGINE) {
987            return Ok(default.to_string());
988        }
989
990        self.parser
991            .expect_token(&Token::Eq)
992            .context(error::UnexpectedSnafu {
993                expected: "=",
994                actual: self.peek_token_as_string(),
995            })?;
996
997        let token = self.parser.next_token();
998        if let Token::Word(w) = token.token {
999            Ok(w.value)
1000        } else {
1001            self.expected("'Engine' is missing", token)
1002        }
1003    }
1004}
1005
1006fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1007    let time_index_constraints: Vec<_> = constraints
1008        .iter()
1009        .filter_map(|c| match c {
1010            TableConstraint::TimeIndex { column } => Some(column),
1011            _ => None,
1012        })
1013        .unique()
1014        .collect();
1015
1016    ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1017    ensure!(
1018        time_index_constraints.len() == 1,
1019        InvalidTimeIndexSnafu {
1020            msg: format!(
1021                "expected only one time index constraint but actual {}",
1022                time_index_constraints.len()
1023            ),
1024        }
1025    );
1026
1027    // It's safe to use time_index_constraints[0][0],
1028    // we already check the bound above.
1029    let time_index_column_ident = &time_index_constraints[0];
1030    let time_index_column = columns
1031        .iter()
1032        .find(|c| c.name().value == *time_index_column_ident.value)
1033        .with_context(|| InvalidTimeIndexSnafu {
1034            msg: format!(
1035                "time index column {} not found in columns",
1036                time_index_column_ident
1037            ),
1038        })?;
1039
1040    let time_index_data_type = get_unalias_type(time_index_column.data_type());
1041    ensure!(
1042        matches!(time_index_data_type, DataType::Timestamp(_, _)),
1043        InvalidColumnOptionSnafu {
1044            name: time_index_column.name().to_string(),
1045            msg: "time index column data type should be timestamp",
1046        }
1047    );
1048
1049    Ok(())
1050}
1051
1052fn get_unalias_type(data_type: &DataType) -> DataType {
1053    match data_type {
1054        DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1055            if let Some(real_type) =
1056                get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1057            {
1058                real_type
1059            } else {
1060                data_type.clone()
1061            }
1062        }
1063        _ => data_type.clone(),
1064    }
1065}
1066
1067fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1068    let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1069
1070    ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1071
1072    Ok(())
1073}
1074
1075/// Ensure all exprs are binary expr and all the columns are defined in the column list.
1076fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1077    for expr in exprs {
1078        // The first level must be binary expr
1079        if let Expr::BinaryOp { left, op: _, right } = expr {
1080            ensure_one_expr(left, columns)?;
1081            ensure_one_expr(right, columns)?;
1082        } else {
1083            return error::InvalidSqlSnafu {
1084                msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1085            }
1086            .fail();
1087        }
1088    }
1089    Ok(())
1090}
1091
1092/// Check if the expr is a binary expr, an ident or a literal value.
1093/// If is ident, then check it is in the column list.
1094/// This recursive function is intended to be used by [ensure_exprs_are_binary].
1095fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1096    match expr {
1097        Expr::BinaryOp { left, op: _, right } => {
1098            ensure_one_expr(left, columns)?;
1099            ensure_one_expr(right, columns)?;
1100            Ok(())
1101        }
1102        Expr::Identifier(ident) => {
1103            let column_name = &ident.value;
1104            ensure!(
1105                columns.iter().any(|c| &c.name().value == column_name),
1106                error::InvalidSqlSnafu {
1107                    msg: format!(
1108                        "Column {:?} in rule expr is not referenced in PARTITION ON",
1109                        column_name
1110                    ),
1111                }
1112            );
1113            Ok(())
1114        }
1115        Expr::Value(_) => Ok(()),
1116        Expr::UnaryOp { expr, .. } => {
1117            ensure_one_expr(expr, columns)?;
1118            Ok(())
1119        }
1120        _ => error::InvalidSqlSnafu {
1121            msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1122        }
1123        .fail(),
1124    }
1125}
1126
1127/// Ensure that all columns used in "PARTITION ON COLUMNS" are defined in create table.
1128fn ensure_partition_columns_defined<'a>(
1129    columns: &'a [Column],
1130    partitions: &'a Partitions,
1131) -> Result<Vec<&'a Column>> {
1132    partitions
1133        .column_list
1134        .iter()
1135        .map(|x| {
1136            let x = ParserContext::canonicalize_identifier(x.clone());
1137            // Normally the columns in "create table" won't be too many,
1138            // a linear search to find the target every time is fine.
1139            columns
1140                .iter()
1141                .find(|c| *c.name().value == x.value)
1142                .context(error::InvalidSqlSnafu {
1143                    msg: format!("Partition column {:?} not defined", x.value),
1144                })
1145        })
1146        .collect::<Result<Vec<&Column>>>()
1147}
1148
1149#[cfg(test)]
1150mod tests {
1151    use std::assert_matches::assert_matches;
1152    use std::collections::HashMap;
1153
1154    use common_catalog::consts::FILE_ENGINE;
1155    use common_error::ext::ErrorExt;
1156    use sqlparser::ast::ColumnOption::NotNull;
1157    use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1158    use sqlparser::dialect::GenericDialect;
1159    use sqlparser::tokenizer::Tokenizer;
1160
1161    use super::*;
1162    use crate::dialect::GreptimeDbDialect;
1163    use crate::parser::ParseOptions;
1164
1165    #[test]
1166    fn test_parse_create_table_like() {
1167        let sql = "CREATE TABLE t1 LIKE t2";
1168        let stmts =
1169            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1170                .unwrap();
1171
1172        assert_eq!(1, stmts.len());
1173        match &stmts[0] {
1174            Statement::CreateTableLike(c) => {
1175                assert_eq!(c.table_name.to_string(), "t1");
1176                assert_eq!(c.source_name.to_string(), "t2");
1177            }
1178            _ => unreachable!(),
1179        }
1180    }
1181
1182    #[test]
1183    fn test_validate_external_table_options() {
1184        let sql = "CREATE EXTERNAL TABLE city (
1185            host string,
1186            ts timestamp,
1187            cpu float64 default 0,
1188            memory float64,
1189            TIME INDEX (ts),
1190            PRIMARY KEY(ts, host)
1191        ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1192
1193        let result =
1194            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1195        assert!(matches!(
1196            result,
1197            Err(error::Error::InvalidTableOption { .. })
1198        ));
1199    }
1200
1201    #[test]
1202    fn test_parse_create_external_table() {
1203        struct Test<'a> {
1204            sql: &'a str,
1205            expected_table_name: &'a str,
1206            expected_options: HashMap<String, String>,
1207            expected_engine: &'a str,
1208            expected_if_not_exist: bool,
1209        }
1210
1211        let tests = [
1212            Test {
1213                sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1214                expected_table_name: "city",
1215                expected_options: HashMap::from([
1216                    ("location".to_string(), "/var/data/city.csv".to_string()),
1217                    ("format".to_string(), "csv".to_string()),
1218                ]),
1219                expected_engine: FILE_ENGINE,
1220                expected_if_not_exist: false,
1221            },
1222            Test {
1223                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1224                expected_table_name: "city",
1225                expected_options: HashMap::from([
1226                    ("location".to_string(), "/var/data/city.csv".to_string()),
1227                    ("format".to_string(), "csv".to_string()),
1228                ]),
1229                expected_engine: "foo",
1230                expected_if_not_exist: true,
1231            },
1232            Test {
1233                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1234                expected_table_name: "city",
1235                expected_options: HashMap::from([
1236                    ("location".to_string(), "/var/data/city.csv".to_string()),
1237                    ("format".to_string(), "csv".to_string()),
1238                    ("compaction.type".to_string(), "bar".to_string()),
1239                ]),
1240                expected_engine: "foo",
1241                expected_if_not_exist: true,
1242            },
1243        ];
1244
1245        for test in tests {
1246            let stmts = ParserContext::create_with_dialect(
1247                test.sql,
1248                &GreptimeDbDialect {},
1249                ParseOptions::default(),
1250            )
1251            .unwrap();
1252            assert_eq!(1, stmts.len());
1253            match &stmts[0] {
1254                Statement::CreateExternalTable(c) => {
1255                    assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1256                    assert_eq!(c.options, test.expected_options.into());
1257                    assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1258                    assert_eq!(c.engine, test.expected_engine);
1259                }
1260                _ => unreachable!(),
1261            }
1262        }
1263    }
1264
1265    #[test]
1266    fn test_parse_create_external_table_with_schema() {
1267        let sql = "CREATE EXTERNAL TABLE city (
1268            host string,
1269            ts timestamp,
1270            cpu float32 default 0,
1271            memory float64,
1272            TIME INDEX (ts),
1273            PRIMARY KEY(ts, host),
1274        ) with(location='/var/data/city.csv',format='csv');";
1275
1276        let options = HashMap::from([
1277            ("location".to_string(), "/var/data/city.csv".to_string()),
1278            ("format".to_string(), "csv".to_string()),
1279        ]);
1280
1281        let stmts =
1282            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1283                .unwrap();
1284        assert_eq!(1, stmts.len());
1285        match &stmts[0] {
1286            Statement::CreateExternalTable(c) => {
1287                assert_eq!(c.name.to_string(), "city");
1288                assert_eq!(c.options, options.into());
1289
1290                let columns = &c.columns;
1291                assert_column_def(&columns[0].column_def, "host", "STRING");
1292                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1293                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1294                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1295
1296                let constraints = &c.constraints;
1297                assert_eq!(
1298                    &constraints[0],
1299                    &TableConstraint::TimeIndex {
1300                        column: Ident::new("ts"),
1301                    }
1302                );
1303                assert_eq!(
1304                    &constraints[1],
1305                    &TableConstraint::PrimaryKey {
1306                        columns: vec![Ident::new("ts"), Ident::new("host")]
1307                    }
1308                );
1309            }
1310            _ => unreachable!(),
1311        }
1312    }
1313
1314    #[test]
1315    fn test_parse_create_database() {
1316        let sql = "create database";
1317        let result =
1318            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1319        assert!(
1320            result
1321                .unwrap_err()
1322                .to_string()
1323                .contains("Unexpected token while parsing SQL statement")
1324        );
1325
1326        let sql = "create database prometheus";
1327        let stmts =
1328            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1329                .unwrap();
1330
1331        assert_eq!(1, stmts.len());
1332        match &stmts[0] {
1333            Statement::CreateDatabase(c) => {
1334                assert_eq!(c.name.to_string(), "prometheus");
1335                assert!(!c.if_not_exists);
1336            }
1337            _ => unreachable!(),
1338        }
1339
1340        let sql = "create database if not exists prometheus";
1341        let stmts =
1342            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1343                .unwrap();
1344
1345        assert_eq!(1, stmts.len());
1346        match &stmts[0] {
1347            Statement::CreateDatabase(c) => {
1348                assert_eq!(c.name.to_string(), "prometheus");
1349                assert!(c.if_not_exists);
1350            }
1351            _ => unreachable!(),
1352        }
1353
1354        let sql = "CREATE DATABASE `fOo`";
1355        let result =
1356            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1357        let stmts = result.unwrap();
1358        match &stmts.last().unwrap() {
1359            Statement::CreateDatabase(c) => {
1360                assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1361                assert!(!c.if_not_exists);
1362            }
1363            _ => unreachable!(),
1364        }
1365
1366        let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1367        let result =
1368            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1369        let stmts = result.unwrap();
1370        match &stmts[0] {
1371            Statement::CreateDatabase(c) => {
1372                assert_eq!(c.name.to_string(), "prometheus");
1373                assert!(!c.if_not_exists);
1374                assert_eq!(c.options.get("ttl").unwrap(), "1h");
1375            }
1376            _ => unreachable!(),
1377        }
1378    }
1379
1380    #[test]
1381    fn test_parse_create_flow_more_testcases() {
1382        use pretty_assertions::assert_eq;
1383        fn parse_create_flow(sql: &str) -> CreateFlow {
1384            let stmts = ParserContext::create_with_dialect(
1385                sql,
1386                &GreptimeDbDialect {},
1387                ParseOptions::default(),
1388            )
1389            .unwrap();
1390            assert_eq!(1, stmts.len());
1391            match &stmts[0] {
1392                Statement::CreateFlow(c) => c.clone(),
1393                _ => unreachable!(),
1394            }
1395        }
1396        struct CreateFlowWoutQuery {
1397            /// Flow name
1398            pub flow_name: ObjectName,
1399            /// Output (sink) table name
1400            pub sink_table_name: ObjectName,
1401            /// Whether to replace existing task
1402            pub or_replace: bool,
1403            /// Create if not exist
1404            pub if_not_exists: bool,
1405            /// `EXPIRE AFTER`
1406            /// Duration in second as `i64`
1407            pub expire_after: Option<i64>,
1408            /// Comment string
1409            pub comment: Option<String>,
1410        }
1411        let testcases = vec![
1412            (
1413                r"
1414CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1415SINK TO schema_1.table_1
1416EXPIRE AFTER INTERVAL '5 minutes'
1417COMMENT 'test comment'
1418AS
1419SELECT max(c1), min(c2) FROM schema_2.table_2;",
1420                CreateFlowWoutQuery {
1421                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1422                    sink_table_name: ObjectName::from(vec![
1423                        Ident::new("schema_1"),
1424                        Ident::new("table_1"),
1425                    ]),
1426                    or_replace: true,
1427                    if_not_exists: true,
1428                    expire_after: Some(300),
1429                    comment: Some("test comment".to_string()),
1430                },
1431            ),
1432            (
1433                r"
1434CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1435SINK TO schema_1.table_1
1436EXPIRE AFTER INTERVAL '300 s'
1437COMMENT 'test comment'
1438AS
1439SELECT max(c1), min(c2) FROM schema_2.table_2;",
1440                CreateFlowWoutQuery {
1441                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1442                    sink_table_name: ObjectName::from(vec![
1443                        Ident::new("schema_1"),
1444                        Ident::new("table_1"),
1445                    ]),
1446                    or_replace: true,
1447                    if_not_exists: true,
1448                    expire_after: Some(300),
1449                    comment: Some("test comment".to_string()),
1450                },
1451            ),
1452            (
1453                r"
1454CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1455SINK TO schema_1.table_1
1456EXPIRE AFTER '5 minutes'
1457COMMENT 'test comment'
1458AS
1459SELECT max(c1), min(c2) FROM schema_2.table_2;",
1460                CreateFlowWoutQuery {
1461                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1462                    sink_table_name: ObjectName::from(vec![
1463                        Ident::new("schema_1"),
1464                        Ident::new("table_1"),
1465                    ]),
1466                    or_replace: true,
1467                    if_not_exists: true,
1468                    expire_after: Some(300),
1469                    comment: Some("test comment".to_string()),
1470                },
1471            ),
1472            (
1473                r"
1474CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1475SINK TO schema_1.table_1
1476EXPIRE AFTER '300 s'
1477COMMENT 'test comment'
1478AS
1479SELECT max(c1), min(c2) FROM schema_2.table_2;",
1480                CreateFlowWoutQuery {
1481                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1482                    sink_table_name: ObjectName::from(vec![
1483                        Ident::new("schema_1"),
1484                        Ident::new("table_1"),
1485                    ]),
1486                    or_replace: true,
1487                    if_not_exists: true,
1488                    expire_after: Some(300),
1489                    comment: Some("test comment".to_string()),
1490                },
1491            ),
1492            (
1493                r"
1494CREATE FLOW `task_2`
1495SINK TO schema_1.table_1
1496EXPIRE AFTER '2 days 1h 2 min'
1497AS
1498SELECT max(c1), min(c2) FROM schema_2.table_2;",
1499                CreateFlowWoutQuery {
1500                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1501                    sink_table_name: ObjectName::from(vec![
1502                        Ident::new("schema_1"),
1503                        Ident::new("table_1"),
1504                    ]),
1505                    or_replace: false,
1506                    if_not_exists: false,
1507                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1508                    comment: None,
1509                },
1510            ),
1511            (
1512                r"
1513create flow `task_3`
1514sink to schema_1.table_1
1515expire after '10 minutes'
1516as
1517select max(c1), min(c2) from schema_2.table_2;",
1518                CreateFlowWoutQuery {
1519                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
1520                    sink_table_name: ObjectName::from(vec![
1521                        Ident::new("schema_1"),
1522                        Ident::new("table_1"),
1523                    ]),
1524                    or_replace: false,
1525                    if_not_exists: false,
1526                    expire_after: Some(600), // 10 minutes in seconds
1527                    comment: None,
1528                },
1529            ),
1530            (
1531                r"
1532create or replace flow if not exists task_4
1533sink to schema_1.table_1
1534expire after interval '2 hours'
1535comment 'lowercase test'
1536as
1537select max(c1), min(c2) from schema_2.table_2;",
1538                CreateFlowWoutQuery {
1539                    flow_name: ObjectName::from(vec![Ident::new("task_4")]),
1540                    sink_table_name: ObjectName::from(vec![
1541                        Ident::new("schema_1"),
1542                        Ident::new("table_1"),
1543                    ]),
1544                    or_replace: true,
1545                    if_not_exists: true,
1546                    expire_after: Some(7200), // 2 hours in seconds
1547                    comment: Some("lowercase test".to_string()),
1548                },
1549            ),
1550        ];
1551
1552        for (sql, expected) in testcases {
1553            let create_task = parse_create_flow(sql);
1554
1555            let expected = CreateFlow {
1556                flow_name: expected.flow_name,
1557                sink_table_name: expected.sink_table_name,
1558                or_replace: expected.or_replace,
1559                if_not_exists: expected.if_not_exists,
1560                expire_after: expected.expire_after,
1561                eval_interval: None,
1562                comment: expected.comment,
1563                // ignore query parse result
1564                query: create_task.query.clone(),
1565            };
1566
1567            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1568            let show_create = create_task.to_string();
1569            let recreated = parse_create_flow(&show_create);
1570            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1571        }
1572    }
1573
1574    #[test]
1575    fn test_parse_create_flow() {
1576        use pretty_assertions::assert_eq;
1577        fn parse_create_flow(sql: &str) -> CreateFlow {
1578            let stmts = ParserContext::create_with_dialect(
1579                sql,
1580                &GreptimeDbDialect {},
1581                ParseOptions::default(),
1582            )
1583            .unwrap();
1584            assert_eq!(1, stmts.len());
1585            match &stmts[0] {
1586                Statement::CreateFlow(c) => c.clone(),
1587                _ => panic!("{:?}", stmts[0]),
1588            }
1589        }
1590        struct CreateFlowWoutQuery {
1591            /// Flow name
1592            pub flow_name: ObjectName,
1593            /// Output (sink) table name
1594            pub sink_table_name: ObjectName,
1595            /// Whether to replace existing task
1596            pub or_replace: bool,
1597            /// Create if not exist
1598            pub if_not_exists: bool,
1599            /// `EXPIRE AFTER`
1600            /// Duration in second as `i64`
1601            pub expire_after: Option<i64>,
1602            /// Duration for flow evaluation interval
1603            /// Duration in seconds as `i64`
1604            /// If not set, flow will be evaluated based on time window size and other args.
1605            pub eval_interval: Option<i64>,
1606            /// Comment string
1607            pub comment: Option<String>,
1608        }
1609
1610        // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
1611        let testcases = vec![
1612            (
1613                r"
1614CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1615SINK TO schema_1.table_1
1616EXPIRE AFTER INTERVAL '5 minutes'
1617COMMENT 'test comment'
1618AS
1619SELECT max(c1), min(c2) FROM schema_2.table_2;",
1620                CreateFlowWoutQuery {
1621                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1622                    sink_table_name: ObjectName(vec![
1623                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1624                        ObjectNamePart::Identifier(Ident::new("table_1")),
1625                    ]),
1626                    or_replace: true,
1627                    if_not_exists: true,
1628                    expire_after: Some(300),
1629                    eval_interval: None,
1630                    comment: Some("test comment".to_string()),
1631                },
1632            ),
1633            (
1634                r"
1635CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1636SINK TO schema_1.table_1
1637EXPIRE AFTER INTERVAL '300 s'
1638COMMENT 'test comment'
1639AS
1640SELECT max(c1), min(c2) FROM schema_2.table_2;",
1641                CreateFlowWoutQuery {
1642                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1643                    sink_table_name: ObjectName(vec![
1644                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1645                        ObjectNamePart::Identifier(Ident::new("table_1")),
1646                    ]),
1647                    or_replace: true,
1648                    if_not_exists: true,
1649                    expire_after: Some(300),
1650                    eval_interval: None,
1651                    comment: Some("test comment".to_string()),
1652                },
1653            ),
1654            (
1655                r"
1656CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1657SINK TO schema_1.table_1
1658EXPIRE AFTER '5 minutes'
1659EVAL INTERVAL '10 seconds'
1660COMMENT 'test comment'
1661AS
1662SELECT max(c1), min(c2) FROM schema_2.table_2;",
1663                CreateFlowWoutQuery {
1664                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1665                    sink_table_name: ObjectName(vec![
1666                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1667                        ObjectNamePart::Identifier(Ident::new("table_1")),
1668                    ]),
1669                    or_replace: true,
1670                    if_not_exists: true,
1671                    expire_after: Some(300),
1672                    eval_interval: Some(10),
1673                    comment: Some("test comment".to_string()),
1674                },
1675            ),
1676            (
1677                r"
1678CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1679SINK TO schema_1.table_1
1680EXPIRE AFTER '5 minutes'
1681EVAL INTERVAL INTERVAL '10 seconds'
1682COMMENT 'test comment'
1683AS
1684SELECT max(c1), min(c2) FROM schema_2.table_2;",
1685                CreateFlowWoutQuery {
1686                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1687                    sink_table_name: ObjectName(vec![
1688                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1689                        ObjectNamePart::Identifier(Ident::new("table_1")),
1690                    ]),
1691                    or_replace: true,
1692                    if_not_exists: true,
1693                    expire_after: Some(300),
1694                    eval_interval: Some(10),
1695                    comment: Some("test comment".to_string()),
1696                },
1697            ),
1698            (
1699                r"
1700CREATE FLOW `task_2`
1701SINK TO schema_1.table_1
1702EXPIRE AFTER '2 days 1h 2 min'
1703AS
1704SELECT max(c1), min(c2) FROM schema_2.table_2;",
1705                CreateFlowWoutQuery {
1706                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1707                        '`', "task_2",
1708                    ))]),
1709                    sink_table_name: ObjectName(vec![
1710                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1711                        ObjectNamePart::Identifier(Ident::new("table_1")),
1712                    ]),
1713                    or_replace: false,
1714                    if_not_exists: false,
1715                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1716                    eval_interval: None,
1717                    comment: None,
1718                },
1719            ),
1720        ];
1721
1722        for (sql, expected) in testcases {
1723            let create_task = parse_create_flow(sql);
1724
1725            let expected = CreateFlow {
1726                flow_name: expected.flow_name,
1727                sink_table_name: expected.sink_table_name,
1728                or_replace: expected.or_replace,
1729                if_not_exists: expected.if_not_exists,
1730                expire_after: expected.expire_after,
1731                eval_interval: expected.eval_interval,
1732                comment: expected.comment,
1733                // ignore query parse result
1734                query: create_task.query.clone(),
1735            };
1736
1737            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1738            let show_create = create_task.to_string();
1739            let recreated = parse_create_flow(&show_create);
1740            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1741        }
1742    }
1743
1744    #[test]
1745    fn test_create_flow_no_month() {
1746        let sql = r"
1747CREATE FLOW `task_2`
1748SINK TO schema_1.table_1
1749EXPIRE AFTER '1 month 2 days 1h 2 min'
1750AS
1751SELECT max(c1), min(c2) FROM schema_2.table_2;";
1752        let stmts =
1753            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1754
1755        assert!(
1756            stmts.is_err()
1757                && stmts
1758                    .unwrap_err()
1759                    .to_string()
1760                    .contains("Interval with months is not allowed")
1761        );
1762    }
1763
1764    #[test]
1765    fn test_validate_create() {
1766        let sql = r"
1767CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
1768PARTITION ON COLUMNS(c, a) (
1769    a < 10,
1770    a > 10 AND a < 20,
1771    a > 20 AND c < 100,
1772    a > 20 AND c >= 100
1773)
1774ENGINE=mito";
1775        let result =
1776            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1777        let _ = result.unwrap();
1778
1779        let sql = r"
1780CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1781PARTITION ON COLUMNS(x) ()
1782ENGINE=mito";
1783        let result =
1784            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1785        assert!(
1786            result
1787                .unwrap_err()
1788                .to_string()
1789                .contains("Partition column \"x\" not defined")
1790        );
1791    }
1792
1793    #[test]
1794    fn test_parse_create_table_with_partitions() {
1795        let sql = r"
1796CREATE TABLE monitor (
1797  host_id    INT,
1798  idc        STRING,
1799  ts         TIMESTAMP,
1800  cpu        DOUBLE DEFAULT 0,
1801  memory     DOUBLE,
1802  TIME INDEX (ts),
1803  PRIMARY KEY (host),
1804)
1805PARTITION ON COLUMNS(idc, host_id) (
1806  idc <= 'hz' AND host_id < 1000,
1807  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1808  idc > 'sh' AND host_id < 3000,
1809  idc > 'sh' AND host_id >= 3000,
1810)
1811ENGINE=mito";
1812        let result =
1813            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1814                .unwrap();
1815        assert_eq!(result.len(), 1);
1816        match &result[0] {
1817            Statement::CreateTable(c) => {
1818                assert!(c.partitions.is_some());
1819
1820                let partitions = c.partitions.as_ref().unwrap();
1821                let column_list = partitions
1822                    .column_list
1823                    .iter()
1824                    .map(|x| &x.value)
1825                    .collect::<Vec<&String>>();
1826                assert_eq!(column_list, vec!["idc", "host_id"]);
1827
1828                let exprs = &partitions.exprs;
1829
1830                assert_eq!(
1831                    exprs[0],
1832                    Expr::BinaryOp {
1833                        left: Box::new(Expr::BinaryOp {
1834                            left: Box::new(Expr::Identifier("idc".into())),
1835                            op: BinaryOperator::LtEq,
1836                            right: Box::new(Expr::Value(
1837                                Value::SingleQuotedString("hz".to_string()).into()
1838                            ))
1839                        }),
1840                        op: BinaryOperator::And,
1841                        right: Box::new(Expr::BinaryOp {
1842                            left: Box::new(Expr::Identifier("host_id".into())),
1843                            op: BinaryOperator::Lt,
1844                            right: Box::new(Expr::Value(
1845                                Value::Number("1000".to_string(), false).into()
1846                            ))
1847                        })
1848                    }
1849                );
1850                assert_eq!(
1851                    exprs[1],
1852                    Expr::BinaryOp {
1853                        left: Box::new(Expr::BinaryOp {
1854                            left: Box::new(Expr::BinaryOp {
1855                                left: Box::new(Expr::Identifier("idc".into())),
1856                                op: BinaryOperator::Gt,
1857                                right: Box::new(Expr::Value(
1858                                    Value::SingleQuotedString("hz".to_string()).into()
1859                                ))
1860                            }),
1861                            op: BinaryOperator::And,
1862                            right: Box::new(Expr::BinaryOp {
1863                                left: Box::new(Expr::Identifier("idc".into())),
1864                                op: BinaryOperator::LtEq,
1865                                right: Box::new(Expr::Value(
1866                                    Value::SingleQuotedString("sh".to_string()).into()
1867                                ))
1868                            })
1869                        }),
1870                        op: BinaryOperator::And,
1871                        right: Box::new(Expr::BinaryOp {
1872                            left: Box::new(Expr::Identifier("host_id".into())),
1873                            op: BinaryOperator::Lt,
1874                            right: Box::new(Expr::Value(
1875                                Value::Number("2000".to_string(), false).into()
1876                            ))
1877                        })
1878                    }
1879                );
1880                assert_eq!(
1881                    exprs[2],
1882                    Expr::BinaryOp {
1883                        left: Box::new(Expr::BinaryOp {
1884                            left: Box::new(Expr::Identifier("idc".into())),
1885                            op: BinaryOperator::Gt,
1886                            right: Box::new(Expr::Value(
1887                                Value::SingleQuotedString("sh".to_string()).into()
1888                            ))
1889                        }),
1890                        op: BinaryOperator::And,
1891                        right: Box::new(Expr::BinaryOp {
1892                            left: Box::new(Expr::Identifier("host_id".into())),
1893                            op: BinaryOperator::Lt,
1894                            right: Box::new(Expr::Value(
1895                                Value::Number("3000".to_string(), false).into()
1896                            ))
1897                        })
1898                    }
1899                );
1900                assert_eq!(
1901                    exprs[3],
1902                    Expr::BinaryOp {
1903                        left: Box::new(Expr::BinaryOp {
1904                            left: Box::new(Expr::Identifier("idc".into())),
1905                            op: BinaryOperator::Gt,
1906                            right: Box::new(Expr::Value(
1907                                Value::SingleQuotedString("sh".to_string()).into()
1908                            ))
1909                        }),
1910                        op: BinaryOperator::And,
1911                        right: Box::new(Expr::BinaryOp {
1912                            left: Box::new(Expr::Identifier("host_id".into())),
1913                            op: BinaryOperator::GtEq,
1914                            right: Box::new(Expr::Value(
1915                                Value::Number("3000".to_string(), false).into()
1916                            ))
1917                        })
1918                    }
1919                );
1920            }
1921            _ => unreachable!(),
1922        }
1923    }
1924
1925    #[test]
1926    fn test_parse_create_table_with_quoted_partitions() {
1927        let sql = r"
1928CREATE TABLE monitor (
1929  `host_id`    INT,
1930  idc        STRING,
1931  ts         TIMESTAMP,
1932  cpu        DOUBLE DEFAULT 0,
1933  memory     DOUBLE,
1934  TIME INDEX (ts),
1935  PRIMARY KEY (host),
1936)
1937PARTITION ON COLUMNS(IdC, host_id) (
1938  idc <= 'hz' AND host_id < 1000,
1939  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1940  idc > 'sh' AND host_id < 3000,
1941  idc > 'sh' AND host_id >= 3000,
1942)";
1943        let result =
1944            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1945                .unwrap();
1946        assert_eq!(result.len(), 1);
1947    }
1948
1949    #[test]
1950    fn test_parse_create_table_with_timestamp_index() {
1951        let sql1 = r"
1952CREATE TABLE monitor (
1953  host_id    INT,
1954  idc        STRING,
1955  ts         TIMESTAMP TIME INDEX,
1956  cpu        DOUBLE DEFAULT 0,
1957  memory     DOUBLE,
1958  PRIMARY KEY (host),
1959)
1960ENGINE=mito";
1961        let result1 = ParserContext::create_with_dialect(
1962            sql1,
1963            &GreptimeDbDialect {},
1964            ParseOptions::default(),
1965        )
1966        .unwrap();
1967
1968        if let Statement::CreateTable(c) = &result1[0] {
1969            assert_eq!(c.constraints.len(), 2);
1970            let tc = c.constraints[0].clone();
1971            match tc {
1972                TableConstraint::TimeIndex { column } => {
1973                    assert_eq!(&column.value, "ts");
1974                }
1975                _ => panic!("should be time index constraint"),
1976            };
1977        } else {
1978            panic!("should be create_table statement");
1979        }
1980
1981        // `TIME INDEX` should be in front of `PRIMARY KEY`
1982        // in order to equal the `TIMESTAMP TIME INDEX` constraint options vector
1983        let sql2 = r"
1984CREATE TABLE monitor (
1985  host_id    INT,
1986  idc        STRING,
1987  ts         TIMESTAMP NOT NULL,
1988  cpu        DOUBLE DEFAULT 0,
1989  memory     DOUBLE,
1990  TIME INDEX (ts),
1991  PRIMARY KEY (host),
1992)
1993ENGINE=mito";
1994        let result2 = ParserContext::create_with_dialect(
1995            sql2,
1996            &GreptimeDbDialect {},
1997            ParseOptions::default(),
1998        )
1999        .unwrap();
2000
2001        assert_eq!(result1, result2);
2002
2003        // TIMESTAMP can be NULL which is not equal to above
2004        let sql3 = r"
2005CREATE TABLE monitor (
2006  host_id    INT,
2007  idc        STRING,
2008  ts         TIMESTAMP,
2009  cpu        DOUBLE DEFAULT 0,
2010  memory     DOUBLE,
2011  TIME INDEX (ts),
2012  PRIMARY KEY (host),
2013)
2014ENGINE=mito";
2015
2016        let result3 = ParserContext::create_with_dialect(
2017            sql3,
2018            &GreptimeDbDialect {},
2019            ParseOptions::default(),
2020        )
2021        .unwrap();
2022
2023        assert_ne!(result1, result3);
2024
2025        // BIGINT can't be time index any more
2026        let sql1 = r"
2027CREATE TABLE monitor (
2028  host_id    INT,
2029  idc        STRING,
2030  b          bigint TIME INDEX,
2031  cpu        DOUBLE DEFAULT 0,
2032  memory     DOUBLE,
2033  PRIMARY KEY (host),
2034)
2035ENGINE=mito";
2036        let result1 = ParserContext::create_with_dialect(
2037            sql1,
2038            &GreptimeDbDialect {},
2039            ParseOptions::default(),
2040        );
2041
2042        assert!(
2043            result1
2044                .unwrap_err()
2045                .to_string()
2046                .contains("time index column data type should be timestamp")
2047        );
2048    }
2049
2050    #[test]
2051    fn test_parse_create_table_with_timestamp_index_not_null() {
2052        let sql = r"
2053CREATE TABLE monitor (
2054  host_id    INT,
2055  idc        STRING,
2056  ts         TIMESTAMP TIME INDEX,
2057  cpu        DOUBLE DEFAULT 0,
2058  memory     DOUBLE,
2059  TIME INDEX (ts),
2060  PRIMARY KEY (host),
2061)
2062ENGINE=mito";
2063        let result =
2064            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2065                .unwrap();
2066
2067        assert_eq!(result.len(), 1);
2068        if let Statement::CreateTable(c) = &result[0] {
2069            let ts = c.columns[2].clone();
2070            assert_eq!(ts.name().to_string(), "ts");
2071            assert_eq!(ts.options()[0].option, NotNull);
2072        } else {
2073            panic!("should be create table statement");
2074        }
2075
2076        let sql1 = r"
2077CREATE TABLE monitor (
2078  host_id    INT,
2079  idc        STRING,
2080  ts         TIMESTAMP NOT NULL TIME INDEX,
2081  cpu        DOUBLE DEFAULT 0,
2082  memory     DOUBLE,
2083  TIME INDEX (ts),
2084  PRIMARY KEY (host),
2085)
2086ENGINE=mito";
2087
2088        let result1 = ParserContext::create_with_dialect(
2089            sql1,
2090            &GreptimeDbDialect {},
2091            ParseOptions::default(),
2092        )
2093        .unwrap();
2094        assert_eq!(result, result1);
2095
2096        let sql2 = r"
2097CREATE TABLE monitor (
2098  host_id    INT,
2099  idc        STRING,
2100  ts         TIMESTAMP TIME INDEX NOT NULL,
2101  cpu        DOUBLE DEFAULT 0,
2102  memory     DOUBLE,
2103  TIME INDEX (ts),
2104  PRIMARY KEY (host),
2105)
2106ENGINE=mito";
2107
2108        let result2 = ParserContext::create_with_dialect(
2109            sql2,
2110            &GreptimeDbDialect {},
2111            ParseOptions::default(),
2112        )
2113        .unwrap();
2114        assert_eq!(result, result2);
2115
2116        let sql3 = r"
2117CREATE TABLE monitor (
2118  host_id    INT,
2119  idc        STRING,
2120  ts         TIMESTAMP TIME INDEX NULL NOT,
2121  cpu        DOUBLE DEFAULT 0,
2122  memory     DOUBLE,
2123  TIME INDEX (ts),
2124  PRIMARY KEY (host),
2125)
2126ENGINE=mito";
2127
2128        let result3 = ParserContext::create_with_dialect(
2129            sql3,
2130            &GreptimeDbDialect {},
2131            ParseOptions::default(),
2132        );
2133        assert!(result3.is_err());
2134
2135        let sql4 = r"
2136CREATE TABLE monitor (
2137  host_id    INT,
2138  idc        STRING,
2139  ts         TIMESTAMP TIME INDEX NOT NULL NULL,
2140  cpu        DOUBLE DEFAULT 0,
2141  memory     DOUBLE,
2142  TIME INDEX (ts),
2143  PRIMARY KEY (host),
2144)
2145ENGINE=mito";
2146
2147        let result4 = ParserContext::create_with_dialect(
2148            sql4,
2149            &GreptimeDbDialect {},
2150            ParseOptions::default(),
2151        );
2152        assert!(result4.is_err());
2153
2154        let sql = r"
2155CREATE TABLE monitor (
2156  host_id    INT,
2157  idc        STRING,
2158  ts         TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2159  cpu        DOUBLE DEFAULT 0,
2160  memory     DOUBLE,
2161  TIME INDEX (ts),
2162  PRIMARY KEY (host),
2163)
2164ENGINE=mito";
2165
2166        let result =
2167            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2168                .unwrap();
2169
2170        if let Statement::CreateTable(c) = &result[0] {
2171            let tc = c.constraints[0].clone();
2172            match tc {
2173                TableConstraint::TimeIndex { column } => {
2174                    assert_eq!(&column.value, "ts");
2175                }
2176                _ => panic!("should be time index constraint"),
2177            }
2178            let ts = c.columns[2].clone();
2179            assert_eq!(ts.name().to_string(), "ts");
2180            assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2181            assert_eq!(ts.options()[1].option, NotNull);
2182        } else {
2183            unreachable!("should be create table statement");
2184        }
2185    }
2186
2187    #[test]
2188    fn test_parse_partitions_with_error_syntax() {
2189        let sql = r"
2190CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2191PARTITION COLUMNS(c, a) (
2192    a < 10,
2193    a > 10 AND a < 20,
2194    a > 20 AND c < 100,
2195    a > 20 AND c >= 100
2196)
2197ENGINE=mito";
2198        let result =
2199            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2200        assert!(
2201            result
2202                .unwrap_err()
2203                .output_msg()
2204                .contains("sql parser error: Expected: ON, found: COLUMNS")
2205        );
2206    }
2207
2208    #[test]
2209    fn test_parse_partitions_without_rule() {
2210        let sql = r"
2211CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2212PARTITION ON COLUMNS(c, a) ()
2213ENGINE=mito";
2214        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2215            .unwrap();
2216    }
2217
2218    #[test]
2219    fn test_parse_partitions_unreferenced_column() {
2220        let sql = r"
2221CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2222PARTITION ON COLUMNS(c, a) (
2223    b = 'foo'
2224)
2225ENGINE=mito";
2226        let result =
2227            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2228        assert_eq!(
2229            result.unwrap_err().output_msg(),
2230            "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2231        );
2232    }
2233
2234    #[test]
2235    fn test_parse_partitions_not_binary_expr() {
2236        let sql = r"
2237CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2238PARTITION ON COLUMNS(c, a) (
2239    b
2240)
2241ENGINE=mito";
2242        let result =
2243            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2244        assert_eq!(
2245            result.unwrap_err().output_msg(),
2246            r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2247        );
2248    }
2249
2250    fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2251        assert_eq!(column.name.to_string(), name);
2252        assert_eq!(column.data_type.to_string(), data_type);
2253    }
2254
2255    #[test]
2256    pub fn test_parse_create_table() {
2257        let sql = r"create table demo(
2258                             host string,
2259                             ts timestamp,
2260                             cpu float32 default 0,
2261                             memory float64,
2262                             TIME INDEX (ts),
2263                             PRIMARY KEY(ts, host),
2264                             ) engine=mito
2265                             with(ttl='10s');
2266         ";
2267        let result =
2268            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2269                .unwrap();
2270        assert_eq!(1, result.len());
2271        match &result[0] {
2272            Statement::CreateTable(c) => {
2273                assert!(!c.if_not_exists);
2274                assert_eq!("demo", c.name.to_string());
2275                assert_eq!("mito", c.engine);
2276                assert_eq!(4, c.columns.len());
2277                let columns = &c.columns;
2278                assert_column_def(&columns[0].column_def, "host", "STRING");
2279                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2280                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2281                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2282
2283                let constraints = &c.constraints;
2284                assert_eq!(
2285                    &constraints[0],
2286                    &TableConstraint::TimeIndex {
2287                        column: Ident::new("ts"),
2288                    }
2289                );
2290                assert_eq!(
2291                    &constraints[1],
2292                    &TableConstraint::PrimaryKey {
2293                        columns: vec![Ident::new("ts"), Ident::new("host")]
2294                    }
2295                );
2296                // inverted index is merged into column options
2297                assert_eq!(1, c.options.len());
2298                assert_eq!(
2299                    [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2300                    c.options.to_str_map()
2301                );
2302            }
2303            _ => unreachable!(),
2304        }
2305    }
2306
2307    #[test]
2308    fn test_invalid_index_keys() {
2309        let sql = r"create table demo(
2310                             host string,
2311                             ts int64,
2312                             cpu float64 default 0,
2313                             memory float64,
2314                             TIME INDEX (ts, host),
2315                             PRIMARY KEY(ts, host)) engine=mito;
2316         ";
2317        let result =
2318            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2319        assert!(result.is_err());
2320        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2321    }
2322
2323    #[test]
2324    fn test_duplicated_time_index() {
2325        let sql = r"create table demo(
2326                             host string,
2327                             ts timestamp time index,
2328                             t timestamp time index,
2329                             cpu float64 default 0,
2330                             memory float64,
2331                             TIME INDEX (ts, host),
2332                             PRIMARY KEY(ts, host)) engine=mito;
2333         ";
2334        let result =
2335            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2336        assert!(result.is_err());
2337        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2338
2339        let sql = r"create table demo(
2340                             host string,
2341                             ts timestamp time index,
2342                             cpu float64 default 0,
2343                             t timestamp,
2344                             memory float64,
2345                             TIME INDEX (t),
2346                             PRIMARY KEY(ts, host)) engine=mito;
2347         ";
2348        let result =
2349            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2350        assert!(result.is_err());
2351        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2352    }
2353
2354    #[test]
2355    fn test_invalid_column_name() {
2356        let sql = "create table foo(user string, i timestamp time index)";
2357        let result =
2358            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2359        let err = result.unwrap_err().output_msg();
2360        assert!(err.contains("Cannot use keyword 'user' as column name"));
2361
2362        // If column name is quoted, it's valid even same with keyword.
2363        let sql = r#"
2364            create table foo("user" string, i timestamp time index)
2365        "#;
2366        let result =
2367            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2368        let _ = result.unwrap();
2369    }
2370
2371    #[test]
2372    fn test_incorrect_default_value_issue_3479() {
2373        let sql = r#"CREATE TABLE `ExcePTuRi`(
2374non TIMESTAMP(6) TIME INDEX,
2375`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2376)"#;
2377        let result =
2378            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2379                .unwrap();
2380        assert_eq!(1, result.len());
2381        match &result[0] {
2382            Statement::CreateTable(c) => {
2383                assert_eq!(
2384                    "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2385                    c.columns[1].to_string()
2386                );
2387            }
2388            _ => unreachable!(),
2389        }
2390    }
2391
2392    #[test]
2393    fn test_parse_create_view() {
2394        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
2395
2396        let result =
2397            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2398                .unwrap();
2399        match &result[0] {
2400            Statement::CreateView(c) => {
2401                assert_eq!(c.to_string(), sql);
2402                assert!(!c.or_replace);
2403                assert!(!c.if_not_exists);
2404                assert_eq!("test", c.name.to_string());
2405            }
2406            _ => unreachable!(),
2407        }
2408
2409        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
2410
2411        let result =
2412            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2413                .unwrap();
2414        match &result[0] {
2415            Statement::CreateView(c) => {
2416                assert_eq!(c.to_string(), sql);
2417                assert!(c.or_replace);
2418                assert!(c.if_not_exists);
2419                assert_eq!("test", c.name.to_string());
2420            }
2421            _ => unreachable!(),
2422        }
2423    }
2424
2425    #[test]
2426    fn test_parse_create_view_invalid_query() {
2427        let sql = "CREATE VIEW test AS DELETE from demo";
2428        let result =
2429            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2430        assert!(result.is_err());
2431        assert_matches!(result, Err(crate::error::Error::Syntax { .. }));
2432    }
2433
2434    #[test]
2435    fn test_parse_create_table_fulltext_options() {
2436        let sql1 = r"
2437CREATE TABLE log (
2438    ts TIMESTAMP TIME INDEX,
2439    msg TEXT FULLTEXT INDEX,
2440)";
2441        let result1 = ParserContext::create_with_dialect(
2442            sql1,
2443            &GreptimeDbDialect {},
2444            ParseOptions::default(),
2445        )
2446        .unwrap();
2447
2448        if let Statement::CreateTable(c) = &result1[0] {
2449            c.columns.iter().for_each(|col| {
2450                if col.name().value == "msg" {
2451                    assert!(
2452                        col.extensions
2453                            .fulltext_index_options
2454                            .as_ref()
2455                            .unwrap()
2456                            .is_empty()
2457                    );
2458                }
2459            });
2460        } else {
2461            panic!("should be create_table statement");
2462        }
2463
2464        let sql2 = r"
2465CREATE TABLE log (
2466    ts TIMESTAMP TIME INDEX,
2467    msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2468)";
2469        let result2 = ParserContext::create_with_dialect(
2470            sql2,
2471            &GreptimeDbDialect {},
2472            ParseOptions::default(),
2473        )
2474        .unwrap();
2475
2476        if let Statement::CreateTable(c) = &result2[0] {
2477            c.columns.iter().for_each(|col| {
2478                if col.name().value == "msg" {
2479                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2480                    assert_eq!(options.len(), 2);
2481                    assert_eq!(options.get("analyzer").unwrap(), "English");
2482                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2483                }
2484            });
2485        } else {
2486            panic!("should be create_table statement");
2487        }
2488
2489        let sql3 = r"
2490CREATE TABLE log (
2491    ts TIMESTAMP TIME INDEX,
2492    msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2493    msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2494)";
2495        let result3 = ParserContext::create_with_dialect(
2496            sql3,
2497            &GreptimeDbDialect {},
2498            ParseOptions::default(),
2499        )
2500        .unwrap();
2501
2502        if let Statement::CreateTable(c) = &result3[0] {
2503            c.columns.iter().for_each(|col| {
2504                if col.name().value == "msg1" {
2505                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2506                    assert_eq!(options.len(), 2);
2507                    assert_eq!(options.get("analyzer").unwrap(), "English");
2508                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2509                } else if col.name().value == "msg2" {
2510                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2511                    assert_eq!(options.len(), 2);
2512                    assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2513                    assert_eq!(options.get("case_sensitive").unwrap(), "true");
2514                }
2515            });
2516        } else {
2517            panic!("should be create_table statement");
2518        }
2519    }
2520
2521    #[test]
2522    fn test_parse_create_table_fulltext_options_invalid_type() {
2523        let sql = r"
2524CREATE TABLE log (
2525    ts TIMESTAMP TIME INDEX,
2526    msg INT FULLTEXT INDEX,
2527)";
2528        let result =
2529            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2530        assert!(result.is_err());
2531        assert!(
2532            result
2533                .unwrap_err()
2534                .to_string()
2535                .contains("FULLTEXT index only supports string type")
2536        );
2537    }
2538
2539    #[test]
2540    fn test_parse_create_table_fulltext_options_duplicate() {
2541        let sql = r"
2542CREATE TABLE log (
2543    ts TIMESTAMP TIME INDEX,
2544    msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
2545)";
2546        let result =
2547            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2548        assert!(result.is_err());
2549        assert!(
2550            result
2551                .unwrap_err()
2552                .to_string()
2553                .contains("duplicated FULLTEXT INDEX option")
2554        );
2555    }
2556
2557    #[test]
2558    fn test_parse_create_table_fulltext_options_invalid_option() {
2559        let sql = r"
2560CREATE TABLE log (
2561    ts TIMESTAMP TIME INDEX,
2562    msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
2563)";
2564        let result =
2565            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2566        assert!(result.is_err());
2567        assert!(
2568            result
2569                .unwrap_err()
2570                .to_string()
2571                .contains("invalid FULLTEXT INDEX option")
2572        );
2573    }
2574
2575    #[test]
2576    fn test_parse_create_table_skip_options() {
2577        let sql = r"
2578CREATE TABLE log (
2579    ts TIMESTAMP TIME INDEX,
2580    msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
2581)";
2582        let result =
2583            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2584                .unwrap();
2585
2586        if let Statement::CreateTable(c) = &result[0] {
2587            c.columns.iter().for_each(|col| {
2588                if col.name().value == "msg" {
2589                    assert!(
2590                        !col.extensions
2591                            .skipping_index_options
2592                            .as_ref()
2593                            .unwrap()
2594                            .is_empty()
2595                    );
2596                }
2597            });
2598        } else {
2599            panic!("should be create_table statement");
2600        }
2601
2602        let sql = r"
2603        CREATE TABLE log (
2604            ts TIMESTAMP TIME INDEX,
2605            msg INT SKIPPING INDEX,
2606        )";
2607        let result =
2608            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2609                .unwrap();
2610
2611        if let Statement::CreateTable(c) = &result[0] {
2612            c.columns.iter().for_each(|col| {
2613                if col.name().value == "msg" {
2614                    assert!(
2615                        col.extensions
2616                            .skipping_index_options
2617                            .as_ref()
2618                            .unwrap()
2619                            .is_empty()
2620                    );
2621                }
2622            });
2623        } else {
2624            panic!("should be create_table statement");
2625        }
2626    }
2627
2628    #[test]
2629    fn test_parse_create_view_with_columns() {
2630        let sql = "CREATE VIEW test () AS SELECT * FROM NUMBERS";
2631        let result =
2632            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2633                .unwrap();
2634
2635        match &result[0] {
2636            Statement::CreateView(c) => {
2637                assert_eq!(c.to_string(), "CREATE VIEW test AS SELECT * FROM NUMBERS");
2638                assert!(!c.or_replace);
2639                assert!(!c.if_not_exists);
2640                assert_eq!("test", c.name.to_string());
2641            }
2642            _ => unreachable!(),
2643        }
2644        assert_eq!(
2645            "CREATE VIEW test AS SELECT * FROM NUMBERS",
2646            result[0].to_string()
2647        );
2648
2649        let sql = "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS";
2650        let result =
2651            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2652                .unwrap();
2653
2654        match &result[0] {
2655            Statement::CreateView(c) => {
2656                assert_eq!(c.to_string(), sql);
2657                assert!(!c.or_replace);
2658                assert!(!c.if_not_exists);
2659                assert_eq!("test", c.name.to_string());
2660            }
2661            _ => unreachable!(),
2662        }
2663        assert_eq!(sql, result[0].to_string());
2664
2665        let sql = "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS";
2666        let result =
2667            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2668                .unwrap();
2669
2670        match &result[0] {
2671            Statement::CreateView(c) => {
2672                assert_eq!(c.to_string(), sql);
2673                assert!(!c.or_replace);
2674                assert!(!c.if_not_exists);
2675                assert_eq!("test", c.name.to_string());
2676            }
2677            _ => unreachable!(),
2678        }
2679        assert_eq!(sql, result[0].to_string());
2680
2681        // Some invalid syntax cases
2682        let sql = "CREATE VIEW test (n1 AS select * from demo";
2683        let result =
2684            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2685        assert!(result.is_err());
2686
2687        let sql = "CREATE VIEW test (n1, AS select * from demo";
2688        let result =
2689            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2690        assert!(result.is_err());
2691
2692        let sql = "CREATE VIEW test n1,n2) AS select * from demo";
2693        let result =
2694            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2695        assert!(result.is_err());
2696
2697        let sql = "CREATE VIEW test (1) AS select * from demo";
2698        let result =
2699            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2700        assert!(result.is_err());
2701
2702        // keyword
2703        let sql = "CREATE VIEW test (n1, select) AS select * from demo";
2704        let result =
2705            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2706        assert!(result.is_err());
2707    }
2708
2709    #[test]
2710    fn test_parse_column_extensions_vector() {
2711        let sql = "VECTOR(128)";
2712        let dialect = GenericDialect {};
2713        let mut tokenizer = Tokenizer::new(&dialect, sql);
2714        let tokens = tokenizer.tokenize().unwrap();
2715        let mut parser = Parser::new(&dialect).with_tokens(tokens);
2716        let name = Ident::new("vec_col");
2717        let data_type =
2718            DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
2719        let mut extensions = ColumnExtensions::default();
2720
2721        let result =
2722            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2723        assert!(result.is_ok());
2724        assert!(extensions.vector_options.is_some());
2725        let vector_options = extensions.vector_options.unwrap();
2726        assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some(&"128".to_string()));
2727    }
2728
2729    #[test]
2730    fn test_parse_column_extensions_vector_invalid() {
2731        let sql = "VECTOR()";
2732        let dialect = GenericDialect {};
2733        let mut tokenizer = Tokenizer::new(&dialect, sql);
2734        let tokens = tokenizer.tokenize().unwrap();
2735        let mut parser = Parser::new(&dialect).with_tokens(tokens);
2736        let name = Ident::new("vec_col");
2737        let data_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
2738        let mut extensions = ColumnExtensions::default();
2739
2740        let result =
2741            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2742        assert!(result.is_err());
2743    }
2744
2745    #[test]
2746    fn test_parse_column_extensions_indices() {
2747        // Test skipping index
2748        {
2749            let sql = "SKIPPING INDEX";
2750            let dialect = GenericDialect {};
2751            let mut tokenizer = Tokenizer::new(&dialect, sql);
2752            let tokens = tokenizer.tokenize().unwrap();
2753            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2754            let name = Ident::new("col");
2755            let data_type = DataType::String(None);
2756            let mut extensions = ColumnExtensions::default();
2757            let result = ParserContext::parse_column_extensions(
2758                &mut parser,
2759                &name,
2760                &data_type,
2761                &mut extensions,
2762            );
2763            assert!(result.is_ok());
2764            assert!(extensions.skipping_index_options.is_some());
2765        }
2766
2767        // Test fulltext index with options
2768        {
2769            let sql = "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')";
2770            let dialect = GenericDialect {};
2771            let mut tokenizer = Tokenizer::new(&dialect, sql);
2772            let tokens = tokenizer.tokenize().unwrap();
2773            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2774            let name = Ident::new("text_col");
2775            let data_type = DataType::String(None);
2776            let mut extensions = ColumnExtensions::default();
2777            let result = ParserContext::parse_column_extensions(
2778                &mut parser,
2779                &name,
2780                &data_type,
2781                &mut extensions,
2782            );
2783            assert!(result.unwrap());
2784            assert!(extensions.fulltext_index_options.is_some());
2785            let fulltext_options = extensions.fulltext_index_options.unwrap();
2786            assert_eq!(
2787                fulltext_options.get("analyzer"),
2788                Some(&"English".to_string())
2789            );
2790            assert_eq!(
2791                fulltext_options.get("case_sensitive"),
2792                Some(&"true".to_string())
2793            );
2794        }
2795
2796        // Test fulltext index with invalid type (should fail)
2797        {
2798            let sql = "FULLTEXT INDEX WITH (analyzer = 'English')";
2799            let dialect = GenericDialect {};
2800            let mut tokenizer = Tokenizer::new(&dialect, sql);
2801            let tokens = tokenizer.tokenize().unwrap();
2802            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2803            let name = Ident::new("num_col");
2804            let data_type = DataType::Int(None); // Non-string type
2805            let mut extensions = ColumnExtensions::default();
2806            let result = ParserContext::parse_column_extensions(
2807                &mut parser,
2808                &name,
2809                &data_type,
2810                &mut extensions,
2811            );
2812            assert!(result.is_err());
2813            assert!(
2814                result
2815                    .unwrap_err()
2816                    .to_string()
2817                    .contains("FULLTEXT index only supports string type")
2818            );
2819        }
2820
2821        // Test fulltext index with invalid option (won't fail, the parser doesn't check the option's content)
2822        {
2823            let sql = "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')";
2824            let dialect = GenericDialect {};
2825            let mut tokenizer = Tokenizer::new(&dialect, sql);
2826            let tokens = tokenizer.tokenize().unwrap();
2827            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2828            let name = Ident::new("text_col");
2829            let data_type = DataType::String(None);
2830            let mut extensions = ColumnExtensions::default();
2831            let result = ParserContext::parse_column_extensions(
2832                &mut parser,
2833                &name,
2834                &data_type,
2835                &mut extensions,
2836            );
2837            assert!(result.unwrap());
2838        }
2839
2840        // Test inverted index
2841        {
2842            let sql = "INVERTED INDEX";
2843            let dialect = GenericDialect {};
2844            let mut tokenizer = Tokenizer::new(&dialect, sql);
2845            let tokens = tokenizer.tokenize().unwrap();
2846            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2847            let name = Ident::new("col");
2848            let data_type = DataType::String(None);
2849            let mut extensions = ColumnExtensions::default();
2850            let result = ParserContext::parse_column_extensions(
2851                &mut parser,
2852                &name,
2853                &data_type,
2854                &mut extensions,
2855            );
2856            assert!(result.is_ok());
2857            assert!(extensions.inverted_index_options.is_some());
2858        }
2859
2860        // Test inverted index with options (should fail)
2861        {
2862            let sql = "INVERTED INDEX WITH (analyzer = 'English')";
2863            let dialect = GenericDialect {};
2864            let mut tokenizer = Tokenizer::new(&dialect, sql);
2865            let tokens = tokenizer.tokenize().unwrap();
2866            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2867            let name = Ident::new("col");
2868            let data_type = DataType::String(None);
2869            let mut extensions = ColumnExtensions::default();
2870            let result = ParserContext::parse_column_extensions(
2871                &mut parser,
2872                &name,
2873                &data_type,
2874                &mut extensions,
2875            );
2876            assert!(result.is_err());
2877            assert!(
2878                result
2879                    .unwrap_err()
2880                    .to_string()
2881                    .contains("INVERTED index doesn't support options")
2882            );
2883        }
2884
2885        // Test multiple indices
2886        {
2887            let sql = "SKIPPING INDEX FULLTEXT INDEX";
2888            let dialect = GenericDialect {};
2889            let mut tokenizer = Tokenizer::new(&dialect, sql);
2890            let tokens = tokenizer.tokenize().unwrap();
2891            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2892            let name = Ident::new("col");
2893            let data_type = DataType::String(None);
2894            let mut extensions = ColumnExtensions::default();
2895            let result = ParserContext::parse_column_extensions(
2896                &mut parser,
2897                &name,
2898                &data_type,
2899                &mut extensions,
2900            );
2901            assert!(result.unwrap());
2902            assert!(extensions.skipping_index_options.is_some());
2903            assert!(extensions.fulltext_index_options.is_some());
2904        }
2905    }
2906
2907    #[test]
2908    fn test_parse_interval_cast() {
2909        let s = "select '10s'::INTERVAL";
2910        let stmts =
2911            ParserContext::create_with_dialect(s, &GreptimeDbDialect {}, ParseOptions::default())
2912                .unwrap();
2913        assert_eq!("SELECT '10 seconds'::INTERVAL", &stmts[0].to_string());
2914    }
2915}