sql/parsers/
create_parser.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod json;
16#[cfg(feature = "enterprise")]
17pub mod trigger;
18
19use std::collections::HashMap;
20
21use arrow_buffer::IntervalMonthDayNano;
22use common_catalog::consts::default_engine;
23use datafusion_common::ScalarValue;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
25use datatypes::data_type::ConcreteDataType;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Expr};
29use sqlparser::dialect::keywords::Keyword;
30use sqlparser::keywords::ALL_KEYWORDS;
31use sqlparser::parser::IsOptional::Mandatory;
32use sqlparser::parser::{Parser, ParserError};
33use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
34use table::requests::validate_database_option;
35
36use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
37use crate::error::{
38    self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu,
39    InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result, SyntaxSnafu,
40    UnexpectedSnafu, UnsupportedSnafu,
41};
42use crate::parser::{FLOW, ParserContext};
43use crate::parsers::tql_parser;
44use crate::parsers::utils::{
45    self, parse_with_options, validate_column_fulltext_create_option,
46    validate_column_skipping_index_create_option, validate_column_vector_index_create_option,
47};
48use crate::statements::create::{
49    Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
50    CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
51};
52use crate::statements::statement::Statement;
53use crate::statements::transform::type_alias::get_data_type_by_alias_name;
54use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
55use crate::util::{OptionValue, location_to_index, parse_option_string};
56
57pub const ENGINE: &str = "ENGINE";
58pub const MAXVALUE: &str = "MAXVALUE";
59pub const SINK: &str = "SINK";
60pub const EXPIRE: &str = "EXPIRE";
61pub const AFTER: &str = "AFTER";
62pub const INVERTED: &str = "INVERTED";
63pub const SKIPPING: &str = "SKIPPING";
64pub const VECTOR: &str = "VECTOR";
65
66pub type RawIntervalExpr = String;
67
68/// Parses create [table] statement
69impl<'a> ParserContext<'a> {
70    pub(crate) fn parse_create(&mut self) -> Result<Statement> {
71        match self.parser.peek_token().token {
72            Token::Word(w) => match w.keyword {
73                Keyword::TABLE => self.parse_create_table(),
74
75                Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
76
77                Keyword::EXTERNAL => self.parse_create_external_table(),
78
79                Keyword::OR => {
80                    let _ = self.parser.next_token();
81                    self.parser
82                        .expect_keyword(Keyword::REPLACE)
83                        .context(SyntaxSnafu)?;
84                    match self.parser.next_token().token {
85                        Token::Word(w) => match w.keyword {
86                            Keyword::VIEW => self.parse_create_view(true),
87                            Keyword::NoKeyword => {
88                                let uppercase = w.value.to_uppercase();
89                                match uppercase.as_str() {
90                                    FLOW => self.parse_create_flow(true),
91                                    _ => self.unsupported(w.to_string()),
92                                }
93                            }
94                            _ => self.unsupported(w.to_string()),
95                        },
96                        _ => self.unsupported(w.to_string()),
97                    }
98                }
99
100                Keyword::VIEW => {
101                    let _ = self.parser.next_token();
102                    self.parse_create_view(false)
103                }
104
105                #[cfg(feature = "enterprise")]
106                Keyword::TRIGGER => {
107                    let _ = self.parser.next_token();
108                    self.parse_create_trigger()
109                }
110
111                Keyword::NoKeyword => {
112                    let _ = self.parser.next_token();
113                    let uppercase = w.value.to_uppercase();
114                    match uppercase.as_str() {
115                        FLOW => self.parse_create_flow(false),
116                        _ => self.unsupported(w.to_string()),
117                    }
118                }
119                _ => self.unsupported(w.to_string()),
120            },
121            unexpected => self.unsupported(unexpected.to_string()),
122        }
123    }
124
125    /// Parse `CREAVE VIEW` statement.
126    fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
127        let if_not_exists = self.parse_if_not_exist()?;
128        let view_name = self.intern_parse_table_name()?;
129
130        let columns = self.parse_view_columns()?;
131
132        self.parser
133            .expect_keyword(Keyword::AS)
134            .context(SyntaxSnafu)?;
135
136        let query = self.parse_query()?;
137
138        Ok(Statement::CreateView(CreateView {
139            name: view_name,
140            columns,
141            or_replace,
142            query: Box::new(query),
143            if_not_exists,
144        }))
145    }
146
147    fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
148        let mut columns = vec![];
149        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
150            return Ok(columns);
151        }
152
153        loop {
154            let name = self.parse_column_name().context(SyntaxSnafu)?;
155
156            columns.push(name);
157
158            let comma = self.parser.consume_token(&Token::Comma);
159            if self.parser.consume_token(&Token::RParen) {
160                // allow a trailing comma, even though it's not in standard
161                break;
162            } else if !comma {
163                return self.expected("',' or ')' after column name", self.parser.peek_token());
164            }
165        }
166
167        Ok(columns)
168    }
169
170    fn parse_create_external_table(&mut self) -> Result<Statement> {
171        let _ = self.parser.next_token();
172        self.parser
173            .expect_keyword(Keyword::TABLE)
174            .context(SyntaxSnafu)?;
175        let if_not_exists = self.parse_if_not_exist()?;
176        let table_name = self.intern_parse_table_name()?;
177        let (columns, constraints) = self.parse_columns()?;
178        if !columns.is_empty() {
179            validate_time_index(&columns, &constraints)?;
180        }
181
182        let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
183        let options = self.parse_create_table_options()?;
184        Ok(Statement::CreateExternalTable(CreateExternalTable {
185            name: table_name,
186            columns,
187            constraints,
188            options,
189            if_not_exists,
190            engine,
191        }))
192    }
193
194    fn parse_create_database(&mut self) -> Result<Statement> {
195        let _ = self.parser.next_token();
196        let if_not_exists = self.parse_if_not_exist()?;
197        let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
198            expected: "a database name",
199            actual: self.peek_token_as_string(),
200        })?;
201        let database_name = Self::canonicalize_object_name(database_name)?;
202
203        let options = self
204            .parser
205            .parse_options(Keyword::WITH)
206            .context(SyntaxSnafu)?
207            .into_iter()
208            .map(parse_option_string)
209            .collect::<Result<HashMap<String, OptionValue>>>()?;
210
211        for key in options.keys() {
212            ensure!(
213                validate_database_option(key),
214                InvalidDatabaseOptionSnafu { key: key.clone() }
215            );
216        }
217        if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string())
218            && append_mode == "true"
219            && options.contains_key("merge_mode")
220        {
221            return InvalidDatabaseOptionSnafu {
222                key: "merge_mode".to_string(),
223            }
224            .fail();
225        }
226
227        Ok(Statement::CreateDatabase(CreateDatabase {
228            name: database_name,
229            if_not_exists,
230            options: OptionMap::new(options),
231        }))
232    }
233
234    fn parse_create_table(&mut self) -> Result<Statement> {
235        let _ = self.parser.next_token();
236
237        let if_not_exists = self.parse_if_not_exist()?;
238
239        let table_name = self.intern_parse_table_name()?;
240
241        if self.parser.parse_keyword(Keyword::LIKE) {
242            let source_name = self.intern_parse_table_name()?;
243
244            return Ok(Statement::CreateTableLike(CreateTableLike {
245                table_name,
246                source_name,
247            }));
248        }
249
250        let (columns, constraints) = self.parse_columns()?;
251        validate_time_index(&columns, &constraints)?;
252
253        let partitions = self.parse_partitions()?;
254        if let Some(partitions) = &partitions {
255            validate_partitions(&columns, partitions)?;
256        }
257
258        let engine = self.parse_table_engine(default_engine())?;
259        let options = self.parse_create_table_options()?;
260        let create_table = CreateTable {
261            if_not_exists,
262            name: table_name,
263            columns,
264            engine,
265            constraints,
266            options,
267            table_id: 0, // table id is assigned by catalog manager
268            partitions,
269        };
270
271        Ok(Statement::CreateTable(create_table))
272    }
273
274    /// "CREATE FLOW" clause
275    fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
276        let if_not_exists = self.parse_if_not_exist()?;
277
278        let flow_name = self.intern_parse_table_name()?;
279
280        // make `SINK` case in-sensitive
281        if let Token::Word(word) = self.parser.peek_token().token
282            && word.value.eq_ignore_ascii_case(SINK)
283        {
284            self.parser.next_token();
285        } else {
286            Err(ParserError::ParserError(
287                "Expect `SINK` keyword".to_string(),
288            ))
289            .context(SyntaxSnafu)?
290        }
291        self.parser
292            .expect_keyword(Keyword::TO)
293            .context(SyntaxSnafu)?;
294
295        let output_table_name = self.intern_parse_table_name()?;
296
297        let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
298            && w1.value.eq_ignore_ascii_case(EXPIRE)
299        {
300            self.parser.next_token();
301            if let Token::Word(w2) = &self.parser.peek_token().token
302                && w2.value.eq_ignore_ascii_case(AFTER)
303            {
304                self.parser.next_token();
305                Some(self.parse_interval_no_month("EXPIRE AFTER")?)
306            } else {
307                None
308            }
309        } else {
310            None
311        };
312
313        let eval_interval = if self
314            .parser
315            .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
316        {
317            Some(self.parse_interval_no_month("EVAL INTERVAL")?)
318        } else {
319            None
320        };
321
322        let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
323            match self.parser.next_token() {
324                TokenWithSpan {
325                    token: Token::SingleQuotedString(value, ..),
326                    ..
327                } => Some(value),
328                unexpected => {
329                    return self
330                        .parser
331                        .expected("string", unexpected)
332                        .context(SyntaxSnafu);
333                }
334            }
335        } else {
336            None
337        };
338
339        self.parser
340            .expect_keyword(Keyword::AS)
341            .context(SyntaxSnafu)?;
342
343        let query = Box::new(self.parse_sql_or_tql(true)?);
344
345        Ok(Statement::CreateFlow(CreateFlow {
346            flow_name,
347            sink_table_name: output_table_name,
348            or_replace,
349            if_not_exists,
350            expire_after,
351            eval_interval,
352            comment,
353            query,
354        }))
355    }
356
357    fn parse_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
358        let start_loc = self.parser.peek_token().span.start;
359        let start_index = location_to_index(self.sql, &start_loc);
360
361        // only accept sql or tql
362        let query = match self.parser.peek_token().token {
363            Token::Word(w) => match w.keyword {
364                Keyword::SELECT => self.parse_query(),
365                Keyword::NoKeyword
366                    if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
367                {
368                    self.parse_tql(require_now_expr)
369                }
370
371                _ => self.unsupported(self.peek_token_as_string()),
372            },
373            _ => self.unsupported(self.peek_token_as_string()),
374        }?;
375
376        let end_token = self.parser.peek_token();
377
378        let raw_query = if end_token == Token::EOF {
379            &self.sql[start_index..]
380        } else {
381            let end_loc = end_token.span.end;
382            let end_index = location_to_index(self.sql, &end_loc);
383            &self.sql[start_index..end_index.min(self.sql.len())]
384        };
385        let raw_query = raw_query.trim_end_matches(";");
386        let query = SqlOrTql::try_from_statement(query, raw_query)?;
387        Ok(query)
388    }
389
390    /// Parse the interval expr to duration in seconds.
391    fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
392        let interval = self.parse_interval_month_day_nano()?.0;
393        if interval.months != 0 {
394            return InvalidIntervalSnafu {
395                reason: format!("Interval with months is not allowed in {context}"),
396            }
397            .fail();
398        }
399        Ok(
400            interval.nanoseconds / 1_000_000_000
401                + interval.days as i64 * 60 * 60 * 24
402                + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, // 1 month=365.25/12=30.44 days
403                                                                       // this is to keep the same as https://docs.rs/humantime/latest/humantime/fn.parse_duration.html
404                                                                       // which we use in database to parse i.e. ttl interval and many other intervals
405        )
406    }
407
408    /// Parse interval expr to [`IntervalMonthDayNano`].
409    fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
410        let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
411        let raw_interval_expr = interval_expr.to_string();
412        let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
413            .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
414            .ok()
415            .with_context(|| InvalidIntervalSnafu {
416                reason: format!("cannot cast {} to interval type", interval_expr),
417            })?;
418        if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
419            Ok((interval, raw_interval_expr))
420        } else {
421            unreachable!()
422        }
423    }
424
425    fn parse_if_not_exist(&mut self) -> Result<bool> {
426        match self.parser.peek_token().token {
427            Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
428            _ => {}
429        }
430
431        if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
432            return self
433                .parser
434                .expect_keyword(Keyword::EXISTS)
435                .map(|_| true)
436                .context(UnexpectedSnafu {
437                    expected: "EXISTS",
438                    actual: self.peek_token_as_string(),
439                });
440        }
441
442        if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
443            return UnsupportedSnafu { keyword: "EXISTS" }.fail();
444        }
445
446        Ok(false)
447    }
448
449    fn parse_create_table_options(&mut self) -> Result<OptionMap> {
450        parse_with_options(&mut self.parser)
451    }
452
453    /// "PARTITION ON COLUMNS (...)" clause
454    fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
455        if !self.parser.parse_keyword(Keyword::PARTITION) {
456            return Ok(None);
457        }
458        self.parser
459            .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
460            .context(error::UnexpectedSnafu {
461                expected: "ON, COLUMNS",
462                actual: self.peek_token_as_string(),
463            })?;
464
465        let raw_column_list = self
466            .parser
467            .parse_parenthesized_column_list(Mandatory, false)
468            .context(error::SyntaxSnafu)?;
469        let column_list = raw_column_list
470            .into_iter()
471            .map(Self::canonicalize_identifier)
472            .collect();
473
474        let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
475
476        Ok(Some(Partitions { column_list, exprs }))
477    }
478
479    fn parse_partition_entry(&mut self) -> Result<Expr> {
480        self.parser.parse_expr().context(error::SyntaxSnafu)
481    }
482
483    /// Parse a comma-separated list wrapped by "()", and of which all items accepted by `F`
484    fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
485    where
486        F: FnMut(&mut ParserContext<'a>) -> Result<T>,
487    {
488        self.parser
489            .expect_token(&Token::LParen)
490            .context(error::UnexpectedSnafu {
491                expected: "(",
492                actual: self.peek_token_as_string(),
493            })?;
494
495        let mut values = vec![];
496        while self.parser.peek_token() != Token::RParen {
497            values.push(f(self)?);
498            if !self.parser.consume_token(&Token::Comma) {
499                break;
500            }
501        }
502
503        self.parser
504            .expect_token(&Token::RParen)
505            .context(error::UnexpectedSnafu {
506                expected: ")",
507                actual: self.peek_token_as_string(),
508            })?;
509
510        Ok(values)
511    }
512
513    /// Parse the columns and constraints.
514    fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
515        let mut columns = vec![];
516        let mut constraints = vec![];
517        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
518            return Ok((columns, constraints));
519        }
520
521        loop {
522            if let Some(constraint) = self.parse_optional_table_constraint()? {
523                constraints.push(constraint);
524            } else if let Token::Word(_) = self.parser.peek_token().token {
525                self.parse_column(&mut columns, &mut constraints)?;
526            } else {
527                return self.expected(
528                    "column name or constraint definition",
529                    self.parser.peek_token(),
530                );
531            }
532            let comma = self.parser.consume_token(&Token::Comma);
533            if self.parser.consume_token(&Token::RParen) {
534                // allow a trailing comma, even though it's not in standard
535                break;
536            } else if !comma {
537                return self.expected(
538                    "',' or ')' after column definition",
539                    self.parser.peek_token(),
540                );
541            }
542        }
543
544        Ok((columns, constraints))
545    }
546
547    fn parse_column(
548        &mut self,
549        columns: &mut Vec<Column>,
550        constraints: &mut Vec<TableConstraint>,
551    ) -> Result<()> {
552        let mut column = self.parse_column_def()?;
553
554        let mut time_index_opt_idx = None;
555        for (index, opt) in column.options().iter().enumerate() {
556            if let ColumnOption::DialectSpecific(tokens) = &opt.option
557                && matches!(
558                    &tokens[..],
559                    [
560                        Token::Word(Word {
561                            keyword: Keyword::TIME,
562                            ..
563                        }),
564                        Token::Word(Word {
565                            keyword: Keyword::INDEX,
566                            ..
567                        })
568                    ]
569                )
570            {
571                ensure!(
572                    time_index_opt_idx.is_none(),
573                    InvalidColumnOptionSnafu {
574                        name: column.name().to_string(),
575                        msg: "duplicated time index",
576                    }
577                );
578                time_index_opt_idx = Some(index);
579
580                let constraint = TableConstraint::TimeIndex {
581                    column: Ident::new(column.name().value.clone()),
582                };
583                constraints.push(constraint);
584            }
585        }
586
587        if let Some(index) = time_index_opt_idx {
588            ensure!(
589                !column.options().contains(&ColumnOptionDef {
590                    option: ColumnOption::Null,
591                    name: None,
592                }),
593                InvalidColumnOptionSnafu {
594                    name: column.name().to_string(),
595                    msg: "time index column can't be null",
596                }
597            );
598
599            // The timestamp type may be an alias type, we have to retrieve the actual type.
600            let data_type = get_unalias_type(column.data_type());
601            ensure!(
602                matches!(data_type, DataType::Timestamp(_, _)),
603                InvalidColumnOptionSnafu {
604                    name: column.name().to_string(),
605                    msg: "time index column data type should be timestamp",
606                }
607            );
608
609            let not_null_opt = ColumnOptionDef {
610                option: ColumnOption::NotNull,
611                name: None,
612            };
613
614            if !column.options().contains(&not_null_opt) {
615                column.mut_options().push(not_null_opt);
616            }
617
618            let _ = column.mut_options().remove(index);
619        }
620
621        columns.push(column);
622
623        Ok(())
624    }
625
626    /// Parse the column name and check if it's valid.
627    fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
628        let name = self.parser.parse_identifier()?;
629        if name.quote_style.is_none() &&
630        // "ALL_KEYWORDS" are sorted.
631            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
632        {
633            return Err(ParserError::ParserError(format!(
634                "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
635                &name.value
636            )));
637        }
638
639        Ok(name)
640    }
641
642    pub fn parse_column_def(&mut self) -> Result<Column> {
643        let name = self.parse_column_name().context(SyntaxSnafu)?;
644        let parser = &mut self.parser;
645
646        ensure!(
647            !(name.quote_style.is_none() &&
648            // "ALL_KEYWORDS" are sorted.
649            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
650            InvalidSqlSnafu {
651                msg: format!(
652                    "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
653                    &name.value
654                ),
655            }
656        );
657
658        let mut extensions = ColumnExtensions::default();
659
660        let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
661        // Must immediately parse the JSON datatype format because it is closely after the "JSON"
662        // datatype, like this: "JSON(format = ...)".
663        if matches!(data_type, DataType::JSON) {
664            extensions.json_datatype_options = json::parse_json_datatype_options(parser)?;
665        }
666
667        let mut options = vec![];
668        loop {
669            if parser.parse_keyword(Keyword::CONSTRAINT) {
670                let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
671                if let Some(option) = Self::parse_optional_column_option(parser)? {
672                    options.push(ColumnOptionDef { name, option });
673                } else {
674                    return parser
675                        .expected(
676                            "constraint details after CONSTRAINT <name>",
677                            parser.peek_token(),
678                        )
679                        .context(SyntaxSnafu);
680                }
681            } else if let Some(option) = Self::parse_optional_column_option(parser)? {
682                options.push(ColumnOptionDef { name: None, option });
683            } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
684                break;
685            };
686        }
687
688        Ok(Column {
689            column_def: ColumnDef {
690                name: Self::canonicalize_identifier(name),
691                data_type,
692                options,
693            },
694            extensions,
695        })
696    }
697
698    fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
699        if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
700            Ok(Some(ColumnOption::CharacterSet(
701                parser.parse_object_name(false).context(SyntaxSnafu)?,
702            )))
703        } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
704            Ok(Some(ColumnOption::NotNull))
705        } else if parser.parse_keywords(&[Keyword::COMMENT]) {
706            match parser.next_token() {
707                TokenWithSpan {
708                    token: Token::SingleQuotedString(value, ..),
709                    ..
710                } => Ok(Some(ColumnOption::Comment(value))),
711                unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
712            }
713        } else if parser.parse_keyword(Keyword::NULL) {
714            Ok(Some(ColumnOption::Null))
715        } else if parser.parse_keyword(Keyword::DEFAULT) {
716            Ok(Some(ColumnOption::Default(
717                parser.parse_expr().context(SyntaxSnafu)?,
718            )))
719        } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
720            Ok(Some(ColumnOption::Unique {
721                is_primary: true,
722                characteristics: None,
723            }))
724        } else if parser.parse_keyword(Keyword::UNIQUE) {
725            Ok(Some(ColumnOption::Unique {
726                is_primary: false,
727                characteristics: None,
728            }))
729        } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
730            // Use a DialectSpecific option for time index
731            Ok(Some(ColumnOption::DialectSpecific(vec![
732                Token::Word(Word {
733                    value: "TIME".to_string(),
734                    quote_style: None,
735                    keyword: Keyword::TIME,
736                }),
737                Token::Word(Word {
738                    value: "INDEX".to_string(),
739                    quote_style: None,
740                    keyword: Keyword::INDEX,
741                }),
742            ])))
743        } else {
744            Ok(None)
745        }
746    }
747
748    /// Parse a column option extensions.
749    ///
750    /// This function will handle:
751    /// - Vector type
752    /// - Indexes
753    fn parse_column_extensions(
754        parser: &mut Parser<'_>,
755        column_name: &Ident,
756        column_type: &DataType,
757        column_extensions: &mut ColumnExtensions,
758    ) -> Result<bool> {
759        if let DataType::Custom(name, tokens) = column_type
760            && name.0.len() == 1
761            && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
762        {
763            ensure!(
764                tokens.len() == 1,
765                InvalidColumnOptionSnafu {
766                    name: column_name.to_string(),
767                    msg: "VECTOR type should have dimension",
768                }
769            );
770
771            let dimension =
772                tokens[0]
773                    .parse::<u32>()
774                    .ok()
775                    .with_context(|| InvalidColumnOptionSnafu {
776                        name: column_name.to_string(),
777                        msg: "dimension should be a positive integer",
778                    })?;
779
780            let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
781            column_extensions.vector_options = Some(options);
782        }
783
784        // parse index options in column definition
785        let mut is_index_declared = false;
786
787        // skipping index
788        if let Token::Word(word) = parser.peek_token().token
789            && word.value.eq_ignore_ascii_case(SKIPPING)
790        {
791            parser.next_token();
792            // Consume `INDEX` keyword
793            ensure!(
794                parser.parse_keyword(Keyword::INDEX),
795                InvalidColumnOptionSnafu {
796                    name: column_name.to_string(),
797                    msg: "expect INDEX after SKIPPING keyword",
798                }
799            );
800            ensure!(
801                column_extensions.skipping_index_options.is_none(),
802                InvalidColumnOptionSnafu {
803                    name: column_name.to_string(),
804                    msg: "duplicated SKIPPING index option",
805                }
806            );
807
808            let options = parser
809                .parse_options(Keyword::WITH)
810                .context(error::SyntaxSnafu)?
811                .into_iter()
812                .map(parse_option_string)
813                .collect::<Result<Vec<_>>>()?;
814
815            for (key, _) in options.iter() {
816                ensure!(
817                    validate_column_skipping_index_create_option(key),
818                    InvalidColumnOptionSnafu {
819                        name: column_name.to_string(),
820                        msg: format!("invalid SKIPPING INDEX option: {key}"),
821                    }
822                );
823            }
824
825            let options = OptionMap::new(options);
826            column_extensions.skipping_index_options = Some(options);
827            is_index_declared |= true;
828        }
829
830        // fulltext index
831        if parser.parse_keyword(Keyword::FULLTEXT) {
832            // Consume `INDEX` keyword
833            ensure!(
834                parser.parse_keyword(Keyword::INDEX),
835                InvalidColumnOptionSnafu {
836                    name: column_name.to_string(),
837                    msg: "expect INDEX after FULLTEXT keyword",
838                }
839            );
840
841            ensure!(
842                column_extensions.fulltext_index_options.is_none(),
843                InvalidColumnOptionSnafu {
844                    name: column_name.to_string(),
845                    msg: "duplicated FULLTEXT INDEX option",
846                }
847            );
848
849            let column_type = get_unalias_type(column_type);
850            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
851            ensure!(
852                data_type == ConcreteDataType::string_datatype(),
853                InvalidColumnOptionSnafu {
854                    name: column_name.to_string(),
855                    msg: "FULLTEXT index only supports string type",
856                }
857            );
858
859            let options = parser
860                .parse_options(Keyword::WITH)
861                .context(error::SyntaxSnafu)?
862                .into_iter()
863                .map(parse_option_string)
864                .collect::<Result<Vec<_>>>()?;
865
866            for (key, _) in options.iter() {
867                ensure!(
868                    validate_column_fulltext_create_option(key),
869                    InvalidColumnOptionSnafu {
870                        name: column_name.to_string(),
871                        msg: format!("invalid FULLTEXT INDEX option: {key}"),
872                    }
873                );
874            }
875
876            let options = OptionMap::new(options);
877            column_extensions.fulltext_index_options = Some(options);
878            is_index_declared |= true;
879        }
880
881        // inverted index
882        if let Token::Word(word) = parser.peek_token().token
883            && word.value.eq_ignore_ascii_case(INVERTED)
884        {
885            parser.next_token();
886            // Consume `INDEX` keyword
887            ensure!(
888                parser.parse_keyword(Keyword::INDEX),
889                InvalidColumnOptionSnafu {
890                    name: column_name.to_string(),
891                    msg: "expect INDEX after INVERTED keyword",
892                }
893            );
894
895            ensure!(
896                column_extensions.inverted_index_options.is_none(),
897                InvalidColumnOptionSnafu {
898                    name: column_name.to_string(),
899                    msg: "duplicated INVERTED index option",
900                }
901            );
902
903            // inverted index doesn't have options, skipping `WITH`
904            // try cache `WITH` and throw error
905            let with_token = parser.peek_token();
906            ensure!(
907                with_token.token
908                    != Token::Word(Word {
909                        value: "WITH".to_string(),
910                        keyword: Keyword::WITH,
911                        quote_style: None,
912                    }),
913                InvalidColumnOptionSnafu {
914                    name: column_name.to_string(),
915                    msg: "INVERTED index doesn't support options",
916                }
917            );
918
919            column_extensions.inverted_index_options = Some(OptionMap::default());
920            is_index_declared |= true;
921        }
922
923        // vector index
924        if let Token::Word(word) = parser.peek_token().token
925            && word.value.eq_ignore_ascii_case(VECTOR)
926        {
927            parser.next_token();
928            // Consume `INDEX` keyword
929            ensure!(
930                parser.parse_keyword(Keyword::INDEX),
931                InvalidColumnOptionSnafu {
932                    name: column_name.to_string(),
933                    msg: "expect INDEX after VECTOR keyword",
934                }
935            );
936
937            ensure!(
938                column_extensions.vector_index_options.is_none(),
939                InvalidColumnOptionSnafu {
940                    name: column_name.to_string(),
941                    msg: "duplicated VECTOR INDEX option",
942                }
943            );
944
945            // Check that column is a vector type
946            let column_type = get_unalias_type(column_type);
947            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
948            ensure!(
949                matches!(data_type, ConcreteDataType::Vector(_)),
950                InvalidColumnOptionSnafu {
951                    name: column_name.to_string(),
952                    msg: "VECTOR INDEX only supports Vector type columns",
953                }
954            );
955
956            let options = parser
957                .parse_options(Keyword::WITH)
958                .context(error::SyntaxSnafu)?
959                .into_iter()
960                .map(parse_option_string)
961                .collect::<Result<Vec<_>>>()?;
962
963            for (key, _) in options.iter() {
964                ensure!(
965                    validate_column_vector_index_create_option(key),
966                    InvalidColumnOptionSnafu {
967                        name: column_name.to_string(),
968                        msg: format!("invalid VECTOR INDEX option: {key}"),
969                    }
970                );
971            }
972
973            let options = OptionMap::new(options);
974            column_extensions.vector_index_options = Some(options);
975            is_index_declared |= true;
976        }
977
978        Ok(is_index_declared)
979    }
980
981    fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
982        match self.parser.next_token() {
983            TokenWithSpan {
984                token: Token::Word(w),
985                ..
986            } if w.keyword == Keyword::PRIMARY => {
987                self.parser
988                    .expect_keyword(Keyword::KEY)
989                    .context(error::UnexpectedSnafu {
990                        expected: "KEY",
991                        actual: self.peek_token_as_string(),
992                    })?;
993                let raw_columns = self
994                    .parser
995                    .parse_parenthesized_column_list(Mandatory, false)
996                    .context(error::SyntaxSnafu)?;
997                let columns = raw_columns
998                    .into_iter()
999                    .map(Self::canonicalize_identifier)
1000                    .collect();
1001                Ok(Some(TableConstraint::PrimaryKey { columns }))
1002            }
1003            TokenWithSpan {
1004                token: Token::Word(w),
1005                ..
1006            } if w.keyword == Keyword::TIME => {
1007                self.parser
1008                    .expect_keyword(Keyword::INDEX)
1009                    .context(error::UnexpectedSnafu {
1010                        expected: "INDEX",
1011                        actual: self.peek_token_as_string(),
1012                    })?;
1013
1014                let raw_columns = self
1015                    .parser
1016                    .parse_parenthesized_column_list(Mandatory, false)
1017                    .context(error::SyntaxSnafu)?;
1018                let mut columns = raw_columns
1019                    .into_iter()
1020                    .map(Self::canonicalize_identifier)
1021                    .collect::<Vec<_>>();
1022
1023                ensure!(
1024                    columns.len() == 1,
1025                    InvalidTimeIndexSnafu {
1026                        msg: "it should contain only one column in time index",
1027                    }
1028                );
1029
1030                Ok(Some(TableConstraint::TimeIndex {
1031                    column: columns.pop().unwrap(),
1032                }))
1033            }
1034            _ => {
1035                self.parser.prev_token();
1036                Ok(None)
1037            }
1038        }
1039    }
1040
1041    /// Parses the set of valid formats
1042    fn parse_table_engine(&mut self, default: &str) -> Result<String> {
1043        if !self.consume_token(ENGINE) {
1044            return Ok(default.to_string());
1045        }
1046
1047        self.parser
1048            .expect_token(&Token::Eq)
1049            .context(error::UnexpectedSnafu {
1050                expected: "=",
1051                actual: self.peek_token_as_string(),
1052            })?;
1053
1054        let token = self.parser.next_token();
1055        if let Token::Word(w) = token.token {
1056            Ok(w.value)
1057        } else {
1058            self.expected("'Engine' is missing", token)
1059        }
1060    }
1061}
1062
1063fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1064    let time_index_constraints: Vec<_> = constraints
1065        .iter()
1066        .filter_map(|c| match c {
1067            TableConstraint::TimeIndex { column } => Some(column),
1068            _ => None,
1069        })
1070        .unique()
1071        .collect();
1072
1073    ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1074    ensure!(
1075        time_index_constraints.len() == 1,
1076        InvalidTimeIndexSnafu {
1077            msg: format!(
1078                "expected only one time index constraint but actual {}",
1079                time_index_constraints.len()
1080            ),
1081        }
1082    );
1083
1084    // It's safe to use time_index_constraints[0][0],
1085    // we already check the bound above.
1086    let time_index_column_ident = &time_index_constraints[0];
1087    let time_index_column = columns
1088        .iter()
1089        .find(|c| c.name().value == *time_index_column_ident.value)
1090        .with_context(|| InvalidTimeIndexSnafu {
1091            msg: format!(
1092                "time index column {} not found in columns",
1093                time_index_column_ident
1094            ),
1095        })?;
1096
1097    let time_index_data_type = get_unalias_type(time_index_column.data_type());
1098    ensure!(
1099        matches!(time_index_data_type, DataType::Timestamp(_, _)),
1100        InvalidColumnOptionSnafu {
1101            name: time_index_column.name().to_string(),
1102            msg: "time index column data type should be timestamp",
1103        }
1104    );
1105
1106    Ok(())
1107}
1108
1109fn get_unalias_type(data_type: &DataType) -> DataType {
1110    match data_type {
1111        DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1112            if let Some(real_type) =
1113                get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1114            {
1115                real_type
1116            } else {
1117                data_type.clone()
1118            }
1119        }
1120        _ => data_type.clone(),
1121    }
1122}
1123
1124fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1125    let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1126
1127    ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1128
1129    Ok(())
1130}
1131
1132/// Ensure all exprs are binary expr and all the columns are defined in the column list.
1133fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1134    for expr in exprs {
1135        // The first level must be binary expr
1136        if let Expr::BinaryOp { left, op: _, right } = expr {
1137            ensure_one_expr(left, columns)?;
1138            ensure_one_expr(right, columns)?;
1139        } else {
1140            return error::InvalidSqlSnafu {
1141                msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1142            }
1143            .fail();
1144        }
1145    }
1146    Ok(())
1147}
1148
1149/// Check if the expr is a binary expr, an ident or a literal value.
1150/// If is ident, then check it is in the column list.
1151/// This recursive function is intended to be used by [ensure_exprs_are_binary].
1152fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1153    match expr {
1154        Expr::BinaryOp { left, op: _, right } => {
1155            ensure_one_expr(left, columns)?;
1156            ensure_one_expr(right, columns)?;
1157            Ok(())
1158        }
1159        Expr::Identifier(ident) => {
1160            let column_name = &ident.value;
1161            ensure!(
1162                columns.iter().any(|c| &c.name().value == column_name),
1163                error::InvalidSqlSnafu {
1164                    msg: format!(
1165                        "Column {:?} in rule expr is not referenced in PARTITION ON",
1166                        column_name
1167                    ),
1168                }
1169            );
1170            Ok(())
1171        }
1172        Expr::Value(_) => Ok(()),
1173        Expr::UnaryOp { expr, .. } => {
1174            ensure_one_expr(expr, columns)?;
1175            Ok(())
1176        }
1177        _ => error::InvalidSqlSnafu {
1178            msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1179        }
1180        .fail(),
1181    }
1182}
1183
1184/// Ensure that all columns used in "PARTITION ON COLUMNS" are defined in create table.
1185fn ensure_partition_columns_defined<'a>(
1186    columns: &'a [Column],
1187    partitions: &'a Partitions,
1188) -> Result<Vec<&'a Column>> {
1189    partitions
1190        .column_list
1191        .iter()
1192        .map(|x| {
1193            let x = ParserContext::canonicalize_identifier(x.clone());
1194            // Normally the columns in "create table" won't be too many,
1195            // a linear search to find the target every time is fine.
1196            columns
1197                .iter()
1198                .find(|c| *c.name().value == x.value)
1199                .context(error::InvalidSqlSnafu {
1200                    msg: format!("Partition column {:?} not defined", x.value),
1201                })
1202        })
1203        .collect::<Result<Vec<&Column>>>()
1204}
1205
1206#[cfg(test)]
1207mod tests {
1208    use std::assert_matches::assert_matches;
1209    use std::collections::HashMap;
1210
1211    use common_catalog::consts::FILE_ENGINE;
1212    use common_error::ext::ErrorExt;
1213    use sqlparser::ast::ColumnOption::NotNull;
1214    use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1215    use sqlparser::dialect::GenericDialect;
1216    use sqlparser::tokenizer::Tokenizer;
1217
1218    use super::*;
1219    use crate::dialect::GreptimeDbDialect;
1220    use crate::parser::ParseOptions;
1221
1222    #[test]
1223    fn test_parse_create_table_like() {
1224        let sql = "CREATE TABLE t1 LIKE t2";
1225        let stmts =
1226            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1227                .unwrap();
1228
1229        assert_eq!(1, stmts.len());
1230        match &stmts[0] {
1231            Statement::CreateTableLike(c) => {
1232                assert_eq!(c.table_name.to_string(), "t1");
1233                assert_eq!(c.source_name.to_string(), "t2");
1234            }
1235            _ => unreachable!(),
1236        }
1237    }
1238
1239    #[test]
1240    fn test_validate_external_table_options() {
1241        let sql = "CREATE EXTERNAL TABLE city (
1242            host string,
1243            ts timestamp,
1244            cpu float64 default 0,
1245            memory float64,
1246            TIME INDEX (ts),
1247            PRIMARY KEY(ts, host)
1248        ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1249
1250        let result =
1251            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1252        assert!(matches!(
1253            result,
1254            Err(error::Error::InvalidTableOption { .. })
1255        ));
1256    }
1257
1258    #[test]
1259    fn test_parse_create_external_table() {
1260        struct Test<'a> {
1261            sql: &'a str,
1262            expected_table_name: &'a str,
1263            expected_options: HashMap<&'a str, &'a str>,
1264            expected_engine: &'a str,
1265            expected_if_not_exist: bool,
1266        }
1267
1268        let tests = [
1269            Test {
1270                sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1271                expected_table_name: "city",
1272                expected_options: HashMap::from([
1273                    ("location", "/var/data/city.csv"),
1274                    ("format", "csv"),
1275                ]),
1276                expected_engine: FILE_ENGINE,
1277                expected_if_not_exist: false,
1278            },
1279            Test {
1280                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1281                expected_table_name: "city",
1282                expected_options: HashMap::from([
1283                    ("location", "/var/data/city.csv"),
1284                    ("format", "csv"),
1285                ]),
1286                expected_engine: "foo",
1287                expected_if_not_exist: true,
1288            },
1289            Test {
1290                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1291                expected_table_name: "city",
1292                expected_options: HashMap::from([
1293                    ("location", "/var/data/city.csv"),
1294                    ("format", "csv"),
1295                    ("compaction.type", "bar"),
1296                ]),
1297                expected_engine: "foo",
1298                expected_if_not_exist: true,
1299            },
1300        ];
1301
1302        for test in tests {
1303            let stmts = ParserContext::create_with_dialect(
1304                test.sql,
1305                &GreptimeDbDialect {},
1306                ParseOptions::default(),
1307            )
1308            .unwrap();
1309            assert_eq!(1, stmts.len());
1310            match &stmts[0] {
1311                Statement::CreateExternalTable(c) => {
1312                    assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1313                    assert_eq!(c.options.to_str_map(), test.expected_options);
1314                    assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1315                    assert_eq!(c.engine, test.expected_engine);
1316                }
1317                _ => unreachable!(),
1318            }
1319        }
1320    }
1321
1322    #[test]
1323    fn test_parse_create_external_table_with_schema() {
1324        let sql = "CREATE EXTERNAL TABLE city (
1325            host string,
1326            ts timestamp,
1327            cpu float32 default 0,
1328            memory float64,
1329            TIME INDEX (ts),
1330            PRIMARY KEY(ts, host),
1331        ) with(location='/var/data/city.csv',format='csv');";
1332
1333        let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]);
1334
1335        let stmts =
1336            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1337                .unwrap();
1338        assert_eq!(1, stmts.len());
1339        match &stmts[0] {
1340            Statement::CreateExternalTable(c) => {
1341                assert_eq!(c.name.to_string(), "city");
1342                assert_eq!(c.options.to_str_map(), options);
1343
1344                let columns = &c.columns;
1345                assert_column_def(&columns[0].column_def, "host", "STRING");
1346                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1347                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1348                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1349
1350                let constraints = &c.constraints;
1351                assert_eq!(
1352                    &constraints[0],
1353                    &TableConstraint::TimeIndex {
1354                        column: Ident::new("ts"),
1355                    }
1356                );
1357                assert_eq!(
1358                    &constraints[1],
1359                    &TableConstraint::PrimaryKey {
1360                        columns: vec![Ident::new("ts"), Ident::new("host")]
1361                    }
1362                );
1363            }
1364            _ => unreachable!(),
1365        }
1366    }
1367
1368    #[test]
1369    fn test_parse_create_database() {
1370        let sql = "create database";
1371        let result =
1372            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1373        assert!(
1374            result
1375                .unwrap_err()
1376                .to_string()
1377                .contains("Unexpected token while parsing SQL statement")
1378        );
1379
1380        let sql = "create database prometheus";
1381        let stmts =
1382            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1383                .unwrap();
1384
1385        assert_eq!(1, stmts.len());
1386        match &stmts[0] {
1387            Statement::CreateDatabase(c) => {
1388                assert_eq!(c.name.to_string(), "prometheus");
1389                assert!(!c.if_not_exists);
1390            }
1391            _ => unreachable!(),
1392        }
1393
1394        let sql = "create database if not exists prometheus";
1395        let stmts =
1396            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1397                .unwrap();
1398
1399        assert_eq!(1, stmts.len());
1400        match &stmts[0] {
1401            Statement::CreateDatabase(c) => {
1402                assert_eq!(c.name.to_string(), "prometheus");
1403                assert!(c.if_not_exists);
1404            }
1405            _ => unreachable!(),
1406        }
1407
1408        let sql = "CREATE DATABASE `fOo`";
1409        let result =
1410            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1411        let stmts = result.unwrap();
1412        match &stmts.last().unwrap() {
1413            Statement::CreateDatabase(c) => {
1414                assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1415                assert!(!c.if_not_exists);
1416            }
1417            _ => unreachable!(),
1418        }
1419
1420        let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1421        let result =
1422            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1423        let stmts = result.unwrap();
1424        match &stmts[0] {
1425            Statement::CreateDatabase(c) => {
1426                assert_eq!(c.name.to_string(), "prometheus");
1427                assert!(!c.if_not_exists);
1428                assert_eq!(c.options.get("ttl").unwrap(), "1h");
1429            }
1430            _ => unreachable!(),
1431        }
1432    }
1433
1434    #[test]
1435    fn test_parse_create_flow_more_testcases() {
1436        use pretty_assertions::assert_eq;
1437        fn parse_create_flow(sql: &str) -> CreateFlow {
1438            let stmts = ParserContext::create_with_dialect(
1439                sql,
1440                &GreptimeDbDialect {},
1441                ParseOptions::default(),
1442            )
1443            .unwrap();
1444            assert_eq!(1, stmts.len());
1445            match &stmts[0] {
1446                Statement::CreateFlow(c) => c.clone(),
1447                _ => unreachable!(),
1448            }
1449        }
1450        struct CreateFlowWoutQuery {
1451            /// Flow name
1452            pub flow_name: ObjectName,
1453            /// Output (sink) table name
1454            pub sink_table_name: ObjectName,
1455            /// Whether to replace existing task
1456            pub or_replace: bool,
1457            /// Create if not exist
1458            pub if_not_exists: bool,
1459            /// `EXPIRE AFTER`
1460            /// Duration in second as `i64`
1461            pub expire_after: Option<i64>,
1462            /// Comment string
1463            pub comment: Option<String>,
1464        }
1465        let testcases = vec![
1466            (
1467                r"
1468CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1469SINK TO schema_1.table_1
1470EXPIRE AFTER INTERVAL '5 minutes'
1471COMMENT 'test comment'
1472AS
1473SELECT max(c1), min(c2) FROM schema_2.table_2;",
1474                CreateFlowWoutQuery {
1475                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1476                    sink_table_name: ObjectName::from(vec![
1477                        Ident::new("schema_1"),
1478                        Ident::new("table_1"),
1479                    ]),
1480                    or_replace: true,
1481                    if_not_exists: true,
1482                    expire_after: Some(300),
1483                    comment: Some("test comment".to_string()),
1484                },
1485            ),
1486            (
1487                r"
1488CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1489SINK TO schema_1.table_1
1490EXPIRE AFTER INTERVAL '300 s'
1491COMMENT 'test comment'
1492AS
1493SELECT max(c1), min(c2) FROM schema_2.table_2;",
1494                CreateFlowWoutQuery {
1495                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1496                    sink_table_name: ObjectName::from(vec![
1497                        Ident::new("schema_1"),
1498                        Ident::new("table_1"),
1499                    ]),
1500                    or_replace: true,
1501                    if_not_exists: true,
1502                    expire_after: Some(300),
1503                    comment: Some("test comment".to_string()),
1504                },
1505            ),
1506            (
1507                r"
1508CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1509SINK TO schema_1.table_1
1510EXPIRE AFTER '5 minutes'
1511COMMENT 'test comment'
1512AS
1513SELECT max(c1), min(c2) FROM schema_2.table_2;",
1514                CreateFlowWoutQuery {
1515                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1516                    sink_table_name: ObjectName::from(vec![
1517                        Ident::new("schema_1"),
1518                        Ident::new("table_1"),
1519                    ]),
1520                    or_replace: true,
1521                    if_not_exists: true,
1522                    expire_after: Some(300),
1523                    comment: Some("test comment".to_string()),
1524                },
1525            ),
1526            (
1527                r"
1528CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1529SINK TO schema_1.table_1
1530EXPIRE AFTER '300 s'
1531COMMENT 'test comment'
1532AS
1533SELECT max(c1), min(c2) FROM schema_2.table_2;",
1534                CreateFlowWoutQuery {
1535                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1536                    sink_table_name: ObjectName::from(vec![
1537                        Ident::new("schema_1"),
1538                        Ident::new("table_1"),
1539                    ]),
1540                    or_replace: true,
1541                    if_not_exists: true,
1542                    expire_after: Some(300),
1543                    comment: Some("test comment".to_string()),
1544                },
1545            ),
1546            (
1547                r"
1548CREATE FLOW `task_2`
1549SINK TO schema_1.table_1
1550EXPIRE AFTER '2 days 1h 2 min'
1551AS
1552SELECT max(c1), min(c2) FROM schema_2.table_2;",
1553                CreateFlowWoutQuery {
1554                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1555                    sink_table_name: ObjectName::from(vec![
1556                        Ident::new("schema_1"),
1557                        Ident::new("table_1"),
1558                    ]),
1559                    or_replace: false,
1560                    if_not_exists: false,
1561                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1562                    comment: None,
1563                },
1564            ),
1565            (
1566                r"
1567create flow `task_3`
1568sink to schema_1.table_1
1569expire after '10 minutes'
1570as
1571select max(c1), min(c2) from schema_2.table_2;",
1572                CreateFlowWoutQuery {
1573                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
1574                    sink_table_name: ObjectName::from(vec![
1575                        Ident::new("schema_1"),
1576                        Ident::new("table_1"),
1577                    ]),
1578                    or_replace: false,
1579                    if_not_exists: false,
1580                    expire_after: Some(600), // 10 minutes in seconds
1581                    comment: None,
1582                },
1583            ),
1584            (
1585                r"
1586create or replace flow if not exists task_4
1587sink to schema_1.table_1
1588expire after interval '2 hours'
1589comment 'lowercase test'
1590as
1591select max(c1), min(c2) from schema_2.table_2;",
1592                CreateFlowWoutQuery {
1593                    flow_name: ObjectName::from(vec![Ident::new("task_4")]),
1594                    sink_table_name: ObjectName::from(vec![
1595                        Ident::new("schema_1"),
1596                        Ident::new("table_1"),
1597                    ]),
1598                    or_replace: true,
1599                    if_not_exists: true,
1600                    expire_after: Some(7200), // 2 hours in seconds
1601                    comment: Some("lowercase test".to_string()),
1602                },
1603            ),
1604        ];
1605
1606        for (sql, expected) in testcases {
1607            let create_task = parse_create_flow(sql);
1608
1609            let expected = CreateFlow {
1610                flow_name: expected.flow_name,
1611                sink_table_name: expected.sink_table_name,
1612                or_replace: expected.or_replace,
1613                if_not_exists: expected.if_not_exists,
1614                expire_after: expected.expire_after,
1615                eval_interval: None,
1616                comment: expected.comment,
1617                // ignore query parse result
1618                query: create_task.query.clone(),
1619            };
1620
1621            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1622            let show_create = create_task.to_string();
1623            let recreated = parse_create_flow(&show_create);
1624            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1625        }
1626    }
1627
1628    #[test]
1629    fn test_parse_create_flow() {
1630        use pretty_assertions::assert_eq;
1631        fn parse_create_flow(sql: &str) -> CreateFlow {
1632            let stmts = ParserContext::create_with_dialect(
1633                sql,
1634                &GreptimeDbDialect {},
1635                ParseOptions::default(),
1636            )
1637            .unwrap();
1638            assert_eq!(1, stmts.len());
1639            match &stmts[0] {
1640                Statement::CreateFlow(c) => c.clone(),
1641                _ => panic!("{:?}", stmts[0]),
1642            }
1643        }
1644        struct CreateFlowWoutQuery {
1645            /// Flow name
1646            pub flow_name: ObjectName,
1647            /// Output (sink) table name
1648            pub sink_table_name: ObjectName,
1649            /// Whether to replace existing task
1650            pub or_replace: bool,
1651            /// Create if not exist
1652            pub if_not_exists: bool,
1653            /// `EXPIRE AFTER`
1654            /// Duration in second as `i64`
1655            pub expire_after: Option<i64>,
1656            /// Duration for flow evaluation interval
1657            /// Duration in seconds as `i64`
1658            /// If not set, flow will be evaluated based on time window size and other args.
1659            pub eval_interval: Option<i64>,
1660            /// Comment string
1661            pub comment: Option<String>,
1662        }
1663
1664        // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
1665        let testcases = vec![
1666            (
1667                r"
1668CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1669SINK TO schema_1.table_1
1670EXPIRE AFTER INTERVAL '5 minutes'
1671COMMENT 'test comment'
1672AS
1673SELECT max(c1), min(c2) FROM schema_2.table_2;",
1674                CreateFlowWoutQuery {
1675                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1676                    sink_table_name: ObjectName(vec![
1677                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1678                        ObjectNamePart::Identifier(Ident::new("table_1")),
1679                    ]),
1680                    or_replace: true,
1681                    if_not_exists: true,
1682                    expire_after: Some(300),
1683                    eval_interval: None,
1684                    comment: Some("test comment".to_string()),
1685                },
1686            ),
1687            (
1688                r"
1689CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1690SINK TO schema_1.table_1
1691EXPIRE AFTER INTERVAL '300 s'
1692COMMENT 'test comment'
1693AS
1694SELECT max(c1), min(c2) FROM schema_2.table_2;",
1695                CreateFlowWoutQuery {
1696                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1697                    sink_table_name: ObjectName(vec![
1698                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1699                        ObjectNamePart::Identifier(Ident::new("table_1")),
1700                    ]),
1701                    or_replace: true,
1702                    if_not_exists: true,
1703                    expire_after: Some(300),
1704                    eval_interval: None,
1705                    comment: Some("test comment".to_string()),
1706                },
1707            ),
1708            (
1709                r"
1710CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1711SINK TO schema_1.table_1
1712EXPIRE AFTER '5 minutes'
1713EVAL INTERVAL '10 seconds'
1714COMMENT 'test comment'
1715AS
1716SELECT max(c1), min(c2) FROM schema_2.table_2;",
1717                CreateFlowWoutQuery {
1718                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1719                    sink_table_name: ObjectName(vec![
1720                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1721                        ObjectNamePart::Identifier(Ident::new("table_1")),
1722                    ]),
1723                    or_replace: true,
1724                    if_not_exists: true,
1725                    expire_after: Some(300),
1726                    eval_interval: Some(10),
1727                    comment: Some("test comment".to_string()),
1728                },
1729            ),
1730            (
1731                r"
1732CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1733SINK TO schema_1.table_1
1734EXPIRE AFTER '5 minutes'
1735EVAL INTERVAL INTERVAL '10 seconds'
1736COMMENT 'test comment'
1737AS
1738SELECT max(c1), min(c2) FROM schema_2.table_2;",
1739                CreateFlowWoutQuery {
1740                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1741                    sink_table_name: ObjectName(vec![
1742                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1743                        ObjectNamePart::Identifier(Ident::new("table_1")),
1744                    ]),
1745                    or_replace: true,
1746                    if_not_exists: true,
1747                    expire_after: Some(300),
1748                    eval_interval: Some(10),
1749                    comment: Some("test comment".to_string()),
1750                },
1751            ),
1752            (
1753                r"
1754CREATE FLOW `task_2`
1755SINK TO schema_1.table_1
1756EXPIRE AFTER '2 days 1h 2 min'
1757AS
1758SELECT max(c1), min(c2) FROM schema_2.table_2;",
1759                CreateFlowWoutQuery {
1760                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1761                        '`', "task_2",
1762                    ))]),
1763                    sink_table_name: ObjectName(vec![
1764                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1765                        ObjectNamePart::Identifier(Ident::new("table_1")),
1766                    ]),
1767                    or_replace: false,
1768                    if_not_exists: false,
1769                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1770                    eval_interval: None,
1771                    comment: None,
1772                },
1773            ),
1774        ];
1775
1776        for (sql, expected) in testcases {
1777            let create_task = parse_create_flow(sql);
1778
1779            let expected = CreateFlow {
1780                flow_name: expected.flow_name,
1781                sink_table_name: expected.sink_table_name,
1782                or_replace: expected.or_replace,
1783                if_not_exists: expected.if_not_exists,
1784                expire_after: expected.expire_after,
1785                eval_interval: expected.eval_interval,
1786                comment: expected.comment,
1787                // ignore query parse result
1788                query: create_task.query.clone(),
1789            };
1790
1791            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1792            let show_create = create_task.to_string();
1793            let recreated = parse_create_flow(&show_create);
1794            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1795        }
1796    }
1797
1798    #[test]
1799    fn test_create_flow_no_month() {
1800        let sql = r"
1801CREATE FLOW `task_2`
1802SINK TO schema_1.table_1
1803EXPIRE AFTER '1 month 2 days 1h 2 min'
1804AS
1805SELECT max(c1), min(c2) FROM schema_2.table_2;";
1806        let stmts =
1807            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1808
1809        assert!(
1810            stmts.is_err()
1811                && stmts
1812                    .unwrap_err()
1813                    .to_string()
1814                    .contains("Interval with months is not allowed")
1815        );
1816    }
1817
1818    #[test]
1819    fn test_validate_create() {
1820        let sql = r"
1821CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
1822PARTITION ON COLUMNS(c, a) (
1823    a < 10,
1824    a > 10 AND a < 20,
1825    a > 20 AND c < 100,
1826    a > 20 AND c >= 100
1827)
1828ENGINE=mito";
1829        let result =
1830            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1831        let _ = result.unwrap();
1832
1833        let sql = r"
1834CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1835PARTITION ON COLUMNS(x) ()
1836ENGINE=mito";
1837        let result =
1838            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1839        assert!(
1840            result
1841                .unwrap_err()
1842                .to_string()
1843                .contains("Partition column \"x\" not defined")
1844        );
1845    }
1846
1847    #[test]
1848    fn test_parse_create_table_with_partitions() {
1849        let sql = r"
1850CREATE TABLE monitor (
1851  host_id    INT,
1852  idc        STRING,
1853  ts         TIMESTAMP,
1854  cpu        DOUBLE DEFAULT 0,
1855  memory     DOUBLE,
1856  TIME INDEX (ts),
1857  PRIMARY KEY (host),
1858)
1859PARTITION ON COLUMNS(idc, host_id) (
1860  idc <= 'hz' AND host_id < 1000,
1861  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1862  idc > 'sh' AND host_id < 3000,
1863  idc > 'sh' AND host_id >= 3000,
1864)
1865ENGINE=mito";
1866        let result =
1867            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1868                .unwrap();
1869        assert_eq!(result.len(), 1);
1870        match &result[0] {
1871            Statement::CreateTable(c) => {
1872                assert!(c.partitions.is_some());
1873
1874                let partitions = c.partitions.as_ref().unwrap();
1875                let column_list = partitions
1876                    .column_list
1877                    .iter()
1878                    .map(|x| &x.value)
1879                    .collect::<Vec<&String>>();
1880                assert_eq!(column_list, vec!["idc", "host_id"]);
1881
1882                let exprs = &partitions.exprs;
1883
1884                assert_eq!(
1885                    exprs[0],
1886                    Expr::BinaryOp {
1887                        left: Box::new(Expr::BinaryOp {
1888                            left: Box::new(Expr::Identifier("idc".into())),
1889                            op: BinaryOperator::LtEq,
1890                            right: Box::new(Expr::Value(
1891                                Value::SingleQuotedString("hz".to_string()).into()
1892                            ))
1893                        }),
1894                        op: BinaryOperator::And,
1895                        right: Box::new(Expr::BinaryOp {
1896                            left: Box::new(Expr::Identifier("host_id".into())),
1897                            op: BinaryOperator::Lt,
1898                            right: Box::new(Expr::Value(
1899                                Value::Number("1000".to_string(), false).into()
1900                            ))
1901                        })
1902                    }
1903                );
1904                assert_eq!(
1905                    exprs[1],
1906                    Expr::BinaryOp {
1907                        left: Box::new(Expr::BinaryOp {
1908                            left: Box::new(Expr::BinaryOp {
1909                                left: Box::new(Expr::Identifier("idc".into())),
1910                                op: BinaryOperator::Gt,
1911                                right: Box::new(Expr::Value(
1912                                    Value::SingleQuotedString("hz".to_string()).into()
1913                                ))
1914                            }),
1915                            op: BinaryOperator::And,
1916                            right: Box::new(Expr::BinaryOp {
1917                                left: Box::new(Expr::Identifier("idc".into())),
1918                                op: BinaryOperator::LtEq,
1919                                right: Box::new(Expr::Value(
1920                                    Value::SingleQuotedString("sh".to_string()).into()
1921                                ))
1922                            })
1923                        }),
1924                        op: BinaryOperator::And,
1925                        right: Box::new(Expr::BinaryOp {
1926                            left: Box::new(Expr::Identifier("host_id".into())),
1927                            op: BinaryOperator::Lt,
1928                            right: Box::new(Expr::Value(
1929                                Value::Number("2000".to_string(), false).into()
1930                            ))
1931                        })
1932                    }
1933                );
1934                assert_eq!(
1935                    exprs[2],
1936                    Expr::BinaryOp {
1937                        left: Box::new(Expr::BinaryOp {
1938                            left: Box::new(Expr::Identifier("idc".into())),
1939                            op: BinaryOperator::Gt,
1940                            right: Box::new(Expr::Value(
1941                                Value::SingleQuotedString("sh".to_string()).into()
1942                            ))
1943                        }),
1944                        op: BinaryOperator::And,
1945                        right: Box::new(Expr::BinaryOp {
1946                            left: Box::new(Expr::Identifier("host_id".into())),
1947                            op: BinaryOperator::Lt,
1948                            right: Box::new(Expr::Value(
1949                                Value::Number("3000".to_string(), false).into()
1950                            ))
1951                        })
1952                    }
1953                );
1954                assert_eq!(
1955                    exprs[3],
1956                    Expr::BinaryOp {
1957                        left: Box::new(Expr::BinaryOp {
1958                            left: Box::new(Expr::Identifier("idc".into())),
1959                            op: BinaryOperator::Gt,
1960                            right: Box::new(Expr::Value(
1961                                Value::SingleQuotedString("sh".to_string()).into()
1962                            ))
1963                        }),
1964                        op: BinaryOperator::And,
1965                        right: Box::new(Expr::BinaryOp {
1966                            left: Box::new(Expr::Identifier("host_id".into())),
1967                            op: BinaryOperator::GtEq,
1968                            right: Box::new(Expr::Value(
1969                                Value::Number("3000".to_string(), false).into()
1970                            ))
1971                        })
1972                    }
1973                );
1974            }
1975            _ => unreachable!(),
1976        }
1977    }
1978
1979    #[test]
1980    fn test_parse_create_table_with_quoted_partitions() {
1981        let sql = r"
1982CREATE TABLE monitor (
1983  `host_id`    INT,
1984  idc        STRING,
1985  ts         TIMESTAMP,
1986  cpu        DOUBLE DEFAULT 0,
1987  memory     DOUBLE,
1988  TIME INDEX (ts),
1989  PRIMARY KEY (host),
1990)
1991PARTITION ON COLUMNS(IdC, host_id) (
1992  idc <= 'hz' AND host_id < 1000,
1993  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1994  idc > 'sh' AND host_id < 3000,
1995  idc > 'sh' AND host_id >= 3000,
1996)";
1997        let result =
1998            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1999                .unwrap();
2000        assert_eq!(result.len(), 1);
2001    }
2002
2003    #[test]
2004    fn test_parse_create_table_with_timestamp_index() {
2005        let sql1 = r"
2006CREATE TABLE monitor (
2007  host_id    INT,
2008  idc        STRING,
2009  ts         TIMESTAMP TIME INDEX,
2010  cpu        DOUBLE DEFAULT 0,
2011  memory     DOUBLE,
2012  PRIMARY KEY (host),
2013)
2014ENGINE=mito";
2015        let result1 = ParserContext::create_with_dialect(
2016            sql1,
2017            &GreptimeDbDialect {},
2018            ParseOptions::default(),
2019        )
2020        .unwrap();
2021
2022        if let Statement::CreateTable(c) = &result1[0] {
2023            assert_eq!(c.constraints.len(), 2);
2024            let tc = c.constraints[0].clone();
2025            match tc {
2026                TableConstraint::TimeIndex { column } => {
2027                    assert_eq!(&column.value, "ts");
2028                }
2029                _ => panic!("should be time index constraint"),
2030            };
2031        } else {
2032            panic!("should be create_table statement");
2033        }
2034
2035        // `TIME INDEX` should be in front of `PRIMARY KEY`
2036        // in order to equal the `TIMESTAMP TIME INDEX` constraint options vector
2037        let sql2 = r"
2038CREATE TABLE monitor (
2039  host_id    INT,
2040  idc        STRING,
2041  ts         TIMESTAMP NOT NULL,
2042  cpu        DOUBLE DEFAULT 0,
2043  memory     DOUBLE,
2044  TIME INDEX (ts),
2045  PRIMARY KEY (host),
2046)
2047ENGINE=mito";
2048        let result2 = ParserContext::create_with_dialect(
2049            sql2,
2050            &GreptimeDbDialect {},
2051            ParseOptions::default(),
2052        )
2053        .unwrap();
2054
2055        assert_eq!(result1, result2);
2056
2057        // TIMESTAMP can be NULL which is not equal to above
2058        let sql3 = r"
2059CREATE TABLE monitor (
2060  host_id    INT,
2061  idc        STRING,
2062  ts         TIMESTAMP,
2063  cpu        DOUBLE DEFAULT 0,
2064  memory     DOUBLE,
2065  TIME INDEX (ts),
2066  PRIMARY KEY (host),
2067)
2068ENGINE=mito";
2069
2070        let result3 = ParserContext::create_with_dialect(
2071            sql3,
2072            &GreptimeDbDialect {},
2073            ParseOptions::default(),
2074        )
2075        .unwrap();
2076
2077        assert_ne!(result1, result3);
2078
2079        // BIGINT can't be time index any more
2080        let sql1 = r"
2081CREATE TABLE monitor (
2082  host_id    INT,
2083  idc        STRING,
2084  b          bigint TIME INDEX,
2085  cpu        DOUBLE DEFAULT 0,
2086  memory     DOUBLE,
2087  PRIMARY KEY (host),
2088)
2089ENGINE=mito";
2090        let result1 = ParserContext::create_with_dialect(
2091            sql1,
2092            &GreptimeDbDialect {},
2093            ParseOptions::default(),
2094        );
2095
2096        assert!(
2097            result1
2098                .unwrap_err()
2099                .to_string()
2100                .contains("time index column data type should be timestamp")
2101        );
2102    }
2103
2104    #[test]
2105    fn test_parse_create_table_with_timestamp_index_not_null() {
2106        let sql = r"
2107CREATE TABLE monitor (
2108  host_id    INT,
2109  idc        STRING,
2110  ts         TIMESTAMP TIME INDEX,
2111  cpu        DOUBLE DEFAULT 0,
2112  memory     DOUBLE,
2113  TIME INDEX (ts),
2114  PRIMARY KEY (host),
2115)
2116ENGINE=mito";
2117        let result =
2118            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2119                .unwrap();
2120
2121        assert_eq!(result.len(), 1);
2122        if let Statement::CreateTable(c) = &result[0] {
2123            let ts = c.columns[2].clone();
2124            assert_eq!(ts.name().to_string(), "ts");
2125            assert_eq!(ts.options()[0].option, NotNull);
2126        } else {
2127            panic!("should be create table statement");
2128        }
2129
2130        let sql1 = r"
2131CREATE TABLE monitor (
2132  host_id    INT,
2133  idc        STRING,
2134  ts         TIMESTAMP NOT NULL TIME INDEX,
2135  cpu        DOUBLE DEFAULT 0,
2136  memory     DOUBLE,
2137  TIME INDEX (ts),
2138  PRIMARY KEY (host),
2139)
2140ENGINE=mito";
2141
2142        let result1 = ParserContext::create_with_dialect(
2143            sql1,
2144            &GreptimeDbDialect {},
2145            ParseOptions::default(),
2146        )
2147        .unwrap();
2148        assert_eq!(result, result1);
2149
2150        let sql2 = r"
2151CREATE TABLE monitor (
2152  host_id    INT,
2153  idc        STRING,
2154  ts         TIMESTAMP TIME INDEX NOT NULL,
2155  cpu        DOUBLE DEFAULT 0,
2156  memory     DOUBLE,
2157  TIME INDEX (ts),
2158  PRIMARY KEY (host),
2159)
2160ENGINE=mito";
2161
2162        let result2 = ParserContext::create_with_dialect(
2163            sql2,
2164            &GreptimeDbDialect {},
2165            ParseOptions::default(),
2166        )
2167        .unwrap();
2168        assert_eq!(result, result2);
2169
2170        let sql3 = r"
2171CREATE TABLE monitor (
2172  host_id    INT,
2173  idc        STRING,
2174  ts         TIMESTAMP TIME INDEX NULL NOT,
2175  cpu        DOUBLE DEFAULT 0,
2176  memory     DOUBLE,
2177  TIME INDEX (ts),
2178  PRIMARY KEY (host),
2179)
2180ENGINE=mito";
2181
2182        let result3 = ParserContext::create_with_dialect(
2183            sql3,
2184            &GreptimeDbDialect {},
2185            ParseOptions::default(),
2186        );
2187        assert!(result3.is_err());
2188
2189        let sql4 = r"
2190CREATE TABLE monitor (
2191  host_id    INT,
2192  idc        STRING,
2193  ts         TIMESTAMP TIME INDEX NOT NULL NULL,
2194  cpu        DOUBLE DEFAULT 0,
2195  memory     DOUBLE,
2196  TIME INDEX (ts),
2197  PRIMARY KEY (host),
2198)
2199ENGINE=mito";
2200
2201        let result4 = ParserContext::create_with_dialect(
2202            sql4,
2203            &GreptimeDbDialect {},
2204            ParseOptions::default(),
2205        );
2206        assert!(result4.is_err());
2207
2208        let sql = r"
2209CREATE TABLE monitor (
2210  host_id    INT,
2211  idc        STRING,
2212  ts         TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2213  cpu        DOUBLE DEFAULT 0,
2214  memory     DOUBLE,
2215  TIME INDEX (ts),
2216  PRIMARY KEY (host),
2217)
2218ENGINE=mito";
2219
2220        let result =
2221            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2222                .unwrap();
2223
2224        if let Statement::CreateTable(c) = &result[0] {
2225            let tc = c.constraints[0].clone();
2226            match tc {
2227                TableConstraint::TimeIndex { column } => {
2228                    assert_eq!(&column.value, "ts");
2229                }
2230                _ => panic!("should be time index constraint"),
2231            }
2232            let ts = c.columns[2].clone();
2233            assert_eq!(ts.name().to_string(), "ts");
2234            assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2235            assert_eq!(ts.options()[1].option, NotNull);
2236        } else {
2237            unreachable!("should be create table statement");
2238        }
2239    }
2240
2241    #[test]
2242    fn test_parse_partitions_with_error_syntax() {
2243        let sql = r"
2244CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2245PARTITION COLUMNS(c, a) (
2246    a < 10,
2247    a > 10 AND a < 20,
2248    a > 20 AND c < 100,
2249    a > 20 AND c >= 100
2250)
2251ENGINE=mito";
2252        let result =
2253            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2254        assert!(
2255            result
2256                .unwrap_err()
2257                .output_msg()
2258                .contains("sql parser error: Expected: ON, found: COLUMNS")
2259        );
2260    }
2261
2262    #[test]
2263    fn test_parse_partitions_without_rule() {
2264        let sql = r"
2265CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2266PARTITION ON COLUMNS(c, a) ()
2267ENGINE=mito";
2268        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2269            .unwrap();
2270    }
2271
2272    #[test]
2273    fn test_parse_partitions_unreferenced_column() {
2274        let sql = r"
2275CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2276PARTITION ON COLUMNS(c, a) (
2277    b = 'foo'
2278)
2279ENGINE=mito";
2280        let result =
2281            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2282        assert_eq!(
2283            result.unwrap_err().output_msg(),
2284            "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2285        );
2286    }
2287
2288    #[test]
2289    fn test_parse_partitions_not_binary_expr() {
2290        let sql = r"
2291CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2292PARTITION ON COLUMNS(c, a) (
2293    b
2294)
2295ENGINE=mito";
2296        let result =
2297            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2298        assert_eq!(
2299            result.unwrap_err().output_msg(),
2300            r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2301        );
2302    }
2303
2304    fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2305        assert_eq!(column.name.to_string(), name);
2306        assert_eq!(column.data_type.to_string(), data_type);
2307    }
2308
2309    #[test]
2310    pub fn test_parse_create_table() {
2311        let sql = r"create table demo(
2312                             host string,
2313                             ts timestamp,
2314                             cpu float32 default 0,
2315                             memory float64,
2316                             TIME INDEX (ts),
2317                             PRIMARY KEY(ts, host),
2318                             ) engine=mito
2319                             with(ttl='10s');
2320         ";
2321        let result =
2322            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2323                .unwrap();
2324        assert_eq!(1, result.len());
2325        match &result[0] {
2326            Statement::CreateTable(c) => {
2327                assert!(!c.if_not_exists);
2328                assert_eq!("demo", c.name.to_string());
2329                assert_eq!("mito", c.engine);
2330                assert_eq!(4, c.columns.len());
2331                let columns = &c.columns;
2332                assert_column_def(&columns[0].column_def, "host", "STRING");
2333                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2334                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2335                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2336
2337                let constraints = &c.constraints;
2338                assert_eq!(
2339                    &constraints[0],
2340                    &TableConstraint::TimeIndex {
2341                        column: Ident::new("ts"),
2342                    }
2343                );
2344                assert_eq!(
2345                    &constraints[1],
2346                    &TableConstraint::PrimaryKey {
2347                        columns: vec![Ident::new("ts"), Ident::new("host")]
2348                    }
2349                );
2350                // inverted index is merged into column options
2351                assert_eq!(1, c.options.len());
2352                assert_eq!(
2353                    [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2354                    c.options.to_str_map()
2355                );
2356            }
2357            _ => unreachable!(),
2358        }
2359    }
2360
2361    #[test]
2362    fn test_invalid_index_keys() {
2363        let sql = r"create table demo(
2364                             host string,
2365                             ts int64,
2366                             cpu float64 default 0,
2367                             memory float64,
2368                             TIME INDEX (ts, host),
2369                             PRIMARY KEY(ts, host)) engine=mito;
2370         ";
2371        let result =
2372            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2373        assert!(result.is_err());
2374        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2375    }
2376
2377    #[test]
2378    fn test_duplicated_time_index() {
2379        let sql = r"create table demo(
2380                             host string,
2381                             ts timestamp time index,
2382                             t timestamp time index,
2383                             cpu float64 default 0,
2384                             memory float64,
2385                             TIME INDEX (ts, host),
2386                             PRIMARY KEY(ts, host)) engine=mito;
2387         ";
2388        let result =
2389            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2390        assert!(result.is_err());
2391        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2392
2393        let sql = r"create table demo(
2394                             host string,
2395                             ts timestamp time index,
2396                             cpu float64 default 0,
2397                             t timestamp,
2398                             memory float64,
2399                             TIME INDEX (t),
2400                             PRIMARY KEY(ts, host)) engine=mito;
2401         ";
2402        let result =
2403            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2404        assert!(result.is_err());
2405        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2406    }
2407
2408    #[test]
2409    fn test_invalid_column_name() {
2410        let sql = "create table foo(user string, i timestamp time index)";
2411        let result =
2412            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2413        let err = result.unwrap_err().output_msg();
2414        assert!(err.contains("Cannot use keyword 'user' as column name"));
2415
2416        // If column name is quoted, it's valid even same with keyword.
2417        let sql = r#"
2418            create table foo("user" string, i timestamp time index)
2419        "#;
2420        let result =
2421            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2422        let _ = result.unwrap();
2423    }
2424
2425    #[test]
2426    fn test_incorrect_default_value_issue_3479() {
2427        let sql = r#"CREATE TABLE `ExcePTuRi`(
2428non TIMESTAMP(6) TIME INDEX,
2429`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2430)"#;
2431        let result =
2432            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2433                .unwrap();
2434        assert_eq!(1, result.len());
2435        match &result[0] {
2436            Statement::CreateTable(c) => {
2437                assert_eq!(
2438                    "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2439                    c.columns[1].to_string()
2440                );
2441            }
2442            _ => unreachable!(),
2443        }
2444    }
2445
2446    #[test]
2447    fn test_parse_create_view() {
2448        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
2449
2450        let result =
2451            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2452                .unwrap();
2453        match &result[0] {
2454            Statement::CreateView(c) => {
2455                assert_eq!(c.to_string(), sql);
2456                assert!(!c.or_replace);
2457                assert!(!c.if_not_exists);
2458                assert_eq!("test", c.name.to_string());
2459            }
2460            _ => unreachable!(),
2461        }
2462
2463        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
2464
2465        let result =
2466            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2467                .unwrap();
2468        match &result[0] {
2469            Statement::CreateView(c) => {
2470                assert_eq!(c.to_string(), sql);
2471                assert!(c.or_replace);
2472                assert!(c.if_not_exists);
2473                assert_eq!("test", c.name.to_string());
2474            }
2475            _ => unreachable!(),
2476        }
2477    }
2478
2479    #[test]
2480    fn test_parse_create_view_invalid_query() {
2481        let sql = "CREATE VIEW test AS DELETE from demo";
2482        let result =
2483            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2484        assert!(result.is_ok_and(|x| x.len() == 1));
2485    }
2486
2487    #[test]
2488    fn test_parse_create_table_fulltext_options() {
2489        let sql1 = r"
2490CREATE TABLE log (
2491    ts TIMESTAMP TIME INDEX,
2492    msg TEXT FULLTEXT INDEX,
2493)";
2494        let result1 = ParserContext::create_with_dialect(
2495            sql1,
2496            &GreptimeDbDialect {},
2497            ParseOptions::default(),
2498        )
2499        .unwrap();
2500
2501        if let Statement::CreateTable(c) = &result1[0] {
2502            c.columns.iter().for_each(|col| {
2503                if col.name().value == "msg" {
2504                    assert!(
2505                        col.extensions
2506                            .fulltext_index_options
2507                            .as_ref()
2508                            .unwrap()
2509                            .is_empty()
2510                    );
2511                }
2512            });
2513        } else {
2514            panic!("should be create_table statement");
2515        }
2516
2517        let sql2 = r"
2518CREATE TABLE log (
2519    ts TIMESTAMP TIME INDEX,
2520    msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2521)";
2522        let result2 = ParserContext::create_with_dialect(
2523            sql2,
2524            &GreptimeDbDialect {},
2525            ParseOptions::default(),
2526        )
2527        .unwrap();
2528
2529        if let Statement::CreateTable(c) = &result2[0] {
2530            c.columns.iter().for_each(|col| {
2531                if col.name().value == "msg" {
2532                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2533                    assert_eq!(options.len(), 2);
2534                    assert_eq!(options.get("analyzer").unwrap(), "English");
2535                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2536                }
2537            });
2538        } else {
2539            panic!("should be create_table statement");
2540        }
2541
2542        let sql3 = r"
2543CREATE TABLE log (
2544    ts TIMESTAMP TIME INDEX,
2545    msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2546    msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2547)";
2548        let result3 = ParserContext::create_with_dialect(
2549            sql3,
2550            &GreptimeDbDialect {},
2551            ParseOptions::default(),
2552        )
2553        .unwrap();
2554
2555        if let Statement::CreateTable(c) = &result3[0] {
2556            c.columns.iter().for_each(|col| {
2557                if col.name().value == "msg1" {
2558                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2559                    assert_eq!(options.len(), 2);
2560                    assert_eq!(options.get("analyzer").unwrap(), "English");
2561                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2562                } else if col.name().value == "msg2" {
2563                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2564                    assert_eq!(options.len(), 2);
2565                    assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2566                    assert_eq!(options.get("case_sensitive").unwrap(), "true");
2567                }
2568            });
2569        } else {
2570            panic!("should be create_table statement");
2571        }
2572    }
2573
2574    #[test]
2575    fn test_parse_create_table_fulltext_options_invalid_type() {
2576        let sql = r"
2577CREATE TABLE log (
2578    ts TIMESTAMP TIME INDEX,
2579    msg INT FULLTEXT INDEX,
2580)";
2581        let result =
2582            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2583        assert!(result.is_err());
2584        assert!(
2585            result
2586                .unwrap_err()
2587                .to_string()
2588                .contains("FULLTEXT index only supports string type")
2589        );
2590    }
2591
2592    #[test]
2593    fn test_parse_create_table_fulltext_options_duplicate() {
2594        let sql = r"
2595CREATE TABLE log (
2596    ts TIMESTAMP TIME INDEX,
2597    msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
2598)";
2599        let result =
2600            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2601        assert!(result.is_err());
2602        assert!(
2603            result
2604                .unwrap_err()
2605                .to_string()
2606                .contains("duplicated FULLTEXT INDEX option")
2607        );
2608    }
2609
2610    #[test]
2611    fn test_parse_create_table_fulltext_options_invalid_option() {
2612        let sql = r"
2613CREATE TABLE log (
2614    ts TIMESTAMP TIME INDEX,
2615    msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
2616)";
2617        let result =
2618            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2619        assert!(result.is_err());
2620        assert!(
2621            result
2622                .unwrap_err()
2623                .to_string()
2624                .contains("invalid FULLTEXT INDEX option")
2625        );
2626    }
2627
2628    #[test]
2629    fn test_parse_create_table_skip_options() {
2630        let sql = r"
2631CREATE TABLE log (
2632    ts TIMESTAMP TIME INDEX,
2633    msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
2634)";
2635        let result =
2636            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2637                .unwrap();
2638
2639        if let Statement::CreateTable(c) = &result[0] {
2640            c.columns.iter().for_each(|col| {
2641                if col.name().value == "msg" {
2642                    assert!(
2643                        !col.extensions
2644                            .skipping_index_options
2645                            .as_ref()
2646                            .unwrap()
2647                            .is_empty()
2648                    );
2649                }
2650            });
2651        } else {
2652            panic!("should be create_table statement");
2653        }
2654
2655        let sql = r"
2656        CREATE TABLE log (
2657            ts TIMESTAMP TIME INDEX,
2658            msg INT SKIPPING INDEX,
2659        )";
2660        let result =
2661            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2662                .unwrap();
2663
2664        if let Statement::CreateTable(c) = &result[0] {
2665            c.columns.iter().for_each(|col| {
2666                if col.name().value == "msg" {
2667                    assert!(
2668                        col.extensions
2669                            .skipping_index_options
2670                            .as_ref()
2671                            .unwrap()
2672                            .is_empty()
2673                    );
2674                }
2675            });
2676        } else {
2677            panic!("should be create_table statement");
2678        }
2679    }
2680
2681    #[test]
2682    fn test_parse_create_view_with_columns() {
2683        let sql = "CREATE VIEW test () AS SELECT * FROM NUMBERS";
2684        let result =
2685            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2686                .unwrap();
2687
2688        match &result[0] {
2689            Statement::CreateView(c) => {
2690                assert_eq!(c.to_string(), "CREATE VIEW test AS SELECT * FROM NUMBERS");
2691                assert!(!c.or_replace);
2692                assert!(!c.if_not_exists);
2693                assert_eq!("test", c.name.to_string());
2694            }
2695            _ => unreachable!(),
2696        }
2697        assert_eq!(
2698            "CREATE VIEW test AS SELECT * FROM NUMBERS",
2699            result[0].to_string()
2700        );
2701
2702        let sql = "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS";
2703        let result =
2704            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2705                .unwrap();
2706
2707        match &result[0] {
2708            Statement::CreateView(c) => {
2709                assert_eq!(c.to_string(), sql);
2710                assert!(!c.or_replace);
2711                assert!(!c.if_not_exists);
2712                assert_eq!("test", c.name.to_string());
2713            }
2714            _ => unreachable!(),
2715        }
2716        assert_eq!(sql, result[0].to_string());
2717
2718        let sql = "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS";
2719        let result =
2720            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2721                .unwrap();
2722
2723        match &result[0] {
2724            Statement::CreateView(c) => {
2725                assert_eq!(c.to_string(), sql);
2726                assert!(!c.or_replace);
2727                assert!(!c.if_not_exists);
2728                assert_eq!("test", c.name.to_string());
2729            }
2730            _ => unreachable!(),
2731        }
2732        assert_eq!(sql, result[0].to_string());
2733
2734        // Some invalid syntax cases
2735        let sql = "CREATE VIEW test (n1 AS select * from demo";
2736        let result =
2737            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2738        assert!(result.is_err());
2739
2740        let sql = "CREATE VIEW test (n1, AS select * from demo";
2741        let result =
2742            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2743        assert!(result.is_err());
2744
2745        let sql = "CREATE VIEW test n1,n2) AS select * from demo";
2746        let result =
2747            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2748        assert!(result.is_err());
2749
2750        let sql = "CREATE VIEW test (1) AS select * from demo";
2751        let result =
2752            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2753        assert!(result.is_err());
2754
2755        // keyword
2756        let sql = "CREATE VIEW test (n1, select) AS select * from demo";
2757        let result =
2758            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2759        assert!(result.is_err());
2760    }
2761
2762    #[test]
2763    fn test_parse_column_extensions_vector() {
2764        // Test that vector options are parsed from data_type (no additional SQL needed)
2765        let sql = "";
2766        let dialect = GenericDialect {};
2767        let mut tokenizer = Tokenizer::new(&dialect, sql);
2768        let tokens = tokenizer.tokenize().unwrap();
2769        let mut parser = Parser::new(&dialect).with_tokens(tokens);
2770        let name = Ident::new("vec_col");
2771        let data_type =
2772            DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
2773        let mut extensions = ColumnExtensions::default();
2774
2775        let result =
2776            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2777        assert!(result.is_ok());
2778        assert!(extensions.vector_options.is_some());
2779        let vector_options = extensions.vector_options.unwrap();
2780        assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some("128"));
2781    }
2782
2783    #[test]
2784    fn test_parse_column_extensions_vector_invalid() {
2785        // Test that vector with no dimension fails
2786        let sql = "";
2787        let dialect = GenericDialect {};
2788        let mut tokenizer = Tokenizer::new(&dialect, sql);
2789        let tokens = tokenizer.tokenize().unwrap();
2790        let mut parser = Parser::new(&dialect).with_tokens(tokens);
2791        let name = Ident::new("vec_col");
2792        let data_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
2793        let mut extensions = ColumnExtensions::default();
2794
2795        let result =
2796            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2797        assert!(result.is_err());
2798    }
2799
2800    #[test]
2801    fn test_parse_column_extensions_indices() {
2802        // Test skipping index
2803        {
2804            let sql = "SKIPPING INDEX";
2805            let dialect = GenericDialect {};
2806            let mut tokenizer = Tokenizer::new(&dialect, sql);
2807            let tokens = tokenizer.tokenize().unwrap();
2808            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2809            let name = Ident::new("col");
2810            let data_type = DataType::String(None);
2811            let mut extensions = ColumnExtensions::default();
2812            let result = ParserContext::parse_column_extensions(
2813                &mut parser,
2814                &name,
2815                &data_type,
2816                &mut extensions,
2817            );
2818            assert!(result.is_ok());
2819            assert!(extensions.skipping_index_options.is_some());
2820        }
2821
2822        // Test fulltext index with options
2823        {
2824            let sql = "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')";
2825            let dialect = GenericDialect {};
2826            let mut tokenizer = Tokenizer::new(&dialect, sql);
2827            let tokens = tokenizer.tokenize().unwrap();
2828            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2829            let name = Ident::new("text_col");
2830            let data_type = DataType::String(None);
2831            let mut extensions = ColumnExtensions::default();
2832            let result = ParserContext::parse_column_extensions(
2833                &mut parser,
2834                &name,
2835                &data_type,
2836                &mut extensions,
2837            );
2838            assert!(result.unwrap());
2839            assert!(extensions.fulltext_index_options.is_some());
2840            let fulltext_options = extensions.fulltext_index_options.unwrap();
2841            assert_eq!(fulltext_options.get("analyzer"), Some("English"));
2842            assert_eq!(fulltext_options.get("case_sensitive"), Some("true"));
2843        }
2844
2845        // Test fulltext index with invalid type (should fail)
2846        {
2847            let sql = "FULLTEXT INDEX WITH (analyzer = 'English')";
2848            let dialect = GenericDialect {};
2849            let mut tokenizer = Tokenizer::new(&dialect, sql);
2850            let tokens = tokenizer.tokenize().unwrap();
2851            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2852            let name = Ident::new("num_col");
2853            let data_type = DataType::Int(None); // Non-string type
2854            let mut extensions = ColumnExtensions::default();
2855            let result = ParserContext::parse_column_extensions(
2856                &mut parser,
2857                &name,
2858                &data_type,
2859                &mut extensions,
2860            );
2861            assert!(result.is_err());
2862            assert!(
2863                result
2864                    .unwrap_err()
2865                    .to_string()
2866                    .contains("FULLTEXT index only supports string type")
2867            );
2868        }
2869
2870        // Test fulltext index with invalid option (won't fail, the parser doesn't check the option's content)
2871        {
2872            let sql = "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')";
2873            let dialect = GenericDialect {};
2874            let mut tokenizer = Tokenizer::new(&dialect, sql);
2875            let tokens = tokenizer.tokenize().unwrap();
2876            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2877            let name = Ident::new("text_col");
2878            let data_type = DataType::String(None);
2879            let mut extensions = ColumnExtensions::default();
2880            let result = ParserContext::parse_column_extensions(
2881                &mut parser,
2882                &name,
2883                &data_type,
2884                &mut extensions,
2885            );
2886            assert!(result.unwrap());
2887        }
2888
2889        // Test inverted index
2890        {
2891            let sql = "INVERTED INDEX";
2892            let dialect = GenericDialect {};
2893            let mut tokenizer = Tokenizer::new(&dialect, sql);
2894            let tokens = tokenizer.tokenize().unwrap();
2895            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2896            let name = Ident::new("col");
2897            let data_type = DataType::String(None);
2898            let mut extensions = ColumnExtensions::default();
2899            let result = ParserContext::parse_column_extensions(
2900                &mut parser,
2901                &name,
2902                &data_type,
2903                &mut extensions,
2904            );
2905            assert!(result.is_ok());
2906            assert!(extensions.inverted_index_options.is_some());
2907        }
2908
2909        // Test inverted index with options (should fail)
2910        {
2911            let sql = "INVERTED INDEX WITH (analyzer = 'English')";
2912            let dialect = GenericDialect {};
2913            let mut tokenizer = Tokenizer::new(&dialect, sql);
2914            let tokens = tokenizer.tokenize().unwrap();
2915            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2916            let name = Ident::new("col");
2917            let data_type = DataType::String(None);
2918            let mut extensions = ColumnExtensions::default();
2919            let result = ParserContext::parse_column_extensions(
2920                &mut parser,
2921                &name,
2922                &data_type,
2923                &mut extensions,
2924            );
2925            assert!(result.is_err());
2926            assert!(
2927                result
2928                    .unwrap_err()
2929                    .to_string()
2930                    .contains("INVERTED index doesn't support options")
2931            );
2932        }
2933
2934        // Test multiple indices
2935        {
2936            let sql = "SKIPPING INDEX FULLTEXT INDEX";
2937            let dialect = GenericDialect {};
2938            let mut tokenizer = Tokenizer::new(&dialect, sql);
2939            let tokens = tokenizer.tokenize().unwrap();
2940            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2941            let name = Ident::new("col");
2942            let data_type = DataType::String(None);
2943            let mut extensions = ColumnExtensions::default();
2944            let result = ParserContext::parse_column_extensions(
2945                &mut parser,
2946                &name,
2947                &data_type,
2948                &mut extensions,
2949            );
2950            assert!(result.unwrap());
2951            assert!(extensions.skipping_index_options.is_some());
2952            assert!(extensions.fulltext_index_options.is_some());
2953        }
2954    }
2955
2956    #[test]
2957    fn test_parse_interval_cast() {
2958        let s = "select '10s'::INTERVAL";
2959        let stmts =
2960            ParserContext::create_with_dialect(s, &GreptimeDbDialect {}, ParseOptions::default())
2961                .unwrap();
2962        assert_eq!("SELECT '10 seconds'::INTERVAL", &stmts[0].to_string());
2963    }
2964
2965    #[test]
2966    fn test_parse_create_table_vector_index_options() {
2967        // Test basic vector index
2968        let sql = r"
2969CREATE TABLE vectors (
2970    ts TIMESTAMP TIME INDEX,
2971    vec VECTOR(128) VECTOR INDEX,
2972)";
2973        let result =
2974            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2975                .unwrap();
2976
2977        if let Statement::CreateTable(c) = &result[0] {
2978            c.columns.iter().for_each(|col| {
2979                if col.name().value == "vec" {
2980                    assert!(
2981                        col.extensions
2982                            .vector_index_options
2983                            .as_ref()
2984                            .unwrap()
2985                            .is_empty()
2986                    );
2987                }
2988            });
2989        } else {
2990            panic!("should be create_table statement");
2991        }
2992
2993        // Test vector index with options
2994        let sql = r"
2995CREATE TABLE vectors (
2996    ts TIMESTAMP TIME INDEX,
2997    vec VECTOR(128) VECTOR INDEX WITH (metric='cosine', connectivity='32', expansion_add='256', expansion_search='128')
2998)";
2999        let result =
3000            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
3001                .unwrap();
3002
3003        if let Statement::CreateTable(c) = &result[0] {
3004            c.columns.iter().for_each(|col| {
3005                if col.name().value == "vec" {
3006                    let options = col.extensions.vector_index_options.as_ref().unwrap();
3007                    assert_eq!(options.len(), 4);
3008                    assert_eq!(options.get("metric").unwrap(), "cosine");
3009                    assert_eq!(options.get("connectivity").unwrap(), "32");
3010                    assert_eq!(options.get("expansion_add").unwrap(), "256");
3011                    assert_eq!(options.get("expansion_search").unwrap(), "128");
3012                }
3013            });
3014        } else {
3015            panic!("should be create_table statement");
3016        }
3017    }
3018
3019    #[test]
3020    fn test_parse_create_table_vector_index_invalid_type() {
3021        // Test vector index on non-vector type (should fail)
3022        let sql = r"
3023CREATE TABLE vectors (
3024    ts TIMESTAMP TIME INDEX,
3025    col INT VECTOR INDEX,
3026)";
3027        let result =
3028            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3029        assert!(result.is_err());
3030        assert!(
3031            result
3032                .unwrap_err()
3033                .to_string()
3034                .contains("VECTOR INDEX only supports Vector type columns")
3035        );
3036    }
3037
3038    #[test]
3039    fn test_parse_create_table_vector_index_duplicate() {
3040        // Test duplicate vector index (should fail)
3041        let sql = r"
3042CREATE TABLE vectors (
3043    ts TIMESTAMP TIME INDEX,
3044    vec VECTOR(128) VECTOR INDEX VECTOR INDEX,
3045)";
3046        let result =
3047            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3048        assert!(result.is_err());
3049        assert!(
3050            result
3051                .unwrap_err()
3052                .to_string()
3053                .contains("duplicated VECTOR INDEX option")
3054        );
3055    }
3056
3057    #[test]
3058    fn test_parse_create_table_vector_index_invalid_option() {
3059        // Test invalid option key (should fail)
3060        let sql = r"
3061CREATE TABLE vectors (
3062    ts TIMESTAMP TIME INDEX,
3063    vec VECTOR(128) VECTOR INDEX WITH (metric='l2sq', invalid_option='foo')
3064)";
3065        let result =
3066            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3067        assert!(result.is_err());
3068        assert!(
3069            result
3070                .unwrap_err()
3071                .to_string()
3072                .contains("invalid VECTOR INDEX option")
3073        );
3074    }
3075
3076    #[test]
3077    fn test_parse_column_extensions_vector_index() {
3078        // Test vector index on vector type
3079        {
3080            let sql = "VECTOR INDEX WITH (metric = 'l2sq')";
3081            let dialect = GenericDialect {};
3082            let mut tokenizer = Tokenizer::new(&dialect, sql);
3083            let tokens = tokenizer.tokenize().unwrap();
3084            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3085            let name = Ident::new("vec_col");
3086            let data_type =
3087                DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
3088            // First, parse the vector type to set vector_options
3089            let mut extensions = ColumnExtensions {
3090                vector_options: Some(OptionMap::from([(
3091                    VECTOR_OPT_DIM.to_string(),
3092                    "128".to_string(),
3093                )])),
3094                ..Default::default()
3095            };
3096
3097            let result = ParserContext::parse_column_extensions(
3098                &mut parser,
3099                &name,
3100                &data_type,
3101                &mut extensions,
3102            );
3103            assert!(result.is_ok());
3104            assert!(extensions.vector_index_options.is_some());
3105            let vi_options = extensions.vector_index_options.unwrap();
3106            assert_eq!(vi_options.get("metric"), Some("l2sq"));
3107        }
3108
3109        // Test vector index on non-vector type (should fail)
3110        {
3111            let sql = "VECTOR INDEX";
3112            let dialect = GenericDialect {};
3113            let mut tokenizer = Tokenizer::new(&dialect, sql);
3114            let tokens = tokenizer.tokenize().unwrap();
3115            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3116            let name = Ident::new("num_col");
3117            let data_type = DataType::Int(None); // Non-vector type
3118            let mut extensions = ColumnExtensions::default();
3119            let result = ParserContext::parse_column_extensions(
3120                &mut parser,
3121                &name,
3122                &data_type,
3123                &mut extensions,
3124            );
3125            assert!(result.is_err());
3126            assert!(
3127                result
3128                    .unwrap_err()
3129                    .to_string()
3130                    .contains("VECTOR INDEX only supports Vector type columns")
3131            );
3132        }
3133    }
3134}