Skip to main content

sql/parsers/
create_parser.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod json;
16#[cfg(feature = "enterprise")]
17pub mod trigger;
18
19use std::collections::HashMap;
20
21use arrow_buffer::IntervalMonthDayNano;
22use common_catalog::consts::default_engine;
23use datafusion_common::ScalarValue;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
25use datatypes::data_type::ConcreteDataType;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28use sqlparser::ast::{
29    ColumnOption, ColumnOptionDef, DataType, Expr, KeyOrIndexDisplay, NullsDistinctOption,
30    PrimaryKeyConstraint, UniqueConstraint,
31};
32use sqlparser::dialect::keywords::Keyword;
33use sqlparser::keywords::ALL_KEYWORDS;
34use sqlparser::parser::IsOptional::Mandatory;
35use sqlparser::parser::{Parser, ParserError};
36use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
37use table::requests::validate_database_option;
38
39use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
40use crate::error::{
41    self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidFlowQuerySnafu,
42    InvalidIntervalSnafu, InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
43    SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
44};
45use crate::parser::{FLOW, ParserContext};
46use crate::parsers::tql_parser;
47use crate::parsers::utils::{
48    self, parse_with_options, validate_column_fulltext_create_option,
49    validate_column_skipping_index_create_option, validate_column_vector_index_create_option,
50};
51use crate::statements::create::{
52    Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
53    CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
54};
55use crate::statements::statement::Statement;
56use crate::statements::transform::type_alias::get_data_type_by_alias_name;
57use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
58use crate::util::{OptionValue, location_to_index, parse_option_string};
59
// Words recognized by the `CREATE ...` parsers of this module. Some of them
// (`SINK`, `EXPIRE`, `AFTER`, `SKIPPING`) are compared against the raw word
// value with `eq_ignore_ascii_case` rather than matched as sqlparser keywords.

/// The `ENGINE` word.
pub const ENGINE: &str = "ENGINE";
/// The `MAXVALUE` word.
pub const MAXVALUE: &str = "MAXVALUE";
/// The `SINK` word, as in `CREATE FLOW ... SINK TO <table>`.
pub const SINK: &str = "SINK";
/// The first word of the `EXPIRE AFTER <interval>` flow clause.
pub const EXPIRE: &str = "EXPIRE";
/// The second word of the `EXPIRE AFTER <interval>` flow clause.
pub const AFTER: &str = "AFTER";
/// The `INVERTED` word.
pub const INVERTED: &str = "INVERTED";
/// The first word of a `SKIPPING INDEX` column extension.
pub const SKIPPING: &str = "SKIPPING";
/// The `VECTOR` word.
pub const VECTOR: &str = "VECTOR";

