Skip to main content

sql/parsers/
create_parser.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod json;
16#[cfg(feature = "enterprise")]
17pub mod trigger;
18
19use std::collections::HashMap;
20
21use arrow_buffer::IntervalMonthDayNano;
22use common_catalog::consts::default_engine;
23use datafusion_common::ScalarValue;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
25use datatypes::data_type::ConcreteDataType;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28use sqlparser::ast::{
29    ColumnOption, ColumnOptionDef, DataType, Expr, KeyOrIndexDisplay, NullsDistinctOption,
30    PrimaryKeyConstraint, UniqueConstraint,
31};
32use sqlparser::dialect::keywords::Keyword;
33use sqlparser::keywords::ALL_KEYWORDS;
34use sqlparser::parser::IsOptional::Mandatory;
35use sqlparser::parser::{Parser, ParserError};
36use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
37use table::requests::validate_database_option;
38
39use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
40use crate::error::{
41    self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidFlowQuerySnafu,
42    InvalidIntervalSnafu, InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
43    SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
44};
45use crate::parser::{FLOW, ParserContext};
46use crate::parsers::tql_parser;
47use crate::parsers::utils::{
48    self, parse_with_options, validate_column_fulltext_create_option,
49    validate_column_skipping_index_create_option, validate_column_vector_index_create_option,
50};
51use crate::statements::create::{
52    Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
53    CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
54};
55use crate::statements::statement::Statement;
56use crate::statements::transform::type_alias::get_data_type_by_alias_name;
57use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
58use crate::util::{OptionValue, location_to_index, parse_option_string};
59
60pub const ENGINE: &str = "ENGINE";
61pub const MAXVALUE: &str = "MAXVALUE";
62pub const SINK: &str = "SINK";
63pub const EXPIRE: &str = "EXPIRE";
64pub const AFTER: &str = "AFTER";
65pub const INVERTED: &str = "INVERTED";
66pub const SKIPPING: &str = "SKIPPING";
67pub const VECTOR: &str = "VECTOR";
68
69pub type RawIntervalExpr = String;
70
71// Preserve raw CREATE FLOW option entries until operator-side validation.
72// Do not use `OptionMap::new()` here: it can drop non-string values for
73// redacted keys before the flow option allowlist rejects them.
74fn flow_option_map(options: HashMap<String, OptionValue>) -> OptionMap {
75    let mut flow_options = OptionMap::default();
76    for (key, value) in options {
77        flow_options.insert_options(&key, value);
78    }
79    flow_options
80}
81
82/// Parses create [table] statement
83impl<'a> ParserContext<'a> {
84    pub(crate) fn parse_create(&mut self) -> Result<Statement> {
85        match self.parser.peek_token().token {
86            Token::Word(w) => match w.keyword {
87                Keyword::TABLE => self.parse_create_table(),
88
89                Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
90
91                Keyword::EXTERNAL => self.parse_create_external_table(),
92
93                Keyword::OR => {
94                    let _ = self.parser.next_token();
95                    self.parser
96                        .expect_keyword(Keyword::REPLACE)
97                        .context(SyntaxSnafu)?;
98                    match self.parser.next_token().token {
99                        Token::Word(w) => match w.keyword {
100                            Keyword::VIEW => self.parse_create_view(true),
101                            Keyword::NoKeyword => {
102                                let uppercase = w.value.to_uppercase();
103                                match uppercase.as_str() {
104                                    FLOW => self.parse_create_flow(true),
105                                    _ => self.unsupported(w.to_string()),
106                                }
107                            }
108                            _ => self.unsupported(w.to_string()),
109                        },
110                        _ => self.unsupported(w.to_string()),
111                    }
112                }
113
114                Keyword::VIEW => {
115                    let _ = self.parser.next_token();
116                    self.parse_create_view(false)
117                }
118
119                #[cfg(feature = "enterprise")]
120                Keyword::TRIGGER => {
121                    let _ = self.parser.next_token();
122                    self.parse_create_trigger()
123                }
124
125                Keyword::NoKeyword => {
126                    let _ = self.parser.next_token();
127                    let uppercase = w.value.to_uppercase();
128                    match uppercase.as_str() {
129                        FLOW => self.parse_create_flow(false),
130                        _ => self.unsupported(w.to_string()),
131                    }
132                }
133                _ => self.unsupported(w.to_string()),
134            },
135            unexpected => self.unsupported(unexpected.to_string()),
136        }
137    }
138
139    /// Parse `CREAVE VIEW` statement.
140    fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
141        let if_not_exists = self.parse_if_not_exist()?;
142        let view_name = self.intern_parse_table_name()?;
143
144        let columns = self.parse_view_columns()?;
145
146        self.parser
147            .expect_keyword(Keyword::AS)
148            .context(SyntaxSnafu)?;
149
150        let query = self.parse_query()?;
151
152        Ok(Statement::CreateView(CreateView {
153            name: view_name,
154            columns,
155            or_replace,
156            query: Box::new(query),
157            if_not_exists,
158        }))
159    }
160
161    fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
162        let mut columns = vec![];
163        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
164            return Ok(columns);
165        }
166
167        loop {
168            let name = self.parse_column_name().context(SyntaxSnafu)?;
169
170            columns.push(name);
171
172            let comma = self.parser.consume_token(&Token::Comma);
173            if self.parser.consume_token(&Token::RParen) {
174                // allow a trailing comma, even though it's not in standard
175                break;
176            } else if !comma {
177                return self.expected("',' or ')' after column name", self.parser.peek_token());
178            }
179        }
180
181        Ok(columns)
182    }
183
184    fn parse_create_external_table(&mut self) -> Result<Statement> {
185        let _ = self.parser.next_token();
186        self.parser
187            .expect_keyword(Keyword::TABLE)
188            .context(SyntaxSnafu)?;
189        let if_not_exists = self.parse_if_not_exist()?;
190        let table_name = self.intern_parse_table_name()?;
191        let (columns, constraints) = self.parse_columns()?;
192        if !columns.is_empty() {
193            validate_time_index(&columns, &constraints)?;
194        }
195
196        let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
197        let options = self.parse_create_table_options()?;
198        Ok(Statement::CreateExternalTable(CreateExternalTable {
199            name: table_name,
200            columns,
201            constraints,
202            options,
203            if_not_exists,
204            engine,
205        }))
206    }
207
208    fn parse_create_database(&mut self) -> Result<Statement> {
209        let _ = self.parser.next_token();
210        let if_not_exists = self.parse_if_not_exist()?;
211        let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
212            expected: "a database name",
213            actual: self.peek_token_as_string(),
214        })?;
215        let database_name = Self::canonicalize_object_name(database_name)?;
216
217        let options = self
218            .parser
219            .parse_options(Keyword::WITH)
220            .context(SyntaxSnafu)?
221            .into_iter()
222            .map(parse_option_string)
223            .collect::<Result<HashMap<String, OptionValue>>>()?;
224
225        for key in options.keys() {
226            ensure!(
227                validate_database_option(key),
228                InvalidDatabaseOptionSnafu { key: key.clone() }
229            );
230        }
231        if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string())
232            && append_mode == "true"
233            && options.contains_key("merge_mode")
234        {
235            return InvalidDatabaseOptionSnafu {
236                key: "merge_mode".to_string(),
237            }
238            .fail();
239        }
240
241        Ok(Statement::CreateDatabase(CreateDatabase {
242            name: database_name,
243            if_not_exists,
244            options: OptionMap::new(options),
245        }))
246    }
247
248    fn parse_create_table(&mut self) -> Result<Statement> {
249        let _ = self.parser.next_token();
250
251        let if_not_exists = self.parse_if_not_exist()?;
252
253        let table_name = self.intern_parse_table_name()?;
254
255        if self.parser.parse_keyword(Keyword::LIKE) {
256            let source_name = self.intern_parse_table_name()?;
257
258            return Ok(Statement::CreateTableLike(CreateTableLike {
259                table_name,
260                source_name,
261            }));
262        }
263
264        let (columns, constraints) = self.parse_columns()?;
265        validate_time_index(&columns, &constraints)?;
266
267        let partitions = self.parse_partitions()?;
268        if let Some(partitions) = &partitions {
269            validate_partitions(&columns, partitions)?;
270        }
271
272        let engine = self.parse_table_engine(default_engine())?;
273        let options = self.parse_create_table_options()?;
274        let create_table = CreateTable {
275            if_not_exists,
276            name: table_name,
277            columns,
278            engine,
279            constraints,
280            options,
281            table_id: 0, // table id is assigned by catalog manager
282            partitions,
283        };
284
285        Ok(Statement::CreateTable(create_table))
286    }
287
288    /// "CREATE FLOW" clause
289    fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
290        let if_not_exists = self.parse_if_not_exist()?;
291
292        let flow_name = self.intern_parse_table_name()?;
293
294        // make `SINK` case in-sensitive
295        if let Token::Word(word) = self.parser.peek_token().token
296            && word.value.eq_ignore_ascii_case(SINK)
297        {
298            self.parser.next_token();
299        } else {
300            Err(ParserError::ParserError(
301                "Expect `SINK` keyword".to_string(),
302            ))
303            .context(SyntaxSnafu)?
304        }
305        self.parser
306            .expect_keyword(Keyword::TO)
307            .context(SyntaxSnafu)?;
308
309        let output_table_name = self.intern_parse_table_name()?;
310
311        let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
312            && w1.value.eq_ignore_ascii_case(EXPIRE)
313        {
314            self.parser.next_token();
315            if let Token::Word(w2) = &self.parser.peek_token().token
316                && w2.value.eq_ignore_ascii_case(AFTER)
317            {
318                self.parser.next_token();
319                Some(self.parse_interval_no_month("EXPIRE AFTER")?)
320            } else {
321                None
322            }
323        } else {
324            None
325        };
326
327        let eval_interval = if self
328            .parser
329            .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
330        {
331            Some(self.parse_interval_no_month("EVAL INTERVAL")?)
332        } else {
333            None
334        };
335
336        let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
337            match self.parser.next_token() {
338                TokenWithSpan {
339                    token: Token::SingleQuotedString(value, ..),
340                    ..
341                } => Some(value),
342                unexpected => {
343                    return self
344                        .parser
345                        .expected("string", unexpected)
346                        .context(SyntaxSnafu);
347                }
348            }
349        } else {
350            None
351        };
352
353        let flow_options = self
354            .parser
355            .parse_options(Keyword::WITH)
356            .context(SyntaxSnafu)?
357            .into_iter()
358            .map(parse_option_string)
359            .collect::<Result<HashMap<String, OptionValue>>>()?;
360
361        self.parser
362            .expect_keyword(Keyword::AS)
363            .context(SyntaxSnafu)?;
364
365        let query = Box::new(self.parse_flow_sql_or_tql(true)?);
366
367        Ok(Statement::CreateFlow(CreateFlow {
368            flow_name,
369            sink_table_name: output_table_name,
370            or_replace,
371            if_not_exists,
372            expire_after,
373            eval_interval,
374            comment,
375            flow_options: flow_option_map(flow_options),
376            query,
377        }))
378    }
379
    /// Parses the query body of a flow, which must be SQL or TQL, and pairs the parsed
    /// statement with its raw source text.
    ///
    /// Accepted starts are `SELECT`, `WITH`, and an unquoted word that upper-cases to
    /// `tql_parser::TQL`; anything else is reported through `self.unsupported`.
    /// `require_now_expr` is only forwarded to the `WITH` and TQL sub-parsers
    /// (NOTE(review): its exact meaning is defined there — confirm).
    ///
    /// # Errors
    /// Besides sub-parser errors, fails with an invalid-flow-query error when the input
    /// starts with `WITH` but does not parse to a query, or when that query does not
    /// satisfy `utils::is_simple_tql_cte_query`.
    fn parse_flow_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
        // Remember where the query text begins inside the original SQL string.
        let start_loc = self.parser.peek_token().span.start;
        let start_index = location_to_index(self.sql, &start_loc);

        // Must be captured before parsing, since parsing consumes the leading token.
        let starts_with_with = matches!(
            self.parser.peek_token().token,
            Token::Word(w) if w.keyword == Keyword::WITH
        );

        // only accept sql or tql
        let query = match self.parser.peek_token().token {
            Token::Word(w) => match w.keyword {
                Keyword::SELECT => self.parse_query(),
                Keyword::WITH => self.parse_with_tql_with_now(require_now_expr),
                Keyword::NoKeyword
                    if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
                {
                    self.parse_tql(require_now_expr)
                }

                _ => self.unsupported(self.peek_token_as_string()),
            },
            _ => self.unsupported(self.peek_token_as_string()),
        }?;

        if starts_with_with {
            let Statement::Query(query) = &query else {
                return InvalidFlowQuerySnafu {
                    reason: "Expect a query after WITH".to_string(),
                }
                .fail();
            };

            if !utils::is_simple_tql_cte_query(query) {
                return InvalidFlowQuerySnafu {
                    reason: "WITH is only supported for the simplest TQL CTE in CREATE FLOW"
                        .to_string(),
                }
                .fail();
            }
        }

        // The first token left unconsumed after the query marks where the raw text ends.
        let end_token = self.parser.peek_token();

        let raw_query = if end_token == Token::EOF {
            &self.sql[start_index..]
        } else {
            // Slice up to the end of that token's span, clamped to the input length.
            let end_loc = end_token.span.end;
            let end_index = location_to_index(self.sql, &end_loc);
            &self.sql[start_index..end_index.min(self.sql.len())]
        };
        // Drop any trailing statement terminator(s) from the raw text.
        let raw_query = raw_query.trim_end_matches(";");

        let query = SqlOrTql::try_from_statement(query, raw_query)?;
        Ok(query)
    }
