Skip to main content

sql/parsers/
create_parser.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod json;
16#[cfg(feature = "enterprise")]
17pub mod trigger;
18
19use std::collections::HashMap;
20
21use arrow_buffer::IntervalMonthDayNano;
22use common_catalog::consts::default_engine;
23use datafusion_common::ScalarValue;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
25use datatypes::data_type::ConcreteDataType;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28use sqlparser::ast::{
29    ColumnOption, ColumnOptionDef, DataType, Expr, KeyOrIndexDisplay, NullsDistinctOption,
30    PrimaryKeyConstraint, UniqueConstraint,
31};
32use sqlparser::dialect::keywords::Keyword;
33use sqlparser::keywords::ALL_KEYWORDS;
34use sqlparser::parser::IsOptional::Mandatory;
35use sqlparser::parser::{Parser, ParserError};
36use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
37use table::requests::validate_database_option;
38
39use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
40use crate::error::{
41    self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidFlowQuerySnafu,
42    InvalidIntervalSnafu, InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
43    SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
44};
45use crate::parser::{FLOW, ParserContext};
46use crate::parsers::tql_parser;
47use crate::parsers::utils::{
48    self, parse_with_options, validate_column_fulltext_create_option,
49    validate_column_skipping_index_create_option, validate_column_vector_index_create_option,
50};
51use crate::statements::create::{
52    Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
53    CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
54};
55use crate::statements::statement::Statement;
56use crate::statements::transform::type_alias::get_data_type_by_alias_name;
57use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
58use crate::util::{OptionValue, location_to_index, parse_option_string};
59
60pub const ENGINE: &str = "ENGINE";
61pub const MAXVALUE: &str = "MAXVALUE";
62pub const SINK: &str = "SINK";
63pub const EXPIRE: &str = "EXPIRE";
64pub const AFTER: &str = "AFTER";
65pub const INVERTED: &str = "INVERTED";
66pub const SKIPPING: &str = "SKIPPING";
67pub const VECTOR: &str = "VECTOR";
68
69pub type RawIntervalExpr = String;
70
71// Preserve raw CREATE FLOW option entries until operator-side validation.
72// Do not use `OptionMap::new()` here: it can drop non-string values for
73// redacted keys before the flow option allowlist rejects them.
74fn flow_option_map(options: HashMap<String, OptionValue>) -> OptionMap {
75    let mut flow_options = OptionMap::default();
76    for (key, value) in options {
77        flow_options.insert_options(&key, value);
78    }
79    flow_options
80}
81
82/// Parses create [table] statement
83impl<'a> ParserContext<'a> {
84    pub(crate) fn parse_create(&mut self) -> Result<Statement> {
85        match self.parser.peek_token().token {
86            Token::Word(w) => match w.keyword {
87                Keyword::TABLE => self.parse_create_table(),
88
89                Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
90
91                Keyword::EXTERNAL => self.parse_create_external_table(),
92
93                Keyword::OR => {
94                    let _ = self.parser.next_token();
95                    self.parser
96                        .expect_keyword(Keyword::REPLACE)
97                        .context(SyntaxSnafu)?;
98                    match self.parser.next_token().token {
99                        Token::Word(w) => match w.keyword {
100                            Keyword::VIEW => self.parse_create_view(true),
101                            Keyword::NoKeyword => {
102                                let uppercase = w.value.to_uppercase();
103                                match uppercase.as_str() {
104                                    FLOW => self.parse_create_flow(true),
105                                    _ => self.unsupported(w.to_string()),
106                                }
107                            }
108                            _ => self.unsupported(w.to_string()),
109                        },
110                        _ => self.unsupported(w.to_string()),
111                    }
112                }
113
114                Keyword::VIEW => {
115                    let _ = self.parser.next_token();
116                    self.parse_create_view(false)
117                }
118
119                #[cfg(feature = "enterprise")]
120                Keyword::TRIGGER => {
121                    let _ = self.parser.next_token();
122                    self.parse_create_trigger()
123                }
124
125                Keyword::NoKeyword => {
126                    let _ = self.parser.next_token();
127                    let uppercase = w.value.to_uppercase();
128                    match uppercase.as_str() {
129                        FLOW => self.parse_create_flow(false),
130                        _ => self.unsupported(w.to_string()),
131                    }
132                }
133                _ => self.unsupported(w.to_string()),
134            },
135            unexpected => self.unsupported(unexpected.to_string()),
136        }
137    }
138
139    /// Parse `CREAVE VIEW` statement.
140    fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
141        let if_not_exists = self.parse_if_not_exist()?;
142        let view_name = self.intern_parse_table_name()?;
143
144        let columns = self.parse_view_columns()?;
145
146        self.parser
147            .expect_keyword(Keyword::AS)
148            .context(SyntaxSnafu)?;
149
150        let query = self.parse_query()?;
151
152        Ok(Statement::CreateView(CreateView {
153            name: view_name,
154            columns,
155            or_replace,
156            query: Box::new(query),
157            if_not_exists,
158        }))
159    }
160
161    fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
162        let mut columns = vec![];
163        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
164            return Ok(columns);
165        }
166
167        loop {
168            let name = self.parse_column_name().context(SyntaxSnafu)?;
169
170            columns.push(name);
171
172            let comma = self.parser.consume_token(&Token::Comma);
173            if self.parser.consume_token(&Token::RParen) {
174                // allow a trailing comma, even though it's not in standard
175                break;
176            } else if !comma {
177                return self.expected("',' or ')' after column name", self.parser.peek_token());
178            }
179        }
180
181        Ok(columns)
182    }
183
184    fn parse_create_external_table(&mut self) -> Result<Statement> {
185        let _ = self.parser.next_token();
186        self.parser
187            .expect_keyword(Keyword::TABLE)
188            .context(SyntaxSnafu)?;
189        let if_not_exists = self.parse_if_not_exist()?;
190        let table_name = self.intern_parse_table_name()?;
191        let (columns, constraints) = self.parse_columns()?;
192        if !columns.is_empty() {
193            validate_time_index(&columns, &constraints)?;
194        }
195
196        let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
197        let options = self.parse_create_table_options()?;
198        Ok(Statement::CreateExternalTable(CreateExternalTable {
199            name: table_name,
200            columns,
201            constraints,
202            options,
203            if_not_exists,
204            engine,
205        }))
206    }
207
208    fn parse_create_database(&mut self) -> Result<Statement> {
209        let _ = self.parser.next_token();
210        let if_not_exists = self.parse_if_not_exist()?;
211        let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
212            expected: "a database name",
213            actual: self.peek_token_as_string(),
214        })?;
215        let database_name = Self::canonicalize_object_name(database_name)?;
216
217        let options = self
218            .parser
219            .parse_options(Keyword::WITH)
220            .context(SyntaxSnafu)?
221            .into_iter()
222            .map(parse_option_string)
223            .collect::<Result<HashMap<String, OptionValue>>>()?;
224
225        for key in options.keys() {
226            ensure!(
227                validate_database_option(key),
228                InvalidDatabaseOptionSnafu { key: key.clone() }
229            );
230        }
231        if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string())
232            && append_mode == "true"
233            && options.contains_key("merge_mode")
234        {
235            return InvalidDatabaseOptionSnafu {
236                key: "merge_mode".to_string(),
237            }
238            .fail();
239        }
240
241        Ok(Statement::CreateDatabase(CreateDatabase {
242            name: database_name,
243            if_not_exists,
244            options: OptionMap::new(options),
245        }))
246    }
247
248    fn parse_create_table(&mut self) -> Result<Statement> {
249        let _ = self.parser.next_token();
250
251        let if_not_exists = self.parse_if_not_exist()?;
252
253        let table_name = self.intern_parse_table_name()?;
254
255        if self.parser.parse_keyword(Keyword::LIKE) {
256            let source_name = self.intern_parse_table_name()?;
257
258            return Ok(Statement::CreateTableLike(CreateTableLike {
259                table_name,
260                source_name,
261            }));
262        }
263
264        let (columns, constraints) = self.parse_columns()?;
265        validate_time_index(&columns, &constraints)?;
266
267        let partitions = self.parse_partitions()?;
268        if let Some(partitions) = &partitions {
269            validate_partitions(&columns, partitions)?;
270        }
271
272        let engine = self.parse_table_engine(default_engine())?;
273        let options = self.parse_create_table_options()?;
274        let create_table = CreateTable {
275            if_not_exists,
276            name: table_name,
277            columns,
278            engine,
279            constraints,
280            options,
281            table_id: 0, // table id is assigned by catalog manager
282            partitions,
283        };
284
285        Ok(Statement::CreateTable(create_table))
286    }
287
288    /// "CREATE FLOW" clause
289    fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
290        let if_not_exists = self.parse_if_not_exist()?;
291
292        let flow_name = self.intern_parse_table_name()?;
293
294        // make `SINK` case in-sensitive
295        if let Token::Word(word) = self.parser.peek_token().token
296            && word.value.eq_ignore_ascii_case(SINK)
297        {
298            self.parser.next_token();
299        } else {
300            Err(ParserError::ParserError(
301                "Expect `SINK` keyword".to_string(),
302            ))
303            .context(SyntaxSnafu)?
304        }
305        self.parser
306            .expect_keyword(Keyword::TO)
307            .context(SyntaxSnafu)?;
308
309        let output_table_name = self.intern_parse_table_name()?;
310
311        let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
312            && w1.value.eq_ignore_ascii_case(EXPIRE)
313        {
314            self.parser.next_token();
315            if let Token::Word(w2) = &self.parser.peek_token().token
316                && w2.value.eq_ignore_ascii_case(AFTER)
317            {
318                self.parser.next_token();
319                Some(self.parse_interval_no_month("EXPIRE AFTER")?)
320            } else {
321                None
322            }
323        } else {
324            None
325        };
326
327        let eval_interval = if self
328            .parser
329            .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
330        {
331            Some(self.parse_interval_no_month("EVAL INTERVAL")?)
332        } else {
333            None
334        };
335
336        let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
337            match self.parser.next_token() {
338                TokenWithSpan {
339                    token: Token::SingleQuotedString(value, ..),
340                    ..
341                } => Some(value),
342                unexpected => {
343                    return self
344                        .parser
345                        .expected("string", unexpected)
346                        .context(SyntaxSnafu);
347                }
348            }
349        } else {
350            None
351        };
352
353        let flow_options = self
354            .parser
355            .parse_options(Keyword::WITH)
356            .context(SyntaxSnafu)?
357            .into_iter()
358            .map(parse_option_string)
359            .collect::<Result<HashMap<String, OptionValue>>>()?;
360
361        self.parser
362            .expect_keyword(Keyword::AS)
363            .context(SyntaxSnafu)?;
364
365        let query = Box::new(self.parse_flow_sql_or_tql(true)?);
366
367        Ok(Statement::CreateFlow(CreateFlow {
368            flow_name,
369            sink_table_name: output_table_name,
370            or_replace,
371            if_not_exists,
372            expire_after,
373            eval_interval,
374            comment,
375            flow_options: flow_option_map(flow_options),
376            query,
377        }))
378    }
379
380    fn parse_flow_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
381        let start_loc = self.parser.peek_token().span.start;
382        let start_index = location_to_index(self.sql, &start_loc);
383
384        let starts_with_with = matches!(
385            self.parser.peek_token().token,
386            Token::Word(w) if w.keyword == Keyword::WITH
387        );
388
389        // only accept sql or tql
390        let query = match self.parser.peek_token().token {
391            Token::Word(w) => match w.keyword {
392                Keyword::SELECT => self.parse_query(),
393                Keyword::WITH => self.parse_with_tql_with_now(require_now_expr),
394                Keyword::NoKeyword
395                    if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
396                {
397                    self.parse_tql(require_now_expr)
398                }
399
400                _ => self.unsupported(self.peek_token_as_string()),
401            },
402            _ => self.unsupported(self.peek_token_as_string()),
403        }?;
404
405        if starts_with_with {
406            let Statement::Query(query) = &query else {
407                return InvalidFlowQuerySnafu {
408                    reason: "Expect a query after WITH".to_string(),
409                }
410                .fail();
411            };
412
413            if utils::has_tql_cte(query) && !utils::is_simple_tql_cte_query(query) {
414                return InvalidFlowQuerySnafu {
415                    reason: "WITH is only supported for the simplest TQL CTE in CREATE FLOW"
416                        .to_string(),
417                }
418                .fail();
419            }
420        }
421
422        let end_token = self.parser.peek_token();
423
424        let raw_query = if end_token == Token::EOF {
425            &self.sql[start_index..]
426        } else {
427            let end_loc = end_token.span.end;
428            let end_index = location_to_index(self.sql, &end_loc);
429            &self.sql[start_index..end_index.min(self.sql.len())]
430        };
431        let raw_query = raw_query.trim_end_matches(";");
432
433        let query = SqlOrTql::try_from_statement(query, raw_query)?;
434        Ok(query)
435    }
436
437    /// Parse the interval expr to duration in seconds.
438    fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
439        let interval = self.parse_interval_month_day_nano()?.0;
440        if interval.months != 0 {
441            return InvalidIntervalSnafu {
442                reason: format!("Interval with months is not allowed in {context}"),
443            }
444            .fail();
445        }
446        Ok(
447            interval.nanoseconds / 1_000_000_000
448                + interval.days as i64 * 60 * 60 * 24
449                + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, // 1 month=365.25/12=30.44 days
450                                                                       // this is to keep the same as https://docs.rs/humantime/latest/humantime/fn.parse_duration.html
451                                                                       // which we use in database to parse i.e. ttl interval and many other intervals
452        )
453    }
454
455    /// Parse interval expr to [`IntervalMonthDayNano`].
456    fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
457        let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
458        let raw_interval_expr = interval_expr.to_string();
459        let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
460            .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
461            .ok()
462            .with_context(|| InvalidIntervalSnafu {
463                reason: format!("cannot cast {} to interval type", interval_expr),
464            })?;
465        if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
466            Ok((interval, raw_interval_expr))
467        } else {
468            unreachable!()
469        }
470    }
471
472    fn parse_if_not_exist(&mut self) -> Result<bool> {
473        match self.parser.peek_token().token {
474            Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
475            _ => {}
476        }
477
478        if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
479            return self
480                .parser
481                .expect_keyword(Keyword::EXISTS)
482                .map(|_| true)
483                .context(UnexpectedSnafu {
484                    expected: "EXISTS",
485                    actual: self.peek_token_as_string(),
486                });
487        }
488
489        if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
490            return UnsupportedSnafu { keyword: "EXISTS" }.fail();
491        }
492
493        Ok(false)
494    }
495
    /// Parses the table options of a `CREATE [EXTERNAL] TABLE` statement by
    /// delegating to `parse_with_options` on the underlying parser.
    fn parse_create_table_options(&mut self) -> Result<OptionMap> {
        parse_with_options(&mut self.parser)
    }