/// The `Display` form of a parsed interval expression.
pub type RawIntervalExpr = String;
70
71// Preserve raw CREATE FLOW option entries until operator-side validation.
72// Do not use `OptionMap::new()` here: it can drop non-string values for
73// redacted keys before the flow option allowlist rejects them.
74fn flow_option_map(options: HashMap<String, OptionValue>) -> OptionMap {
75    let mut flow_options = OptionMap::default();
76    for (key, value) in options {
77        flow_options.insert_options(&key, value);
78    }
79    flow_options
80}
81
82/// Parses create [table] statement
83impl<'a> ParserContext<'a> {
84    pub(crate) fn parse_create(&mut self) -> Result<Statement> {
85        match self.parser.peek_token().token {
86            Token::Word(w) => match w.keyword {
87                Keyword::TABLE => self.parse_create_table(),
88
89                Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
90
91                Keyword::EXTERNAL => self.parse_create_external_table(),
92
93                Keyword::OR => {
94                    let _ = self.parser.next_token();
95                    self.parser
96                        .expect_keyword(Keyword::REPLACE)
97                        .context(SyntaxSnafu)?;
98                    match self.parser.next_token().token {
99                        Token::Word(w) => match w.keyword {
100                            Keyword::VIEW => self.parse_create_view(true),
101                            Keyword::NoKeyword => {
102                                let uppercase = w.value.to_uppercase();
103                                match uppercase.as_str() {
104                                    FLOW => self.parse_create_flow(true),
105                                    _ => self.unsupported(w.to_string()),
106                                }
107                            }
108                            _ => self.unsupported(w.to_string()),
109                        },
110                        _ => self.unsupported(w.to_string()),
111                    }
112                }
113
114                Keyword::VIEW => {
115                    let _ = self.parser.next_token();
116                    self.parse_create_view(false)
117                }
118
119                #[cfg(feature = "enterprise")]
120                Keyword::TRIGGER => {
121                    let _ = self.parser.next_token();
122                    self.parse_create_trigger()
123                }
124
125                Keyword::NoKeyword => {
126                    let _ = self.parser.next_token();
127                    let uppercase = w.value.to_uppercase();
128                    match uppercase.as_str() {
129                        FLOW => self.parse_create_flow(false),
130                        _ => self.unsupported(w.to_string()),
131                    }
132                }
133                _ => self.unsupported(w.to_string()),
134            },
135            unexpected => self.unsupported(unexpected.to_string()),
136        }
137    }
138
139    /// Parse `CREAVE VIEW` statement.
140    fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
141        let if_not_exists = self.parse_if_not_exist()?;
142        let view_name = self.intern_parse_table_name()?;
143
144        let columns = self.parse_view_columns()?;
145
146        self.parser
147            .expect_keyword(Keyword::AS)
148            .context(SyntaxSnafu)?;
149
150        let query = self.parse_query()?;
151
152        Ok(Statement::CreateView(CreateView {
153            name: view_name,
154            columns,
155            or_replace,
156            query: Box::new(query),
157            if_not_exists,
158        }))
159    }
160
161    fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
162        let mut columns = vec![];
163        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
164            return Ok(columns);
165        }
166
167        loop {
168            let name = self.parse_column_name().context(SyntaxSnafu)?;
169
170            columns.push(name);
171
172            let comma = self.parser.consume_token(&Token::Comma);
173            if self.parser.consume_token(&Token::RParen) {
174                // allow a trailing comma, even though it's not in standard
175                break;
176            } else if !comma {
177                return self.expected("',' or ')' after column name", self.parser.peek_token());
178            }
179        }
180
181        Ok(columns)
182    }
183
184    fn parse_create_external_table(&mut self) -> Result<Statement> {
185        let _ = self.parser.next_token();
186        self.parser
187            .expect_keyword(Keyword::TABLE)
188            .context(SyntaxSnafu)?;
189        let if_not_exists = self.parse_if_not_exist()?;
190        let table_name = self.intern_parse_table_name()?;
191        let (columns, constraints) = self.parse_columns()?;
192        if !columns.is_empty() {
193            validate_time_index(&columns, &constraints)?;
194        }
195
196        let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
197        let options = self.parse_create_table_options()?;
198        Ok(Statement::CreateExternalTable(CreateExternalTable {
199            name: table_name,
200            columns,
201            constraints,
202            options,
203            if_not_exists,
204            engine,
205        }))
206    }
207
208    fn parse_create_database(&mut self) -> Result<Statement> {
209        let _ = self.parser.next_token();
210        let if_not_exists = self.parse_if_not_exist()?;
211        let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
212            expected: "a database name",
213            actual: self.peek_token_as_string(),
214        })?;
215        let database_name = Self::canonicalize_object_name(database_name)?;
216
217        let options = self
218            .parser
219            .parse_options(Keyword::WITH)
220            .context(SyntaxSnafu)?
221            .into_iter()
222            .map(parse_option_string)
223            .collect::<Result<HashMap<String, OptionValue>>>()?;
224
225        for key in options.keys() {
226            ensure!(
227                validate_database_option(key),
228                InvalidDatabaseOptionSnafu { key: key.clone() }
229            );
230        }
231        if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string())
232            && append_mode == "true"
233            && options.contains_key("merge_mode")
234        {
235            return InvalidDatabaseOptionSnafu {
236                key: "merge_mode".to_string(),
237            }
238            .fail();
239        }
240
241        Ok(Statement::CreateDatabase(CreateDatabase {
242            name: database_name,
243            if_not_exists,
244            options: OptionMap::new(options),
245        }))
246    }
247
248    fn parse_create_table(&mut self) -> Result<Statement> {
249        let _ = self.parser.next_token();
250
251        let if_not_exists = self.parse_if_not_exist()?;
252
253        let table_name = self.intern_parse_table_name()?;
254
255        if self.parser.parse_keyword(Keyword::LIKE) {
256            let source_name = self.intern_parse_table_name()?;
257
258            return Ok(Statement::CreateTableLike(CreateTableLike {
259                table_name,
260                source_name,
261            }));
262        }
263
264        let (columns, constraints) = self.parse_columns()?;
265        validate_time_index(&columns, &constraints)?;
266
267        let partitions = self.parse_partitions()?;
268        if let Some(partitions) = &partitions {
269            validate_partitions(&columns, partitions)?;
270        }
271
272        let engine = self.parse_table_engine(default_engine())?;
273        let options = self.parse_create_table_options()?;
274        let create_table = CreateTable {
275            if_not_exists,
276            name: table_name,
277            columns,
278            engine,
279            constraints,
280            options,
281            table_id: 0, // table id is assigned by catalog manager
282            partitions,
283        };
284
285        Ok(Statement::CreateTable(create_table))
286    }
287
288    /// "CREATE FLOW" clause
289    fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
290        let if_not_exists = self.parse_if_not_exist()?;
291
292        let flow_name = self.intern_parse_table_name()?;
293
294        // make `SINK` case in-sensitive
295        if let Token::Word(word) = self.parser.peek_token().token
296            && word.value.eq_ignore_ascii_case(SINK)
297        {
298            self.parser.next_token();
299        } else {
300            Err(ParserError::ParserError(
301                "Expect `SINK` keyword".to_string(),
302            ))
303            .context(SyntaxSnafu)?
304        }
305        self.parser
306            .expect_keyword(Keyword::TO)
307            .context(SyntaxSnafu)?;
308
309        let output_table_name = self.intern_parse_table_name()?;
310
311        let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
312            && w1.value.eq_ignore_ascii_case(EXPIRE)
313        {
314            self.parser.next_token();
315            if let Token::Word(w2) = &self.parser.peek_token().token
316                && w2.value.eq_ignore_ascii_case(AFTER)
317            {
318                self.parser.next_token();
319                Some(self.parse_interval_no_month("EXPIRE AFTER")?)
320            } else {
321                None
322            }
323        } else {
324            None
325        };
326
327        let eval_interval = if self
328            .parser
329            .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
330        {
331            Some(self.parse_interval_no_month("EVAL INTERVAL")?)
332        } else {
333            None
334        };
335
336        let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
337            match self.parser.next_token() {
338                TokenWithSpan {
339                    token: Token::SingleQuotedString(value, ..),
340                    ..
341                } => Some(value),
342                unexpected => {
343                    return self
344                        .parser
345                        .expected("string", unexpected)
346                        .context(SyntaxSnafu);
347                }
348            }
349        } else {
350            None
351        };
352
353        let flow_options = self
354            .parser
355            .parse_options(Keyword::WITH)
356            .context(SyntaxSnafu)?
357            .into_iter()
358            .map(parse_option_string)
359            .collect::<Result<HashMap<String, OptionValue>>>()?;
360
361        self.parser
362            .expect_keyword(Keyword::AS)
363            .context(SyntaxSnafu)?;
364
365        let query = Box::new(self.parse_flow_sql_or_tql(true)?);
366
367        Ok(Statement::CreateFlow(CreateFlow {
368            flow_name,
369            sink_table_name: output_table_name,
370            or_replace,
371            if_not_exists,
372            expire_after,
373            eval_interval,
374            comment,
375            flow_options: flow_option_map(flow_options),
376            query,
377        }))
378    }
379
380    fn parse_flow_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
381        let start_loc = self.parser.peek_token().span.start;
382        let start_index = location_to_index(self.sql, &start_loc);
383
384        let starts_with_with = matches!(
385            self.parser.peek_token().token,
386            Token::Word(w) if w.keyword == Keyword::WITH
387        );
388
389        // only accept sql or tql
390        let query = match self.parser.peek_token().token {
391            Token::Word(w) => match w.keyword {
392                Keyword::SELECT => self.parse_query(),
393                Keyword::WITH => self.parse_with_tql_with_now(require_now_expr),
394                Keyword::NoKeyword
395                    if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
396                {
397                    self.parse_tql(require_now_expr)
398                }
399
400                _ => self.unsupported(self.peek_token_as_string()),
401            },
402            _ => self.unsupported(self.peek_token_as_string()),
403        }?;
404
405        if starts_with_with {
406            let Statement::Query(query) = &query else {
407                return InvalidFlowQuerySnafu {
408                    reason: "Expect a query after WITH".to_string(),
409                }
410                .fail();
411            };
412
413            if utils::has_tql_cte(query) && !utils::is_simple_tql_cte_query(query) {
414                return InvalidFlowQuerySnafu {
415                    reason: "WITH is only supported for the simplest TQL CTE in CREATE FLOW"
416                        .to_string(),
417                }
418                .fail();
419            }
420        }
421
422        let end_token = self.parser.peek_token();
423
424        let raw_query = if end_token == Token::EOF {
425            &self.sql[start_index..]
426        } else {
427            let end_loc = end_token.span.end;
428            let end_index = location_to_index(self.sql, &end_loc);
429            &self.sql[start_index..end_index.min(self.sql.len())]
430        };
431        let raw_query = raw_query.trim_end_matches(";");
432
433        let query = SqlOrTql::try_from_statement(query, raw_query)?;
434        Ok(query)
435    }
436
437    /// Parse the interval expr to duration in seconds.
438    fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
439        let interval = self.parse_interval_month_day_nano()?.0;
440        if interval.months != 0 {
441            return InvalidIntervalSnafu {
442                reason: format!("Interval with months is not allowed in {context}"),
443            }
444            .fail();
445        }
446        Ok(
447            interval.nanoseconds / 1_000_000_000
448                + interval.days as i64 * 60 * 60 * 24
449                + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, // 1 month=365.25/12=30.44 days
450                                                                       // this is to keep the same as https://docs.rs/humantime/latest/humantime/fn.parse_duration.html
451                                                                       // which we use in database to parse i.e. ttl interval and many other intervals
452        )
453    }
454
455    /// Parse interval expr to [`IntervalMonthDayNano`].
456    fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
457        let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
458        let raw_interval_expr = interval_expr.to_string();
459        let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
460            .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
461            .ok()
462            .with_context(|| InvalidIntervalSnafu {
463                reason: format!("cannot cast {} to interval type", interval_expr),
464            })?;
465        if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
466            Ok((interval, raw_interval_expr))
467        } else {
468            unreachable!()
469        }
470    }
471
472    fn parse_if_not_exist(&mut self) -> Result<bool> {
473        match self.parser.peek_token().token {
474            Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
475            _ => {}
476        }
477
478        if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
479            return self
480                .parser
481                .expect_keyword(Keyword::EXISTS)
482                .map(|_| true)
483                .context(UnexpectedSnafu {
484                    expected: "EXISTS",
485                    actual: self.peek_token_as_string(),
486                });
487        }
488
489        if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
490            return UnsupportedSnafu { keyword: "EXISTS" }.fail();
491        }
492
493        Ok(false)
494    }
495
496    fn parse_create_table_options(&mut self) -> Result<OptionMap> {
497        parse_with_options(&mut self.parser)
498    }
499
500    /// "PARTITION ON COLUMNS (...)" clause
501    fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
502        if !self.parser.parse_keyword(Keyword::PARTITION) {
503            return Ok(None);
504        }
505
506        self.parse_partition_on_columns().map(Some)
507    }
508
509    /// Parses the "ON COLUMNS (...) (...)" part after "PARTITION".
510    pub(crate) fn parse_partition_on_columns(&mut self) -> Result<Partitions> {
511        self.parser
512            .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
513            .context(error::UnexpectedSnafu {
514                expected: "ON, COLUMNS",
515                actual: self.peek_token_as_string(),
516            })?;
517
518        let raw_column_list = self
519            .parser
520            .parse_parenthesized_column_list(Mandatory, false)
521            .context(error::SyntaxSnafu)?;
522        let column_list = raw_column_list
523            .into_iter()
524            .map(Self::canonicalize_identifier)
525            .collect();
526
527        let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
528
529        Ok(Partitions { column_list, exprs })
530    }
531
532    fn parse_partition_entry(&mut self) -> Result<Expr> {
533        self.parser.parse_expr().context(error::SyntaxSnafu)
534    }
535
536    /// Parse a comma-separated list wrapped by "()", and of which all items accepted by `F`
537    fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
538    where
539        F: FnMut(&mut ParserContext<'a>) -> Result<T>,
540    {
541        self.parser
542            .expect_token(&Token::LParen)
543            .context(error::UnexpectedSnafu {
544                expected: "(",
545                actual: self.peek_token_as_string(),
546            })?;
547
548        let mut values = vec![];
549        while self.parser.peek_token() != Token::RParen {
550            values.push(f(self)?);
551            if !self.parser.consume_token(&Token::Comma) {
552                break;
553            }
554        }
555
556        self.parser
557            .expect_token(&Token::RParen)
558            .context(error::UnexpectedSnafu {
559                expected: ")",
560                actual: self.peek_token_as_string(),
561            })?;
562
563        Ok(values)
564    }
565
566    /// Parse the columns and constraints.
567    fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
568        let mut columns = vec![];
569        let mut constraints = vec![];
570        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
571            return Ok((columns, constraints));
572        }
573
574        loop {
575            if let Some(constraint) = self.parse_optional_table_constraint()? {
576                constraints.push(constraint);
577            } else if let Token::Word(_) = self.parser.peek_token().token {
578                self.parse_column(&mut columns, &mut constraints)?;
579            } else {
580                return self.expected(
581                    "column name or constraint definition",
582                    self.parser.peek_token(),
583                );
584            }
585            let comma = self.parser.consume_token(&Token::Comma);
586            if self.parser.consume_token(&Token::RParen) {
587                // allow a trailing comma, even though it's not in standard
588                break;
589            } else if !comma {
590                return self.expected(
591                    "',' or ')' after column definition",
592                    self.parser.peek_token(),
593                );
594            }
595        }
596
597        Ok((columns, constraints))
598    }
599
600    fn parse_column(
601        &mut self,
602        columns: &mut Vec<Column>,
603        constraints: &mut Vec<TableConstraint>,
604    ) -> Result<()> {
605        let mut column = self.parse_column_def()?;
606
607        let mut time_index_opt_idx = None;
608        for (index, opt) in column.options().iter().enumerate() {
609            if let ColumnOption::DialectSpecific(tokens) = &opt.option
610                && matches!(
611                    &tokens[..],
612                    [
613                        Token::Word(Word {
614                            keyword: Keyword::TIME,
615                            ..
616                        }),
617                        Token::Word(Word {
618                            keyword: Keyword::INDEX,
619                            ..
620                        })
621                    ]
622                )
623            {
624                ensure!(
625                    time_index_opt_idx.is_none(),
626                    InvalidColumnOptionSnafu {
627                        name: column.name().to_string(),
628                        msg: "duplicated time index",
629                    }
630                );
631                time_index_opt_idx = Some(index);
632
633                let constraint = TableConstraint::TimeIndex {
634                    column: Ident::new(column.name().value.clone()),
635                };
636                constraints.push(constraint);
637            }
638        }
639
640        if let Some(index) = time_index_opt_idx {
641            ensure!(
642                !column.options().contains(&ColumnOptionDef {
643                    option: ColumnOption::Null,
644                    name: None,
645                }),
646                InvalidColumnOptionSnafu {
647                    name: column.name().to_string(),
648                    msg: "time index column can't be null",
649                }
650            );
651
652            // The timestamp type may be an alias type, we have to retrieve the actual type.
653            let data_type = get_unalias_type(column.data_type());
654            ensure!(
655                matches!(data_type, DataType::Timestamp(_, _)),
656                InvalidColumnOptionSnafu {
657                    name: column.name().to_string(),
658                    msg: "time index column data type should be timestamp",
659                }
660            );
661
662            let not_null_opt = ColumnOptionDef {
663                option: ColumnOption::NotNull,
664                name: None,
665            };
666
667            if !column.options().contains(&not_null_opt) {
668                column.mut_options().push(not_null_opt);
669            }
670
671            let _ = column.mut_options().remove(index);
672        }
673
674        columns.push(column);
675
676        Ok(())
677    }
678
679    /// Parse the column name and check if it's valid.
680    fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
681        let name = self.parser.parse_identifier()?;
682        if name.quote_style.is_none() &&
683        // "ALL_KEYWORDS" are sorted.
684            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
685        {
686            return Err(ParserError::ParserError(format!(
687                "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
688                &name.value
689            )));
690        }
691
692        Ok(name)
693    }
694
695    pub fn parse_column_def(&mut self) -> Result<Column> {
696        let name = self.parse_column_name().context(SyntaxSnafu)?;
697        let parser = &mut self.parser;
698
699        ensure!(
700            !(name.quote_style.is_none() &&
701            // "ALL_KEYWORDS" are sorted.
702            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
703            InvalidSqlSnafu {
704                msg: format!(
705                    "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
706                    &name.value
707                ),
708            }
709        );
710
711        let mut extensions = ColumnExtensions::default();
712
713        let data_type =
714            if let Some((data_type, type_hints)) = json::parse_json2_type_and_hints(parser)? {
715                extensions.json_type_hints = type_hints;
716                data_type
717            } else {
718                parser.parse_data_type().context(SyntaxSnafu)?
719            };
720
721        let mut options = vec![];
722        loop {
723            if parser.parse_keyword(Keyword::CONSTRAINT) {
724                let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
725                if let Some(option) = Self::parse_optional_column_option(parser)? {
726                    options.push(ColumnOptionDef { name, option });
727                } else {
728                    return parser
729                        .expected(
730                            "constraint details after CONSTRAINT <name>",
731                            parser.peek_token(),
732                        )
733                        .context(SyntaxSnafu);
734                }
735            } else if let Some(option) = Self::parse_optional_column_option(parser)? {
736                options.push(ColumnOptionDef { name: None, option });
737            } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
738                break;
739            };
740        }
741
742        Ok(Column {
743            column_def: ColumnDef {
744                name: Self::canonicalize_identifier(name),
745                data_type,
746                options,
747            },
748            extensions,
749        })
750    }
751
752    fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
753        if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
754            Ok(Some(ColumnOption::CharacterSet(
755                parser.parse_object_name(false).context(SyntaxSnafu)?,
756            )))
757        } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
758            Ok(Some(ColumnOption::NotNull))
759        } else if parser.parse_keywords(&[Keyword::COMMENT]) {
760            match parser.next_token() {
761                TokenWithSpan {
762                    token: Token::SingleQuotedString(value, ..),
763                    ..
764                } => Ok(Some(ColumnOption::Comment(value))),
765                unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
766            }
767        } else if parser.parse_keyword(Keyword::NULL) {
768            Ok(Some(ColumnOption::Null))
769        } else if parser.parse_keyword(Keyword::DEFAULT) {
770            Ok(Some(ColumnOption::Default(
771                parser.parse_expr().context(SyntaxSnafu)?,
772            )))
773        } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
774            Ok(Some(ColumnOption::PrimaryKey(PrimaryKeyConstraint {
775                name: None,
776                index_name: None,
777                index_type: None,
778                columns: vec![],
779                index_options: vec![],
780                characteristics: None,
781            })))
782        } else if parser.parse_keyword(Keyword::UNIQUE) {
783            Ok(Some(ColumnOption::Unique(UniqueConstraint {
784                name: None,
785                index_name: None,
786                index_type_display: KeyOrIndexDisplay::None,
787                index_type: None,
788                columns: vec![],
789                index_options: vec![],
790                characteristics: None,
791                nulls_distinct: NullsDistinctOption::None,
792            })))
793        } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
794            // Use a DialectSpecific option for time index
795            Ok(Some(ColumnOption::DialectSpecific(vec![
796                Token::Word(Word {
797                    value: "TIME".to_string(),
798                    quote_style: None,
799                    keyword: Keyword::TIME,
800                }),
801                Token::Word(Word {
802                    value: "INDEX".to_string(),
803                    quote_style: None,
804                    keyword: Keyword::INDEX,
805                }),
806            ])))
807        } else {
808            Ok(None)
809        }
810    }
811
812    /// Parse a column option extensions.
813    ///
814    /// This function will handle:
815    /// - Vector type
816    /// - Indexes
817    fn parse_column_extensions(
818        parser: &mut Parser<'_>,
819        column_name: &Ident,
820        column_type: &DataType,
821        column_extensions: &mut ColumnExtensions,
822    ) -> Result<bool> {
823        if let DataType::Custom(name, tokens) = column_type
824            && name.0.len() == 1
825            && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
826        {
827            ensure!(
828                tokens.len() == 1,
829                InvalidColumnOptionSnafu {
830                    name: column_name.to_string(),
831                    msg: "VECTOR type should have dimension",
832                }
833            );
834
835            let dimension =
836                tokens[0]
837                    .parse::<u32>()
838                    .ok()
839                    .with_context(|| InvalidColumnOptionSnafu {
840                        name: column_name.to_string(),
841                        msg: "dimension should be a positive integer",
842                    })?;
843
844            let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
845            column_extensions.vector_options = Some(options);
846        }
847
848        // parse index options in column definition
849        let mut is_index_declared = false;
850
851        // skipping index
852        if let Token::Word(word) = parser.peek_token().token
853            && word.value.eq_ignore_ascii_case(SKIPPING)
854        {
855            parser.next_token();
856            // Consume `INDEX` keyword
857            ensure!(
858                parser.parse_keyword(Keyword::INDEX),
859                InvalidColumnOptionSnafu {
860                    name: column_name.to_string(),
861                    msg: "expect INDEX after SKIPPING keyword",
862                }
863            );
864            ensure!(
865                column_extensions.skipping_index_options.is_none(),
866                InvalidColumnOptionSnafu {
867                    name: column_name.to_string(),
868                    msg: "duplicated SKIPPING index option",
869                }
870            );
871
872            let options = parser
873                .parse_options(Keyword::WITH)
874                .context(error::SyntaxSnafu)?
875                .into_iter()
876                .map(parse_option_string)
877                .collect::<Result<Vec<_>>>()?;
878
879            for (key, _) in options.iter() {
880                ensure!(
881                    validate_column_skipping_index_create_option(key),
882                    InvalidColumnOptionSnafu {
883                        name: column_name.to_string(),
884                        msg: format!("invalid SKIPPING INDEX option: {key}"),
885                    }
886                );
887            }
888
889            let options = OptionMap::new(options);
890            column_extensions.skipping_index_options = Some(options);
891            is_index_declared |= true;
892        }
893
894        // fulltext index
895        if parser.parse_keyword(Keyword::FULLTEXT) {
896            // Consume `INDEX` keyword
897            ensure!(
898                parser.parse_keyword(Keyword::INDEX),
899                InvalidColumnOptionSnafu {
900                    name: column_name.to_string(),
901                    msg: "expect INDEX after FULLTEXT keyword",
902                }
903            );
904
905            ensure!(
906                column_extensions.fulltext_index_options.is_none(),
907                InvalidColumnOptionSnafu {
908                    name: column_name.to_string(),
909                    msg: "duplicated FULLTEXT INDEX option",
910                }
911            );
912
913            let column_type = get_unalias_type(column_type);
914            let data_type = sql_data_type_to_concrete_data_type(&column_type)?;
915            ensure!(
916                data_type == ConcreteDataType::string_datatype(),
917                InvalidColumnOptionSnafu {
918                    name: column_name.to_string(),
919                    msg: "FULLTEXT index only supports string type",
920                }
921            );
922
923            let options = parser
924                .parse_options(Keyword::WITH)
925                .context(error::SyntaxSnafu)?
926                .into_iter()
927                .map(parse_option_string)
928                .collect::<Result<Vec<_>>>()?;
929
930            for (key, _) in options.iter() {
931                ensure!(
932                    validate_column_fulltext_create_option(key),
933                    InvalidColumnOptionSnafu {
934                        name: column_name.to_string(),
935                        msg: format!("invalid FULLTEXT INDEX option: {key}"),
936                    }
937                );
938            }
939
940            let options = OptionMap::new(options);
941            column_extensions.fulltext_index_options = Some(options);
942            is_index_declared |= true;
943        }
944
945        // inverted index
946        if let Token::Word(word) = parser.peek_token().token
947            && word.value.eq_ignore_ascii_case(INVERTED)
948        {
949            parser.next_token();
950            // Consume `INDEX` keyword
951            ensure!(
952                parser.parse_keyword(Keyword::INDEX),
953                InvalidColumnOptionSnafu {
954                    name: column_name.to_string(),
955                    msg: "expect INDEX after INVERTED keyword",
956                }
957            );
958
959            ensure!(
960                column_extensions.inverted_index_options.is_none(),
961                InvalidColumnOptionSnafu {
962                    name: column_name.to_string(),
963                    msg: "duplicated INVERTED index option",
964                }
965            );
966
967            // inverted index doesn't have options, skipping `WITH`
968            // try cache `WITH` and throw error
969            let with_token = parser.peek_token();
970            ensure!(
971                with_token.token
972                    != Token::Word(Word {
973                        value: "WITH".to_string(),
974                        keyword: Keyword::WITH,
975                        quote_style: None,
976                    }),
977                InvalidColumnOptionSnafu {
978                    name: column_name.to_string(),
979                    msg: "INVERTED index doesn't support options",
980                }
981            );
982
983            column_extensions.inverted_index_options = Some(OptionMap::default());
984            is_index_declared |= true;
985        }
986
987        // vector index
988        if let Token::Word(word) = parser.peek_token().token
989            && word.value.eq_ignore_ascii_case(VECTOR)
990        {
991            parser.next_token();
992            // Consume `INDEX` keyword
993            ensure!(
994                parser.parse_keyword(Keyword::INDEX),
995                InvalidColumnOptionSnafu {
996                    name: column_name.to_string(),
997                    msg: "expect INDEX after VECTOR keyword",
998                }
999            );
1000
1001            ensure!(
1002                column_extensions.vector_index_options.is_none(),
1003                InvalidColumnOptionSnafu {
1004                    name: column_name.to_string(),
1005                    msg: "duplicated VECTOR INDEX option",
1006                }
1007            );
1008
1009            // Check that column is a vector type
1010            let column_type = get_unalias_type(column_type);
1011            let data_type = sql_data_type_to_concrete_data_type(&column_type)?;
1012            ensure!(
1013                matches!(data_type, ConcreteDataType::Vector(_)),
1014                InvalidColumnOptionSnafu {
1015                    name: column_name.to_string(),
1016                    msg: "VECTOR INDEX only supports Vector type columns",
1017                }
1018            );
1019
1020            let options = parser
1021                .parse_options(Keyword::WITH)
1022                .context(error::SyntaxSnafu)?
1023                .into_iter()
1024                .map(parse_option_string)
1025                .collect::<Result<Vec<_>>>()?;
1026
1027            for (key, _) in options.iter() {
1028                ensure!(
1029                    validate_column_vector_index_create_option(key),
1030                    InvalidColumnOptionSnafu {
1031                        name: column_name.to_string(),
1032                        msg: format!("invalid VECTOR INDEX option: {key}"),
1033                    }
1034                );
1035            }
1036
1037            let options = OptionMap::new(options);
1038            column_extensions.vector_index_options = Some(options);
1039            is_index_declared |= true;
1040        }
1041
1042        Ok(is_index_declared)
1043    }
1044
1045    fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
1046        match self.parser.next_token() {
1047            TokenWithSpan {
1048                token: Token::Word(w),
1049                ..
1050            } if w.keyword == Keyword::PRIMARY => {
1051                self.parser
1052                    .expect_keyword(Keyword::KEY)
1053                    .context(error::UnexpectedSnafu {
1054                        expected: "KEY",
1055                        actual: self.peek_token_as_string(),
1056                    })?;
1057                let raw_columns = self
1058                    .parser
1059                    .parse_parenthesized_column_list(Mandatory, false)
1060                    .context(error::SyntaxSnafu)?;
1061                let columns = raw_columns
1062                    .into_iter()
1063                    .map(Self::canonicalize_identifier)
1064                    .collect();
1065                Ok(Some(TableConstraint::PrimaryKey { columns }))
1066            }
1067            TokenWithSpan {
1068                token: Token::Word(w),
1069                ..
1070            } if w.keyword == Keyword::TIME => {
1071                self.parser
1072                    .expect_keyword(Keyword::INDEX)
1073                    .context(error::UnexpectedSnafu {
1074                        expected: "INDEX",
1075                        actual: self.peek_token_as_string(),
1076                    })?;
1077
1078                let raw_columns = self
1079                    .parser
1080                    .parse_parenthesized_column_list(Mandatory, false)
1081                    .context(error::SyntaxSnafu)?;
1082                let mut columns = raw_columns
1083                    .into_iter()
1084                    .map(Self::canonicalize_identifier)
1085                    .collect::<Vec<_>>();
1086
1087                ensure!(
1088                    columns.len() == 1,
1089                    InvalidTimeIndexSnafu {
1090                        msg: "it should contain only one column in time index",
1091                    }
1092                );
1093
1094                Ok(Some(TableConstraint::TimeIndex {
1095                    column: columns.pop().unwrap(),
1096                }))
1097            }
1098            _ => {
1099                self.parser.prev_token();
1100                Ok(None)
1101            }
1102        }
1103    }
1104
1105    /// Parses the set of valid formats
1106    fn parse_table_engine(&mut self, default: &str) -> Result<String> {
1107        if !self.consume_token(ENGINE) {
1108            return Ok(default.to_string());
1109        }
1110
1111        self.parser
1112            .expect_token(&Token::Eq)
1113            .context(error::UnexpectedSnafu {
1114                expected: "=",
1115                actual: self.peek_token_as_string(),
1116            })?;
1117
1118        let token = self.parser.next_token();
1119        if let Token::Word(w) = token.token {
1120            Ok(w.value)
1121        } else {
1122            self.expected("'Engine' is missing", token)
1123        }
1124    }
1125}
1126
1127fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1128    let time_index_constraints: Vec<_> = constraints
1129        .iter()
1130        .filter_map(|c| match c {
1131            TableConstraint::TimeIndex { column } => Some(column),
1132            _ => None,
1133        })
1134        .unique()
1135        .collect();
1136
1137    ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1138    ensure!(
1139        time_index_constraints.len() == 1,
1140        InvalidTimeIndexSnafu {
1141            msg: format!(
1142                "expected only one time index constraint but actual {}",
1143                time_index_constraints.len()
1144            ),
1145        }
1146    );
1147
1148    // It's safe to use time_index_constraints[0][0],
1149    // we already check the bound above.
1150    let time_index_column_ident = &time_index_constraints[0];
1151    let time_index_column = columns
1152        .iter()
1153        .find(|c| c.name().value == *time_index_column_ident.value)
1154        .with_context(|| InvalidTimeIndexSnafu {
1155            msg: format!(
1156                "time index column {} not found in columns",
1157                time_index_column_ident
1158            ),
1159        })?;
1160
1161    let time_index_data_type = get_unalias_type(time_index_column.data_type());
1162    ensure!(
1163        matches!(time_index_data_type, DataType::Timestamp(_, _)),
1164        InvalidColumnOptionSnafu {
1165            name: time_index_column.name().to_string(),
1166            msg: "time index column data type should be timestamp",
1167        }
1168    );
1169
1170    Ok(())
1171}
1172
1173fn get_unalias_type(data_type: &DataType) -> DataType {
1174    match data_type {
1175        DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1176            if let Some(real_type) =
1177                get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1178            {
1179                real_type
1180            } else {
1181                data_type.clone()
1182            }
1183        }
1184        _ => data_type.clone(),
1185    }
1186}
1187
1188fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1189    let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1190
1191    ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1192
1193    Ok(())
1194}
1195
1196/// Ensure all exprs are binary expr and all the columns are defined in the column list.
1197fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1198    for expr in exprs {
1199        // The first level must be binary expr
1200        if let Expr::BinaryOp { left, op: _, right } = expr {
1201            ensure_one_expr(left, columns)?;
1202            ensure_one_expr(right, columns)?;
1203        } else {
1204            return error::InvalidSqlSnafu {
1205                msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1206            }
1207            .fail();
1208        }
1209    }
1210    Ok(())
1211}
1212
1213/// Check if the expr is a binary expr, an ident or a literal value.
1214/// If is ident, then check it is in the column list.
1215/// This recursive function is intended to be used by [ensure_exprs_are_binary].
1216fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1217    match expr {
1218        Expr::BinaryOp { left, op: _, right } => {
1219            ensure_one_expr(left, columns)?;
1220            ensure_one_expr(right, columns)?;
1221            Ok(())
1222        }
1223        Expr::Identifier(ident) => {
1224            let column_name = &ident.value;
1225            ensure!(
1226                columns.iter().any(|c| &c.name().value == column_name),
1227                error::InvalidSqlSnafu {
1228                    msg: format!(
1229                        "Column {:?} in rule expr is not referenced in PARTITION ON",
1230                        column_name
1231                    ),
1232                }
1233            );
1234            Ok(())
1235        }
1236        Expr::Value(_) => Ok(()),
1237        Expr::UnaryOp { expr, .. } => {
1238            ensure_one_expr(expr, columns)?;
1239            Ok(())
1240        }
1241        _ => error::InvalidSqlSnafu {
1242            msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1243        }
1244        .fail(),
1245    }
1246}
1247
1248/// Ensure that all columns used in "PARTITION ON COLUMNS" are defined in create table.
1249fn ensure_partition_columns_defined<'a>(
1250    columns: &'a [Column],
1251    partitions: &'a Partitions,
1252) -> Result<Vec<&'a Column>> {
1253    partitions
1254        .column_list
1255        .iter()
1256        .map(|x| {
1257            let x = ParserContext::canonicalize_identifier(x.clone());
1258            // Normally the columns in "create table" won't be too many,
1259            // a linear search to find the target every time is fine.
1260            columns
1261                .iter()
1262                .find(|c| *c.name().value == x.value)
1263                .context(error::InvalidSqlSnafu {
1264                    msg: format!("Partition column {:?} not defined", x.value),
1265                })
1266        })
1267        .collect::<Result<Vec<&Column>>>()
1268}
1269
1270#[cfg(test)]
1271mod tests {
1272    use std::assert_matches;
1273    use std::collections::HashMap;
1274
1275    use common_catalog::consts::FILE_ENGINE;
1276    use common_error::ext::ErrorExt;
1277    use sqlparser::ast::ColumnOption::NotNull;
1278    use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1279    use sqlparser::dialect::GenericDialect;
1280    use sqlparser::tokenizer::Tokenizer;
1281
1282    use super::*;
1283    use crate::dialect::GreptimeDbDialect;
1284    use crate::parser::ParseOptions;
1285
1286    fn string_option_map(
1287        entries: impl IntoIterator<Item = (&'static str, &'static str)>,
1288    ) -> OptionMap {
1289        OptionMap::new(entries.into_iter().map(|(key, value)| {
1290            (
1291                key.to_string(),
1292                OptionValue::try_new(Expr::Value(
1293                    Value::SingleQuotedString(value.to_string()).into(),
1294                ))
1295                .unwrap(),
1296            )
1297        }))
1298    }
1299
1300    #[test]
1301    fn test_parse_create_table_like() {
1302        let sql = "CREATE TABLE t1 LIKE t2";
1303        let stmts =
1304            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1305                .unwrap();
1306
1307        assert_eq!(1, stmts.len());
1308        match &stmts[0] {
1309            Statement::CreateTableLike(c) => {
1310                assert_eq!(c.table_name.to_string(), "t1");
1311                assert_eq!(c.source_name.to_string(), "t2");
1312            }
1313            _ => unreachable!(),
1314        }
1315    }
1316
1317    #[test]
1318    fn test_validate_external_table_options() {
1319        let sql = "CREATE EXTERNAL TABLE city (
1320            host string,
1321            ts timestamp,
1322            cpu float64 default 0,
1323            memory float64,
1324            TIME INDEX (ts),
1325            PRIMARY KEY(ts, host)
1326        ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1327
1328        let result =
1329            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1330        assert!(matches!(
1331            result,
1332            Err(error::Error::InvalidTableOption { .. })
1333        ));
1334    }
1335
1336    #[test]
1337    fn test_parse_create_external_table() {
1338        struct Test<'a> {
1339            sql: &'a str,
1340            expected_table_name: &'a str,
1341            expected_options: HashMap<&'a str, &'a str>,
1342            expected_engine: &'a str,
1343            expected_if_not_exist: bool,
1344        }
1345
1346        let tests = [
1347            Test {
1348                sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1349                expected_table_name: "city",
1350                expected_options: HashMap::from([
1351                    ("location", "/var/data/city.csv"),
1352                    ("format", "csv"),
1353                ]),
1354                expected_engine: FILE_ENGINE,
1355                expected_if_not_exist: false,
1356            },
1357            Test {
1358                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1359                expected_table_name: "city",
1360                expected_options: HashMap::from([
1361                    ("location", "/var/data/city.csv"),
1362                    ("format", "csv"),
1363                ]),
1364                expected_engine: "foo",
1365                expected_if_not_exist: true,
1366            },
1367            Test {
1368                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1369                expected_table_name: "city",
1370                expected_options: HashMap::from([
1371                    ("location", "/var/data/city.csv"),
1372                    ("format", "csv"),
1373                    ("compaction.type", "bar"),
1374                ]),
1375                expected_engine: "foo",
1376                expected_if_not_exist: true,
1377            },
1378        ];
1379
1380        for test in tests {
1381            let stmts = ParserContext::create_with_dialect(
1382                test.sql,
1383                &GreptimeDbDialect {},
1384                ParseOptions::default(),
1385            )
1386            .unwrap();
1387            assert_eq!(1, stmts.len());
1388            match &stmts[0] {
1389                Statement::CreateExternalTable(c) => {
1390                    assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1391                    assert_eq!(c.options.to_str_map(), test.expected_options);
1392                    assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1393                    assert_eq!(c.engine, test.expected_engine);
1394                }
1395                _ => unreachable!(),
1396            }
1397        }
1398    }
1399
1400    #[test]
1401    fn test_parse_create_external_table_with_schema() {
1402        let sql = "CREATE EXTERNAL TABLE city (
1403            host string,
1404            ts timestamp,
1405            cpu float32 default 0,
1406            memory float64,
1407            TIME INDEX (ts),
1408            PRIMARY KEY(ts, host),
1409        ) with(location='/var/data/city.csv',format='csv');";
1410
1411        let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]);
1412
1413        let stmts =
1414            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1415                .unwrap();
1416        assert_eq!(1, stmts.len());
1417        match &stmts[0] {
1418            Statement::CreateExternalTable(c) => {
1419                assert_eq!(c.name.to_string(), "city");
1420                assert_eq!(c.options.to_str_map(), options);
1421
1422                let columns = &c.columns;
1423                assert_column_def(&columns[0].column_def, "host", "STRING");
1424                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1425                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1426                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1427
1428                let constraints = &c.constraints;
1429                assert_eq!(
1430                    &constraints[0],
1431                    &TableConstraint::TimeIndex {
1432                        column: Ident::new("ts"),
1433                    }
1434                );
1435                assert_eq!(
1436                    &constraints[1],
1437                    &TableConstraint::PrimaryKey {
1438                        columns: vec![Ident::new("ts"), Ident::new("host")]
1439                    }
1440                );
1441            }
1442            _ => unreachable!(),
1443        }
1444    }
1445
1446    #[test]
1447    fn test_parse_create_database() {
1448        let sql = "create database";
1449        let result =
1450            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1451        assert!(
1452            result
1453                .unwrap_err()
1454                .to_string()
1455                .contains("Unexpected token while parsing SQL statement")
1456        );
1457
1458        let sql = "create database prometheus";
1459        let stmts =
1460            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1461                .unwrap();
1462
1463        assert_eq!(1, stmts.len());
1464        match &stmts[0] {
1465            Statement::CreateDatabase(c) => {
1466                assert_eq!(c.name.to_string(), "prometheus");
1467                assert!(!c.if_not_exists);
1468            }
1469            _ => unreachable!(),
1470        }
1471
1472        let sql = "create database if not exists prometheus";
1473        let stmts =
1474            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1475                .unwrap();
1476
1477        assert_eq!(1, stmts.len());
1478        match &stmts[0] {
1479            Statement::CreateDatabase(c) => {
1480                assert_eq!(c.name.to_string(), "prometheus");
1481                assert!(c.if_not_exists);
1482            }
1483            _ => unreachable!(),
1484        }
1485
1486        let sql = "CREATE DATABASE `fOo`";
1487        let result =
1488            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1489        let stmts = result.unwrap();
1490        match &stmts.last().unwrap() {
1491            Statement::CreateDatabase(c) => {
1492                assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1493                assert!(!c.if_not_exists);
1494            }
1495            _ => unreachable!(),
1496        }
1497
1498        let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1499        let result =
1500            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1501        let stmts = result.unwrap();
1502        match &stmts[0] {
1503            Statement::CreateDatabase(c) => {
1504                assert_eq!(c.name.to_string(), "prometheus");
1505                assert!(!c.if_not_exists);
1506                assert_eq!(c.options.get("ttl").unwrap(), "1h");
1507            }
1508            _ => unreachable!(),
1509        }
1510    }
1511
1512    #[test]
1513    fn test_parse_create_flow_more_testcases() {
1514        use pretty_assertions::assert_eq;
1515        fn parse_create_flow(sql: &str) -> CreateFlow {
1516            let stmts = ParserContext::create_with_dialect(
1517                sql,
1518                &GreptimeDbDialect {},
1519                ParseOptions::default(),
1520            )
1521            .unwrap();
1522            assert_eq!(1, stmts.len());
1523            match &stmts[0] {
1524                Statement::CreateFlow(c) => c.clone(),
1525                _ => unreachable!(),
1526            }
1527        }
1528        struct CreateFlowWoutQuery {
1529            /// Flow name
1530            pub flow_name: ObjectName,
1531            /// Output (sink) table name
1532            pub sink_table_name: ObjectName,
1533            /// Whether to replace existing task
1534            pub or_replace: bool,
1535            /// Create if not exist
1536            pub if_not_exists: bool,
1537            /// `EXPIRE AFTER`
1538            /// Duration in second as `i64`
1539            pub expire_after: Option<i64>,
1540            /// Comment string
1541            pub comment: Option<String>,
1542            /// Flow creation options
1543            pub flow_options: OptionMap,
1544        }
1545        let testcases = vec![
1546            (
1547                r"
1548CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1549SINK TO schema_1.table_1
1550EXPIRE AFTER INTERVAL '5 minutes'
1551COMMENT 'test comment'
1552AS
1553SELECT max(c1), min(c2) FROM schema_2.table_2;",
1554                CreateFlowWoutQuery {
1555                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1556                    sink_table_name: ObjectName::from(vec![
1557                        Ident::new("schema_1"),
1558                        Ident::new("table_1"),
1559                    ]),
1560                    or_replace: true,
1561                    if_not_exists: true,
1562                    expire_after: Some(300),
1563                    comment: Some("test comment".to_string()),
1564                    flow_options: OptionMap::default(),
1565                },
1566            ),
1567            (
1568                r"
1569CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1570SINK TO schema_1.table_1
1571EXPIRE AFTER INTERVAL '300 s'
1572COMMENT 'test comment'
1573AS
1574SELECT max(c1), min(c2) FROM schema_2.table_2;",
1575                CreateFlowWoutQuery {
1576                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1577                    sink_table_name: ObjectName::from(vec![
1578                        Ident::new("schema_1"),
1579                        Ident::new("table_1"),
1580                    ]),
1581                    or_replace: true,
1582                    if_not_exists: true,
1583                    expire_after: Some(300),
1584                    comment: Some("test comment".to_string()),
1585                    flow_options: OptionMap::default(),
1586                },
1587            ),
1588            (
1589                r"
1590CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1591SINK TO schema_1.table_1
1592EXPIRE AFTER '5 minutes'
1593COMMENT 'test comment'
1594AS
1595SELECT max(c1), min(c2) FROM schema_2.table_2;",
1596                CreateFlowWoutQuery {
1597                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1598                    sink_table_name: ObjectName::from(vec![
1599                        Ident::new("schema_1"),
1600                        Ident::new("table_1"),
1601                    ]),
1602                    or_replace: true,
1603                    if_not_exists: true,
1604                    expire_after: Some(300),
1605                    comment: Some("test comment".to_string()),
1606                    flow_options: OptionMap::default(),
1607                },
1608            ),
1609            (
1610                r"
1611CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1612SINK TO schema_1.table_1
1613EXPIRE AFTER '300 s'
1614COMMENT 'test comment'
1615AS
1616SELECT max(c1), min(c2) FROM schema_2.table_2;",
1617                CreateFlowWoutQuery {
1618                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1619                    sink_table_name: ObjectName::from(vec![
1620                        Ident::new("schema_1"),
1621                        Ident::new("table_1"),
1622                    ]),
1623                    or_replace: true,
1624                    if_not_exists: true,
1625                    expire_after: Some(300),
1626                    comment: Some("test comment".to_string()),
1627                    flow_options: OptionMap::default(),
1628                },
1629            ),
1630            (
1631                r"
1632CREATE FLOW `task_2`
1633SINK TO schema_1.table_1
1634EXPIRE AFTER '2 days 1h 2 min'
1635AS
1636SELECT max(c1), min(c2) FROM schema_2.table_2;",
1637                CreateFlowWoutQuery {
1638                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1639                    sink_table_name: ObjectName::from(vec![
1640                        Ident::new("schema_1"),
1641                        Ident::new("table_1"),
1642                    ]),
1643                    or_replace: false,
1644                    if_not_exists: false,
1645                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1646                    comment: None,
1647                    flow_options: OptionMap::default(),
1648                },
1649            ),
1650            (
1651                r"
1652create flow `task_3`
1653sink to schema_1.table_1
1654expire after '10 minutes'
1655as
1656select max(c1), min(c2) from schema_2.table_2;",
1657                CreateFlowWoutQuery {
1658                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
1659                    sink_table_name: ObjectName::from(vec![
1660                        Ident::new("schema_1"),
1661                        Ident::new("table_1"),
1662                    ]),
1663                    or_replace: false,
1664                    if_not_exists: false,
1665                    expire_after: Some(600), // 10 minutes in seconds
1666                    comment: None,
1667                    flow_options: OptionMap::default(),
1668                },
1669            ),
1670            (
1671                r"
1672create or replace flow if not exists task_4
1673sink to schema_1.table_1
1674expire after interval '2 hours'
1675comment 'lowercase test'
1676as
1677select max(c1), min(c2) from schema_2.table_2;",
1678                CreateFlowWoutQuery {
1679                    flow_name: ObjectName::from(vec![Ident::new("task_4")]),
1680                    sink_table_name: ObjectName::from(vec![
1681                        Ident::new("schema_1"),
1682                        Ident::new("table_1"),
1683                    ]),
1684                    or_replace: true,
1685                    if_not_exists: true,
1686                    expire_after: Some(7200), // 2 hours in seconds
1687                    comment: Some("lowercase test".to_string()),
1688                    flow_options: OptionMap::default(),
1689                },
1690            ),
1691            (
1692                r"
1693CREATE FLOW task_5
1694SINK TO schema_1.table_1
1695WITH (defer_on_missing_source = 'true')
1696AS
1697SELECT max(c1), min(c2) FROM schema_2.table_2;",
1698                CreateFlowWoutQuery {
1699                    flow_name: ObjectName::from(vec![Ident::new("task_5")]),
1700                    sink_table_name: ObjectName::from(vec![
1701                        Ident::new("schema_1"),
1702                        Ident::new("table_1"),
1703                    ]),
1704                    or_replace: false,
1705                    if_not_exists: false,
1706                    expire_after: None,
1707                    comment: None,
1708                    flow_options: string_option_map([("defer_on_missing_source", "true")]),
1709                },
1710            ),
1711        ];
1712
1713        for (sql, expected) in testcases {
1714            let create_task = parse_create_flow(sql);
1715
1716            let expected = CreateFlow {
1717                flow_name: expected.flow_name,
1718                sink_table_name: expected.sink_table_name,
1719                or_replace: expected.or_replace,
1720                if_not_exists: expected.if_not_exists,
1721                expire_after: expected.expire_after,
1722                eval_interval: None,
1723                comment: expected.comment,
1724                flow_options: expected.flow_options,
1725                // ignore query parse result
1726                query: create_task.query.clone(),
1727            };
1728
1729            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1730            let show_create = create_task.to_string();
1731            let recreated = parse_create_flow(&show_create);
1732            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1733        }
1734    }
1735
1736    #[test]
1737    fn test_parse_create_flow() {
1738        use pretty_assertions::assert_eq;
1739        fn parse_create_flow(sql: &str) -> CreateFlow {
1740            let stmts = ParserContext::create_with_dialect(
1741                sql,
1742                &GreptimeDbDialect {},
1743                ParseOptions::default(),
1744            )
1745            .unwrap();
1746            assert_eq!(1, stmts.len());
1747            match &stmts[0] {
1748                Statement::CreateFlow(c) => c.clone(),
1749                _ => panic!("{:?}", stmts[0]),
1750            }
1751        }
1752        struct CreateFlowWoutQuery {
1753            /// Flow name
1754            pub flow_name: ObjectName,
1755            /// Output (sink) table name
1756            pub sink_table_name: ObjectName,
1757            /// Whether to replace existing task
1758            pub or_replace: bool,
1759            /// Create if not exist
1760            pub if_not_exists: bool,
1761            /// `EXPIRE AFTER`
1762            /// Duration in second as `i64`
1763            pub expire_after: Option<i64>,
1764            /// Duration for flow evaluation interval
1765            /// Duration in seconds as `i64`
1766            /// If not set, flow will be evaluated based on time window size and other args.
1767            pub eval_interval: Option<i64>,
1768            /// Comment string
1769            pub comment: Option<String>,
1770            /// Flow creation options
1771            pub flow_options: OptionMap,
1772        }
1773
1774        // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
1775        let testcases = vec![
1776            (
1777                r"
1778CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1779SINK TO schema_1.table_1
1780EXPIRE AFTER INTERVAL '5 minutes'
1781COMMENT 'test comment'
1782AS
1783SELECT max(c1), min(c2) FROM schema_2.table_2;",
1784                CreateFlowWoutQuery {
1785                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1786                    sink_table_name: ObjectName(vec![
1787                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1788                        ObjectNamePart::Identifier(Ident::new("table_1")),
1789                    ]),
1790                    or_replace: true,
1791                    if_not_exists: true,
1792                    expire_after: Some(300),
1793                    eval_interval: None,
1794                    comment: Some("test comment".to_string()),
1795                    flow_options: OptionMap::default(),
1796                },
1797            ),
1798            (
1799                r"
1800CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1801SINK TO schema_1.table_1
1802EXPIRE AFTER INTERVAL '300 s'
1803COMMENT 'test comment'
1804AS
1805SELECT max(c1), min(c2) FROM schema_2.table_2;",
1806                CreateFlowWoutQuery {
1807                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1808                    sink_table_name: ObjectName(vec![
1809                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1810                        ObjectNamePart::Identifier(Ident::new("table_1")),
1811                    ]),
1812                    or_replace: true,
1813                    if_not_exists: true,
1814                    expire_after: Some(300),
1815                    eval_interval: None,
1816                    comment: Some("test comment".to_string()),
1817                    flow_options: OptionMap::default(),
1818                },
1819            ),
1820            (
1821                r"
1822CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1823SINK TO schema_1.table_1
1824EXPIRE AFTER '5 minutes'
1825EVAL INTERVAL '10 seconds'
1826COMMENT 'test comment'
1827AS
1828SELECT max(c1), min(c2) FROM schema_2.table_2;",
1829                CreateFlowWoutQuery {
1830                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1831                    sink_table_name: ObjectName(vec![
1832                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1833                        ObjectNamePart::Identifier(Ident::new("table_1")),
1834                    ]),
1835                    or_replace: true,
1836                    if_not_exists: true,
1837                    expire_after: Some(300),
1838                    eval_interval: Some(10),
1839                    comment: Some("test comment".to_string()),
1840                    flow_options: OptionMap::default(),
1841                },
1842            ),
1843            (
1844                r"
1845CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1846SINK TO schema_1.table_1
1847EXPIRE AFTER '5 minutes'
1848EVAL INTERVAL INTERVAL '10 seconds'
1849COMMENT 'test comment'
1850AS
1851SELECT max(c1), min(c2) FROM schema_2.table_2;",
1852                CreateFlowWoutQuery {
1853                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1854                    sink_table_name: ObjectName(vec![
1855                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1856                        ObjectNamePart::Identifier(Ident::new("table_1")),
1857                    ]),
1858                    or_replace: true,
1859                    if_not_exists: true,
1860                    expire_after: Some(300),
1861                    eval_interval: Some(10),
1862                    comment: Some("test comment".to_string()),
1863                    flow_options: OptionMap::default(),
1864                },
1865            ),
1866            (
1867                r"
1868CREATE FLOW `task_2`
1869SINK TO schema_1.table_1
1870EXPIRE AFTER '2 days 1h 2 min'
1871AS
1872SELECT max(c1), min(c2) FROM schema_2.table_2;",
1873                CreateFlowWoutQuery {
1874                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1875                        '`', "task_2",
1876                    ))]),
1877                    sink_table_name: ObjectName(vec![
1878                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1879                        ObjectNamePart::Identifier(Ident::new("table_1")),
1880                    ]),
1881                    or_replace: false,
1882                    if_not_exists: false,
1883                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1884                    eval_interval: None,
1885                    comment: None,
1886                    flow_options: OptionMap::default(),
1887                },
1888            ),
1889            (
1890                r"
1891CREATE FLOW task_3
1892SINK TO schema_1.table_1
1893EVAL INTERVAL '10 seconds'
1894WITH (defer_on_missing_source = 'true', foo = 'bar')
1895AS
1896SELECT max(c1), min(c2) FROM schema_2.table_2;",
1897                CreateFlowWoutQuery {
1898                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_3"))]),
1899                    sink_table_name: ObjectName(vec![
1900                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1901                        ObjectNamePart::Identifier(Ident::new("table_1")),
1902                    ]),
1903                    or_replace: false,
1904                    if_not_exists: false,
1905                    expire_after: None,
1906                    eval_interval: Some(10),
1907                    comment: None,
1908                    flow_options: string_option_map([
1909                        ("defer_on_missing_source", "true"),
1910                        ("foo", "bar"),
1911                    ]),
1912                },
1913            ),
1914        ];
1915
1916        for (sql, expected) in testcases {
1917            let create_task = parse_create_flow(sql);
1918
1919            let expected = CreateFlow {
1920                flow_name: expected.flow_name,
1921                sink_table_name: expected.sink_table_name,
1922                or_replace: expected.or_replace,
1923                if_not_exists: expected.if_not_exists,
1924                expire_after: expected.expire_after,
1925                eval_interval: expected.eval_interval,
1926                comment: expected.comment,
1927                flow_options: expected.flow_options,
1928                // ignore query parse result
1929                query: create_task.query.clone(),
1930            };
1931
1932            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1933            let show_create = create_task.to_string();
1934            let recreated = parse_create_flow(&show_create);
1935            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1936        }
1937    }
1938
1939    #[test]
1940    fn test_parse_create_flow_with_tql_cte_query() {
1941        let sql = r#"
1942CREATE FLOW calc_reqs_cte
1943SINK TO cnt_reqs_cte
1944EVAL INTERVAL '1m'
1945AS
1946WITH tql(the_timestamp, the_value) AS (
1947    TQL EVAL (now() - '1m'::interval, now(), '5s') metric
1948)
1949SELECT * FROM tql;
1950"#;
1951
1952        let stmts =
1953            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1954                .unwrap();
1955        assert_eq!(1, stmts.len());
1956        let Statement::CreateFlow(create_flow) = &stmts[0] else {
1957            panic!("unexpected stmt: {:?}", stmts[0]);
1958        };
1959
1960        let query = create_flow.query.to_string();
1961        assert!(query.to_uppercase().contains("WITH"));
1962        assert!(query.to_uppercase().contains("TQL EVAL"));
1963    }
1964
1965    #[test]
1966    fn test_parse_create_flow_with_sql_cte_is_supported() {
1967        let sql = r#"
1968CREATE FLOW f
1969SINK TO s
1970AS
1971WITH cte AS (SELECT 1) SELECT * FROM cte;
1972"#;
1973
1974        let stmts =
1975            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1976                .unwrap();
1977        assert_eq!(1, stmts.len());
1978        let Statement::CreateFlow(create_flow) = &stmts[0] else {
1979            panic!("unexpected stmt: {:?}", stmts[0]);
1980        };
1981        assert_eq!(
1982            "WITH cte AS (SELECT 1) SELECT * FROM cte",
1983            create_flow.query.to_string()
1984        );
1985    }
1986
1987    #[test]
1988    fn test_parse_create_flow_with_tql_cte_requires_now_expr() {
1989        let sql = r#"
1990CREATE FLOW f
1991SINK TO s
1992EVAL INTERVAL '1m'
1993AS
1994WITH tql(ts, val) AS (
1995    TQL EVAL (0, 15, '5s') metric
1996)
1997SELECT * FROM tql;
1998"#;
1999
2000        let err =
2001            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2002                .unwrap_err();
2003
2004        let msg = format!("{err:?}");
2005        assert!(
2006            msg.contains("Expected expression containing `now()`"),
2007            "unexpected err: {msg}"
2008        );
2009    }
2010
2011    #[test]
2012    fn test_parse_create_flow_with_tql_cte_non_select_star_is_unsupported() {
2013        let sql = r#"
2014CREATE FLOW f
2015SINK TO s
2016AS
2017WITH tql(ts, val) AS (
2018    TQL EVAL (now() - '1m'::interval, now(), '5s') metric
2019)
2020SELECT ts FROM tql;
2021"#;
2022
2023        let err =
2024            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2025                .unwrap_err();
2026        assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2027    }
2028
2029    #[test]
2030    fn test_parse_create_flow_with_tql_cte_filter_is_unsupported() {
2031        let sql = r#"
2032CREATE FLOW f
2033SINK TO s
2034AS
2035WITH tql(ts, val) AS (
2036    TQL EVAL (now() - '1m'::interval, now(), '5s') metric
2037)
2038SELECT * FROM tql WHERE ts > 0;
2039"#;
2040
2041        let err =
2042            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2043                .unwrap_err();
2044        assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2045    }
2046
2047    #[test]
2048    fn test_parse_create_flow_with_mixed_sql_tql_cte_is_unsupported() {
2049        let sql = r#"
2050CREATE FLOW f
2051SINK TO s
2052AS
2053WITH s1 AS (SELECT 1),
2054     tql(ts, val) AS (TQL EVAL (now() - '1m'::interval, now(), '5s') metric)
2055SELECT * FROM tql;
2056"#;
2057
2058        let err =
2059            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2060                .unwrap_err();
2061        assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2062    }
2063
2064    #[test]
2065    fn test_create_flow_no_month() {
2066        let sql = r"
2067CREATE FLOW `task_2`
2068SINK TO schema_1.table_1
2069EXPIRE AFTER '1 month 2 days 1h 2 min'
2070AS
2071SELECT max(c1), min(c2) FROM schema_2.table_2;";
2072        let stmts =
2073            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2074
2075        assert!(
2076            stmts.is_err()
2077                && stmts
2078                    .unwrap_err()
2079                    .to_string()
2080                    .contains("Interval with months is not allowed")
2081        );
2082    }
2083
2084    #[test]
2085    fn test_validate_create() {
2086        let sql = r"
2087CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
2088PARTITION ON COLUMNS(c, a) (
2089    a < 10,
2090    a > 10 AND a < 20,
2091    a > 20 AND c < 100,
2092    a > 20 AND c >= 100
2093)
2094ENGINE=mito";
2095        let result =
2096            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2097        let _ = result.unwrap();
2098
2099        let sql = r"
2100CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2101PARTITION ON COLUMNS(x) ()
2102ENGINE=mito";
2103        let result =
2104            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2105        assert!(
2106            result
2107                .unwrap_err()
2108                .to_string()
2109                .contains("Partition column \"x\" not defined")
2110        );
2111    }
2112
2113    #[test]
2114    fn test_parse_create_table_with_partitions() {
2115        let sql = r"
2116CREATE TABLE monitor (
2117  host_id    INT,
2118  idc        STRING,
2119  ts         TIMESTAMP,
2120  cpu        DOUBLE DEFAULT 0,
2121  memory     DOUBLE,
2122  TIME INDEX (ts),
2123  PRIMARY KEY (host),
2124)
2125PARTITION ON COLUMNS(idc, host_id) (
2126  idc <= 'hz' AND host_id < 1000,
2127  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2128  idc > 'sh' AND host_id < 3000,
2129  idc > 'sh' AND host_id >= 3000,
2130)
2131ENGINE=mito";
2132        let result =
2133            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2134                .unwrap();
2135        assert_eq!(result.len(), 1);
2136        match &result[0] {
2137            Statement::CreateTable(c) => {
2138                assert!(c.partitions.is_some());
2139
2140                let partitions = c.partitions.as_ref().unwrap();
2141                let column_list = partitions
2142                    .column_list
2143                    .iter()
2144                    .map(|x| &x.value)
2145                    .collect::<Vec<&String>>();
2146                assert_eq!(column_list, vec!["idc", "host_id"]);
2147
2148                let exprs = &partitions.exprs;
2149
2150                assert_eq!(
2151                    exprs[0],
2152                    Expr::BinaryOp {
2153                        left: Box::new(Expr::BinaryOp {
2154                            left: Box::new(Expr::Identifier("idc".into())),
2155                            op: BinaryOperator::LtEq,
2156                            right: Box::new(Expr::Value(
2157                                Value::SingleQuotedString("hz".to_string()).into()
2158                            ))
2159                        }),
2160                        op: BinaryOperator::And,
2161                        right: Box::new(Expr::BinaryOp {
2162                            left: Box::new(Expr::Identifier("host_id".into())),
2163                            op: BinaryOperator::Lt,
2164                            right: Box::new(Expr::Value(
2165                                Value::Number("1000".to_string(), false).into()
2166                            ))
2167                        })
2168                    }
2169                );
2170                assert_eq!(
2171                    exprs[1],
2172                    Expr::BinaryOp {
2173                        left: Box::new(Expr::BinaryOp {
2174                            left: Box::new(Expr::BinaryOp {
2175                                left: Box::new(Expr::Identifier("idc".into())),
2176                                op: BinaryOperator::Gt,
2177                                right: Box::new(Expr::Value(
2178                                    Value::SingleQuotedString("hz".to_string()).into()
2179                                ))
2180                            }),
2181                            op: BinaryOperator::And,
2182                            right: Box::new(Expr::BinaryOp {
2183                                left: Box::new(Expr::Identifier("idc".into())),
2184                                op: BinaryOperator::LtEq,
2185                                right: Box::new(Expr::Value(
2186                                    Value::SingleQuotedString("sh".to_string()).into()
2187                                ))
2188                            })
2189                        }),
2190                        op: BinaryOperator::And,
2191                        right: Box::new(Expr::BinaryOp {
2192                            left: Box::new(Expr::Identifier("host_id".into())),
2193                            op: BinaryOperator::Lt,
2194                            right: Box::new(Expr::Value(
2195                                Value::Number("2000".to_string(), false).into()
2196                            ))
2197                        })
2198                    }
2199                );
2200                assert_eq!(
2201                    exprs[2],
2202                    Expr::BinaryOp {
2203                        left: Box::new(Expr::BinaryOp {
2204                            left: Box::new(Expr::Identifier("idc".into())),
2205                            op: BinaryOperator::Gt,
2206                            right: Box::new(Expr::Value(
2207                                Value::SingleQuotedString("sh".to_string()).into()
2208                            ))
2209                        }),
2210                        op: BinaryOperator::And,
2211                        right: Box::new(Expr::BinaryOp {
2212                            left: Box::new(Expr::Identifier("host_id".into())),
2213                            op: BinaryOperator::Lt,
2214                            right: Box::new(Expr::Value(
2215                                Value::Number("3000".to_string(), false).into()
2216                            ))
2217                        })
2218                    }
2219                );
2220                assert_eq!(
2221                    exprs[3],
2222                    Expr::BinaryOp {
2223                        left: Box::new(Expr::BinaryOp {
2224                            left: Box::new(Expr::Identifier("idc".into())),
2225                            op: BinaryOperator::Gt,
2226                            right: Box::new(Expr::Value(
2227                                Value::SingleQuotedString("sh".to_string()).into()
2228                            ))
2229                        }),
2230                        op: BinaryOperator::And,
2231                        right: Box::new(Expr::BinaryOp {
2232                            left: Box::new(Expr::Identifier("host_id".into())),
2233                            op: BinaryOperator::GtEq,
2234                            right: Box::new(Expr::Value(
2235                                Value::Number("3000".to_string(), false).into()
2236                            ))
2237                        })
2238                    }
2239                );
2240            }
2241            _ => unreachable!(),
2242        }
2243    }
2244
2245    #[test]
2246    fn test_parse_create_table_with_quoted_partitions() {
2247        let sql = r"
2248CREATE TABLE monitor (
2249  `host_id`    INT,
2250  idc        STRING,
2251  ts         TIMESTAMP,
2252  cpu        DOUBLE DEFAULT 0,
2253  memory     DOUBLE,
2254  TIME INDEX (ts),
2255  PRIMARY KEY (host),
2256)
2257PARTITION ON COLUMNS(IdC, host_id) (
2258  idc <= 'hz' AND host_id < 1000,
2259  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2260  idc > 'sh' AND host_id < 3000,
2261  idc > 'sh' AND host_id >= 3000,
2262)";
2263        let result =
2264            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2265                .unwrap();
2266        assert_eq!(result.len(), 1);
2267    }
2268
2269    #[test]
2270    fn test_parse_create_table_with_timestamp_index() {
2271        let sql1 = r"
2272CREATE TABLE monitor (
2273  host_id    INT,
2274  idc        STRING,
2275  ts         TIMESTAMP TIME INDEX,
2276  cpu        DOUBLE DEFAULT 0,
2277  memory     DOUBLE,
2278  PRIMARY KEY (host),
2279)
2280ENGINE=mito";
2281        let result1 = ParserContext::create_with_dialect(
2282            sql1,
2283            &GreptimeDbDialect {},
2284            ParseOptions::default(),
2285        )
2286        .unwrap();
2287
2288        if let Statement::CreateTable(c) = &result1[0] {
2289            assert_eq!(c.constraints.len(), 2);
2290            let tc = c.constraints[0].clone();
2291            match tc {
2292                TableConstraint::TimeIndex { column } => {
2293                    assert_eq!(&column.value, "ts");
2294                }
2295                _ => panic!("should be time index constraint"),
2296            };
2297        } else {
2298            panic!("should be create_table statement");
2299        }
2300
2301        // `TIME INDEX` should be in front of `PRIMARY KEY`
2302        // in order to equal the `TIMESTAMP TIME INDEX` constraint options vector
2303        let sql2 = r"
2304CREATE TABLE monitor (
2305  host_id    INT,
2306  idc        STRING,
2307  ts         TIMESTAMP NOT NULL,
2308  cpu        DOUBLE DEFAULT 0,
2309  memory     DOUBLE,
2310  TIME INDEX (ts),
2311  PRIMARY KEY (host),
2312)
2313ENGINE=mito";
2314        let result2 = ParserContext::create_with_dialect(
2315            sql2,
2316            &GreptimeDbDialect {},
2317            ParseOptions::default(),
2318        )
2319        .unwrap();
2320
2321        assert_eq!(result1, result2);
2322
2323        // TIMESTAMP can be NULL which is not equal to above
2324        let sql3 = r"
2325CREATE TABLE monitor (
2326  host_id    INT,
2327  idc        STRING,
2328  ts         TIMESTAMP,
2329  cpu        DOUBLE DEFAULT 0,
2330  memory     DOUBLE,
2331  TIME INDEX (ts),
2332  PRIMARY KEY (host),
2333)
2334ENGINE=mito";
2335
2336        let result3 = ParserContext::create_with_dialect(
2337            sql3,
2338            &GreptimeDbDialect {},
2339            ParseOptions::default(),
2340        )
2341        .unwrap();
2342
2343        assert_ne!(result1, result3);
2344
2345        // BIGINT can't be time index any more
2346        let sql1 = r"
2347CREATE TABLE monitor (
2348  host_id    INT,
2349  idc        STRING,
2350  b          bigint TIME INDEX,
2351  cpu        DOUBLE DEFAULT 0,
2352  memory     DOUBLE,
2353  PRIMARY KEY (host),
2354)
2355ENGINE=mito";
2356        let result1 = ParserContext::create_with_dialect(
2357            sql1,
2358            &GreptimeDbDialect {},
2359            ParseOptions::default(),
2360        );
2361
2362        assert!(
2363            result1
2364                .unwrap_err()
2365                .to_string()
2366                .contains("time index column data type should be timestamp")
2367        );
2368    }
2369
2370    #[test]
2371    fn test_parse_create_table_with_timestamp_index_not_null() {
2372        let sql = r"
2373CREATE TABLE monitor (
2374  host_id    INT,
2375  idc        STRING,
2376  ts         TIMESTAMP TIME INDEX,
2377  cpu        DOUBLE DEFAULT 0,
2378  memory     DOUBLE,
2379  TIME INDEX (ts),
2380  PRIMARY KEY (host),
2381)
2382ENGINE=mito";
2383        let result =
2384            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2385                .unwrap();
2386
2387        assert_eq!(result.len(), 1);
2388        if let Statement::CreateTable(c) = &result[0] {
2389            let ts = c.columns[2].clone();
2390            assert_eq!(ts.name().to_string(), "ts");
2391            assert_eq!(ts.options()[0].option, NotNull);
2392        } else {
2393            panic!("should be create table statement");
2394        }
2395
2396        let sql1 = r"
2397CREATE TABLE monitor (
2398  host_id    INT,
2399  idc        STRING,
2400  ts         TIMESTAMP NOT NULL TIME INDEX,
2401  cpu        DOUBLE DEFAULT 0,
2402  memory     DOUBLE,
2403  TIME INDEX (ts),
2404  PRIMARY KEY (host),
2405)
2406ENGINE=mito";
2407
2408        let result1 = ParserContext::create_with_dialect(
2409            sql1,
2410            &GreptimeDbDialect {},
2411            ParseOptions::default(),
2412        )
2413        .unwrap();
2414        assert_eq!(result, result1);
2415
2416        let sql2 = r"
2417CREATE TABLE monitor (
2418  host_id    INT,
2419  idc        STRING,
2420  ts         TIMESTAMP TIME INDEX NOT NULL,
2421  cpu        DOUBLE DEFAULT 0,
2422  memory     DOUBLE,
2423  TIME INDEX (ts),
2424  PRIMARY KEY (host),
2425)
2426ENGINE=mito";
2427
2428        let result2 = ParserContext::create_with_dialect(
2429            sql2,
2430            &GreptimeDbDialect {},
2431            ParseOptions::default(),
2432        )
2433        .unwrap();
2434        assert_eq!(result, result2);
2435
2436        let sql3 = r"
2437CREATE TABLE monitor (
2438  host_id    INT,
2439  idc        STRING,
2440  ts         TIMESTAMP TIME INDEX NULL NOT,
2441  cpu        DOUBLE DEFAULT 0,
2442  memory     DOUBLE,
2443  TIME INDEX (ts),
2444  PRIMARY KEY (host),
2445)
2446ENGINE=mito";
2447
2448        let result3 = ParserContext::create_with_dialect(
2449            sql3,
2450            &GreptimeDbDialect {},
2451            ParseOptions::default(),
2452        );
2453        assert!(result3.is_err());
2454
2455        let sql4 = r"
2456CREATE TABLE monitor (
2457  host_id    INT,
2458  idc        STRING,
2459  ts         TIMESTAMP TIME INDEX NOT NULL NULL,
2460  cpu        DOUBLE DEFAULT 0,
2461  memory     DOUBLE,
2462  TIME INDEX (ts),
2463  PRIMARY KEY (host),
2464)
2465ENGINE=mito";
2466
2467        let result4 = ParserContext::create_with_dialect(
2468            sql4,
2469            &GreptimeDbDialect {},
2470            ParseOptions::default(),
2471        );
2472        assert!(result4.is_err());
2473
2474        let sql = r"
2475CREATE TABLE monitor (
2476  host_id    INT,
2477  idc        STRING,
2478  ts         TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2479  cpu        DOUBLE DEFAULT 0,
2480  memory     DOUBLE,
2481  TIME INDEX (ts),
2482  PRIMARY KEY (host),
2483)
2484ENGINE=mito";
2485
2486        let result =
2487            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2488                .unwrap();
2489
2490        if let Statement::CreateTable(c) = &result[0] {
2491            let tc = c.constraints[0].clone();
2492            match tc {
2493                TableConstraint::TimeIndex { column } => {
2494                    assert_eq!(&column.value, "ts");
2495                }
2496                _ => panic!("should be time index constraint"),
2497            }
2498            let ts = c.columns[2].clone();
2499            assert_eq!(ts.name().to_string(), "ts");
2500            assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2501            assert_eq!(ts.options()[1].option, NotNull);
2502        } else {
2503            unreachable!("should be create table statement");
2504        }
2505    }
2506
2507    #[test]
2508    fn test_parse_partitions_with_error_syntax() {
2509        let sql = r"
2510CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2511PARTITION COLUMNS(c, a) (
2512    a < 10,
2513    a > 10 AND a < 20,
2514    a > 20 AND c < 100,
2515    a > 20 AND c >= 100
2516)
2517ENGINE=mito";
2518        let result =
2519            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2520        assert!(
2521            result
2522                .unwrap_err()
2523                .output_msg()
2524                .contains("sql parser error: Expected: ON, found: COLUMNS")
2525        );
2526    }
2527
2528    #[test]
2529    fn test_parse_partitions_without_rule() {
2530        let sql = r"
2531CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2532PARTITION ON COLUMNS(c, a) ()
2533ENGINE=mito";
2534        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2535            .unwrap();
2536    }
2537
2538    #[test]
2539    fn test_parse_partitions_unreferenced_column() {
2540        let sql = r"
2541CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2542PARTITION ON COLUMNS(c, a) (
2543    b = 'foo'
2544)
2545ENGINE=mito";
2546        let result =
2547            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2548        assert_eq!(
2549            result.unwrap_err().output_msg(),
2550            "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2551        );
2552    }
2553
2554    #[test]
2555    fn test_parse_partitions_not_binary_expr() {
2556        let sql = r"
2557CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2558PARTITION ON COLUMNS(c, a) (
2559    b
2560)
2561ENGINE=mito";
2562        let result =
2563            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2564        assert_eq!(
2565            result.unwrap_err().output_msg(),
2566            r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2567        );
2568    }
2569
2570    fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2571        assert_eq!(column.name.to_string(), name);
2572        assert_eq!(column.data_type.to_string(), data_type);
2573    }
2574
2575    #[test]
2576    pub fn test_parse_create_table() {
2577        let sql = r"create table demo(
2578                             host string,
2579                             ts timestamp,
2580                             cpu float32 default 0,
2581                             memory float64,
2582                             TIME INDEX (ts),
2583                             PRIMARY KEY(ts, host),
2584                             ) engine=mito
2585                             with(ttl='10s');
2586         ";
2587        let result =
2588            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2589                .unwrap();
2590        assert_eq!(1, result.len());
2591        match &result[0] {
2592            Statement::CreateTable(c) => {
2593                assert!(!c.if_not_exists);
2594                assert_eq!("demo", c.name.to_string());
2595                assert_eq!("mito", c.engine);
2596                assert_eq!(4, c.columns.len());
2597                let columns = &c.columns;
2598                assert_column_def(&columns[0].column_def, "host", "STRING");
2599                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2600                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2601                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2602
2603                let constraints = &c.constraints;
2604                assert_eq!(
2605                    &constraints[0],
2606                    &TableConstraint::TimeIndex {
2607                        column: Ident::new("ts"),
2608                    }
2609                );
2610                assert_eq!(
2611                    &constraints[1],
2612                    &TableConstraint::PrimaryKey {
2613                        columns: vec![Ident::new("ts"), Ident::new("host")]
2614                    }
2615                );
2616                // inverted index is merged into column options
2617                assert_eq!(1, c.options.len());
2618                assert_eq!(
2619                    [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2620                    c.options.to_str_map()
2621                );
2622            }
2623            _ => unreachable!(),
2624        }
2625    }
2626
2627    #[test]
2628    fn test_invalid_index_keys() {
2629        let sql = r"create table demo(
2630                             host string,
2631                             ts int64,
2632                             cpu float64 default 0,
2633                             memory float64,
2634                             TIME INDEX (ts, host),
2635                             PRIMARY KEY(ts, host)) engine=mito;
2636         ";
2637        let result =
2638            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2639        assert!(result.is_err());
2640        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2641    }
2642
2643    #[test]
2644    fn test_duplicated_time_index() {
2645        let sql = r"create table demo(
2646                             host string,
2647                             ts timestamp time index,
2648                             t timestamp time index,
2649                             cpu float64 default 0,
2650                             memory float64,
2651                             TIME INDEX (ts, host),
2652                             PRIMARY KEY(ts, host)) engine=mito;
2653         ";
2654        let result =
2655            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2656        assert!(result.is_err());
2657        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2658
2659        let sql = r"create table demo(
2660                             host string,
2661                             ts timestamp time index,
2662                             cpu float64 default 0,
2663                             t timestamp,
2664                             memory float64,
2665                             TIME INDEX (t),
2666                             PRIMARY KEY(ts, host)) engine=mito;
2667         ";
2668        let result =
2669            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2670        assert!(result.is_err());
2671        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2672    }
2673
2674    #[test]
2675    fn test_invalid_column_name() {
2676        let sql = "create table foo(user string, i timestamp time index)";
2677        let result =
2678            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2679        let err = result.unwrap_err().output_msg();
2680        assert!(err.contains("Cannot use keyword 'user' as column name"));
2681
2682        // If column name is quoted, it's valid even same with keyword.
2683        let sql = r#"
2684            create table foo("user" string, i timestamp time index)
2685        "#;
2686        let result =
2687            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2688        let _ = result.unwrap();
2689    }
2690
2691    #[test]
2692    fn test_incorrect_default_value_issue_3479() {
2693        let sql = r#"CREATE TABLE `ExcePTuRi`(
2694non TIMESTAMP(6) TIME INDEX,
2695`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2696)"#;
2697        let result =
2698            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2699                .unwrap();
2700        assert_eq!(1, result.len());
2701        match &result[0] {
2702            Statement::CreateTable(c) => {
2703                assert_eq!(
2704                    "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2705                    c.columns[1].to_string()
2706                );
2707            }
2708            _ => unreachable!(),
2709        }
2710    }
2711
2712    #[test]
2713    fn test_parse_create_view() {
2714        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
2715
2716        let result =
2717            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2718                .unwrap();
2719        match &result[0] {
2720            Statement::CreateView(c) => {
2721                assert_eq!(c.to_string(), sql);
2722                assert!(!c.or_replace);
2723                assert!(!c.if_not_exists);
2724                assert_eq!("test", c.name.to_string());
2725            }
2726            _ => unreachable!(),
2727        }
2728
2729        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
2730
2731        let result =
2732            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2733                .unwrap();
2734        match &result[0] {
2735            Statement::CreateView(c) => {
2736                assert_eq!(c.to_string(), sql);
2737                assert!(c.or_replace);
2738                assert!(c.if_not_exists);
2739                assert_eq!("test", c.name.to_string());
2740            }
2741            _ => unreachable!(),
2742        }
2743    }
2744
2745    #[test]
2746    fn test_parse_create_view_invalid_query() {
2747        let sql = "CREATE VIEW test AS DELETE from demo";
2748        let result =
2749            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2750        assert!(result.is_ok_and(|x| x.len() == 1));
2751    }
2752
2753    #[test]
2754    fn test_parse_create_table_fulltext_options() {
2755        let sql1 = r"
2756CREATE TABLE log (
2757    ts TIMESTAMP TIME INDEX,
2758    msg TEXT FULLTEXT INDEX,
2759)";
2760        let result1 = ParserContext::create_with_dialect(
2761            sql1,
2762            &GreptimeDbDialect {},
2763            ParseOptions::default(),
2764        )
2765        .unwrap();
2766
2767        if let Statement::CreateTable(c) = &result1[0] {
2768            c.columns.iter().for_each(|col| {
2769                if col.name().value == "msg" {
2770                    assert!(
2771                        col.extensions
2772                            .fulltext_index_options
2773                            .as_ref()
2774                            .unwrap()
2775                            .is_empty()
2776                    );
2777                }
2778            });
2779        } else {
2780            panic!("should be create_table statement");
2781        }
2782
2783        let sql2 = r"
2784CREATE TABLE log (
2785    ts TIMESTAMP TIME INDEX,
2786    msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2787)";
2788        let result2 = ParserContext::create_with_dialect(
2789            sql2,
2790            &GreptimeDbDialect {},
2791            ParseOptions::default(),
2792        )
2793        .unwrap();
2794
2795        if let Statement::CreateTable(c) = &result2[0] {
2796            c.columns.iter().for_each(|col| {
2797                if col.name().value == "msg" {
2798                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2799                    assert_eq!(options.len(), 2);
2800                    assert_eq!(options.get("analyzer").unwrap(), "English");
2801                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2802                }
2803            });
2804        } else {
2805            panic!("should be create_table statement");
2806        }
2807
2808        let sql3 = r"
2809CREATE TABLE log (
2810    ts TIMESTAMP TIME INDEX,
2811    msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2812    msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2813)";
2814        let result3 = ParserContext::create_with_dialect(
2815            sql3,
2816            &GreptimeDbDialect {},
2817            ParseOptions::default(),
2818        )
2819        .unwrap();
2820
2821        if let Statement::CreateTable(c) = &result3[0] {
2822            c.columns.iter().for_each(|col| {
2823                if col.name().value == "msg1" {
2824                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2825                    assert_eq!(options.len(), 2);
2826                    assert_eq!(options.get("analyzer").unwrap(), "English");
2827                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2828                } else if col.name().value == "msg2" {
2829                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2830                    assert_eq!(options.len(), 2);
2831                    assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2832                    assert_eq!(options.get("case_sensitive").unwrap(), "true");
2833                }
2834            });
2835        } else {
2836            panic!("should be create_table statement");
2837        }
2838    }
2839
2840    #[test]
2841    fn test_parse_create_table_fulltext_options_invalid_type() {
2842        let sql = r"
2843CREATE TABLE log (
2844    ts TIMESTAMP TIME INDEX,
2845    msg INT FULLTEXT INDEX,
2846)";
2847        let result =
2848            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2849        assert!(result.is_err());
2850        assert!(
2851            result
2852                .unwrap_err()
2853                .to_string()
2854                .contains("FULLTEXT index only supports string type")
2855        );
2856    }
2857
2858    #[test]
2859    fn test_parse_create_table_fulltext_options_duplicate() {
2860        let sql = r"
2861CREATE TABLE log (
2862    ts TIMESTAMP TIME INDEX,
2863    msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
2864)";
2865        let result =
2866            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2867        assert!(result.is_err());
2868        assert!(
2869            result
2870                .unwrap_err()
2871                .to_string()
2872                .contains("duplicated FULLTEXT INDEX option")
2873        );
2874    }
2875
2876    #[test]
2877    fn test_parse_create_table_fulltext_options_invalid_option() {
2878        let sql = r"
2879CREATE TABLE log (
2880    ts TIMESTAMP TIME INDEX,
2881    msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
2882)";
2883        let result =
2884            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2885        assert!(result.is_err());
2886        assert!(
2887            result
2888                .unwrap_err()
2889                .to_string()
2890                .contains("invalid FULLTEXT INDEX option")
2891        );
2892    }
2893
2894    #[test]
2895    fn test_parse_create_table_skip_options() {
2896        let sql = r"
2897CREATE TABLE log (
2898    ts TIMESTAMP TIME INDEX,
2899    msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
2900)";
2901        let result =
2902            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2903                .unwrap();
2904
2905        if let Statement::CreateTable(c) = &result[0] {
2906            c.columns.iter().for_each(|col| {
2907                if col.name().value == "msg" {
2908                    assert!(
2909                        !col.extensions
2910                            .skipping_index_options
2911                            .as_ref()
2912                            .unwrap()
2913                            .is_empty()
2914                    );
2915                }
2916            });
2917        } else {
2918            panic!("should be create_table statement");
2919        }
2920
2921        let sql = r"
2922        CREATE TABLE log (
2923            ts TIMESTAMP TIME INDEX,
2924            msg INT SKIPPING INDEX,
2925        )";
2926        let result =
2927            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2928                .unwrap();
2929
2930        if let Statement::CreateTable(c) = &result[0] {
2931            c.columns.iter().for_each(|col| {
2932                if col.name().value == "msg" {
2933                    assert!(
2934                        col.extensions
2935                            .skipping_index_options
2936                            .as_ref()
2937                            .unwrap()
2938                            .is_empty()
2939                    );
2940                }
2941            });
2942        } else {
2943            panic!("should be create_table statement");
2944        }
2945    }
2946
2947    #[test]
2948    fn test_parse_create_view_with_columns() {
2949        let sql = "CREATE VIEW test () AS SELECT * FROM NUMBERS";
2950        let result =
2951            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2952                .unwrap();
2953
2954        match &result[0] {
2955            Statement::CreateView(c) => {
2956                assert_eq!(c.to_string(), "CREATE VIEW test AS SELECT * FROM NUMBERS");
2957                assert!(!c.or_replace);
2958                assert!(!c.if_not_exists);
2959                assert_eq!("test", c.name.to_string());
2960            }
2961            _ => unreachable!(),
2962        }
2963        assert_eq!(
2964            "CREATE VIEW test AS SELECT * FROM NUMBERS",
2965            result[0].to_string()
2966        );
2967
2968        let sql = "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS";
2969        let result =
2970            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2971                .unwrap();
2972
2973        match &result[0] {
2974            Statement::CreateView(c) => {
2975                assert_eq!(c.to_string(), sql);
2976                assert!(!c.or_replace);
2977                assert!(!c.if_not_exists);
2978                assert_eq!("test", c.name.to_string());
2979            }
2980            _ => unreachable!(),
2981        }
2982        assert_eq!(sql, result[0].to_string());
2983
2984        let sql = "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS";
2985        let result =
2986            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2987                .unwrap();
2988
2989        match &result[0] {
2990            Statement::CreateView(c) => {
2991                assert_eq!(c.to_string(), sql);
2992                assert!(!c.or_replace);
2993                assert!(!c.if_not_exists);
2994                assert_eq!("test", c.name.to_string());
2995            }
2996            _ => unreachable!(),
2997        }
2998        assert_eq!(sql, result[0].to_string());
2999
3000        // Some invalid syntax cases
3001        let sql = "CREATE VIEW test (n1 AS select * from demo";
3002        let result =
3003            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3004        assert!(result.is_err());
3005
3006        let sql = "CREATE VIEW test (n1, AS select * from demo";
3007        let result =
3008            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3009        assert!(result.is_err());
3010
3011        let sql = "CREATE VIEW test n1,n2) AS select * from demo";
3012        let result =
3013            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3014        assert!(result.is_err());
3015
3016        let sql = "CREATE VIEW test (1) AS select * from demo";
3017        let result =
3018            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3019        assert!(result.is_err());
3020
3021        // keyword
3022        let sql = "CREATE VIEW test (n1, select) AS select * from demo";
3023        let result =
3024            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3025        assert!(result.is_err());
3026    }
3027
3028    #[test]
3029    fn test_parse_column_extensions_vector() {
3030        // Test that vector options are parsed from data_type (no additional SQL needed)
3031        let sql = "";
3032        let dialect = GenericDialect {};
3033        let mut tokenizer = Tokenizer::new(&dialect, sql);
3034        let tokens = tokenizer.tokenize().unwrap();
3035        let mut parser = Parser::new(&dialect).with_tokens(tokens);
3036        let name = Ident::new("vec_col");
3037        let data_type =
3038            DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
3039        let mut extensions = ColumnExtensions::default();
3040
3041        let result =
3042            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
3043        assert!(result.is_ok());
3044        assert!(extensions.vector_options.is_some());
3045        let vector_options = extensions.vector_options.unwrap();
3046        assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some("128"));
3047    }
3048
3049    #[test]
3050    fn test_parse_column_extensions_vector_invalid() {
3051        // Test that vector with no dimension fails
3052        let sql = "";
3053        let dialect = GenericDialect {};
3054        let mut tokenizer = Tokenizer::new(&dialect, sql);
3055        let tokens = tokenizer.tokenize().unwrap();
3056        let mut parser = Parser::new(&dialect).with_tokens(tokens);
3057        let name = Ident::new("vec_col");
3058        let data_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
3059        let mut extensions = ColumnExtensions::default();
3060
3061        let result =
3062            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
3063        assert!(result.is_err());
3064    }
3065
3066    #[test]
3067    fn test_parse_column_extensions_indices() {
3068        // Test skipping index
3069        {
3070            let sql = "SKIPPING INDEX";
3071            let dialect = GenericDialect {};
3072            let mut tokenizer = Tokenizer::new(&dialect, sql);
3073            let tokens = tokenizer.tokenize().unwrap();
3074            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3075            let name = Ident::new("col");
3076            let data_type = DataType::String(None);
3077            let mut extensions = ColumnExtensions::default();
3078            let result = ParserContext::parse_column_extensions(
3079                &mut parser,
3080                &name,
3081                &data_type,
3082                &mut extensions,
3083            );
3084            assert!(result.is_ok());
3085            assert!(extensions.skipping_index_options.is_some());
3086        }
3087
3088        // Test fulltext index with options
3089        {
3090            let sql = "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')";
3091            let dialect = GenericDialect {};
3092            let mut tokenizer = Tokenizer::new(&dialect, sql);
3093            let tokens = tokenizer.tokenize().unwrap();
3094            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3095            let name = Ident::new("text_col");
3096            let data_type = DataType::String(None);
3097            let mut extensions = ColumnExtensions::default();
3098            let result = ParserContext::parse_column_extensions(
3099                &mut parser,
3100                &name,
3101                &data_type,
3102                &mut extensions,
3103            );
3104            assert!(result.unwrap());
3105            assert!(extensions.fulltext_index_options.is_some());
3106            let fulltext_options = extensions.fulltext_index_options.unwrap();
3107            assert_eq!(fulltext_options.get("analyzer"), Some("English"));
3108            assert_eq!(fulltext_options.get("case_sensitive"), Some("true"));
3109        }
3110
3111        // Test fulltext index with invalid type (should fail)
3112        {
3113            let sql = "FULLTEXT INDEX WITH (analyzer = 'English')";
3114            let dialect = GenericDialect {};
3115            let mut tokenizer = Tokenizer::new(&dialect, sql);
3116            let tokens = tokenizer.tokenize().unwrap();
3117            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3118            let name = Ident::new("num_col");
3119            let data_type = DataType::Int(None); // Non-string type
3120            let mut extensions = ColumnExtensions::default();
3121            let result = ParserContext::parse_column_extensions(
3122                &mut parser,
3123                &name,
3124                &data_type,
3125                &mut extensions,
3126            );
3127            assert!(result.is_err());
3128            assert!(
3129                result
3130                    .unwrap_err()
3131                    .to_string()
3132                    .contains("FULLTEXT index only supports string type")
3133            );
3134        }
3135
3136        // Test fulltext index with invalid option (won't fail, the parser doesn't check the option's content)
3137        {
3138            let sql = "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')";
3139            let dialect = GenericDialect {};
3140            let mut tokenizer = Tokenizer::new(&dialect, sql);
3141            let tokens = tokenizer.tokenize().unwrap();
3142            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3143            let name = Ident::new("text_col");
3144            let data_type = DataType::String(None);
3145            let mut extensions = ColumnExtensions::default();
3146            let result = ParserContext::parse_column_extensions(
3147                &mut parser,
3148                &name,
3149                &data_type,
3150                &mut extensions,
3151            );
3152            assert!(result.unwrap());
3153        }
3154
3155        // Test inverted index
3156        {
3157            let sql = "INVERTED INDEX";
3158            let dialect = GenericDialect {};
3159            let mut tokenizer = Tokenizer::new(&dialect, sql);
3160            let tokens = tokenizer.tokenize().unwrap();
3161            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3162            let name = Ident::new("col");
3163            let data_type = DataType::String(None);
3164            let mut extensions = ColumnExtensions::default();
3165            let result = ParserContext::parse_column_extensions(
3166                &mut parser,
3167                &name,
3168                &data_type,
3169                &mut extensions,
3170            );
3171            assert!(result.is_ok());
3172            assert!(extensions.inverted_index_options.is_some());
3173        }
3174
3175        // Test inverted index with options (should fail)
3176        {
3177            let sql = "INVERTED INDEX WITH (analyzer = 'English')";
3178            let dialect = GenericDialect {};
3179            let mut tokenizer = Tokenizer::new(&dialect, sql);
3180            let tokens = tokenizer.tokenize().unwrap();
3181            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3182            let name = Ident::new("col");
3183            let data_type = DataType::String(None);
3184            let mut extensions = ColumnExtensions::default();
3185            let result = ParserContext::parse_column_extensions(
3186                &mut parser,
3187                &name,
3188                &data_type,
3189                &mut extensions,
3190            );
3191            assert!(result.is_err());
3192            assert!(
3193                result
3194                    .unwrap_err()
3195                    .to_string()
3196                    .contains("INVERTED index doesn't support options")
3197            );
3198        }
3199
3200        // Test multiple indices
3201        {
3202            let sql = "SKIPPING INDEX FULLTEXT INDEX";
3203            let dialect = GenericDialect {};
3204            let mut tokenizer = Tokenizer::new(&dialect, sql);
3205            let tokens = tokenizer.tokenize().unwrap();
3206            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3207            let name = Ident::new("col");
3208            let data_type = DataType::String(None);
3209            let mut extensions = ColumnExtensions::default();
3210            let result = ParserContext::parse_column_extensions(
3211                &mut parser,
3212                &name,
3213                &data_type,
3214                &mut extensions,
3215            );
3216            assert!(result.unwrap());
3217            assert!(extensions.skipping_index_options.is_some());
3218            assert!(extensions.fulltext_index_options.is_some());
3219        }
3220    }
3221
3222    #[test]
3223    fn test_parse_interval_cast() {
3224        let s = "select '10s'::INTERVAL";
3225        let stmts =
3226            ParserContext::create_with_dialect(s, &GreptimeDbDialect {}, ParseOptions::default())
3227                .unwrap();
3228        assert_eq!("SELECT '10 seconds'::INTERVAL", &stmts[0].to_string());
3229    }
3230
#[test]
fn test_parse_create_table_vector_index_options() {
    // A bare `VECTOR INDEX` yields a present but empty option map.
    {
        let sql = r"
CREATE TABLE vectors (
    ts TIMESTAMP TIME INDEX,
    vec VECTOR(128) VECTOR INDEX,
)";
        let result =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap();
        let Statement::CreateTable(c) = &result[0] else {
            panic!("should be create_table statement");
        };
        // Look the column up explicitly so the test fails, instead of passing
        // vacuously, if `vec` is missing from the parsed statement.
        let col = c
            .columns
            .iter()
            .find(|col| col.name().value == "vec")
            .expect("column `vec` should be parsed");
        let options = col
            .extensions
            .vector_index_options
            .as_ref()
            .expect("`vec` should carry vector index options");
        assert!(options.is_empty());
    }

    // `VECTOR INDEX WITH (...)` keeps every supplied key/value pair.
    {
        let sql = r"
CREATE TABLE vectors (
    ts TIMESTAMP TIME INDEX,
    vec VECTOR(128) VECTOR INDEX WITH (metric='cosine', connectivity='32', expansion_add='256', expansion_search='128')
)";
        let result =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap();
        let Statement::CreateTable(c) = &result[0] else {
            panic!("should be create_table statement");
        };
        let col = c
            .columns
            .iter()
            .find(|col| col.name().value == "vec")
            .expect("column `vec` should be parsed");
        let options = col
            .extensions
            .vector_index_options
            .as_ref()
            .expect("`vec` should carry vector index options");
        assert_eq!(options.len(), 4);
        assert_eq!(options.get("metric"), Some("cosine"));
        assert_eq!(options.get("connectivity"), Some("32"));
        assert_eq!(options.get("expansion_add"), Some("256"));
        assert_eq!(options.get("expansion_search"), Some("128"));
    }
}
3284
#[test]
fn test_parse_create_table_vector_index_invalid_type() {
    // VECTOR INDEX on a non-vector column (here INT) must be rejected.
    let sql = r"
CREATE TABLE vectors (
    ts TIMESTAMP TIME INDEX,
    col INT VECTOR INDEX,
)";
    // `unwrap_err` already fails the test (printing the parsed statements)
    // if parsing unexpectedly succeeds.
    let err =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap_err();
    let msg = err.to_string();
    assert!(
        msg.contains("VECTOR INDEX only supports Vector type columns"),
        "unexpected error: {msg}"
    );
}
3303
#[test]
fn test_parse_create_table_vector_index_duplicate() {
    // Declaring VECTOR INDEX twice on the same column must be rejected.
    let sql = r"
CREATE TABLE vectors (
    ts TIMESTAMP TIME INDEX,
    vec VECTOR(128) VECTOR INDEX VECTOR INDEX,
)";
    // `unwrap_err` already fails the test (printing the parsed statements)
    // if parsing unexpectedly succeeds.
    let err =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap_err();
    let msg = err.to_string();
    assert!(
        msg.contains("duplicated VECTOR INDEX option"),
        "unexpected error: {msg}"
    );
}
3322
#[test]
fn test_parse_create_table_vector_index_invalid_option() {
    // An unknown key inside `VECTOR INDEX WITH (...)` must be rejected, even
    // when it appears next to a valid key.
    let sql = r"
CREATE TABLE vectors (
    ts TIMESTAMP TIME INDEX,
    vec VECTOR(128) VECTOR INDEX WITH (metric='l2sq', invalid_option='foo')
)";
    // `unwrap_err` already fails the test (printing the parsed statements)
    // if parsing unexpectedly succeeds.
    let err =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap_err();
    let msg = err.to_string();
    assert!(
        msg.contains("invalid VECTOR INDEX option"),
        "unexpected error: {msg}"
    );
}
3341
#[test]
fn test_parse_column_extensions_vector_index() {
    // VECTOR INDEX on a vector-typed column: the WITH options are captured.
    {
        let sql = "VECTOR INDEX WITH (metric = 'l2sq')";
        let dialect = GenericDialect {};
        let mut tokenizer = Tokenizer::new(&dialect, sql);
        let tokens = tokenizer.tokenize().unwrap();
        let mut parser = Parser::new(&dialect).with_tokens(tokens);
        let name = Ident::new("vec_col");
        let data_type =
            DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
        // Simulate a column whose `VECTOR(128)` type was already parsed, i.e.
        // `vector_options` is populated before the extensions are parsed.
        let mut extensions = ColumnExtensions {
            vector_options: Some(OptionMap::from([(
                VECTOR_OPT_DIM.to_string(),
                "128".to_string(),
            )])),
            ..Default::default()
        };

        // `expect` surfaces the actual parse error if this unexpectedly fails.
        ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions)
            .expect("VECTOR INDEX should parse on a vector column");
        let vi_options = extensions
            .vector_index_options
            .expect("vector index options should be set");
        assert_eq!(vi_options.get("metric"), Some("l2sq"));
    }

    // VECTOR INDEX on a non-vector column must be rejected.
    {
        let sql = "VECTOR INDEX";
        let dialect = GenericDialect {};
        let mut tokenizer = Tokenizer::new(&dialect, sql);
        let tokens = tokenizer.tokenize().unwrap();
        let mut parser = Parser::new(&dialect).with_tokens(tokens);
        let name = Ident::new("num_col");
        let data_type = DataType::Int(None); // Non-vector type
        let mut extensions = ColumnExtensions::default();

        let err =
            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions)
                .unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("VECTOR INDEX only supports Vector type columns"),
            "unexpected error: {msg}"
        );
    }
}
3400}