sql/parsers/
create_parser.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use common_catalog::consts::default_engine;
18use datafusion_common::ScalarValue;
19use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
20use datatypes::data_type::ConcreteDataType;
21use itertools::Itertools;
22use snafu::{ensure, OptionExt, ResultExt};
23use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Expr};
24use sqlparser::dialect::keywords::Keyword;
25use sqlparser::keywords::ALL_KEYWORDS;
26use sqlparser::parser::IsOptional::Mandatory;
27use sqlparser::parser::{Parser, ParserError};
28use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
29use table::requests::validate_table_option;
30
31use crate::ast::{ColumnDef, Ident};
32use crate::error::{
33    self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu,
34    InvalidSqlSnafu, InvalidTableOptionSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
35    SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
36};
37use crate::parser::{ParserContext, FLOW};
38use crate::parsers::utils::{
39    self, validate_column_fulltext_create_option, validate_column_skipping_index_create_option,
40};
41use crate::statements::create::{
42    Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
43    CreateTableLike, CreateView, Partitions, TableConstraint, VECTOR_OPT_DIM,
44};
45use crate::statements::statement::Statement;
46use crate::statements::transform::type_alias::get_data_type_by_alias_name;
47use crate::statements::{sql_data_type_to_concrete_data_type, OptionMap};
48use crate::util::parse_option_string;
49
/// Text of the `ENGINE` word.
// NOTE(review): presumably used by the table-engine clause parser, which is not visible in this
// section — confirm.
pub const ENGINE: &str = "ENGINE";
/// Text of the `MAXVALUE` word.
// NOTE(review): presumably used for partition bounds; no use is visible in this section — confirm.
pub const MAXVALUE: &str = "MAXVALUE";
/// Word expected after the flow name in `CREATE FLOW <name> SINK TO <table>`.
pub const SINK: &str = "SINK";
/// First word of the optional `EXPIRE AFTER` clause of `CREATE FLOW`.
pub const EXPIRE: &str = "EXPIRE";
/// Second word of the optional `EXPIRE AFTER` clause of `CREATE FLOW`.
pub const AFTER: &str = "AFTER";
/// Word that introduces an `INVERTED INDEX` column option.
pub const INVERTED: &str = "INVERTED";
/// Word that introduces a `SKIPPING INDEX` column option.
pub const SKIPPING: &str = "SKIPPING";

/// The `ttl` database option key.
const DB_OPT_KEY_TTL: &str = "ttl";

