sql/parsers/
create_parser.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod json;
16#[cfg(feature = "enterprise")]
17pub mod trigger;
18
19use std::collections::HashMap;
20
21use arrow_buffer::IntervalMonthDayNano;
22use common_catalog::consts::default_engine;
23use datafusion_common::ScalarValue;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
25use datatypes::data_type::ConcreteDataType;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28use sqlparser::ast::{
29    ColumnOption, ColumnOptionDef, DataType, Expr, KeyOrIndexDisplay, NullsDistinctOption,
30    PrimaryKeyConstraint, UniqueConstraint,
31};
32use sqlparser::dialect::keywords::Keyword;
33use sqlparser::keywords::ALL_KEYWORDS;
34use sqlparser::parser::IsOptional::Mandatory;
35use sqlparser::parser::{Parser, ParserError};
36use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
37use table::requests::validate_database_option;
38
39use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
40use crate::error::{
41    self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu,
42    InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result, SyntaxSnafu,
43    UnexpectedSnafu, UnsupportedSnafu,
44};
45use crate::parser::{FLOW, ParserContext};
46use crate::parsers::tql_parser;
47use crate::parsers::utils::{
48    self, parse_with_options, validate_column_fulltext_create_option,
49    validate_column_skipping_index_create_option, validate_column_vector_index_create_option,
50};
51use crate::statements::create::{
52    Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
53    CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
54};
55use crate::statements::statement::Statement;
56use crate::statements::transform::type_alias::get_data_type_by_alias_name;
57use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
58use crate::util::{OptionValue, location_to_index, parse_option_string};
59
/// Spelling of the `ENGINE` word.
pub const ENGINE: &str = "ENGINE";
/// Spelling of the `MAXVALUE` word.
pub const MAXVALUE: &str = "MAXVALUE";
/// Spelling of the `SINK` word, compared case-insensitively in `CREATE FLOW ... SINK TO`.
pub const SINK: &str = "SINK";
/// Spelling of the `EXPIRE` word, compared case-insensitively in `EXPIRE AFTER`.
pub const EXPIRE: &str = "EXPIRE";
/// Spelling of the `AFTER` word, compared case-insensitively in `EXPIRE AFTER`.
pub const AFTER: &str = "AFTER";
/// Spelling of the `INVERTED` word.
pub const INVERTED: &str = "INVERTED";
/// Spelling of the `SKIPPING` word, compared case-insensitively in `SKIPPING INDEX`.
pub const SKIPPING: &str = "SKIPPING";
/// Spelling of the `VECTOR` word.
pub const VECTOR: &str = "VECTOR";