436
437    /// Parse the interval expr to duration in seconds.
438    fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
439        let interval = self.parse_interval_month_day_nano()?.0;
440        if interval.months != 0 {
441            return InvalidIntervalSnafu {
442                reason: format!("Interval with months is not allowed in {context}"),
443            }
444            .fail();
445        }
446        Ok(
447            interval.nanoseconds / 1_000_000_000
448                + interval.days as i64 * 60 * 60 * 24
449                + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, // 1 month=365.25/12=30.44 days
450                                                                       // this is to keep the same as https://docs.rs/humantime/latest/humantime/fn.parse_duration.html
451                                                                       // which we use in database to parse i.e. ttl interval and many other intervals
452        )
453    }
454
455    /// Parse interval expr to [`IntervalMonthDayNano`].
456    fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
457        let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
458        let raw_interval_expr = interval_expr.to_string();
459        let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
460            .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
461            .ok()
462            .with_context(|| InvalidIntervalSnafu {
463                reason: format!("cannot cast {} to interval type", interval_expr),
464            })?;
465        if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
466            Ok((interval, raw_interval_expr))
467        } else {
468            unreachable!()
469        }
470    }
471
472    fn parse_if_not_exist(&mut self) -> Result<bool> {
473        match self.parser.peek_token().token {
474            Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
475            _ => {}
476        }
477
478        if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
479            return self
480                .parser
481                .expect_keyword(Keyword::EXISTS)
482                .map(|_| true)
483                .context(UnexpectedSnafu {
484                    expected: "EXISTS",
485                    actual: self.peek_token_as_string(),
486                });
487        }
488
489        if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
490            return UnsupportedSnafu { keyword: "EXISTS" }.fail();
491        }
492
493        Ok(false)
494    }
495
    /// Parses the table options of a `CREATE [EXTERNAL] TABLE` statement by delegating
    /// to `parse_with_options` on the underlying parser.
    fn parse_create_table_options(&mut self) -> Result<OptionMap> {
        parse_with_options(&mut self.parser)
    }
499
500    /// "PARTITION ON COLUMNS (...)" clause
501    fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
502        if !self.parser.parse_keyword(Keyword::PARTITION) {
503            return Ok(None);
504        }
505        self.parser
506            .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
507            .context(error::UnexpectedSnafu {
508                expected: "ON, COLUMNS",
509                actual: self.peek_token_as_string(),
510            })?;
511
512        let raw_column_list = self
513            .parser
514            .parse_parenthesized_column_list(Mandatory, false)
515            .context(error::SyntaxSnafu)?;
516        let column_list = raw_column_list
517            .into_iter()
518            .map(Self::canonicalize_identifier)
519            .collect();
520
521        let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
522
523        Ok(Some(Partitions { column_list, exprs }))
524    }
525
    /// Parses one partition expression; parser failures are wrapped as syntax errors.
    fn parse_partition_entry(&mut self) -> Result<Expr> {
        self.parser.parse_expr().context(error::SyntaxSnafu)
    }
529
530    /// Parse a comma-separated list wrapped by "()", and of which all items accepted by `F`
531    fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
532    where
533        F: FnMut(&mut ParserContext<'a>) -> Result<T>,
534    {
535        self.parser
536            .expect_token(&Token::LParen)
537            .context(error::UnexpectedSnafu {
538                expected: "(",
539                actual: self.peek_token_as_string(),
540            })?;
541
542        let mut values = vec![];
543        while self.parser.peek_token() != Token::RParen {
544            values.push(f(self)?);
545            if !self.parser.consume_token(&Token::Comma) {
546                break;
547            }
548        }
549
550        self.parser
551            .expect_token(&Token::RParen)
552            .context(error::UnexpectedSnafu {
553                expected: ")",
554                actual: self.peek_token_as_string(),
555            })?;
556
557        Ok(values)
558    }
559
560    /// Parse the columns and constraints.
561    fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
562        let mut columns = vec![];
563        let mut constraints = vec![];
564        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
565            return Ok((columns, constraints));
566        }
567
568        loop {
569            if let Some(constraint) = self.parse_optional_table_constraint()? {
570                constraints.push(constraint);
571            } else if let Token::Word(_) = self.parser.peek_token().token {
572                self.parse_column(&mut columns, &mut constraints)?;
573            } else {
574                return self.expected(
575                    "column name or constraint definition",
576                    self.parser.peek_token(),
577                );
578            }
579            let comma = self.parser.consume_token(&Token::Comma);
580            if self.parser.consume_token(&Token::RParen) {
581                // allow a trailing comma, even though it's not in standard
582                break;
583            } else if !comma {
584                return self.expected(
585                    "',' or ')' after column definition",
586                    self.parser.peek_token(),
587                );
588            }
589        }
590
591        Ok((columns, constraints))
592    }
593
    /// Parses one column definition and appends it to `columns`.
    ///
    /// An inline `TIME INDEX` option (represented as a `DialectSpecific` option holding
    /// the `TIME` and `INDEX` words) is turned into a `TableConstraint::TimeIndex`
    /// pushed onto `constraints`. The option is then removed from the column, and
    /// `NOT NULL` is added if it is not already present.
    ///
    /// # Errors
    /// Fails with an invalid-column-option error when `TIME INDEX` appears more than
    /// once on the column, when the column is also declared `NULL`, or when its
    /// (unaliased) data type is not a timestamp.
    fn parse_column(
        &mut self,
        columns: &mut Vec<Column>,
        constraints: &mut Vec<TableConstraint>,
    ) -> Result<()> {
        let mut column = self.parse_column_def()?;

        // Position of the `TIME INDEX` marker within the column's option list, if any.
        let mut time_index_opt_idx = None;
        for (index, opt) in column.options().iter().enumerate() {
            if let ColumnOption::DialectSpecific(tokens) = &opt.option
                && matches!(
                    &tokens[..],
                    [
                        Token::Word(Word {
                            keyword: Keyword::TIME,
                            ..
                        }),
                        Token::Word(Word {
                            keyword: Keyword::INDEX,
                            ..
                        })
                    ]
                )
            {
                ensure!(
                    time_index_opt_idx.is_none(),
                    InvalidColumnOptionSnafu {
                        name: column.name().to_string(),
                        msg: "duplicated time index",
                    }
                );
                time_index_opt_idx = Some(index);

                let constraint = TableConstraint::TimeIndex {
                    column: Ident::new(column.name().value.clone()),
                };
                constraints.push(constraint);
            }
        }

        if let Some(index) = time_index_opt_idx {
            // Only an unnamed `NULL` option is matched by this comparison.
            ensure!(
                !column.options().contains(&ColumnOptionDef {
                    option: ColumnOption::Null,
                    name: None,
                }),
                InvalidColumnOptionSnafu {
                    name: column.name().to_string(),
                    msg: "time index column can't be null",
                }
            );

            // The timestamp type may be an alias type, we have to retrieve the actual type.
            let data_type = get_unalias_type(column.data_type());
            ensure!(
                matches!(data_type, DataType::Timestamp(_, _)),
                InvalidColumnOptionSnafu {
                    name: column.name().to_string(),
                    msg: "time index column data type should be timestamp",
                }
            );

            let not_null_opt = ColumnOptionDef {
                option: ColumnOption::NotNull,
                name: None,
            };

            if !column.options().contains(&not_null_opt) {
                column.mut_options().push(not_null_opt);
            }

            // The push above appends at the end, so `index` still points at the marker.
            let _ = column.mut_options().remove(index);
        }

        columns.push(column);

        Ok(())
    }
672
673    /// Parse the column name and check if it's valid.
674    fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
675        let name = self.parser.parse_identifier()?;
676        if name.quote_style.is_none() &&
677        // "ALL_KEYWORDS" are sorted.
678            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
679        {
680            return Err(ParserError::ParserError(format!(
681                "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
682                &name.value
683            )));
684        }
685
686        Ok(name)
687    }
688
    /// Parses a single column definition: name, data type, then any number of column
    /// options and column extensions, in any order.
    ///
    /// The returned [`Column`] carries the canonicalized name, the parsed data type
    /// and options, and the collected [`ColumnExtensions`].
    ///
    /// # Errors
    /// Returns a syntax error for an invalid name or data type, or when
    /// `CONSTRAINT <name>` is not followed by a column option. Errors from the JSON
    /// option and column extension sub-parsers are propagated.
    pub fn parse_column_def(&mut self) -> Result<Column> {
        let name = self.parse_column_name().context(SyntaxSnafu)?;
        let parser = &mut self.parser;

        // NOTE(review): `parse_column_name` above already rejects unquoted keywords
        // with the same condition, so this check looks redundant — confirm before
        // removing.
        ensure!(
            !(name.quote_style.is_none() &&
            // "ALL_KEYWORDS" are sorted.
            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
            InvalidSqlSnafu {
                msg: format!(
                    "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
                    &name.value
                ),
            }
        );

        let mut extensions = ColumnExtensions::default();

        let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
        // Must immediately parse the JSON datatype format because it is closely after the "JSON"
        // datatype, like this: "JSON(format = ...)".
        if matches!(data_type, DataType::JSON) {
            extensions.json_datatype_options = json::parse_json_datatype_options(parser)?;
        }

        let mut options = vec![];
        // Keep consuming until neither a column option nor an extension matches.
        loop {
            if parser.parse_keyword(Keyword::CONSTRAINT) {
                // A named option: `CONSTRAINT <name> <option>`.
                let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
                if let Some(option) = Self::parse_optional_column_option(parser)? {
                    options.push(ColumnOptionDef { name, option });
                } else {
                    return parser
                        .expected(
                            "constraint details after CONSTRAINT <name>",
                            parser.peek_token(),
                        )
                        .context(SyntaxSnafu);
                }
            } else if let Some(option) = Self::parse_optional_column_option(parser)? {
                options.push(ColumnOptionDef { name: None, option });
            } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
                break;
            };
        }

        Ok(Column {
            column_def: ColumnDef {
                name: Self::canonicalize_identifier(name),
                data_type,
                options,
            },
            extensions,
        })
    }
744
    /// Tries to parse one column option at the current position.
    ///
    /// Recognized forms, tested in this order: `CHARACTER SET <name>`, `NOT NULL`,
    /// `COMMENT '<text>'`, `NULL`, `DEFAULT <expr>`, `PRIMARY KEY`, `UNIQUE` and
    /// `TIME INDEX`. Returns `Ok(None)` when none of them matches.
    ///
    /// # Errors
    /// Returns a syntax error when the operand of `CHARACTER SET` or `DEFAULT` fails
    /// to parse, or when `COMMENT` is not followed by a single-quoted string.
    fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
        if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
            Ok(Some(ColumnOption::CharacterSet(
                parser.parse_object_name(false).context(SyntaxSnafu)?,
            )))
        } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
            Ok(Some(ColumnOption::NotNull))
        } else if parser.parse_keywords(&[Keyword::COMMENT]) {
            match parser.next_token() {
                TokenWithSpan {
                    token: Token::SingleQuotedString(value, ..),
                    ..
                } => Ok(Some(ColumnOption::Comment(value))),
                unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
            }
        } else if parser.parse_keyword(Keyword::NULL) {
            Ok(Some(ColumnOption::Null))
        } else if parser.parse_keyword(Keyword::DEFAULT) {
            Ok(Some(ColumnOption::Default(
                parser.parse_expr().context(SyntaxSnafu)?,
            )))
        } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
            // Inline form: the constraint is built with no name, columns or index details.
            Ok(Some(ColumnOption::PrimaryKey(PrimaryKeyConstraint {
                name: None,
                index_name: None,
                index_type: None,
                columns: vec![],
                index_options: vec![],
                characteristics: None,
            })))
        } else if parser.parse_keyword(Keyword::UNIQUE) {
            // Inline form: the constraint is built with no name, columns or index details.
            Ok(Some(ColumnOption::Unique(UniqueConstraint {
                name: None,
                index_name: None,
                index_type_display: KeyOrIndexDisplay::None,
                index_type: None,
                columns: vec![],
                index_options: vec![],
                characteristics: None,
                nulls_distinct: NullsDistinctOption::None,
            })))
        } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
            // Use a DialectSpecific option for time index
            Ok(Some(ColumnOption::DialectSpecific(vec![
                Token::Word(Word {
                    value: "TIME".to_string(),
                    quote_style: None,
                    keyword: Keyword::TIME,
                }),
                Token::Word(Word {
                    value: "INDEX".to_string(),
                    quote_style: None,
                    keyword: Keyword::INDEX,
                }),
            ])))
        } else {
            Ok(None)
        }
    }