499
500    /// "PARTITION ON COLUMNS (...)" clause
501    fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
502        if !self.parser.parse_keyword(Keyword::PARTITION) {
503            return Ok(None);
504        }
505
506        self.parse_partition_on_columns().map(Some)
507    }
508
509    /// Parses the "ON COLUMNS (...) (...)" part after "PARTITION".
510    pub(crate) fn parse_partition_on_columns(&mut self) -> Result<Partitions> {
511        self.parser
512            .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
513            .context(error::UnexpectedSnafu {
514                expected: "ON, COLUMNS",
515                actual: self.peek_token_as_string(),
516            })?;
517
518        let raw_column_list = self
519            .parser
520            .parse_parenthesized_column_list(Mandatory, false)
521            .context(error::SyntaxSnafu)?;
522        let column_list = raw_column_list
523            .into_iter()
524            .map(Self::canonicalize_identifier)
525            .collect();
526
527        let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
528
529        Ok(Partitions { column_list, exprs })
530    }
531
    /// Parses one partition entry as a plain SQL expression, mapping parser
    /// failures to a syntax error.
    fn parse_partition_entry(&mut self) -> Result<Expr> {
        self.parser.parse_expr().context(error::SyntaxSnafu)
    }
535
536    /// Parse a comma-separated list wrapped by "()", and of which all items accepted by `F`
537    fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
538    where
539        F: FnMut(&mut ParserContext<'a>) -> Result<T>,
540    {
541        self.parser
542            .expect_token(&Token::LParen)
543            .context(error::UnexpectedSnafu {
544                expected: "(",
545                actual: self.peek_token_as_string(),
546            })?;
547
548        let mut values = vec![];
549        while self.parser.peek_token() != Token::RParen {
550            values.push(f(self)?);
551            if !self.parser.consume_token(&Token::Comma) {
552                break;
553            }
554        }
555
556        self.parser
557            .expect_token(&Token::RParen)
558            .context(error::UnexpectedSnafu {
559                expected: ")",
560                actual: self.peek_token_as_string(),
561            })?;
562
563        Ok(values)
564    }
565
566    /// Parse the columns and constraints.
567    fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
568        let mut columns = vec![];
569        let mut constraints = vec![];
570        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
571            return Ok((columns, constraints));
572        }
573
574        loop {
575            if let Some(constraint) = self.parse_optional_table_constraint()? {
576                constraints.push(constraint);
577            } else if let Token::Word(_) = self.parser.peek_token().token {
578                self.parse_column(&mut columns, &mut constraints)?;
579            } else {
580                return self.expected(
581                    "column name or constraint definition",
582                    self.parser.peek_token(),
583                );
584            }
585            let comma = self.parser.consume_token(&Token::Comma);
586            if self.parser.consume_token(&Token::RParen) {
587                // allow a trailing comma, even though it's not in standard
588                break;
589            } else if !comma {
590                return self.expected(
591                    "',' or ')' after column definition",
592                    self.parser.peek_token(),
593                );
594            }
595        }
596
597        Ok((columns, constraints))
598    }
599
600    fn parse_column(
601        &mut self,
602        columns: &mut Vec<Column>,
603        constraints: &mut Vec<TableConstraint>,
604    ) -> Result<()> {
605        let mut column = self.parse_column_def()?;
606
607        let mut time_index_opt_idx = None;
608        for (index, opt) in column.options().iter().enumerate() {
609            if let ColumnOption::DialectSpecific(tokens) = &opt.option
610                && matches!(
611                    &tokens[..],
612                    [
613                        Token::Word(Word {
614                            keyword: Keyword::TIME,
615                            ..
616                        }),
617                        Token::Word(Word {
618                            keyword: Keyword::INDEX,
619                            ..
620                        })
621                    ]
622                )
623            {
624                ensure!(
625                    time_index_opt_idx.is_none(),
626                    InvalidColumnOptionSnafu {
627                        name: column.name().to_string(),
628                        msg: "duplicated time index",
629                    }
630                );
631                time_index_opt_idx = Some(index);
632
633                let constraint = TableConstraint::TimeIndex {
634                    column: Ident::new(column.name().value.clone()),
635                };
636                constraints.push(constraint);
637            }
638        }
639
640        if let Some(index) = time_index_opt_idx {
641            ensure!(
642                !column.options().contains(&ColumnOptionDef {
643                    option: ColumnOption::Null,
644                    name: None,
645                }),
646                InvalidColumnOptionSnafu {
647                    name: column.name().to_string(),
648                    msg: "time index column can't be null",
649                }
650            );
651
652            // The timestamp type may be an alias type, we have to retrieve the actual type.
653            let data_type = get_unalias_type(column.data_type());
654            ensure!(
655                matches!(data_type, DataType::Timestamp(_, _)),
656                InvalidColumnOptionSnafu {
657                    name: column.name().to_string(),
658                    msg: "time index column data type should be timestamp",
659                }
660            );
661
662            let not_null_opt = ColumnOptionDef {
663                option: ColumnOption::NotNull,
664                name: None,
665            };
666
667            if !column.options().contains(&not_null_opt) {
668                column.mut_options().push(not_null_opt);
669            }
670
671            let _ = column.mut_options().remove(index);
672        }
673
674        columns.push(column);
675
676        Ok(())
677    }
678
679    /// Parse the column name and check if it's valid.
680    fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
681        let name = self.parser.parse_identifier()?;
682        if name.quote_style.is_none() &&
683        // "ALL_KEYWORDS" are sorted.
684            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
685        {
686            return Err(ParserError::ParserError(format!(
687                "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
688                &name.value
689            )));
690        }
691
692        Ok(name)
693    }
694
695    pub fn parse_column_def(&mut self) -> Result<Column> {
696        let name = self.parse_column_name().context(SyntaxSnafu)?;
697        let parser = &mut self.parser;
698
699        ensure!(
700            !(name.quote_style.is_none() &&
701            // "ALL_KEYWORDS" are sorted.
702            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
703            InvalidSqlSnafu {
704                msg: format!(
705                    "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
706                    &name.value
707                ),
708            }
709        );
710
711        let mut extensions = ColumnExtensions::default();
712
713        let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
714        // Must immediately parse the JSON datatype format because it is closely after the "JSON"
715        // datatype, like this: "JSON(format = ...)".
716        if matches!(data_type, DataType::JSON) {
717            extensions.json_datatype_options = json::parse_json_datatype_options(parser)?;
718        }
719
720        let mut options = vec![];
721        loop {
722            if parser.parse_keyword(Keyword::CONSTRAINT) {
723                let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
724                if let Some(option) = Self::parse_optional_column_option(parser)? {
725                    options.push(ColumnOptionDef { name, option });
726                } else {
727                    return parser
728                        .expected(
729                            "constraint details after CONSTRAINT <name>",
730                            parser.peek_token(),
731                        )
732                        .context(SyntaxSnafu);
733                }
734            } else if let Some(option) = Self::parse_optional_column_option(parser)? {
735                options.push(ColumnOptionDef { name: None, option });
736            } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
737                break;
738            };
739        }
740
741        Ok(Column {
742            column_def: ColumnDef {
743                name: Self::canonicalize_identifier(name),
744                data_type,
745                options,
746            },
747            extensions,
748        })
749    }
750
751    fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
752        if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
753            Ok(Some(ColumnOption::CharacterSet(
754                parser.parse_object_name(false).context(SyntaxSnafu)?,
755            )))
756        } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
757            Ok(Some(ColumnOption::NotNull))
758        } else if parser.parse_keywords(&[Keyword::COMMENT]) {
759            match parser.next_token() {
760                TokenWithSpan {
761                    token: Token::SingleQuotedString(value, ..),
762                    ..
763                } => Ok(Some(ColumnOption::Comment(value))),
764                unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
765            }
766        } else if parser.parse_keyword(Keyword::NULL) {
767            Ok(Some(ColumnOption::Null))
768        } else if parser.parse_keyword(Keyword::DEFAULT) {
769            Ok(Some(ColumnOption::Default(
770                parser.parse_expr().context(SyntaxSnafu)?,
771            )))
772        } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
773            Ok(Some(ColumnOption::PrimaryKey(PrimaryKeyConstraint {
774                name: None,
775                index_name: None,
776                index_type: None,
777                columns: vec![],
778                index_options: vec![],
779                characteristics: None,
780            })))
781        } else if parser.parse_keyword(Keyword::UNIQUE) {
782            Ok(Some(ColumnOption::Unique(UniqueConstraint {
783                name: None,
784                index_name: None,
785                index_type_display: KeyOrIndexDisplay::None,
786                index_type: None,
787                columns: vec![],
788                index_options: vec![],
789                characteristics: None,
790                nulls_distinct: NullsDistinctOption::None,
791            })))
792        } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
793            // Use a DialectSpecific option for time index
794            Ok(Some(ColumnOption::DialectSpecific(vec![
795                Token::Word(Word {
796                    value: "TIME".to_string(),
797                    quote_style: None,
798                    keyword: Keyword::TIME,
799                }),
800                Token::Word(Word {
801                    value: "INDEX".to_string(),
802                    quote_style: None,
803                    keyword: Keyword::INDEX,
804                }),
805            ])))
806        } else {
807            Ok(None)
808        }
809    }
810
811    /// Parse a column option extensions.
812    ///
813    /// This function will handle:
814    /// - Vector type
815    /// - Indexes
816    fn parse_column_extensions(
817        parser: &mut Parser<'_>,
818        column_name: &Ident,
819        column_type: &DataType,
820        column_extensions: &mut ColumnExtensions,
821    ) -> Result<bool> {
822        if let DataType::Custom(name, tokens) = column_type
823            && name.0.len() == 1
824            && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
825        {
826            ensure!(
827                tokens.len() == 1,
828                InvalidColumnOptionSnafu {
829                    name: column_name.to_string(),
830                    msg: "VECTOR type should have dimension",
831                }
832            );
833
834            let dimension =
835                tokens[0]
836                    .parse::<u32>()
837                    .ok()
838                    .with_context(|| InvalidColumnOptionSnafu {
839                        name: column_name.to_string(),
840                        msg: "dimension should be a positive integer",
841                    })?;
842
843            let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
844            column_extensions.vector_options = Some(options);
845        }
846
847        // parse index options in column definition
848        let mut is_index_declared = false;
849
850        // skipping index
851        if let Token::Word(word) = parser.peek_token().token
852            && word.value.eq_ignore_ascii_case(SKIPPING)
853        {
854            parser.next_token();
855            // Consume `INDEX` keyword
856            ensure!(
857                parser.parse_keyword(Keyword::INDEX),
858                InvalidColumnOptionSnafu {
859                    name: column_name.to_string(),
860                    msg: "expect INDEX after SKIPPING keyword",
861                }
862            );
863            ensure!(
864                column_extensions.skipping_index_options.is_none(),
865                InvalidColumnOptionSnafu {
866                    name: column_name.to_string(),
867                    msg: "duplicated SKIPPING index option",
868                }
869            );
870
871            let options = parser
872                .parse_options(Keyword::WITH)
873                .context(error::SyntaxSnafu)?
874                .into_iter()
875                .map(parse_option_string)
876                .collect::<Result<Vec<_>>>()?;
877
878            for (key, _) in options.iter() {
879                ensure!(
880                    validate_column_skipping_index_create_option(key),
881                    InvalidColumnOptionSnafu {
882                        name: column_name.to_string(),
883                        msg: format!("invalid SKIPPING INDEX option: {key}"),
884                    }
885                );
886            }
887
888            let options = OptionMap::new(options);
889            column_extensions.skipping_index_options = Some(options);
890            is_index_declared |= true;
891        }
892
893        // fulltext index
894        if parser.parse_keyword(Keyword::FULLTEXT) {
895            // Consume `INDEX` keyword
896            ensure!(
897                parser.parse_keyword(Keyword::INDEX),
898                InvalidColumnOptionSnafu {
899                    name: column_name.to_string(),
900                    msg: "expect INDEX after FULLTEXT keyword",
901                }
902            );
903
904            ensure!(
905                column_extensions.fulltext_index_options.is_none(),
906                InvalidColumnOptionSnafu {
907                    name: column_name.to_string(),
908                    msg: "duplicated FULLTEXT INDEX option",
909                }
910            );
911
912            let column_type = get_unalias_type(column_type);
913            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
914            ensure!(
915                data_type == ConcreteDataType::string_datatype(),
916                InvalidColumnOptionSnafu {
917                    name: column_name.to_string(),
918                    msg: "FULLTEXT index only supports string type",
919                }
920            );
921
922            let options = parser
923                .parse_options(Keyword::WITH)
924                .context(error::SyntaxSnafu)?
925                .into_iter()
926                .map(parse_option_string)
927                .collect::<Result<Vec<_>>>()?;
928
929            for (key, _) in options.iter() {
930                ensure!(
931                    validate_column_fulltext_create_option(key),
932                    InvalidColumnOptionSnafu {
933                        name: column_name.to_string(),
934                        msg: format!("invalid FULLTEXT INDEX option: {key}"),
935                    }
936                );
937            }
938
939            let options = OptionMap::new(options);
940            column_extensions.fulltext_index_options = Some(options);
941            is_index_declared |= true;
942        }
943
944        // inverted index
945        if let Token::Word(word) = parser.peek_token().token
946            && word.value.eq_ignore_ascii_case(INVERTED)
947        {
948            parser.next_token();
949            // Consume `INDEX` keyword
950            ensure!(
951                parser.parse_keyword(Keyword::INDEX),
952                InvalidColumnOptionSnafu {
953                    name: column_name.to_string(),
954                    msg: "expect INDEX after INVERTED keyword",
955                }
956            );
957
958            ensure!(
959                column_extensions.inverted_index_options.is_none(),
960                InvalidColumnOptionSnafu {
961                    name: column_name.to_string(),
962                    msg: "duplicated INVERTED index option",
963                }
964            );
965
966            // inverted index doesn't have options, skipping `WITH`
967            // try cache `WITH` and throw error
968            let with_token = parser.peek_token();
969            ensure!(
970                with_token.token
971                    != Token::Word(Word {
972                        value: "WITH".to_string(),
973                        keyword: Keyword::WITH,
974                        quote_style: None,
975                    }),
976                InvalidColumnOptionSnafu {
977                    name: column_name.to_string(),
978                    msg: "INVERTED index doesn't support options",
979                }
980            );
981
982            column_extensions.inverted_index_options = Some(OptionMap::default());
983            is_index_declared |= true;
984        }
985
986        // vector index
987        if let Token::Word(word) = parser.peek_token().token
988            && word.value.eq_ignore_ascii_case(VECTOR)
989        {
990            parser.next_token();
991            // Consume `INDEX` keyword
992            ensure!(
993                parser.parse_keyword(Keyword::INDEX),
994                InvalidColumnOptionSnafu {
995                    name: column_name.to_string(),
996                    msg: "expect INDEX after VECTOR keyword",
997                }
998            );
999
1000            ensure!(
1001                column_extensions.vector_index_options.is_none(),
1002                InvalidColumnOptionSnafu {
1003                    name: column_name.to_string(),
1004                    msg: "duplicated VECTOR INDEX option",
1005                }
1006            );
1007
1008            // Check that column is a vector type
1009            let column_type = get_unalias_type(column_type);
1010            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
1011            ensure!(
1012                matches!(data_type, ConcreteDataType::Vector(_)),
1013                InvalidColumnOptionSnafu {
1014                    name: column_name.to_string(),
1015                    msg: "VECTOR INDEX only supports Vector type columns",
1016                }
1017            );
1018
1019            let options = parser
1020                .parse_options(Keyword::WITH)
1021                .context(error::SyntaxSnafu)?
1022                .into_iter()
1023                .map(parse_option_string)
1024                .collect::<Result<Vec<_>>>()?;
1025
1026            for (key, _) in options.iter() {
1027                ensure!(
1028                    validate_column_vector_index_create_option(key),
1029                    InvalidColumnOptionSnafu {
1030                        name: column_name.to_string(),
1031                        msg: format!("invalid VECTOR INDEX option: {key}"),
1032                    }
1033                );
1034            }
1035
1036            let options = OptionMap::new(options);
1037            column_extensions.vector_index_options = Some(options);
1038            is_index_declared |= true;
1039        }
1040
1041        Ok(is_index_declared)
1042    }
1043
1044    fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
1045        match self.parser.next_token() {
1046            TokenWithSpan {
1047                token: Token::Word(w),
1048                ..
1049            } if w.keyword == Keyword::PRIMARY => {
1050                self.parser
1051                    .expect_keyword(Keyword::KEY)
1052                    .context(error::UnexpectedSnafu {
1053                        expected: "KEY",
1054                        actual: self.peek_token_as_string(),
1055                    })?;
1056                let raw_columns = self
1057                    .parser
1058                    .parse_parenthesized_column_list(Mandatory, false)
1059                    .context(error::SyntaxSnafu)?;
1060                let columns = raw_columns
1061                    .into_iter()
1062                    .map(Self::canonicalize_identifier)
1063                    .collect();
1064                Ok(Some(TableConstraint::PrimaryKey { columns }))
1065            }
1066            TokenWithSpan {
1067                token: Token::Word(w),
1068                ..
1069            } if w.keyword == Keyword::TIME => {
1070                self.parser
1071                    .expect_keyword(Keyword::INDEX)
1072                    .context(error::UnexpectedSnafu {
1073                        expected: "INDEX",
1074                        actual: self.peek_token_as_string(),
1075                    })?;
1076
1077                let raw_columns = self
1078                    .parser
1079                    .parse_parenthesized_column_list(Mandatory, false)
1080                    .context(error::SyntaxSnafu)?;
1081                let mut columns = raw_columns
1082                    .into_iter()
1083                    .map(Self::canonicalize_identifier)
1084                    .collect::<Vec<_>>();
1085
1086                ensure!(
1087                    columns.len() == 1,
1088                    InvalidTimeIndexSnafu {
1089                        msg: "it should contain only one column in time index",
1090                    }
1091                );
1092
1093                Ok(Some(TableConstraint::TimeIndex {
1094                    column: columns.pop().unwrap(),
1095                }))
1096            }
1097            _ => {
1098                self.parser.prev_token();
1099                Ok(None)
1100            }
1101        }
1102    }
1103
1104    /// Parses the set of valid formats
1105    fn parse_table_engine(&mut self, default: &str) -> Result<String> {
1106        if !self.consume_token(ENGINE) {
1107            return Ok(default.to_string());
1108        }
1109
1110        self.parser
1111            .expect_token(&Token::Eq)
1112            .context(error::UnexpectedSnafu {
1113                expected: "=",
1114                actual: self.peek_token_as_string(),
1115            })?;
1116
1117        let token = self.parser.next_token();
1118        if let Token::Word(w) = token.token {
1119            Ok(w.value)
1120        } else {
1121            self.expected("'Engine' is missing", token)
1122        }
1123    }
1124}
1125
1126fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1127    let time_index_constraints: Vec<_> = constraints
1128        .iter()
1129        .filter_map(|c| match c {
1130            TableConstraint::TimeIndex { column } => Some(column),
1131            _ => None,
1132        })
1133        .unique()
1134        .collect();
1135
1136    ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1137    ensure!(
1138        time_index_constraints.len() == 1,
1139        InvalidTimeIndexSnafu {
1140            msg: format!(
1141                "expected only one time index constraint but actual {}",
1142                time_index_constraints.len()
1143            ),
1144        }
1145    );
1146
1147    // It's safe to use time_index_constraints[0][0],
1148    // we already check the bound above.
1149    let time_index_column_ident = &time_index_constraints[0];
1150    let time_index_column = columns
1151        .iter()
1152        .find(|c| c.name().value == *time_index_column_ident.value)
1153        .with_context(|| InvalidTimeIndexSnafu {
1154            msg: format!(
1155                "time index column {} not found in columns",
1156                time_index_column_ident
1157            ),
1158        })?;
1159
1160    let time_index_data_type = get_unalias_type(time_index_column.data_type());
1161    ensure!(
1162        matches!(time_index_data_type, DataType::Timestamp(_, _)),
1163        InvalidColumnOptionSnafu {
1164            name: time_index_column.name().to_string(),
1165            msg: "time index column data type should be timestamp",
1166        }
1167    );
1168
1169    Ok(())
1170}
1171
1172fn get_unalias_type(data_type: &DataType) -> DataType {
1173    match data_type {
1174        DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1175            if let Some(real_type) =
1176                get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1177            {
1178                real_type
1179            } else {
1180                data_type.clone()
1181            }
1182        }
1183        _ => data_type.clone(),
1184    }
1185}
1186
1187fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1188    let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1189
1190    ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1191
1192    Ok(())
1193}
1194
1195/// Ensure all exprs are binary expr and all the columns are defined in the column list.
1196fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1197    for expr in exprs {
1198        // The first level must be binary expr
1199        if let Expr::BinaryOp { left, op: _, right } = expr {
1200            ensure_one_expr(left, columns)?;
1201            ensure_one_expr(right, columns)?;
1202        } else {
1203            return error::InvalidSqlSnafu {
1204                msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1205            }
1206            .fail();
1207        }
1208    }
1209    Ok(())
1210}
1211
1212/// Check if the expr is a binary expr, an ident or a literal value.
1213/// If is ident, then check it is in the column list.
1214/// This recursive function is intended to be used by [ensure_exprs_are_binary].
1215fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1216    match expr {
1217        Expr::BinaryOp { left, op: _, right } => {
1218            ensure_one_expr(left, columns)?;
1219            ensure_one_expr(right, columns)?;
1220            Ok(())
1221        }
1222        Expr::Identifier(ident) => {
1223            let column_name = &ident.value;
1224            ensure!(
1225                columns.iter().any(|c| &c.name().value == column_name),
1226                error::InvalidSqlSnafu {
1227                    msg: format!(
1228                        "Column {:?} in rule expr is not referenced in PARTITION ON",
1229                        column_name
1230                    ),
1231                }
1232            );
1233            Ok(())
1234        }
1235        Expr::Value(_) => Ok(()),
1236        Expr::UnaryOp { expr, .. } => {
1237            ensure_one_expr(expr, columns)?;
1238            Ok(())
1239        }
1240        _ => error::InvalidSqlSnafu {
1241            msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1242        }
1243        .fail(),
1244    }
1245}
1246
1247/// Ensure that all columns used in "PARTITION ON COLUMNS" are defined in create table.
1248fn ensure_partition_columns_defined<'a>(
1249    columns: &'a [Column],
1250    partitions: &'a Partitions,
1251) -> Result<Vec<&'a Column>> {
1252    partitions
1253        .column_list
1254        .iter()
1255        .map(|x| {
1256            let x = ParserContext::canonicalize_identifier(x.clone());
1257            // Normally the columns in "create table" won't be too many,
1258            // a linear search to find the target every time is fine.
1259            columns
1260                .iter()
1261                .find(|c| *c.name().value == x.value)
1262                .context(error::InvalidSqlSnafu {
1263                    msg: format!("Partition column {:?} not defined", x.value),
1264                })
1265        })
1266        .collect::<Result<Vec<&Column>>>()
1267}
1268
1269#[cfg(test)]
1270mod tests {
1271    use std::assert_matches;
1272    use std::collections::HashMap;
1273
1274    use common_catalog::consts::FILE_ENGINE;
1275    use common_error::ext::ErrorExt;
1276    use sqlparser::ast::ColumnOption::NotNull;
1277    use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1278    use sqlparser::dialect::GenericDialect;
1279    use sqlparser::tokenizer::Tokenizer;
1280
1281    use super::*;
1282    use crate::dialect::GreptimeDbDialect;
1283    use crate::parser::ParseOptions;
1284
1285    fn string_option_map(
1286        entries: impl IntoIterator<Item = (&'static str, &'static str)>,
1287    ) -> OptionMap {
1288        OptionMap::new(entries.into_iter().map(|(key, value)| {
1289            (
1290                key.to_string(),
1291                OptionValue::try_new(Expr::Value(
1292                    Value::SingleQuotedString(value.to_string()).into(),
1293                ))
1294                .unwrap(),
1295            )
1296        }))
1297    }
1298
1299    #[test]
1300    fn test_parse_create_table_like() {
1301        let sql = "CREATE TABLE t1 LIKE t2";
1302        let stmts =
1303            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1304                .unwrap();
1305
1306        assert_eq!(1, stmts.len());
1307        match &stmts[0] {
1308            Statement::CreateTableLike(c) => {
1309                assert_eq!(c.table_name.to_string(), "t1");
1310                assert_eq!(c.source_name.to_string(), "t2");
1311            }
1312            _ => unreachable!(),
1313        }
1314    }
1315
1316    #[test]
1317    fn test_validate_external_table_options() {
1318        let sql = "CREATE EXTERNAL TABLE city (
1319            host string,
1320            ts timestamp,
1321            cpu float64 default 0,
1322            memory float64,
1323            TIME INDEX (ts),
1324            PRIMARY KEY(ts, host)
1325        ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1326
1327        let result =
1328            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1329        assert!(matches!(
1330            result,
1331            Err(error::Error::InvalidTableOption { .. })
1332        ));
1333    }
1334
1335    #[test]
1336    fn test_parse_create_external_table() {
1337        struct Test<'a> {
1338            sql: &'a str,
1339            expected_table_name: &'a str,
1340            expected_options: HashMap<&'a str, &'a str>,
1341            expected_engine: &'a str,
1342            expected_if_not_exist: bool,
1343        }
1344
1345        let tests = [
1346            Test {
1347                sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1348                expected_table_name: "city",
1349                expected_options: HashMap::from([
1350                    ("location", "/var/data/city.csv"),
1351                    ("format", "csv"),
1352                ]),
1353                expected_engine: FILE_ENGINE,
1354                expected_if_not_exist: false,
1355            },
1356            Test {
1357                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1358                expected_table_name: "city",
1359                expected_options: HashMap::from([
1360                    ("location", "/var/data/city.csv"),
1361                    ("format", "csv"),
1362                ]),
1363                expected_engine: "foo",
1364                expected_if_not_exist: true,
1365            },
1366            Test {
1367                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1368                expected_table_name: "city",
1369                expected_options: HashMap::from([
1370                    ("location", "/var/data/city.csv"),
1371                    ("format", "csv"),
1372                    ("compaction.type", "bar"),
1373                ]),
1374                expected_engine: "foo",
1375                expected_if_not_exist: true,
1376            },
1377        ];
1378
1379        for test in tests {
1380            let stmts = ParserContext::create_with_dialect(
1381                test.sql,
1382                &GreptimeDbDialect {},
1383                ParseOptions::default(),
1384            )
1385            .unwrap();
1386            assert_eq!(1, stmts.len());
1387            match &stmts[0] {
1388                Statement::CreateExternalTable(c) => {
1389                    assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1390                    assert_eq!(c.options.to_str_map(), test.expected_options);
1391                    assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1392                    assert_eq!(c.engine, test.expected_engine);
1393                }
1394                _ => unreachable!(),
1395            }
1396        }
1397    }
1398
1399    #[test]
1400    fn test_parse_create_external_table_with_schema() {
1401        let sql = "CREATE EXTERNAL TABLE city (
1402            host string,
1403            ts timestamp,
1404            cpu float32 default 0,
1405            memory float64,
1406            TIME INDEX (ts),
1407            PRIMARY KEY(ts, host),
1408        ) with(location='/var/data/city.csv',format='csv');";
1409
1410        let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]);
1411
1412        let stmts =
1413            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1414                .unwrap();
1415        assert_eq!(1, stmts.len());
1416        match &stmts[0] {
1417            Statement::CreateExternalTable(c) => {
1418                assert_eq!(c.name.to_string(), "city");
1419                assert_eq!(c.options.to_str_map(), options);
1420
1421                let columns = &c.columns;
1422                assert_column_def(&columns[0].column_def, "host", "STRING");
1423                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1424                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1425                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1426
1427                let constraints = &c.constraints;
1428                assert_eq!(
1429                    &constraints[0],
1430                    &TableConstraint::TimeIndex {
1431                        column: Ident::new("ts"),
1432                    }
1433                );
1434                assert_eq!(
1435                    &constraints[1],
1436                    &TableConstraint::PrimaryKey {
1437                        columns: vec![Ident::new("ts"), Ident::new("host")]
1438                    }
1439                );
1440            }
1441            _ => unreachable!(),
1442        }
1443    }
1444
1445    #[test]
1446    fn test_parse_create_database() {
1447        let sql = "create database";
1448        let result =
1449            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1450        assert!(
1451            result
1452                .unwrap_err()
1453                .to_string()
1454                .contains("Unexpected token while parsing SQL statement")
1455        );
1456
1457        let sql = "create database prometheus";
1458        let stmts =
1459            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1460                .unwrap();
1461
1462        assert_eq!(1, stmts.len());
1463        match &stmts[0] {
1464            Statement::CreateDatabase(c) => {
1465                assert_eq!(c.name.to_string(), "prometheus");
1466                assert!(!c.if_not_exists);
1467            }
1468            _ => unreachable!(),
1469        }
1470
1471        let sql = "create database if not exists prometheus";
1472        let stmts =
1473            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1474                .unwrap();
1475
1476        assert_eq!(1, stmts.len());
1477        match &stmts[0] {
1478            Statement::CreateDatabase(c) => {
1479                assert_eq!(c.name.to_string(), "prometheus");
1480                assert!(c.if_not_exists);
1481            }
1482            _ => unreachable!(),
1483        }
1484
1485        let sql = "CREATE DATABASE `fOo`";
1486        let result =
1487            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1488        let stmts = result.unwrap();
1489        match &stmts.last().unwrap() {
1490            Statement::CreateDatabase(c) => {
1491                assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1492                assert!(!c.if_not_exists);
1493            }
1494            _ => unreachable!(),
1495        }
1496
1497        let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1498        let result =
1499            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1500        let stmts = result.unwrap();
1501        match &stmts[0] {
1502            Statement::CreateDatabase(c) => {
1503                assert_eq!(c.name.to_string(), "prometheus");
1504                assert!(!c.if_not_exists);
1505                assert_eq!(c.options.get("ttl").unwrap(), "1h");
1506            }
1507            _ => unreachable!(),
1508        }
1509    }
1510
1511    #[test]
1512    fn test_parse_create_flow_more_testcases() {
1513        use pretty_assertions::assert_eq;
1514        fn parse_create_flow(sql: &str) -> CreateFlow {
1515            let stmts = ParserContext::create_with_dialect(
1516                sql,
1517                &GreptimeDbDialect {},
1518                ParseOptions::default(),
1519            )
1520            .unwrap();
1521            assert_eq!(1, stmts.len());
1522            match &stmts[0] {
1523                Statement::CreateFlow(c) => c.clone(),
1524                _ => unreachable!(),
1525            }
1526        }
1527        struct CreateFlowWoutQuery {
1528            /// Flow name
1529            pub flow_name: ObjectName,
1530            /// Output (sink) table name
1531            pub sink_table_name: ObjectName,
1532            /// Whether to replace existing task
1533            pub or_replace: bool,
1534            /// Create if not exist
1535            pub if_not_exists: bool,
1536            /// `EXPIRE AFTER`
1537            /// Duration in second as `i64`
1538            pub expire_after: Option<i64>,
1539            /// Comment string
1540            pub comment: Option<String>,
1541            /// Flow creation options
1542            pub flow_options: OptionMap,
1543        }
1544        let testcases = vec![
1545            (
1546                r"
1547CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1548SINK TO schema_1.table_1
1549EXPIRE AFTER INTERVAL '5 minutes'
1550COMMENT 'test comment'
1551AS
1552SELECT max(c1), min(c2) FROM schema_2.table_2;",
1553                CreateFlowWoutQuery {
1554                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1555                    sink_table_name: ObjectName::from(vec![
1556                        Ident::new("schema_1"),
1557                        Ident::new("table_1"),
1558                    ]),
1559                    or_replace: true,
1560                    if_not_exists: true,
1561                    expire_after: Some(300),
1562                    comment: Some("test comment".to_string()),
1563                    flow_options: OptionMap::default(),
1564                },
1565            ),
1566            (
1567                r"
1568CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1569SINK TO schema_1.table_1
1570EXPIRE AFTER INTERVAL '300 s'
1571COMMENT 'test comment'
1572AS
1573SELECT max(c1), min(c2) FROM schema_2.table_2;",
1574                CreateFlowWoutQuery {
1575                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1576                    sink_table_name: ObjectName::from(vec![
1577                        Ident::new("schema_1"),
1578                        Ident::new("table_1"),
1579                    ]),
1580                    or_replace: true,
1581                    if_not_exists: true,
1582                    expire_after: Some(300),
1583                    comment: Some("test comment".to_string()),
1584                    flow_options: OptionMap::default(),
1585                },
1586            ),
1587            (
1588                r"
1589CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1590SINK TO schema_1.table_1
1591EXPIRE AFTER '5 minutes'
1592COMMENT 'test comment'
1593AS
1594SELECT max(c1), min(c2) FROM schema_2.table_2;",
1595                CreateFlowWoutQuery {
1596                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1597                    sink_table_name: ObjectName::from(vec![
1598                        Ident::new("schema_1"),
1599                        Ident::new("table_1"),
1600                    ]),
1601                    or_replace: true,
1602                    if_not_exists: true,
1603                    expire_after: Some(300),
1604                    comment: Some("test comment".to_string()),
1605                    flow_options: OptionMap::default(),
1606                },
1607            ),
1608            (
1609                r"
1610CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1611SINK TO schema_1.table_1
1612EXPIRE AFTER '300 s'
1613COMMENT 'test comment'
1614AS
1615SELECT max(c1), min(c2) FROM schema_2.table_2;",
1616                CreateFlowWoutQuery {
1617                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1618                    sink_table_name: ObjectName::from(vec![
1619                        Ident::new("schema_1"),
1620                        Ident::new("table_1"),
1621                    ]),
1622                    or_replace: true,
1623                    if_not_exists: true,
1624                    expire_after: Some(300),
1625                    comment: Some("test comment".to_string()),
1626                    flow_options: OptionMap::default(),
1627                },
1628            ),
1629            (
1630                r"
1631CREATE FLOW `task_2`
1632SINK TO schema_1.table_1
1633EXPIRE AFTER '2 days 1h 2 min'
1634AS
1635SELECT max(c1), min(c2) FROM schema_2.table_2;",
1636                CreateFlowWoutQuery {
1637                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1638                    sink_table_name: ObjectName::from(vec![
1639                        Ident::new("schema_1"),
1640                        Ident::new("table_1"),
1641                    ]),
1642                    or_replace: false,
1643                    if_not_exists: false,
1644                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1645                    comment: None,
1646                    flow_options: OptionMap::default(),
1647                },
1648            ),
1649            (
1650                r"
1651create flow `task_3`
1652sink to schema_1.table_1
1653expire after '10 minutes'
1654as
1655select max(c1), min(c2) from schema_2.table_2;",
1656                CreateFlowWoutQuery {
1657                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
1658                    sink_table_name: ObjectName::from(vec![
1659                        Ident::new("schema_1"),
1660                        Ident::new("table_1"),
1661                    ]),
1662                    or_replace: false,
1663                    if_not_exists: false,
1664                    expire_after: Some(600), // 10 minutes in seconds
1665                    comment: None,
1666                    flow_options: OptionMap::default(),
1667                },
1668            ),
1669            (
1670                r"
1671create or replace flow if not exists task_4
1672sink to schema_1.table_1
1673expire after interval '2 hours'
1674comment 'lowercase test'
1675as
1676select max(c1), min(c2) from schema_2.table_2;",
1677                CreateFlowWoutQuery {
1678                    flow_name: ObjectName::from(vec![Ident::new("task_4")]),
1679                    sink_table_name: ObjectName::from(vec![
1680                        Ident::new("schema_1"),
1681                        Ident::new("table_1"),
1682                    ]),
1683                    or_replace: true,
1684                    if_not_exists: true,
1685                    expire_after: Some(7200), // 2 hours in seconds
1686                    comment: Some("lowercase test".to_string()),
1687                    flow_options: OptionMap::default(),
1688                },
1689            ),
1690            (
1691                r"
1692CREATE FLOW task_5
1693SINK TO schema_1.table_1
1694WITH (defer_on_missing_source = 'true')
1695AS
1696SELECT max(c1), min(c2) FROM schema_2.table_2;",
1697                CreateFlowWoutQuery {
1698                    flow_name: ObjectName::from(vec![Ident::new("task_5")]),
1699                    sink_table_name: ObjectName::from(vec![
1700                        Ident::new("schema_1"),
1701                        Ident::new("table_1"),
1702                    ]),
1703                    or_replace: false,
1704                    if_not_exists: false,
1705                    expire_after: None,
1706                    comment: None,
1707                    flow_options: string_option_map([("defer_on_missing_source", "true")]),
1708                },
1709            ),
1710        ];
1711
1712        for (sql, expected) in testcases {
1713            let create_task = parse_create_flow(sql);
1714
1715            let expected = CreateFlow {
1716                flow_name: expected.flow_name,
1717                sink_table_name: expected.sink_table_name,
1718                or_replace: expected.or_replace,
1719                if_not_exists: expected.if_not_exists,
1720                expire_after: expected.expire_after,
1721                eval_interval: None,
1722                comment: expected.comment,
1723                flow_options: expected.flow_options,
1724                // ignore query parse result
1725                query: create_task.query.clone(),
1726            };
1727
1728            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1729            let show_create = create_task.to_string();
1730            let recreated = parse_create_flow(&show_create);
1731            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1732        }
1733    }
1734
1735    #[test]
1736    fn test_parse_create_flow() {
1737        use pretty_assertions::assert_eq;
1738        fn parse_create_flow(sql: &str) -> CreateFlow {
1739            let stmts = ParserContext::create_with_dialect(
1740                sql,
1741                &GreptimeDbDialect {},
1742                ParseOptions::default(),
1743            )
1744            .unwrap();
1745            assert_eq!(1, stmts.len());
1746            match &stmts[0] {
1747                Statement::CreateFlow(c) => c.clone(),
1748                _ => panic!("{:?}", stmts[0]),
1749            }
1750        }
1751        struct CreateFlowWoutQuery {
1752            /// Flow name
1753            pub flow_name: ObjectName,
1754            /// Output (sink) table name
1755            pub sink_table_name: ObjectName,
1756            /// Whether to replace existing task
1757            pub or_replace: bool,
1758            /// Create if not exist
1759            pub if_not_exists: bool,
1760            /// `EXPIRE AFTER`
1761            /// Duration in second as `i64`
1762            pub expire_after: Option<i64>,
1763            /// Duration for flow evaluation interval
1764            /// Duration in seconds as `i64`
1765            /// If not set, flow will be evaluated based on time window size and other args.
1766            pub eval_interval: Option<i64>,
1767            /// Comment string
1768            pub comment: Option<String>,
1769            /// Flow creation options
1770            pub flow_options: OptionMap,
1771        }
1772
1773        // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
1774        let testcases = vec![
1775            (
1776                r"
1777CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1778SINK TO schema_1.table_1
1779EXPIRE AFTER INTERVAL '5 minutes'
1780COMMENT 'test comment'
1781AS
1782SELECT max(c1), min(c2) FROM schema_2.table_2;",
1783                CreateFlowWoutQuery {
1784                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1785                    sink_table_name: ObjectName(vec![
1786                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1787                        ObjectNamePart::Identifier(Ident::new("table_1")),
1788                    ]),
1789                    or_replace: true,
1790                    if_not_exists: true,
1791                    expire_after: Some(300),
1792                    eval_interval: None,
1793                    comment: Some("test comment".to_string()),
1794                    flow_options: OptionMap::default(),
1795                },
1796            ),
1797            (
1798                r"
1799CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1800SINK TO schema_1.table_1
1801EXPIRE AFTER INTERVAL '300 s'
1802COMMENT 'test comment'
1803AS
1804SELECT max(c1), min(c2) FROM schema_2.table_2;",
1805                CreateFlowWoutQuery {
1806                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1807                    sink_table_name: ObjectName(vec![
1808                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1809                        ObjectNamePart::Identifier(Ident::new("table_1")),
1810                    ]),
1811                    or_replace: true,
1812                    if_not_exists: true,
1813                    expire_after: Some(300),
1814                    eval_interval: None,
1815                    comment: Some("test comment".to_string()),
1816                    flow_options: OptionMap::default(),
1817                },
1818            ),
1819            (
1820                r"
1821CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1822SINK TO schema_1.table_1
1823EXPIRE AFTER '5 minutes'
1824EVAL INTERVAL '10 seconds'
1825COMMENT 'test comment'
1826AS
1827SELECT max(c1), min(c2) FROM schema_2.table_2;",
1828                CreateFlowWoutQuery {
1829                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1830                    sink_table_name: ObjectName(vec![
1831                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1832                        ObjectNamePart::Identifier(Ident::new("table_1")),
1833                    ]),
1834                    or_replace: true,
1835                    if_not_exists: true,
1836                    expire_after: Some(300),
1837                    eval_interval: Some(10),
1838                    comment: Some("test comment".to_string()),
1839                    flow_options: OptionMap::default(),
1840                },
1841            ),
1842            (
1843                r"
1844CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1845SINK TO schema_1.table_1
1846EXPIRE AFTER '5 minutes'
1847EVAL INTERVAL INTERVAL '10 seconds'
1848COMMENT 'test comment'
1849AS
1850SELECT max(c1), min(c2) FROM schema_2.table_2;",
1851                CreateFlowWoutQuery {
1852                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1853                    sink_table_name: ObjectName(vec![
1854                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1855                        ObjectNamePart::Identifier(Ident::new("table_1")),
1856                    ]),
1857                    or_replace: true,
1858                    if_not_exists: true,
1859                    expire_after: Some(300),
1860                    eval_interval: Some(10),
1861                    comment: Some("test comment".to_string()),
1862                    flow_options: OptionMap::default(),
1863                },
1864            ),
1865            (
1866                r"
1867CREATE FLOW `task_2`
1868SINK TO schema_1.table_1
1869EXPIRE AFTER '2 days 1h 2 min'
1870AS
1871SELECT max(c1), min(c2) FROM schema_2.table_2;",
1872                CreateFlowWoutQuery {
1873                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1874                        '`', "task_2",
1875                    ))]),
1876                    sink_table_name: ObjectName(vec![
1877                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1878                        ObjectNamePart::Identifier(Ident::new("table_1")),
1879                    ]),
1880                    or_replace: false,
1881                    if_not_exists: false,
1882                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1883                    eval_interval: None,
1884                    comment: None,
1885                    flow_options: OptionMap::default(),
1886                },
1887            ),
1888            (
1889                r"
1890CREATE FLOW task_3
1891SINK TO schema_1.table_1
1892EVAL INTERVAL '10 seconds'
1893WITH (defer_on_missing_source = 'true', foo = 'bar')
1894AS
1895SELECT max(c1), min(c2) FROM schema_2.table_2;",
1896                CreateFlowWoutQuery {
1897                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_3"))]),
1898                    sink_table_name: ObjectName(vec![
1899                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1900                        ObjectNamePart::Identifier(Ident::new("table_1")),
1901                    ]),
1902                    or_replace: false,
1903                    if_not_exists: false,
1904                    expire_after: None,
1905                    eval_interval: Some(10),
1906                    comment: None,
1907                    flow_options: string_option_map([
1908                        ("defer_on_missing_source", "true"),
1909                        ("foo", "bar"),
1910                    ]),
1911                },
1912            ),
1913        ];
1914
1915        for (sql, expected) in testcases {
1916            let create_task = parse_create_flow(sql);
1917
1918            let expected = CreateFlow {
1919                flow_name: expected.flow_name,
1920                sink_table_name: expected.sink_table_name,
1921                or_replace: expected.or_replace,
1922                if_not_exists: expected.if_not_exists,
1923                expire_after: expected.expire_after,
1924                eval_interval: expected.eval_interval,
1925                comment: expected.comment,
1926                flow_options: expected.flow_options,
1927                // ignore query parse result
1928                query: create_task.query.clone(),
1929            };
1930
1931            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1932            let show_create = create_task.to_string();
1933            let recreated = parse_create_flow(&show_create);
1934            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1935        }
1936    }
1937
1938    #[test]
1939    fn test_parse_create_flow_with_tql_cte_query() {
1940        let sql = r#"
1941CREATE FLOW calc_reqs_cte
1942SINK TO cnt_reqs_cte
1943EVAL INTERVAL '1m'
1944AS
1945WITH tql(the_timestamp, the_value) AS (
1946    TQL EVAL (now() - '1m'::interval, now(), '5s') metric
1947)
1948SELECT * FROM tql;
1949"#;
1950
1951        let stmts =
1952            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1953                .unwrap();
1954        assert_eq!(1, stmts.len());
1955        let Statement::CreateFlow(create_flow) = &stmts[0] else {
1956            panic!("unexpected stmt: {:?}", stmts[0]);
1957        };
1958
1959        let query = create_flow.query.to_string();
1960        assert!(query.to_uppercase().contains("WITH"));
1961        assert!(query.to_uppercase().contains("TQL EVAL"));
1962    }
1963
1964    #[test]
1965    fn test_parse_create_flow_with_sql_cte_is_supported() {
1966        let sql = r#"
1967CREATE FLOW f
1968SINK TO s
1969AS
1970WITH cte AS (SELECT 1) SELECT * FROM cte;
1971"#;
1972
1973        let stmts =
1974            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1975                .unwrap();
1976        assert_eq!(1, stmts.len());
1977        let Statement::CreateFlow(create_flow) = &stmts[0] else {
1978            panic!("unexpected stmt: {:?}", stmts[0]);
1979        };
1980        assert_eq!(
1981            "WITH cte AS (SELECT 1) SELECT * FROM cte",
1982            create_flow.query.to_string()
1983        );
1984    }
1985
1986    #[test]
1987    fn test_parse_create_flow_with_tql_cte_requires_now_expr() {
1988        let sql = r#"
1989CREATE FLOW f
1990SINK TO s
1991EVAL INTERVAL '1m'
1992AS
1993WITH tql(ts, val) AS (
1994    TQL EVAL (0, 15, '5s') metric
1995)
1996SELECT * FROM tql;
1997"#;
1998
1999        let err =
2000            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2001                .unwrap_err();
2002
2003        let msg = format!("{err:?}");
2004        assert!(
2005            msg.contains("Expected expression containing `now()`"),
2006            "unexpected err: {msg}"
2007        );
2008    }
2009
2010    #[test]
2011    fn test_parse_create_flow_with_tql_cte_non_select_star_is_unsupported() {
2012        let sql = r#"
2013CREATE FLOW f
2014SINK TO s
2015AS
2016WITH tql(ts, val) AS (
2017    TQL EVAL (now() - '1m'::interval, now(), '5s') metric
2018)
2019SELECT ts FROM tql;
2020"#;
2021
2022        let err =
2023            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2024                .unwrap_err();
2025        assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2026    }
2027
2028    #[test]
2029    fn test_parse_create_flow_with_tql_cte_filter_is_unsupported() {
2030        let sql = r#"
2031CREATE FLOW f
2032SINK TO s
2033AS
2034WITH tql(ts, val) AS (
2035    TQL EVAL (now() - '1m'::interval, now(), '5s') metric
2036)
2037SELECT * FROM tql WHERE ts > 0;
2038"#;
2039
2040        let err =
2041            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2042                .unwrap_err();
2043        assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2044    }
2045
2046    #[test]
2047    fn test_parse_create_flow_with_mixed_sql_tql_cte_is_unsupported() {
2048        let sql = r#"
2049CREATE FLOW f
2050SINK TO s
2051AS
2052WITH s1 AS (SELECT 1),
2053     tql(ts, val) AS (TQL EVAL (now() - '1m'::interval, now(), '5s') metric)
2054SELECT * FROM tql;
2055"#;
2056
2057        let err =
2058            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2059                .unwrap_err();
2060        assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2061    }
2062
2063    #[test]
2064    fn test_create_flow_no_month() {
2065        let sql = r"
2066CREATE FLOW `task_2`
2067SINK TO schema_1.table_1
2068EXPIRE AFTER '1 month 2 days 1h 2 min'
2069AS
2070SELECT max(c1), min(c2) FROM schema_2.table_2;";
2071        let stmts =
2072            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2073
2074        assert!(
2075            stmts.is_err()
2076                && stmts
2077                    .unwrap_err()
2078                    .to_string()
2079                    .contains("Interval with months is not allowed")
2080        );
2081    }
2082
2083    #[test]
2084    fn test_validate_create() {
2085        let sql = r"
2086CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
2087PARTITION ON COLUMNS(c, a) (
2088    a < 10,
2089    a > 10 AND a < 20,
2090    a > 20 AND c < 100,
2091    a > 20 AND c >= 100
2092)
2093ENGINE=mito";
2094        let result =
2095            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2096        let _ = result.unwrap();
2097
2098        let sql = r"
2099CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2100PARTITION ON COLUMNS(x) ()
2101ENGINE=mito";
2102        let result =
2103            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2104        assert!(
2105            result
2106                .unwrap_err()
2107                .to_string()
2108                .contains("Partition column \"x\" not defined")
2109        );
2110    }
2111
2112    #[test]
2113    fn test_parse_create_table_with_partitions() {
2114        let sql = r"
2115CREATE TABLE monitor (
2116  host_id    INT,
2117  idc        STRING,
2118  ts         TIMESTAMP,
2119  cpu        DOUBLE DEFAULT 0,
2120  memory     DOUBLE,
2121  TIME INDEX (ts),
2122  PRIMARY KEY (host),
2123)
2124PARTITION ON COLUMNS(idc, host_id) (
2125  idc <= 'hz' AND host_id < 1000,
2126  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2127  idc > 'sh' AND host_id < 3000,
2128  idc > 'sh' AND host_id >= 3000,
2129)
2130ENGINE=mito";
2131        let result =
2132            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2133                .unwrap();
2134        assert_eq!(result.len(), 1);
2135        match &result[0] {
2136            Statement::CreateTable(c) => {
2137                assert!(c.partitions.is_some());
2138
2139                let partitions = c.partitions.as_ref().unwrap();
2140                let column_list = partitions
2141                    .column_list
2142                    .iter()
2143                    .map(|x| &x.value)
2144                    .collect::<Vec<&String>>();
2145                assert_eq!(column_list, vec!["idc", "host_id"]);
2146
2147                let exprs = &partitions.exprs;
2148
2149                assert_eq!(
2150                    exprs[0],
2151                    Expr::BinaryOp {
2152                        left: Box::new(Expr::BinaryOp {
2153                            left: Box::new(Expr::Identifier("idc".into())),
2154                            op: BinaryOperator::LtEq,
2155                            right: Box::new(Expr::Value(
2156                                Value::SingleQuotedString("hz".to_string()).into()
2157                            ))
2158                        }),
2159                        op: BinaryOperator::And,
2160                        right: Box::new(Expr::BinaryOp {
2161                            left: Box::new(Expr::Identifier("host_id".into())),
2162                            op: BinaryOperator::Lt,
2163                            right: Box::new(Expr::Value(
2164                                Value::Number("1000".to_string(), false).into()
2165                            ))
2166                        })
2167                    }
2168                );
2169                assert_eq!(
2170                    exprs[1],
2171                    Expr::BinaryOp {
2172                        left: Box::new(Expr::BinaryOp {
2173                            left: Box::new(Expr::BinaryOp {
2174                                left: Box::new(Expr::Identifier("idc".into())),
2175                                op: BinaryOperator::Gt,
2176                                right: Box::new(Expr::Value(
2177                                    Value::SingleQuotedString("hz".to_string()).into()
2178                                ))
2179                            }),
2180                            op: BinaryOperator::And,
2181                            right: Box::new(Expr::BinaryOp {
2182                                left: Box::new(Expr::Identifier("idc".into())),
2183                                op: BinaryOperator::LtEq,
2184                                right: Box::new(Expr::Value(
2185                                    Value::SingleQuotedString("sh".to_string()).into()
2186                                ))
2187                            })
2188                        }),
2189                        op: BinaryOperator::And,
2190                        right: Box::new(Expr::BinaryOp {
2191                            left: Box::new(Expr::Identifier("host_id".into())),
2192                            op: BinaryOperator::Lt,
2193                            right: Box::new(Expr::Value(
2194                                Value::Number("2000".to_string(), false).into()
2195                            ))
2196                        })
2197                    }
2198                );
2199                assert_eq!(
2200                    exprs[2],
2201                    Expr::BinaryOp {
2202                        left: Box::new(Expr::BinaryOp {
2203                            left: Box::new(Expr::Identifier("idc".into())),
2204                            op: BinaryOperator::Gt,
2205                            right: Box::new(Expr::Value(
2206                                Value::SingleQuotedString("sh".to_string()).into()
2207                            ))
2208                        }),
2209                        op: BinaryOperator::And,
2210                        right: Box::new(Expr::BinaryOp {
2211                            left: Box::new(Expr::Identifier("host_id".into())),
2212                            op: BinaryOperator::Lt,
2213                            right: Box::new(Expr::Value(
2214                                Value::Number("3000".to_string(), false).into()
2215                            ))
2216                        })
2217                    }
2218                );
2219                assert_eq!(
2220                    exprs[3],
2221                    Expr::BinaryOp {
2222                        left: Box::new(Expr::BinaryOp {
2223                            left: Box::new(Expr::Identifier("idc".into())),
2224                            op: BinaryOperator::Gt,
2225                            right: Box::new(Expr::Value(
2226                                Value::SingleQuotedString("sh".to_string()).into()
2227                            ))
2228                        }),
2229                        op: BinaryOperator::And,
2230                        right: Box::new(Expr::BinaryOp {
2231                            left: Box::new(Expr::Identifier("host_id".into())),
2232                            op: BinaryOperator::GtEq,
2233                            right: Box::new(Expr::Value(
2234                                Value::Number("3000".to_string(), false).into()
2235                            ))
2236                        })
2237                    }
2238                );
2239            }
2240            _ => unreachable!(),
2241        }
2242    }
2243
2244    #[test]
2245    fn test_parse_create_table_with_quoted_partitions() {
2246        let sql = r"
2247CREATE TABLE monitor (
2248  `host_id`    INT,
2249  idc        STRING,
2250  ts         TIMESTAMP,
2251  cpu        DOUBLE DEFAULT 0,
2252  memory     DOUBLE,
2253  TIME INDEX (ts),
2254  PRIMARY KEY (host),
2255)
2256PARTITION ON COLUMNS(IdC, host_id) (
2257  idc <= 'hz' AND host_id < 1000,
2258  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2259  idc > 'sh' AND host_id < 3000,
2260  idc > 'sh' AND host_id >= 3000,
2261)";
2262        let result =
2263            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2264                .unwrap();
2265        assert_eq!(result.len(), 1);
2266    }
2267
2268    #[test]
2269    fn test_parse_create_table_with_timestamp_index() {
2270        let sql1 = r"
2271CREATE TABLE monitor (
2272  host_id    INT,
2273  idc        STRING,
2274  ts         TIMESTAMP TIME INDEX,
2275  cpu        DOUBLE DEFAULT 0,
2276  memory     DOUBLE,
2277  PRIMARY KEY (host),
2278)
2279ENGINE=mito";
2280        let result1 = ParserContext::create_with_dialect(
2281            sql1,
2282            &GreptimeDbDialect {},
2283            ParseOptions::default(),
2284        )
2285        .unwrap();
2286
2287        if let Statement::CreateTable(c) = &result1[0] {
2288            assert_eq!(c.constraints.len(), 2);
2289            let tc = c.constraints[0].clone();
2290            match tc {
2291                TableConstraint::TimeIndex { column } => {
2292                    assert_eq!(&column.value, "ts");
2293                }
2294                _ => panic!("should be time index constraint"),
2295            };
2296        } else {
2297            panic!("should be create_table statement");
2298        }
2299
2300        // `TIME INDEX` should be in front of `PRIMARY KEY`
2301        // in order to equal the `TIMESTAMP TIME INDEX` constraint options vector
2302        let sql2 = r"
2303CREATE TABLE monitor (
2304  host_id    INT,
2305  idc        STRING,
2306  ts         TIMESTAMP NOT NULL,
2307  cpu        DOUBLE DEFAULT 0,
2308  memory     DOUBLE,
2309  TIME INDEX (ts),
2310  PRIMARY KEY (host),
2311)
2312ENGINE=mito";
2313        let result2 = ParserContext::create_with_dialect(
2314            sql2,
2315            &GreptimeDbDialect {},
2316            ParseOptions::default(),
2317        )
2318        .unwrap();
2319
2320        assert_eq!(result1, result2);
2321
2322        // TIMESTAMP can be NULL which is not equal to above
2323        let sql3 = r"
2324CREATE TABLE monitor (
2325  host_id    INT,
2326  idc        STRING,
2327  ts         TIMESTAMP,
2328  cpu        DOUBLE DEFAULT 0,
2329  memory     DOUBLE,
2330  TIME INDEX (ts),
2331  PRIMARY KEY (host),
2332)
2333ENGINE=mito";
2334
2335        let result3 = ParserContext::create_with_dialect(
2336            sql3,
2337            &GreptimeDbDialect {},
2338            ParseOptions::default(),
2339        )
2340        .unwrap();
2341
2342        assert_ne!(result1, result3);
2343
2344        // BIGINT can't be time index any more
2345        let sql1 = r"
2346CREATE TABLE monitor (
2347  host_id    INT,
2348  idc        STRING,
2349  b          bigint TIME INDEX,
2350  cpu        DOUBLE DEFAULT 0,
2351  memory     DOUBLE,
2352  PRIMARY KEY (host),
2353)
2354ENGINE=mito";
2355        let result1 = ParserContext::create_with_dialect(
2356            sql1,
2357            &GreptimeDbDialect {},
2358            ParseOptions::default(),
2359        );
2360
2361        assert!(
2362            result1
2363                .unwrap_err()
2364                .to_string()
2365                .contains("time index column data type should be timestamp")
2366        );
2367    }
2368
2369    #[test]
2370    fn test_parse_create_table_with_timestamp_index_not_null() {
2371        let sql = r"
2372CREATE TABLE monitor (
2373  host_id    INT,
2374  idc        STRING,
2375  ts         TIMESTAMP TIME INDEX,
2376  cpu        DOUBLE DEFAULT 0,
2377  memory     DOUBLE,
2378  TIME INDEX (ts),
2379  PRIMARY KEY (host),
2380)
2381ENGINE=mito";
2382        let result =
2383            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2384                .unwrap();
2385
2386        assert_eq!(result.len(), 1);
2387        if let Statement::CreateTable(c) = &result[0] {
2388            let ts = c.columns[2].clone();
2389            assert_eq!(ts.name().to_string(), "ts");
2390            assert_eq!(ts.options()[0].option, NotNull);
2391        } else {
2392            panic!("should be create table statement");
2393        }
2394
2395        let sql1 = r"
2396CREATE TABLE monitor (
2397  host_id    INT,
2398  idc        STRING,
2399  ts         TIMESTAMP NOT NULL TIME INDEX,
2400  cpu        DOUBLE DEFAULT 0,
2401  memory     DOUBLE,
2402  TIME INDEX (ts),
2403  PRIMARY KEY (host),
2404)
2405ENGINE=mito";
2406
2407        let result1 = ParserContext::create_with_dialect(
2408            sql1,
2409            &GreptimeDbDialect {},
2410            ParseOptions::default(),
2411        )
2412        .unwrap();
2413        assert_eq!(result, result1);
2414
2415        let sql2 = r"
2416CREATE TABLE monitor (
2417  host_id    INT,
2418  idc        STRING,
2419  ts         TIMESTAMP TIME INDEX NOT NULL,
2420  cpu        DOUBLE DEFAULT 0,
2421  memory     DOUBLE,
2422  TIME INDEX (ts),
2423  PRIMARY KEY (host),
2424)
2425ENGINE=mito";
2426
2427        let result2 = ParserContext::create_with_dialect(
2428            sql2,
2429            &GreptimeDbDialect {},
2430            ParseOptions::default(),
2431        )
2432        .unwrap();
2433        assert_eq!(result, result2);
2434
2435        let sql3 = r"
2436CREATE TABLE monitor (
2437  host_id    INT,
2438  idc        STRING,
2439  ts         TIMESTAMP TIME INDEX NULL NOT,
2440  cpu        DOUBLE DEFAULT 0,
2441  memory     DOUBLE,
2442  TIME INDEX (ts),
2443  PRIMARY KEY (host),
2444)
2445ENGINE=mito";
2446
2447        let result3 = ParserContext::create_with_dialect(
2448            sql3,
2449            &GreptimeDbDialect {},
2450            ParseOptions::default(),
2451        );
2452        assert!(result3.is_err());
2453
2454        let sql4 = r"
2455CREATE TABLE monitor (
2456  host_id    INT,
2457  idc        STRING,
2458  ts         TIMESTAMP TIME INDEX NOT NULL NULL,
2459  cpu        DOUBLE DEFAULT 0,
2460  memory     DOUBLE,
2461  TIME INDEX (ts),
2462  PRIMARY KEY (host),
2463)
2464ENGINE=mito";
2465
2466        let result4 = ParserContext::create_with_dialect(
2467            sql4,
2468            &GreptimeDbDialect {},
2469            ParseOptions::default(),
2470        );
2471        assert!(result4.is_err());
2472
2473        let sql = r"
2474CREATE TABLE monitor (
2475  host_id    INT,
2476  idc        STRING,
2477  ts         TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2478  cpu        DOUBLE DEFAULT 0,
2479  memory     DOUBLE,
2480  TIME INDEX (ts),
2481  PRIMARY KEY (host),
2482)
2483ENGINE=mito";
2484
2485        let result =
2486            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2487                .unwrap();
2488
2489        if let Statement::CreateTable(c) = &result[0] {
2490            let tc = c.constraints[0].clone();
2491            match tc {
2492                TableConstraint::TimeIndex { column } => {
2493                    assert_eq!(&column.value, "ts");
2494                }
2495                _ => panic!("should be time index constraint"),
2496            }
2497            let ts = c.columns[2].clone();
2498            assert_eq!(ts.name().to_string(), "ts");
2499            assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2500            assert_eq!(ts.options()[1].option, NotNull);
2501        } else {
2502            unreachable!("should be create table statement");
2503        }
2504    }
2505
2506    #[test]
2507    fn test_parse_partitions_with_error_syntax() {
2508        let sql = r"
2509CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2510PARTITION COLUMNS(c, a) (
2511    a < 10,
2512    a > 10 AND a < 20,
2513    a > 20 AND c < 100,
2514    a > 20 AND c >= 100
2515)
2516ENGINE=mito";
2517        let result =
2518            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2519        assert!(
2520            result
2521                .unwrap_err()
2522                .output_msg()
2523                .contains("sql parser error: Expected: ON, found: COLUMNS")
2524        );
2525    }
2526
2527    #[test]
2528    fn test_parse_partitions_without_rule() {
2529        let sql = r"
2530CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2531PARTITION ON COLUMNS(c, a) ()
2532ENGINE=mito";
2533        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2534            .unwrap();
2535    }
2536
2537    #[test]
2538    fn test_parse_partitions_unreferenced_column() {
2539        let sql = r"
2540CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2541PARTITION ON COLUMNS(c, a) (
2542    b = 'foo'
2543)
2544ENGINE=mito";
2545        let result =
2546            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2547        assert_eq!(
2548            result.unwrap_err().output_msg(),
2549            "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2550        );
2551    }
2552
2553    #[test]
2554    fn test_parse_partitions_not_binary_expr() {
2555        let sql = r"
2556CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2557PARTITION ON COLUMNS(c, a) (
2558    b
2559)
2560ENGINE=mito";
2561        let result =
2562            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2563        assert_eq!(
2564            result.unwrap_err().output_msg(),
2565            r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2566        );
2567    }
2568
2569    fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2570        assert_eq!(column.name.to_string(), name);
2571        assert_eq!(column.data_type.to_string(), data_type);
2572    }
2573
2574    #[test]
2575    pub fn test_parse_create_table() {
2576        let sql = r"create table demo(
2577                             host string,
2578                             ts timestamp,
2579                             cpu float32 default 0,
2580                             memory float64,
2581                             TIME INDEX (ts),
2582                             PRIMARY KEY(ts, host),
2583                             ) engine=mito
2584                             with(ttl='10s');
2585         ";
2586        let result =
2587            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2588                .unwrap();
2589        assert_eq!(1, result.len());
2590        match &result[0] {
2591            Statement::CreateTable(c) => {
2592                assert!(!c.if_not_exists);
2593                assert_eq!("demo", c.name.to_string());
2594                assert_eq!("mito", c.engine);
2595                assert_eq!(4, c.columns.len());
2596                let columns = &c.columns;
2597                assert_column_def(&columns[0].column_def, "host", "STRING");
2598                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2599                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2600                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2601
2602                let constraints = &c.constraints;
2603                assert_eq!(
2604                    &constraints[0],
2605                    &TableConstraint::TimeIndex {
2606                        column: Ident::new("ts"),
2607                    }
2608                );
2609                assert_eq!(
2610                    &constraints[1],
2611                    &TableConstraint::PrimaryKey {
2612                        columns: vec![Ident::new("ts"), Ident::new("host")]
2613                    }
2614                );
2615                // inverted index is merged into column options
2616                assert_eq!(1, c.options.len());
2617                assert_eq!(
2618                    [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2619                    c.options.to_str_map()
2620                );
2621            }
2622            _ => unreachable!(),
2623        }
2624    }
2625
2626    #[test]
2627    fn test_invalid_index_keys() {
2628        let sql = r"create table demo(
2629                             host string,
2630                             ts int64,
2631                             cpu float64 default 0,
2632                             memory float64,
2633                             TIME INDEX (ts, host),
2634                             PRIMARY KEY(ts, host)) engine=mito;
2635         ";
2636        let result =
2637            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2638        assert!(result.is_err());
2639        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2640    }
2641
2642    #[test]
2643    fn test_duplicated_time_index() {
2644        let sql = r"create table demo(
2645                             host string,
2646                             ts timestamp time index,
2647                             t timestamp time index,
2648                             cpu float64 default 0,
2649                             memory float64,
2650                             TIME INDEX (ts, host),
2651                             PRIMARY KEY(ts, host)) engine=mito;
2652         ";
2653        let result =
2654            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2655        assert!(result.is_err());
2656        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2657
2658        let sql = r"create table demo(
2659                             host string,
2660                             ts timestamp time index,
2661                             cpu float64 default 0,
2662                             t timestamp,
2663                             memory float64,
2664                             TIME INDEX (t),
2665                             PRIMARY KEY(ts, host)) engine=mito;
2666         ";
2667        let result =
2668            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2669        assert!(result.is_err());
2670        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2671    }
2672
2673    #[test]
2674    fn test_invalid_column_name() {
2675        let sql = "create table foo(user string, i timestamp time index)";
2676        let result =
2677            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2678        let err = result.unwrap_err().output_msg();
2679        assert!(err.contains("Cannot use keyword 'user' as column name"));
2680
2681        // If column name is quoted, it's valid even same with keyword.
2682        let sql = r#"
2683            create table foo("user" string, i timestamp time index)
2684        "#;
2685        let result =
2686            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2687        let _ = result.unwrap();
2688    }
2689
2690    #[test]
2691    fn test_incorrect_default_value_issue_3479() {
2692        let sql = r#"CREATE TABLE `ExcePTuRi`(
2693non TIMESTAMP(6) TIME INDEX,
2694`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2695)"#;
2696        let result =
2697            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2698                .unwrap();
2699        assert_eq!(1, result.len());
2700        match &result[0] {
2701            Statement::CreateTable(c) => {
2702                assert_eq!(
2703                    "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2704                    c.columns[1].to_string()
2705                );
2706            }
2707            _ => unreachable!(),
2708        }
2709    }
2710
2711    #[test]
2712    fn test_parse_create_view() {
2713        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
2714
2715        let result =
2716            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2717                .unwrap();
2718        match &result[0] {
2719            Statement::CreateView(c) => {
2720                assert_eq!(c.to_string(), sql);
2721                assert!(!c.or_replace);
2722                assert!(!c.if_not_exists);
2723                assert_eq!("test", c.name.to_string());
2724            }
2725            _ => unreachable!(),
2726        }
2727
2728        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
2729
2730        let result =
2731            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2732                .unwrap();
2733        match &result[0] {
2734            Statement::CreateView(c) => {
2735                assert_eq!(c.to_string(), sql);
2736                assert!(c.or_replace);
2737                assert!(c.if_not_exists);
2738                assert_eq!("test", c.name.to_string());
2739            }
2740            _ => unreachable!(),
2741        }
2742    }
2743
2744    #[test]
2745    fn test_parse_create_view_invalid_query() {
2746        let sql = "CREATE VIEW test AS DELETE from demo";
2747        let result =
2748            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2749        assert!(result.is_ok_and(|x| x.len() == 1));
2750    }
2751
2752    #[test]
2753    fn test_parse_create_table_fulltext_options() {
2754        let sql1 = r"
2755CREATE TABLE log (
2756    ts TIMESTAMP TIME INDEX,
2757    msg TEXT FULLTEXT INDEX,
2758)";
2759        let result1 = ParserContext::create_with_dialect(
2760            sql1,
2761            &GreptimeDbDialect {},
2762            ParseOptions::default(),
2763        )
2764        .unwrap();
2765
2766        if let Statement::CreateTable(c) = &result1[0] {
2767            c.columns.iter().for_each(|col| {
2768                if col.name().value == "msg" {
2769                    assert!(
2770                        col.extensions
2771                            .fulltext_index_options
2772                            .as_ref()
2773                            .unwrap()
2774                            .is_empty()
2775                    );
2776                }
2777            });
2778        } else {
2779            panic!("should be create_table statement");
2780        }
2781
2782        let sql2 = r"
2783CREATE TABLE log (
2784    ts TIMESTAMP TIME INDEX,
2785    msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2786)";
2787        let result2 = ParserContext::create_with_dialect(
2788            sql2,
2789            &GreptimeDbDialect {},
2790            ParseOptions::default(),
2791        )
2792        .unwrap();
2793
2794        if let Statement::CreateTable(c) = &result2[0] {
2795            c.columns.iter().for_each(|col| {
2796                if col.name().value == "msg" {
2797                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2798                    assert_eq!(options.len(), 2);
2799                    assert_eq!(options.get("analyzer").unwrap(), "English");
2800                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2801                }
2802            });
2803        } else {
2804            panic!("should be create_table statement");
2805        }
2806
2807        let sql3 = r"
2808CREATE TABLE log (
2809    ts TIMESTAMP TIME INDEX,
2810    msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2811    msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2812)";
2813        let result3 = ParserContext::create_with_dialect(
2814            sql3,
2815            &GreptimeDbDialect {},
2816            ParseOptions::default(),
2817        )
2818        .unwrap();
2819
2820        if let Statement::CreateTable(c) = &result3[0] {
2821            c.columns.iter().for_each(|col| {
2822                if col.name().value == "msg1" {
2823                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2824                    assert_eq!(options.len(), 2);
2825                    assert_eq!(options.get("analyzer").unwrap(), "English");
2826                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2827                } else if col.name().value == "msg2" {
2828                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2829                    assert_eq!(options.len(), 2);
2830                    assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2831                    assert_eq!(options.get("case_sensitive").unwrap(), "true");
2832                }
2833            });
2834        } else {
2835            panic!("should be create_table statement");
2836        }
2837    }
2838
2839    #[test]
2840    fn test_parse_create_table_fulltext_options_invalid_type() {
2841        let sql = r"
2842CREATE TABLE log (
2843    ts TIMESTAMP TIME INDEX,
2844    msg INT FULLTEXT INDEX,
2845)";
2846        let result =
2847            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2848        assert!(result.is_err());
2849        assert!(
2850            result
2851                .unwrap_err()
2852                .to_string()
2853                .contains("FULLTEXT index only supports string type")
2854        );
2855    }
2856
2857    #[test]
2858    fn test_parse_create_table_fulltext_options_duplicate() {
2859        let sql = r"
2860CREATE TABLE log (
2861    ts TIMESTAMP TIME INDEX,
2862    msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
2863)";
2864        let result =
2865            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2866        assert!(result.is_err());
2867        assert!(
2868            result
2869                .unwrap_err()
2870                .to_string()
2871                .contains("duplicated FULLTEXT INDEX option")
2872        );
2873    }
2874
2875    #[test]
2876    fn test_parse_create_table_fulltext_options_invalid_option() {
2877        let sql = r"
2878CREATE TABLE log (
2879    ts TIMESTAMP TIME INDEX,
2880    msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
2881)";
2882        let result =
2883            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2884        assert!(result.is_err());
2885        assert!(
2886            result
2887                .unwrap_err()
2888                .to_string()
2889                .contains("invalid FULLTEXT INDEX option")
2890        );
2891    }
2892
2893    #[test]
2894    fn test_parse_create_table_skip_options() {
2895        let sql = r"
2896CREATE TABLE log (
2897    ts TIMESTAMP TIME INDEX,
2898    msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
2899)";
2900        let result =
2901            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2902                .unwrap();
2903
2904        if let Statement::CreateTable(c) = &result[0] {
2905            c.columns.iter().for_each(|col| {
2906                if col.name().value == "msg" {
2907                    assert!(
2908                        !col.extensions
2909                            .skipping_index_options
2910                            .as_ref()
2911                            .unwrap()
2912                            .is_empty()
2913                    );
2914                }
2915            });
2916        } else {
2917            panic!("should be create_table statement");
2918        }
2919
2920        let sql = r"
2921        CREATE TABLE log (
2922            ts TIMESTAMP TIME INDEX,
2923            msg INT SKIPPING INDEX,
2924        )";
2925        let result =
2926            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2927                .unwrap();
2928
2929        if let Statement::CreateTable(c) = &result[0] {
2930            c.columns.iter().for_each(|col| {
2931                if col.name().value == "msg" {
2932                    assert!(
2933                        col.extensions
2934                            .skipping_index_options
2935                            .as_ref()
2936                            .unwrap()
2937                            .is_empty()
2938                    );
2939                }
2940            });
2941        } else {
2942            panic!("should be create_table statement");
2943        }
2944    }
2945
2946    #[test]
2947    fn test_parse_create_view_with_columns() {
2948        let sql = "CREATE VIEW test () AS SELECT * FROM NUMBERS";
2949        let result =
2950            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2951                .unwrap();
2952
2953        match &result[0] {
2954            Statement::CreateView(c) => {
2955                assert_eq!(c.to_string(), "CREATE VIEW test AS SELECT * FROM NUMBERS");
2956                assert!(!c.or_replace);
2957                assert!(!c.if_not_exists);
2958                assert_eq!("test", c.name.to_string());
2959            }
2960            _ => unreachable!(),
2961        }
2962        assert_eq!(
2963            "CREATE VIEW test AS SELECT * FROM NUMBERS",
2964            result[0].to_string()
2965        );
2966
2967        let sql = "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS";
2968        let result =
2969            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2970                .unwrap();
2971
2972        match &result[0] {
2973            Statement::CreateView(c) => {
2974                assert_eq!(c.to_string(), sql);
2975                assert!(!c.or_replace);
2976                assert!(!c.if_not_exists);
2977                assert_eq!("test", c.name.to_string());
2978            }
2979            _ => unreachable!(),
2980        }
2981        assert_eq!(sql, result[0].to_string());
2982
2983        let sql = "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS";
2984        let result =
2985            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2986                .unwrap();
2987
2988        match &result[0] {
2989            Statement::CreateView(c) => {
2990                assert_eq!(c.to_string(), sql);
2991                assert!(!c.or_replace);
2992                assert!(!c.if_not_exists);
2993                assert_eq!("test", c.name.to_string());
2994            }
2995            _ => unreachable!(),
2996        }
2997        assert_eq!(sql, result[0].to_string());
2998
2999        // Some invalid syntax cases
3000        let sql = "CREATE VIEW test (n1 AS select * from demo";
3001        let result =
3002            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3003        assert!(result.is_err());
3004
3005        let sql = "CREATE VIEW test (n1, AS select * from demo";
3006        let result =
3007            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3008        assert!(result.is_err());
3009
3010        let sql = "CREATE VIEW test n1,n2) AS select * from demo";
3011        let result =
3012            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3013        assert!(result.is_err());
3014
3015        let sql = "CREATE VIEW test (1) AS select * from demo";
3016        let result =
3017            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3018        assert!(result.is_err());
3019
3020        // keyword
3021        let sql = "CREATE VIEW test (n1, select) AS select * from demo";
3022        let result =
3023            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3024        assert!(result.is_err());
3025    }
3026
3027    #[test]
3028    fn test_parse_column_extensions_vector() {
3029        // Test that vector options are parsed from data_type (no additional SQL needed)
3030        let sql = "";
3031        let dialect = GenericDialect {};
3032        let mut tokenizer = Tokenizer::new(&dialect, sql);
3033        let tokens = tokenizer.tokenize().unwrap();
3034        let mut parser = Parser::new(&dialect).with_tokens(tokens);
3035        let name = Ident::new("vec_col");
3036        let data_type =
3037            DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
3038        let mut extensions = ColumnExtensions::default();
3039
3040        let result =
3041            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
3042        assert!(result.is_ok());
3043        assert!(extensions.vector_options.is_some());
3044        let vector_options = extensions.vector_options.unwrap();
3045        assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some("128"));
3046    }
3047
3048    #[test]
3049    fn test_parse_column_extensions_vector_invalid() {
3050        // Test that vector with no dimension fails
3051        let sql = "";
3052        let dialect = GenericDialect {};
3053        let mut tokenizer = Tokenizer::new(&dialect, sql);
3054        let tokens = tokenizer.tokenize().unwrap();
3055        let mut parser = Parser::new(&dialect).with_tokens(tokens);
3056        let name = Ident::new("vec_col");
3057        let data_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
3058        let mut extensions = ColumnExtensions::default();
3059
3060        let result =
3061            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
3062        assert!(result.is_err());
3063    }
3064
3065    #[test]
3066    fn test_parse_column_extensions_indices() {
3067        // Test skipping index
3068        {
3069            let sql = "SKIPPING INDEX";
3070            let dialect = GenericDialect {};
3071            let mut tokenizer = Tokenizer::new(&dialect, sql);
3072            let tokens = tokenizer.tokenize().unwrap();
3073            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3074            let name = Ident::new("col");
3075            let data_type = DataType::String(None);
3076            let mut extensions = ColumnExtensions::default();
3077            let result = ParserContext::parse_column_extensions(
3078                &mut parser,
3079                &name,
3080                &data_type,
3081                &mut extensions,
3082            );
3083            assert!(result.is_ok());
3084            assert!(extensions.skipping_index_options.is_some());
3085        }
3086
3087        // Test fulltext index with options
3088        {
3089            let sql = "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')";
3090            let dialect = GenericDialect {};
3091            let mut tokenizer = Tokenizer::new(&dialect, sql);
3092            let tokens = tokenizer.tokenize().unwrap();
3093            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3094            let name = Ident::new("text_col");
3095            let data_type = DataType::String(None);
3096            let mut extensions = ColumnExtensions::default();
3097            let result = ParserContext::parse_column_extensions(
3098                &mut parser,
3099                &name,
3100                &data_type,
3101                &mut extensions,
3102            );
3103            assert!(result.unwrap());
3104            assert!(extensions.fulltext_index_options.is_some());
3105            let fulltext_options = extensions.fulltext_index_options.unwrap();
3106            assert_eq!(fulltext_options.get("analyzer"), Some("English"));
3107            assert_eq!(fulltext_options.get("case_sensitive"), Some("true"));
3108        }
3109
3110        // Test fulltext index with invalid type (should fail)
3111        {
3112            let sql = "FULLTEXT INDEX WITH (analyzer = 'English')";
3113            let dialect = GenericDialect {};
3114            let mut tokenizer = Tokenizer::new(&dialect, sql);
3115            let tokens = tokenizer.tokenize().unwrap();
3116            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3117            let name = Ident::new("num_col");
3118            let data_type = DataType::Int(None); // Non-string type
3119            let mut extensions = ColumnExtensions::default();
3120            let result = ParserContext::parse_column_extensions(
3121                &mut parser,
3122                &name,
3123                &data_type,
3124                &mut extensions,
3125            );
3126            assert!(result.is_err());
3127            assert!(
3128                result
3129                    .unwrap_err()
3130                    .to_string()
3131                    .contains("FULLTEXT index only supports string type")
3132            );
3133        }
3134
3135        // Test fulltext index with invalid option (won't fail, the parser doesn't check the option's content)
3136        {
3137            let sql = "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')";
3138            let dialect = GenericDialect {};
3139            let mut tokenizer = Tokenizer::new(&dialect, sql);
3140            let tokens = tokenizer.tokenize().unwrap();
3141            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3142            let name = Ident::new("text_col");
3143            let data_type = DataType::String(None);
3144            let mut extensions = ColumnExtensions::default();
3145            let result = ParserContext::parse_column_extensions(
3146                &mut parser,
3147                &name,
3148                &data_type,
3149                &mut extensions,
3150            );
3151            assert!(result.unwrap());
3152        }
3153
3154        // Test inverted index
3155        {
3156            let sql = "INVERTED INDEX";
3157            let dialect = GenericDialect {};
3158            let mut tokenizer = Tokenizer::new(&dialect, sql);
3159            let tokens = tokenizer.tokenize().unwrap();
3160            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3161            let name = Ident::new("col");
3162            let data_type = DataType::String(None);
3163            let mut extensions = ColumnExtensions::default();
3164            let result = ParserContext::parse_column_extensions(
3165                &mut parser,
3166                &name,
3167                &data_type,
3168                &mut extensions,
3169            );
3170            assert!(result.is_ok());
3171            assert!(extensions.inverted_index_options.is_some());
3172        }
3173
3174        // Test inverted index with options (should fail)
3175        {
3176            let sql = "INVERTED INDEX WITH (analyzer = 'English')";
3177            let dialect = GenericDialect {};
3178            let mut tokenizer = Tokenizer::new(&dialect, sql);
3179            let tokens = tokenizer.tokenize().unwrap();
3180            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3181            let name = Ident::new("col");
3182            let data_type = DataType::String(None);
3183            let mut extensions = ColumnExtensions::default();
3184            let result = ParserContext::parse_column_extensions(
3185                &mut parser,
3186                &name,
3187                &data_type,
3188                &mut extensions,
3189            );
3190            assert!(result.is_err());
3191            assert!(
3192                result
3193                    .unwrap_err()
3194                    .to_string()
3195                    .contains("INVERTED index doesn't support options")
3196            );
3197        }
3198
3199        // Test multiple indices
3200        {
3201            let sql = "SKIPPING INDEX FULLTEXT INDEX";
3202            let dialect = GenericDialect {};
3203            let mut tokenizer = Tokenizer::new(&dialect, sql);
3204            let tokens = tokenizer.tokenize().unwrap();
3205            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3206            let name = Ident::new("col");
3207            let data_type = DataType::String(None);
3208            let mut extensions = ColumnExtensions::default();
3209            let result = ParserContext::parse_column_extensions(
3210                &mut parser,
3211                &name,
3212                &data_type,
3213                &mut extensions,
3214            );
3215            assert!(result.unwrap());
3216            assert!(extensions.skipping_index_options.is_some());
3217            assert!(extensions.fulltext_index_options.is_some());
3218        }
3219    }
3220
3221    #[test]
3222    fn test_parse_interval_cast() {
3223        let s = "select '10s'::INTERVAL";
3224        let stmts =
3225            ParserContext::create_with_dialect(s, &GreptimeDbDialect {}, ParseOptions::default())
3226                .unwrap();
3227        assert_eq!("SELECT '10 seconds'::INTERVAL", &stmts[0].to_string());
3228    }
3229
3230    #[test]
3231    fn test_parse_create_table_vector_index_options() {
3232        // Test basic vector index
3233        let sql = r"
3234CREATE TABLE vectors (
3235    ts TIMESTAMP TIME INDEX,
3236    vec VECTOR(128) VECTOR INDEX,
3237)";
3238        let result =
3239            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
3240                .unwrap();
3241
3242        if let Statement::CreateTable(c) = &result[0] {
3243            c.columns.iter().for_each(|col| {
3244                if col.name().value == "vec" {
3245                    assert!(
3246                        col.extensions
3247                            .vector_index_options
3248                            .as_ref()
3249                            .unwrap()
3250                            .is_empty()
3251                    );
3252                }
3253            });
3254        } else {
3255            panic!("should be create_table statement");
3256        }
3257
3258        // Test vector index with options
3259        let sql = r"
3260CREATE TABLE vectors (
3261    ts TIMESTAMP TIME INDEX,
3262    vec VECTOR(128) VECTOR INDEX WITH (metric='cosine', connectivity='32', expansion_add='256', expansion_search='128')
3263)";
3264        let result =
3265            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
3266                .unwrap();
3267
3268        if let Statement::CreateTable(c) = &result[0] {
3269            c.columns.iter().for_each(|col| {
3270                if col.name().value == "vec" {
3271                    let options = col.extensions.vector_index_options.as_ref().unwrap();
3272                    assert_eq!(options.len(), 4);
3273                    assert_eq!(options.get("metric").unwrap(), "cosine");
3274                    assert_eq!(options.get("connectivity").unwrap(), "32");
3275                    assert_eq!(options.get("expansion_add").unwrap(), "256");
3276                    assert_eq!(options.get("expansion_search").unwrap(), "128");
3277                }
3278            });
3279        } else {
3280            panic!("should be create_table statement");
3281        }
3282    }
3283
3284    #[test]
3285    fn test_parse_create_table_vector_index_invalid_type() {
3286        // Test vector index on non-vector type (should fail)
3287        let sql = r"
3288CREATE TABLE vectors (
3289    ts TIMESTAMP TIME INDEX,
3290    col INT VECTOR INDEX,
3291)";
3292        let result =
3293            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3294        assert!(result.is_err());
3295        assert!(
3296            result
3297                .unwrap_err()
3298                .to_string()
3299                .contains("VECTOR INDEX only supports Vector type columns")
3300        );
3301    }
3302
3303    #[test]
3304    fn test_parse_create_table_vector_index_duplicate() {
3305        // Test duplicate vector index (should fail)
3306        let sql = r"
3307CREATE TABLE vectors (
3308    ts TIMESTAMP TIME INDEX,
3309    vec VECTOR(128) VECTOR INDEX VECTOR INDEX,
3310)";
3311        let result =
3312            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3313        assert!(result.is_err());
3314        assert!(
3315            result
3316                .unwrap_err()
3317                .to_string()
3318                .contains("duplicated VECTOR INDEX option")
3319        );
3320    }
3321
3322    #[test]
3323    fn test_parse_create_table_vector_index_invalid_option() {
3324        // Test invalid option key (should fail)
3325        let sql = r"
3326CREATE TABLE vectors (
3327    ts TIMESTAMP TIME INDEX,
3328    vec VECTOR(128) VECTOR INDEX WITH (metric='l2sq', invalid_option='foo')
3329)";
3330        let result =
3331            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3332        assert!(result.is_err());
3333        assert!(
3334            result
3335                .unwrap_err()
3336                .to_string()
3337                .contains("invalid VECTOR INDEX option")
3338        );
3339    }
3340
3341    #[test]
3342    fn test_parse_column_extensions_vector_index() {
3343        // Test vector index on vector type
3344        {
3345            let sql = "VECTOR INDEX WITH (metric = 'l2sq')";
3346            let dialect = GenericDialect {};
3347            let mut tokenizer = Tokenizer::new(&dialect, sql);
3348            let tokens = tokenizer.tokenize().unwrap();
3349            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3350            let name = Ident::new("vec_col");
3351            let data_type =
3352                DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
3353            // First, parse the vector type to set vector_options
3354            let mut extensions = ColumnExtensions {
3355                vector_options: Some(OptionMap::from([(
3356                    VECTOR_OPT_DIM.to_string(),
3357                    "128".to_string(),
3358                )])),
3359                ..Default::default()
3360            };
3361
3362            let result = ParserContext::parse_column_extensions(
3363                &mut parser,
3364                &name,
3365                &data_type,
3366                &mut extensions,
3367            );
3368            assert!(result.is_ok());
3369            assert!(extensions.vector_index_options.is_some());
3370            let vi_options = extensions.vector_index_options.unwrap();
3371            assert_eq!(vi_options.get("metric"), Some("l2sq"));
3372        }
3373
3374        // Test vector index on non-vector type (should fail)
3375        {
3376            let sql = "VECTOR INDEX";
3377            let dialect = GenericDialect {};
3378            let mut tokenizer = Tokenizer::new(&dialect, sql);
3379            let tokens = tokenizer.tokenize().unwrap();
3380            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3381            let name = Ident::new("num_col");
3382            let data_type = DataType::Int(None); // Non-vector type
3383            let mut extensions = ColumnExtensions::default();
3384            let result = ParserContext::parse_column_extensions(
3385                &mut parser,
3386                &name,
3387                &data_type,
3388                &mut extensions,
3389            );
3390            assert!(result.is_err());
3391            assert!(
3392                result
3393                    .unwrap_err()
3394                    .to_string()
3395                    .contains("VECTOR INDEX only supports Vector type columns")
3396            );
3397        }
3398    }
3399}