sql/parsers/
create_parser.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod json;
16#[cfg(feature = "enterprise")]
17pub mod trigger;
18
19use std::collections::HashMap;
20
21use arrow_buffer::IntervalMonthDayNano;
22use common_catalog::consts::default_engine;
23use datafusion_common::ScalarValue;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
25use datatypes::data_type::ConcreteDataType;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Expr};
29use sqlparser::dialect::keywords::Keyword;
30use sqlparser::keywords::ALL_KEYWORDS;
31use sqlparser::parser::IsOptional::Mandatory;
32use sqlparser::parser::{Parser, ParserError};
33use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
34use table::requests::{validate_database_option, validate_table_option};
35
36use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
37use crate::error::{
38    self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu,
39    InvalidSqlSnafu, InvalidTableOptionSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
40    SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
41};
42use crate::parser::{FLOW, ParserContext};
43use crate::parsers::tql_parser;
44use crate::parsers::utils::{
45    self, validate_column_fulltext_create_option, validate_column_skipping_index_create_option,
46    validate_column_vector_index_create_option,
47};
48use crate::statements::create::{
49    Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
50    CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
51};
52use crate::statements::statement::Statement;
53use crate::statements::transform::type_alias::get_data_type_by_alias_name;
54use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
55use crate::util::{OptionValue, location_to_index, parse_option_string};
56
57pub const ENGINE: &str = "ENGINE";
58pub const MAXVALUE: &str = "MAXVALUE";
59pub const SINK: &str = "SINK";
60pub const EXPIRE: &str = "EXPIRE";
61pub const AFTER: &str = "AFTER";
62pub const INVERTED: &str = "INVERTED";
63pub const SKIPPING: &str = "SKIPPING";
64pub const VECTOR: &str = "VECTOR";
65
66pub type RawIntervalExpr = String;
67
68/// Parses create [table] statement
69impl<'a> ParserContext<'a> {
70    pub(crate) fn parse_create(&mut self) -> Result<Statement> {
71        match self.parser.peek_token().token {
72            Token::Word(w) => match w.keyword {
73                Keyword::TABLE => self.parse_create_table(),
74
75                Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
76
77                Keyword::EXTERNAL => self.parse_create_external_table(),
78
79                Keyword::OR => {
80                    let _ = self.parser.next_token();
81                    self.parser
82                        .expect_keyword(Keyword::REPLACE)
83                        .context(SyntaxSnafu)?;
84                    match self.parser.next_token().token {
85                        Token::Word(w) => match w.keyword {
86                            Keyword::VIEW => self.parse_create_view(true),
87                            Keyword::NoKeyword => {
88                                let uppercase = w.value.to_uppercase();
89                                match uppercase.as_str() {
90                                    FLOW => self.parse_create_flow(true),
91                                    _ => self.unsupported(w.to_string()),
92                                }
93                            }
94                            _ => self.unsupported(w.to_string()),
95                        },
96                        _ => self.unsupported(w.to_string()),
97                    }
98                }
99
100                Keyword::VIEW => {
101                    let _ = self.parser.next_token();
102                    self.parse_create_view(false)
103                }
104
105                #[cfg(feature = "enterprise")]
106                Keyword::TRIGGER => {
107                    let _ = self.parser.next_token();
108                    self.parse_create_trigger()
109                }
110
111                Keyword::NoKeyword => {
112                    let _ = self.parser.next_token();
113                    let uppercase = w.value.to_uppercase();
114                    match uppercase.as_str() {
115                        FLOW => self.parse_create_flow(false),
116                        _ => self.unsupported(w.to_string()),
117                    }
118                }
119                _ => self.unsupported(w.to_string()),
120            },
121            unexpected => self.unsupported(unexpected.to_string()),
122        }
123    }
124
125    /// Parse `CREAVE VIEW` statement.
126    fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
127        let if_not_exists = self.parse_if_not_exist()?;
128        let view_name = self.intern_parse_table_name()?;
129
130        let columns = self.parse_view_columns()?;
131
132        self.parser
133            .expect_keyword(Keyword::AS)
134            .context(SyntaxSnafu)?;
135
136        let query = self.parse_query()?;
137
138        Ok(Statement::CreateView(CreateView {
139            name: view_name,
140            columns,
141            or_replace,
142            query: Box::new(query),
143            if_not_exists,
144        }))
145    }
146
147    fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
148        let mut columns = vec![];
149        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
150            return Ok(columns);
151        }
152
153        loop {
154            let name = self.parse_column_name().context(SyntaxSnafu)?;
155
156            columns.push(name);
157
158            let comma = self.parser.consume_token(&Token::Comma);
159            if self.parser.consume_token(&Token::RParen) {
160                // allow a trailing comma, even though it's not in standard
161                break;
162            } else if !comma {
163                return self.expected("',' or ')' after column name", self.parser.peek_token());
164            }
165        }
166
167        Ok(columns)
168    }
169
170    fn parse_create_external_table(&mut self) -> Result<Statement> {
171        let _ = self.parser.next_token();
172        self.parser
173            .expect_keyword(Keyword::TABLE)
174            .context(SyntaxSnafu)?;
175        let if_not_exists = self.parse_if_not_exist()?;
176        let table_name = self.intern_parse_table_name()?;
177        let (columns, constraints) = self.parse_columns()?;
178        if !columns.is_empty() {
179            validate_time_index(&columns, &constraints)?;
180        }
181
182        let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
183        let options = self.parse_create_table_options()?;
184        Ok(Statement::CreateExternalTable(CreateExternalTable {
185            name: table_name,
186            columns,
187            constraints,
188            options,
189            if_not_exists,
190            engine,
191        }))
192    }
193
194    fn parse_create_database(&mut self) -> Result<Statement> {
195        let _ = self.parser.next_token();
196        let if_not_exists = self.parse_if_not_exist()?;
197        let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
198            expected: "a database name",
199            actual: self.peek_token_as_string(),
200        })?;
201        let database_name = Self::canonicalize_object_name(database_name)?;
202
203        let options = self
204            .parser
205            .parse_options(Keyword::WITH)
206            .context(SyntaxSnafu)?
207            .into_iter()
208            .map(parse_option_string)
209            .collect::<Result<HashMap<String, OptionValue>>>()?;
210
211        for key in options.keys() {
212            ensure!(
213                validate_database_option(key),
214                InvalidDatabaseOptionSnafu { key: key.clone() }
215            );
216        }
217        if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string())
218            && append_mode == "true"
219            && options.contains_key("merge_mode")
220        {
221            return InvalidDatabaseOptionSnafu {
222                key: "merge_mode".to_string(),
223            }
224            .fail();
225        }
226
227        Ok(Statement::CreateDatabase(CreateDatabase {
228            name: database_name,
229            if_not_exists,
230            options: OptionMap::new(options),
231        }))
232    }
233
234    fn parse_create_table(&mut self) -> Result<Statement> {
235        let _ = self.parser.next_token();
236
237        let if_not_exists = self.parse_if_not_exist()?;
238
239        let table_name = self.intern_parse_table_name()?;
240
241        if self.parser.parse_keyword(Keyword::LIKE) {
242            let source_name = self.intern_parse_table_name()?;
243
244            return Ok(Statement::CreateTableLike(CreateTableLike {
245                table_name,
246                source_name,
247            }));
248        }
249
250        let (columns, constraints) = self.parse_columns()?;
251        validate_time_index(&columns, &constraints)?;
252
253        let partitions = self.parse_partitions()?;
254        if let Some(partitions) = &partitions {
255            validate_partitions(&columns, partitions)?;
256        }
257
258        let engine = self.parse_table_engine(default_engine())?;
259        let options = self.parse_create_table_options()?;
260        let create_table = CreateTable {
261            if_not_exists,
262            name: table_name,
263            columns,
264            engine,
265            constraints,
266            options,
267            table_id: 0, // table id is assigned by catalog manager
268            partitions,
269        };
270
271        Ok(Statement::CreateTable(create_table))
272    }
273
274    /// "CREATE FLOW" clause
275    fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
276        let if_not_exists = self.parse_if_not_exist()?;
277
278        let flow_name = self.intern_parse_table_name()?;
279
280        // make `SINK` case in-sensitive
281        if let Token::Word(word) = self.parser.peek_token().token
282            && word.value.eq_ignore_ascii_case(SINK)
283        {
284            self.parser.next_token();
285        } else {
286            Err(ParserError::ParserError(
287                "Expect `SINK` keyword".to_string(),
288            ))
289            .context(SyntaxSnafu)?
290        }
291        self.parser
292            .expect_keyword(Keyword::TO)
293            .context(SyntaxSnafu)?;
294
295        let output_table_name = self.intern_parse_table_name()?;
296
297        let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
298            && w1.value.eq_ignore_ascii_case(EXPIRE)
299        {
300            self.parser.next_token();
301            if let Token::Word(w2) = &self.parser.peek_token().token
302                && w2.value.eq_ignore_ascii_case(AFTER)
303            {
304                self.parser.next_token();
305                Some(self.parse_interval_no_month("EXPIRE AFTER")?)
306            } else {
307                None
308            }
309        } else {
310            None
311        };
312
313        let eval_interval = if self
314            .parser
315            .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
316        {
317            Some(self.parse_interval_no_month("EVAL INTERVAL")?)
318        } else {
319            None
320        };
321
322        let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
323            match self.parser.next_token() {
324                TokenWithSpan {
325                    token: Token::SingleQuotedString(value, ..),
326                    ..
327                } => Some(value),
328                unexpected => {
329                    return self
330                        .parser
331                        .expected("string", unexpected)
332                        .context(SyntaxSnafu);
333                }
334            }
335        } else {
336            None
337        };
338
339        self.parser
340            .expect_keyword(Keyword::AS)
341            .context(SyntaxSnafu)?;
342
343        let query = Box::new(self.parse_sql_or_tql(true)?);
344
345        Ok(Statement::CreateFlow(CreateFlow {
346            flow_name,
347            sink_table_name: output_table_name,
348            or_replace,
349            if_not_exists,
350            expire_after,
351            eval_interval,
352            comment,
353            query,
354        }))
355    }
356
357    fn parse_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
358        let start_loc = self.parser.peek_token().span.start;
359        let start_index = location_to_index(self.sql, &start_loc);
360
361        // only accept sql or tql
362        let query = match self.parser.peek_token().token {
363            Token::Word(w) => match w.keyword {
364                Keyword::SELECT => self.parse_query(),
365                Keyword::NoKeyword
366                    if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
367                {
368                    self.parse_tql(require_now_expr)
369                }
370
371                _ => self.unsupported(self.peek_token_as_string()),
372            },
373            _ => self.unsupported(self.peek_token_as_string()),
374        }?;
375
376        let end_token = self.parser.peek_token();
377
378        let raw_query = if end_token == Token::EOF {
379            &self.sql[start_index..]
380        } else {
381            let end_loc = end_token.span.end;
382            let end_index = location_to_index(self.sql, &end_loc);
383            &self.sql[start_index..end_index.min(self.sql.len())]
384        };
385        let raw_query = raw_query.trim_end_matches(";");
386        let query = SqlOrTql::try_from_statement(query, raw_query)?;
387        Ok(query)
388    }
389
390    /// Parse the interval expr to duration in seconds.
391    fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
392        let interval = self.parse_interval_month_day_nano()?.0;
393        if interval.months != 0 {
394            return InvalidIntervalSnafu {
395                reason: format!("Interval with months is not allowed in {context}"),
396            }
397            .fail();
398        }
399        Ok(
400            interval.nanoseconds / 1_000_000_000
401                + interval.days as i64 * 60 * 60 * 24
402                + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, // 1 month=365.25/12=30.44 days
403                                                                       // this is to keep the same as https://docs.rs/humantime/latest/humantime/fn.parse_duration.html
404                                                                       // which we use in database to parse i.e. ttl interval and many other intervals
405        )
406    }
407
408    /// Parse interval expr to [`IntervalMonthDayNano`].
409    fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
410        let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
411        let raw_interval_expr = interval_expr.to_string();
412        let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
413            .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
414            .ok()
415            .with_context(|| InvalidIntervalSnafu {
416                reason: format!("cannot cast {} to interval type", interval_expr),
417            })?;
418        if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
419            Ok((interval, raw_interval_expr))
420        } else {
421            unreachable!()
422        }
423    }
424
425    fn parse_if_not_exist(&mut self) -> Result<bool> {
426        match self.parser.peek_token().token {
427            Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
428            _ => {}
429        }
430
431        if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
432            return self
433                .parser
434                .expect_keyword(Keyword::EXISTS)
435                .map(|_| true)
436                .context(UnexpectedSnafu {
437                    expected: "EXISTS",
438                    actual: self.peek_token_as_string(),
439                });
440        }
441
442        if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
443            return UnsupportedSnafu { keyword: "EXISTS" }.fail();
444        }
445
446        Ok(false)
447    }
448
449    fn parse_create_table_options(&mut self) -> Result<OptionMap> {
450        let options = self
451            .parser
452            .parse_options(Keyword::WITH)
453            .context(SyntaxSnafu)?
454            .into_iter()
455            .map(parse_option_string)
456            .collect::<Result<HashMap<String, OptionValue>>>()?;
457        for key in options.keys() {
458            ensure!(validate_table_option(key), InvalidTableOptionSnafu { key });
459        }
460        Ok(OptionMap::new(options))
461    }
462
463    /// "PARTITION ON COLUMNS (...)" clause
464    fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
465        if !self.parser.parse_keyword(Keyword::PARTITION) {
466            return Ok(None);
467        }
468        self.parser
469            .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
470            .context(error::UnexpectedSnafu {
471                expected: "ON, COLUMNS",
472                actual: self.peek_token_as_string(),
473            })?;
474
475        let raw_column_list = self
476            .parser
477            .parse_parenthesized_column_list(Mandatory, false)
478            .context(error::SyntaxSnafu)?;
479        let column_list = raw_column_list
480            .into_iter()
481            .map(Self::canonicalize_identifier)
482            .collect();
483
484        let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
485
486        Ok(Some(Partitions { column_list, exprs }))
487    }
488
489    fn parse_partition_entry(&mut self) -> Result<Expr> {
490        self.parser.parse_expr().context(error::SyntaxSnafu)
491    }
492
493    /// Parse a comma-separated list wrapped by "()", and of which all items accepted by `F`
494    fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
495    where
496        F: FnMut(&mut ParserContext<'a>) -> Result<T>,
497    {
498        self.parser
499            .expect_token(&Token::LParen)
500            .context(error::UnexpectedSnafu {
501                expected: "(",
502                actual: self.peek_token_as_string(),
503            })?;
504
505        let mut values = vec![];
506        while self.parser.peek_token() != Token::RParen {
507            values.push(f(self)?);
508            if !self.parser.consume_token(&Token::Comma) {
509                break;
510            }
511        }
512
513        self.parser
514            .expect_token(&Token::RParen)
515            .context(error::UnexpectedSnafu {
516                expected: ")",
517                actual: self.peek_token_as_string(),
518            })?;
519
520        Ok(values)
521    }
522
523    /// Parse the columns and constraints.
524    fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
525        let mut columns = vec![];
526        let mut constraints = vec![];
527        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
528            return Ok((columns, constraints));
529        }
530
531        loop {
532            if let Some(constraint) = self.parse_optional_table_constraint()? {
533                constraints.push(constraint);
534            } else if let Token::Word(_) = self.parser.peek_token().token {
535                self.parse_column(&mut columns, &mut constraints)?;
536            } else {
537                return self.expected(
538                    "column name or constraint definition",
539                    self.parser.peek_token(),
540                );
541            }
542            let comma = self.parser.consume_token(&Token::Comma);
543            if self.parser.consume_token(&Token::RParen) {
544                // allow a trailing comma, even though it's not in standard
545                break;
546            } else if !comma {
547                return self.expected(
548                    "',' or ')' after column definition",
549                    self.parser.peek_token(),
550                );
551            }
552        }
553
554        Ok((columns, constraints))
555    }
556
557    fn parse_column(
558        &mut self,
559        columns: &mut Vec<Column>,
560        constraints: &mut Vec<TableConstraint>,
561    ) -> Result<()> {
562        let mut column = self.parse_column_def()?;
563
564        let mut time_index_opt_idx = None;
565        for (index, opt) in column.options().iter().enumerate() {
566            if let ColumnOption::DialectSpecific(tokens) = &opt.option
567                && matches!(
568                    &tokens[..],
569                    [
570                        Token::Word(Word {
571                            keyword: Keyword::TIME,
572                            ..
573                        }),
574                        Token::Word(Word {
575                            keyword: Keyword::INDEX,
576                            ..
577                        })
578                    ]
579                )
580            {
581                ensure!(
582                    time_index_opt_idx.is_none(),
583                    InvalidColumnOptionSnafu {
584                        name: column.name().to_string(),
585                        msg: "duplicated time index",
586                    }
587                );
588                time_index_opt_idx = Some(index);
589
590                let constraint = TableConstraint::TimeIndex {
591                    column: Ident::new(column.name().value.clone()),
592                };
593                constraints.push(constraint);
594            }
595        }
596
597        if let Some(index) = time_index_opt_idx {
598            ensure!(
599                !column.options().contains(&ColumnOptionDef {
600                    option: ColumnOption::Null,
601                    name: None,
602                }),
603                InvalidColumnOptionSnafu {
604                    name: column.name().to_string(),
605                    msg: "time index column can't be null",
606                }
607            );
608
609            // The timestamp type may be an alias type, we have to retrieve the actual type.
610            let data_type = get_unalias_type(column.data_type());
611            ensure!(
612                matches!(data_type, DataType::Timestamp(_, _)),
613                InvalidColumnOptionSnafu {
614                    name: column.name().to_string(),
615                    msg: "time index column data type should be timestamp",
616                }
617            );
618
619            let not_null_opt = ColumnOptionDef {
620                option: ColumnOption::NotNull,
621                name: None,
622            };
623
624            if !column.options().contains(&not_null_opt) {
625                column.mut_options().push(not_null_opt);
626            }
627
628            let _ = column.mut_options().remove(index);
629        }
630
631        columns.push(column);
632
633        Ok(())
634    }
635
636    /// Parse the column name and check if it's valid.
637    fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
638        let name = self.parser.parse_identifier()?;
639        if name.quote_style.is_none() &&
640        // "ALL_KEYWORDS" are sorted.
641            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
642        {
643            return Err(ParserError::ParserError(format!(
644                "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
645                &name.value
646            )));
647        }
648
649        Ok(name)
650    }
651
652    pub fn parse_column_def(&mut self) -> Result<Column> {
653        let name = self.parse_column_name().context(SyntaxSnafu)?;
654        let parser = &mut self.parser;
655
656        ensure!(
657            !(name.quote_style.is_none() &&
658            // "ALL_KEYWORDS" are sorted.
659            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
660            InvalidSqlSnafu {
661                msg: format!(
662                    "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
663                    &name.value
664                ),
665            }
666        );
667
668        let mut extensions = ColumnExtensions::default();
669
670        let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
671        // Must immediately parse the JSON datatype format because it is closely after the "JSON"
672        // datatype, like this: "JSON(format = ...)".
673        if matches!(data_type, DataType::JSON) {
674            extensions.json_datatype_options = json::parse_json_datatype_options(parser)?;
675        }
676
677        let mut options = vec![];
678        loop {
679            if parser.parse_keyword(Keyword::CONSTRAINT) {
680                let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
681                if let Some(option) = Self::parse_optional_column_option(parser)? {
682                    options.push(ColumnOptionDef { name, option });
683                } else {
684                    return parser
685                        .expected(
686                            "constraint details after CONSTRAINT <name>",
687                            parser.peek_token(),
688                        )
689                        .context(SyntaxSnafu);
690                }
691            } else if let Some(option) = Self::parse_optional_column_option(parser)? {
692                options.push(ColumnOptionDef { name: None, option });
693            } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
694                break;
695            };
696        }
697
698        Ok(Column {
699            column_def: ColumnDef {
700                name: Self::canonicalize_identifier(name),
701                data_type,
702                options,
703            },
704            extensions,
705        })
706    }
707
708    fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
709        if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
710            Ok(Some(ColumnOption::CharacterSet(
711                parser.parse_object_name(false).context(SyntaxSnafu)?,
712            )))
713        } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
714            Ok(Some(ColumnOption::NotNull))
715        } else if parser.parse_keywords(&[Keyword::COMMENT]) {
716            match parser.next_token() {
717                TokenWithSpan {
718                    token: Token::SingleQuotedString(value, ..),
719                    ..
720                } => Ok(Some(ColumnOption::Comment(value))),
721                unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
722            }
723        } else if parser.parse_keyword(Keyword::NULL) {
724            Ok(Some(ColumnOption::Null))
725        } else if parser.parse_keyword(Keyword::DEFAULT) {
726            Ok(Some(ColumnOption::Default(
727                parser.parse_expr().context(SyntaxSnafu)?,
728            )))
729        } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
730            Ok(Some(ColumnOption::Unique {
731                is_primary: true,
732                characteristics: None,
733            }))
734        } else if parser.parse_keyword(Keyword::UNIQUE) {
735            Ok(Some(ColumnOption::Unique {
736                is_primary: false,
737                characteristics: None,
738            }))
739        } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
740            // Use a DialectSpecific option for time index
741            Ok(Some(ColumnOption::DialectSpecific(vec![
742                Token::Word(Word {
743                    value: "TIME".to_string(),
744                    quote_style: None,
745                    keyword: Keyword::TIME,
746                }),
747                Token::Word(Word {
748                    value: "INDEX".to_string(),
749                    quote_style: None,
750                    keyword: Keyword::INDEX,
751                }),
752            ])))
753        } else {
754            Ok(None)
755        }
756    }
757
758    /// Parse a column option extensions.
759    ///
760    /// This function will handle:
761    /// - Vector type
762    /// - Indexes
763    fn parse_column_extensions(
764        parser: &mut Parser<'_>,
765        column_name: &Ident,
766        column_type: &DataType,
767        column_extensions: &mut ColumnExtensions,
768    ) -> Result<bool> {
769        if let DataType::Custom(name, tokens) = column_type
770            && name.0.len() == 1
771            && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
772        {
773            ensure!(
774                tokens.len() == 1,
775                InvalidColumnOptionSnafu {
776                    name: column_name.to_string(),
777                    msg: "VECTOR type should have dimension",
778                }
779            );
780
781            let dimension =
782                tokens[0]
783                    .parse::<u32>()
784                    .ok()
785                    .with_context(|| InvalidColumnOptionSnafu {
786                        name: column_name.to_string(),
787                        msg: "dimension should be a positive integer",
788                    })?;
789
790            let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
791            column_extensions.vector_options = Some(options);
792        }
793
794        // parse index options in column definition
795        let mut is_index_declared = false;
796
797        // skipping index
798        if let Token::Word(word) = parser.peek_token().token
799            && word.value.eq_ignore_ascii_case(SKIPPING)
800        {
801            parser.next_token();
802            // Consume `INDEX` keyword
803            ensure!(
804                parser.parse_keyword(Keyword::INDEX),
805                InvalidColumnOptionSnafu {
806                    name: column_name.to_string(),
807                    msg: "expect INDEX after SKIPPING keyword",
808                }
809            );
810            ensure!(
811                column_extensions.skipping_index_options.is_none(),
812                InvalidColumnOptionSnafu {
813                    name: column_name.to_string(),
814                    msg: "duplicated SKIPPING index option",
815                }
816            );
817
818            let options = parser
819                .parse_options(Keyword::WITH)
820                .context(error::SyntaxSnafu)?
821                .into_iter()
822                .map(parse_option_string)
823                .collect::<Result<Vec<_>>>()?;
824
825            for (key, _) in options.iter() {
826                ensure!(
827                    validate_column_skipping_index_create_option(key),
828                    InvalidColumnOptionSnafu {
829                        name: column_name.to_string(),
830                        msg: format!("invalid SKIPPING INDEX option: {key}"),
831                    }
832                );
833            }
834
835            let options = OptionMap::new(options);
836            column_extensions.skipping_index_options = Some(options);
837            is_index_declared |= true;
838        }
839
840        // fulltext index
841        if parser.parse_keyword(Keyword::FULLTEXT) {
842            // Consume `INDEX` keyword
843            ensure!(
844                parser.parse_keyword(Keyword::INDEX),
845                InvalidColumnOptionSnafu {
846                    name: column_name.to_string(),
847                    msg: "expect INDEX after FULLTEXT keyword",
848                }
849            );
850
851            ensure!(
852                column_extensions.fulltext_index_options.is_none(),
853                InvalidColumnOptionSnafu {
854                    name: column_name.to_string(),
855                    msg: "duplicated FULLTEXT INDEX option",
856                }
857            );
858
859            let column_type = get_unalias_type(column_type);
860            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
861            ensure!(
862                data_type == ConcreteDataType::string_datatype(),
863                InvalidColumnOptionSnafu {
864                    name: column_name.to_string(),
865                    msg: "FULLTEXT index only supports string type",
866                }
867            );
868
869            let options = parser
870                .parse_options(Keyword::WITH)
871                .context(error::SyntaxSnafu)?
872                .into_iter()
873                .map(parse_option_string)
874                .collect::<Result<Vec<_>>>()?;
875
876            for (key, _) in options.iter() {
877                ensure!(
878                    validate_column_fulltext_create_option(key),
879                    InvalidColumnOptionSnafu {
880                        name: column_name.to_string(),
881                        msg: format!("invalid FULLTEXT INDEX option: {key}"),
882                    }
883                );
884            }
885
886            let options = OptionMap::new(options);
887            column_extensions.fulltext_index_options = Some(options);
888            is_index_declared |= true;
889        }
890
891        // inverted index
892        if let Token::Word(word) = parser.peek_token().token
893            && word.value.eq_ignore_ascii_case(INVERTED)
894        {
895            parser.next_token();
896            // Consume `INDEX` keyword
897            ensure!(
898                parser.parse_keyword(Keyword::INDEX),
899                InvalidColumnOptionSnafu {
900                    name: column_name.to_string(),
901                    msg: "expect INDEX after INVERTED keyword",
902                }
903            );
904
905            ensure!(
906                column_extensions.inverted_index_options.is_none(),
907                InvalidColumnOptionSnafu {
908                    name: column_name.to_string(),
909                    msg: "duplicated INVERTED index option",
910                }
911            );
912
913            // inverted index doesn't have options, skipping `WITH`
914            // try cache `WITH` and throw error
915            let with_token = parser.peek_token();
916            ensure!(
917                with_token.token
918                    != Token::Word(Word {
919                        value: "WITH".to_string(),
920                        keyword: Keyword::WITH,
921                        quote_style: None,
922                    }),
923                InvalidColumnOptionSnafu {
924                    name: column_name.to_string(),
925                    msg: "INVERTED index doesn't support options",
926                }
927            );
928
929            column_extensions.inverted_index_options = Some(OptionMap::default());
930            is_index_declared |= true;
931        }
932
933        // vector index
934        if let Token::Word(word) = parser.peek_token().token
935            && word.value.eq_ignore_ascii_case(VECTOR)
936        {
937            parser.next_token();
938            // Consume `INDEX` keyword
939            ensure!(
940                parser.parse_keyword(Keyword::INDEX),
941                InvalidColumnOptionSnafu {
942                    name: column_name.to_string(),
943                    msg: "expect INDEX after VECTOR keyword",
944                }
945            );
946
947            ensure!(
948                column_extensions.vector_index_options.is_none(),
949                InvalidColumnOptionSnafu {
950                    name: column_name.to_string(),
951                    msg: "duplicated VECTOR INDEX option",
952                }
953            );
954
955            // Check that column is a vector type
956            let column_type = get_unalias_type(column_type);
957            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
958            ensure!(
959                matches!(data_type, ConcreteDataType::Vector(_)),
960                InvalidColumnOptionSnafu {
961                    name: column_name.to_string(),
962                    msg: "VECTOR INDEX only supports Vector type columns",
963                }
964            );
965
966            let options = parser
967                .parse_options(Keyword::WITH)
968                .context(error::SyntaxSnafu)?
969                .into_iter()
970                .map(parse_option_string)
971                .collect::<Result<Vec<_>>>()?;
972
973            for (key, _) in options.iter() {
974                ensure!(
975                    validate_column_vector_index_create_option(key),
976                    InvalidColumnOptionSnafu {
977                        name: column_name.to_string(),
978                        msg: format!("invalid VECTOR INDEX option: {key}"),
979                    }
980                );
981            }
982
983            let options = OptionMap::new(options);
984            column_extensions.vector_index_options = Some(options);
985            is_index_declared |= true;
986        }
987
988        Ok(is_index_declared)
989    }
990
991    fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
992        match self.parser.next_token() {
993            TokenWithSpan {
994                token: Token::Word(w),
995                ..
996            } if w.keyword == Keyword::PRIMARY => {
997                self.parser
998                    .expect_keyword(Keyword::KEY)
999                    .context(error::UnexpectedSnafu {
1000                        expected: "KEY",
1001                        actual: self.peek_token_as_string(),
1002                    })?;
1003                let raw_columns = self
1004                    .parser
1005                    .parse_parenthesized_column_list(Mandatory, false)
1006                    .context(error::SyntaxSnafu)?;
1007                let columns = raw_columns
1008                    .into_iter()
1009                    .map(Self::canonicalize_identifier)
1010                    .collect();
1011                Ok(Some(TableConstraint::PrimaryKey { columns }))
1012            }
1013            TokenWithSpan {
1014                token: Token::Word(w),
1015                ..
1016            } if w.keyword == Keyword::TIME => {
1017                self.parser
1018                    .expect_keyword(Keyword::INDEX)
1019                    .context(error::UnexpectedSnafu {
1020                        expected: "INDEX",
1021                        actual: self.peek_token_as_string(),
1022                    })?;
1023
1024                let raw_columns = self
1025                    .parser
1026                    .parse_parenthesized_column_list(Mandatory, false)
1027                    .context(error::SyntaxSnafu)?;
1028                let mut columns = raw_columns
1029                    .into_iter()
1030                    .map(Self::canonicalize_identifier)
1031                    .collect::<Vec<_>>();
1032
1033                ensure!(
1034                    columns.len() == 1,
1035                    InvalidTimeIndexSnafu {
1036                        msg: "it should contain only one column in time index",
1037                    }
1038                );
1039
1040                Ok(Some(TableConstraint::TimeIndex {
1041                    column: columns.pop().unwrap(),
1042                }))
1043            }
1044            _ => {
1045                self.parser.prev_token();
1046                Ok(None)
1047            }
1048        }
1049    }
1050
1051    /// Parses the set of valid formats
1052    fn parse_table_engine(&mut self, default: &str) -> Result<String> {
1053        if !self.consume_token(ENGINE) {
1054            return Ok(default.to_string());
1055        }
1056
1057        self.parser
1058            .expect_token(&Token::Eq)
1059            .context(error::UnexpectedSnafu {
1060                expected: "=",
1061                actual: self.peek_token_as_string(),
1062            })?;
1063
1064        let token = self.parser.next_token();
1065        if let Token::Word(w) = token.token {
1066            Ok(w.value)
1067        } else {
1068            self.expected("'Engine' is missing", token)
1069        }
1070    }
1071}
1072
1073fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1074    let time_index_constraints: Vec<_> = constraints
1075        .iter()
1076        .filter_map(|c| match c {
1077            TableConstraint::TimeIndex { column } => Some(column),
1078            _ => None,
1079        })
1080        .unique()
1081        .collect();
1082
1083    ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1084    ensure!(
1085        time_index_constraints.len() == 1,
1086        InvalidTimeIndexSnafu {
1087            msg: format!(
1088                "expected only one time index constraint but actual {}",
1089                time_index_constraints.len()
1090            ),
1091        }
1092    );
1093
1094    // It's safe to use time_index_constraints[0][0],
1095    // we already check the bound above.
1096    let time_index_column_ident = &time_index_constraints[0];
1097    let time_index_column = columns
1098        .iter()
1099        .find(|c| c.name().value == *time_index_column_ident.value)
1100        .with_context(|| InvalidTimeIndexSnafu {
1101            msg: format!(
1102                "time index column {} not found in columns",
1103                time_index_column_ident
1104            ),
1105        })?;
1106
1107    let time_index_data_type = get_unalias_type(time_index_column.data_type());
1108    ensure!(
1109        matches!(time_index_data_type, DataType::Timestamp(_, _)),
1110        InvalidColumnOptionSnafu {
1111            name: time_index_column.name().to_string(),
1112            msg: "time index column data type should be timestamp",
1113        }
1114    );
1115
1116    Ok(())
1117}
1118
1119fn get_unalias_type(data_type: &DataType) -> DataType {
1120    match data_type {
1121        DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1122            if let Some(real_type) =
1123                get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1124            {
1125                real_type
1126            } else {
1127                data_type.clone()
1128            }
1129        }
1130        _ => data_type.clone(),
1131    }
1132}
1133
1134fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1135    let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1136
1137    ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1138
1139    Ok(())
1140}
1141
1142/// Ensure all exprs are binary expr and all the columns are defined in the column list.
1143fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1144    for expr in exprs {
1145        // The first level must be binary expr
1146        if let Expr::BinaryOp { left, op: _, right } = expr {
1147            ensure_one_expr(left, columns)?;
1148            ensure_one_expr(right, columns)?;
1149        } else {
1150            return error::InvalidSqlSnafu {
1151                msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1152            }
1153            .fail();
1154        }
1155    }
1156    Ok(())
1157}
1158
1159/// Check if the expr is a binary expr, an ident or a literal value.
1160/// If is ident, then check it is in the column list.
1161/// This recursive function is intended to be used by [ensure_exprs_are_binary].
1162fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1163    match expr {
1164        Expr::BinaryOp { left, op: _, right } => {
1165            ensure_one_expr(left, columns)?;
1166            ensure_one_expr(right, columns)?;
1167            Ok(())
1168        }
1169        Expr::Identifier(ident) => {
1170            let column_name = &ident.value;
1171            ensure!(
1172                columns.iter().any(|c| &c.name().value == column_name),
1173                error::InvalidSqlSnafu {
1174                    msg: format!(
1175                        "Column {:?} in rule expr is not referenced in PARTITION ON",
1176                        column_name
1177                    ),
1178                }
1179            );
1180            Ok(())
1181        }
1182        Expr::Value(_) => Ok(()),
1183        Expr::UnaryOp { expr, .. } => {
1184            ensure_one_expr(expr, columns)?;
1185            Ok(())
1186        }
1187        _ => error::InvalidSqlSnafu {
1188            msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1189        }
1190        .fail(),
1191    }
1192}
1193
1194/// Ensure that all columns used in "PARTITION ON COLUMNS" are defined in create table.
1195fn ensure_partition_columns_defined<'a>(
1196    columns: &'a [Column],
1197    partitions: &'a Partitions,
1198) -> Result<Vec<&'a Column>> {
1199    partitions
1200        .column_list
1201        .iter()
1202        .map(|x| {
1203            let x = ParserContext::canonicalize_identifier(x.clone());
1204            // Normally the columns in "create table" won't be too many,
1205            // a linear search to find the target every time is fine.
1206            columns
1207                .iter()
1208                .find(|c| *c.name().value == x.value)
1209                .context(error::InvalidSqlSnafu {
1210                    msg: format!("Partition column {:?} not defined", x.value),
1211                })
1212        })
1213        .collect::<Result<Vec<&Column>>>()
1214}
1215
1216#[cfg(test)]
1217mod tests {
1218    use std::assert_matches::assert_matches;
1219    use std::collections::HashMap;
1220
1221    use common_catalog::consts::FILE_ENGINE;
1222    use common_error::ext::ErrorExt;
1223    use sqlparser::ast::ColumnOption::NotNull;
1224    use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1225    use sqlparser::dialect::GenericDialect;
1226    use sqlparser::tokenizer::Tokenizer;
1227
1228    use super::*;
1229    use crate::dialect::GreptimeDbDialect;
1230    use crate::parser::ParseOptions;
1231
1232    #[test]
1233    fn test_parse_create_table_like() {
1234        let sql = "CREATE TABLE t1 LIKE t2";
1235        let stmts =
1236            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1237                .unwrap();
1238
1239        assert_eq!(1, stmts.len());
1240        match &stmts[0] {
1241            Statement::CreateTableLike(c) => {
1242                assert_eq!(c.table_name.to_string(), "t1");
1243                assert_eq!(c.source_name.to_string(), "t2");
1244            }
1245            _ => unreachable!(),
1246        }
1247    }
1248
1249    #[test]
1250    fn test_validate_external_table_options() {
1251        let sql = "CREATE EXTERNAL TABLE city (
1252            host string,
1253            ts timestamp,
1254            cpu float64 default 0,
1255            memory float64,
1256            TIME INDEX (ts),
1257            PRIMARY KEY(ts, host)
1258        ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1259
1260        let result =
1261            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1262        assert!(matches!(
1263            result,
1264            Err(error::Error::InvalidTableOption { .. })
1265        ));
1266    }
1267
1268    #[test]
1269    fn test_parse_create_external_table() {
1270        struct Test<'a> {
1271            sql: &'a str,
1272            expected_table_name: &'a str,
1273            expected_options: HashMap<&'a str, &'a str>,
1274            expected_engine: &'a str,
1275            expected_if_not_exist: bool,
1276        }
1277
1278        let tests = [
1279            Test {
1280                sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1281                expected_table_name: "city",
1282                expected_options: HashMap::from([
1283                    ("location", "/var/data/city.csv"),
1284                    ("format", "csv"),
1285                ]),
1286                expected_engine: FILE_ENGINE,
1287                expected_if_not_exist: false,
1288            },
1289            Test {
1290                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1291                expected_table_name: "city",
1292                expected_options: HashMap::from([
1293                    ("location", "/var/data/city.csv"),
1294                    ("format", "csv"),
1295                ]),
1296                expected_engine: "foo",
1297                expected_if_not_exist: true,
1298            },
1299            Test {
1300                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1301                expected_table_name: "city",
1302                expected_options: HashMap::from([
1303                    ("location", "/var/data/city.csv"),
1304                    ("format", "csv"),
1305                    ("compaction.type", "bar"),
1306                ]),
1307                expected_engine: "foo",
1308                expected_if_not_exist: true,
1309            },
1310        ];
1311
1312        for test in tests {
1313            let stmts = ParserContext::create_with_dialect(
1314                test.sql,
1315                &GreptimeDbDialect {},
1316                ParseOptions::default(),
1317            )
1318            .unwrap();
1319            assert_eq!(1, stmts.len());
1320            match &stmts[0] {
1321                Statement::CreateExternalTable(c) => {
1322                    assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1323                    assert_eq!(c.options.to_str_map(), test.expected_options);
1324                    assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1325                    assert_eq!(c.engine, test.expected_engine);
1326                }
1327                _ => unreachable!(),
1328            }
1329        }
1330    }
1331
1332    #[test]
1333    fn test_parse_create_external_table_with_schema() {
1334        let sql = "CREATE EXTERNAL TABLE city (
1335            host string,
1336            ts timestamp,
1337            cpu float32 default 0,
1338            memory float64,
1339            TIME INDEX (ts),
1340            PRIMARY KEY(ts, host),
1341        ) with(location='/var/data/city.csv',format='csv');";
1342
1343        let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]);
1344
1345        let stmts =
1346            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1347                .unwrap();
1348        assert_eq!(1, stmts.len());
1349        match &stmts[0] {
1350            Statement::CreateExternalTable(c) => {
1351                assert_eq!(c.name.to_string(), "city");
1352                assert_eq!(c.options.to_str_map(), options);
1353
1354                let columns = &c.columns;
1355                assert_column_def(&columns[0].column_def, "host", "STRING");
1356                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1357                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1358                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1359
1360                let constraints = &c.constraints;
1361                assert_eq!(
1362                    &constraints[0],
1363                    &TableConstraint::TimeIndex {
1364                        column: Ident::new("ts"),
1365                    }
1366                );
1367                assert_eq!(
1368                    &constraints[1],
1369                    &TableConstraint::PrimaryKey {
1370                        columns: vec![Ident::new("ts"), Ident::new("host")]
1371                    }
1372                );
1373            }
1374            _ => unreachable!(),
1375        }
1376    }
1377
1378    #[test]
1379    fn test_parse_create_database() {
1380        let sql = "create database";
1381        let result =
1382            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1383        assert!(
1384            result
1385                .unwrap_err()
1386                .to_string()
1387                .contains("Unexpected token while parsing SQL statement")
1388        );
1389
1390        let sql = "create database prometheus";
1391        let stmts =
1392            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1393                .unwrap();
1394
1395        assert_eq!(1, stmts.len());
1396        match &stmts[0] {
1397            Statement::CreateDatabase(c) => {
1398                assert_eq!(c.name.to_string(), "prometheus");
1399                assert!(!c.if_not_exists);
1400            }
1401            _ => unreachable!(),
1402        }
1403
1404        let sql = "create database if not exists prometheus";
1405        let stmts =
1406            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1407                .unwrap();
1408
1409        assert_eq!(1, stmts.len());
1410        match &stmts[0] {
1411            Statement::CreateDatabase(c) => {
1412                assert_eq!(c.name.to_string(), "prometheus");
1413                assert!(c.if_not_exists);
1414            }
1415            _ => unreachable!(),
1416        }
1417
1418        let sql = "CREATE DATABASE `fOo`";
1419        let result =
1420            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1421        let stmts = result.unwrap();
1422        match &stmts.last().unwrap() {
1423            Statement::CreateDatabase(c) => {
1424                assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1425                assert!(!c.if_not_exists);
1426            }
1427            _ => unreachable!(),
1428        }
1429
1430        let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1431        let result =
1432            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1433        let stmts = result.unwrap();
1434        match &stmts[0] {
1435            Statement::CreateDatabase(c) => {
1436                assert_eq!(c.name.to_string(), "prometheus");
1437                assert!(!c.if_not_exists);
1438                assert_eq!(c.options.get("ttl").unwrap(), "1h");
1439            }
1440            _ => unreachable!(),
1441        }
1442    }
1443
1444    #[test]
1445    fn test_parse_create_flow_more_testcases() {
1446        use pretty_assertions::assert_eq;
1447        fn parse_create_flow(sql: &str) -> CreateFlow {
1448            let stmts = ParserContext::create_with_dialect(
1449                sql,
1450                &GreptimeDbDialect {},
1451                ParseOptions::default(),
1452            )
1453            .unwrap();
1454            assert_eq!(1, stmts.len());
1455            match &stmts[0] {
1456                Statement::CreateFlow(c) => c.clone(),
1457                _ => unreachable!(),
1458            }
1459        }
1460        struct CreateFlowWoutQuery {
1461            /// Flow name
1462            pub flow_name: ObjectName,
1463            /// Output (sink) table name
1464            pub sink_table_name: ObjectName,
1465            /// Whether to replace existing task
1466            pub or_replace: bool,
1467            /// Create if not exist
1468            pub if_not_exists: bool,
1469            /// `EXPIRE AFTER`
1470            /// Duration in second as `i64`
1471            pub expire_after: Option<i64>,
1472            /// Comment string
1473            pub comment: Option<String>,
1474        }
1475        let testcases = vec![
1476            (
1477                r"
1478CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1479SINK TO schema_1.table_1
1480EXPIRE AFTER INTERVAL '5 minutes'
1481COMMENT 'test comment'
1482AS
1483SELECT max(c1), min(c2) FROM schema_2.table_2;",
1484                CreateFlowWoutQuery {
1485                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1486                    sink_table_name: ObjectName::from(vec![
1487                        Ident::new("schema_1"),
1488                        Ident::new("table_1"),
1489                    ]),
1490                    or_replace: true,
1491                    if_not_exists: true,
1492                    expire_after: Some(300),
1493                    comment: Some("test comment".to_string()),
1494                },
1495            ),
1496            (
1497                r"
1498CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1499SINK TO schema_1.table_1
1500EXPIRE AFTER INTERVAL '300 s'
1501COMMENT 'test comment'
1502AS
1503SELECT max(c1), min(c2) FROM schema_2.table_2;",
1504                CreateFlowWoutQuery {
1505                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1506                    sink_table_name: ObjectName::from(vec![
1507                        Ident::new("schema_1"),
1508                        Ident::new("table_1"),
1509                    ]),
1510                    or_replace: true,
1511                    if_not_exists: true,
1512                    expire_after: Some(300),
1513                    comment: Some("test comment".to_string()),
1514                },
1515            ),
1516            (
1517                r"
1518CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1519SINK TO schema_1.table_1
1520EXPIRE AFTER '5 minutes'
1521COMMENT 'test comment'
1522AS
1523SELECT max(c1), min(c2) FROM schema_2.table_2;",
1524                CreateFlowWoutQuery {
1525                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1526                    sink_table_name: ObjectName::from(vec![
1527                        Ident::new("schema_1"),
1528                        Ident::new("table_1"),
1529                    ]),
1530                    or_replace: true,
1531                    if_not_exists: true,
1532                    expire_after: Some(300),
1533                    comment: Some("test comment".to_string()),
1534                },
1535            ),
1536            (
1537                r"
1538CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1539SINK TO schema_1.table_1
1540EXPIRE AFTER '300 s'
1541COMMENT 'test comment'
1542AS
1543SELECT max(c1), min(c2) FROM schema_2.table_2;",
1544                CreateFlowWoutQuery {
1545                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1546                    sink_table_name: ObjectName::from(vec![
1547                        Ident::new("schema_1"),
1548                        Ident::new("table_1"),
1549                    ]),
1550                    or_replace: true,
1551                    if_not_exists: true,
1552                    expire_after: Some(300),
1553                    comment: Some("test comment".to_string()),
1554                },
1555            ),
1556            (
1557                r"
1558CREATE FLOW `task_2`
1559SINK TO schema_1.table_1
1560EXPIRE AFTER '2 days 1h 2 min'
1561AS
1562SELECT max(c1), min(c2) FROM schema_2.table_2;",
1563                CreateFlowWoutQuery {
1564                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1565                    sink_table_name: ObjectName::from(vec![
1566                        Ident::new("schema_1"),
1567                        Ident::new("table_1"),
1568                    ]),
1569                    or_replace: false,
1570                    if_not_exists: false,
1571                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1572                    comment: None,
1573                },
1574            ),
1575            (
1576                r"
1577create flow `task_3`
1578sink to schema_1.table_1
1579expire after '10 minutes'
1580as
1581select max(c1), min(c2) from schema_2.table_2;",
1582                CreateFlowWoutQuery {
1583                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
1584                    sink_table_name: ObjectName::from(vec![
1585                        Ident::new("schema_1"),
1586                        Ident::new("table_1"),
1587                    ]),
1588                    or_replace: false,
1589                    if_not_exists: false,
1590                    expire_after: Some(600), // 10 minutes in seconds
1591                    comment: None,
1592                },
1593            ),
1594            (
1595                r"
1596create or replace flow if not exists task_4
1597sink to schema_1.table_1
1598expire after interval '2 hours'
1599comment 'lowercase test'
1600as
1601select max(c1), min(c2) from schema_2.table_2;",
1602                CreateFlowWoutQuery {
1603                    flow_name: ObjectName::from(vec![Ident::new("task_4")]),
1604                    sink_table_name: ObjectName::from(vec![
1605                        Ident::new("schema_1"),
1606                        Ident::new("table_1"),
1607                    ]),
1608                    or_replace: true,
1609                    if_not_exists: true,
1610                    expire_after: Some(7200), // 2 hours in seconds
1611                    comment: Some("lowercase test".to_string()),
1612                },
1613            ),
1614        ];
1615
1616        for (sql, expected) in testcases {
1617            let create_task = parse_create_flow(sql);
1618
1619            let expected = CreateFlow {
1620                flow_name: expected.flow_name,
1621                sink_table_name: expected.sink_table_name,
1622                or_replace: expected.or_replace,
1623                if_not_exists: expected.if_not_exists,
1624                expire_after: expected.expire_after,
1625                eval_interval: None,
1626                comment: expected.comment,
1627                // ignore query parse result
1628                query: create_task.query.clone(),
1629            };
1630
1631            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1632            let show_create = create_task.to_string();
1633            let recreated = parse_create_flow(&show_create);
1634            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1635        }
1636    }
1637
1638    #[test]
1639    fn test_parse_create_flow() {
1640        use pretty_assertions::assert_eq;
1641        fn parse_create_flow(sql: &str) -> CreateFlow {
1642            let stmts = ParserContext::create_with_dialect(
1643                sql,
1644                &GreptimeDbDialect {},
1645                ParseOptions::default(),
1646            )
1647            .unwrap();
1648            assert_eq!(1, stmts.len());
1649            match &stmts[0] {
1650                Statement::CreateFlow(c) => c.clone(),
1651                _ => panic!("{:?}", stmts[0]),
1652            }
1653        }
1654        struct CreateFlowWoutQuery {
1655            /// Flow name
1656            pub flow_name: ObjectName,
1657            /// Output (sink) table name
1658            pub sink_table_name: ObjectName,
1659            /// Whether to replace existing task
1660            pub or_replace: bool,
1661            /// Create if not exist
1662            pub if_not_exists: bool,
1663            /// `EXPIRE AFTER`
1664            /// Duration in second as `i64`
1665            pub expire_after: Option<i64>,
1666            /// Duration for flow evaluation interval
1667            /// Duration in seconds as `i64`
1668            /// If not set, flow will be evaluated based on time window size and other args.
1669            pub eval_interval: Option<i64>,
1670            /// Comment string
1671            pub comment: Option<String>,
1672        }
1673
1674        // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
1675        let testcases = vec![
1676            (
1677                r"
1678CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1679SINK TO schema_1.table_1
1680EXPIRE AFTER INTERVAL '5 minutes'
1681COMMENT 'test comment'
1682AS
1683SELECT max(c1), min(c2) FROM schema_2.table_2;",
1684                CreateFlowWoutQuery {
1685                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1686                    sink_table_name: ObjectName(vec![
1687                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1688                        ObjectNamePart::Identifier(Ident::new("table_1")),
1689                    ]),
1690                    or_replace: true,
1691                    if_not_exists: true,
1692                    expire_after: Some(300),
1693                    eval_interval: None,
1694                    comment: Some("test comment".to_string()),
1695                },
1696            ),
1697            (
1698                r"
1699CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1700SINK TO schema_1.table_1
1701EXPIRE AFTER INTERVAL '300 s'
1702COMMENT 'test comment'
1703AS
1704SELECT max(c1), min(c2) FROM schema_2.table_2;",
1705                CreateFlowWoutQuery {
1706                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1707                    sink_table_name: ObjectName(vec![
1708                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1709                        ObjectNamePart::Identifier(Ident::new("table_1")),
1710                    ]),
1711                    or_replace: true,
1712                    if_not_exists: true,
1713                    expire_after: Some(300),
1714                    eval_interval: None,
1715                    comment: Some("test comment".to_string()),
1716                },
1717            ),
1718            (
1719                r"
1720CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1721SINK TO schema_1.table_1
1722EXPIRE AFTER '5 minutes'
1723EVAL INTERVAL '10 seconds'
1724COMMENT 'test comment'
1725AS
1726SELECT max(c1), min(c2) FROM schema_2.table_2;",
1727                CreateFlowWoutQuery {
1728                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1729                    sink_table_name: ObjectName(vec![
1730                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1731                        ObjectNamePart::Identifier(Ident::new("table_1")),
1732                    ]),
1733                    or_replace: true,
1734                    if_not_exists: true,
1735                    expire_after: Some(300),
1736                    eval_interval: Some(10),
1737                    comment: Some("test comment".to_string()),
1738                },
1739            ),
1740            (
1741                r"
1742CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1743SINK TO schema_1.table_1
1744EXPIRE AFTER '5 minutes'
1745EVAL INTERVAL INTERVAL '10 seconds'
1746COMMENT 'test comment'
1747AS
1748SELECT max(c1), min(c2) FROM schema_2.table_2;",
1749                CreateFlowWoutQuery {
1750                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1751                    sink_table_name: ObjectName(vec![
1752                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1753                        ObjectNamePart::Identifier(Ident::new("table_1")),
1754                    ]),
1755                    or_replace: true,
1756                    if_not_exists: true,
1757                    expire_after: Some(300),
1758                    eval_interval: Some(10),
1759                    comment: Some("test comment".to_string()),
1760                },
1761            ),
1762            (
1763                r"
1764CREATE FLOW `task_2`
1765SINK TO schema_1.table_1
1766EXPIRE AFTER '2 days 1h 2 min'
1767AS
1768SELECT max(c1), min(c2) FROM schema_2.table_2;",
1769                CreateFlowWoutQuery {
1770                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1771                        '`', "task_2",
1772                    ))]),
1773                    sink_table_name: ObjectName(vec![
1774                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1775                        ObjectNamePart::Identifier(Ident::new("table_1")),
1776                    ]),
1777                    or_replace: false,
1778                    if_not_exists: false,
1779                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1780                    eval_interval: None,
1781                    comment: None,
1782                },
1783            ),
1784        ];
1785
1786        for (sql, expected) in testcases {
1787            let create_task = parse_create_flow(sql);
1788
1789            let expected = CreateFlow {
1790                flow_name: expected.flow_name,
1791                sink_table_name: expected.sink_table_name,
1792                or_replace: expected.or_replace,
1793                if_not_exists: expected.if_not_exists,
1794                expire_after: expected.expire_after,
1795                eval_interval: expected.eval_interval,
1796                comment: expected.comment,
1797                // ignore query parse result
1798                query: create_task.query.clone(),
1799            };
1800
1801            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1802            let show_create = create_task.to_string();
1803            let recreated = parse_create_flow(&show_create);
1804            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1805        }
1806    }
1807
1808    #[test]
1809    fn test_create_flow_no_month() {
1810        let sql = r"
1811CREATE FLOW `task_2`
1812SINK TO schema_1.table_1
1813EXPIRE AFTER '1 month 2 days 1h 2 min'
1814AS
1815SELECT max(c1), min(c2) FROM schema_2.table_2;";
1816        let stmts =
1817            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1818
1819        assert!(
1820            stmts.is_err()
1821                && stmts
1822                    .unwrap_err()
1823                    .to_string()
1824                    .contains("Interval with months is not allowed")
1825        );
1826    }
1827
1828    #[test]
1829    fn test_validate_create() {
1830        let sql = r"
1831CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
1832PARTITION ON COLUMNS(c, a) (
1833    a < 10,
1834    a > 10 AND a < 20,
1835    a > 20 AND c < 100,
1836    a > 20 AND c >= 100
1837)
1838ENGINE=mito";
1839        let result =
1840            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1841        let _ = result.unwrap();
1842
1843        let sql = r"
1844CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1845PARTITION ON COLUMNS(x) ()
1846ENGINE=mito";
1847        let result =
1848            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1849        assert!(
1850            result
1851                .unwrap_err()
1852                .to_string()
1853                .contains("Partition column \"x\" not defined")
1854        );
1855    }
1856
1857    #[test]
1858    fn test_parse_create_table_with_partitions() {
1859        let sql = r"
1860CREATE TABLE monitor (
1861  host_id    INT,
1862  idc        STRING,
1863  ts         TIMESTAMP,
1864  cpu        DOUBLE DEFAULT 0,
1865  memory     DOUBLE,
1866  TIME INDEX (ts),
1867  PRIMARY KEY (host),
1868)
1869PARTITION ON COLUMNS(idc, host_id) (
1870  idc <= 'hz' AND host_id < 1000,
1871  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1872  idc > 'sh' AND host_id < 3000,
1873  idc > 'sh' AND host_id >= 3000,
1874)
1875ENGINE=mito";
1876        let result =
1877            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1878                .unwrap();
1879        assert_eq!(result.len(), 1);
1880        match &result[0] {
1881            Statement::CreateTable(c) => {
1882                assert!(c.partitions.is_some());
1883
1884                let partitions = c.partitions.as_ref().unwrap();
1885                let column_list = partitions
1886                    .column_list
1887                    .iter()
1888                    .map(|x| &x.value)
1889                    .collect::<Vec<&String>>();
1890                assert_eq!(column_list, vec!["idc", "host_id"]);
1891
1892                let exprs = &partitions.exprs;
1893
1894                assert_eq!(
1895                    exprs[0],
1896                    Expr::BinaryOp {
1897                        left: Box::new(Expr::BinaryOp {
1898                            left: Box::new(Expr::Identifier("idc".into())),
1899                            op: BinaryOperator::LtEq,
1900                            right: Box::new(Expr::Value(
1901                                Value::SingleQuotedString("hz".to_string()).into()
1902                            ))
1903                        }),
1904                        op: BinaryOperator::And,
1905                        right: Box::new(Expr::BinaryOp {
1906                            left: Box::new(Expr::Identifier("host_id".into())),
1907                            op: BinaryOperator::Lt,
1908                            right: Box::new(Expr::Value(
1909                                Value::Number("1000".to_string(), false).into()
1910                            ))
1911                        })
1912                    }
1913                );
1914                assert_eq!(
1915                    exprs[1],
1916                    Expr::BinaryOp {
1917                        left: Box::new(Expr::BinaryOp {
1918                            left: Box::new(Expr::BinaryOp {
1919                                left: Box::new(Expr::Identifier("idc".into())),
1920                                op: BinaryOperator::Gt,
1921                                right: Box::new(Expr::Value(
1922                                    Value::SingleQuotedString("hz".to_string()).into()
1923                                ))
1924                            }),
1925                            op: BinaryOperator::And,
1926                            right: Box::new(Expr::BinaryOp {
1927                                left: Box::new(Expr::Identifier("idc".into())),
1928                                op: BinaryOperator::LtEq,
1929                                right: Box::new(Expr::Value(
1930                                    Value::SingleQuotedString("sh".to_string()).into()
1931                                ))
1932                            })
1933                        }),
1934                        op: BinaryOperator::And,
1935                        right: Box::new(Expr::BinaryOp {
1936                            left: Box::new(Expr::Identifier("host_id".into())),
1937                            op: BinaryOperator::Lt,
1938                            right: Box::new(Expr::Value(
1939                                Value::Number("2000".to_string(), false).into()
1940                            ))
1941                        })
1942                    }
1943                );
1944                assert_eq!(
1945                    exprs[2],
1946                    Expr::BinaryOp {
1947                        left: Box::new(Expr::BinaryOp {
1948                            left: Box::new(Expr::Identifier("idc".into())),
1949                            op: BinaryOperator::Gt,
1950                            right: Box::new(Expr::Value(
1951                                Value::SingleQuotedString("sh".to_string()).into()
1952                            ))
1953                        }),
1954                        op: BinaryOperator::And,
1955                        right: Box::new(Expr::BinaryOp {
1956                            left: Box::new(Expr::Identifier("host_id".into())),
1957                            op: BinaryOperator::Lt,
1958                            right: Box::new(Expr::Value(
1959                                Value::Number("3000".to_string(), false).into()
1960                            ))
1961                        })
1962                    }
1963                );
1964                assert_eq!(
1965                    exprs[3],
1966                    Expr::BinaryOp {
1967                        left: Box::new(Expr::BinaryOp {
1968                            left: Box::new(Expr::Identifier("idc".into())),
1969                            op: BinaryOperator::Gt,
1970                            right: Box::new(Expr::Value(
1971                                Value::SingleQuotedString("sh".to_string()).into()
1972                            ))
1973                        }),
1974                        op: BinaryOperator::And,
1975                        right: Box::new(Expr::BinaryOp {
1976                            left: Box::new(Expr::Identifier("host_id".into())),
1977                            op: BinaryOperator::GtEq,
1978                            right: Box::new(Expr::Value(
1979                                Value::Number("3000".to_string(), false).into()
1980                            ))
1981                        })
1982                    }
1983                );
1984            }
1985            _ => unreachable!(),
1986        }
1987    }
1988
1989    #[test]
1990    fn test_parse_create_table_with_quoted_partitions() {
1991        let sql = r"
1992CREATE TABLE monitor (
1993  `host_id`    INT,
1994  idc        STRING,
1995  ts         TIMESTAMP,
1996  cpu        DOUBLE DEFAULT 0,
1997  memory     DOUBLE,
1998  TIME INDEX (ts),
1999  PRIMARY KEY (host),
2000)
2001PARTITION ON COLUMNS(IdC, host_id) (
2002  idc <= 'hz' AND host_id < 1000,
2003  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2004  idc > 'sh' AND host_id < 3000,
2005  idc > 'sh' AND host_id >= 3000,
2006)";
2007        let result =
2008            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2009                .unwrap();
2010        assert_eq!(result.len(), 1);
2011    }
2012
2013    #[test]
2014    fn test_parse_create_table_with_timestamp_index() {
2015        let sql1 = r"
2016CREATE TABLE monitor (
2017  host_id    INT,
2018  idc        STRING,
2019  ts         TIMESTAMP TIME INDEX,
2020  cpu        DOUBLE DEFAULT 0,
2021  memory     DOUBLE,
2022  PRIMARY KEY (host),
2023)
2024ENGINE=mito";
2025        let result1 = ParserContext::create_with_dialect(
2026            sql1,
2027            &GreptimeDbDialect {},
2028            ParseOptions::default(),
2029        )
2030        .unwrap();
2031
2032        if let Statement::CreateTable(c) = &result1[0] {
2033            assert_eq!(c.constraints.len(), 2);
2034            let tc = c.constraints[0].clone();
2035            match tc {
2036                TableConstraint::TimeIndex { column } => {
2037                    assert_eq!(&column.value, "ts");
2038                }
2039                _ => panic!("should be time index constraint"),
2040            };
2041        } else {
2042            panic!("should be create_table statement");
2043        }
2044
2045        // `TIME INDEX` should be in front of `PRIMARY KEY`
2046        // in order to equal the `TIMESTAMP TIME INDEX` constraint options vector
2047        let sql2 = r"
2048CREATE TABLE monitor (
2049  host_id    INT,
2050  idc        STRING,
2051  ts         TIMESTAMP NOT NULL,
2052  cpu        DOUBLE DEFAULT 0,
2053  memory     DOUBLE,
2054  TIME INDEX (ts),
2055  PRIMARY KEY (host),
2056)
2057ENGINE=mito";
2058        let result2 = ParserContext::create_with_dialect(
2059            sql2,
2060            &GreptimeDbDialect {},
2061            ParseOptions::default(),
2062        )
2063        .unwrap();
2064
2065        assert_eq!(result1, result2);
2066
2067        // TIMESTAMP can be NULL which is not equal to above
2068        let sql3 = r"
2069CREATE TABLE monitor (
2070  host_id    INT,
2071  idc        STRING,
2072  ts         TIMESTAMP,
2073  cpu        DOUBLE DEFAULT 0,
2074  memory     DOUBLE,
2075  TIME INDEX (ts),
2076  PRIMARY KEY (host),
2077)
2078ENGINE=mito";
2079
2080        let result3 = ParserContext::create_with_dialect(
2081            sql3,
2082            &GreptimeDbDialect {},
2083            ParseOptions::default(),
2084        )
2085        .unwrap();
2086
2087        assert_ne!(result1, result3);
2088
2089        // BIGINT can't be time index any more
2090        let sql1 = r"
2091CREATE TABLE monitor (
2092  host_id    INT,
2093  idc        STRING,
2094  b          bigint TIME INDEX,
2095  cpu        DOUBLE DEFAULT 0,
2096  memory     DOUBLE,
2097  PRIMARY KEY (host),
2098)
2099ENGINE=mito";
2100        let result1 = ParserContext::create_with_dialect(
2101            sql1,
2102            &GreptimeDbDialect {},
2103            ParseOptions::default(),
2104        );
2105
2106        assert!(
2107            result1
2108                .unwrap_err()
2109                .to_string()
2110                .contains("time index column data type should be timestamp")
2111        );
2112    }
2113
2114    #[test]
2115    fn test_parse_create_table_with_timestamp_index_not_null() {
2116        let sql = r"
2117CREATE TABLE monitor (
2118  host_id    INT,
2119  idc        STRING,
2120  ts         TIMESTAMP TIME INDEX,
2121  cpu        DOUBLE DEFAULT 0,
2122  memory     DOUBLE,
2123  TIME INDEX (ts),
2124  PRIMARY KEY (host),
2125)
2126ENGINE=mito";
2127        let result =
2128            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2129                .unwrap();
2130
2131        assert_eq!(result.len(), 1);
2132        if let Statement::CreateTable(c) = &result[0] {
2133            let ts = c.columns[2].clone();
2134            assert_eq!(ts.name().to_string(), "ts");
2135            assert_eq!(ts.options()[0].option, NotNull);
2136        } else {
2137            panic!("should be create table statement");
2138        }
2139
2140        let sql1 = r"
2141CREATE TABLE monitor (
2142  host_id    INT,
2143  idc        STRING,
2144  ts         TIMESTAMP NOT NULL TIME INDEX,
2145  cpu        DOUBLE DEFAULT 0,
2146  memory     DOUBLE,
2147  TIME INDEX (ts),
2148  PRIMARY KEY (host),
2149)
2150ENGINE=mito";
2151
2152        let result1 = ParserContext::create_with_dialect(
2153            sql1,
2154            &GreptimeDbDialect {},
2155            ParseOptions::default(),
2156        )
2157        .unwrap();
2158        assert_eq!(result, result1);
2159
2160        let sql2 = r"
2161CREATE TABLE monitor (
2162  host_id    INT,
2163  idc        STRING,
2164  ts         TIMESTAMP TIME INDEX NOT NULL,
2165  cpu        DOUBLE DEFAULT 0,
2166  memory     DOUBLE,
2167  TIME INDEX (ts),
2168  PRIMARY KEY (host),
2169)
2170ENGINE=mito";
2171
2172        let result2 = ParserContext::create_with_dialect(
2173            sql2,
2174            &GreptimeDbDialect {},
2175            ParseOptions::default(),
2176        )
2177        .unwrap();
2178        assert_eq!(result, result2);
2179
2180        let sql3 = r"
2181CREATE TABLE monitor (
2182  host_id    INT,
2183  idc        STRING,
2184  ts         TIMESTAMP TIME INDEX NULL NOT,
2185  cpu        DOUBLE DEFAULT 0,
2186  memory     DOUBLE,
2187  TIME INDEX (ts),
2188  PRIMARY KEY (host),
2189)
2190ENGINE=mito";
2191
2192        let result3 = ParserContext::create_with_dialect(
2193            sql3,
2194            &GreptimeDbDialect {},
2195            ParseOptions::default(),
2196        );
2197        assert!(result3.is_err());
2198
2199        let sql4 = r"
2200CREATE TABLE monitor (
2201  host_id    INT,
2202  idc        STRING,
2203  ts         TIMESTAMP TIME INDEX NOT NULL NULL,
2204  cpu        DOUBLE DEFAULT 0,
2205  memory     DOUBLE,
2206  TIME INDEX (ts),
2207  PRIMARY KEY (host),
2208)
2209ENGINE=mito";
2210
2211        let result4 = ParserContext::create_with_dialect(
2212            sql4,
2213            &GreptimeDbDialect {},
2214            ParseOptions::default(),
2215        );
2216        assert!(result4.is_err());
2217
2218        let sql = r"
2219CREATE TABLE monitor (
2220  host_id    INT,
2221  idc        STRING,
2222  ts         TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2223  cpu        DOUBLE DEFAULT 0,
2224  memory     DOUBLE,
2225  TIME INDEX (ts),
2226  PRIMARY KEY (host),
2227)
2228ENGINE=mito";
2229
2230        let result =
2231            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2232                .unwrap();
2233
2234        if let Statement::CreateTable(c) = &result[0] {
2235            let tc = c.constraints[0].clone();
2236            match tc {
2237                TableConstraint::TimeIndex { column } => {
2238                    assert_eq!(&column.value, "ts");
2239                }
2240                _ => panic!("should be time index constraint"),
2241            }
2242            let ts = c.columns[2].clone();
2243            assert_eq!(ts.name().to_string(), "ts");
2244            assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2245            assert_eq!(ts.options()[1].option, NotNull);
2246        } else {
2247            unreachable!("should be create table statement");
2248        }
2249    }
2250
2251    #[test]
2252    fn test_parse_partitions_with_error_syntax() {
2253        let sql = r"
2254CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2255PARTITION COLUMNS(c, a) (
2256    a < 10,
2257    a > 10 AND a < 20,
2258    a > 20 AND c < 100,
2259    a > 20 AND c >= 100
2260)
2261ENGINE=mito";
2262        let result =
2263            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2264        assert!(
2265            result
2266                .unwrap_err()
2267                .output_msg()
2268                .contains("sql parser error: Expected: ON, found: COLUMNS")
2269        );
2270    }
2271
2272    #[test]
2273    fn test_parse_partitions_without_rule() {
2274        let sql = r"
2275CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2276PARTITION ON COLUMNS(c, a) ()
2277ENGINE=mito";
2278        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2279            .unwrap();
2280    }
2281
2282    #[test]
2283    fn test_parse_partitions_unreferenced_column() {
2284        let sql = r"
2285CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2286PARTITION ON COLUMNS(c, a) (
2287    b = 'foo'
2288)
2289ENGINE=mito";
2290        let result =
2291            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2292        assert_eq!(
2293            result.unwrap_err().output_msg(),
2294            "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2295        );
2296    }
2297
2298    #[test]
2299    fn test_parse_partitions_not_binary_expr() {
2300        let sql = r"
2301CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2302PARTITION ON COLUMNS(c, a) (
2303    b
2304)
2305ENGINE=mito";
2306        let result =
2307            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2308        assert_eq!(
2309            result.unwrap_err().output_msg(),
2310            r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2311        );
2312    }
2313
2314    fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2315        assert_eq!(column.name.to_string(), name);
2316        assert_eq!(column.data_type.to_string(), data_type);
2317    }
2318
2319    #[test]
2320    pub fn test_parse_create_table() {
2321        let sql = r"create table demo(
2322                             host string,
2323                             ts timestamp,
2324                             cpu float32 default 0,
2325                             memory float64,
2326                             TIME INDEX (ts),
2327                             PRIMARY KEY(ts, host),
2328                             ) engine=mito
2329                             with(ttl='10s');
2330         ";
2331        let result =
2332            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2333                .unwrap();
2334        assert_eq!(1, result.len());
2335        match &result[0] {
2336            Statement::CreateTable(c) => {
2337                assert!(!c.if_not_exists);
2338                assert_eq!("demo", c.name.to_string());
2339                assert_eq!("mito", c.engine);
2340                assert_eq!(4, c.columns.len());
2341                let columns = &c.columns;
2342                assert_column_def(&columns[0].column_def, "host", "STRING");
2343                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2344                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2345                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2346
2347                let constraints = &c.constraints;
2348                assert_eq!(
2349                    &constraints[0],
2350                    &TableConstraint::TimeIndex {
2351                        column: Ident::new("ts"),
2352                    }
2353                );
2354                assert_eq!(
2355                    &constraints[1],
2356                    &TableConstraint::PrimaryKey {
2357                        columns: vec![Ident::new("ts"), Ident::new("host")]
2358                    }
2359                );
2360                // inverted index is merged into column options
2361                assert_eq!(1, c.options.len());
2362                assert_eq!(
2363                    [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2364                    c.options.to_str_map()
2365                );
2366            }
2367            _ => unreachable!(),
2368        }
2369    }
2370
2371    #[test]
2372    fn test_invalid_index_keys() {
2373        let sql = r"create table demo(
2374                             host string,
2375                             ts int64,
2376                             cpu float64 default 0,
2377                             memory float64,
2378                             TIME INDEX (ts, host),
2379                             PRIMARY KEY(ts, host)) engine=mito;
2380         ";
2381        let result =
2382            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2383        assert!(result.is_err());
2384        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2385    }
2386
2387    #[test]
2388    fn test_duplicated_time_index() {
2389        let sql = r"create table demo(
2390                             host string,
2391                             ts timestamp time index,
2392                             t timestamp time index,
2393                             cpu float64 default 0,
2394                             memory float64,
2395                             TIME INDEX (ts, host),
2396                             PRIMARY KEY(ts, host)) engine=mito;
2397         ";
2398        let result =
2399            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2400        assert!(result.is_err());
2401        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2402
2403        let sql = r"create table demo(
2404                             host string,
2405                             ts timestamp time index,
2406                             cpu float64 default 0,
2407                             t timestamp,
2408                             memory float64,
2409                             TIME INDEX (t),
2410                             PRIMARY KEY(ts, host)) engine=mito;
2411         ";
2412        let result =
2413            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2414        assert!(result.is_err());
2415        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2416    }
2417
2418    #[test]
2419    fn test_invalid_column_name() {
2420        let sql = "create table foo(user string, i timestamp time index)";
2421        let result =
2422            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2423        let err = result.unwrap_err().output_msg();
2424        assert!(err.contains("Cannot use keyword 'user' as column name"));
2425
2426        // If column name is quoted, it's valid even same with keyword.
2427        let sql = r#"
2428            create table foo("user" string, i timestamp time index)
2429        "#;
2430        let result =
2431            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2432        let _ = result.unwrap();
2433    }
2434
2435    #[test]
2436    fn test_incorrect_default_value_issue_3479() {
2437        let sql = r#"CREATE TABLE `ExcePTuRi`(
2438non TIMESTAMP(6) TIME INDEX,
2439`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2440)"#;
2441        let result =
2442            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2443                .unwrap();
2444        assert_eq!(1, result.len());
2445        match &result[0] {
2446            Statement::CreateTable(c) => {
2447                assert_eq!(
2448                    "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2449                    c.columns[1].to_string()
2450                );
2451            }
2452            _ => unreachable!(),
2453        }
2454    }
2455
2456    #[test]
2457    fn test_parse_create_view() {
2458        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
2459
2460        let result =
2461            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2462                .unwrap();
2463        match &result[0] {
2464            Statement::CreateView(c) => {
2465                assert_eq!(c.to_string(), sql);
2466                assert!(!c.or_replace);
2467                assert!(!c.if_not_exists);
2468                assert_eq!("test", c.name.to_string());
2469            }
2470            _ => unreachable!(),
2471        }
2472
2473        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
2474
2475        let result =
2476            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2477                .unwrap();
2478        match &result[0] {
2479            Statement::CreateView(c) => {
2480                assert_eq!(c.to_string(), sql);
2481                assert!(c.or_replace);
2482                assert!(c.if_not_exists);
2483                assert_eq!("test", c.name.to_string());
2484            }
2485            _ => unreachable!(),
2486        }
2487    }
2488
2489    #[test]
2490    fn test_parse_create_view_invalid_query() {
2491        let sql = "CREATE VIEW test AS DELETE from demo";
2492        let result =
2493            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2494        assert!(result.is_ok_and(|x| x.len() == 1));
2495    }
2496
2497    #[test]
2498    fn test_parse_create_table_fulltext_options() {
2499        let sql1 = r"
2500CREATE TABLE log (
2501    ts TIMESTAMP TIME INDEX,
2502    msg TEXT FULLTEXT INDEX,
2503)";
2504        let result1 = ParserContext::create_with_dialect(
2505            sql1,
2506            &GreptimeDbDialect {},
2507            ParseOptions::default(),
2508        )
2509        .unwrap();
2510
2511        if let Statement::CreateTable(c) = &result1[0] {
2512            c.columns.iter().for_each(|col| {
2513                if col.name().value == "msg" {
2514                    assert!(
2515                        col.extensions
2516                            .fulltext_index_options
2517                            .as_ref()
2518                            .unwrap()
2519                            .is_empty()
2520                    );
2521                }
2522            });
2523        } else {
2524            panic!("should be create_table statement");
2525        }
2526
2527        let sql2 = r"
2528CREATE TABLE log (
2529    ts TIMESTAMP TIME INDEX,
2530    msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2531)";
2532        let result2 = ParserContext::create_with_dialect(
2533            sql2,
2534            &GreptimeDbDialect {},
2535            ParseOptions::default(),
2536        )
2537        .unwrap();
2538
2539        if let Statement::CreateTable(c) = &result2[0] {
2540            c.columns.iter().for_each(|col| {
2541                if col.name().value == "msg" {
2542                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2543                    assert_eq!(options.len(), 2);
2544                    assert_eq!(options.get("analyzer").unwrap(), "English");
2545                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2546                }
2547            });
2548        } else {
2549            panic!("should be create_table statement");
2550        }
2551
2552        let sql3 = r"
2553CREATE TABLE log (
2554    ts TIMESTAMP TIME INDEX,
2555    msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2556    msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2557)";
2558        let result3 = ParserContext::create_with_dialect(
2559            sql3,
2560            &GreptimeDbDialect {},
2561            ParseOptions::default(),
2562        )
2563        .unwrap();
2564
2565        if let Statement::CreateTable(c) = &result3[0] {
2566            c.columns.iter().for_each(|col| {
2567                if col.name().value == "msg1" {
2568                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2569                    assert_eq!(options.len(), 2);
2570                    assert_eq!(options.get("analyzer").unwrap(), "English");
2571                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2572                } else if col.name().value == "msg2" {
2573                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2574                    assert_eq!(options.len(), 2);
2575                    assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2576                    assert_eq!(options.get("case_sensitive").unwrap(), "true");
2577                }
2578            });
2579        } else {
2580            panic!("should be create_table statement");
2581        }
2582    }
2583
2584    #[test]
2585    fn test_parse_create_table_fulltext_options_invalid_type() {
2586        let sql = r"
2587CREATE TABLE log (
2588    ts TIMESTAMP TIME INDEX,
2589    msg INT FULLTEXT INDEX,
2590)";
2591        let result =
2592            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2593        assert!(result.is_err());
2594        assert!(
2595            result
2596                .unwrap_err()
2597                .to_string()
2598                .contains("FULLTEXT index only supports string type")
2599        );
2600    }
2601
2602    #[test]
2603    fn test_parse_create_table_fulltext_options_duplicate() {
2604        let sql = r"
2605CREATE TABLE log (
2606    ts TIMESTAMP TIME INDEX,
2607    msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
2608)";
2609        let result =
2610            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2611        assert!(result.is_err());
2612        assert!(
2613            result
2614                .unwrap_err()
2615                .to_string()
2616                .contains("duplicated FULLTEXT INDEX option")
2617        );
2618    }
2619
2620    #[test]
2621    fn test_parse_create_table_fulltext_options_invalid_option() {
2622        let sql = r"
2623CREATE TABLE log (
2624    ts TIMESTAMP TIME INDEX,
2625    msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
2626)";
2627        let result =
2628            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2629        assert!(result.is_err());
2630        assert!(
2631            result
2632                .unwrap_err()
2633                .to_string()
2634                .contains("invalid FULLTEXT INDEX option")
2635        );
2636    }
2637
2638    #[test]
2639    fn test_parse_create_table_skip_options() {
2640        let sql = r"
2641CREATE TABLE log (
2642    ts TIMESTAMP TIME INDEX,
2643    msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
2644)";
2645        let result =
2646            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2647                .unwrap();
2648
2649        if let Statement::CreateTable(c) = &result[0] {
2650            c.columns.iter().for_each(|col| {
2651                if col.name().value == "msg" {
2652                    assert!(
2653                        !col.extensions
2654                            .skipping_index_options
2655                            .as_ref()
2656                            .unwrap()
2657                            .is_empty()
2658                    );
2659                }
2660            });
2661        } else {
2662            panic!("should be create_table statement");
2663        }
2664
2665        let sql = r"
2666        CREATE TABLE log (
2667            ts TIMESTAMP TIME INDEX,
2668            msg INT SKIPPING INDEX,
2669        )";
2670        let result =
2671            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2672                .unwrap();
2673
2674        if let Statement::CreateTable(c) = &result[0] {
2675            c.columns.iter().for_each(|col| {
2676                if col.name().value == "msg" {
2677                    assert!(
2678                        col.extensions
2679                            .skipping_index_options
2680                            .as_ref()
2681                            .unwrap()
2682                            .is_empty()
2683                    );
2684                }
2685            });
2686        } else {
2687            panic!("should be create_table statement");
2688        }
2689    }
2690
2691    #[test]
2692    fn test_parse_create_view_with_columns() {
2693        let sql = "CREATE VIEW test () AS SELECT * FROM NUMBERS";
2694        let result =
2695            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2696                .unwrap();
2697
2698        match &result[0] {
2699            Statement::CreateView(c) => {
2700                assert_eq!(c.to_string(), "CREATE VIEW test AS SELECT * FROM NUMBERS");
2701                assert!(!c.or_replace);
2702                assert!(!c.if_not_exists);
2703                assert_eq!("test", c.name.to_string());
2704            }
2705            _ => unreachable!(),
2706        }
2707        assert_eq!(
2708            "CREATE VIEW test AS SELECT * FROM NUMBERS",
2709            result[0].to_string()
2710        );
2711
2712        let sql = "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS";
2713        let result =
2714            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2715                .unwrap();
2716
2717        match &result[0] {
2718            Statement::CreateView(c) => {
2719                assert_eq!(c.to_string(), sql);
2720                assert!(!c.or_replace);
2721                assert!(!c.if_not_exists);
2722                assert_eq!("test", c.name.to_string());
2723            }
2724            _ => unreachable!(),
2725        }
2726        assert_eq!(sql, result[0].to_string());
2727
2728        let sql = "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS";
2729        let result =
2730            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2731                .unwrap();
2732
2733        match &result[0] {
2734            Statement::CreateView(c) => {
2735                assert_eq!(c.to_string(), sql);
2736                assert!(!c.or_replace);
2737                assert!(!c.if_not_exists);
2738                assert_eq!("test", c.name.to_string());
2739            }
2740            _ => unreachable!(),
2741        }
2742        assert_eq!(sql, result[0].to_string());
2743
2744        // Some invalid syntax cases
2745        let sql = "CREATE VIEW test (n1 AS select * from demo";
2746        let result =
2747            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2748        assert!(result.is_err());
2749
2750        let sql = "CREATE VIEW test (n1, AS select * from demo";
2751        let result =
2752            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2753        assert!(result.is_err());
2754
2755        let sql = "CREATE VIEW test n1,n2) AS select * from demo";
2756        let result =
2757            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2758        assert!(result.is_err());
2759
2760        let sql = "CREATE VIEW test (1) AS select * from demo";
2761        let result =
2762            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2763        assert!(result.is_err());
2764
2765        // keyword
2766        let sql = "CREATE VIEW test (n1, select) AS select * from demo";
2767        let result =
2768            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2769        assert!(result.is_err());
2770    }
2771
2772    #[test]
2773    fn test_parse_column_extensions_vector() {
2774        // Test that vector options are parsed from data_type (no additional SQL needed)
2775        let sql = "";
2776        let dialect = GenericDialect {};
2777        let mut tokenizer = Tokenizer::new(&dialect, sql);
2778        let tokens = tokenizer.tokenize().unwrap();
2779        let mut parser = Parser::new(&dialect).with_tokens(tokens);
2780        let name = Ident::new("vec_col");
2781        let data_type =
2782            DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
2783        let mut extensions = ColumnExtensions::default();
2784
2785        let result =
2786            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2787        assert!(result.is_ok());
2788        assert!(extensions.vector_options.is_some());
2789        let vector_options = extensions.vector_options.unwrap();
2790        assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some("128"));
2791    }
2792
2793    #[test]
2794    fn test_parse_column_extensions_vector_invalid() {
2795        // Test that vector with no dimension fails
2796        let sql = "";
2797        let dialect = GenericDialect {};
2798        let mut tokenizer = Tokenizer::new(&dialect, sql);
2799        let tokens = tokenizer.tokenize().unwrap();
2800        let mut parser = Parser::new(&dialect).with_tokens(tokens);
2801        let name = Ident::new("vec_col");
2802        let data_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
2803        let mut extensions = ColumnExtensions::default();
2804
2805        let result =
2806            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2807        assert!(result.is_err());
2808    }
2809
2810    #[test]
2811    fn test_parse_column_extensions_indices() {
2812        // Test skipping index
2813        {
2814            let sql = "SKIPPING INDEX";
2815            let dialect = GenericDialect {};
2816            let mut tokenizer = Tokenizer::new(&dialect, sql);
2817            let tokens = tokenizer.tokenize().unwrap();
2818            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2819            let name = Ident::new("col");
2820            let data_type = DataType::String(None);
2821            let mut extensions = ColumnExtensions::default();
2822            let result = ParserContext::parse_column_extensions(
2823                &mut parser,
2824                &name,
2825                &data_type,
2826                &mut extensions,
2827            );
2828            assert!(result.is_ok());
2829            assert!(extensions.skipping_index_options.is_some());
2830        }
2831
2832        // Test fulltext index with options
2833        {
2834            let sql = "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')";
2835            let dialect = GenericDialect {};
2836            let mut tokenizer = Tokenizer::new(&dialect, sql);
2837            let tokens = tokenizer.tokenize().unwrap();
2838            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2839            let name = Ident::new("text_col");
2840            let data_type = DataType::String(None);
2841            let mut extensions = ColumnExtensions::default();
2842            let result = ParserContext::parse_column_extensions(
2843                &mut parser,
2844                &name,
2845                &data_type,
2846                &mut extensions,
2847            );
2848            assert!(result.unwrap());
2849            assert!(extensions.fulltext_index_options.is_some());
2850            let fulltext_options = extensions.fulltext_index_options.unwrap();
2851            assert_eq!(fulltext_options.get("analyzer"), Some("English"));
2852            assert_eq!(fulltext_options.get("case_sensitive"), Some("true"));
2853        }
2854
2855        // Test fulltext index with invalid type (should fail)
2856        {
2857            let sql = "FULLTEXT INDEX WITH (analyzer = 'English')";
2858            let dialect = GenericDialect {};
2859            let mut tokenizer = Tokenizer::new(&dialect, sql);
2860            let tokens = tokenizer.tokenize().unwrap();
2861            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2862            let name = Ident::new("num_col");
2863            let data_type = DataType::Int(None); // Non-string type
2864            let mut extensions = ColumnExtensions::default();
2865            let result = ParserContext::parse_column_extensions(
2866                &mut parser,
2867                &name,
2868                &data_type,
2869                &mut extensions,
2870            );
2871            assert!(result.is_err());
2872            assert!(
2873                result
2874                    .unwrap_err()
2875                    .to_string()
2876                    .contains("FULLTEXT index only supports string type")
2877            );
2878        }
2879
2880        // Test fulltext index with invalid option (won't fail, the parser doesn't check the option's content)
2881        {
2882            let sql = "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')";
2883            let dialect = GenericDialect {};
2884            let mut tokenizer = Tokenizer::new(&dialect, sql);
2885            let tokens = tokenizer.tokenize().unwrap();
2886            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2887            let name = Ident::new("text_col");
2888            let data_type = DataType::String(None);
2889            let mut extensions = ColumnExtensions::default();
2890            let result = ParserContext::parse_column_extensions(
2891                &mut parser,
2892                &name,
2893                &data_type,
2894                &mut extensions,
2895            );
2896            assert!(result.unwrap());
2897        }
2898
2899        // Test inverted index
2900        {
2901            let sql = "INVERTED INDEX";
2902            let dialect = GenericDialect {};
2903            let mut tokenizer = Tokenizer::new(&dialect, sql);
2904            let tokens = tokenizer.tokenize().unwrap();
2905            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2906            let name = Ident::new("col");
2907            let data_type = DataType::String(None);
2908            let mut extensions = ColumnExtensions::default();
2909            let result = ParserContext::parse_column_extensions(
2910                &mut parser,
2911                &name,
2912                &data_type,
2913                &mut extensions,
2914            );
2915            assert!(result.is_ok());
2916            assert!(extensions.inverted_index_options.is_some());
2917        }
2918
2919        // Test inverted index with options (should fail)
2920        {
2921            let sql = "INVERTED INDEX WITH (analyzer = 'English')";
2922            let dialect = GenericDialect {};
2923            let mut tokenizer = Tokenizer::new(&dialect, sql);
2924            let tokens = tokenizer.tokenize().unwrap();
2925            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2926            let name = Ident::new("col");
2927            let data_type = DataType::String(None);
2928            let mut extensions = ColumnExtensions::default();
2929            let result = ParserContext::parse_column_extensions(
2930                &mut parser,
2931                &name,
2932                &data_type,
2933                &mut extensions,
2934            );
2935            assert!(result.is_err());
2936            assert!(
2937                result
2938                    .unwrap_err()
2939                    .to_string()
2940                    .contains("INVERTED index doesn't support options")
2941            );
2942        }
2943
2944        // Test multiple indices
2945        {
2946            let sql = "SKIPPING INDEX FULLTEXT INDEX";
2947            let dialect = GenericDialect {};
2948            let mut tokenizer = Tokenizer::new(&dialect, sql);
2949            let tokens = tokenizer.tokenize().unwrap();
2950            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2951            let name = Ident::new("col");
2952            let data_type = DataType::String(None);
2953            let mut extensions = ColumnExtensions::default();
2954            let result = ParserContext::parse_column_extensions(
2955                &mut parser,
2956                &name,
2957                &data_type,
2958                &mut extensions,
2959            );
2960            assert!(result.unwrap());
2961            assert!(extensions.skipping_index_options.is_some());
2962            assert!(extensions.fulltext_index_options.is_some());
2963        }
2964    }
2965
2966    #[test]
2967    fn test_parse_interval_cast() {
2968        let s = "select '10s'::INTERVAL";
2969        let stmts =
2970            ParserContext::create_with_dialect(s, &GreptimeDbDialect {}, ParseOptions::default())
2971                .unwrap();
2972        assert_eq!("SELECT '10 seconds'::INTERVAL", &stmts[0].to_string());
2973    }
2974
2975    #[test]
2976    fn test_parse_create_table_vector_index_options() {
2977        // Test basic vector index
2978        let sql = r"
2979CREATE TABLE vectors (
2980    ts TIMESTAMP TIME INDEX,
2981    vec VECTOR(128) VECTOR INDEX,
2982)";
2983        let result =
2984            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2985                .unwrap();
2986
2987        if let Statement::CreateTable(c) = &result[0] {
2988            c.columns.iter().for_each(|col| {
2989                if col.name().value == "vec" {
2990                    assert!(
2991                        col.extensions
2992                            .vector_index_options
2993                            .as_ref()
2994                            .unwrap()
2995                            .is_empty()
2996                    );
2997                }
2998            });
2999        } else {
3000            panic!("should be create_table statement");
3001        }
3002
3003        // Test vector index with options
3004        let sql = r"
3005CREATE TABLE vectors (
3006    ts TIMESTAMP TIME INDEX,
3007    vec VECTOR(128) VECTOR INDEX WITH (metric='cosine', connectivity='32', expansion_add='256', expansion_search='128')
3008)";
3009        let result =
3010            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
3011                .unwrap();
3012
3013        if let Statement::CreateTable(c) = &result[0] {
3014            c.columns.iter().for_each(|col| {
3015                if col.name().value == "vec" {
3016                    let options = col.extensions.vector_index_options.as_ref().unwrap();
3017                    assert_eq!(options.len(), 4);
3018                    assert_eq!(options.get("metric").unwrap(), "cosine");
3019                    assert_eq!(options.get("connectivity").unwrap(), "32");
3020                    assert_eq!(options.get("expansion_add").unwrap(), "256");
3021                    assert_eq!(options.get("expansion_search").unwrap(), "128");
3022                }
3023            });
3024        } else {
3025            panic!("should be create_table statement");
3026        }
3027    }
3028
3029    #[test]
3030    fn test_parse_create_table_vector_index_invalid_type() {
3031        // Test vector index on non-vector type (should fail)
3032        let sql = r"
3033CREATE TABLE vectors (
3034    ts TIMESTAMP TIME INDEX,
3035    col INT VECTOR INDEX,
3036)";
3037        let result =
3038            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3039        assert!(result.is_err());
3040        assert!(
3041            result
3042                .unwrap_err()
3043                .to_string()
3044                .contains("VECTOR INDEX only supports Vector type columns")
3045        );
3046    }
3047
3048    #[test]
3049    fn test_parse_create_table_vector_index_duplicate() {
3050        // Test duplicate vector index (should fail)
3051        let sql = r"
3052CREATE TABLE vectors (
3053    ts TIMESTAMP TIME INDEX,
3054    vec VECTOR(128) VECTOR INDEX VECTOR INDEX,
3055)";
3056        let result =
3057            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3058        assert!(result.is_err());
3059        assert!(
3060            result
3061                .unwrap_err()
3062                .to_string()
3063                .contains("duplicated VECTOR INDEX option")
3064        );
3065    }
3066
3067    #[test]
3068    fn test_parse_create_table_vector_index_invalid_option() {
3069        // Test invalid option key (should fail)
3070        let sql = r"
3071CREATE TABLE vectors (
3072    ts TIMESTAMP TIME INDEX,
3073    vec VECTOR(128) VECTOR INDEX WITH (metric='l2sq', invalid_option='foo')
3074)";
3075        let result =
3076            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3077        assert!(result.is_err());
3078        assert!(
3079            result
3080                .unwrap_err()
3081                .to_string()
3082                .contains("invalid VECTOR INDEX option")
3083        );
3084    }
3085
3086    #[test]
3087    fn test_parse_column_extensions_vector_index() {
3088        // Test vector index on vector type
3089        {
3090            let sql = "VECTOR INDEX WITH (metric = 'l2sq')";
3091            let dialect = GenericDialect {};
3092            let mut tokenizer = Tokenizer::new(&dialect, sql);
3093            let tokens = tokenizer.tokenize().unwrap();
3094            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3095            let name = Ident::new("vec_col");
3096            let data_type =
3097                DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
3098            // First, parse the vector type to set vector_options
3099            let mut extensions = ColumnExtensions {
3100                vector_options: Some(OptionMap::from([(
3101                    VECTOR_OPT_DIM.to_string(),
3102                    "128".to_string(),
3103                )])),
3104                ..Default::default()
3105            };
3106
3107            let result = ParserContext::parse_column_extensions(
3108                &mut parser,
3109                &name,
3110                &data_type,
3111                &mut extensions,
3112            );
3113            assert!(result.is_ok());
3114            assert!(extensions.vector_index_options.is_some());
3115            let vi_options = extensions.vector_index_options.unwrap();
3116            assert_eq!(vi_options.get("metric"), Some("l2sq"));
3117        }
3118
3119        // Test vector index on non-vector type (should fail)
3120        {
3121            let sql = "VECTOR INDEX";
3122            let dialect = GenericDialect {};
3123            let mut tokenizer = Tokenizer::new(&dialect, sql);
3124            let tokens = tokenizer.tokenize().unwrap();
3125            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3126            let name = Ident::new("num_col");
3127            let data_type = DataType::Int(None); // Non-vector type
3128            let mut extensions = ColumnExtensions::default();
3129            let result = ParserContext::parse_column_extensions(
3130                &mut parser,
3131                &name,
3132                &data_type,
3133                &mut extensions,
3134            );
3135            assert!(result.is_err());
3136            assert!(
3137                result
3138                    .unwrap_err()
3139                    .to_string()
3140                    .contains("VECTOR INDEX only supports Vector type columns")
3141            );
3142        }
3143    }
3144}