804
805    /// Parse a column option extensions.
806    ///
807    /// This function will handle:
808    /// - Vector type
809    /// - Indexes
810    fn parse_column_extensions(
811        parser: &mut Parser<'_>,
812        column_name: &Ident,
813        column_type: &DataType,
814        column_extensions: &mut ColumnExtensions,
815    ) -> Result<bool> {
816        if let DataType::Custom(name, tokens) = column_type
817            && name.0.len() == 1
818            && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
819        {
820            ensure!(
821                tokens.len() == 1,
822                InvalidColumnOptionSnafu {
823                    name: column_name.to_string(),
824                    msg: "VECTOR type should have dimension",
825                }
826            );
827
828            let dimension =
829                tokens[0]
830                    .parse::<u32>()
831                    .ok()
832                    .with_context(|| InvalidColumnOptionSnafu {
833                        name: column_name.to_string(),
834                        msg: "dimension should be a positive integer",
835                    })?;
836
837            let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
838            column_extensions.vector_options = Some(options);
839        }
840
841        // parse index options in column definition
842        let mut is_index_declared = false;
843
844        // skipping index
845        if let Token::Word(word) = parser.peek_token().token
846            && word.value.eq_ignore_ascii_case(SKIPPING)
847        {
848            parser.next_token();
849            // Consume `INDEX` keyword
850            ensure!(
851                parser.parse_keyword(Keyword::INDEX),
852                InvalidColumnOptionSnafu {
853                    name: column_name.to_string(),
854                    msg: "expect INDEX after SKIPPING keyword",
855                }
856            );
857            ensure!(
858                column_extensions.skipping_index_options.is_none(),
859                InvalidColumnOptionSnafu {
860                    name: column_name.to_string(),
861                    msg: "duplicated SKIPPING index option",
862                }
863            );
864
865            let options = parser
866                .parse_options(Keyword::WITH)
867                .context(error::SyntaxSnafu)?
868                .into_iter()
869                .map(parse_option_string)
870                .collect::<Result<Vec<_>>>()?;
871
872            for (key, _) in options.iter() {
873                ensure!(
874                    validate_column_skipping_index_create_option(key),
875                    InvalidColumnOptionSnafu {
876                        name: column_name.to_string(),
877                        msg: format!("invalid SKIPPING INDEX option: {key}"),
878                    }
879                );
880            }
881
882            let options = OptionMap::new(options);
883            column_extensions.skipping_index_options = Some(options);
884            is_index_declared |= true;
885        }
886
887        // fulltext index
888        if parser.parse_keyword(Keyword::FULLTEXT) {
889            // Consume `INDEX` keyword
890            ensure!(
891                parser.parse_keyword(Keyword::INDEX),
892                InvalidColumnOptionSnafu {
893                    name: column_name.to_string(),
894                    msg: "expect INDEX after FULLTEXT keyword",
895                }
896            );
897
898            ensure!(
899                column_extensions.fulltext_index_options.is_none(),
900                InvalidColumnOptionSnafu {
901                    name: column_name.to_string(),
902                    msg: "duplicated FULLTEXT INDEX option",
903                }
904            );
905
906            let column_type = get_unalias_type(column_type);
907            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
908            ensure!(
909                data_type == ConcreteDataType::string_datatype(),
910                InvalidColumnOptionSnafu {
911                    name: column_name.to_string(),
912                    msg: "FULLTEXT index only supports string type",
913                }
914            );
915
916            let options = parser
917                .parse_options(Keyword::WITH)
918                .context(error::SyntaxSnafu)?
919                .into_iter()
920                .map(parse_option_string)
921                .collect::<Result<Vec<_>>>()?;
922
923            for (key, _) in options.iter() {
924                ensure!(
925                    validate_column_fulltext_create_option(key),
926                    InvalidColumnOptionSnafu {
927                        name: column_name.to_string(),
928                        msg: format!("invalid FULLTEXT INDEX option: {key}"),
929                    }
930                );
931            }
932
933            let options = OptionMap::new(options);
934            column_extensions.fulltext_index_options = Some(options);
935            is_index_declared |= true;
936        }
937
938        // inverted index
939        if let Token::Word(word) = parser.peek_token().token
940            && word.value.eq_ignore_ascii_case(INVERTED)
941        {
942            parser.next_token();
943            // Consume `INDEX` keyword
944            ensure!(
945                parser.parse_keyword(Keyword::INDEX),
946                InvalidColumnOptionSnafu {
947                    name: column_name.to_string(),
948                    msg: "expect INDEX after INVERTED keyword",
949                }
950            );
951
952            ensure!(
953                column_extensions.inverted_index_options.is_none(),
954                InvalidColumnOptionSnafu {
955                    name: column_name.to_string(),
956                    msg: "duplicated INVERTED index option",
957                }
958            );
959
960            // inverted index doesn't have options, skipping `WITH`
961            // try cache `WITH` and throw error
962            let with_token = parser.peek_token();
963            ensure!(
964                with_token.token
965                    != Token::Word(Word {
966                        value: "WITH".to_string(),
967                        keyword: Keyword::WITH,
968                        quote_style: None,
969                    }),
970                InvalidColumnOptionSnafu {
971                    name: column_name.to_string(),
972                    msg: "INVERTED index doesn't support options",
973                }
974            );
975
976            column_extensions.inverted_index_options = Some(OptionMap::default());
977            is_index_declared |= true;
978        }
979
980        // vector index
981        if let Token::Word(word) = parser.peek_token().token
982            && word.value.eq_ignore_ascii_case(VECTOR)
983        {
984            parser.next_token();
985            // Consume `INDEX` keyword
986            ensure!(
987                parser.parse_keyword(Keyword::INDEX),
988                InvalidColumnOptionSnafu {
989                    name: column_name.to_string(),
990                    msg: "expect INDEX after VECTOR keyword",
991                }
992            );
993
994            ensure!(
995                column_extensions.vector_index_options.is_none(),
996                InvalidColumnOptionSnafu {
997                    name: column_name.to_string(),
998                    msg: "duplicated VECTOR INDEX option",
999                }
1000            );
1001
1002            // Check that column is a vector type
1003            let column_type = get_unalias_type(column_type);
1004            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
1005            ensure!(
1006                matches!(data_type, ConcreteDataType::Vector(_)),
1007                InvalidColumnOptionSnafu {
1008                    name: column_name.to_string(),
1009                    msg: "VECTOR INDEX only supports Vector type columns",
1010                }
1011            );
1012
1013            let options = parser
1014                .parse_options(Keyword::WITH)
1015                .context(error::SyntaxSnafu)?
1016                .into_iter()
1017                .map(parse_option_string)
1018                .collect::<Result<Vec<_>>>()?;
1019
1020            for (key, _) in options.iter() {
1021                ensure!(
1022                    validate_column_vector_index_create_option(key),
1023                    InvalidColumnOptionSnafu {
1024                        name: column_name.to_string(),
1025                        msg: format!("invalid VECTOR INDEX option: {key}"),
1026                    }
1027                );
1028            }
1029
1030            let options = OptionMap::new(options);
1031            column_extensions.vector_index_options = Some(options);
1032            is_index_declared |= true;
1033        }
1034
1035        Ok(is_index_declared)
1036    }
1037
1038    fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
1039        match self.parser.next_token() {
1040            TokenWithSpan {
1041                token: Token::Word(w),
1042                ..
1043            } if w.keyword == Keyword::PRIMARY => {
1044                self.parser
1045                    .expect_keyword(Keyword::KEY)
1046                    .context(error::UnexpectedSnafu {
1047                        expected: "KEY",
1048                        actual: self.peek_token_as_string(),
1049                    })?;
1050                let raw_columns = self
1051                    .parser
1052                    .parse_parenthesized_column_list(Mandatory, false)
1053                    .context(error::SyntaxSnafu)?;
1054                let columns = raw_columns
1055                    .into_iter()
1056                    .map(Self::canonicalize_identifier)
1057                    .collect();
1058                Ok(Some(TableConstraint::PrimaryKey { columns }))
1059            }
1060            TokenWithSpan {
1061                token: Token::Word(w),
1062                ..
1063            } if w.keyword == Keyword::TIME => {
1064                self.parser
1065                    .expect_keyword(Keyword::INDEX)
1066                    .context(error::UnexpectedSnafu {
1067                        expected: "INDEX",
1068                        actual: self.peek_token_as_string(),
1069                    })?;
1070
1071                let raw_columns = self
1072                    .parser
1073                    .parse_parenthesized_column_list(Mandatory, false)
1074                    .context(error::SyntaxSnafu)?;
1075                let mut columns = raw_columns
1076                    .into_iter()
1077                    .map(Self::canonicalize_identifier)
1078                    .collect::<Vec<_>>();
1079
1080                ensure!(
1081                    columns.len() == 1,
1082                    InvalidTimeIndexSnafu {
1083                        msg: "it should contain only one column in time index",
1084                    }
1085                );
1086
1087                Ok(Some(TableConstraint::TimeIndex {
1088                    column: columns.pop().unwrap(),
1089                }))
1090            }
1091            _ => {
1092                self.parser.prev_token();
1093                Ok(None)
1094            }
1095        }
1096    }
1097
1098    /// Parses the set of valid formats
1099    fn parse_table_engine(&mut self, default: &str) -> Result<String> {
1100        if !self.consume_token(ENGINE) {
1101            return Ok(default.to_string());
1102        }
1103
1104        self.parser
1105            .expect_token(&Token::Eq)
1106            .context(error::UnexpectedSnafu {
1107                expected: "=",
1108                actual: self.peek_token_as_string(),
1109            })?;
1110
1111        let token = self.parser.next_token();
1112        if let Token::Word(w) = token.token {
1113            Ok(w.value)
1114        } else {
1115            self.expected("'Engine' is missing", token)
1116        }
1117    }
1118}
1119
1120fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1121    let time_index_constraints: Vec<_> = constraints
1122        .iter()
1123        .filter_map(|c| match c {
1124            TableConstraint::TimeIndex { column } => Some(column),
1125            _ => None,
1126        })
1127        .unique()
1128        .collect();
1129
1130    ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1131    ensure!(
1132        time_index_constraints.len() == 1,
1133        InvalidTimeIndexSnafu {
1134            msg: format!(
1135                "expected only one time index constraint but actual {}",
1136                time_index_constraints.len()
1137            ),
1138        }
1139    );
1140
1141    // It's safe to use time_index_constraints[0][0],
1142    // we already check the bound above.
1143    let time_index_column_ident = &time_index_constraints[0];
1144    let time_index_column = columns
1145        .iter()
1146        .find(|c| c.name().value == *time_index_column_ident.value)
1147        .with_context(|| InvalidTimeIndexSnafu {
1148            msg: format!(
1149                "time index column {} not found in columns",
1150                time_index_column_ident
1151            ),
1152        })?;
1153
1154    let time_index_data_type = get_unalias_type(time_index_column.data_type());
1155    ensure!(
1156        matches!(time_index_data_type, DataType::Timestamp(_, _)),
1157        InvalidColumnOptionSnafu {
1158            name: time_index_column.name().to_string(),
1159            msg: "time index column data type should be timestamp",
1160        }
1161    );
1162
1163    Ok(())
1164}
1165
1166fn get_unalias_type(data_type: &DataType) -> DataType {
1167    match data_type {
1168        DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1169            if let Some(real_type) =
1170                get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1171            {
1172                real_type
1173            } else {
1174                data_type.clone()
1175            }
1176        }
1177        _ => data_type.clone(),
1178    }
1179}
1180
1181fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1182    let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1183
1184    ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1185
1186    Ok(())
1187}
1188
1189/// Ensure all exprs are binary expr and all the columns are defined in the column list.
1190fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1191    for expr in exprs {
1192        // The first level must be binary expr
1193        if let Expr::BinaryOp { left, op: _, right } = expr {
1194            ensure_one_expr(left, columns)?;
1195            ensure_one_expr(right, columns)?;
1196        } else {
1197            return error::InvalidSqlSnafu {
1198                msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1199            }
1200            .fail();
1201        }
1202    }
1203    Ok(())
1204}
1205
1206/// Check if the expr is a binary expr, an ident or a literal value.
1207/// If is ident, then check it is in the column list.
1208/// This recursive function is intended to be used by [ensure_exprs_are_binary].
1209fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1210    match expr {
1211        Expr::BinaryOp { left, op: _, right } => {
1212            ensure_one_expr(left, columns)?;
1213            ensure_one_expr(right, columns)?;
1214            Ok(())
1215        }
1216        Expr::Identifier(ident) => {
1217            let column_name = &ident.value;
1218            ensure!(
1219                columns.iter().any(|c| &c.name().value == column_name),
1220                error::InvalidSqlSnafu {
1221                    msg: format!(
1222                        "Column {:?} in rule expr is not referenced in PARTITION ON",
1223                        column_name
1224                    ),
1225                }
1226            );
1227            Ok(())
1228        }
1229        Expr::Value(_) => Ok(()),
1230        Expr::UnaryOp { expr, .. } => {
1231            ensure_one_expr(expr, columns)?;
1232            Ok(())
1233        }
1234        _ => error::InvalidSqlSnafu {
1235            msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1236        }
1237        .fail(),
1238    }
1239}
1240
1241/// Ensure that all columns used in "PARTITION ON COLUMNS" are defined in create table.
1242fn ensure_partition_columns_defined<'a>(
1243    columns: &'a [Column],
1244    partitions: &'a Partitions,
1245) -> Result<Vec<&'a Column>> {
1246    partitions
1247        .column_list
1248        .iter()
1249        .map(|x| {
1250            let x = ParserContext::canonicalize_identifier(x.clone());
1251            // Normally the columns in "create table" won't be too many,
1252            // a linear search to find the target every time is fine.
1253            columns
1254                .iter()
1255                .find(|c| *c.name().value == x.value)
1256                .context(error::InvalidSqlSnafu {
1257                    msg: format!("Partition column {:?} not defined", x.value),
1258                })
1259        })
1260        .collect::<Result<Vec<&Column>>>()
1261}
1262
1263#[cfg(test)]
1264mod tests {
1265    use std::assert_matches;
1266    use std::collections::HashMap;
1267
1268    use common_catalog::consts::FILE_ENGINE;
1269    use common_error::ext::ErrorExt;
1270    use sqlparser::ast::ColumnOption::NotNull;
1271    use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1272    use sqlparser::dialect::GenericDialect;
1273    use sqlparser::tokenizer::Tokenizer;
1274
1275    use super::*;
1276    use crate::dialect::GreptimeDbDialect;
1277    use crate::parser::ParseOptions;
1278
1279    fn string_option_map(
1280        entries: impl IntoIterator<Item = (&'static str, &'static str)>,
1281    ) -> OptionMap {
1282        OptionMap::new(entries.into_iter().map(|(key, value)| {
1283            (
1284                key.to_string(),
1285                OptionValue::try_new(Expr::Value(
1286                    Value::SingleQuotedString(value.to_string()).into(),
1287                ))
1288                .unwrap(),
1289            )
1290        }))
1291    }
1292
1293    #[test]
1294    fn test_parse_create_table_like() {
1295        let sql = "CREATE TABLE t1 LIKE t2";
1296        let stmts =
1297            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1298                .unwrap();
1299
1300        assert_eq!(1, stmts.len());
1301        match &stmts[0] {
1302            Statement::CreateTableLike(c) => {
1303                assert_eq!(c.table_name.to_string(), "t1");
1304                assert_eq!(c.source_name.to_string(), "t2");
1305            }
1306            _ => unreachable!(),
1307        }
1308    }
1309
1310    #[test]
1311    fn test_validate_external_table_options() {
1312        let sql = "CREATE EXTERNAL TABLE city (
1313            host string,
1314            ts timestamp,
1315            cpu float64 default 0,
1316            memory float64,
1317            TIME INDEX (ts),
1318            PRIMARY KEY(ts, host)
1319        ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1320
1321        let result =
1322            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1323        assert!(matches!(
1324            result,
1325            Err(error::Error::InvalidTableOption { .. })
1326        ));
1327    }
1328
1329    #[test]
1330    fn test_parse_create_external_table() {
1331        struct Test<'a> {
1332            sql: &'a str,
1333            expected_table_name: &'a str,
1334            expected_options: HashMap<&'a str, &'a str>,
1335            expected_engine: &'a str,
1336            expected_if_not_exist: bool,
1337        }
1338
1339        let tests = [
1340            Test {
1341                sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1342                expected_table_name: "city",
1343                expected_options: HashMap::from([
1344                    ("location", "/var/data/city.csv"),
1345                    ("format", "csv"),
1346                ]),
1347                expected_engine: FILE_ENGINE,
1348                expected_if_not_exist: false,
1349            },
1350            Test {
1351                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1352                expected_table_name: "city",
1353                expected_options: HashMap::from([
1354                    ("location", "/var/data/city.csv"),
1355                    ("format", "csv"),
1356                ]),
1357                expected_engine: "foo",
1358                expected_if_not_exist: true,
1359            },
1360            Test {
1361                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1362                expected_table_name: "city",
1363                expected_options: HashMap::from([
1364                    ("location", "/var/data/city.csv"),
1365                    ("format", "csv"),
1366                    ("compaction.type", "bar"),
1367                ]),
1368                expected_engine: "foo",
1369                expected_if_not_exist: true,
1370            },
1371        ];
1372
1373        for test in tests {
1374            let stmts = ParserContext::create_with_dialect(
1375                test.sql,
1376                &GreptimeDbDialect {},
1377                ParseOptions::default(),
1378            )
1379            .unwrap();
1380            assert_eq!(1, stmts.len());
1381            match &stmts[0] {
1382                Statement::CreateExternalTable(c) => {
1383                    assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1384                    assert_eq!(c.options.to_str_map(), test.expected_options);
1385                    assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1386                    assert_eq!(c.engine, test.expected_engine);
1387                }
1388                _ => unreachable!(),
1389            }
1390        }
1391    }
1392
1393    #[test]
1394    fn test_parse_create_external_table_with_schema() {
1395        let sql = "CREATE EXTERNAL TABLE city (
1396            host string,
1397            ts timestamp,
1398            cpu float32 default 0,
1399            memory float64,
1400            TIME INDEX (ts),
1401            PRIMARY KEY(ts, host),
1402        ) with(location='/var/data/city.csv',format='csv');";
1403
1404        let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]);
1405
1406        let stmts =
1407            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1408                .unwrap();
1409        assert_eq!(1, stmts.len());
1410        match &stmts[0] {
1411            Statement::CreateExternalTable(c) => {
1412                assert_eq!(c.name.to_string(), "city");
1413                assert_eq!(c.options.to_str_map(), options);
1414
1415                let columns = &c.columns;
1416                assert_column_def(&columns[0].column_def, "host", "STRING");
1417                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1418                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1419                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1420
1421                let constraints = &c.constraints;
1422                assert_eq!(
1423                    &constraints[0],
1424                    &TableConstraint::TimeIndex {
1425                        column: Ident::new("ts"),
1426                    }
1427                );
1428                assert_eq!(
1429                    &constraints[1],
1430                    &TableConstraint::PrimaryKey {
1431                        columns: vec![Ident::new("ts"), Ident::new("host")]
1432                    }
1433                );
1434            }
1435            _ => unreachable!(),
1436        }
1437    }
1438
1439    #[test]
1440    fn test_parse_create_database() {
1441        let sql = "create database";
1442        let result =
1443            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1444        assert!(
1445            result
1446                .unwrap_err()
1447                .to_string()
1448                .contains("Unexpected token while parsing SQL statement")
1449        );
1450
1451        let sql = "create database prometheus";
1452        let stmts =
1453            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1454                .unwrap();
1455
1456        assert_eq!(1, stmts.len());
1457        match &stmts[0] {
1458            Statement::CreateDatabase(c) => {
1459                assert_eq!(c.name.to_string(), "prometheus");
1460                assert!(!c.if_not_exists);
1461            }
1462            _ => unreachable!(),
1463        }
1464
1465        let sql = "create database if not exists prometheus";
1466        let stmts =
1467            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1468                .unwrap();
1469
1470        assert_eq!(1, stmts.len());
1471        match &stmts[0] {
1472            Statement::CreateDatabase(c) => {
1473                assert_eq!(c.name.to_string(), "prometheus");
1474                assert!(c.if_not_exists);
1475            }
1476            _ => unreachable!(),
1477        }
1478
1479        let sql = "CREATE DATABASE `fOo`";
1480        let result =
1481            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1482        let stmts = result.unwrap();
1483        match &stmts.last().unwrap() {
1484            Statement::CreateDatabase(c) => {
1485                assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1486                assert!(!c.if_not_exists);
1487            }
1488            _ => unreachable!(),
1489        }
1490
1491        let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1492        let result =
1493            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1494        let stmts = result.unwrap();
1495        match &stmts[0] {
1496            Statement::CreateDatabase(c) => {
1497                assert_eq!(c.name.to_string(), "prometheus");
1498                assert!(!c.if_not_exists);
1499                assert_eq!(c.options.get("ttl").unwrap(), "1h");
1500            }
1501            _ => unreachable!(),
1502        }
1503    }
1504
1505    #[test]
1506    fn test_parse_create_flow_more_testcases() {
1507        use pretty_assertions::assert_eq;
1508        fn parse_create_flow(sql: &str) -> CreateFlow {
1509            let stmts = ParserContext::create_with_dialect(
1510                sql,
1511                &GreptimeDbDialect {},
1512                ParseOptions::default(),
1513            )
1514            .unwrap();
1515            assert_eq!(1, stmts.len());
1516            match &stmts[0] {
1517                Statement::CreateFlow(c) => c.clone(),
1518                _ => unreachable!(),
1519            }
1520        }
1521        struct CreateFlowWoutQuery {
1522            /// Flow name
1523            pub flow_name: ObjectName,
1524            /// Output (sink) table name
1525            pub sink_table_name: ObjectName,
1526            /// Whether to replace existing task
1527            pub or_replace: bool,
1528            /// Create if not exist
1529            pub if_not_exists: bool,
1530            /// `EXPIRE AFTER`
1531            /// Duration in second as `i64`
1532            pub expire_after: Option<i64>,
1533            /// Comment string
1534            pub comment: Option<String>,
1535            /// Flow creation options
1536            pub flow_options: OptionMap,
1537        }
1538        let testcases = vec![
1539            (
1540                r"
1541CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1542SINK TO schema_1.table_1
1543EXPIRE AFTER INTERVAL '5 minutes'
1544COMMENT 'test comment'
1545AS
1546SELECT max(c1), min(c2) FROM schema_2.table_2;",
1547                CreateFlowWoutQuery {
1548                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1549                    sink_table_name: ObjectName::from(vec![
1550                        Ident::new("schema_1"),
1551                        Ident::new("table_1"),
1552                    ]),
1553                    or_replace: true,
1554                    if_not_exists: true,
1555                    expire_after: Some(300),
1556                    comment: Some("test comment".to_string()),
1557                    flow_options: OptionMap::default(),
1558                },
1559            ),
1560            (
1561                r"
1562CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1563SINK TO schema_1.table_1
1564EXPIRE AFTER INTERVAL '300 s'
1565COMMENT 'test comment'
1566AS
1567SELECT max(c1), min(c2) FROM schema_2.table_2;",
1568                CreateFlowWoutQuery {
1569                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1570                    sink_table_name: ObjectName::from(vec![
1571                        Ident::new("schema_1"),
1572                        Ident::new("table_1"),
1573                    ]),
1574                    or_replace: true,
1575                    if_not_exists: true,
1576                    expire_after: Some(300),
1577                    comment: Some("test comment".to_string()),
1578                    flow_options: OptionMap::default(),
1579                },
1580            ),
1581            (
1582                r"
1583CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1584SINK TO schema_1.table_1
1585EXPIRE AFTER '5 minutes'
1586COMMENT 'test comment'
1587AS
1588SELECT max(c1), min(c2) FROM schema_2.table_2;",
1589                CreateFlowWoutQuery {
1590                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1591                    sink_table_name: ObjectName::from(vec![
1592                        Ident::new("schema_1"),
1593                        Ident::new("table_1"),
1594                    ]),
1595                    or_replace: true,
1596                    if_not_exists: true,
1597                    expire_after: Some(300),
1598                    comment: Some("test comment".to_string()),
1599                    flow_options: OptionMap::default(),
1600                },
1601            ),
1602            (
1603                r"
1604CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1605SINK TO schema_1.table_1
1606EXPIRE AFTER '300 s'
1607COMMENT 'test comment'
1608AS
1609SELECT max(c1), min(c2) FROM schema_2.table_2;",
1610                CreateFlowWoutQuery {
1611                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1612                    sink_table_name: ObjectName::from(vec![
1613                        Ident::new("schema_1"),
1614                        Ident::new("table_1"),
1615                    ]),
1616                    or_replace: true,
1617                    if_not_exists: true,
1618                    expire_after: Some(300),
1619                    comment: Some("test comment".to_string()),
1620                    flow_options: OptionMap::default(),
1621                },
1622            ),
1623            (
1624                r"
1625CREATE FLOW `task_2`
1626SINK TO schema_1.table_1
1627EXPIRE AFTER '2 days 1h 2 min'
1628AS
1629SELECT max(c1), min(c2) FROM schema_2.table_2;",
1630                CreateFlowWoutQuery {
1631                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1632                    sink_table_name: ObjectName::from(vec![
1633                        Ident::new("schema_1"),
1634                        Ident::new("table_1"),
1635                    ]),
1636                    or_replace: false,
1637                    if_not_exists: false,
1638                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1639                    comment: None,
1640                    flow_options: OptionMap::default(),
1641                },
1642            ),
1643            (
1644                r"
1645create flow `task_3`
1646sink to schema_1.table_1
1647expire after '10 minutes'
1648as
1649select max(c1), min(c2) from schema_2.table_2;",
1650                CreateFlowWoutQuery {
1651                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
1652                    sink_table_name: ObjectName::from(vec![
1653                        Ident::new("schema_1"),
1654                        Ident::new("table_1"),
1655                    ]),
1656                    or_replace: false,
1657                    if_not_exists: false,
1658                    expire_after: Some(600), // 10 minutes in seconds
1659                    comment: None,
1660                    flow_options: OptionMap::default(),
1661                },
1662            ),
1663            (
1664                r"
1665create or replace flow if not exists task_4
1666sink to schema_1.table_1
1667expire after interval '2 hours'
1668comment 'lowercase test'
1669as
1670select max(c1), min(c2) from schema_2.table_2;",
1671                CreateFlowWoutQuery {
1672                    flow_name: ObjectName::from(vec![Ident::new("task_4")]),
1673                    sink_table_name: ObjectName::from(vec![
1674                        Ident::new("schema_1"),
1675                        Ident::new("table_1"),
1676                    ]),
1677                    or_replace: true,
1678                    if_not_exists: true,
1679                    expire_after: Some(7200), // 2 hours in seconds
1680                    comment: Some("lowercase test".to_string()),
1681                    flow_options: OptionMap::default(),
1682                },
1683            ),
1684            (
1685                r"
1686CREATE FLOW task_5
1687SINK TO schema_1.table_1
1688WITH (defer_on_missing_source = 'true')
1689AS
1690SELECT max(c1), min(c2) FROM schema_2.table_2;",
1691                CreateFlowWoutQuery {
1692                    flow_name: ObjectName::from(vec![Ident::new("task_5")]),
1693                    sink_table_name: ObjectName::from(vec![
1694                        Ident::new("schema_1"),
1695                        Ident::new("table_1"),
1696                    ]),
1697                    or_replace: false,
1698                    if_not_exists: false,
1699                    expire_after: None,
1700                    comment: None,
1701                    flow_options: string_option_map([("defer_on_missing_source", "true")]),
1702                },
1703            ),
1704        ];
1705
1706        for (sql, expected) in testcases {
1707            let create_task = parse_create_flow(sql);
1708
1709            let expected = CreateFlow {
1710                flow_name: expected.flow_name,
1711                sink_table_name: expected.sink_table_name,
1712                or_replace: expected.or_replace,
1713                if_not_exists: expected.if_not_exists,
1714                expire_after: expected.expire_after,
1715                eval_interval: None,
1716                comment: expected.comment,
1717                flow_options: expected.flow_options,
1718                // ignore query parse result
1719                query: create_task.query.clone(),
1720            };
1721
1722            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1723            let show_create = create_task.to_string();
1724            let recreated = parse_create_flow(&show_create);
1725            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1726        }
1727    }
1728
1729    #[test]
1730    fn test_parse_create_flow() {
1731        use pretty_assertions::assert_eq;
1732        fn parse_create_flow(sql: &str) -> CreateFlow {
1733            let stmts = ParserContext::create_with_dialect(
1734                sql,
1735                &GreptimeDbDialect {},
1736                ParseOptions::default(),
1737            )
1738            .unwrap();
1739            assert_eq!(1, stmts.len());
1740            match &stmts[0] {
1741                Statement::CreateFlow(c) => c.clone(),
1742                _ => panic!("{:?}", stmts[0]),
1743            }
1744        }
1745        struct CreateFlowWoutQuery {
1746            /// Flow name
1747            pub flow_name: ObjectName,
1748            /// Output (sink) table name
1749            pub sink_table_name: ObjectName,
1750            /// Whether to replace existing task
1751            pub or_replace: bool,
1752            /// Create if not exist
1753            pub if_not_exists: bool,
1754            /// `EXPIRE AFTER`
1755            /// Duration in second as `i64`
1756            pub expire_after: Option<i64>,
1757            /// Duration for flow evaluation interval
1758            /// Duration in seconds as `i64`
1759            /// If not set, flow will be evaluated based on time window size and other args.
1760            pub eval_interval: Option<i64>,
1761            /// Comment string
1762            pub comment: Option<String>,
1763            /// Flow creation options
1764            pub flow_options: OptionMap,
1765        }
1766
1767        // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
1768        let testcases = vec![
1769            (
1770                r"
1771CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1772SINK TO schema_1.table_1
1773EXPIRE AFTER INTERVAL '5 minutes'
1774COMMENT 'test comment'
1775AS
1776SELECT max(c1), min(c2) FROM schema_2.table_2;",
1777                CreateFlowWoutQuery {
1778                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1779                    sink_table_name: ObjectName(vec![
1780                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1781                        ObjectNamePart::Identifier(Ident::new("table_1")),
1782                    ]),
1783                    or_replace: true,
1784                    if_not_exists: true,
1785                    expire_after: Some(300),
1786                    eval_interval: None,
1787                    comment: Some("test comment".to_string()),
1788                    flow_options: OptionMap::default(),
1789                },
1790            ),
1791            (
1792                r"
1793CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1794SINK TO schema_1.table_1
1795EXPIRE AFTER INTERVAL '300 s'
1796COMMENT 'test comment'
1797AS
1798SELECT max(c1), min(c2) FROM schema_2.table_2;",
1799                CreateFlowWoutQuery {
1800                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1801                    sink_table_name: ObjectName(vec![
1802                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1803                        ObjectNamePart::Identifier(Ident::new("table_1")),
1804                    ]),
1805                    or_replace: true,
1806                    if_not_exists: true,
1807                    expire_after: Some(300),
1808                    eval_interval: None,
1809                    comment: Some("test comment".to_string()),
1810                    flow_options: OptionMap::default(),
1811                },
1812            ),
1813            (
1814                r"
1815CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1816SINK TO schema_1.table_1
1817EXPIRE AFTER '5 minutes'
1818EVAL INTERVAL '10 seconds'
1819COMMENT 'test comment'
1820AS
1821SELECT max(c1), min(c2) FROM schema_2.table_2;",
1822                CreateFlowWoutQuery {
1823                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1824                    sink_table_name: ObjectName(vec![
1825                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1826                        ObjectNamePart::Identifier(Ident::new("table_1")),
1827                    ]),
1828                    or_replace: true,
1829                    if_not_exists: true,
1830                    expire_after: Some(300),
1831                    eval_interval: Some(10),
1832                    comment: Some("test comment".to_string()),
1833                    flow_options: OptionMap::default(),
1834                },
1835            ),
1836            (
1837                r"
1838CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1839SINK TO schema_1.table_1
1840EXPIRE AFTER '5 minutes'
1841EVAL INTERVAL INTERVAL '10 seconds'
1842COMMENT 'test comment'
1843AS
1844SELECT max(c1), min(c2) FROM schema_2.table_2;",
1845                CreateFlowWoutQuery {
1846                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1847                    sink_table_name: ObjectName(vec![
1848                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1849                        ObjectNamePart::Identifier(Ident::new("table_1")),
1850                    ]),
1851                    or_replace: true,
1852                    if_not_exists: true,
1853                    expire_after: Some(300),
1854                    eval_interval: Some(10),
1855                    comment: Some("test comment".to_string()),
1856                    flow_options: OptionMap::default(),
1857                },
1858            ),
1859            (
1860                r"
1861CREATE FLOW `task_2`
1862SINK TO schema_1.table_1
1863EXPIRE AFTER '2 days 1h 2 min'
1864AS
1865SELECT max(c1), min(c2) FROM schema_2.table_2;",
1866                CreateFlowWoutQuery {
1867                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1868                        '`', "task_2",
1869                    ))]),
1870                    sink_table_name: ObjectName(vec![
1871                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1872                        ObjectNamePart::Identifier(Ident::new("table_1")),
1873                    ]),
1874                    or_replace: false,
1875                    if_not_exists: false,
1876                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1877                    eval_interval: None,
1878                    comment: None,
1879                    flow_options: OptionMap::default(),
1880                },
1881            ),
1882            (
1883                r"
1884CREATE FLOW task_3
1885SINK TO schema_1.table_1
1886EVAL INTERVAL '10 seconds'
1887WITH (defer_on_missing_source = 'true', foo = 'bar')
1888AS
1889SELECT max(c1), min(c2) FROM schema_2.table_2;",
1890                CreateFlowWoutQuery {
1891                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_3"))]),
1892                    sink_table_name: ObjectName(vec![
1893                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1894                        ObjectNamePart::Identifier(Ident::new("table_1")),
1895                    ]),
1896                    or_replace: false,
1897                    if_not_exists: false,
1898                    expire_after: None,
1899                    eval_interval: Some(10),
1900                    comment: None,
1901                    flow_options: string_option_map([
1902                        ("defer_on_missing_source", "true"),
1903                        ("foo", "bar"),
1904                    ]),
1905                },
1906            ),
1907        ];
1908
1909        for (sql, expected) in testcases {
1910            let create_task = parse_create_flow(sql);
1911
1912            let expected = CreateFlow {
1913                flow_name: expected.flow_name,
1914                sink_table_name: expected.sink_table_name,
1915                or_replace: expected.or_replace,
1916                if_not_exists: expected.if_not_exists,
1917                expire_after: expected.expire_after,
1918                eval_interval: expected.eval_interval,
1919                comment: expected.comment,
1920                flow_options: expected.flow_options,
1921                // ignore query parse result
1922                query: create_task.query.clone(),
1923            };
1924
1925            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1926            let show_create = create_task.to_string();
1927            let recreated = parse_create_flow(&show_create);
1928            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1929        }
1930    }
1931
1932    #[test]
1933    fn test_parse_create_flow_with_tql_cte_query() {
1934        let sql = r#"
1935CREATE FLOW calc_reqs_cte
1936SINK TO cnt_reqs_cte
1937EVAL INTERVAL '1m'
1938AS
1939WITH tql(the_timestamp, the_value) AS (
1940    TQL EVAL (now() - '1m'::interval, now(), '5s') metric
1941)
1942SELECT * FROM tql;
1943"#;
1944
1945        let stmts =
1946            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1947                .unwrap();
1948        assert_eq!(1, stmts.len());
1949        let Statement::CreateFlow(create_flow) = &stmts[0] else {
1950            panic!("unexpected stmt: {:?}", stmts[0]);
1951        };
1952
1953        let query = create_flow.query.to_string();
1954        assert!(query.to_uppercase().contains("WITH"));
1955        assert!(query.to_uppercase().contains("TQL EVAL"));
1956    }
1957
1958    #[test]
1959    fn test_parse_create_flow_with_sql_cte_is_unsupported() {
1960        let sql = r#"
1961CREATE FLOW f
1962SINK TO s
1963AS
1964WITH cte AS (SELECT 1) SELECT * FROM cte;
1965"#;
1966
1967        let err =
1968            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1969                .unwrap_err();
1970        let msg = err.to_string();
1971        assert!(msg.to_uppercase().contains("WITH"), "err: {msg}");
1972    }
1973
1974    #[test]
1975    fn test_parse_create_flow_with_tql_cte_requires_now_expr() {
1976        let sql = r#"
1977CREATE FLOW f
1978SINK TO s
1979EVAL INTERVAL '1m'
1980AS
1981WITH tql(ts, val) AS (
1982    TQL EVAL (0, 15, '5s') metric
1983)
1984SELECT * FROM tql;
1985"#;
1986
1987        let err =
1988            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1989                .unwrap_err();
1990
1991        let msg = format!("{err:?}");
1992        assert!(
1993            msg.contains("Expected expression containing `now()`"),
1994            "unexpected err: {msg}"
1995        );
1996    }
1997
1998    #[test]
1999    fn test_parse_create_flow_with_tql_cte_non_select_star_is_unsupported() {
2000        let sql = r#"
2001CREATE FLOW f
2002SINK TO s
2003AS
2004WITH tql(ts, val) AS (
2005    TQL EVAL (now() - '1m'::interval, now(), '5s') metric
2006)
2007SELECT ts FROM tql;
2008"#;
2009
2010        let err =
2011            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2012                .unwrap_err();
2013        assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2014    }
2015
2016    #[test]
2017    fn test_parse_create_flow_with_tql_cte_filter_is_unsupported() {
2018        let sql = r#"
2019CREATE FLOW f
2020SINK TO s
2021AS
2022WITH tql(ts, val) AS (
2023    TQL EVAL (now() - '1m'::interval, now(), '5s') metric
2024)
2025SELECT * FROM tql WHERE ts > 0;
2026"#;
2027
2028        let err =
2029            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2030                .unwrap_err();
2031        assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2032    }
2033
2034    #[test]
2035    fn test_parse_create_flow_with_mixed_sql_tql_cte_is_unsupported() {
2036        let sql = r#"
2037CREATE FLOW f
2038SINK TO s
2039AS
2040WITH s1 AS (SELECT 1),
2041     tql(ts, val) AS (TQL EVAL (now() - '1m'::interval, now(), '5s') metric)
2042SELECT * FROM tql;
2043"#;
2044
2045        let err =
2046            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2047                .unwrap_err();
2048        assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2049    }
2050
2051    #[test]
2052    fn test_create_flow_no_month() {
2053        let sql = r"
2054CREATE FLOW `task_2`
2055SINK TO schema_1.table_1
2056EXPIRE AFTER '1 month 2 days 1h 2 min'
2057AS
2058SELECT max(c1), min(c2) FROM schema_2.table_2;";
2059        let stmts =
2060            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2061
2062        assert!(
2063            stmts.is_err()
2064                && stmts
2065                    .unwrap_err()
2066                    .to_string()
2067                    .contains("Interval with months is not allowed")
2068        );
2069    }
2070
2071    #[test]
2072    fn test_validate_create() {
2073        let sql = r"
2074CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
2075PARTITION ON COLUMNS(c, a) (
2076    a < 10,
2077    a > 10 AND a < 20,
2078    a > 20 AND c < 100,
2079    a > 20 AND c >= 100
2080)
2081ENGINE=mito";
2082        let result =
2083            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2084        let _ = result.unwrap();
2085
2086        let sql = r"
2087CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2088PARTITION ON COLUMNS(x) ()
2089ENGINE=mito";
2090        let result =
2091            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2092        assert!(
2093            result
2094                .unwrap_err()
2095                .to_string()
2096                .contains("Partition column \"x\" not defined")
2097        );
2098    }
2099
2100    #[test]
2101    fn test_parse_create_table_with_partitions() {
2102        let sql = r"
2103CREATE TABLE monitor (
2104  host_id    INT,
2105  idc        STRING,
2106  ts         TIMESTAMP,
2107  cpu        DOUBLE DEFAULT 0,
2108  memory     DOUBLE,
2109  TIME INDEX (ts),
2110  PRIMARY KEY (host),
2111)
2112PARTITION ON COLUMNS(idc, host_id) (
2113  idc <= 'hz' AND host_id < 1000,
2114  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2115  idc > 'sh' AND host_id < 3000,
2116  idc > 'sh' AND host_id >= 3000,
2117)
2118ENGINE=mito";
2119        let result =
2120            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2121                .unwrap();
2122        assert_eq!(result.len(), 1);
2123        match &result[0] {
2124            Statement::CreateTable(c) => {
2125                assert!(c.partitions.is_some());
2126
2127                let partitions = c.partitions.as_ref().unwrap();
2128                let column_list = partitions
2129                    .column_list
2130                    .iter()
2131                    .map(|x| &x.value)
2132                    .collect::<Vec<&String>>();
2133                assert_eq!(column_list, vec!["idc", "host_id"]);
2134
2135                let exprs = &partitions.exprs;
2136
2137                assert_eq!(
2138                    exprs[0],
2139                    Expr::BinaryOp {
2140                        left: Box::new(Expr::BinaryOp {
2141                            left: Box::new(Expr::Identifier("idc".into())),
2142                            op: BinaryOperator::LtEq,
2143                            right: Box::new(Expr::Value(
2144                                Value::SingleQuotedString("hz".to_string()).into()
2145                            ))
2146                        }),
2147                        op: BinaryOperator::And,
2148                        right: Box::new(Expr::BinaryOp {
2149                            left: Box::new(Expr::Identifier("host_id".into())),
2150                            op: BinaryOperator::Lt,
2151                            right: Box::new(Expr::Value(
2152                                Value::Number("1000".to_string(), false).into()
2153                            ))
2154                        })
2155                    }
2156                );
2157                assert_eq!(
2158                    exprs[1],
2159                    Expr::BinaryOp {
2160                        left: Box::new(Expr::BinaryOp {
2161                            left: Box::new(Expr::BinaryOp {
2162                                left: Box::new(Expr::Identifier("idc".into())),
2163                                op: BinaryOperator::Gt,
2164                                right: Box::new(Expr::Value(
2165                                    Value::SingleQuotedString("hz".to_string()).into()
2166                                ))
2167                            }),
2168                            op: BinaryOperator::And,
2169                            right: Box::new(Expr::BinaryOp {
2170                                left: Box::new(Expr::Identifier("idc".into())),
2171                                op: BinaryOperator::LtEq,
2172                                right: Box::new(Expr::Value(
2173                                    Value::SingleQuotedString("sh".to_string()).into()
2174                                ))
2175                            })
2176                        }),
2177                        op: BinaryOperator::And,
2178                        right: Box::new(Expr::BinaryOp {
2179                            left: Box::new(Expr::Identifier("host_id".into())),
2180                            op: BinaryOperator::Lt,
2181                            right: Box::new(Expr::Value(
2182                                Value::Number("2000".to_string(), false).into()
2183                            ))
2184                        })
2185                    }
2186                );
2187                assert_eq!(
2188                    exprs[2],
2189                    Expr::BinaryOp {
2190                        left: Box::new(Expr::BinaryOp {
2191                            left: Box::new(Expr::Identifier("idc".into())),
2192                            op: BinaryOperator::Gt,
2193                            right: Box::new(Expr::Value(
2194                                Value::SingleQuotedString("sh".to_string()).into()
2195                            ))
2196                        }),
2197                        op: BinaryOperator::And,
2198                        right: Box::new(Expr::BinaryOp {
2199                            left: Box::new(Expr::Identifier("host_id".into())),
2200                            op: BinaryOperator::Lt,
2201                            right: Box::new(Expr::Value(
2202                                Value::Number("3000".to_string(), false).into()
2203                            ))
2204                        })
2205                    }
2206                );
2207                assert_eq!(
2208                    exprs[3],
2209                    Expr::BinaryOp {
2210                        left: Box::new(Expr::BinaryOp {
2211                            left: Box::new(Expr::Identifier("idc".into())),
2212                            op: BinaryOperator::Gt,
2213                            right: Box::new(Expr::Value(
2214                                Value::SingleQuotedString("sh".to_string()).into()
2215                            ))
2216                        }),
2217                        op: BinaryOperator::And,
2218                        right: Box::new(Expr::BinaryOp {
2219                            left: Box::new(Expr::Identifier("host_id".into())),
2220                            op: BinaryOperator::GtEq,
2221                            right: Box::new(Expr::Value(
2222                                Value::Number("3000".to_string(), false).into()
2223                            ))
2224                        })
2225                    }
2226                );
2227            }
2228            _ => unreachable!(),
2229        }
2230    }
2231
2232    #[test]
2233    fn test_parse_create_table_with_quoted_partitions() {
2234        let sql = r"
2235CREATE TABLE monitor (
2236  `host_id`    INT,
2237  idc        STRING,
2238  ts         TIMESTAMP,
2239  cpu        DOUBLE DEFAULT 0,
2240  memory     DOUBLE,
2241  TIME INDEX (ts),
2242  PRIMARY KEY (host),
2243)
2244PARTITION ON COLUMNS(IdC, host_id) (
2245  idc <= 'hz' AND host_id < 1000,
2246  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2247  idc > 'sh' AND host_id < 3000,
2248  idc > 'sh' AND host_id >= 3000,
2249)";
2250        let result =
2251            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2252                .unwrap();
2253        assert_eq!(result.len(), 1);
2254    }
2255
2256    #[test]
2257    fn test_parse_create_table_with_timestamp_index() {
2258        let sql1 = r"
2259CREATE TABLE monitor (
2260  host_id    INT,
2261  idc        STRING,
2262  ts         TIMESTAMP TIME INDEX,
2263  cpu        DOUBLE DEFAULT 0,
2264  memory     DOUBLE,
2265  PRIMARY KEY (host),
2266)
2267ENGINE=mito";
2268        let result1 = ParserContext::create_with_dialect(
2269            sql1,
2270            &GreptimeDbDialect {},
2271            ParseOptions::default(),
2272        )
2273        .unwrap();
2274
2275        if let Statement::CreateTable(c) = &result1[0] {
2276            assert_eq!(c.constraints.len(), 2);
2277            let tc = c.constraints[0].clone();
2278            match tc {
2279                TableConstraint::TimeIndex { column } => {
2280                    assert_eq!(&column.value, "ts");
2281                }
2282                _ => panic!("should be time index constraint"),
2283            };
2284        } else {
2285            panic!("should be create_table statement");
2286        }
2287
2288        // `TIME INDEX` should be in front of `PRIMARY KEY`
2289        // in order to equal the `TIMESTAMP TIME INDEX` constraint options vector
2290        let sql2 = r"
2291CREATE TABLE monitor (
2292  host_id    INT,
2293  idc        STRING,
2294  ts         TIMESTAMP NOT NULL,
2295  cpu        DOUBLE DEFAULT 0,
2296  memory     DOUBLE,
2297  TIME INDEX (ts),
2298  PRIMARY KEY (host),
2299)
2300ENGINE=mito";
2301        let result2 = ParserContext::create_with_dialect(
2302            sql2,
2303            &GreptimeDbDialect {},
2304            ParseOptions::default(),
2305        )
2306        .unwrap();
2307
2308        assert_eq!(result1, result2);
2309
2310        // TIMESTAMP can be NULL which is not equal to above
2311        let sql3 = r"
2312CREATE TABLE monitor (
2313  host_id    INT,
2314  idc        STRING,
2315  ts         TIMESTAMP,
2316  cpu        DOUBLE DEFAULT 0,
2317  memory     DOUBLE,
2318  TIME INDEX (ts),
2319  PRIMARY KEY (host),
2320)
2321ENGINE=mito";
2322
2323        let result3 = ParserContext::create_with_dialect(
2324            sql3,
2325            &GreptimeDbDialect {},
2326            ParseOptions::default(),
2327        )
2328        .unwrap();
2329
2330        assert_ne!(result1, result3);
2331
2332        // BIGINT can't be time index any more
2333        let sql1 = r"
2334CREATE TABLE monitor (
2335  host_id    INT,
2336  idc        STRING,
2337  b          bigint TIME INDEX,
2338  cpu        DOUBLE DEFAULT 0,
2339  memory     DOUBLE,
2340  PRIMARY KEY (host),
2341)
2342ENGINE=mito";
2343        let result1 = ParserContext::create_with_dialect(
2344            sql1,
2345            &GreptimeDbDialect {},
2346            ParseOptions::default(),
2347        );
2348
2349        assert!(
2350            result1
2351                .unwrap_err()
2352                .to_string()
2353                .contains("time index column data type should be timestamp")
2354        );
2355    }
2356
2357    #[test]
2358    fn test_parse_create_table_with_timestamp_index_not_null() {
2359        let sql = r"
2360CREATE TABLE monitor (
2361  host_id    INT,
2362  idc        STRING,
2363  ts         TIMESTAMP TIME INDEX,
2364  cpu        DOUBLE DEFAULT 0,
2365  memory     DOUBLE,
2366  TIME INDEX (ts),
2367  PRIMARY KEY (host),
2368)
2369ENGINE=mito";
2370        let result =
2371            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2372                .unwrap();
2373
2374        assert_eq!(result.len(), 1);
2375        if let Statement::CreateTable(c) = &result[0] {
2376            let ts = c.columns[2].clone();
2377            assert_eq!(ts.name().to_string(), "ts");
2378            assert_eq!(ts.options()[0].option, NotNull);
2379        } else {
2380            panic!("should be create table statement");
2381        }
2382
2383        let sql1 = r"
2384CREATE TABLE monitor (
2385  host_id    INT,
2386  idc        STRING,
2387  ts         TIMESTAMP NOT NULL TIME INDEX,
2388  cpu        DOUBLE DEFAULT 0,
2389  memory     DOUBLE,
2390  TIME INDEX (ts),
2391  PRIMARY KEY (host),
2392)
2393ENGINE=mito";
2394
2395        let result1 = ParserContext::create_with_dialect(
2396            sql1,
2397            &GreptimeDbDialect {},
2398            ParseOptions::default(),
2399        )
2400        .unwrap();
2401        assert_eq!(result, result1);
2402
2403        let sql2 = r"
2404CREATE TABLE monitor (
2405  host_id    INT,
2406  idc        STRING,
2407  ts         TIMESTAMP TIME INDEX NOT NULL,
2408  cpu        DOUBLE DEFAULT 0,
2409  memory     DOUBLE,
2410  TIME INDEX (ts),
2411  PRIMARY KEY (host),
2412)
2413ENGINE=mito";
2414
2415        let result2 = ParserContext::create_with_dialect(
2416            sql2,
2417            &GreptimeDbDialect {},
2418            ParseOptions::default(),
2419        )
2420        .unwrap();
2421        assert_eq!(result, result2);
2422
2423        let sql3 = r"
2424CREATE TABLE monitor (
2425  host_id    INT,
2426  idc        STRING,
2427  ts         TIMESTAMP TIME INDEX NULL NOT,
2428  cpu        DOUBLE DEFAULT 0,
2429  memory     DOUBLE,
2430  TIME INDEX (ts),
2431  PRIMARY KEY (host),
2432)
2433ENGINE=mito";
2434
2435        let result3 = ParserContext::create_with_dialect(
2436            sql3,
2437            &GreptimeDbDialect {},
2438            ParseOptions::default(),
2439        );
2440        assert!(result3.is_err());
2441
2442        let sql4 = r"
2443CREATE TABLE monitor (
2444  host_id    INT,
2445  idc        STRING,
2446  ts         TIMESTAMP TIME INDEX NOT NULL NULL,
2447  cpu        DOUBLE DEFAULT 0,
2448  memory     DOUBLE,
2449  TIME INDEX (ts),
2450  PRIMARY KEY (host),
2451)
2452ENGINE=mito";
2453
2454        let result4 = ParserContext::create_with_dialect(
2455            sql4,
2456            &GreptimeDbDialect {},
2457            ParseOptions::default(),
2458        );
2459        assert!(result4.is_err());
2460
2461        let sql = r"
2462CREATE TABLE monitor (
2463  host_id    INT,
2464  idc        STRING,
2465  ts         TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2466  cpu        DOUBLE DEFAULT 0,
2467  memory     DOUBLE,
2468  TIME INDEX (ts),
2469  PRIMARY KEY (host),
2470)
2471ENGINE=mito";
2472
2473        let result =
2474            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2475                .unwrap();
2476
2477        if let Statement::CreateTable(c) = &result[0] {
2478            let tc = c.constraints[0].clone();
2479            match tc {
2480                TableConstraint::TimeIndex { column } => {
2481                    assert_eq!(&column.value, "ts");
2482                }
2483                _ => panic!("should be time index constraint"),
2484            }
2485            let ts = c.columns[2].clone();
2486            assert_eq!(ts.name().to_string(), "ts");
2487            assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2488            assert_eq!(ts.options()[1].option, NotNull);
2489        } else {
2490            unreachable!("should be create table statement");
2491        }
2492    }
2493
2494    #[test]
2495    fn test_parse_partitions_with_error_syntax() {
2496        let sql = r"
2497CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2498PARTITION COLUMNS(c, a) (
2499    a < 10,
2500    a > 10 AND a < 20,
2501    a > 20 AND c < 100,
2502    a > 20 AND c >= 100
2503)
2504ENGINE=mito";
2505        let result =
2506            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2507        assert!(
2508            result
2509                .unwrap_err()
2510                .output_msg()
2511                .contains("sql parser error: Expected: ON, found: COLUMNS")
2512        );
2513    }
2514
2515    #[test]
2516    fn test_parse_partitions_without_rule() {
2517        let sql = r"
2518CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2519PARTITION ON COLUMNS(c, a) ()
2520ENGINE=mito";
2521        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2522            .unwrap();
2523    }
2524
2525    #[test]
2526    fn test_parse_partitions_unreferenced_column() {
2527        let sql = r"
2528CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2529PARTITION ON COLUMNS(c, a) (
2530    b = 'foo'
2531)
2532ENGINE=mito";
2533        let result =
2534            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2535        assert_eq!(
2536            result.unwrap_err().output_msg(),
2537            "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2538        );
2539    }
2540
2541    #[test]
2542    fn test_parse_partitions_not_binary_expr() {
2543        let sql = r"
2544CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2545PARTITION ON COLUMNS(c, a) (
2546    b
2547)
2548ENGINE=mito";
2549        let result =
2550            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2551        assert_eq!(
2552            result.unwrap_err().output_msg(),
2553            r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2554        );
2555    }
2556
2557    fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2558        assert_eq!(column.name.to_string(), name);
2559        assert_eq!(column.data_type.to_string(), data_type);
2560    }
2561
2562    #[test]
2563    pub fn test_parse_create_table() {
2564        let sql = r"create table demo(
2565                             host string,
2566                             ts timestamp,
2567                             cpu float32 default 0,
2568                             memory float64,
2569                             TIME INDEX (ts),
2570                             PRIMARY KEY(ts, host),
2571                             ) engine=mito
2572                             with(ttl='10s');
2573         ";
2574        let result =
2575            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2576                .unwrap();
2577        assert_eq!(1, result.len());
2578        match &result[0] {
2579            Statement::CreateTable(c) => {
2580                assert!(!c.if_not_exists);
2581                assert_eq!("demo", c.name.to_string());
2582                assert_eq!("mito", c.engine);
2583                assert_eq!(4, c.columns.len());
2584                let columns = &c.columns;
2585                assert_column_def(&columns[0].column_def, "host", "STRING");
2586                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2587                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2588                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2589
2590                let constraints = &c.constraints;
2591                assert_eq!(
2592                    &constraints[0],
2593                    &TableConstraint::TimeIndex {
2594                        column: Ident::new("ts"),
2595                    }
2596                );
2597                assert_eq!(
2598                    &constraints[1],
2599                    &TableConstraint::PrimaryKey {
2600                        columns: vec![Ident::new("ts"), Ident::new("host")]
2601                    }
2602                );
2603                // inverted index is merged into column options
2604                assert_eq!(1, c.options.len());
2605                assert_eq!(
2606                    [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2607                    c.options.to_str_map()
2608                );
2609            }
2610            _ => unreachable!(),
2611        }
2612    }
2613
2614    #[test]
2615    fn test_invalid_index_keys() {
2616        let sql = r"create table demo(
2617                             host string,
2618                             ts int64,
2619                             cpu float64 default 0,
2620                             memory float64,
2621                             TIME INDEX (ts, host),
2622                             PRIMARY KEY(ts, host)) engine=mito;
2623         ";
2624        let result =
2625            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2626        assert!(result.is_err());
2627        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2628    }
2629
2630    #[test]
2631    fn test_duplicated_time_index() {
2632        let sql = r"create table demo(
2633                             host string,
2634                             ts timestamp time index,
2635                             t timestamp time index,
2636                             cpu float64 default 0,
2637                             memory float64,
2638                             TIME INDEX (ts, host),
2639                             PRIMARY KEY(ts, host)) engine=mito;
2640         ";
2641        let result =
2642            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2643        assert!(result.is_err());
2644        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2645
2646        let sql = r"create table demo(
2647                             host string,
2648                             ts timestamp time index,
2649                             cpu float64 default 0,
2650                             t timestamp,
2651                             memory float64,
2652                             TIME INDEX (t),
2653                             PRIMARY KEY(ts, host)) engine=mito;
2654         ";
2655        let result =
2656            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2657        assert!(result.is_err());
2658        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2659    }
2660
2661    #[test]
2662    fn test_invalid_column_name() {
2663        let sql = "create table foo(user string, i timestamp time index)";
2664        let result =
2665            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2666        let err = result.unwrap_err().output_msg();
2667        assert!(err.contains("Cannot use keyword 'user' as column name"));
2668
2669        // If column name is quoted, it's valid even same with keyword.
2670        let sql = r#"
2671            create table foo("user" string, i timestamp time index)
2672        "#;
2673        let result =
2674            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2675        let _ = result.unwrap();
2676    }
2677
2678    #[test]
2679    fn test_incorrect_default_value_issue_3479() {
2680        let sql = r#"CREATE TABLE `ExcePTuRi`(
2681non TIMESTAMP(6) TIME INDEX,
2682`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2683)"#;
2684        let result =
2685            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2686                .unwrap();
2687        assert_eq!(1, result.len());
2688        match &result[0] {
2689            Statement::CreateTable(c) => {
2690                assert_eq!(
2691                    "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2692                    c.columns[1].to_string()
2693                );
2694            }
2695            _ => unreachable!(),
2696        }
2697    }
2698
2699    #[test]
2700    fn test_parse_create_view() {
2701        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
2702
2703        let result =
2704            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2705                .unwrap();
2706        match &result[0] {
2707            Statement::CreateView(c) => {
2708                assert_eq!(c.to_string(), sql);
2709                assert!(!c.or_replace);
2710                assert!(!c.if_not_exists);
2711                assert_eq!("test", c.name.to_string());
2712            }
2713            _ => unreachable!(),
2714        }
2715
2716        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
2717
2718        let result =
2719            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2720                .unwrap();
2721        match &result[0] {
2722            Statement::CreateView(c) => {
2723                assert_eq!(c.to_string(), sql);
2724                assert!(c.or_replace);
2725                assert!(c.if_not_exists);
2726                assert_eq!("test", c.name.to_string());
2727            }
2728            _ => unreachable!(),
2729        }
2730    }
2731
2732    #[test]
2733    fn test_parse_create_view_invalid_query() {
2734        let sql = "CREATE VIEW test AS DELETE from demo";
2735        let result =
2736            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2737        assert!(result.is_ok_and(|x| x.len() == 1));
2738    }
2739
2740    #[test]
2741    fn test_parse_create_table_fulltext_options() {
2742        let sql1 = r"
2743CREATE TABLE log (
2744    ts TIMESTAMP TIME INDEX,
2745    msg TEXT FULLTEXT INDEX,
2746)";
2747        let result1 = ParserContext::create_with_dialect(
2748            sql1,
2749            &GreptimeDbDialect {},
2750            ParseOptions::default(),
2751        )
2752        .unwrap();
2753
2754        if let Statement::CreateTable(c) = &result1[0] {
2755            c.columns.iter().for_each(|col| {
2756                if col.name().value == "msg" {
2757                    assert!(
2758                        col.extensions
2759                            .fulltext_index_options
2760                            .as_ref()
2761                            .unwrap()
2762                            .is_empty()
2763                    );
2764                }
2765            });
2766        } else {
2767            panic!("should be create_table statement");
2768        }
2769
2770        let sql2 = r"
2771CREATE TABLE log (
2772    ts TIMESTAMP TIME INDEX,
2773    msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2774)";
2775        let result2 = ParserContext::create_with_dialect(
2776            sql2,
2777            &GreptimeDbDialect {},
2778            ParseOptions::default(),
2779        )
2780        .unwrap();
2781
2782        if let Statement::CreateTable(c) = &result2[0] {
2783            c.columns.iter().for_each(|col| {
2784                if col.name().value == "msg" {
2785                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2786                    assert_eq!(options.len(), 2);
2787                    assert_eq!(options.get("analyzer").unwrap(), "English");
2788                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2789                }
2790            });
2791        } else {
2792            panic!("should be create_table statement");
2793        }
2794
2795        let sql3 = r"
2796CREATE TABLE log (
2797    ts TIMESTAMP TIME INDEX,
2798    msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2799    msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2800)";
2801        let result3 = ParserContext::create_with_dialect(
2802            sql3,
2803            &GreptimeDbDialect {},
2804            ParseOptions::default(),
2805        )
2806        .unwrap();
2807
2808        if let Statement::CreateTable(c) = &result3[0] {
2809            c.columns.iter().for_each(|col| {
2810                if col.name().value == "msg1" {
2811                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2812                    assert_eq!(options.len(), 2);
2813                    assert_eq!(options.get("analyzer").unwrap(), "English");
2814                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2815                } else if col.name().value == "msg2" {
2816                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2817                    assert_eq!(options.len(), 2);
2818                    assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2819                    assert_eq!(options.get("case_sensitive").unwrap(), "true");
2820                }
2821            });
2822        } else {
2823            panic!("should be create_table statement");
2824        }
2825    }
2826
2827    #[test]
2828    fn test_parse_create_table_fulltext_options_invalid_type() {
2829        let sql = r"
2830CREATE TABLE log (
2831    ts TIMESTAMP TIME INDEX,
2832    msg INT FULLTEXT INDEX,
2833)";
2834        let result =
2835            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2836        assert!(result.is_err());
2837        assert!(
2838            result
2839                .unwrap_err()
2840                .to_string()
2841                .contains("FULLTEXT index only supports string type")
2842        );
2843    }
2844
2845    #[test]
2846    fn test_parse_create_table_fulltext_options_duplicate() {
2847        let sql = r"
2848CREATE TABLE log (
2849    ts TIMESTAMP TIME INDEX,
2850    msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
2851)";
2852        let result =
2853            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2854        assert!(result.is_err());
2855        assert!(
2856            result
2857                .unwrap_err()
2858                .to_string()
2859                .contains("duplicated FULLTEXT INDEX option")
2860        );
2861    }
2862
2863    #[test]
2864    fn test_parse_create_table_fulltext_options_invalid_option() {
2865        let sql = r"
2866CREATE TABLE log (
2867    ts TIMESTAMP TIME INDEX,
2868    msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
2869)";
2870        let result =
2871            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2872        assert!(result.is_err());
2873        assert!(
2874            result
2875                .unwrap_err()
2876                .to_string()
2877                .contains("invalid FULLTEXT INDEX option")
2878        );
2879    }
2880
2881    #[test]
2882    fn test_parse_create_table_skip_options() {
2883        let sql = r"
2884CREATE TABLE log (
2885    ts TIMESTAMP TIME INDEX,
2886    msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
2887)";
2888        let result =
2889            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2890                .unwrap();
2891
2892        if let Statement::CreateTable(c) = &result[0] {
2893            c.columns.iter().for_each(|col| {
2894                if col.name().value == "msg" {
2895                    assert!(
2896                        !col.extensions
2897                            .skipping_index_options
2898                            .as_ref()
2899                            .unwrap()
2900                            .is_empty()
2901                    );
2902                }
2903            });
2904        } else {
2905            panic!("should be create_table statement");
2906        }
2907
2908        let sql = r"
2909        CREATE TABLE log (
2910            ts TIMESTAMP TIME INDEX,
2911            msg INT SKIPPING INDEX,
2912        )";
2913        let result =
2914            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2915                .unwrap();
2916
2917        if let Statement::CreateTable(c) = &result[0] {
2918            c.columns.iter().for_each(|col| {
2919                if col.name().value == "msg" {
2920                    assert!(
2921                        col.extensions
2922                            .skipping_index_options
2923                            .as_ref()
2924                            .unwrap()
2925                            .is_empty()
2926                    );
2927                }
2928            });
2929        } else {
2930            panic!("should be create_table statement");
2931        }
2932    }
2933
2934    #[test]
2935    fn test_parse_create_view_with_columns() {
2936        let sql = "CREATE VIEW test () AS SELECT * FROM NUMBERS";
2937        let result =
2938            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2939                .unwrap();
2940
2941        match &result[0] {
2942            Statement::CreateView(c) => {
2943                assert_eq!(c.to_string(), "CREATE VIEW test AS SELECT * FROM NUMBERS");
2944                assert!(!c.or_replace);
2945                assert!(!c.if_not_exists);
2946                assert_eq!("test", c.name.to_string());
2947            }
2948            _ => unreachable!(),
2949        }
2950        assert_eq!(
2951            "CREATE VIEW test AS SELECT * FROM NUMBERS",
2952            result[0].to_string()
2953        );
2954
2955        let sql = "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS";
2956        let result =
2957            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2958                .unwrap();
2959
2960        match &result[0] {
2961            Statement::CreateView(c) => {
2962                assert_eq!(c.to_string(), sql);
2963                assert!(!c.or_replace);
2964                assert!(!c.if_not_exists);
2965                assert_eq!("test", c.name.to_string());
2966            }
2967            _ => unreachable!(),
2968        }
2969        assert_eq!(sql, result[0].to_string());
2970
2971        let sql = "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS";
2972        let result =
2973            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2974                .unwrap();
2975
2976        match &result[0] {
2977            Statement::CreateView(c) => {
2978                assert_eq!(c.to_string(), sql);
2979                assert!(!c.or_replace);
2980                assert!(!c.if_not_exists);
2981                assert_eq!("test", c.name.to_string());
2982            }
2983            _ => unreachable!(),
2984        }
2985        assert_eq!(sql, result[0].to_string());
2986
2987        // Some invalid syntax cases
2988        let sql = "CREATE VIEW test (n1 AS select * from demo";
2989        let result =
2990            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2991        assert!(result.is_err());
2992
2993        let sql = "CREATE VIEW test (n1, AS select * from demo";
2994        let result =
2995            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2996        assert!(result.is_err());
2997
2998        let sql = "CREATE VIEW test n1,n2) AS select * from demo";
2999        let result =
3000            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3001        assert!(result.is_err());
3002
3003        let sql = "CREATE VIEW test (1) AS select * from demo";
3004        let result =
3005            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3006        assert!(result.is_err());
3007
3008        // keyword
3009        let sql = "CREATE VIEW test (n1, select) AS select * from demo";
3010        let result =
3011            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3012        assert!(result.is_err());
3013    }
3014
3015    #[test]
3016    fn test_parse_column_extensions_vector() {
3017        // Test that vector options are parsed from data_type (no additional SQL needed)
3018        let sql = "";
3019        let dialect = GenericDialect {};
3020        let mut tokenizer = Tokenizer::new(&dialect, sql);
3021        let tokens = tokenizer.tokenize().unwrap();
3022        let mut parser = Parser::new(&dialect).with_tokens(tokens);
3023        let name = Ident::new("vec_col");
3024        let data_type =
3025            DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
3026        let mut extensions = ColumnExtensions::default();
3027
3028        let result =
3029            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
3030        assert!(result.is_ok());
3031        assert!(extensions.vector_options.is_some());
3032        let vector_options = extensions.vector_options.unwrap();
3033        assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some("128"));
3034    }
3035
3036    #[test]
3037    fn test_parse_column_extensions_vector_invalid() {
3038        // Test that vector with no dimension fails
3039        let sql = "";
3040        let dialect = GenericDialect {};
3041        let mut tokenizer = Tokenizer::new(&dialect, sql);
3042        let tokens = tokenizer.tokenize().unwrap();
3043        let mut parser = Parser::new(&dialect).with_tokens(tokens);
3044        let name = Ident::new("vec_col");
3045        let data_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
3046        let mut extensions = ColumnExtensions::default();
3047
3048        let result =
3049            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
3050        assert!(result.is_err());
3051    }
3052
3053    #[test]
3054    fn test_parse_column_extensions_indices() {
3055        // Test skipping index
3056        {
3057            let sql = "SKIPPING INDEX";
3058            let dialect = GenericDialect {};
3059            let mut tokenizer = Tokenizer::new(&dialect, sql);
3060            let tokens = tokenizer.tokenize().unwrap();
3061            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3062            let name = Ident::new("col");
3063            let data_type = DataType::String(None);
3064            let mut extensions = ColumnExtensions::default();
3065            let result = ParserContext::parse_column_extensions(
3066                &mut parser,
3067                &name,
3068                &data_type,
3069                &mut extensions,
3070            );
3071            assert!(result.is_ok());
3072            assert!(extensions.skipping_index_options.is_some());
3073        }
3074
3075        // Test fulltext index with options
3076        {
3077            let sql = "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')";
3078            let dialect = GenericDialect {};
3079            let mut tokenizer = Tokenizer::new(&dialect, sql);
3080            let tokens = tokenizer.tokenize().unwrap();
3081            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3082            let name = Ident::new("text_col");
3083            let data_type = DataType::String(None);
3084            let mut extensions = ColumnExtensions::default();
3085            let result = ParserContext::parse_column_extensions(
3086                &mut parser,
3087                &name,
3088                &data_type,
3089                &mut extensions,
3090            );
3091            assert!(result.unwrap());
3092            assert!(extensions.fulltext_index_options.is_some());
3093            let fulltext_options = extensions.fulltext_index_options.unwrap();
3094            assert_eq!(fulltext_options.get("analyzer"), Some("English"));
3095            assert_eq!(fulltext_options.get("case_sensitive"), Some("true"));
3096        }
3097
3098        // Test fulltext index with invalid type (should fail)
3099        {
3100            let sql = "FULLTEXT INDEX WITH (analyzer = 'English')";
3101            let dialect = GenericDialect {};
3102            let mut tokenizer = Tokenizer::new(&dialect, sql);
3103            let tokens = tokenizer.tokenize().unwrap();
3104            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3105            let name = Ident::new("num_col");
3106            let data_type = DataType::Int(None); // Non-string type
3107            let mut extensions = ColumnExtensions::default();
3108            let result = ParserContext::parse_column_extensions(
3109                &mut parser,
3110                &name,
3111                &data_type,
3112                &mut extensions,
3113            );
3114            assert!(result.is_err());
3115            assert!(
3116                result
3117                    .unwrap_err()
3118                    .to_string()
3119                    .contains("FULLTEXT index only supports string type")
3120            );
3121        }
3122
3123        // Test fulltext index with invalid option (won't fail, the parser doesn't check the option's content)
3124        {
3125            let sql = "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')";
3126            let dialect = GenericDialect {};
3127            let mut tokenizer = Tokenizer::new(&dialect, sql);
3128            let tokens = tokenizer.tokenize().unwrap();
3129            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3130            let name = Ident::new("text_col");
3131            let data_type = DataType::String(None);
3132            let mut extensions = ColumnExtensions::default();
3133            let result = ParserContext::parse_column_extensions(
3134                &mut parser,
3135                &name,
3136                &data_type,
3137                &mut extensions,
3138            );
3139            assert!(result.unwrap());
3140        }
3141
3142        // Test inverted index
3143        {
3144            let sql = "INVERTED INDEX";
3145            let dialect = GenericDialect {};
3146            let mut tokenizer = Tokenizer::new(&dialect, sql);
3147            let tokens = tokenizer.tokenize().unwrap();
3148            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3149            let name = Ident::new("col");
3150            let data_type = DataType::String(None);
3151            let mut extensions = ColumnExtensions::default();
3152            let result = ParserContext::parse_column_extensions(
3153                &mut parser,
3154                &name,
3155                &data_type,
3156                &mut extensions,
3157            );
3158            assert!(result.is_ok());
3159            assert!(extensions.inverted_index_options.is_some());
3160        }
3161
3162        // Test inverted index with options (should fail)
3163        {
3164            let sql = "INVERTED INDEX WITH (analyzer = 'English')";
3165            let dialect = GenericDialect {};
3166            let mut tokenizer = Tokenizer::new(&dialect, sql);
3167            let tokens = tokenizer.tokenize().unwrap();
3168            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3169            let name = Ident::new("col");
3170            let data_type = DataType::String(None);
3171            let mut extensions = ColumnExtensions::default();
3172            let result = ParserContext::parse_column_extensions(
3173                &mut parser,
3174                &name,
3175                &data_type,
3176                &mut extensions,
3177            );
3178            assert!(result.is_err());
3179            assert!(
3180                result
3181                    .unwrap_err()
3182                    .to_string()
3183                    .contains("INVERTED index doesn't support options")
3184            );
3185        }
3186
3187        // Test multiple indices
3188        {
3189            let sql = "SKIPPING INDEX FULLTEXT INDEX";
3190            let dialect = GenericDialect {};
3191            let mut tokenizer = Tokenizer::new(&dialect, sql);
3192            let tokens = tokenizer.tokenize().unwrap();
3193            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3194            let name = Ident::new("col");
3195            let data_type = DataType::String(None);
3196            let mut extensions = ColumnExtensions::default();
3197            let result = ParserContext::parse_column_extensions(
3198                &mut parser,
3199                &name,
3200                &data_type,
3201                &mut extensions,
3202            );
3203            assert!(result.unwrap());
3204            assert!(extensions.skipping_index_options.is_some());
3205            assert!(extensions.fulltext_index_options.is_some());
3206        }
3207    }
3208
3209    #[test]
3210    fn test_parse_interval_cast() {
3211        let s = "select '10s'::INTERVAL";
3212        let stmts =
3213            ParserContext::create_with_dialect(s, &GreptimeDbDialect {}, ParseOptions::default())
3214                .unwrap();
3215        assert_eq!("SELECT '10 seconds'::INTERVAL", &stmts[0].to_string());
3216    }
3217
3218    #[test]
3219    fn test_parse_create_table_vector_index_options() {
3220        // Test basic vector index
3221        let sql = r"
3222CREATE TABLE vectors (
3223    ts TIMESTAMP TIME INDEX,
3224    vec VECTOR(128) VECTOR INDEX,
3225)";
3226        let result =
3227            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
3228                .unwrap();
3229
3230        if let Statement::CreateTable(c) = &result[0] {
3231            c.columns.iter().for_each(|col| {
3232                if col.name().value == "vec" {
3233                    assert!(
3234                        col.extensions
3235                            .vector_index_options
3236                            .as_ref()
3237                            .unwrap()
3238                            .is_empty()
3239                    );
3240                }
3241            });
3242        } else {
3243            panic!("should be create_table statement");
3244        }
3245
3246        // Test vector index with options
3247        let sql = r"
3248CREATE TABLE vectors (
3249    ts TIMESTAMP TIME INDEX,
3250    vec VECTOR(128) VECTOR INDEX WITH (metric='cosine', connectivity='32', expansion_add='256', expansion_search='128')
3251)";
3252        let result =
3253            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
3254                .unwrap();
3255
3256        if let Statement::CreateTable(c) = &result[0] {
3257            c.columns.iter().for_each(|col| {
3258                if col.name().value == "vec" {
3259                    let options = col.extensions.vector_index_options.as_ref().unwrap();
3260                    assert_eq!(options.len(), 4);
3261                    assert_eq!(options.get("metric").unwrap(), "cosine");
3262                    assert_eq!(options.get("connectivity").unwrap(), "32");
3263                    assert_eq!(options.get("expansion_add").unwrap(), "256");
3264                    assert_eq!(options.get("expansion_search").unwrap(), "128");
3265                }
3266            });
3267        } else {
3268            panic!("should be create_table statement");
3269        }
3270    }
3271
3272    #[test]
3273    fn test_parse_create_table_vector_index_invalid_type() {
3274        // Test vector index on non-vector type (should fail)
3275        let sql = r"
3276CREATE TABLE vectors (
3277    ts TIMESTAMP TIME INDEX,
3278    col INT VECTOR INDEX,
3279)";
3280        let result =
3281            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3282        assert!(result.is_err());
3283        assert!(
3284            result
3285                .unwrap_err()
3286                .to_string()
3287                .contains("VECTOR INDEX only supports Vector type columns")
3288        );
3289    }
3290
3291    #[test]
3292    fn test_parse_create_table_vector_index_duplicate() {
3293        // Test duplicate vector index (should fail)
3294        let sql = r"
3295CREATE TABLE vectors (
3296    ts TIMESTAMP TIME INDEX,
3297    vec VECTOR(128) VECTOR INDEX VECTOR INDEX,
3298)";
3299        let result =
3300            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3301        assert!(result.is_err());
3302        assert!(
3303            result
3304                .unwrap_err()
3305                .to_string()
3306                .contains("duplicated VECTOR INDEX option")
3307        );
3308    }
3309
3310    #[test]
3311    fn test_parse_create_table_vector_index_invalid_option() {
3312        // Test invalid option key (should fail)
3313        let sql = r"
3314CREATE TABLE vectors (
3315    ts TIMESTAMP TIME INDEX,
3316    vec VECTOR(128) VECTOR INDEX WITH (metric='l2sq', invalid_option='foo')
3317)";
3318        let result =
3319            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3320        assert!(result.is_err());
3321        assert!(
3322            result
3323                .unwrap_err()
3324                .to_string()
3325                .contains("invalid VECTOR INDEX option")
3326        );
3327    }
3328
3329    #[test]
3330    fn test_parse_column_extensions_vector_index() {
3331        // Test vector index on vector type
3332        {
3333            let sql = "VECTOR INDEX WITH (metric = 'l2sq')";
3334            let dialect = GenericDialect {};
3335            let mut tokenizer = Tokenizer::new(&dialect, sql);
3336            let tokens = tokenizer.tokenize().unwrap();
3337            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3338            let name = Ident::new("vec_col");
3339            let data_type =
3340                DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
3341            // First, parse the vector type to set vector_options
3342            let mut extensions = ColumnExtensions {
3343                vector_options: Some(OptionMap::from([(
3344                    VECTOR_OPT_DIM.to_string(),
3345                    "128".to_string(),
3346                )])),
3347                ..Default::default()
3348            };
3349
3350            let result = ParserContext::parse_column_extensions(
3351                &mut parser,
3352                &name,
3353                &data_type,
3354                &mut extensions,
3355            );
3356            assert!(result.is_ok());
3357            assert!(extensions.vector_index_options.is_some());
3358            let vi_options = extensions.vector_index_options.unwrap();
3359            assert_eq!(vi_options.get("metric"), Some("l2sq"));
3360        }
3361
3362        // Test vector index on non-vector type (should fail)
3363        {
3364            let sql = "VECTOR INDEX";
3365            let dialect = GenericDialect {};
3366            let mut tokenizer = Tokenizer::new(&dialect, sql);
3367            let tokens = tokenizer.tokenize().unwrap();
3368            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3369            let name = Ident::new("num_col");
3370            let data_type = DataType::Int(None); // Non-vector type
3371            let mut extensions = ColumnExtensions::default();
3372            let result = ParserContext::parse_column_extensions(
3373                &mut parser,
3374                &name,
3375                &data_type,
3376                &mut extensions,
3377            );
3378            assert!(result.is_err());
3379            assert!(
3380                result
3381                    .unwrap_err()
3382                    .to_string()
3383                    .contains("VECTOR INDEX only supports Vector type columns")
3384            );
3385        }
3386    }
3387}