/// Returns `true` when `key` is accepted as a `CREATE DATABASE ... WITH (...)` option.
///
/// The comparison is exact (case-sensitive); `ttl` is the only accepted key.
fn validate_database_option(key: &str) -> bool {
    const ACCEPTED_KEYS: &[&str] = &[DB_OPT_KEY_TTL];
    ACCEPTED_KEYS.iter().any(|accepted| *accepted == key)
}
63
64/// Parses create [table] statement
65impl<'a> ParserContext<'a> {
66    pub(crate) fn parse_create(&mut self) -> Result<Statement> {
67        match self.parser.peek_token().token {
68            Token::Word(w) => match w.keyword {
69                Keyword::TABLE => self.parse_create_table(),
70
71                Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
72
73                Keyword::EXTERNAL => self.parse_create_external_table(),
74
75                Keyword::OR => {
76                    let _ = self.parser.next_token();
77                    self.parser
78                        .expect_keyword(Keyword::REPLACE)
79                        .context(SyntaxSnafu)?;
80                    match self.parser.next_token().token {
81                        Token::Word(w) => match w.keyword {
82                            Keyword::VIEW => self.parse_create_view(true),
83                            Keyword::NoKeyword => {
84                                let uppercase = w.value.to_uppercase();
85                                match uppercase.as_str() {
86                                    FLOW => self.parse_create_flow(true),
87                                    _ => self.unsupported(w.to_string()),
88                                }
89                            }
90                            _ => self.unsupported(w.to_string()),
91                        },
92                        _ => self.unsupported(w.to_string()),
93                    }
94                }
95
96                Keyword::VIEW => {
97                    let _ = self.parser.next_token();
98                    self.parse_create_view(false)
99                }
100
101                Keyword::NoKeyword => {
102                    let _ = self.parser.next_token();
103                    let uppercase = w.value.to_uppercase();
104                    match uppercase.as_str() {
105                        FLOW => self.parse_create_flow(false),
106                        _ => self.unsupported(w.to_string()),
107                    }
108                }
109                _ => self.unsupported(w.to_string()),
110            },
111            unexpected => self.unsupported(unexpected.to_string()),
112        }
113    }
114
115    /// Parse `CREAVE VIEW` statement.
116    fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
117        let if_not_exists = self.parse_if_not_exist()?;
118        let view_name = self.intern_parse_table_name()?;
119
120        let columns = self.parse_view_columns()?;
121
122        self.parser
123            .expect_keyword(Keyword::AS)
124            .context(SyntaxSnafu)?;
125
126        let query = self.parse_query()?;
127
128        Ok(Statement::CreateView(CreateView {
129            name: view_name,
130            columns,
131            or_replace,
132            query: Box::new(query),
133            if_not_exists,
134        }))
135    }
136
137    fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
138        let mut columns = vec![];
139        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
140            return Ok(columns);
141        }
142
143        loop {
144            let name = self.parse_column_name().context(SyntaxSnafu)?;
145
146            columns.push(name);
147
148            let comma = self.parser.consume_token(&Token::Comma);
149            if self.parser.consume_token(&Token::RParen) {
150                // allow a trailing comma, even though it's not in standard
151                break;
152            } else if !comma {
153                return self.expected("',' or ')' after column name", self.parser.peek_token());
154            }
155        }
156
157        Ok(columns)
158    }
159
160    fn parse_create_external_table(&mut self) -> Result<Statement> {
161        let _ = self.parser.next_token();
162        self.parser
163            .expect_keyword(Keyword::TABLE)
164            .context(SyntaxSnafu)?;
165        let if_not_exists = self.parse_if_not_exist()?;
166        let table_name = self.intern_parse_table_name()?;
167        let (columns, constraints) = self.parse_columns()?;
168        if !columns.is_empty() {
169            validate_time_index(&columns, &constraints)?;
170        }
171
172        let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
173        let options = self.parse_create_table_options()?;
174        Ok(Statement::CreateExternalTable(CreateExternalTable {
175            name: table_name,
176            columns,
177            constraints,
178            options,
179            if_not_exists,
180            engine,
181        }))
182    }
183
184    fn parse_create_database(&mut self) -> Result<Statement> {
185        let _ = self.parser.next_token();
186        let if_not_exists = self.parse_if_not_exist()?;
187        let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
188            expected: "a database name",
189            actual: self.peek_token_as_string(),
190        })?;
191        let database_name = Self::canonicalize_object_name(database_name);
192
193        let options = self
194            .parser
195            .parse_options(Keyword::WITH)
196            .context(SyntaxSnafu)?
197            .into_iter()
198            .map(parse_option_string)
199            .collect::<Result<HashMap<String, String>>>()?;
200
201        for key in options.keys() {
202            ensure!(
203                validate_database_option(key),
204                InvalidDatabaseOptionSnafu {
205                    key: key.to_string()
206                }
207            );
208        }
209
210        Ok(Statement::CreateDatabase(CreateDatabase {
211            name: database_name,
212            if_not_exists,
213            options: options.into(),
214        }))
215    }
216
217    fn parse_create_table(&mut self) -> Result<Statement> {
218        let _ = self.parser.next_token();
219
220        let if_not_exists = self.parse_if_not_exist()?;
221
222        let table_name = self.intern_parse_table_name()?;
223
224        if self.parser.parse_keyword(Keyword::LIKE) {
225            let source_name = self.intern_parse_table_name()?;
226
227            return Ok(Statement::CreateTableLike(CreateTableLike {
228                table_name,
229                source_name,
230            }));
231        }
232
233        let (columns, constraints) = self.parse_columns()?;
234        validate_time_index(&columns, &constraints)?;
235
236        let partitions = self.parse_partitions()?;
237        if let Some(partitions) = &partitions {
238            validate_partitions(&columns, partitions)?;
239        }
240
241        let engine = self.parse_table_engine(default_engine())?;
242        let options = self.parse_create_table_options()?;
243        let create_table = CreateTable {
244            if_not_exists,
245            name: table_name,
246            columns,
247            engine,
248            constraints,
249            options,
250            table_id: 0, // table id is assigned by catalog manager
251            partitions,
252        };
253
254        Ok(Statement::CreateTable(create_table))
255    }
256
257    /// "CREATE FLOW" clause
258    fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
259        let if_not_exists = self.parse_if_not_exist()?;
260
261        let flow_name = self.intern_parse_table_name()?;
262
263        // make `SINK` case in-sensitive
264        if let Token::Word(word) = self.parser.peek_token().token
265            && word.value.eq_ignore_ascii_case(SINK)
266        {
267            self.parser.next_token();
268        } else {
269            Err(ParserError::ParserError(
270                "Expect `SINK` keyword".to_string(),
271            ))
272            .context(SyntaxSnafu)?
273        }
274        self.parser
275            .expect_keyword(Keyword::TO)
276            .context(SyntaxSnafu)?;
277
278        let output_table_name = self.intern_parse_table_name()?;
279
280        let expire_after = if self
281            .parser
282            .consume_tokens(&[Token::make_keyword(EXPIRE), Token::make_keyword(AFTER)])
283        {
284            let expire_after_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
285            let expire_after_lit = utils::parser_expr_to_scalar_value(expire_after_expr.clone())?
286                .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
287                .ok()
288                .with_context(|| InvalidIntervalSnafu {
289                    reason: format!("cannot cast {} to interval type", expire_after_expr),
290                })?;
291            if let ScalarValue::IntervalMonthDayNano(Some(interval)) = expire_after_lit {
292                Some(
293                    interval.nanoseconds / 1_000_000_000
294                        + interval.days as i64 * 60 * 60 * 24
295                        + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, // 1 month=365.25/12=30.44 days
296                                                                               // this is to keep the same as https://docs.rs/humantime/latest/humantime/fn.parse_duration.html
297                                                                               // which we use in database to parse i.e. ttl interval and many other intervals
298                )
299            } else {
300                unreachable!()
301            }
302        } else {
303            None
304        };
305
306        let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
307            match self.parser.next_token() {
308                TokenWithSpan {
309                    token: Token::SingleQuotedString(value, ..),
310                    ..
311                } => Some(value),
312                unexpected => {
313                    return self
314                        .parser
315                        .expected("string", unexpected)
316                        .context(SyntaxSnafu)
317                }
318            }
319        } else {
320            None
321        };
322
323        self.parser
324            .expect_keyword(Keyword::AS)
325            .context(SyntaxSnafu)?;
326
327        let query = self.parser.parse_query().context(error::SyntaxSnafu)?;
328
329        Ok(Statement::CreateFlow(CreateFlow {
330            flow_name,
331            sink_table_name: output_table_name,
332            or_replace,
333            if_not_exists,
334            expire_after,
335            comment,
336            query,
337        }))
338    }
339
340    fn parse_if_not_exist(&mut self) -> Result<bool> {
341        match self.parser.peek_token().token {
342            Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
343            _ => {}
344        }
345
346        if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
347            return self
348                .parser
349                .expect_keyword(Keyword::EXISTS)
350                .map(|_| true)
351                .context(UnexpectedSnafu {
352                    expected: "EXISTS",
353                    actual: self.peek_token_as_string(),
354                });
355        }
356
357        if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
358            return UnsupportedSnafu { keyword: "EXISTS" }.fail();
359        }
360
361        Ok(false)
362    }
363
364    fn parse_create_table_options(&mut self) -> Result<OptionMap> {
365        let options = self
366            .parser
367            .parse_options(Keyword::WITH)
368            .context(SyntaxSnafu)?
369            .into_iter()
370            .map(parse_option_string)
371            .collect::<Result<HashMap<String, String>>>()?;
372        for key in options.keys() {
373            ensure!(validate_table_option(key), InvalidTableOptionSnafu { key });
374        }
375        Ok(options.into())
376    }
377
378    /// "PARTITION ON COLUMNS (...)" clause
379    fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
380        if !self.parser.parse_keyword(Keyword::PARTITION) {
381            return Ok(None);
382        }
383        self.parser
384            .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
385            .context(error::UnexpectedSnafu {
386                expected: "ON, COLUMNS",
387                actual: self.peek_token_as_string(),
388            })?;
389
390        let raw_column_list = self
391            .parser
392            .parse_parenthesized_column_list(Mandatory, false)
393            .context(error::SyntaxSnafu)?;
394        let column_list = raw_column_list
395            .into_iter()
396            .map(Self::canonicalize_identifier)
397            .collect();
398
399        let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
400
401        Ok(Some(Partitions { column_list, exprs }))
402    }
403
404    fn parse_partition_entry(&mut self) -> Result<Expr> {
405        self.parser.parse_expr().context(error::SyntaxSnafu)
406    }
407
408    /// Parse a comma-separated list wrapped by "()", and of which all items accepted by `F`
409    fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
410    where
411        F: FnMut(&mut ParserContext<'a>) -> Result<T>,
412    {
413        self.parser
414            .expect_token(&Token::LParen)
415            .context(error::UnexpectedSnafu {
416                expected: "(",
417                actual: self.peek_token_as_string(),
418            })?;
419
420        let mut values = vec![];
421        while self.parser.peek_token() != Token::RParen {
422            values.push(f(self)?);
423            if !self.parser.consume_token(&Token::Comma) {
424                break;
425            }
426        }
427
428        self.parser
429            .expect_token(&Token::RParen)
430            .context(error::UnexpectedSnafu {
431                expected: ")",
432                actual: self.peek_token_as_string(),
433            })?;
434
435        Ok(values)
436    }
437
438    /// Parse the columns and constraints.
439    fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
440        let mut columns = vec![];
441        let mut constraints = vec![];
442        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
443            return Ok((columns, constraints));
444        }
445
446        loop {
447            if let Some(constraint) = self.parse_optional_table_constraint()? {
448                constraints.push(constraint);
449            } else if let Token::Word(_) = self.parser.peek_token().token {
450                self.parse_column(&mut columns, &mut constraints)?;
451            } else {
452                return self.expected(
453                    "column name or constraint definition",
454                    self.parser.peek_token(),
455                );
456            }
457            let comma = self.parser.consume_token(&Token::Comma);
458            if self.parser.consume_token(&Token::RParen) {
459                // allow a trailing comma, even though it's not in standard
460                break;
461            } else if !comma {
462                return self.expected(
463                    "',' or ')' after column definition",
464                    self.parser.peek_token(),
465                );
466            }
467        }
468
469        Ok((columns, constraints))
470    }
471
472    fn parse_column(
473        &mut self,
474        columns: &mut Vec<Column>,
475        constraints: &mut Vec<TableConstraint>,
476    ) -> Result<()> {
477        let mut column = self.parse_column_def()?;
478
479        let mut time_index_opt_idx = None;
480        for (index, opt) in column.options().iter().enumerate() {
481            if let ColumnOption::DialectSpecific(tokens) = &opt.option {
482                if matches!(
483                    &tokens[..],
484                    [
485                        Token::Word(Word {
486                            keyword: Keyword::TIME,
487                            ..
488                        }),
489                        Token::Word(Word {
490                            keyword: Keyword::INDEX,
491                            ..
492                        })
493                    ]
494                ) {
495                    ensure!(
496                        time_index_opt_idx.is_none(),
497                        InvalidColumnOptionSnafu {
498                            name: column.name().to_string(),
499                            msg: "duplicated time index",
500                        }
501                    );
502                    time_index_opt_idx = Some(index);
503
504                    let constraint = TableConstraint::TimeIndex {
505                        column: Ident::new(column.name().value.clone()),
506                    };
507                    constraints.push(constraint);
508                }
509            }
510        }
511
512        if let Some(index) = time_index_opt_idx {
513            ensure!(
514                !column.options().contains(&ColumnOptionDef {
515                    option: ColumnOption::Null,
516                    name: None,
517                }),
518                InvalidColumnOptionSnafu {
519                    name: column.name().to_string(),
520                    msg: "time index column can't be null",
521                }
522            );
523
524            // The timestamp type may be an alias type, we have to retrieve the actual type.
525            let data_type = get_unalias_type(column.data_type());
526            ensure!(
527                matches!(data_type, DataType::Timestamp(_, _)),
528                InvalidColumnOptionSnafu {
529                    name: column.name().to_string(),
530                    msg: "time index column data type should be timestamp",
531                }
532            );
533
534            let not_null_opt = ColumnOptionDef {
535                option: ColumnOption::NotNull,
536                name: None,
537            };
538
539            if !column.options().contains(&not_null_opt) {
540                column.mut_options().push(not_null_opt);
541            }
542
543            let _ = column.mut_options().remove(index);
544        }
545
546        columns.push(column);
547
548        Ok(())
549    }
550
551    /// Parse the column name and check if it's valid.
552    fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
553        let name = self.parser.parse_identifier()?;
554        if name.quote_style.is_none() &&
555        // "ALL_KEYWORDS" are sorted.
556            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
557        {
558            return Err(ParserError::ParserError(format!(
559                "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
560                &name.value
561            )));
562        }
563
564        Ok(name)
565    }
566
567    pub fn parse_column_def(&mut self) -> Result<Column> {
568        let name = self.parse_column_name().context(SyntaxSnafu)?;
569        let parser = &mut self.parser;
570
571        ensure!(
572            !(name.quote_style.is_none() &&
573            // "ALL_KEYWORDS" are sorted.
574            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
575            InvalidSqlSnafu {
576                msg: format!(
577                    "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
578                    &name.value
579                ),
580            }
581        );
582
583        let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
584        let collation = if parser.parse_keyword(Keyword::COLLATE) {
585            Some(parser.parse_object_name(false).context(SyntaxSnafu)?)
586        } else {
587            None
588        };
589        let mut options = vec![];
590        let mut extensions = ColumnExtensions::default();
591        loop {
592            if parser.parse_keyword(Keyword::CONSTRAINT) {
593                let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
594                if let Some(option) = Self::parse_optional_column_option(parser)? {
595                    options.push(ColumnOptionDef { name, option });
596                } else {
597                    return parser
598                        .expected(
599                            "constraint details after CONSTRAINT <name>",
600                            parser.peek_token(),
601                        )
602                        .context(SyntaxSnafu);
603                }
604            } else if let Some(option) = Self::parse_optional_column_option(parser)? {
605                options.push(ColumnOptionDef { name: None, option });
606            } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
607                break;
608            };
609        }
610
611        Ok(Column {
612            column_def: ColumnDef {
613                name: Self::canonicalize_identifier(name),
614                data_type,
615                collation,
616                options,
617            },
618            extensions,
619        })
620    }
621
622    fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
623        if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
624            Ok(Some(ColumnOption::CharacterSet(
625                parser.parse_object_name(false).context(SyntaxSnafu)?,
626            )))
627        } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
628            Ok(Some(ColumnOption::NotNull))
629        } else if parser.parse_keywords(&[Keyword::COMMENT]) {
630            match parser.next_token() {
631                TokenWithSpan {
632                    token: Token::SingleQuotedString(value, ..),
633                    ..
634                } => Ok(Some(ColumnOption::Comment(value))),
635                unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
636            }
637        } else if parser.parse_keyword(Keyword::NULL) {
638            Ok(Some(ColumnOption::Null))
639        } else if parser.parse_keyword(Keyword::DEFAULT) {
640            Ok(Some(ColumnOption::Default(
641                parser.parse_expr().context(SyntaxSnafu)?,
642            )))
643        } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
644            Ok(Some(ColumnOption::Unique {
645                is_primary: true,
646                characteristics: None,
647            }))
648        } else if parser.parse_keyword(Keyword::UNIQUE) {
649            Ok(Some(ColumnOption::Unique {
650                is_primary: false,
651                characteristics: None,
652            }))
653        } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
654            // Use a DialectSpecific option for time index
655            Ok(Some(ColumnOption::DialectSpecific(vec![
656                Token::Word(Word {
657                    value: "TIME".to_string(),
658                    quote_style: None,
659                    keyword: Keyword::TIME,
660                }),
661                Token::Word(Word {
662                    value: "INDEX".to_string(),
663                    quote_style: None,
664                    keyword: Keyword::INDEX,
665                }),
666            ])))
667        } else {
668            Ok(None)
669        }
670    }
671
672    /// Parse a column option extensions.
673    ///
674    /// This function will handle:
675    /// - Vector type
676    /// - Indexes
677    fn parse_column_extensions(
678        parser: &mut Parser<'_>,
679        column_name: &Ident,
680        column_type: &DataType,
681        column_extensions: &mut ColumnExtensions,
682    ) -> Result<bool> {
683        if let DataType::Custom(name, tokens) = column_type
684            && name.0.len() == 1
685            && &name.0[0].value.to_uppercase() == "VECTOR"
686        {
687            ensure!(
688                tokens.len() == 1,
689                InvalidColumnOptionSnafu {
690                    name: column_name.to_string(),
691                    msg: "VECTOR type should have dimension",
692                }
693            );
694
695            let dimension =
696                tokens[0]
697                    .parse::<u32>()
698                    .ok()
699                    .with_context(|| InvalidColumnOptionSnafu {
700                        name: column_name.to_string(),
701                        msg: "dimension should be a positive integer",
702                    })?;
703
704            let options = HashMap::from_iter([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
705            column_extensions.vector_options = Some(options.into());
706        }
707
708        // parse index options in column definition
709        let mut is_index_declared = false;
710
711        // skipping index
712        if let Token::Word(word) = parser.peek_token().token
713            && word.value.eq_ignore_ascii_case(SKIPPING)
714        {
715            parser.next_token();
716            // Consume `INDEX` keyword
717            ensure!(
718                parser.parse_keyword(Keyword::INDEX),
719                InvalidColumnOptionSnafu {
720                    name: column_name.to_string(),
721                    msg: "expect INDEX after SKIPPING keyword",
722                }
723            );
724            ensure!(
725                column_extensions.skipping_index_options.is_none(),
726                InvalidColumnOptionSnafu {
727                    name: column_name.to_string(),
728                    msg: "duplicated SKIPPING index option",
729                }
730            );
731
732            let options = parser
733                .parse_options(Keyword::WITH)
734                .context(error::SyntaxSnafu)?
735                .into_iter()
736                .map(parse_option_string)
737                .collect::<Result<HashMap<String, String>>>()?;
738
739            for key in options.keys() {
740                ensure!(
741                    validate_column_skipping_index_create_option(key),
742                    InvalidColumnOptionSnafu {
743                        name: column_name.to_string(),
744                        msg: format!("invalid SKIPPING INDEX option: {key}"),
745                    }
746                );
747            }
748
749            column_extensions.skipping_index_options = Some(options.into());
750            is_index_declared |= true;
751        }
752
753        // fulltext index
754        if parser.parse_keyword(Keyword::FULLTEXT) {
755            // Consume `INDEX` keyword
756            ensure!(
757                parser.parse_keyword(Keyword::INDEX),
758                InvalidColumnOptionSnafu {
759                    name: column_name.to_string(),
760                    msg: "expect INDEX after FULLTEXT keyword",
761                }
762            );
763
764            ensure!(
765                column_extensions.fulltext_index_options.is_none(),
766                InvalidColumnOptionSnafu {
767                    name: column_name.to_string(),
768                    msg: "duplicated FULLTEXT INDEX option",
769                }
770            );
771
772            let column_type = get_unalias_type(column_type);
773            let data_type = sql_data_type_to_concrete_data_type(&column_type)?;
774            ensure!(
775                data_type == ConcreteDataType::string_datatype(),
776                InvalidColumnOptionSnafu {
777                    name: column_name.to_string(),
778                    msg: "FULLTEXT index only supports string type",
779                }
780            );
781
782            let options = parser
783                .parse_options(Keyword::WITH)
784                .context(error::SyntaxSnafu)?
785                .into_iter()
786                .map(parse_option_string)
787                .collect::<Result<HashMap<String, String>>>()?;
788
789            for key in options.keys() {
790                ensure!(
791                    validate_column_fulltext_create_option(key),
792                    InvalidColumnOptionSnafu {
793                        name: column_name.to_string(),
794                        msg: format!("invalid FULLTEXT INDEX option: {key}"),
795                    }
796                );
797            }
798
799            column_extensions.fulltext_index_options = Some(options.into());
800            is_index_declared |= true;
801        }
802
803        // inverted index
804        if let Token::Word(word) = parser.peek_token().token
805            && word.value.eq_ignore_ascii_case(INVERTED)
806        {
807            parser.next_token();
808            // Consume `INDEX` keyword
809            ensure!(
810                parser.parse_keyword(Keyword::INDEX),
811                InvalidColumnOptionSnafu {
812                    name: column_name.to_string(),
813                    msg: "expect INDEX after INVERTED keyword",
814                }
815            );
816
817            ensure!(
818                column_extensions.inverted_index_options.is_none(),
819                InvalidColumnOptionSnafu {
820                    name: column_name.to_string(),
821                    msg: "duplicated INVERTED index option",
822                }
823            );
824
825            // inverted index doesn't have options, skipping `WITH`
826            // try cache `WITH` and throw error
827            let with_token = parser.peek_token();
828            ensure!(
829                with_token.token
830                    != Token::Word(Word {
831                        value: "WITH".to_string(),
832                        keyword: Keyword::WITH,
833                        quote_style: None,
834                    }),
835                InvalidColumnOptionSnafu {
836                    name: column_name.to_string(),
837                    msg: "INVERTED index doesn't support options",
838                }
839            );
840
841            column_extensions.inverted_index_options = Some(OptionMap::default());
842            is_index_declared |= true;
843        }
844
845        Ok(is_index_declared)
846    }
847
848    fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
849        match self.parser.next_token() {
850            TokenWithSpan {
851                token: Token::Word(w),
852                ..
853            } if w.keyword == Keyword::PRIMARY => {
854                self.parser
855                    .expect_keyword(Keyword::KEY)
856                    .context(error::UnexpectedSnafu {
857                        expected: "KEY",
858                        actual: self.peek_token_as_string(),
859                    })?;
860                let raw_columns = self
861                    .parser
862                    .parse_parenthesized_column_list(Mandatory, false)
863                    .context(error::SyntaxSnafu)?;
864                let columns = raw_columns
865                    .into_iter()
866                    .map(Self::canonicalize_identifier)
867                    .collect();
868                Ok(Some(TableConstraint::PrimaryKey { columns }))
869            }
870            TokenWithSpan {
871                token: Token::Word(w),
872                ..
873            } if w.keyword == Keyword::TIME => {
874                self.parser
875                    .expect_keyword(Keyword::INDEX)
876                    .context(error::UnexpectedSnafu {
877                        expected: "INDEX",
878                        actual: self.peek_token_as_string(),
879                    })?;
880
881                let raw_columns = self
882                    .parser
883                    .parse_parenthesized_column_list(Mandatory, false)
884                    .context(error::SyntaxSnafu)?;
885                let mut columns = raw_columns
886                    .into_iter()
887                    .map(Self::canonicalize_identifier)
888                    .collect::<Vec<_>>();
889
890                ensure!(
891                    columns.len() == 1,
892                    InvalidTimeIndexSnafu {
893                        msg: "it should contain only one column in time index",
894                    }
895                );
896
897                Ok(Some(TableConstraint::TimeIndex {
898                    column: columns.pop().unwrap(),
899                }))
900            }
901            _ => {
902                self.parser.prev_token();
903                Ok(None)
904            }
905        }
906    }
907
908    /// Parses the set of valid formats
909    fn parse_table_engine(&mut self, default: &str) -> Result<String> {
910        if !self.consume_token(ENGINE) {
911            return Ok(default.to_string());
912        }
913
914        self.parser
915            .expect_token(&Token::Eq)
916            .context(error::UnexpectedSnafu {
917                expected: "=",
918                actual: self.peek_token_as_string(),
919            })?;
920
921        let token = self.parser.next_token();
922        if let Token::Word(w) = token.token {
923            Ok(w.value)
924        } else {
925            self.expected("'Engine' is missing", token)
926        }
927    }
928}
929
930fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
931    let time_index_constraints: Vec<_> = constraints
932        .iter()
933        .filter_map(|c| match c {
934            TableConstraint::TimeIndex { column } => Some(column),
935            _ => None,
936        })
937        .unique()
938        .collect();
939
940    ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
941    ensure!(
942        time_index_constraints.len() == 1,
943        InvalidTimeIndexSnafu {
944            msg: format!(
945                "expected only one time index constraint but actual {}",
946                time_index_constraints.len()
947            ),
948        }
949    );
950
951    // It's safe to use time_index_constraints[0][0],
952    // we already check the bound above.
953    let time_index_column_ident = &time_index_constraints[0];
954    let time_index_column = columns
955        .iter()
956        .find(|c| c.name().value == *time_index_column_ident.value)
957        .with_context(|| InvalidTimeIndexSnafu {
958            msg: format!(
959                "time index column {} not found in columns",
960                time_index_column_ident
961            ),
962        })?;
963
964    let time_index_data_type = get_unalias_type(time_index_column.data_type());
965    ensure!(
966        matches!(time_index_data_type, DataType::Timestamp(_, _)),
967        InvalidColumnOptionSnafu {
968            name: time_index_column.name().to_string(),
969            msg: "time index column data type should be timestamp",
970        }
971    );
972
973    Ok(())
974}
975
976fn get_unalias_type(data_type: &DataType) -> DataType {
977    match data_type {
978        DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
979            if let Some(real_type) = get_data_type_by_alias_name(name.0[0].value.as_str()) {
980                real_type
981            } else {
982                data_type.clone()
983            }
984        }
985        _ => data_type.clone(),
986    }
987}
988
989fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
990    let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
991
992    ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
993
994    Ok(())
995}
996
997/// Ensure all exprs are binary expr and all the columns are defined in the column list.
998fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
999    for expr in exprs {
1000        // The first level must be binary expr
1001        if let Expr::BinaryOp { left, op: _, right } = expr {
1002            ensure_one_expr(left, columns)?;
1003            ensure_one_expr(right, columns)?;
1004        } else {
1005            return error::InvalidSqlSnafu {
1006                msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1007            }
1008            .fail();
1009        }
1010    }
1011    Ok(())
1012}
1013
1014/// Check if the expr is a binary expr, an ident or a literal value.
1015/// If is ident, then check it is in the column list.
1016/// This recursive function is intended to be used by [ensure_exprs_are_binary].
1017fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1018    match expr {
1019        Expr::BinaryOp { left, op: _, right } => {
1020            ensure_one_expr(left, columns)?;
1021            ensure_one_expr(right, columns)?;
1022            Ok(())
1023        }
1024        Expr::Identifier(ident) => {
1025            let column_name = &ident.value;
1026            ensure!(
1027                columns.iter().any(|c| &c.name().value == column_name),
1028                error::InvalidSqlSnafu {
1029                    msg: format!(
1030                        "Column {:?} in rule expr is not referenced in PARTITION ON",
1031                        column_name
1032                    ),
1033                }
1034            );
1035            Ok(())
1036        }
1037        Expr::Value(_) => Ok(()),
1038        Expr::UnaryOp { expr, .. } => {
1039            ensure_one_expr(expr, columns)?;
1040            Ok(())
1041        }
1042        _ => error::InvalidSqlSnafu {
1043            msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1044        }
1045        .fail(),
1046    }
1047}
1048
1049/// Ensure that all columns used in "PARTITION ON COLUMNS" are defined in create table.
1050fn ensure_partition_columns_defined<'a>(
1051    columns: &'a [Column],
1052    partitions: &'a Partitions,
1053) -> Result<Vec<&'a Column>> {
1054    partitions
1055        .column_list
1056        .iter()
1057        .map(|x| {
1058            let x = ParserContext::canonicalize_identifier(x.clone());
1059            // Normally the columns in "create table" won't be too many,
1060            // a linear search to find the target every time is fine.
1061            columns
1062                .iter()
1063                .find(|c| *c.name().value == x.value)
1064                .context(error::InvalidSqlSnafu {
1065                    msg: format!("Partition column {:?} not defined", x.value),
1066                })
1067        })
1068        .collect::<Result<Vec<&Column>>>()
1069}
1070
1071#[cfg(test)]
1072mod tests {
1073    use std::assert_matches::assert_matches;
1074    use std::collections::HashMap;
1075
1076    use common_catalog::consts::FILE_ENGINE;
1077    use common_error::ext::ErrorExt;
1078    use sqlparser::ast::ColumnOption::NotNull;
1079    use sqlparser::ast::{BinaryOperator, Expr, ObjectName, Value};
1080    use sqlparser::dialect::GenericDialect;
1081    use sqlparser::tokenizer::Tokenizer;
1082
1083    use super::*;
1084    use crate::dialect::GreptimeDbDialect;
1085    use crate::parser::ParseOptions;
1086
1087    #[test]
1088    fn test_parse_create_table_like() {
1089        let sql = "CREATE TABLE t1 LIKE t2";
1090        let stmts =
1091            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1092                .unwrap();
1093
1094        assert_eq!(1, stmts.len());
1095        match &stmts[0] {
1096            Statement::CreateTableLike(c) => {
1097                assert_eq!(c.table_name.to_string(), "t1");
1098                assert_eq!(c.source_name.to_string(), "t2");
1099            }
1100            _ => unreachable!(),
1101        }
1102    }
1103
1104    #[test]
1105    fn test_validate_external_table_options() {
1106        let sql = "CREATE EXTERNAL TABLE city (
1107            host string,
1108            ts timestamp,
1109            cpu float64 default 0,
1110            memory float64,
1111            TIME INDEX (ts),
1112            PRIMARY KEY(ts, host)
1113        ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1114
1115        let result =
1116            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1117        assert!(matches!(
1118            result,
1119            Err(error::Error::InvalidTableOption { .. })
1120        ));
1121    }
1122
1123    #[test]
1124    fn test_parse_create_external_table() {
1125        struct Test<'a> {
1126            sql: &'a str,
1127            expected_table_name: &'a str,
1128            expected_options: HashMap<String, String>,
1129            expected_engine: &'a str,
1130            expected_if_not_exist: bool,
1131        }
1132
1133        let tests = [
1134            Test {
1135                sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1136                expected_table_name: "city",
1137                expected_options: HashMap::from([
1138                    ("location".to_string(), "/var/data/city.csv".to_string()),
1139                    ("format".to_string(), "csv".to_string()),
1140                ]),
1141                expected_engine: FILE_ENGINE,
1142                expected_if_not_exist: false,
1143            },
1144            Test {
1145                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1146                expected_table_name: "city",
1147                expected_options: HashMap::from([
1148                    ("location".to_string(), "/var/data/city.csv".to_string()),
1149                    ("format".to_string(), "csv".to_string()),
1150                ]),
1151                expected_engine: "foo",
1152                expected_if_not_exist: true,
1153            },
1154            Test {
1155                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1156                expected_table_name: "city",
1157                expected_options: HashMap::from([
1158                    ("location".to_string(), "/var/data/city.csv".to_string()),
1159                    ("format".to_string(), "csv".to_string()),
1160                    ("compaction.type".to_string(), "bar".to_string()),
1161                ]),
1162                expected_engine: "foo",
1163                expected_if_not_exist: true,
1164            },
1165        ];
1166
1167        for test in tests {
1168            let stmts = ParserContext::create_with_dialect(
1169                test.sql,
1170                &GreptimeDbDialect {},
1171                ParseOptions::default(),
1172            )
1173            .unwrap();
1174            assert_eq!(1, stmts.len());
1175            match &stmts[0] {
1176                Statement::CreateExternalTable(c) => {
1177                    assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1178                    assert_eq!(c.options, test.expected_options.into());
1179                    assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1180                    assert_eq!(c.engine, test.expected_engine);
1181                }
1182                _ => unreachable!(),
1183            }
1184        }
1185    }
1186
1187    #[test]
1188    fn test_parse_create_external_table_with_schema() {
1189        let sql = "CREATE EXTERNAL TABLE city (
1190            host string,
1191            ts timestamp,
1192            cpu float32 default 0,
1193            memory float64,
1194            TIME INDEX (ts),
1195            PRIMARY KEY(ts, host),
1196        ) with(location='/var/data/city.csv',format='csv');";
1197
1198        let options = HashMap::from([
1199            ("location".to_string(), "/var/data/city.csv".to_string()),
1200            ("format".to_string(), "csv".to_string()),
1201        ]);
1202
1203        let stmts =
1204            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1205                .unwrap();
1206        assert_eq!(1, stmts.len());
1207        match &stmts[0] {
1208            Statement::CreateExternalTable(c) => {
1209                assert_eq!(c.name.to_string(), "city");
1210                assert_eq!(c.options, options.into());
1211
1212                let columns = &c.columns;
1213                assert_column_def(&columns[0].column_def, "host", "STRING");
1214                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1215                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1216                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1217
1218                let constraints = &c.constraints;
1219                assert_eq!(
1220                    &constraints[0],
1221                    &TableConstraint::TimeIndex {
1222                        column: Ident::new("ts"),
1223                    }
1224                );
1225                assert_eq!(
1226                    &constraints[1],
1227                    &TableConstraint::PrimaryKey {
1228                        columns: vec![Ident::new("ts"), Ident::new("host")]
1229                    }
1230                );
1231            }
1232            _ => unreachable!(),
1233        }
1234    }
1235
1236    #[test]
1237    fn test_parse_create_database() {
1238        let sql = "create database";
1239        let result =
1240            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1241        assert!(result
1242            .unwrap_err()
1243            .to_string()
1244            .contains("Unexpected token while parsing SQL statement"));
1245
1246        let sql = "create database prometheus";
1247        let stmts =
1248            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1249                .unwrap();
1250
1251        assert_eq!(1, stmts.len());
1252        match &stmts[0] {
1253            Statement::CreateDatabase(c) => {
1254                assert_eq!(c.name.to_string(), "prometheus");
1255                assert!(!c.if_not_exists);
1256            }
1257            _ => unreachable!(),
1258        }
1259
1260        let sql = "create database if not exists prometheus";
1261        let stmts =
1262            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1263                .unwrap();
1264
1265        assert_eq!(1, stmts.len());
1266        match &stmts[0] {
1267            Statement::CreateDatabase(c) => {
1268                assert_eq!(c.name.to_string(), "prometheus");
1269                assert!(c.if_not_exists);
1270            }
1271            _ => unreachable!(),
1272        }
1273
1274        let sql = "CREATE DATABASE `fOo`";
1275        let result =
1276            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1277        let stmts = result.unwrap();
1278        match &stmts.last().unwrap() {
1279            Statement::CreateDatabase(c) => {
1280                assert_eq!(c.name, ObjectName(vec![Ident::with_quote('`', "fOo")]));
1281                assert!(!c.if_not_exists);
1282            }
1283            _ => unreachable!(),
1284        }
1285
1286        let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1287        let result =
1288            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1289        let stmts = result.unwrap();
1290        match &stmts[0] {
1291            Statement::CreateDatabase(c) => {
1292                assert_eq!(c.name.to_string(), "prometheus");
1293                assert!(!c.if_not_exists);
1294                assert_eq!(c.options.get("ttl").unwrap(), "1h");
1295            }
1296            _ => unreachable!(),
1297        }
1298    }
1299
1300    #[test]
1301    fn test_parse_create_flow() {
1302        let sql = r"
1303CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1304SINK TO schema_1.table_1
1305EXPIRE AFTER INTERVAL '5 minutes'
1306COMMENT 'test comment'
1307AS
1308SELECT max(c1), min(c2) FROM schema_2.table_2;";
1309        let stmts =
1310            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1311                .unwrap();
1312        assert_eq!(1, stmts.len());
1313        let create_task = match &stmts[0] {
1314            Statement::CreateFlow(c) => c,
1315            _ => unreachable!(),
1316        };
1317
1318        let expected = CreateFlow {
1319            flow_name: ObjectName(vec![Ident::new("task_1")]),
1320            sink_table_name: ObjectName(vec![Ident::new("schema_1"), Ident::new("table_1")]),
1321            or_replace: true,
1322            if_not_exists: true,
1323            expire_after: Some(300),
1324            comment: Some("test comment".to_string()),
1325            // ignore query parse result
1326            query: create_task.query.clone(),
1327        };
1328        assert_eq!(create_task, &expected);
1329
1330        // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
1331        let sql = r"
1332CREATE FLOW `task_2`
1333SINK TO schema_1.table_1
1334EXPIRE AFTER '1 month 2 days 1h 2 min'
1335AS
1336SELECT max(c1), min(c2) FROM schema_2.table_2;";
1337        let stmts =
1338            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1339                .unwrap();
1340        assert_eq!(1, stmts.len());
1341        let create_task = match &stmts[0] {
1342            Statement::CreateFlow(c) => c,
1343            _ => unreachable!(),
1344        };
1345        assert!(!create_task.or_replace);
1346        assert!(!create_task.if_not_exists);
1347        assert_eq!(
1348            create_task.expire_after,
1349            Some(86400 * 3044 / 1000 + 2 * 86400 + 3600 + 2 * 60)
1350        );
1351        assert!(create_task.comment.is_none());
1352        assert_eq!(create_task.flow_name.to_string(), "`task_2`");
1353    }
1354
1355    #[test]
1356    fn test_validate_create() {
1357        let sql = r"
1358CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
1359PARTITION ON COLUMNS(c, a) (
1360    a < 10,
1361    a > 10 AND a < 20,
1362    a > 20 AND c < 100,
1363    a > 20 AND c >= 100
1364)
1365ENGINE=mito";
1366        let result =
1367            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1368        let _ = result.unwrap();
1369
1370        let sql = r"
1371CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1372PARTITION ON COLUMNS(x) ()
1373ENGINE=mito";
1374        let result =
1375            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1376        assert!(result
1377            .unwrap_err()
1378            .to_string()
1379            .contains("Partition column \"x\" not defined"));
1380    }
1381
1382    #[test]
1383    fn test_parse_create_table_with_partitions() {
1384        let sql = r"
1385CREATE TABLE monitor (
1386  host_id    INT,
1387  idc        STRING,
1388  ts         TIMESTAMP,
1389  cpu        DOUBLE DEFAULT 0,
1390  memory     DOUBLE,
1391  TIME INDEX (ts),
1392  PRIMARY KEY (host),
1393)
1394PARTITION ON COLUMNS(idc, host_id) (
1395  idc <= 'hz' AND host_id < 1000,
1396  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1397  idc > 'sh' AND host_id < 3000,
1398  idc > 'sh' AND host_id >= 3000,
1399)
1400ENGINE=mito";
1401        let result =
1402            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1403                .unwrap();
1404        assert_eq!(result.len(), 1);
1405        match &result[0] {
1406            Statement::CreateTable(c) => {
1407                assert!(c.partitions.is_some());
1408
1409                let partitions = c.partitions.as_ref().unwrap();
1410                let column_list = partitions
1411                    .column_list
1412                    .iter()
1413                    .map(|x| &x.value)
1414                    .collect::<Vec<&String>>();
1415                assert_eq!(column_list, vec!["idc", "host_id"]);
1416
1417                let exprs = &partitions.exprs;
1418
1419                assert_eq!(
1420                    exprs[0],
1421                    Expr::BinaryOp {
1422                        left: Box::new(Expr::BinaryOp {
1423                            left: Box::new(Expr::Identifier("idc".into())),
1424                            op: BinaryOperator::LtEq,
1425                            right: Box::new(Expr::Value(Value::SingleQuotedString(
1426                                "hz".to_string()
1427                            )))
1428                        }),
1429                        op: BinaryOperator::And,
1430                        right: Box::new(Expr::BinaryOp {
1431                            left: Box::new(Expr::Identifier("host_id".into())),
1432                            op: BinaryOperator::Lt,
1433                            right: Box::new(Expr::Value(Value::Number("1000".to_string(), false)))
1434                        })
1435                    }
1436                );
1437                assert_eq!(
1438                    exprs[1],
1439                    Expr::BinaryOp {
1440                        left: Box::new(Expr::BinaryOp {
1441                            left: Box::new(Expr::BinaryOp {
1442                                left: Box::new(Expr::Identifier("idc".into())),
1443                                op: BinaryOperator::Gt,
1444                                right: Box::new(Expr::Value(Value::SingleQuotedString(
1445                                    "hz".to_string()
1446                                )))
1447                            }),
1448                            op: BinaryOperator::And,
1449                            right: Box::new(Expr::BinaryOp {
1450                                left: Box::new(Expr::Identifier("idc".into())),
1451                                op: BinaryOperator::LtEq,
1452                                right: Box::new(Expr::Value(Value::SingleQuotedString(
1453                                    "sh".to_string()
1454                                )))
1455                            })
1456                        }),
1457                        op: BinaryOperator::And,
1458                        right: Box::new(Expr::BinaryOp {
1459                            left: Box::new(Expr::Identifier("host_id".into())),
1460                            op: BinaryOperator::Lt,
1461                            right: Box::new(Expr::Value(Value::Number("2000".to_string(), false)))
1462                        })
1463                    }
1464                );
1465                assert_eq!(
1466                    exprs[2],
1467                    Expr::BinaryOp {
1468                        left: Box::new(Expr::BinaryOp {
1469                            left: Box::new(Expr::Identifier("idc".into())),
1470                            op: BinaryOperator::Gt,
1471                            right: Box::new(Expr::Value(Value::SingleQuotedString(
1472                                "sh".to_string()
1473                            )))
1474                        }),
1475                        op: BinaryOperator::And,
1476                        right: Box::new(Expr::BinaryOp {
1477                            left: Box::new(Expr::Identifier("host_id".into())),
1478                            op: BinaryOperator::Lt,
1479                            right: Box::new(Expr::Value(Value::Number("3000".to_string(), false)))
1480                        })
1481                    }
1482                );
1483                assert_eq!(
1484                    exprs[3],
1485                    Expr::BinaryOp {
1486                        left: Box::new(Expr::BinaryOp {
1487                            left: Box::new(Expr::Identifier("idc".into())),
1488                            op: BinaryOperator::Gt,
1489                            right: Box::new(Expr::Value(Value::SingleQuotedString(
1490                                "sh".to_string()
1491                            )))
1492                        }),
1493                        op: BinaryOperator::And,
1494                        right: Box::new(Expr::BinaryOp {
1495                            left: Box::new(Expr::Identifier("host_id".into())),
1496                            op: BinaryOperator::GtEq,
1497                            right: Box::new(Expr::Value(Value::Number("3000".to_string(), false)))
1498                        })
1499                    }
1500                );
1501            }
1502            _ => unreachable!(),
1503        }
1504    }
1505
1506    #[test]
1507    fn test_parse_create_table_with_quoted_partitions() {
1508        let sql = r"
1509CREATE TABLE monitor (
1510  `host_id`    INT,
1511  idc        STRING,
1512  ts         TIMESTAMP,
1513  cpu        DOUBLE DEFAULT 0,
1514  memory     DOUBLE,
1515  TIME INDEX (ts),
1516  PRIMARY KEY (host),
1517)
1518PARTITION ON COLUMNS(IdC, host_id) (
1519  idc <= 'hz' AND host_id < 1000,
1520  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1521  idc > 'sh' AND host_id < 3000,
1522  idc > 'sh' AND host_id >= 3000,
1523)";
1524        let result =
1525            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1526                .unwrap();
1527        assert_eq!(result.len(), 1);
1528    }
1529
1530    #[test]
1531    fn test_parse_create_table_with_timestamp_index() {
1532        let sql1 = r"
1533CREATE TABLE monitor (
1534  host_id    INT,
1535  idc        STRING,
1536  ts         TIMESTAMP TIME INDEX,
1537  cpu        DOUBLE DEFAULT 0,
1538  memory     DOUBLE,
1539  PRIMARY KEY (host),
1540)
1541ENGINE=mito";
1542        let result1 = ParserContext::create_with_dialect(
1543            sql1,
1544            &GreptimeDbDialect {},
1545            ParseOptions::default(),
1546        )
1547        .unwrap();
1548
1549        if let Statement::CreateTable(c) = &result1[0] {
1550            assert_eq!(c.constraints.len(), 2);
1551            let tc = c.constraints[0].clone();
1552            match tc {
1553                TableConstraint::TimeIndex { column } => {
1554                    assert_eq!(&column.value, "ts");
1555                }
1556                _ => panic!("should be time index constraint"),
1557            };
1558        } else {
1559            panic!("should be create_table statement");
1560        }
1561
1562        // `TIME INDEX` should be in front of `PRIMARY KEY`
1563        // in order to equal the `TIMESTAMP TIME INDEX` constraint options vector
1564        let sql2 = r"
1565CREATE TABLE monitor (
1566  host_id    INT,
1567  idc        STRING,
1568  ts         TIMESTAMP NOT NULL,
1569  cpu        DOUBLE DEFAULT 0,
1570  memory     DOUBLE,
1571  TIME INDEX (ts),
1572  PRIMARY KEY (host),
1573)
1574ENGINE=mito";
1575        let result2 = ParserContext::create_with_dialect(
1576            sql2,
1577            &GreptimeDbDialect {},
1578            ParseOptions::default(),
1579        )
1580        .unwrap();
1581
1582        assert_eq!(result1, result2);
1583
1584        // TIMESTAMP can be NULL which is not equal to above
1585        let sql3 = r"
1586CREATE TABLE monitor (
1587  host_id    INT,
1588  idc        STRING,
1589  ts         TIMESTAMP,
1590  cpu        DOUBLE DEFAULT 0,
1591  memory     DOUBLE,
1592  TIME INDEX (ts),
1593  PRIMARY KEY (host),
1594)
1595ENGINE=mito";
1596
1597        let result3 = ParserContext::create_with_dialect(
1598            sql3,
1599            &GreptimeDbDialect {},
1600            ParseOptions::default(),
1601        )
1602        .unwrap();
1603
1604        assert_ne!(result1, result3);
1605
1606        // BIGINT can't be time index any more
1607        let sql1 = r"
1608CREATE TABLE monitor (
1609  host_id    INT,
1610  idc        STRING,
1611  b          bigint TIME INDEX,
1612  cpu        DOUBLE DEFAULT 0,
1613  memory     DOUBLE,
1614  PRIMARY KEY (host),
1615)
1616ENGINE=mito";
1617        let result1 = ParserContext::create_with_dialect(
1618            sql1,
1619            &GreptimeDbDialect {},
1620            ParseOptions::default(),
1621        );
1622
1623        assert!(result1
1624            .unwrap_err()
1625            .to_string()
1626            .contains("time index column data type should be timestamp"));
1627    }
1628
1629    #[test]
1630    fn test_parse_create_table_with_timestamp_index_not_null() {
1631        let sql = r"
1632CREATE TABLE monitor (
1633  host_id    INT,
1634  idc        STRING,
1635  ts         TIMESTAMP TIME INDEX,
1636  cpu        DOUBLE DEFAULT 0,
1637  memory     DOUBLE,
1638  TIME INDEX (ts),
1639  PRIMARY KEY (host),
1640)
1641ENGINE=mito";
1642        let result =
1643            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1644                .unwrap();
1645
1646        assert_eq!(result.len(), 1);
1647        if let Statement::CreateTable(c) = &result[0] {
1648            let ts = c.columns[2].clone();
1649            assert_eq!(ts.name().to_string(), "ts");
1650            assert_eq!(ts.options()[0].option, NotNull);
1651        } else {
1652            panic!("should be create table statement");
1653        }
1654
1655        let sql1 = r"
1656CREATE TABLE monitor (
1657  host_id    INT,
1658  idc        STRING,
1659  ts         TIMESTAMP NOT NULL TIME INDEX,
1660  cpu        DOUBLE DEFAULT 0,
1661  memory     DOUBLE,
1662  TIME INDEX (ts),
1663  PRIMARY KEY (host),
1664)
1665ENGINE=mito";
1666
1667        let result1 = ParserContext::create_with_dialect(
1668            sql1,
1669            &GreptimeDbDialect {},
1670            ParseOptions::default(),
1671        )
1672        .unwrap();
1673        assert_eq!(result, result1);
1674
1675        let sql2 = r"
1676CREATE TABLE monitor (
1677  host_id    INT,
1678  idc        STRING,
1679  ts         TIMESTAMP TIME INDEX NOT NULL,
1680  cpu        DOUBLE DEFAULT 0,
1681  memory     DOUBLE,
1682  TIME INDEX (ts),
1683  PRIMARY KEY (host),
1684)
1685ENGINE=mito";
1686
1687        let result2 = ParserContext::create_with_dialect(
1688            sql2,
1689            &GreptimeDbDialect {},
1690            ParseOptions::default(),
1691        )
1692        .unwrap();
1693        assert_eq!(result, result2);
1694
1695        let sql3 = r"
1696CREATE TABLE monitor (
1697  host_id    INT,
1698  idc        STRING,
1699  ts         TIMESTAMP TIME INDEX NULL NOT,
1700  cpu        DOUBLE DEFAULT 0,
1701  memory     DOUBLE,
1702  TIME INDEX (ts),
1703  PRIMARY KEY (host),
1704)
1705ENGINE=mito";
1706
1707        let result3 = ParserContext::create_with_dialect(
1708            sql3,
1709            &GreptimeDbDialect {},
1710            ParseOptions::default(),
1711        );
1712        assert!(result3.is_err());
1713
1714        let sql4 = r"
1715CREATE TABLE monitor (
1716  host_id    INT,
1717  idc        STRING,
1718  ts         TIMESTAMP TIME INDEX NOT NULL NULL,
1719  cpu        DOUBLE DEFAULT 0,
1720  memory     DOUBLE,
1721  TIME INDEX (ts),
1722  PRIMARY KEY (host),
1723)
1724ENGINE=mito";
1725
1726        let result4 = ParserContext::create_with_dialect(
1727            sql4,
1728            &GreptimeDbDialect {},
1729            ParseOptions::default(),
1730        );
1731        assert!(result4.is_err());
1732
1733        let sql = r"
1734CREATE TABLE monitor (
1735  host_id    INT,
1736  idc        STRING,
1737  ts         TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
1738  cpu        DOUBLE DEFAULT 0,
1739  memory     DOUBLE,
1740  TIME INDEX (ts),
1741  PRIMARY KEY (host),
1742)
1743ENGINE=mito";
1744
1745        let result =
1746            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1747                .unwrap();
1748
1749        if let Statement::CreateTable(c) = &result[0] {
1750            let tc = c.constraints[0].clone();
1751            match tc {
1752                TableConstraint::TimeIndex { column } => {
1753                    assert_eq!(&column.value, "ts");
1754                }
1755                _ => panic!("should be time index constraint"),
1756            }
1757            let ts = c.columns[2].clone();
1758            assert_eq!(ts.name().to_string(), "ts");
1759            assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
1760            assert_eq!(ts.options()[1].option, NotNull);
1761        } else {
1762            unreachable!("should be create table statement");
1763        }
1764    }
1765
1766    #[test]
1767    fn test_parse_partitions_with_error_syntax() {
1768        let sql = r"
1769CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1770PARTITION COLUMNS(c, a) (
1771    a < 10,
1772    a > 10 AND a < 20,
1773    a > 20 AND c < 100,
1774    a > 20 AND c >= 100
1775)
1776ENGINE=mito";
1777        let result =
1778            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1779        assert!(result
1780            .unwrap_err()
1781            .output_msg()
1782            .contains("sql parser error: Expected: ON, found: COLUMNS"));
1783    }
1784
1785    #[test]
1786    fn test_parse_partitions_without_rule() {
1787        let sql = r"
1788CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
1789PARTITION ON COLUMNS(c, a) ()
1790ENGINE=mito";
1791        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1792            .unwrap();
1793    }
1794
1795    #[test]
1796    fn test_parse_partitions_unreferenced_column() {
1797        let sql = r"
1798CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1799PARTITION ON COLUMNS(c, a) (
1800    b = 'foo'
1801)
1802ENGINE=mito";
1803        let result =
1804            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1805        assert_eq!(
1806            result.unwrap_err().output_msg(),
1807            "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
1808        );
1809    }
1810
1811    #[test]
1812    fn test_parse_partitions_not_binary_expr() {
1813        let sql = r"
1814CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1815PARTITION ON COLUMNS(c, a) (
1816    b
1817)
1818ENGINE=mito";
1819        let result =
1820            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1821        assert_eq!(
1822            result.unwrap_err().output_msg(),
1823            r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
1824        );
1825    }
1826
1827    fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
1828        assert_eq!(column.name.to_string(), name);
1829        assert_eq!(column.data_type.to_string(), data_type);
1830    }
1831
1832    #[test]
1833    pub fn test_parse_create_table() {
1834        let sql = r"create table demo(
1835                             host string,
1836                             ts timestamp,
1837                             cpu float32 default 0,
1838                             memory float64,
1839                             TIME INDEX (ts),
1840                             PRIMARY KEY(ts, host),
1841                             ) engine=mito
1842                             with(ttl='10s');
1843         ";
1844        let result =
1845            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1846                .unwrap();
1847        assert_eq!(1, result.len());
1848        match &result[0] {
1849            Statement::CreateTable(c) => {
1850                assert!(!c.if_not_exists);
1851                assert_eq!("demo", c.name.to_string());
1852                assert_eq!("mito", c.engine);
1853                assert_eq!(4, c.columns.len());
1854                let columns = &c.columns;
1855                assert_column_def(&columns[0].column_def, "host", "STRING");
1856                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1857                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1858                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1859
1860                let constraints = &c.constraints;
1861                assert_eq!(
1862                    &constraints[0],
1863                    &TableConstraint::TimeIndex {
1864                        column: Ident::new("ts"),
1865                    }
1866                );
1867                assert_eq!(
1868                    &constraints[1],
1869                    &TableConstraint::PrimaryKey {
1870                        columns: vec![Ident::new("ts"), Ident::new("host")]
1871                    }
1872                );
1873                // inverted index is merged into column options
1874                assert_eq!(1, c.options.len());
1875                assert_eq!(
1876                    [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
1877                    c.options.to_str_map()
1878                );
1879            }
1880            _ => unreachable!(),
1881        }
1882    }
1883
1884    #[test]
1885    fn test_invalid_index_keys() {
1886        let sql = r"create table demo(
1887                             host string,
1888                             ts int64,
1889                             cpu float64 default 0,
1890                             memory float64,
1891                             TIME INDEX (ts, host),
1892                             PRIMARY KEY(ts, host)) engine=mito;
1893         ";
1894        let result =
1895            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1896        assert!(result.is_err());
1897        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
1898    }
1899
1900    #[test]
1901    fn test_duplicated_time_index() {
1902        let sql = r"create table demo(
1903                             host string,
1904                             ts timestamp time index,
1905                             t timestamp time index,
1906                             cpu float64 default 0,
1907                             memory float64,
1908                             TIME INDEX (ts, host),
1909                             PRIMARY KEY(ts, host)) engine=mito;
1910         ";
1911        let result =
1912            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1913        assert!(result.is_err());
1914        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
1915
1916        let sql = r"create table demo(
1917                             host string,
1918                             ts timestamp time index,
1919                             cpu float64 default 0,
1920                             t timestamp,
1921                             memory float64,
1922                             TIME INDEX (t),
1923                             PRIMARY KEY(ts, host)) engine=mito;
1924         ";
1925        let result =
1926            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1927        assert!(result.is_err());
1928        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
1929    }
1930
1931    #[test]
1932    fn test_invalid_column_name() {
1933        let sql = "create table foo(user string, i timestamp time index)";
1934        let result =
1935            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1936        let err = result.unwrap_err().output_msg();
1937        assert!(err.contains("Cannot use keyword 'user' as column name"));
1938
1939        // If column name is quoted, it's valid even same with keyword.
1940        let sql = r#"
1941            create table foo("user" string, i timestamp time index)
1942        "#;
1943        let result =
1944            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1945        let _ = result.unwrap();
1946    }
1947
1948    #[test]
1949    fn test_incorrect_default_value_issue_3479() {
1950        let sql = r#"CREATE TABLE `ExcePTuRi`(
1951non TIMESTAMP(6) TIME INDEX,
1952`iUSTO` DOUBLE DEFAULT 0.047318541668048164
1953)"#;
1954        let result =
1955            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1956                .unwrap();
1957        assert_eq!(1, result.len());
1958        match &result[0] {
1959            Statement::CreateTable(c) => {
1960                assert_eq!(
1961                    "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
1962                    c.columns[1].to_string()
1963                );
1964            }
1965            _ => unreachable!(),
1966        }
1967    }
1968
1969    #[test]
1970    fn test_parse_create_view() {
1971        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1972
1973        let result =
1974            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1975                .unwrap();
1976        match &result[0] {
1977            Statement::CreateView(c) => {
1978                assert_eq!(c.to_string(), sql);
1979                assert!(!c.or_replace);
1980                assert!(!c.if_not_exists);
1981                assert_eq!("test", c.name.to_string());
1982            }
1983            _ => unreachable!(),
1984        }
1985
1986        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
1987
1988        let result =
1989            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1990                .unwrap();
1991        match &result[0] {
1992            Statement::CreateView(c) => {
1993                assert_eq!(c.to_string(), sql);
1994                assert!(c.or_replace);
1995                assert!(c.if_not_exists);
1996                assert_eq!("test", c.name.to_string());
1997            }
1998            _ => unreachable!(),
1999        }
2000    }
2001
2002    #[test]
2003    fn test_parse_create_view_invalid_query() {
2004        let sql = "CREATE VIEW test AS DELETE from demo";
2005        let result =
2006            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2007        assert!(result.is_err());
2008        assert_matches!(result, Err(crate::error::Error::Syntax { .. }));
2009    }
2010
2011    #[test]
2012    fn test_parse_create_table_fulltext_options() {
2013        let sql1 = r"
2014CREATE TABLE log (
2015    ts TIMESTAMP TIME INDEX,
2016    msg TEXT FULLTEXT INDEX,
2017)";
2018        let result1 = ParserContext::create_with_dialect(
2019            sql1,
2020            &GreptimeDbDialect {},
2021            ParseOptions::default(),
2022        )
2023        .unwrap();
2024
2025        if let Statement::CreateTable(c) = &result1[0] {
2026            c.columns.iter().for_each(|col| {
2027                if col.name().value == "msg" {
2028                    assert!(col
2029                        .extensions
2030                        .fulltext_index_options
2031                        .as_ref()
2032                        .unwrap()
2033                        .is_empty());
2034                }
2035            });
2036        } else {
2037            panic!("should be create_table statement");
2038        }
2039
2040        let sql2 = r"
2041CREATE TABLE log (
2042    ts TIMESTAMP TIME INDEX,
2043    msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2044)";
2045        let result2 = ParserContext::create_with_dialect(
2046            sql2,
2047            &GreptimeDbDialect {},
2048            ParseOptions::default(),
2049        )
2050        .unwrap();
2051
2052        if let Statement::CreateTable(c) = &result2[0] {
2053            c.columns.iter().for_each(|col| {
2054                if col.name().value == "msg" {
2055                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2056                    assert_eq!(options.len(), 2);
2057                    assert_eq!(options.get("analyzer").unwrap(), "English");
2058                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2059                }
2060            });
2061        } else {
2062            panic!("should be create_table statement");
2063        }
2064
2065        let sql3 = r"
2066CREATE TABLE log (
2067    ts TIMESTAMP TIME INDEX,
2068    msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2069    msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2070)";
2071        let result3 = ParserContext::create_with_dialect(
2072            sql3,
2073            &GreptimeDbDialect {},
2074            ParseOptions::default(),
2075        )
2076        .unwrap();
2077
2078        if let Statement::CreateTable(c) = &result3[0] {
2079            c.columns.iter().for_each(|col| {
2080                if col.name().value == "msg1" {
2081                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2082                    assert_eq!(options.len(), 2);
2083                    assert_eq!(options.get("analyzer").unwrap(), "English");
2084                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2085                } else if col.name().value == "msg2" {
2086                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2087                    assert_eq!(options.len(), 2);
2088                    assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2089                    assert_eq!(options.get("case_sensitive").unwrap(), "true");
2090                }
2091            });
2092        } else {
2093            panic!("should be create_table statement");
2094        }
2095    }
2096
2097    #[test]
2098    fn test_parse_create_table_fulltext_options_invalid_type() {
2099        let sql = r"
2100CREATE TABLE log (
2101    ts TIMESTAMP TIME INDEX,
2102    msg INT FULLTEXT INDEX,
2103)";
2104        let result =
2105            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2106        assert!(result.is_err());
2107        assert!(result
2108            .unwrap_err()
2109            .to_string()
2110            .contains("FULLTEXT index only supports string type"));
2111    }
2112
2113    #[test]
2114    fn test_parse_create_table_fulltext_options_duplicate() {
2115        let sql = r"
2116CREATE TABLE log (
2117    ts TIMESTAMP TIME INDEX,
2118    msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
2119)";
2120        let result =
2121            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2122        assert!(result.is_err());
2123        assert!(result
2124            .unwrap_err()
2125            .to_string()
2126            .contains("duplicated FULLTEXT INDEX option"));
2127    }
2128
2129    #[test]
2130    fn test_parse_create_table_fulltext_options_invalid_option() {
2131        let sql = r"
2132CREATE TABLE log (
2133    ts TIMESTAMP TIME INDEX,
2134    msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
2135)";
2136        let result =
2137            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2138        assert!(result.is_err());
2139        assert!(result
2140            .unwrap_err()
2141            .to_string()
2142            .contains("invalid FULLTEXT INDEX option"));
2143    }
2144
2145    #[test]
2146    fn test_parse_create_table_skip_options() {
2147        let sql = r"
2148CREATE TABLE log (
2149    ts TIMESTAMP TIME INDEX,
2150    msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
2151)";
2152        let result =
2153            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2154                .unwrap();
2155
2156        if let Statement::CreateTable(c) = &result[0] {
2157            c.columns.iter().for_each(|col| {
2158                if col.name().value == "msg" {
2159                    assert!(!col
2160                        .extensions
2161                        .skipping_index_options
2162                        .as_ref()
2163                        .unwrap()
2164                        .is_empty());
2165                }
2166            });
2167        } else {
2168            panic!("should be create_table statement");
2169        }
2170
2171        let sql = r"
2172        CREATE TABLE log (
2173            ts TIMESTAMP TIME INDEX,
2174            msg INT SKIPPING INDEX,
2175        )";
2176        let result =
2177            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2178                .unwrap();
2179
2180        if let Statement::CreateTable(c) = &result[0] {
2181            c.columns.iter().for_each(|col| {
2182                if col.name().value == "msg" {
2183                    assert!(col
2184                        .extensions
2185                        .skipping_index_options
2186                        .as_ref()
2187                        .unwrap()
2188                        .is_empty());
2189                }
2190            });
2191        } else {
2192            panic!("should be create_table statement");
2193        }
2194    }
2195
2196    #[test]
2197    fn test_parse_create_view_with_columns() {
2198        let sql = "CREATE VIEW test () AS SELECT * FROM NUMBERS";
2199        let result =
2200            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2201                .unwrap();
2202
2203        match &result[0] {
2204            Statement::CreateView(c) => {
2205                assert_eq!(c.to_string(), "CREATE VIEW test AS SELECT * FROM NUMBERS");
2206                assert!(!c.or_replace);
2207                assert!(!c.if_not_exists);
2208                assert_eq!("test", c.name.to_string());
2209            }
2210            _ => unreachable!(),
2211        }
2212        assert_eq!(
2213            "CREATE VIEW test AS SELECT * FROM NUMBERS",
2214            result[0].to_string()
2215        );
2216
2217        let sql = "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS";
2218        let result =
2219            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2220                .unwrap();
2221
2222        match &result[0] {
2223            Statement::CreateView(c) => {
2224                assert_eq!(c.to_string(), sql);
2225                assert!(!c.or_replace);
2226                assert!(!c.if_not_exists);
2227                assert_eq!("test", c.name.to_string());
2228            }
2229            _ => unreachable!(),
2230        }
2231        assert_eq!(sql, result[0].to_string());
2232
2233        let sql = "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS";
2234        let result =
2235            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2236                .unwrap();
2237
2238        match &result[0] {
2239            Statement::CreateView(c) => {
2240                assert_eq!(c.to_string(), sql);
2241                assert!(!c.or_replace);
2242                assert!(!c.if_not_exists);
2243                assert_eq!("test", c.name.to_string());
2244            }
2245            _ => unreachable!(),
2246        }
2247        assert_eq!(sql, result[0].to_string());
2248
2249        // Some invalid syntax cases
2250        let sql = "CREATE VIEW test (n1 AS select * from demo";
2251        let result =
2252            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2253        assert!(result.is_err());
2254
2255        let sql = "CREATE VIEW test (n1, AS select * from demo";
2256        let result =
2257            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2258        assert!(result.is_err());
2259
2260        let sql = "CREATE VIEW test n1,n2) AS select * from demo";
2261        let result =
2262            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2263        assert!(result.is_err());
2264
2265        let sql = "CREATE VIEW test (1) AS select * from demo";
2266        let result =
2267            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2268        assert!(result.is_err());
2269
2270        // keyword
2271        let sql = "CREATE VIEW test (n1, select) AS select * from demo";
2272        let result =
2273            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2274        assert!(result.is_err());
2275    }
2276
2277    #[test]
2278    fn test_parse_column_extensions_vector() {
2279        let sql = "VECTOR(128)";
2280        let dialect = GenericDialect {};
2281        let mut tokenizer = Tokenizer::new(&dialect, sql);
2282        let tokens = tokenizer.tokenize().unwrap();
2283        let mut parser = Parser::new(&dialect).with_tokens(tokens);
2284        let name = Ident::new("vec_col");
2285        let data_type = DataType::Custom(
2286            ObjectName(vec![Ident::new("VECTOR")]),
2287            vec!["128".to_string()],
2288        );
2289        let mut extensions = ColumnExtensions::default();
2290
2291        let result =
2292            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2293        assert!(result.is_ok());
2294        assert!(extensions.vector_options.is_some());
2295        let vector_options = extensions.vector_options.unwrap();
2296        assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some(&"128".to_string()));
2297    }
2298
2299    #[test]
2300    fn test_parse_column_extensions_vector_invalid() {
2301        let sql = "VECTOR()";
2302        let dialect = GenericDialect {};
2303        let mut tokenizer = Tokenizer::new(&dialect, sql);
2304        let tokens = tokenizer.tokenize().unwrap();
2305        let mut parser = Parser::new(&dialect).with_tokens(tokens);
2306        let name = Ident::new("vec_col");
2307        let data_type = DataType::Custom(ObjectName(vec![Ident::new("VECTOR")]), vec![]);
2308        let mut extensions = ColumnExtensions::default();
2309
2310        let result =
2311            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2312        assert!(result.is_err());
2313    }
2314
2315    #[test]
2316    fn test_parse_column_extensions_indices() {
2317        // Test skipping index
2318        {
2319            let sql = "SKIPPING INDEX";
2320            let dialect = GenericDialect {};
2321            let mut tokenizer = Tokenizer::new(&dialect, sql);
2322            let tokens = tokenizer.tokenize().unwrap();
2323            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2324            let name = Ident::new("col");
2325            let data_type = DataType::String(None);
2326            let mut extensions = ColumnExtensions::default();
2327            let result = ParserContext::parse_column_extensions(
2328                &mut parser,
2329                &name,
2330                &data_type,
2331                &mut extensions,
2332            );
2333            assert!(result.is_ok());
2334            assert!(extensions.skipping_index_options.is_some());
2335        }
2336
2337        // Test fulltext index with options
2338        {
2339            let sql = "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')";
2340            let dialect = GenericDialect {};
2341            let mut tokenizer = Tokenizer::new(&dialect, sql);
2342            let tokens = tokenizer.tokenize().unwrap();
2343            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2344            let name = Ident::new("text_col");
2345            let data_type = DataType::String(None);
2346            let mut extensions = ColumnExtensions::default();
2347            let result = ParserContext::parse_column_extensions(
2348                &mut parser,
2349                &name,
2350                &data_type,
2351                &mut extensions,
2352            );
2353            assert!(result.unwrap());
2354            assert!(extensions.fulltext_index_options.is_some());
2355            let fulltext_options = extensions.fulltext_index_options.unwrap();
2356            assert_eq!(
2357                fulltext_options.get("analyzer"),
2358                Some(&"English".to_string())
2359            );
2360            assert_eq!(
2361                fulltext_options.get("case_sensitive"),
2362                Some(&"true".to_string())
2363            );
2364        }
2365
2366        // Test fulltext index with invalid type (should fail)
2367        {
2368            let sql = "FULLTEXT INDEX WITH (analyzer = 'English')";
2369            let dialect = GenericDialect {};
2370            let mut tokenizer = Tokenizer::new(&dialect, sql);
2371            let tokens = tokenizer.tokenize().unwrap();
2372            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2373            let name = Ident::new("num_col");
2374            let data_type = DataType::Int(None); // Non-string type
2375            let mut extensions = ColumnExtensions::default();
2376            let result = ParserContext::parse_column_extensions(
2377                &mut parser,
2378                &name,
2379                &data_type,
2380                &mut extensions,
2381            );
2382            assert!(result.is_err());
2383            assert!(result
2384                .unwrap_err()
2385                .to_string()
2386                .contains("FULLTEXT index only supports string type"));
2387        }
2388
2389        // Test fulltext index with invalid option (won't fail, the parser doesn't check the option's content)
2390        {
2391            let sql = "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')";
2392            let dialect = GenericDialect {};
2393            let mut tokenizer = Tokenizer::new(&dialect, sql);
2394            let tokens = tokenizer.tokenize().unwrap();
2395            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2396            let name = Ident::new("text_col");
2397            let data_type = DataType::String(None);
2398            let mut extensions = ColumnExtensions::default();
2399            let result = ParserContext::parse_column_extensions(
2400                &mut parser,
2401                &name,
2402                &data_type,
2403                &mut extensions,
2404            );
2405            assert!(result.unwrap());
2406        }
2407
2408        // Test inverted index
2409        {
2410            let sql = "INVERTED INDEX";
2411            let dialect = GenericDialect {};
2412            let mut tokenizer = Tokenizer::new(&dialect, sql);
2413            let tokens = tokenizer.tokenize().unwrap();
2414            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2415            let name = Ident::new("col");
2416            let data_type = DataType::String(None);
2417            let mut extensions = ColumnExtensions::default();
2418            let result = ParserContext::parse_column_extensions(
2419                &mut parser,
2420                &name,
2421                &data_type,
2422                &mut extensions,
2423            );
2424            assert!(result.is_ok());
2425            assert!(extensions.inverted_index_options.is_some());
2426        }
2427
2428        // Test inverted index with options (should fail)
2429        {
2430            let sql = "INVERTED INDEX WITH (analyzer = 'English')";
2431            let dialect = GenericDialect {};
2432            let mut tokenizer = Tokenizer::new(&dialect, sql);
2433            let tokens = tokenizer.tokenize().unwrap();
2434            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2435            let name = Ident::new("col");
2436            let data_type = DataType::String(None);
2437            let mut extensions = ColumnExtensions::default();
2438            let result = ParserContext::parse_column_extensions(
2439                &mut parser,
2440                &name,
2441                &data_type,
2442                &mut extensions,
2443            );
2444            assert!(result.is_err());
2445            assert!(result
2446                .unwrap_err()
2447                .to_string()
2448                .contains("INVERTED index doesn't support options"));
2449        }
2450
2451        // Test multiple indices
2452        {
2453            let sql = "SKIPPING INDEX FULLTEXT INDEX";
2454            let dialect = GenericDialect {};
2455            let mut tokenizer = Tokenizer::new(&dialect, sql);
2456            let tokens = tokenizer.tokenize().unwrap();
2457            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2458            let name = Ident::new("col");
2459            let data_type = DataType::String(None);
2460            let mut extensions = ColumnExtensions::default();
2461            let result = ParserContext::parse_column_extensions(
2462                &mut parser,
2463                &name,
2464                &data_type,
2465                &mut extensions,
2466            );
2467            assert!(result.unwrap());
2468            assert!(extensions.skipping_index_options.is_some());
2469            assert!(extensions.fulltext_index_options.is_some());
2470        }
2471    }
2472
2473    #[test]
2474    fn test_parse_interval_cast() {
2475        let s = "select '10s'::INTERVAL";
2476        let stmts =
2477            ParserContext::create_with_dialect(s, &GreptimeDbDialect {}, ParseOptions::default())
2478                .unwrap();
2479        assert_eq!("SELECT '10 seconds'::INTERVAL", &stmts[0].to_string());
2480    }
2481}