/// Textual form of an interval expression (the parsed expression formatted back to a string).
pub type RawIntervalExpr = String;
70
71/// Parses create [table] statement
72impl<'a> ParserContext<'a> {
73    pub(crate) fn parse_create(&mut self) -> Result<Statement> {
74        match self.parser.peek_token().token {
75            Token::Word(w) => match w.keyword {
76                Keyword::TABLE => self.parse_create_table(),
77
78                Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
79
80                Keyword::EXTERNAL => self.parse_create_external_table(),
81
82                Keyword::OR => {
83                    let _ = self.parser.next_token();
84                    self.parser
85                        .expect_keyword(Keyword::REPLACE)
86                        .context(SyntaxSnafu)?;
87                    match self.parser.next_token().token {
88                        Token::Word(w) => match w.keyword {
89                            Keyword::VIEW => self.parse_create_view(true),
90                            Keyword::NoKeyword => {
91                                let uppercase = w.value.to_uppercase();
92                                match uppercase.as_str() {
93                                    FLOW => self.parse_create_flow(true),
94                                    _ => self.unsupported(w.to_string()),
95                                }
96                            }
97                            _ => self.unsupported(w.to_string()),
98                        },
99                        _ => self.unsupported(w.to_string()),
100                    }
101                }
102
103                Keyword::VIEW => {
104                    let _ = self.parser.next_token();
105                    self.parse_create_view(false)
106                }
107
108                #[cfg(feature = "enterprise")]
109                Keyword::TRIGGER => {
110                    let _ = self.parser.next_token();
111                    self.parse_create_trigger()
112                }
113
114                Keyword::NoKeyword => {
115                    let _ = self.parser.next_token();
116                    let uppercase = w.value.to_uppercase();
117                    match uppercase.as_str() {
118                        FLOW => self.parse_create_flow(false),
119                        _ => self.unsupported(w.to_string()),
120                    }
121                }
122                _ => self.unsupported(w.to_string()),
123            },
124            unexpected => self.unsupported(unexpected.to_string()),
125        }
126    }
127
128    /// Parse `CREAVE VIEW` statement.
129    fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
130        let if_not_exists = self.parse_if_not_exist()?;
131        let view_name = self.intern_parse_table_name()?;
132
133        let columns = self.parse_view_columns()?;
134
135        self.parser
136            .expect_keyword(Keyword::AS)
137            .context(SyntaxSnafu)?;
138
139        let query = self.parse_query()?;
140
141        Ok(Statement::CreateView(CreateView {
142            name: view_name,
143            columns,
144            or_replace,
145            query: Box::new(query),
146            if_not_exists,
147        }))
148    }
149
150    fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
151        let mut columns = vec![];
152        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
153            return Ok(columns);
154        }
155
156        loop {
157            let name = self.parse_column_name().context(SyntaxSnafu)?;
158
159            columns.push(name);
160
161            let comma = self.parser.consume_token(&Token::Comma);
162            if self.parser.consume_token(&Token::RParen) {
163                // allow a trailing comma, even though it's not in standard
164                break;
165            } else if !comma {
166                return self.expected("',' or ')' after column name", self.parser.peek_token());
167            }
168        }
169
170        Ok(columns)
171    }
172
173    fn parse_create_external_table(&mut self) -> Result<Statement> {
174        let _ = self.parser.next_token();
175        self.parser
176            .expect_keyword(Keyword::TABLE)
177            .context(SyntaxSnafu)?;
178        let if_not_exists = self.parse_if_not_exist()?;
179        let table_name = self.intern_parse_table_name()?;
180        let (columns, constraints) = self.parse_columns()?;
181        if !columns.is_empty() {
182            validate_time_index(&columns, &constraints)?;
183        }
184
185        let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
186        let options = self.parse_create_table_options()?;
187        Ok(Statement::CreateExternalTable(CreateExternalTable {
188            name: table_name,
189            columns,
190            constraints,
191            options,
192            if_not_exists,
193            engine,
194        }))
195    }
196
197    fn parse_create_database(&mut self) -> Result<Statement> {
198        let _ = self.parser.next_token();
199        let if_not_exists = self.parse_if_not_exist()?;
200        let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
201            expected: "a database name",
202            actual: self.peek_token_as_string(),
203        })?;
204        let database_name = Self::canonicalize_object_name(database_name)?;
205
206        let options = self
207            .parser
208            .parse_options(Keyword::WITH)
209            .context(SyntaxSnafu)?
210            .into_iter()
211            .map(parse_option_string)
212            .collect::<Result<HashMap<String, OptionValue>>>()?;
213
214        for key in options.keys() {
215            ensure!(
216                validate_database_option(key),
217                InvalidDatabaseOptionSnafu { key: key.clone() }
218            );
219        }
220        if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string())
221            && append_mode == "true"
222            && options.contains_key("merge_mode")
223        {
224            return InvalidDatabaseOptionSnafu {
225                key: "merge_mode".to_string(),
226            }
227            .fail();
228        }
229
230        Ok(Statement::CreateDatabase(CreateDatabase {
231            name: database_name,
232            if_not_exists,
233            options: OptionMap::new(options),
234        }))
235    }
236
237    fn parse_create_table(&mut self) -> Result<Statement> {
238        let _ = self.parser.next_token();
239
240        let if_not_exists = self.parse_if_not_exist()?;
241
242        let table_name = self.intern_parse_table_name()?;
243
244        if self.parser.parse_keyword(Keyword::LIKE) {
245            let source_name = self.intern_parse_table_name()?;
246
247            return Ok(Statement::CreateTableLike(CreateTableLike {
248                table_name,
249                source_name,
250            }));
251        }
252
253        let (columns, constraints) = self.parse_columns()?;
254        validate_time_index(&columns, &constraints)?;
255
256        let partitions = self.parse_partitions()?;
257        if let Some(partitions) = &partitions {
258            validate_partitions(&columns, partitions)?;
259        }
260
261        let engine = self.parse_table_engine(default_engine())?;
262        let options = self.parse_create_table_options()?;
263        let create_table = CreateTable {
264            if_not_exists,
265            name: table_name,
266            columns,
267            engine,
268            constraints,
269            options,
270            table_id: 0, // table id is assigned by catalog manager
271            partitions,
272        };
273
274        Ok(Statement::CreateTable(create_table))
275    }
276
277    /// "CREATE FLOW" clause
278    fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
279        let if_not_exists = self.parse_if_not_exist()?;
280
281        let flow_name = self.intern_parse_table_name()?;
282
283        // make `SINK` case in-sensitive
284        if let Token::Word(word) = self.parser.peek_token().token
285            && word.value.eq_ignore_ascii_case(SINK)
286        {
287            self.parser.next_token();
288        } else {
289            Err(ParserError::ParserError(
290                "Expect `SINK` keyword".to_string(),
291            ))
292            .context(SyntaxSnafu)?
293        }
294        self.parser
295            .expect_keyword(Keyword::TO)
296            .context(SyntaxSnafu)?;
297
298        let output_table_name = self.intern_parse_table_name()?;
299
300        let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
301            && w1.value.eq_ignore_ascii_case(EXPIRE)
302        {
303            self.parser.next_token();
304            if let Token::Word(w2) = &self.parser.peek_token().token
305                && w2.value.eq_ignore_ascii_case(AFTER)
306            {
307                self.parser.next_token();
308                Some(self.parse_interval_no_month("EXPIRE AFTER")?)
309            } else {
310                None
311            }
312        } else {
313            None
314        };
315
316        let eval_interval = if self
317            .parser
318            .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
319        {
320            Some(self.parse_interval_no_month("EVAL INTERVAL")?)
321        } else {
322            None
323        };
324
325        let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
326            match self.parser.next_token() {
327                TokenWithSpan {
328                    token: Token::SingleQuotedString(value, ..),
329                    ..
330                } => Some(value),
331                unexpected => {
332                    return self
333                        .parser
334                        .expected("string", unexpected)
335                        .context(SyntaxSnafu);
336                }
337            }
338        } else {
339            None
340        };
341
342        self.parser
343            .expect_keyword(Keyword::AS)
344            .context(SyntaxSnafu)?;
345
346        let query = Box::new(self.parse_sql_or_tql(true)?);
347
348        Ok(Statement::CreateFlow(CreateFlow {
349            flow_name,
350            sink_table_name: output_table_name,
351            or_replace,
352            if_not_exists,
353            expire_after,
354            eval_interval,
355            comment,
356            query,
357        }))
358    }
359
360    fn parse_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
361        let start_loc = self.parser.peek_token().span.start;
362        let start_index = location_to_index(self.sql, &start_loc);
363
364        // only accept sql or tql
365        let query = match self.parser.peek_token().token {
366            Token::Word(w) => match w.keyword {
367                Keyword::SELECT => self.parse_query(),
368                Keyword::NoKeyword
369                    if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
370                {
371                    self.parse_tql(require_now_expr)
372                }
373
374                _ => self.unsupported(self.peek_token_as_string()),
375            },
376            _ => self.unsupported(self.peek_token_as_string()),
377        }?;
378
379        let end_token = self.parser.peek_token();
380
381        let raw_query = if end_token == Token::EOF {
382            &self.sql[start_index..]
383        } else {
384            let end_loc = end_token.span.end;
385            let end_index = location_to_index(self.sql, &end_loc);
386            &self.sql[start_index..end_index.min(self.sql.len())]
387        };
388        let raw_query = raw_query.trim_end_matches(";");
389        let query = SqlOrTql::try_from_statement(query, raw_query)?;
390        Ok(query)
391    }
392
393    /// Parse the interval expr to duration in seconds.
394    fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
395        let interval = self.parse_interval_month_day_nano()?.0;
396        if interval.months != 0 {
397            return InvalidIntervalSnafu {
398                reason: format!("Interval with months is not allowed in {context}"),
399            }
400            .fail();
401        }
402        Ok(
403            interval.nanoseconds / 1_000_000_000
404                + interval.days as i64 * 60 * 60 * 24
405                + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, // 1 month=365.25/12=30.44 days
406                                                                       // this is to keep the same as https://docs.rs/humantime/latest/humantime/fn.parse_duration.html
407                                                                       // which we use in database to parse i.e. ttl interval and many other intervals
408        )
409    }
410
411    /// Parse interval expr to [`IntervalMonthDayNano`].
412    fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
413        let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
414        let raw_interval_expr = interval_expr.to_string();
415        let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
416            .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
417            .ok()
418            .with_context(|| InvalidIntervalSnafu {
419                reason: format!("cannot cast {} to interval type", interval_expr),
420            })?;
421        if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
422            Ok((interval, raw_interval_expr))
423        } else {
424            unreachable!()
425        }
426    }
427
428    fn parse_if_not_exist(&mut self) -> Result<bool> {
429        match self.parser.peek_token().token {
430            Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
431            _ => {}
432        }
433
434        if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
435            return self
436                .parser
437                .expect_keyword(Keyword::EXISTS)
438                .map(|_| true)
439                .context(UnexpectedSnafu {
440                    expected: "EXISTS",
441                    actual: self.peek_token_as_string(),
442                });
443        }
444
445        if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
446            return UnsupportedSnafu { keyword: "EXISTS" }.fail();
447        }
448
449        Ok(false)
450    }
451
452    fn parse_create_table_options(&mut self) -> Result<OptionMap> {
453        parse_with_options(&mut self.parser)
454    }
455
456    /// "PARTITION ON COLUMNS (...)" clause
457    fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
458        if !self.parser.parse_keyword(Keyword::PARTITION) {
459            return Ok(None);
460        }
461        self.parser
462            .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
463            .context(error::UnexpectedSnafu {
464                expected: "ON, COLUMNS",
465                actual: self.peek_token_as_string(),
466            })?;
467
468        let raw_column_list = self
469            .parser
470            .parse_parenthesized_column_list(Mandatory, false)
471            .context(error::SyntaxSnafu)?;
472        let column_list = raw_column_list
473            .into_iter()
474            .map(Self::canonicalize_identifier)
475            .collect();
476
477        let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
478
479        Ok(Some(Partitions { column_list, exprs }))
480    }
481
482    fn parse_partition_entry(&mut self) -> Result<Expr> {
483        self.parser.parse_expr().context(error::SyntaxSnafu)
484    }
485
486    /// Parse a comma-separated list wrapped by "()", and of which all items accepted by `F`
487    fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
488    where
489        F: FnMut(&mut ParserContext<'a>) -> Result<T>,
490    {
491        self.parser
492            .expect_token(&Token::LParen)
493            .context(error::UnexpectedSnafu {
494                expected: "(",
495                actual: self.peek_token_as_string(),
496            })?;
497
498        let mut values = vec![];
499        while self.parser.peek_token() != Token::RParen {
500            values.push(f(self)?);
501            if !self.parser.consume_token(&Token::Comma) {
502                break;
503            }
504        }
505
506        self.parser
507            .expect_token(&Token::RParen)
508            .context(error::UnexpectedSnafu {
509                expected: ")",
510                actual: self.peek_token_as_string(),
511            })?;
512
513        Ok(values)
514    }
515
516    /// Parse the columns and constraints.
517    fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
518        let mut columns = vec![];
519        let mut constraints = vec![];
520        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
521            return Ok((columns, constraints));
522        }
523
524        loop {
525            if let Some(constraint) = self.parse_optional_table_constraint()? {
526                constraints.push(constraint);
527            } else if let Token::Word(_) = self.parser.peek_token().token {
528                self.parse_column(&mut columns, &mut constraints)?;
529            } else {
530                return self.expected(
531                    "column name or constraint definition",
532                    self.parser.peek_token(),
533                );
534            }
535            let comma = self.parser.consume_token(&Token::Comma);
536            if self.parser.consume_token(&Token::RParen) {
537                // allow a trailing comma, even though it's not in standard
538                break;
539            } else if !comma {
540                return self.expected(
541                    "',' or ')' after column definition",
542                    self.parser.peek_token(),
543                );
544            }
545        }
546
547        Ok((columns, constraints))
548    }
549
550    fn parse_column(
551        &mut self,
552        columns: &mut Vec<Column>,
553        constraints: &mut Vec<TableConstraint>,
554    ) -> Result<()> {
555        let mut column = self.parse_column_def()?;
556
557        let mut time_index_opt_idx = None;
558        for (index, opt) in column.options().iter().enumerate() {
559            if let ColumnOption::DialectSpecific(tokens) = &opt.option
560                && matches!(
561                    &tokens[..],
562                    [
563                        Token::Word(Word {
564                            keyword: Keyword::TIME,
565                            ..
566                        }),
567                        Token::Word(Word {
568                            keyword: Keyword::INDEX,
569                            ..
570                        })
571                    ]
572                )
573            {
574                ensure!(
575                    time_index_opt_idx.is_none(),
576                    InvalidColumnOptionSnafu {
577                        name: column.name().to_string(),
578                        msg: "duplicated time index",
579                    }
580                );
581                time_index_opt_idx = Some(index);
582
583                let constraint = TableConstraint::TimeIndex {
584                    column: Ident::new(column.name().value.clone()),
585                };
586                constraints.push(constraint);
587            }
588        }
589
590        if let Some(index) = time_index_opt_idx {
591            ensure!(
592                !column.options().contains(&ColumnOptionDef {
593                    option: ColumnOption::Null,
594                    name: None,
595                }),
596                InvalidColumnOptionSnafu {
597                    name: column.name().to_string(),
598                    msg: "time index column can't be null",
599                }
600            );
601
602            // The timestamp type may be an alias type, we have to retrieve the actual type.
603            let data_type = get_unalias_type(column.data_type());
604            ensure!(
605                matches!(data_type, DataType::Timestamp(_, _)),
606                InvalidColumnOptionSnafu {
607                    name: column.name().to_string(),
608                    msg: "time index column data type should be timestamp",
609                }
610            );
611
612            let not_null_opt = ColumnOptionDef {
613                option: ColumnOption::NotNull,
614                name: None,
615            };
616
617            if !column.options().contains(&not_null_opt) {
618                column.mut_options().push(not_null_opt);
619            }
620
621            let _ = column.mut_options().remove(index);
622        }
623
624        columns.push(column);
625
626        Ok(())
627    }
628
629    /// Parse the column name and check if it's valid.
630    fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
631        let name = self.parser.parse_identifier()?;
632        if name.quote_style.is_none() &&
633        // "ALL_KEYWORDS" are sorted.
634            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
635        {
636            return Err(ParserError::ParserError(format!(
637                "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
638                &name.value
639            )));
640        }
641
642        Ok(name)
643    }
644
645    pub fn parse_column_def(&mut self) -> Result<Column> {
646        let name = self.parse_column_name().context(SyntaxSnafu)?;
647        let parser = &mut self.parser;
648
649        ensure!(
650            !(name.quote_style.is_none() &&
651            // "ALL_KEYWORDS" are sorted.
652            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
653            InvalidSqlSnafu {
654                msg: format!(
655                    "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
656                    &name.value
657                ),
658            }
659        );
660
661        let mut extensions = ColumnExtensions::default();
662
663        let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
664        // Must immediately parse the JSON datatype format because it is closely after the "JSON"
665        // datatype, like this: "JSON(format = ...)".
666        if matches!(data_type, DataType::JSON) {
667            extensions.json_datatype_options = json::parse_json_datatype_options(parser)?;
668        }
669
670        let mut options = vec![];
671        loop {
672            if parser.parse_keyword(Keyword::CONSTRAINT) {
673                let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
674                if let Some(option) = Self::parse_optional_column_option(parser)? {
675                    options.push(ColumnOptionDef { name, option });
676                } else {
677                    return parser
678                        .expected(
679                            "constraint details after CONSTRAINT <name>",
680                            parser.peek_token(),
681                        )
682                        .context(SyntaxSnafu);
683                }
684            } else if let Some(option) = Self::parse_optional_column_option(parser)? {
685                options.push(ColumnOptionDef { name: None, option });
686            } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
687                break;
688            };
689        }
690
691        Ok(Column {
692            column_def: ColumnDef {
693                name: Self::canonicalize_identifier(name),
694                data_type,
695                options,
696            },
697            extensions,
698        })
699    }
700
701    fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
702        if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
703            Ok(Some(ColumnOption::CharacterSet(
704                parser.parse_object_name(false).context(SyntaxSnafu)?,
705            )))
706        } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
707            Ok(Some(ColumnOption::NotNull))
708        } else if parser.parse_keywords(&[Keyword::COMMENT]) {
709            match parser.next_token() {
710                TokenWithSpan {
711                    token: Token::SingleQuotedString(value, ..),
712                    ..
713                } => Ok(Some(ColumnOption::Comment(value))),
714                unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
715            }
716        } else if parser.parse_keyword(Keyword::NULL) {
717            Ok(Some(ColumnOption::Null))
718        } else if parser.parse_keyword(Keyword::DEFAULT) {
719            Ok(Some(ColumnOption::Default(
720                parser.parse_expr().context(SyntaxSnafu)?,
721            )))
722        } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
723            Ok(Some(ColumnOption::PrimaryKey(PrimaryKeyConstraint {
724                name: None,
725                index_name: None,
726                index_type: None,
727                columns: vec![],
728                index_options: vec![],
729                characteristics: None,
730            })))
731        } else if parser.parse_keyword(Keyword::UNIQUE) {
732            Ok(Some(ColumnOption::Unique(UniqueConstraint {
733                name: None,
734                index_name: None,
735                index_type_display: KeyOrIndexDisplay::None,
736                index_type: None,
737                columns: vec![],
738                index_options: vec![],
739                characteristics: None,
740                nulls_distinct: NullsDistinctOption::None,
741            })))
742        } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
743            // Use a DialectSpecific option for time index
744            Ok(Some(ColumnOption::DialectSpecific(vec![
745                Token::Word(Word {
746                    value: "TIME".to_string(),
747                    quote_style: None,
748                    keyword: Keyword::TIME,
749                }),
750                Token::Word(Word {
751                    value: "INDEX".to_string(),
752                    quote_style: None,
753                    keyword: Keyword::INDEX,
754                }),
755            ])))
756        } else {
757            Ok(None)
758        }
759    }
760
761    /// Parse a column option extensions.
762    ///
763    /// This function will handle:
764    /// - Vector type
765    /// - Indexes
766    fn parse_column_extensions(
767        parser: &mut Parser<'_>,
768        column_name: &Ident,
769        column_type: &DataType,
770        column_extensions: &mut ColumnExtensions,
771    ) -> Result<bool> {
772        if let DataType::Custom(name, tokens) = column_type
773            && name.0.len() == 1
774            && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
775        {
776            ensure!(
777                tokens.len() == 1,
778                InvalidColumnOptionSnafu {
779                    name: column_name.to_string(),
780                    msg: "VECTOR type should have dimension",
781                }
782            );
783
784            let dimension =
785                tokens[0]
786                    .parse::<u32>()
787                    .ok()
788                    .with_context(|| InvalidColumnOptionSnafu {
789                        name: column_name.to_string(),
790                        msg: "dimension should be a positive integer",
791                    })?;
792
793            let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
794            column_extensions.vector_options = Some(options);
795        }
796
797        // parse index options in column definition
798        let mut is_index_declared = false;
799
800        // skipping index
801        if let Token::Word(word) = parser.peek_token().token
802            && word.value.eq_ignore_ascii_case(SKIPPING)
803        {
804            parser.next_token();
805            // Consume `INDEX` keyword
806            ensure!(
807                parser.parse_keyword(Keyword::INDEX),
808                InvalidColumnOptionSnafu {
809                    name: column_name.to_string(),
810                    msg: "expect INDEX after SKIPPING keyword",
811                }
812            );
813            ensure!(
814                column_extensions.skipping_index_options.is_none(),
815                InvalidColumnOptionSnafu {
816                    name: column_name.to_string(),
817                    msg: "duplicated SKIPPING index option",
818                }
819            );
820
821            let options = parser
822                .parse_options(Keyword::WITH)
823                .context(error::SyntaxSnafu)?
824                .into_iter()
825                .map(parse_option_string)
826                .collect::<Result<Vec<_>>>()?;
827
828            for (key, _) in options.iter() {
829                ensure!(
830                    validate_column_skipping_index_create_option(key),
831                    InvalidColumnOptionSnafu {
832                        name: column_name.to_string(),
833                        msg: format!("invalid SKIPPING INDEX option: {key}"),
834                    }
835                );
836            }
837
838            let options = OptionMap::new(options);
839            column_extensions.skipping_index_options = Some(options);
840            is_index_declared |= true;
841        }
842
843        // fulltext index
844        if parser.parse_keyword(Keyword::FULLTEXT) {
845            // Consume `INDEX` keyword
846            ensure!(
847                parser.parse_keyword(Keyword::INDEX),
848                InvalidColumnOptionSnafu {
849                    name: column_name.to_string(),
850                    msg: "expect INDEX after FULLTEXT keyword",
851                }
852            );
853
854            ensure!(
855                column_extensions.fulltext_index_options.is_none(),
856                InvalidColumnOptionSnafu {
857                    name: column_name.to_string(),
858                    msg: "duplicated FULLTEXT INDEX option",
859                }
860            );
861
862            let column_type = get_unalias_type(column_type);
863            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
864            ensure!(
865                data_type == ConcreteDataType::string_datatype(),
866                InvalidColumnOptionSnafu {
867                    name: column_name.to_string(),
868                    msg: "FULLTEXT index only supports string type",
869                }
870            );
871
872            let options = parser
873                .parse_options(Keyword::WITH)
874                .context(error::SyntaxSnafu)?
875                .into_iter()
876                .map(parse_option_string)
877                .collect::<Result<Vec<_>>>()?;
878
879            for (key, _) in options.iter() {
880                ensure!(
881                    validate_column_fulltext_create_option(key),
882                    InvalidColumnOptionSnafu {
883                        name: column_name.to_string(),
884                        msg: format!("invalid FULLTEXT INDEX option: {key}"),
885                    }
886                );
887            }
888
889            let options = OptionMap::new(options);
890            column_extensions.fulltext_index_options = Some(options);
891            is_index_declared |= true;
892        }
893
894        // inverted index
895        if let Token::Word(word) = parser.peek_token().token
896            && word.value.eq_ignore_ascii_case(INVERTED)
897        {
898            parser.next_token();
899            // Consume `INDEX` keyword
900            ensure!(
901                parser.parse_keyword(Keyword::INDEX),
902                InvalidColumnOptionSnafu {
903                    name: column_name.to_string(),
904                    msg: "expect INDEX after INVERTED keyword",
905                }
906            );
907
908            ensure!(
909                column_extensions.inverted_index_options.is_none(),
910                InvalidColumnOptionSnafu {
911                    name: column_name.to_string(),
912                    msg: "duplicated INVERTED index option",
913                }
914            );
915
916            // inverted index doesn't have options, skipping `WITH`
917            // try cache `WITH` and throw error
918            let with_token = parser.peek_token();
919            ensure!(
920                with_token.token
921                    != Token::Word(Word {
922                        value: "WITH".to_string(),
923                        keyword: Keyword::WITH,
924                        quote_style: None,
925                    }),
926                InvalidColumnOptionSnafu {
927                    name: column_name.to_string(),
928                    msg: "INVERTED index doesn't support options",
929                }
930            );
931
932            column_extensions.inverted_index_options = Some(OptionMap::default());
933            is_index_declared |= true;
934        }
935
936        // vector index
937        if let Token::Word(word) = parser.peek_token().token
938            && word.value.eq_ignore_ascii_case(VECTOR)
939        {
940            parser.next_token();
941            // Consume `INDEX` keyword
942            ensure!(
943                parser.parse_keyword(Keyword::INDEX),
944                InvalidColumnOptionSnafu {
945                    name: column_name.to_string(),
946                    msg: "expect INDEX after VECTOR keyword",
947                }
948            );
949
950            ensure!(
951                column_extensions.vector_index_options.is_none(),
952                InvalidColumnOptionSnafu {
953                    name: column_name.to_string(),
954                    msg: "duplicated VECTOR INDEX option",
955                }
956            );
957
958            // Check that column is a vector type
959            let column_type = get_unalias_type(column_type);
960            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
961            ensure!(
962                matches!(data_type, ConcreteDataType::Vector(_)),
963                InvalidColumnOptionSnafu {
964                    name: column_name.to_string(),
965                    msg: "VECTOR INDEX only supports Vector type columns",
966                }
967            );
968
969            let options = parser
970                .parse_options(Keyword::WITH)
971                .context(error::SyntaxSnafu)?
972                .into_iter()
973                .map(parse_option_string)
974                .collect::<Result<Vec<_>>>()?;
975
976            for (key, _) in options.iter() {
977                ensure!(
978                    validate_column_vector_index_create_option(key),
979                    InvalidColumnOptionSnafu {
980                        name: column_name.to_string(),
981                        msg: format!("invalid VECTOR INDEX option: {key}"),
982                    }
983                );
984            }
985
986            let options = OptionMap::new(options);
987            column_extensions.vector_index_options = Some(options);
988            is_index_declared |= true;
989        }
990
991        Ok(is_index_declared)
992    }
993
994    fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
995        match self.parser.next_token() {
996            TokenWithSpan {
997                token: Token::Word(w),
998                ..
999            } if w.keyword == Keyword::PRIMARY => {
1000                self.parser
1001                    .expect_keyword(Keyword::KEY)
1002                    .context(error::UnexpectedSnafu {
1003                        expected: "KEY",
1004                        actual: self.peek_token_as_string(),
1005                    })?;
1006                let raw_columns = self
1007                    .parser
1008                    .parse_parenthesized_column_list(Mandatory, false)
1009                    .context(error::SyntaxSnafu)?;
1010                let columns = raw_columns
1011                    .into_iter()
1012                    .map(Self::canonicalize_identifier)
1013                    .collect();
1014                Ok(Some(TableConstraint::PrimaryKey { columns }))
1015            }
1016            TokenWithSpan {
1017                token: Token::Word(w),
1018                ..
1019            } if w.keyword == Keyword::TIME => {
1020                self.parser
1021                    .expect_keyword(Keyword::INDEX)
1022                    .context(error::UnexpectedSnafu {
1023                        expected: "INDEX",
1024                        actual: self.peek_token_as_string(),
1025                    })?;
1026
1027                let raw_columns = self
1028                    .parser
1029                    .parse_parenthesized_column_list(Mandatory, false)
1030                    .context(error::SyntaxSnafu)?;
1031                let mut columns = raw_columns
1032                    .into_iter()
1033                    .map(Self::canonicalize_identifier)
1034                    .collect::<Vec<_>>();
1035
1036                ensure!(
1037                    columns.len() == 1,
1038                    InvalidTimeIndexSnafu {
1039                        msg: "it should contain only one column in time index",
1040                    }
1041                );
1042
1043                Ok(Some(TableConstraint::TimeIndex {
1044                    column: columns.pop().unwrap(),
1045                }))
1046            }
1047            _ => {
1048                self.parser.prev_token();
1049                Ok(None)
1050            }
1051        }
1052    }
1053
1054    /// Parses the set of valid formats
1055    fn parse_table_engine(&mut self, default: &str) -> Result<String> {
1056        if !self.consume_token(ENGINE) {
1057            return Ok(default.to_string());
1058        }
1059
1060        self.parser
1061            .expect_token(&Token::Eq)
1062            .context(error::UnexpectedSnafu {
1063                expected: "=",
1064                actual: self.peek_token_as_string(),
1065            })?;
1066
1067        let token = self.parser.next_token();
1068        if let Token::Word(w) = token.token {
1069            Ok(w.value)
1070        } else {
1071            self.expected("'Engine' is missing", token)
1072        }
1073    }
1074}
1075
1076fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1077    let time_index_constraints: Vec<_> = constraints
1078        .iter()
1079        .filter_map(|c| match c {
1080            TableConstraint::TimeIndex { column } => Some(column),
1081            _ => None,
1082        })
1083        .unique()
1084        .collect();
1085
1086    ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1087    ensure!(
1088        time_index_constraints.len() == 1,
1089        InvalidTimeIndexSnafu {
1090            msg: format!(
1091                "expected only one time index constraint but actual {}",
1092                time_index_constraints.len()
1093            ),
1094        }
1095    );
1096
1097    // It's safe to use time_index_constraints[0][0],
1098    // we already check the bound above.
1099    let time_index_column_ident = &time_index_constraints[0];
1100    let time_index_column = columns
1101        .iter()
1102        .find(|c| c.name().value == *time_index_column_ident.value)
1103        .with_context(|| InvalidTimeIndexSnafu {
1104            msg: format!(
1105                "time index column {} not found in columns",
1106                time_index_column_ident
1107            ),
1108        })?;
1109
1110    let time_index_data_type = get_unalias_type(time_index_column.data_type());
1111    ensure!(
1112        matches!(time_index_data_type, DataType::Timestamp(_, _)),
1113        InvalidColumnOptionSnafu {
1114            name: time_index_column.name().to_string(),
1115            msg: "time index column data type should be timestamp",
1116        }
1117    );
1118
1119    Ok(())
1120}
1121
1122fn get_unalias_type(data_type: &DataType) -> DataType {
1123    match data_type {
1124        DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1125            if let Some(real_type) =
1126                get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1127            {
1128                real_type
1129            } else {
1130                data_type.clone()
1131            }
1132        }
1133        _ => data_type.clone(),
1134    }
1135}
1136
1137fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1138    let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1139
1140    ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1141
1142    Ok(())
1143}
1144
1145/// Ensure all exprs are binary expr and all the columns are defined in the column list.
1146fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1147    for expr in exprs {
1148        // The first level must be binary expr
1149        if let Expr::BinaryOp { left, op: _, right } = expr {
1150            ensure_one_expr(left, columns)?;
1151            ensure_one_expr(right, columns)?;
1152        } else {
1153            return error::InvalidSqlSnafu {
1154                msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1155            }
1156            .fail();
1157        }
1158    }
1159    Ok(())
1160}
1161
1162/// Check if the expr is a binary expr, an ident or a literal value.
1163/// If is ident, then check it is in the column list.
1164/// This recursive function is intended to be used by [ensure_exprs_are_binary].
1165fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1166    match expr {
1167        Expr::BinaryOp { left, op: _, right } => {
1168            ensure_one_expr(left, columns)?;
1169            ensure_one_expr(right, columns)?;
1170            Ok(())
1171        }
1172        Expr::Identifier(ident) => {
1173            let column_name = &ident.value;
1174            ensure!(
1175                columns.iter().any(|c| &c.name().value == column_name),
1176                error::InvalidSqlSnafu {
1177                    msg: format!(
1178                        "Column {:?} in rule expr is not referenced in PARTITION ON",
1179                        column_name
1180                    ),
1181                }
1182            );
1183            Ok(())
1184        }
1185        Expr::Value(_) => Ok(()),
1186        Expr::UnaryOp { expr, .. } => {
1187            ensure_one_expr(expr, columns)?;
1188            Ok(())
1189        }
1190        _ => error::InvalidSqlSnafu {
1191            msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1192        }
1193        .fail(),
1194    }
1195}
1196
1197/// Ensure that all columns used in "PARTITION ON COLUMNS" are defined in create table.
1198fn ensure_partition_columns_defined<'a>(
1199    columns: &'a [Column],
1200    partitions: &'a Partitions,
1201) -> Result<Vec<&'a Column>> {
1202    partitions
1203        .column_list
1204        .iter()
1205        .map(|x| {
1206            let x = ParserContext::canonicalize_identifier(x.clone());
1207            // Normally the columns in "create table" won't be too many,
1208            // a linear search to find the target every time is fine.
1209            columns
1210                .iter()
1211                .find(|c| *c.name().value == x.value)
1212                .context(error::InvalidSqlSnafu {
1213                    msg: format!("Partition column {:?} not defined", x.value),
1214                })
1215        })
1216        .collect::<Result<Vec<&Column>>>()
1217}
1218
1219#[cfg(test)]
1220mod tests {
1221    use std::assert_matches::assert_matches;
1222    use std::collections::HashMap;
1223
1224    use common_catalog::consts::FILE_ENGINE;
1225    use common_error::ext::ErrorExt;
1226    use sqlparser::ast::ColumnOption::NotNull;
1227    use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1228    use sqlparser::dialect::GenericDialect;
1229    use sqlparser::tokenizer::Tokenizer;
1230
1231    use super::*;
1232    use crate::dialect::GreptimeDbDialect;
1233    use crate::parser::ParseOptions;
1234
1235    #[test]
1236    fn test_parse_create_table_like() {
1237        let sql = "CREATE TABLE t1 LIKE t2";
1238        let stmts =
1239            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1240                .unwrap();
1241
1242        assert_eq!(1, stmts.len());
1243        match &stmts[0] {
1244            Statement::CreateTableLike(c) => {
1245                assert_eq!(c.table_name.to_string(), "t1");
1246                assert_eq!(c.source_name.to_string(), "t2");
1247            }
1248            _ => unreachable!(),
1249        }
1250    }
1251
1252    #[test]
1253    fn test_validate_external_table_options() {
1254        let sql = "CREATE EXTERNAL TABLE city (
1255            host string,
1256            ts timestamp,
1257            cpu float64 default 0,
1258            memory float64,
1259            TIME INDEX (ts),
1260            PRIMARY KEY(ts, host)
1261        ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1262
1263        let result =
1264            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1265        assert!(matches!(
1266            result,
1267            Err(error::Error::InvalidTableOption { .. })
1268        ));
1269    }
1270
1271    #[test]
1272    fn test_parse_create_external_table() {
1273        struct Test<'a> {
1274            sql: &'a str,
1275            expected_table_name: &'a str,
1276            expected_options: HashMap<&'a str, &'a str>,
1277            expected_engine: &'a str,
1278            expected_if_not_exist: bool,
1279        }
1280
1281        let tests = [
1282            Test {
1283                sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1284                expected_table_name: "city",
1285                expected_options: HashMap::from([
1286                    ("location", "/var/data/city.csv"),
1287                    ("format", "csv"),
1288                ]),
1289                expected_engine: FILE_ENGINE,
1290                expected_if_not_exist: false,
1291            },
1292            Test {
1293                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1294                expected_table_name: "city",
1295                expected_options: HashMap::from([
1296                    ("location", "/var/data/city.csv"),
1297                    ("format", "csv"),
1298                ]),
1299                expected_engine: "foo",
1300                expected_if_not_exist: true,
1301            },
1302            Test {
1303                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1304                expected_table_name: "city",
1305                expected_options: HashMap::from([
1306                    ("location", "/var/data/city.csv"),
1307                    ("format", "csv"),
1308                    ("compaction.type", "bar"),
1309                ]),
1310                expected_engine: "foo",
1311                expected_if_not_exist: true,
1312            },
1313        ];
1314
1315        for test in tests {
1316            let stmts = ParserContext::create_with_dialect(
1317                test.sql,
1318                &GreptimeDbDialect {},
1319                ParseOptions::default(),
1320            )
1321            .unwrap();
1322            assert_eq!(1, stmts.len());
1323            match &stmts[0] {
1324                Statement::CreateExternalTable(c) => {
1325                    assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1326                    assert_eq!(c.options.to_str_map(), test.expected_options);
1327                    assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1328                    assert_eq!(c.engine, test.expected_engine);
1329                }
1330                _ => unreachable!(),
1331            }
1332        }
1333    }
1334
1335    #[test]
1336    fn test_parse_create_external_table_with_schema() {
1337        let sql = "CREATE EXTERNAL TABLE city (
1338            host string,
1339            ts timestamp,
1340            cpu float32 default 0,
1341            memory float64,
1342            TIME INDEX (ts),
1343            PRIMARY KEY(ts, host),
1344        ) with(location='/var/data/city.csv',format='csv');";
1345
1346        let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]);
1347
1348        let stmts =
1349            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1350                .unwrap();
1351        assert_eq!(1, stmts.len());
1352        match &stmts[0] {
1353            Statement::CreateExternalTable(c) => {
1354                assert_eq!(c.name.to_string(), "city");
1355                assert_eq!(c.options.to_str_map(), options);
1356
1357                let columns = &c.columns;
1358                assert_column_def(&columns[0].column_def, "host", "STRING");
1359                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1360                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1361                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1362
1363                let constraints = &c.constraints;
1364                assert_eq!(
1365                    &constraints[0],
1366                    &TableConstraint::TimeIndex {
1367                        column: Ident::new("ts"),
1368                    }
1369                );
1370                assert_eq!(
1371                    &constraints[1],
1372                    &TableConstraint::PrimaryKey {
1373                        columns: vec![Ident::new("ts"), Ident::new("host")]
1374                    }
1375                );
1376            }
1377            _ => unreachable!(),
1378        }
1379    }
1380
1381    #[test]
1382    fn test_parse_create_database() {
1383        let sql = "create database";
1384        let result =
1385            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1386        assert!(
1387            result
1388                .unwrap_err()
1389                .to_string()
1390                .contains("Unexpected token while parsing SQL statement")
1391        );
1392
1393        let sql = "create database prometheus";
1394        let stmts =
1395            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1396                .unwrap();
1397
1398        assert_eq!(1, stmts.len());
1399        match &stmts[0] {
1400            Statement::CreateDatabase(c) => {
1401                assert_eq!(c.name.to_string(), "prometheus");
1402                assert!(!c.if_not_exists);
1403            }
1404            _ => unreachable!(),
1405        }
1406
1407        let sql = "create database if not exists prometheus";
1408        let stmts =
1409            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1410                .unwrap();
1411
1412        assert_eq!(1, stmts.len());
1413        match &stmts[0] {
1414            Statement::CreateDatabase(c) => {
1415                assert_eq!(c.name.to_string(), "prometheus");
1416                assert!(c.if_not_exists);
1417            }
1418            _ => unreachable!(),
1419        }
1420
1421        let sql = "CREATE DATABASE `fOo`";
1422        let result =
1423            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1424        let stmts = result.unwrap();
1425        match &stmts.last().unwrap() {
1426            Statement::CreateDatabase(c) => {
1427                assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1428                assert!(!c.if_not_exists);
1429            }
1430            _ => unreachable!(),
1431        }
1432
1433        let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1434        let result =
1435            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1436        let stmts = result.unwrap();
1437        match &stmts[0] {
1438            Statement::CreateDatabase(c) => {
1439                assert_eq!(c.name.to_string(), "prometheus");
1440                assert!(!c.if_not_exists);
1441                assert_eq!(c.options.get("ttl").unwrap(), "1h");
1442            }
1443            _ => unreachable!(),
1444        }
1445    }
1446
    /// Table-driven test for `CREATE FLOW`: each case pairs a SQL statement with the
    /// expected `CreateFlow` fields (everything except the query). Every case is also
    /// round-tripped: the parsed statement is rendered with `to_string()` and parsed
    /// again, and the result must equal the same expectation.
    #[test]
    fn test_parse_create_flow_more_testcases() {
        use pretty_assertions::assert_eq;
        // Parses `sql`, asserts it yields exactly one statement and returns it as `CreateFlow`.
        fn parse_create_flow(sql: &str) -> CreateFlow {
            let stmts = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            assert_eq!(1, stmts.len());
            match &stmts[0] {
                Statement::CreateFlow(c) => c.clone(),
                _ => unreachable!(),
            }
        }
        // Expected `CreateFlow` fields without the query; the query is copied from the
        // parse result when the full expectation is built in the loop below.
        struct CreateFlowWoutQuery {
            /// Flow name
            pub flow_name: ObjectName,
            /// Output (sink) table name
            pub sink_table_name: ObjectName,
            /// Whether to replace existing task
            pub or_replace: bool,
            /// Create if not exist
            pub if_not_exists: bool,
            /// `EXPIRE AFTER`
            /// Duration in second as `i64`
            pub expire_after: Option<i64>,
            /// Comment string
            pub comment: Option<String>,
        }
        let testcases = vec![
            // `EXPIRE AFTER INTERVAL '5 minutes'` with all optional clauses present.
            (
                r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER INTERVAL '5 minutes'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
                    sink_table_name: ObjectName::from(vec![
                        Ident::new("schema_1"),
                        Ident::new("table_1"),
                    ]),
                    or_replace: true,
                    if_not_exists: true,
                    expire_after: Some(300),
                    comment: Some("test comment".to_string()),
                },
            ),
            // Same duration written as `INTERVAL '300 s'`.
            (
                r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER INTERVAL '300 s'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
                    sink_table_name: ObjectName::from(vec![
                        Ident::new("schema_1"),
                        Ident::new("table_1"),
                    ]),
                    or_replace: true,
                    if_not_exists: true,
                    expire_after: Some(300),
                    comment: Some("test comment".to_string()),
                },
            ),
            // `EXPIRE AFTER '5 minutes'` without the `INTERVAL` keyword.
            (
                r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER '5 minutes'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
                    sink_table_name: ObjectName::from(vec![
                        Ident::new("schema_1"),
                        Ident::new("table_1"),
                    ]),
                    or_replace: true,
                    if_not_exists: true,
                    expire_after: Some(300),
                    comment: Some("test comment".to_string()),
                },
            ),
            // `EXPIRE AFTER '300 s'` without the `INTERVAL` keyword.
            (
                r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER '300 s'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
                    sink_table_name: ObjectName::from(vec![
                        Ident::new("schema_1"),
                        Ident::new("table_1"),
                    ]),
                    or_replace: true,
                    if_not_exists: true,
                    expire_after: Some(300),
                    comment: Some("test comment".to_string()),
                },
            ),
            // Backtick-quoted flow name, compound duration, no `OR REPLACE`,
            // `IF NOT EXISTS` or `COMMENT`.
            (
                r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1
EXPIRE AFTER '2 days 1h 2 min'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
                    sink_table_name: ObjectName::from(vec![
                        Ident::new("schema_1"),
                        Ident::new("table_1"),
                    ]),
                    or_replace: false,
                    if_not_exists: false,
                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
                    comment: None,
                },
            ),
            // All-lowercase keywords with a quoted flow name.
            (
                r"
create flow `task_3`
sink to schema_1.table_1
expire after '10 minutes'
as
select max(c1), min(c2) from schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
                    sink_table_name: ObjectName::from(vec![
                        Ident::new("schema_1"),
                        Ident::new("table_1"),
                    ]),
                    or_replace: false,
                    if_not_exists: false,
                    expire_after: Some(600), // 10 minutes in seconds
                    comment: None,
                },
            ),
            // All-lowercase keywords with every optional clause and `interval`.
            (
                r"
create or replace flow if not exists task_4
sink to schema_1.table_1
expire after interval '2 hours'
comment 'lowercase test'
as
select max(c1), min(c2) from schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName::from(vec![Ident::new("task_4")]),
                    sink_table_name: ObjectName::from(vec![
                        Ident::new("schema_1"),
                        Ident::new("table_1"),
                    ]),
                    or_replace: true,
                    if_not_exists: true,
                    expire_after: Some(7200), // 2 hours in seconds
                    comment: Some("lowercase test".to_string()),
                },
            ),
        ];

        for (sql, expected) in testcases {
            let create_task = parse_create_flow(sql);

            // Build the full expectation; none of these cases sets an eval interval.
            let expected = CreateFlow {
                flow_name: expected.flow_name,
                sink_table_name: expected.sink_table_name,
                or_replace: expected.or_replace,
                if_not_exists: expected.if_not_exists,
                expire_after: expected.expire_after,
                eval_interval: None,
                comment: expected.comment,
                // ignore query parse result
                query: create_task.query.clone(),
            };

            assert_eq!(create_task, expected, "input sql is:\n{sql}");
            // Round trip: the rendered statement must parse back to the same value.
            let show_create = create_task.to_string();
            let recreated = parse_create_flow(&show_create);
            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
        }
    }
1640
1641    #[test]
1642    fn test_parse_create_flow() {
1643        use pretty_assertions::assert_eq;
1644        fn parse_create_flow(sql: &str) -> CreateFlow {
1645            let stmts = ParserContext::create_with_dialect(
1646                sql,
1647                &GreptimeDbDialect {},
1648                ParseOptions::default(),
1649            )
1650            .unwrap();
1651            assert_eq!(1, stmts.len());
1652            match &stmts[0] {
1653                Statement::CreateFlow(c) => c.clone(),
1654                _ => panic!("{:?}", stmts[0]),
1655            }
1656        }
1657        struct CreateFlowWoutQuery {
1658            /// Flow name
1659            pub flow_name: ObjectName,
1660            /// Output (sink) table name
1661            pub sink_table_name: ObjectName,
1662            /// Whether to replace existing task
1663            pub or_replace: bool,
1664            /// Create if not exist
1665            pub if_not_exists: bool,
1666            /// `EXPIRE AFTER`
1667            /// Duration in second as `i64`
1668            pub expire_after: Option<i64>,
1669            /// Duration for flow evaluation interval
1670            /// Duration in seconds as `i64`
1671            /// If not set, flow will be evaluated based on time window size and other args.
1672            pub eval_interval: Option<i64>,
1673            /// Comment string
1674            pub comment: Option<String>,
1675        }
1676
1677        // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
1678        let testcases = vec![
1679            (
1680                r"
1681CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1682SINK TO schema_1.table_1
1683EXPIRE AFTER INTERVAL '5 minutes'
1684COMMENT 'test comment'
1685AS
1686SELECT max(c1), min(c2) FROM schema_2.table_2;",
1687                CreateFlowWoutQuery {
1688                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1689                    sink_table_name: ObjectName(vec![
1690                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1691                        ObjectNamePart::Identifier(Ident::new("table_1")),
1692                    ]),
1693                    or_replace: true,
1694                    if_not_exists: true,
1695                    expire_after: Some(300),
1696                    eval_interval: None,
1697                    comment: Some("test comment".to_string()),
1698                },
1699            ),
1700            (
1701                r"
1702CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1703SINK TO schema_1.table_1
1704EXPIRE AFTER INTERVAL '300 s'
1705COMMENT 'test comment'
1706AS
1707SELECT max(c1), min(c2) FROM schema_2.table_2;",
1708                CreateFlowWoutQuery {
1709                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1710                    sink_table_name: ObjectName(vec![
1711                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1712                        ObjectNamePart::Identifier(Ident::new("table_1")),
1713                    ]),
1714                    or_replace: true,
1715                    if_not_exists: true,
1716                    expire_after: Some(300),
1717                    eval_interval: None,
1718                    comment: Some("test comment".to_string()),
1719                },
1720            ),
1721            (
1722                r"
1723CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1724SINK TO schema_1.table_1
1725EXPIRE AFTER '5 minutes'
1726EVAL INTERVAL '10 seconds'
1727COMMENT 'test comment'
1728AS
1729SELECT max(c1), min(c2) FROM schema_2.table_2;",
1730                CreateFlowWoutQuery {
1731                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1732                    sink_table_name: ObjectName(vec![
1733                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1734                        ObjectNamePart::Identifier(Ident::new("table_1")),
1735                    ]),
1736                    or_replace: true,
1737                    if_not_exists: true,
1738                    expire_after: Some(300),
1739                    eval_interval: Some(10),
1740                    comment: Some("test comment".to_string()),
1741                },
1742            ),
1743            (
1744                r"
1745CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1746SINK TO schema_1.table_1
1747EXPIRE AFTER '5 minutes'
1748EVAL INTERVAL INTERVAL '10 seconds'
1749COMMENT 'test comment'
1750AS
1751SELECT max(c1), min(c2) FROM schema_2.table_2;",
1752                CreateFlowWoutQuery {
1753                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1754                    sink_table_name: ObjectName(vec![
1755                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1756                        ObjectNamePart::Identifier(Ident::new("table_1")),
1757                    ]),
1758                    or_replace: true,
1759                    if_not_exists: true,
1760                    expire_after: Some(300),
1761                    eval_interval: Some(10),
1762                    comment: Some("test comment".to_string()),
1763                },
1764            ),
1765            (
1766                r"
1767CREATE FLOW `task_2`
1768SINK TO schema_1.table_1
1769EXPIRE AFTER '2 days 1h 2 min'
1770AS
1771SELECT max(c1), min(c2) FROM schema_2.table_2;",
1772                CreateFlowWoutQuery {
1773                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1774                        '`', "task_2",
1775                    ))]),
1776                    sink_table_name: ObjectName(vec![
1777                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1778                        ObjectNamePart::Identifier(Ident::new("table_1")),
1779                    ]),
1780                    or_replace: false,
1781                    if_not_exists: false,
1782                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1783                    eval_interval: None,
1784                    comment: None,
1785                },
1786            ),
1787        ];
1788
1789        for (sql, expected) in testcases {
1790            let create_task = parse_create_flow(sql);
1791
1792            let expected = CreateFlow {
1793                flow_name: expected.flow_name,
1794                sink_table_name: expected.sink_table_name,
1795                or_replace: expected.or_replace,
1796                if_not_exists: expected.if_not_exists,
1797                expire_after: expected.expire_after,
1798                eval_interval: expected.eval_interval,
1799                comment: expected.comment,
1800                // ignore query parse result
1801                query: create_task.query.clone(),
1802            };
1803
1804            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1805            let show_create = create_task.to_string();
1806            let recreated = parse_create_flow(&show_create);
1807            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1808        }
1809    }
1810
1811    #[test]
1812    fn test_create_flow_no_month() {
1813        let sql = r"
1814CREATE FLOW `task_2`
1815SINK TO schema_1.table_1
1816EXPIRE AFTER '1 month 2 days 1h 2 min'
1817AS
1818SELECT max(c1), min(c2) FROM schema_2.table_2;";
1819        let stmts =
1820            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1821
1822        assert!(
1823            stmts.is_err()
1824                && stmts
1825                    .unwrap_err()
1826                    .to_string()
1827                    .contains("Interval with months is not allowed")
1828        );
1829    }
1830
1831    #[test]
1832    fn test_validate_create() {
1833        let sql = r"
1834CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
1835PARTITION ON COLUMNS(c, a) (
1836    a < 10,
1837    a > 10 AND a < 20,
1838    a > 20 AND c < 100,
1839    a > 20 AND c >= 100
1840)
1841ENGINE=mito";
1842        let result =
1843            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1844        let _ = result.unwrap();
1845
1846        let sql = r"
1847CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1848PARTITION ON COLUMNS(x) ()
1849ENGINE=mito";
1850        let result =
1851            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1852        assert!(
1853            result
1854                .unwrap_err()
1855                .to_string()
1856                .contains("Partition column \"x\" not defined")
1857        );
1858    }
1859
1860    #[test]
1861    fn test_parse_create_table_with_partitions() {
1862        let sql = r"
1863CREATE TABLE monitor (
1864  host_id    INT,
1865  idc        STRING,
1866  ts         TIMESTAMP,
1867  cpu        DOUBLE DEFAULT 0,
1868  memory     DOUBLE,
1869  TIME INDEX (ts),
1870  PRIMARY KEY (host),
1871)
1872PARTITION ON COLUMNS(idc, host_id) (
1873  idc <= 'hz' AND host_id < 1000,
1874  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1875  idc > 'sh' AND host_id < 3000,
1876  idc > 'sh' AND host_id >= 3000,
1877)
1878ENGINE=mito";
1879        let result =
1880            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1881                .unwrap();
1882        assert_eq!(result.len(), 1);
1883        match &result[0] {
1884            Statement::CreateTable(c) => {
1885                assert!(c.partitions.is_some());
1886
1887                let partitions = c.partitions.as_ref().unwrap();
1888                let column_list = partitions
1889                    .column_list
1890                    .iter()
1891                    .map(|x| &x.value)
1892                    .collect::<Vec<&String>>();
1893                assert_eq!(column_list, vec!["idc", "host_id"]);
1894
1895                let exprs = &partitions.exprs;
1896
1897                assert_eq!(
1898                    exprs[0],
1899                    Expr::BinaryOp {
1900                        left: Box::new(Expr::BinaryOp {
1901                            left: Box::new(Expr::Identifier("idc".into())),
1902                            op: BinaryOperator::LtEq,
1903                            right: Box::new(Expr::Value(
1904                                Value::SingleQuotedString("hz".to_string()).into()
1905                            ))
1906                        }),
1907                        op: BinaryOperator::And,
1908                        right: Box::new(Expr::BinaryOp {
1909                            left: Box::new(Expr::Identifier("host_id".into())),
1910                            op: BinaryOperator::Lt,
1911                            right: Box::new(Expr::Value(
1912                                Value::Number("1000".to_string(), false).into()
1913                            ))
1914                        })
1915                    }
1916                );
1917                assert_eq!(
1918                    exprs[1],
1919                    Expr::BinaryOp {
1920                        left: Box::new(Expr::BinaryOp {
1921                            left: Box::new(Expr::BinaryOp {
1922                                left: Box::new(Expr::Identifier("idc".into())),
1923                                op: BinaryOperator::Gt,
1924                                right: Box::new(Expr::Value(
1925                                    Value::SingleQuotedString("hz".to_string()).into()
1926                                ))
1927                            }),
1928                            op: BinaryOperator::And,
1929                            right: Box::new(Expr::BinaryOp {
1930                                left: Box::new(Expr::Identifier("idc".into())),
1931                                op: BinaryOperator::LtEq,
1932                                right: Box::new(Expr::Value(
1933                                    Value::SingleQuotedString("sh".to_string()).into()
1934                                ))
1935                            })
1936                        }),
1937                        op: BinaryOperator::And,
1938                        right: Box::new(Expr::BinaryOp {
1939                            left: Box::new(Expr::Identifier("host_id".into())),
1940                            op: BinaryOperator::Lt,
1941                            right: Box::new(Expr::Value(
1942                                Value::Number("2000".to_string(), false).into()
1943                            ))
1944                        })
1945                    }
1946                );
1947                assert_eq!(
1948                    exprs[2],
1949                    Expr::BinaryOp {
1950                        left: Box::new(Expr::BinaryOp {
1951                            left: Box::new(Expr::Identifier("idc".into())),
1952                            op: BinaryOperator::Gt,
1953                            right: Box::new(Expr::Value(
1954                                Value::SingleQuotedString("sh".to_string()).into()
1955                            ))
1956                        }),
1957                        op: BinaryOperator::And,
1958                        right: Box::new(Expr::BinaryOp {
1959                            left: Box::new(Expr::Identifier("host_id".into())),
1960                            op: BinaryOperator::Lt,
1961                            right: Box::new(Expr::Value(
1962                                Value::Number("3000".to_string(), false).into()
1963                            ))
1964                        })
1965                    }
1966                );
1967                assert_eq!(
1968                    exprs[3],
1969                    Expr::BinaryOp {
1970                        left: Box::new(Expr::BinaryOp {
1971                            left: Box::new(Expr::Identifier("idc".into())),
1972                            op: BinaryOperator::Gt,
1973                            right: Box::new(Expr::Value(
1974                                Value::SingleQuotedString("sh".to_string()).into()
1975                            ))
1976                        }),
1977                        op: BinaryOperator::And,
1978                        right: Box::new(Expr::BinaryOp {
1979                            left: Box::new(Expr::Identifier("host_id".into())),
1980                            op: BinaryOperator::GtEq,
1981                            right: Box::new(Expr::Value(
1982                                Value::Number("3000".to_string(), false).into()
1983                            ))
1984                        })
1985                    }
1986                );
1987            }
1988            _ => unreachable!(),
1989        }
1990    }
1991
1992    #[test]
1993    fn test_parse_create_table_with_quoted_partitions() {
1994        let sql = r"
1995CREATE TABLE monitor (
1996  `host_id`    INT,
1997  idc        STRING,
1998  ts         TIMESTAMP,
1999  cpu        DOUBLE DEFAULT 0,
2000  memory     DOUBLE,
2001  TIME INDEX (ts),
2002  PRIMARY KEY (host),
2003)
2004PARTITION ON COLUMNS(IdC, host_id) (
2005  idc <= 'hz' AND host_id < 1000,
2006  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2007  idc > 'sh' AND host_id < 3000,
2008  idc > 'sh' AND host_id >= 3000,
2009)";
2010        let result =
2011            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2012                .unwrap();
2013        assert_eq!(result.len(), 1);
2014    }
2015
2016    #[test]
2017    fn test_parse_create_table_with_timestamp_index() {
2018        let sql1 = r"
2019CREATE TABLE monitor (
2020  host_id    INT,
2021  idc        STRING,
2022  ts         TIMESTAMP TIME INDEX,
2023  cpu        DOUBLE DEFAULT 0,
2024  memory     DOUBLE,
2025  PRIMARY KEY (host),
2026)
2027ENGINE=mito";
2028        let result1 = ParserContext::create_with_dialect(
2029            sql1,
2030            &GreptimeDbDialect {},
2031            ParseOptions::default(),
2032        )
2033        .unwrap();
2034
2035        if let Statement::CreateTable(c) = &result1[0] {
2036            assert_eq!(c.constraints.len(), 2);
2037            let tc = c.constraints[0].clone();
2038            match tc {
2039                TableConstraint::TimeIndex { column } => {
2040                    assert_eq!(&column.value, "ts");
2041                }
2042                _ => panic!("should be time index constraint"),
2043            };
2044        } else {
2045            panic!("should be create_table statement");
2046        }
2047
2048        // `TIME INDEX` should be in front of `PRIMARY KEY`
2049        // in order to equal the `TIMESTAMP TIME INDEX` constraint options vector
2050        let sql2 = r"
2051CREATE TABLE monitor (
2052  host_id    INT,
2053  idc        STRING,
2054  ts         TIMESTAMP NOT NULL,
2055  cpu        DOUBLE DEFAULT 0,
2056  memory     DOUBLE,
2057  TIME INDEX (ts),
2058  PRIMARY KEY (host),
2059)
2060ENGINE=mito";
2061        let result2 = ParserContext::create_with_dialect(
2062            sql2,
2063            &GreptimeDbDialect {},
2064            ParseOptions::default(),
2065        )
2066        .unwrap();
2067
2068        assert_eq!(result1, result2);
2069
2070        // TIMESTAMP can be NULL which is not equal to above
2071        let sql3 = r"
2072CREATE TABLE monitor (
2073  host_id    INT,
2074  idc        STRING,
2075  ts         TIMESTAMP,
2076  cpu        DOUBLE DEFAULT 0,
2077  memory     DOUBLE,
2078  TIME INDEX (ts),
2079  PRIMARY KEY (host),
2080)
2081ENGINE=mito";
2082
2083        let result3 = ParserContext::create_with_dialect(
2084            sql3,
2085            &GreptimeDbDialect {},
2086            ParseOptions::default(),
2087        )
2088        .unwrap();
2089
2090        assert_ne!(result1, result3);
2091
2092        // BIGINT can't be time index any more
2093        let sql1 = r"
2094CREATE TABLE monitor (
2095  host_id    INT,
2096  idc        STRING,
2097  b          bigint TIME INDEX,
2098  cpu        DOUBLE DEFAULT 0,
2099  memory     DOUBLE,
2100  PRIMARY KEY (host),
2101)
2102ENGINE=mito";
2103        let result1 = ParserContext::create_with_dialect(
2104            sql1,
2105            &GreptimeDbDialect {},
2106            ParseOptions::default(),
2107        );
2108
2109        assert!(
2110            result1
2111                .unwrap_err()
2112                .to_string()
2113                .contains("time index column data type should be timestamp")
2114        );
2115    }
2116
2117    #[test]
2118    fn test_parse_create_table_with_timestamp_index_not_null() {
2119        let sql = r"
2120CREATE TABLE monitor (
2121  host_id    INT,
2122  idc        STRING,
2123  ts         TIMESTAMP TIME INDEX,
2124  cpu        DOUBLE DEFAULT 0,
2125  memory     DOUBLE,
2126  TIME INDEX (ts),
2127  PRIMARY KEY (host),
2128)
2129ENGINE=mito";
2130        let result =
2131            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2132                .unwrap();
2133
2134        assert_eq!(result.len(), 1);
2135        if let Statement::CreateTable(c) = &result[0] {
2136            let ts = c.columns[2].clone();
2137            assert_eq!(ts.name().to_string(), "ts");
2138            assert_eq!(ts.options()[0].option, NotNull);
2139        } else {
2140            panic!("should be create table statement");
2141        }
2142
2143        let sql1 = r"
2144CREATE TABLE monitor (
2145  host_id    INT,
2146  idc        STRING,
2147  ts         TIMESTAMP NOT NULL TIME INDEX,
2148  cpu        DOUBLE DEFAULT 0,
2149  memory     DOUBLE,
2150  TIME INDEX (ts),
2151  PRIMARY KEY (host),
2152)
2153ENGINE=mito";
2154
2155        let result1 = ParserContext::create_with_dialect(
2156            sql1,
2157            &GreptimeDbDialect {},
2158            ParseOptions::default(),
2159        )
2160        .unwrap();
2161        assert_eq!(result, result1);
2162
2163        let sql2 = r"
2164CREATE TABLE monitor (
2165  host_id    INT,
2166  idc        STRING,
2167  ts         TIMESTAMP TIME INDEX NOT NULL,
2168  cpu        DOUBLE DEFAULT 0,
2169  memory     DOUBLE,
2170  TIME INDEX (ts),
2171  PRIMARY KEY (host),
2172)
2173ENGINE=mito";
2174
2175        let result2 = ParserContext::create_with_dialect(
2176            sql2,
2177            &GreptimeDbDialect {},
2178            ParseOptions::default(),
2179        )
2180        .unwrap();
2181        assert_eq!(result, result2);
2182
2183        let sql3 = r"
2184CREATE TABLE monitor (
2185  host_id    INT,
2186  idc        STRING,
2187  ts         TIMESTAMP TIME INDEX NULL NOT,
2188  cpu        DOUBLE DEFAULT 0,
2189  memory     DOUBLE,
2190  TIME INDEX (ts),
2191  PRIMARY KEY (host),
2192)
2193ENGINE=mito";
2194
2195        let result3 = ParserContext::create_with_dialect(
2196            sql3,
2197            &GreptimeDbDialect {},
2198            ParseOptions::default(),
2199        );
2200        assert!(result3.is_err());
2201
2202        let sql4 = r"
2203CREATE TABLE monitor (
2204  host_id    INT,
2205  idc        STRING,
2206  ts         TIMESTAMP TIME INDEX NOT NULL NULL,
2207  cpu        DOUBLE DEFAULT 0,
2208  memory     DOUBLE,
2209  TIME INDEX (ts),
2210  PRIMARY KEY (host),
2211)
2212ENGINE=mito";
2213
2214        let result4 = ParserContext::create_with_dialect(
2215            sql4,
2216            &GreptimeDbDialect {},
2217            ParseOptions::default(),
2218        );
2219        assert!(result4.is_err());
2220
2221        let sql = r"
2222CREATE TABLE monitor (
2223  host_id    INT,
2224  idc        STRING,
2225  ts         TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2226  cpu        DOUBLE DEFAULT 0,
2227  memory     DOUBLE,
2228  TIME INDEX (ts),
2229  PRIMARY KEY (host),
2230)
2231ENGINE=mito";
2232
2233        let result =
2234            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2235                .unwrap();
2236
2237        if let Statement::CreateTable(c) = &result[0] {
2238            let tc = c.constraints[0].clone();
2239            match tc {
2240                TableConstraint::TimeIndex { column } => {
2241                    assert_eq!(&column.value, "ts");
2242                }
2243                _ => panic!("should be time index constraint"),
2244            }
2245            let ts = c.columns[2].clone();
2246            assert_eq!(ts.name().to_string(), "ts");
2247            assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2248            assert_eq!(ts.options()[1].option, NotNull);
2249        } else {
2250            unreachable!("should be create table statement");
2251        }
2252    }
2253
2254    #[test]
2255    fn test_parse_partitions_with_error_syntax() {
2256        let sql = r"
2257CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2258PARTITION COLUMNS(c, a) (
2259    a < 10,
2260    a > 10 AND a < 20,
2261    a > 20 AND c < 100,
2262    a > 20 AND c >= 100
2263)
2264ENGINE=mito";
2265        let result =
2266            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2267        assert!(
2268            result
2269                .unwrap_err()
2270                .output_msg()
2271                .contains("sql parser error: Expected: ON, found: COLUMNS")
2272        );
2273    }
2274
2275    #[test]
2276    fn test_parse_partitions_without_rule() {
2277        let sql = r"
2278CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2279PARTITION ON COLUMNS(c, a) ()
2280ENGINE=mito";
2281        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2282            .unwrap();
2283    }
2284
2285    #[test]
2286    fn test_parse_partitions_unreferenced_column() {
2287        let sql = r"
2288CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2289PARTITION ON COLUMNS(c, a) (
2290    b = 'foo'
2291)
2292ENGINE=mito";
2293        let result =
2294            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2295        assert_eq!(
2296            result.unwrap_err().output_msg(),
2297            "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2298        );
2299    }
2300
2301    #[test]
2302    fn test_parse_partitions_not_binary_expr() {
2303        let sql = r"
2304CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2305PARTITION ON COLUMNS(c, a) (
2306    b
2307)
2308ENGINE=mito";
2309        let result =
2310            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2311        assert_eq!(
2312            result.unwrap_err().output_msg(),
2313            r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2314        );
2315    }
2316
2317    fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2318        assert_eq!(column.name.to_string(), name);
2319        assert_eq!(column.data_type.to_string(), data_type);
2320    }
2321
2322    #[test]
2323    pub fn test_parse_create_table() {
2324        let sql = r"create table demo(
2325                             host string,
2326                             ts timestamp,
2327                             cpu float32 default 0,
2328                             memory float64,
2329                             TIME INDEX (ts),
2330                             PRIMARY KEY(ts, host),
2331                             ) engine=mito
2332                             with(ttl='10s');
2333         ";
2334        let result =
2335            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2336                .unwrap();
2337        assert_eq!(1, result.len());
2338        match &result[0] {
2339            Statement::CreateTable(c) => {
2340                assert!(!c.if_not_exists);
2341                assert_eq!("demo", c.name.to_string());
2342                assert_eq!("mito", c.engine);
2343                assert_eq!(4, c.columns.len());
2344                let columns = &c.columns;
2345                assert_column_def(&columns[0].column_def, "host", "STRING");
2346                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2347                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2348                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2349
2350                let constraints = &c.constraints;
2351                assert_eq!(
2352                    &constraints[0],
2353                    &TableConstraint::TimeIndex {
2354                        column: Ident::new("ts"),
2355                    }
2356                );
2357                assert_eq!(
2358                    &constraints[1],
2359                    &TableConstraint::PrimaryKey {
2360                        columns: vec![Ident::new("ts"), Ident::new("host")]
2361                    }
2362                );
2363                // inverted index is merged into column options
2364                assert_eq!(1, c.options.len());
2365                assert_eq!(
2366                    [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2367                    c.options.to_str_map()
2368                );
2369            }
2370            _ => unreachable!(),
2371        }
2372    }
2373
2374    #[test]
2375    fn test_invalid_index_keys() {
2376        let sql = r"create table demo(
2377                             host string,
2378                             ts int64,
2379                             cpu float64 default 0,
2380                             memory float64,
2381                             TIME INDEX (ts, host),
2382                             PRIMARY KEY(ts, host)) engine=mito;
2383         ";
2384        let result =
2385            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2386        assert!(result.is_err());
2387        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2388    }
2389
2390    #[test]
2391    fn test_duplicated_time_index() {
2392        let sql = r"create table demo(
2393                             host string,
2394                             ts timestamp time index,
2395                             t timestamp time index,
2396                             cpu float64 default 0,
2397                             memory float64,
2398                             TIME INDEX (ts, host),
2399                             PRIMARY KEY(ts, host)) engine=mito;
2400         ";
2401        let result =
2402            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2403        assert!(result.is_err());
2404        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2405
2406        let sql = r"create table demo(
2407                             host string,
2408                             ts timestamp time index,
2409                             cpu float64 default 0,
2410                             t timestamp,
2411                             memory float64,
2412                             TIME INDEX (t),
2413                             PRIMARY KEY(ts, host)) engine=mito;
2414         ";
2415        let result =
2416            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2417        assert!(result.is_err());
2418        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2419    }
2420
2421    #[test]
2422    fn test_invalid_column_name() {
2423        let sql = "create table foo(user string, i timestamp time index)";
2424        let result =
2425            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2426        let err = result.unwrap_err().output_msg();
2427        assert!(err.contains("Cannot use keyword 'user' as column name"));
2428
2429        // If column name is quoted, it's valid even same with keyword.
2430        let sql = r#"
2431            create table foo("user" string, i timestamp time index)
2432        "#;
2433        let result =
2434            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2435        let _ = result.unwrap();
2436    }
2437
2438    #[test]
2439    fn test_incorrect_default_value_issue_3479() {
2440        let sql = r#"CREATE TABLE `ExcePTuRi`(
2441non TIMESTAMP(6) TIME INDEX,
2442`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2443)"#;
2444        let result =
2445            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2446                .unwrap();
2447        assert_eq!(1, result.len());
2448        match &result[0] {
2449            Statement::CreateTable(c) => {
2450                assert_eq!(
2451                    "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2452                    c.columns[1].to_string()
2453                );
2454            }
2455            _ => unreachable!(),
2456        }
2457    }
2458
2459    #[test]
2460    fn test_parse_create_view() {
2461        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
2462
2463        let result =
2464            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2465                .unwrap();
2466        match &result[0] {
2467            Statement::CreateView(c) => {
2468                assert_eq!(c.to_string(), sql);
2469                assert!(!c.or_replace);
2470                assert!(!c.if_not_exists);
2471                assert_eq!("test", c.name.to_string());
2472            }
2473            _ => unreachable!(),
2474        }
2475
2476        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
2477
2478        let result =
2479            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2480                .unwrap();
2481        match &result[0] {
2482            Statement::CreateView(c) => {
2483                assert_eq!(c.to_string(), sql);
2484                assert!(c.or_replace);
2485                assert!(c.if_not_exists);
2486                assert_eq!("test", c.name.to_string());
2487            }
2488            _ => unreachable!(),
2489        }
2490    }
2491
2492    #[test]
2493    fn test_parse_create_view_invalid_query() {
2494        let sql = "CREATE VIEW test AS DELETE from demo";
2495        let result =
2496            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2497        assert!(result.is_ok_and(|x| x.len() == 1));
2498    }
2499
2500    #[test]
2501    fn test_parse_create_table_fulltext_options() {
2502        let sql1 = r"
2503CREATE TABLE log (
2504    ts TIMESTAMP TIME INDEX,
2505    msg TEXT FULLTEXT INDEX,
2506)";
2507        let result1 = ParserContext::create_with_dialect(
2508            sql1,
2509            &GreptimeDbDialect {},
2510            ParseOptions::default(),
2511        )
2512        .unwrap();
2513
2514        if let Statement::CreateTable(c) = &result1[0] {
2515            c.columns.iter().for_each(|col| {
2516                if col.name().value == "msg" {
2517                    assert!(
2518                        col.extensions
2519                            .fulltext_index_options
2520                            .as_ref()
2521                            .unwrap()
2522                            .is_empty()
2523                    );
2524                }
2525            });
2526        } else {
2527            panic!("should be create_table statement");
2528        }
2529
2530        let sql2 = r"
2531CREATE TABLE log (
2532    ts TIMESTAMP TIME INDEX,
2533    msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2534)";
2535        let result2 = ParserContext::create_with_dialect(
2536            sql2,
2537            &GreptimeDbDialect {},
2538            ParseOptions::default(),
2539        )
2540        .unwrap();
2541
2542        if let Statement::CreateTable(c) = &result2[0] {
2543            c.columns.iter().for_each(|col| {
2544                if col.name().value == "msg" {
2545                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2546                    assert_eq!(options.len(), 2);
2547                    assert_eq!(options.get("analyzer").unwrap(), "English");
2548                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2549                }
2550            });
2551        } else {
2552            panic!("should be create_table statement");
2553        }
2554
2555        let sql3 = r"
2556CREATE TABLE log (
2557    ts TIMESTAMP TIME INDEX,
2558    msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2559    msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2560)";
2561        let result3 = ParserContext::create_with_dialect(
2562            sql3,
2563            &GreptimeDbDialect {},
2564            ParseOptions::default(),
2565        )
2566        .unwrap();
2567
2568        if let Statement::CreateTable(c) = &result3[0] {
2569            c.columns.iter().for_each(|col| {
2570                if col.name().value == "msg1" {
2571                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2572                    assert_eq!(options.len(), 2);
2573                    assert_eq!(options.get("analyzer").unwrap(), "English");
2574                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2575                } else if col.name().value == "msg2" {
2576                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2577                    assert_eq!(options.len(), 2);
2578                    assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2579                    assert_eq!(options.get("case_sensitive").unwrap(), "true");
2580                }
2581            });
2582        } else {
2583            panic!("should be create_table statement");
2584        }
2585    }
2586
2587    #[test]
2588    fn test_parse_create_table_fulltext_options_invalid_type() {
2589        let sql = r"
2590CREATE TABLE log (
2591    ts TIMESTAMP TIME INDEX,
2592    msg INT FULLTEXT INDEX,
2593)";
2594        let result =
2595            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2596        assert!(result.is_err());
2597        assert!(
2598            result
2599                .unwrap_err()
2600                .to_string()
2601                .contains("FULLTEXT index only supports string type")
2602        );
2603    }
2604
2605    #[test]
2606    fn test_parse_create_table_fulltext_options_duplicate() {
2607        let sql = r"
2608CREATE TABLE log (
2609    ts TIMESTAMP TIME INDEX,
2610    msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
2611)";
2612        let result =
2613            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2614        assert!(result.is_err());
2615        assert!(
2616            result
2617                .unwrap_err()
2618                .to_string()
2619                .contains("duplicated FULLTEXT INDEX option")
2620        );
2621    }
2622
2623    #[test]
2624    fn test_parse_create_table_fulltext_options_invalid_option() {
2625        let sql = r"
2626CREATE TABLE log (
2627    ts TIMESTAMP TIME INDEX,
2628    msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
2629)";
2630        let result =
2631            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2632        assert!(result.is_err());
2633        assert!(
2634            result
2635                .unwrap_err()
2636                .to_string()
2637                .contains("invalid FULLTEXT INDEX option")
2638        );
2639    }
2640
2641    #[test]
2642    fn test_parse_create_table_skip_options() {
2643        let sql = r"
2644CREATE TABLE log (
2645    ts TIMESTAMP TIME INDEX,
2646    msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
2647)";
2648        let result =
2649            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2650                .unwrap();
2651
2652        if let Statement::CreateTable(c) = &result[0] {
2653            c.columns.iter().for_each(|col| {
2654                if col.name().value == "msg" {
2655                    assert!(
2656                        !col.extensions
2657                            .skipping_index_options
2658                            .as_ref()
2659                            .unwrap()
2660                            .is_empty()
2661                    );
2662                }
2663            });
2664        } else {
2665            panic!("should be create_table statement");
2666        }
2667
2668        let sql = r"
2669        CREATE TABLE log (
2670            ts TIMESTAMP TIME INDEX,
2671            msg INT SKIPPING INDEX,
2672        )";
2673        let result =
2674            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2675                .unwrap();
2676
2677        if let Statement::CreateTable(c) = &result[0] {
2678            c.columns.iter().for_each(|col| {
2679                if col.name().value == "msg" {
2680                    assert!(
2681                        col.extensions
2682                            .skipping_index_options
2683                            .as_ref()
2684                            .unwrap()
2685                            .is_empty()
2686                    );
2687                }
2688            });
2689        } else {
2690            panic!("should be create_table statement");
2691        }
2692    }
2693
2694    #[test]
2695    fn test_parse_create_view_with_columns() {
2696        let sql = "CREATE VIEW test () AS SELECT * FROM NUMBERS";
2697        let result =
2698            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2699                .unwrap();
2700
2701        match &result[0] {
2702            Statement::CreateView(c) => {
2703                assert_eq!(c.to_string(), "CREATE VIEW test AS SELECT * FROM NUMBERS");
2704                assert!(!c.or_replace);
2705                assert!(!c.if_not_exists);
2706                assert_eq!("test", c.name.to_string());
2707            }
2708            _ => unreachable!(),
2709        }
2710        assert_eq!(
2711            "CREATE VIEW test AS SELECT * FROM NUMBERS",
2712            result[0].to_string()
2713        );
2714
2715        let sql = "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS";
2716        let result =
2717            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2718                .unwrap();
2719
2720        match &result[0] {
2721            Statement::CreateView(c) => {
2722                assert_eq!(c.to_string(), sql);
2723                assert!(!c.or_replace);
2724                assert!(!c.if_not_exists);
2725                assert_eq!("test", c.name.to_string());
2726            }
2727            _ => unreachable!(),
2728        }
2729        assert_eq!(sql, result[0].to_string());
2730
2731        let sql = "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS";
2732        let result =
2733            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2734                .unwrap();
2735
2736        match &result[0] {
2737            Statement::CreateView(c) => {
2738                assert_eq!(c.to_string(), sql);
2739                assert!(!c.or_replace);
2740                assert!(!c.if_not_exists);
2741                assert_eq!("test", c.name.to_string());
2742            }
2743            _ => unreachable!(),
2744        }
2745        assert_eq!(sql, result[0].to_string());
2746
2747        // Some invalid syntax cases
2748        let sql = "CREATE VIEW test (n1 AS select * from demo";
2749        let result =
2750            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2751        assert!(result.is_err());
2752
2753        let sql = "CREATE VIEW test (n1, AS select * from demo";
2754        let result =
2755            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2756        assert!(result.is_err());
2757
2758        let sql = "CREATE VIEW test n1,n2) AS select * from demo";
2759        let result =
2760            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2761        assert!(result.is_err());
2762
2763        let sql = "CREATE VIEW test (1) AS select * from demo";
2764        let result =
2765            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2766        assert!(result.is_err());
2767
2768        // keyword
2769        let sql = "CREATE VIEW test (n1, select) AS select * from demo";
2770        let result =
2771            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2772        assert!(result.is_err());
2773    }
2774
2775    #[test]
2776    fn test_parse_column_extensions_vector() {
2777        // Test that vector options are parsed from data_type (no additional SQL needed)
2778        let sql = "";
2779        let dialect = GenericDialect {};
2780        let mut tokenizer = Tokenizer::new(&dialect, sql);
2781        let tokens = tokenizer.tokenize().unwrap();
2782        let mut parser = Parser::new(&dialect).with_tokens(tokens);
2783        let name = Ident::new("vec_col");
2784        let data_type =
2785            DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
2786        let mut extensions = ColumnExtensions::default();
2787
2788        let result =
2789            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2790        assert!(result.is_ok());
2791        assert!(extensions.vector_options.is_some());
2792        let vector_options = extensions.vector_options.unwrap();
2793        assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some("128"));
2794    }
2795
2796    #[test]
2797    fn test_parse_column_extensions_vector_invalid() {
2798        // Test that vector with no dimension fails
2799        let sql = "";
2800        let dialect = GenericDialect {};
2801        let mut tokenizer = Tokenizer::new(&dialect, sql);
2802        let tokens = tokenizer.tokenize().unwrap();
2803        let mut parser = Parser::new(&dialect).with_tokens(tokens);
2804        let name = Ident::new("vec_col");
2805        let data_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
2806        let mut extensions = ColumnExtensions::default();
2807
2808        let result =
2809            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2810        assert!(result.is_err());
2811    }
2812
2813    #[test]
2814    fn test_parse_column_extensions_indices() {
2815        // Test skipping index
2816        {
2817            let sql = "SKIPPING INDEX";
2818            let dialect = GenericDialect {};
2819            let mut tokenizer = Tokenizer::new(&dialect, sql);
2820            let tokens = tokenizer.tokenize().unwrap();
2821            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2822            let name = Ident::new("col");
2823            let data_type = DataType::String(None);
2824            let mut extensions = ColumnExtensions::default();
2825            let result = ParserContext::parse_column_extensions(
2826                &mut parser,
2827                &name,
2828                &data_type,
2829                &mut extensions,
2830            );
2831            assert!(result.is_ok());
2832            assert!(extensions.skipping_index_options.is_some());
2833        }
2834
2835        // Test fulltext index with options
2836        {
2837            let sql = "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')";
2838            let dialect = GenericDialect {};
2839            let mut tokenizer = Tokenizer::new(&dialect, sql);
2840            let tokens = tokenizer.tokenize().unwrap();
2841            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2842            let name = Ident::new("text_col");
2843            let data_type = DataType::String(None);
2844            let mut extensions = ColumnExtensions::default();
2845            let result = ParserContext::parse_column_extensions(
2846                &mut parser,
2847                &name,
2848                &data_type,
2849                &mut extensions,
2850            );
2851            assert!(result.unwrap());
2852            assert!(extensions.fulltext_index_options.is_some());
2853            let fulltext_options = extensions.fulltext_index_options.unwrap();
2854            assert_eq!(fulltext_options.get("analyzer"), Some("English"));
2855            assert_eq!(fulltext_options.get("case_sensitive"), Some("true"));
2856        }
2857
2858        // Test fulltext index with invalid type (should fail)
2859        {
2860            let sql = "FULLTEXT INDEX WITH (analyzer = 'English')";
2861            let dialect = GenericDialect {};
2862            let mut tokenizer = Tokenizer::new(&dialect, sql);
2863            let tokens = tokenizer.tokenize().unwrap();
2864            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2865            let name = Ident::new("num_col");
2866            let data_type = DataType::Int(None); // Non-string type
2867            let mut extensions = ColumnExtensions::default();
2868            let result = ParserContext::parse_column_extensions(
2869                &mut parser,
2870                &name,
2871                &data_type,
2872                &mut extensions,
2873            );
2874            assert!(result.is_err());
2875            assert!(
2876                result
2877                    .unwrap_err()
2878                    .to_string()
2879                    .contains("FULLTEXT index only supports string type")
2880            );
2881        }
2882
2883        // Test fulltext index with invalid option (won't fail, the parser doesn't check the option's content)
2884        {
2885            let sql = "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')";
2886            let dialect = GenericDialect {};
2887            let mut tokenizer = Tokenizer::new(&dialect, sql);
2888            let tokens = tokenizer.tokenize().unwrap();
2889            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2890            let name = Ident::new("text_col");
2891            let data_type = DataType::String(None);
2892            let mut extensions = ColumnExtensions::default();
2893            let result = ParserContext::parse_column_extensions(
2894                &mut parser,
2895                &name,
2896                &data_type,
2897                &mut extensions,
2898            );
2899            assert!(result.unwrap());
2900        }
2901
2902        // Test inverted index
2903        {
2904            let sql = "INVERTED INDEX";
2905            let dialect = GenericDialect {};
2906            let mut tokenizer = Tokenizer::new(&dialect, sql);
2907            let tokens = tokenizer.tokenize().unwrap();
2908            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2909            let name = Ident::new("col");
2910            let data_type = DataType::String(None);
2911            let mut extensions = ColumnExtensions::default();
2912            let result = ParserContext::parse_column_extensions(
2913                &mut parser,
2914                &name,
2915                &data_type,
2916                &mut extensions,
2917            );
2918            assert!(result.is_ok());
2919            assert!(extensions.inverted_index_options.is_some());
2920        }
2921
2922        // Test inverted index with options (should fail)
2923        {
2924            let sql = "INVERTED INDEX WITH (analyzer = 'English')";
2925            let dialect = GenericDialect {};
2926            let mut tokenizer = Tokenizer::new(&dialect, sql);
2927            let tokens = tokenizer.tokenize().unwrap();
2928            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2929            let name = Ident::new("col");
2930            let data_type = DataType::String(None);
2931            let mut extensions = ColumnExtensions::default();
2932            let result = ParserContext::parse_column_extensions(
2933                &mut parser,
2934                &name,
2935                &data_type,
2936                &mut extensions,
2937            );
2938            assert!(result.is_err());
2939            assert!(
2940                result
2941                    .unwrap_err()
2942                    .to_string()
2943                    .contains("INVERTED index doesn't support options")
2944            );
2945        }
2946
2947        // Test multiple indices
2948        {
2949            let sql = "SKIPPING INDEX FULLTEXT INDEX";
2950            let dialect = GenericDialect {};
2951            let mut tokenizer = Tokenizer::new(&dialect, sql);
2952            let tokens = tokenizer.tokenize().unwrap();
2953            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2954            let name = Ident::new("col");
2955            let data_type = DataType::String(None);
2956            let mut extensions = ColumnExtensions::default();
2957            let result = ParserContext::parse_column_extensions(
2958                &mut parser,
2959                &name,
2960                &data_type,
2961                &mut extensions,
2962            );
2963            assert!(result.unwrap());
2964            assert!(extensions.skipping_index_options.is_some());
2965            assert!(extensions.fulltext_index_options.is_some());
2966        }
2967    }
2968
2969    #[test]
2970    fn test_parse_interval_cast() {
2971        let s = "select '10s'::INTERVAL";
2972        let stmts =
2973            ParserContext::create_with_dialect(s, &GreptimeDbDialect {}, ParseOptions::default())
2974                .unwrap();
2975        assert_eq!("SELECT '10 seconds'::INTERVAL", &stmts[0].to_string());
2976    }
2977
2978    #[test]
2979    fn test_parse_create_table_vector_index_options() {
2980        // Test basic vector index
2981        let sql = r"
2982CREATE TABLE vectors (
2983    ts TIMESTAMP TIME INDEX,
2984    vec VECTOR(128) VECTOR INDEX,
2985)";
2986        let result =
2987            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2988                .unwrap();
2989
2990        if let Statement::CreateTable(c) = &result[0] {
2991            c.columns.iter().for_each(|col| {
2992                if col.name().value == "vec" {
2993                    assert!(
2994                        col.extensions
2995                            .vector_index_options
2996                            .as_ref()
2997                            .unwrap()
2998                            .is_empty()
2999                    );
3000                }
3001            });
3002        } else {
3003            panic!("should be create_table statement");
3004        }
3005
3006        // Test vector index with options
3007        let sql = r"
3008CREATE TABLE vectors (
3009    ts TIMESTAMP TIME INDEX,
3010    vec VECTOR(128) VECTOR INDEX WITH (metric='cosine', connectivity='32', expansion_add='256', expansion_search='128')
3011)";
3012        let result =
3013            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
3014                .unwrap();
3015
3016        if let Statement::CreateTable(c) = &result[0] {
3017            c.columns.iter().for_each(|col| {
3018                if col.name().value == "vec" {
3019                    let options = col.extensions.vector_index_options.as_ref().unwrap();
3020                    assert_eq!(options.len(), 4);
3021                    assert_eq!(options.get("metric").unwrap(), "cosine");
3022                    assert_eq!(options.get("connectivity").unwrap(), "32");
3023                    assert_eq!(options.get("expansion_add").unwrap(), "256");
3024                    assert_eq!(options.get("expansion_search").unwrap(), "128");
3025                }
3026            });
3027        } else {
3028            panic!("should be create_table statement");
3029        }
3030    }
3031
3032    #[test]
3033    fn test_parse_create_table_vector_index_invalid_type() {
3034        // Test vector index on non-vector type (should fail)
3035        let sql = r"
3036CREATE TABLE vectors (
3037    ts TIMESTAMP TIME INDEX,
3038    col INT VECTOR INDEX,
3039)";
3040        let result =
3041            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3042        assert!(result.is_err());
3043        assert!(
3044            result
3045                .unwrap_err()
3046                .to_string()
3047                .contains("VECTOR INDEX only supports Vector type columns")
3048        );
3049    }
3050
3051    #[test]
3052    fn test_parse_create_table_vector_index_duplicate() {
3053        // Test duplicate vector index (should fail)
3054        let sql = r"
3055CREATE TABLE vectors (
3056    ts TIMESTAMP TIME INDEX,
3057    vec VECTOR(128) VECTOR INDEX VECTOR INDEX,
3058)";
3059        let result =
3060            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3061        assert!(result.is_err());
3062        assert!(
3063            result
3064                .unwrap_err()
3065                .to_string()
3066                .contains("duplicated VECTOR INDEX option")
3067        );
3068    }
3069
3070    #[test]
3071    fn test_parse_create_table_vector_index_invalid_option() {
3072        // Test invalid option key (should fail)
3073        let sql = r"
3074CREATE TABLE vectors (
3075    ts TIMESTAMP TIME INDEX,
3076    vec VECTOR(128) VECTOR INDEX WITH (metric='l2sq', invalid_option='foo')
3077)";
3078        let result =
3079            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3080        assert!(result.is_err());
3081        assert!(
3082            result
3083                .unwrap_err()
3084                .to_string()
3085                .contains("invalid VECTOR INDEX option")
3086        );
3087    }
3088
3089    #[test]
3090    fn test_parse_column_extensions_vector_index() {
3091        // Test vector index on vector type
3092        {
3093            let sql = "VECTOR INDEX WITH (metric = 'l2sq')";
3094            let dialect = GenericDialect {};
3095            let mut tokenizer = Tokenizer::new(&dialect, sql);
3096            let tokens = tokenizer.tokenize().unwrap();
3097            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3098            let name = Ident::new("vec_col");
3099            let data_type =
3100                DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
3101            // First, parse the vector type to set vector_options
3102            let mut extensions = ColumnExtensions {
3103                vector_options: Some(OptionMap::from([(
3104                    VECTOR_OPT_DIM.to_string(),
3105                    "128".to_string(),
3106                )])),
3107                ..Default::default()
3108            };
3109
3110            let result = ParserContext::parse_column_extensions(
3111                &mut parser,
3112                &name,
3113                &data_type,
3114                &mut extensions,
3115            );
3116            assert!(result.is_ok());
3117            assert!(extensions.vector_index_options.is_some());
3118            let vi_options = extensions.vector_index_options.unwrap();
3119            assert_eq!(vi_options.get("metric"), Some("l2sq"));
3120        }
3121
3122        // Test vector index on non-vector type (should fail)
3123        {
3124            let sql = "VECTOR INDEX";
3125            let dialect = GenericDialect {};
3126            let mut tokenizer = Tokenizer::new(&dialect, sql);
3127            let tokens = tokenizer.tokenize().unwrap();
3128            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3129            let name = Ident::new("num_col");
3130            let data_type = DataType::Int(None); // Non-vector type
3131            let mut extensions = ColumnExtensions::default();
3132            let result = ParserContext::parse_column_extensions(
3133                &mut parser,
3134                &name,
3135                &data_type,
3136                &mut extensions,
3137            );
3138            assert!(result.is_err());
3139            assert!(
3140                result
3141                    .unwrap_err()
3142                    .to_string()
3143                    .contains("VECTOR INDEX only supports Vector type columns")
3144            );
3145        }
3146    }
3147}