Skip to main content

sql/parsers/
create_parser.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod json;
16#[cfg(feature = "enterprise")]
17pub mod trigger;
18
19use std::collections::HashMap;
20
21use arrow_buffer::IntervalMonthDayNano;
22use common_catalog::consts::default_engine;
23use datafusion_common::ScalarValue;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
25use datatypes::data_type::ConcreteDataType;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28use sqlparser::ast::{
29    ColumnOption, ColumnOptionDef, DataType, Expr, KeyOrIndexDisplay, NullsDistinctOption,
30    PrimaryKeyConstraint, UniqueConstraint,
31};
32use sqlparser::dialect::keywords::Keyword;
33use sqlparser::keywords::ALL_KEYWORDS;
34use sqlparser::parser::IsOptional::Mandatory;
35use sqlparser::parser::{Parser, ParserError};
36use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
37use table::requests::validate_database_option;
38
39use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
40use crate::error::{
41    self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidFlowQuerySnafu,
42    InvalidIntervalSnafu, InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
43    SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
44};
45use crate::parser::{FLOW, ParserContext};
46use crate::parsers::tql_parser;
47use crate::parsers::utils::{
48    self, parse_with_options, validate_column_fulltext_create_option,
49    validate_column_skipping_index_create_option, validate_column_vector_index_create_option,
50};
51use crate::statements::create::{
52    Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
53    CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
54};
55use crate::statements::statement::Statement;
56use crate::statements::transform::type_alias::get_data_type_by_alias_name;
57use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
58use crate::util::{OptionValue, location_to_index, parse_option_string};
59
/// `ENGINE` word naming a table's storage engine clause.
pub const ENGINE: &str = "ENGINE";
/// The `MAXVALUE` word.
pub const MAXVALUE: &str = "MAXVALUE";
/// `SINK` in `CREATE FLOW ... SINK TO <table>`; matched case-insensitively.
pub const SINK: &str = "SINK";
/// `EXPIRE` in the `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const EXPIRE: &str = "EXPIRE";
/// `AFTER` in the `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const AFTER: &str = "AFTER";
/// `INVERTED` word, presumably used for inverted index declarations.
pub const INVERTED: &str = "INVERTED";
/// `SKIPPING` in the `SKIPPING INDEX` column extension; matched case-insensitively.
pub const SKIPPING: &str = "SKIPPING";
/// `VECTOR` word, related to vector column types and indexes.
pub const VECTOR: &str = "VECTOR";

/// Textual form of a parsed interval expression, as produced by the
/// expression's `to_string()` in `parse_interval_month_day_nano`.
pub type RawIntervalExpr = String;
70
71// Preserve raw CREATE FLOW option entries until operator-side validation.
72// Do not use `OptionMap::new()` here: it can drop non-string values for
73// redacted keys before the flow option allowlist rejects them.
74fn flow_option_map(options: HashMap<String, OptionValue>) -> OptionMap {
75    let mut flow_options = OptionMap::default();
76    for (key, value) in options {
77        flow_options.insert_options(&key, value);
78    }
79    flow_options
80}
81
82/// Parses create [table] statement
83impl<'a> ParserContext<'a> {
84    pub(crate) fn parse_create(&mut self) -> Result<Statement> {
85        match self.parser.peek_token().token {
86            Token::Word(w) => match w.keyword {
87                Keyword::TABLE => self.parse_create_table(),
88
89                Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
90
91                Keyword::EXTERNAL => self.parse_create_external_table(),
92
93                Keyword::OR => {
94                    let _ = self.parser.next_token();
95                    self.parser
96                        .expect_keyword(Keyword::REPLACE)
97                        .context(SyntaxSnafu)?;
98                    match self.parser.next_token().token {
99                        Token::Word(w) => match w.keyword {
100                            Keyword::VIEW => self.parse_create_view(true),
101                            Keyword::NoKeyword => {
102                                let uppercase = w.value.to_uppercase();
103                                match uppercase.as_str() {
104                                    FLOW => self.parse_create_flow(true),
105                                    _ => self.unsupported(w.to_string()),
106                                }
107                            }
108                            _ => self.unsupported(w.to_string()),
109                        },
110                        _ => self.unsupported(w.to_string()),
111                    }
112                }
113
114                Keyword::VIEW => {
115                    let _ = self.parser.next_token();
116                    self.parse_create_view(false)
117                }
118
119                #[cfg(feature = "enterprise")]
120                Keyword::TRIGGER => {
121                    let _ = self.parser.next_token();
122                    self.parse_create_trigger()
123                }
124
125                Keyword::NoKeyword => {
126                    let _ = self.parser.next_token();
127                    let uppercase = w.value.to_uppercase();
128                    match uppercase.as_str() {
129                        FLOW => self.parse_create_flow(false),
130                        _ => self.unsupported(w.to_string()),
131                    }
132                }
133                _ => self.unsupported(w.to_string()),
134            },
135            unexpected => self.unsupported(unexpected.to_string()),
136        }
137    }
138
139    /// Parse `CREAVE VIEW` statement.
140    fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
141        let if_not_exists = self.parse_if_not_exist()?;
142        let view_name = self.intern_parse_table_name()?;
143
144        let columns = self.parse_view_columns()?;
145
146        self.parser
147            .expect_keyword(Keyword::AS)
148            .context(SyntaxSnafu)?;
149
150        let query = self.parse_query()?;
151
152        Ok(Statement::CreateView(CreateView {
153            name: view_name,
154            columns,
155            or_replace,
156            query: Box::new(query),
157            if_not_exists,
158        }))
159    }
160
161    fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
162        let mut columns = vec![];
163        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
164            return Ok(columns);
165        }
166
167        loop {
168            let name = self.parse_column_name().context(SyntaxSnafu)?;
169
170            columns.push(name);
171
172            let comma = self.parser.consume_token(&Token::Comma);
173            if self.parser.consume_token(&Token::RParen) {
174                // allow a trailing comma, even though it's not in standard
175                break;
176            } else if !comma {
177                return self.expected("',' or ')' after column name", self.parser.peek_token());
178            }
179        }
180
181        Ok(columns)
182    }
183
184    fn parse_create_external_table(&mut self) -> Result<Statement> {
185        let _ = self.parser.next_token();
186        self.parser
187            .expect_keyword(Keyword::TABLE)
188            .context(SyntaxSnafu)?;
189        let if_not_exists = self.parse_if_not_exist()?;
190        let table_name = self.intern_parse_table_name()?;
191        let (columns, constraints) = self.parse_columns()?;
192        if !columns.is_empty() {
193            validate_time_index(&columns, &constraints)?;
194        }
195
196        let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
197        let options = self.parse_create_table_options()?;
198        Ok(Statement::CreateExternalTable(CreateExternalTable {
199            name: table_name,
200            columns,
201            constraints,
202            options,
203            if_not_exists,
204            engine,
205        }))
206    }
207
208    fn parse_create_database(&mut self) -> Result<Statement> {
209        let _ = self.parser.next_token();
210        let if_not_exists = self.parse_if_not_exist()?;
211        let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
212            expected: "a database name",
213            actual: self.peek_token_as_string(),
214        })?;
215        let database_name = Self::canonicalize_object_name(database_name)?;
216
217        let options = self
218            .parser
219            .parse_options(Keyword::WITH)
220            .context(SyntaxSnafu)?
221            .into_iter()
222            .map(parse_option_string)
223            .collect::<Result<HashMap<String, OptionValue>>>()?;
224
225        for key in options.keys() {
226            ensure!(
227                validate_database_option(key),
228                InvalidDatabaseOptionSnafu { key: key.clone() }
229            );
230        }
231        if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string())
232            && append_mode == "true"
233            && options.contains_key("merge_mode")
234        {
235            return InvalidDatabaseOptionSnafu {
236                key: "merge_mode".to_string(),
237            }
238            .fail();
239        }
240
241        Ok(Statement::CreateDatabase(CreateDatabase {
242            name: database_name,
243            if_not_exists,
244            options: OptionMap::new(options),
245        }))
246    }
247
248    fn parse_create_table(&mut self) -> Result<Statement> {
249        let _ = self.parser.next_token();
250
251        let if_not_exists = self.parse_if_not_exist()?;
252
253        let table_name = self.intern_parse_table_name()?;
254
255        if self.parser.parse_keyword(Keyword::LIKE) {
256            let source_name = self.intern_parse_table_name()?;
257
258            return Ok(Statement::CreateTableLike(CreateTableLike {
259                table_name,
260                source_name,
261            }));
262        }
263
264        let (columns, constraints) = self.parse_columns()?;
265        validate_time_index(&columns, &constraints)?;
266
267        let partitions = self.parse_partitions()?;
268        if let Some(partitions) = &partitions {
269            validate_partitions(&columns, partitions)?;
270        }
271
272        let engine = self.parse_table_engine(default_engine())?;
273        let options = self.parse_create_table_options()?;
274        let create_table = CreateTable {
275            if_not_exists,
276            name: table_name,
277            columns,
278            engine,
279            constraints,
280            options,
281            table_id: 0, // table id is assigned by catalog manager
282            partitions,
283        };
284
285        Ok(Statement::CreateTable(create_table))
286    }
287
288    /// "CREATE FLOW" clause
289    fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
290        let if_not_exists = self.parse_if_not_exist()?;
291
292        let flow_name = self.intern_parse_table_name()?;
293
294        // make `SINK` case in-sensitive
295        if let Token::Word(word) = self.parser.peek_token().token
296            && word.value.eq_ignore_ascii_case(SINK)
297        {
298            self.parser.next_token();
299        } else {
300            Err(ParserError::ParserError(
301                "Expect `SINK` keyword".to_string(),
302            ))
303            .context(SyntaxSnafu)?
304        }
305        self.parser
306            .expect_keyword(Keyword::TO)
307            .context(SyntaxSnafu)?;
308
309        let output_table_name = self.intern_parse_table_name()?;
310
311        let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
312            && w1.value.eq_ignore_ascii_case(EXPIRE)
313        {
314            self.parser.next_token();
315            if let Token::Word(w2) = &self.parser.peek_token().token
316                && w2.value.eq_ignore_ascii_case(AFTER)
317            {
318                self.parser.next_token();
319                Some(self.parse_interval_no_month("EXPIRE AFTER")?)
320            } else {
321                None
322            }
323        } else {
324            None
325        };
326
327        let eval_interval = if self
328            .parser
329            .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
330        {
331            Some(self.parse_interval_no_month("EVAL INTERVAL")?)
332        } else {
333            None
334        };
335
336        let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
337            match self.parser.next_token() {
338                TokenWithSpan {
339                    token: Token::SingleQuotedString(value, ..),
340                    ..
341                } => Some(value),
342                unexpected => {
343                    return self
344                        .parser
345                        .expected("string", unexpected)
346                        .context(SyntaxSnafu);
347                }
348            }
349        } else {
350            None
351        };
352
353        let flow_options = self
354            .parser
355            .parse_options(Keyword::WITH)
356            .context(SyntaxSnafu)?
357            .into_iter()
358            .map(parse_option_string)
359            .collect::<Result<HashMap<String, OptionValue>>>()?;
360
361        self.parser
362            .expect_keyword(Keyword::AS)
363            .context(SyntaxSnafu)?;
364
365        let query = Box::new(self.parse_flow_sql_or_tql(true)?);
366
367        Ok(Statement::CreateFlow(CreateFlow {
368            flow_name,
369            sink_table_name: output_table_name,
370            or_replace,
371            if_not_exists,
372            expire_after,
373            eval_interval,
374            comment,
375            flow_options: flow_option_map(flow_options),
376            query,
377        }))
378    }
379
    /// Parses the query of a `CREATE FLOW ... AS <query>` statement and keeps its
    /// raw text.
    ///
    /// The accepted form is chosen by the first token:
    /// - `SELECT ...` is parsed with `parse_query`;
    /// - `WITH ...` is parsed with `parse_with_tql_with_now`, and the result must
    ///   then be a `Statement::Query` accepted by `utils::is_simple_tql_cte_query`;
    /// - an unquoted `TQL ...` (any case) is parsed with `parse_tql`.
    ///
    /// Any other leading token is reported as unsupported. `require_now_expr` is
    /// forwarded to the `WITH` and `TQL` parsers (NOTE(review): its exact meaning
    /// is defined there — confirm before relying on it).
    ///
    /// The raw text handed to `SqlOrTql::try_from_statement` spans from the first
    /// token of the query up to the end of the token that follows it (or to the
    /// end of the input at EOF), with trailing `;` characters stripped.
    fn parse_flow_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
        // Byte offset into `self.sql` where the query text starts.
        let start_loc = self.parser.peek_token().span.start;
        let start_index = location_to_index(self.sql, &start_loc);

        // Remembered before parsing, since the parse consumes the `WITH` token.
        let starts_with_with = matches!(
            self.parser.peek_token().token,
            Token::Word(w) if w.keyword == Keyword::WITH
        );

        // only accept sql or tql
        let query = match self.parser.peek_token().token {
            Token::Word(w) => match w.keyword {
                Keyword::SELECT => self.parse_query(),
                Keyword::WITH => self.parse_with_tql_with_now(require_now_expr),
                Keyword::NoKeyword
                    if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
                {
                    self.parse_tql(require_now_expr)
                }

                _ => self.unsupported(self.peek_token_as_string()),
            },
            _ => self.unsupported(self.peek_token_as_string()),
        }?;

        // A `WITH` query is only allowed in the restricted shape checked below.
        if starts_with_with {
            let Statement::Query(query) = &query else {
                return InvalidFlowQuerySnafu {
                    reason: "Expect a query after WITH".to_string(),
                }
                .fail();
            };

            if !utils::is_simple_tql_cte_query(query) {
                return InvalidFlowQuerySnafu {
                    reason: "WITH is only supported for the simplest TQL CTE in CREATE FLOW"
                        .to_string(),
                }
                .fail();
            }
        }

        // The first token after the query; its end bounds the captured raw text.
        let end_token = self.parser.peek_token();

        let raw_query = if end_token == Token::EOF {
            &self.sql[start_index..]
        } else {
            let end_loc = end_token.span.end;
            let end_index = location_to_index(self.sql, &end_loc);
            // Clamp so the slice never extends past the end of the input.
            &self.sql[start_index..end_index.min(self.sql.len())]
        };
        // The slice may include a terminating `;` token; drop any trailing ones.
        let raw_query = raw_query.trim_end_matches(";");

        let query = SqlOrTql::try_from_statement(query, raw_query)?;
        Ok(query)
    }
436
437    /// Parse the interval expr to duration in seconds.
438    fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
439        let interval = self.parse_interval_month_day_nano()?.0;
440        if interval.months != 0 {
441            return InvalidIntervalSnafu {
442                reason: format!("Interval with months is not allowed in {context}"),
443            }
444            .fail();
445        }
446        Ok(
447            interval.nanoseconds / 1_000_000_000
448                + interval.days as i64 * 60 * 60 * 24
449                + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, // 1 month=365.25/12=30.44 days
450                                                                       // this is to keep the same as https://docs.rs/humantime/latest/humantime/fn.parse_duration.html
451                                                                       // which we use in database to parse i.e. ttl interval and many other intervals
452        )
453    }
454
455    /// Parse interval expr to [`IntervalMonthDayNano`].
456    fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
457        let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
458        let raw_interval_expr = interval_expr.to_string();
459        let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
460            .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
461            .ok()
462            .with_context(|| InvalidIntervalSnafu {
463                reason: format!("cannot cast {} to interval type", interval_expr),
464            })?;
465        if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
466            Ok((interval, raw_interval_expr))
467        } else {
468            unreachable!()
469        }
470    }
471
472    fn parse_if_not_exist(&mut self) -> Result<bool> {
473        match self.parser.peek_token().token {
474            Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
475            _ => {}
476        }
477
478        if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
479            return self
480                .parser
481                .expect_keyword(Keyword::EXISTS)
482                .map(|_| true)
483                .context(UnexpectedSnafu {
484                    expected: "EXISTS",
485                    actual: self.peek_token_as_string(),
486                });
487        }
488
489        if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
490            return UnsupportedSnafu { keyword: "EXISTS" }.fail();
491        }
492
493        Ok(false)
494    }
495
496    fn parse_create_table_options(&mut self) -> Result<OptionMap> {
497        parse_with_options(&mut self.parser)
498    }
499
500    /// "PARTITION ON COLUMNS (...)" clause
501    fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
502        if !self.parser.parse_keyword(Keyword::PARTITION) {
503            return Ok(None);
504        }
505
506        self.parse_partition_on_columns().map(Some)
507    }
508
509    /// Parses the "ON COLUMNS (...) (...)" part after "PARTITION".
510    pub(crate) fn parse_partition_on_columns(&mut self) -> Result<Partitions> {
511        self.parser
512            .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
513            .context(error::UnexpectedSnafu {
514                expected: "ON, COLUMNS",
515                actual: self.peek_token_as_string(),
516            })?;
517
518        let raw_column_list = self
519            .parser
520            .parse_parenthesized_column_list(Mandatory, false)
521            .context(error::SyntaxSnafu)?;
522        let column_list = raw_column_list
523            .into_iter()
524            .map(Self::canonicalize_identifier)
525            .collect();
526
527        let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
528
529        Ok(Partitions { column_list, exprs })
530    }
531
532    fn parse_partition_entry(&mut self) -> Result<Expr> {
533        self.parser.parse_expr().context(error::SyntaxSnafu)
534    }
535
536    /// Parse a comma-separated list wrapped by "()", and of which all items accepted by `F`
537    fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
538    where
539        F: FnMut(&mut ParserContext<'a>) -> Result<T>,
540    {
541        self.parser
542            .expect_token(&Token::LParen)
543            .context(error::UnexpectedSnafu {
544                expected: "(",
545                actual: self.peek_token_as_string(),
546            })?;
547
548        let mut values = vec![];
549        while self.parser.peek_token() != Token::RParen {
550            values.push(f(self)?);
551            if !self.parser.consume_token(&Token::Comma) {
552                break;
553            }
554        }
555
556        self.parser
557            .expect_token(&Token::RParen)
558            .context(error::UnexpectedSnafu {
559                expected: ")",
560                actual: self.peek_token_as_string(),
561            })?;
562
563        Ok(values)
564    }
565
566    /// Parse the columns and constraints.
567    fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
568        let mut columns = vec![];
569        let mut constraints = vec![];
570        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
571            return Ok((columns, constraints));
572        }
573
574        loop {
575            if let Some(constraint) = self.parse_optional_table_constraint()? {
576                constraints.push(constraint);
577            } else if let Token::Word(_) = self.parser.peek_token().token {
578                self.parse_column(&mut columns, &mut constraints)?;
579            } else {
580                return self.expected(
581                    "column name or constraint definition",
582                    self.parser.peek_token(),
583                );
584            }
585            let comma = self.parser.consume_token(&Token::Comma);
586            if self.parser.consume_token(&Token::RParen) {
587                // allow a trailing comma, even though it's not in standard
588                break;
589            } else if !comma {
590                return self.expected(
591                    "',' or ')' after column definition",
592                    self.parser.peek_token(),
593                );
594            }
595        }
596
597        Ok((columns, constraints))
598    }
599
600    fn parse_column(
601        &mut self,
602        columns: &mut Vec<Column>,
603        constraints: &mut Vec<TableConstraint>,
604    ) -> Result<()> {
605        let mut column = self.parse_column_def()?;
606
607        let mut time_index_opt_idx = None;
608        for (index, opt) in column.options().iter().enumerate() {
609            if let ColumnOption::DialectSpecific(tokens) = &opt.option
610                && matches!(
611                    &tokens[..],
612                    [
613                        Token::Word(Word {
614                            keyword: Keyword::TIME,
615                            ..
616                        }),
617                        Token::Word(Word {
618                            keyword: Keyword::INDEX,
619                            ..
620                        })
621                    ]
622                )
623            {
624                ensure!(
625                    time_index_opt_idx.is_none(),
626                    InvalidColumnOptionSnafu {
627                        name: column.name().to_string(),
628                        msg: "duplicated time index",
629                    }
630                );
631                time_index_opt_idx = Some(index);
632
633                let constraint = TableConstraint::TimeIndex {
634                    column: Ident::new(column.name().value.clone()),
635                };
636                constraints.push(constraint);
637            }
638        }
639
640        if let Some(index) = time_index_opt_idx {
641            ensure!(
642                !column.options().contains(&ColumnOptionDef {
643                    option: ColumnOption::Null,
644                    name: None,
645                }),
646                InvalidColumnOptionSnafu {
647                    name: column.name().to_string(),
648                    msg: "time index column can't be null",
649                }
650            );
651
652            // The timestamp type may be an alias type, we have to retrieve the actual type.
653            let data_type = get_unalias_type(column.data_type());
654            ensure!(
655                matches!(data_type, DataType::Timestamp(_, _)),
656                InvalidColumnOptionSnafu {
657                    name: column.name().to_string(),
658                    msg: "time index column data type should be timestamp",
659                }
660            );
661
662            let not_null_opt = ColumnOptionDef {
663                option: ColumnOption::NotNull,
664                name: None,
665            };
666
667            if !column.options().contains(&not_null_opt) {
668                column.mut_options().push(not_null_opt);
669            }
670
671            let _ = column.mut_options().remove(index);
672        }
673
674        columns.push(column);
675
676        Ok(())
677    }
678
679    /// Parse the column name and check if it's valid.
680    fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
681        let name = self.parser.parse_identifier()?;
682        if name.quote_style.is_none() &&
683        // "ALL_KEYWORDS" are sorted.
684            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
685        {
686            return Err(ParserError::ParserError(format!(
687                "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
688                &name.value
689            )));
690        }
691
692        Ok(name)
693    }
694
695    pub fn parse_column_def(&mut self) -> Result<Column> {
696        let name = self.parse_column_name().context(SyntaxSnafu)?;
697        let parser = &mut self.parser;
698
699        ensure!(
700            !(name.quote_style.is_none() &&
701            // "ALL_KEYWORDS" are sorted.
702            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
703            InvalidSqlSnafu {
704                msg: format!(
705                    "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
706                    &name.value
707                ),
708            }
709        );
710
711        let mut extensions = ColumnExtensions::default();
712
713        let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
714        // Must immediately parse the JSON datatype format because it is closely after the "JSON"
715        // datatype, like this: "JSON(format = ...)".
716        if matches!(data_type, DataType::JSON) {
717            extensions.json_datatype_options = json::parse_json_datatype_options(parser)?;
718        }
719
720        let mut options = vec![];
721        loop {
722            if parser.parse_keyword(Keyword::CONSTRAINT) {
723                let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
724                if let Some(option) = Self::parse_optional_column_option(parser)? {
725                    options.push(ColumnOptionDef { name, option });
726                } else {
727                    return parser
728                        .expected(
729                            "constraint details after CONSTRAINT <name>",
730                            parser.peek_token(),
731                        )
732                        .context(SyntaxSnafu);
733                }
734            } else if let Some(option) = Self::parse_optional_column_option(parser)? {
735                options.push(ColumnOptionDef { name: None, option });
736            } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
737                break;
738            };
739        }
740
741        Ok(Column {
742            column_def: ColumnDef {
743                name: Self::canonicalize_identifier(name),
744                data_type,
745                options,
746            },
747            extensions,
748        })
749    }
750
751    fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
752        if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
753            Ok(Some(ColumnOption::CharacterSet(
754                parser.parse_object_name(false).context(SyntaxSnafu)?,
755            )))
756        } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
757            Ok(Some(ColumnOption::NotNull))
758        } else if parser.parse_keywords(&[Keyword::COMMENT]) {
759            match parser.next_token() {
760                TokenWithSpan {
761                    token: Token::SingleQuotedString(value, ..),
762                    ..
763                } => Ok(Some(ColumnOption::Comment(value))),
764                unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
765            }
766        } else if parser.parse_keyword(Keyword::NULL) {
767            Ok(Some(ColumnOption::Null))
768        } else if parser.parse_keyword(Keyword::DEFAULT) {
769            Ok(Some(ColumnOption::Default(
770                parser.parse_expr().context(SyntaxSnafu)?,
771            )))
772        } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
773            Ok(Some(ColumnOption::PrimaryKey(PrimaryKeyConstraint {
774                name: None,
775                index_name: None,
776                index_type: None,
777                columns: vec![],
778                index_options: vec![],
779                characteristics: None,
780            })))
781        } else if parser.parse_keyword(Keyword::UNIQUE) {
782            Ok(Some(ColumnOption::Unique(UniqueConstraint {
783                name: None,
784                index_name: None,
785                index_type_display: KeyOrIndexDisplay::None,
786                index_type: None,
787                columns: vec![],
788                index_options: vec![],
789                characteristics: None,
790                nulls_distinct: NullsDistinctOption::None,
791            })))
792        } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
793            // Use a DialectSpecific option for time index
794            Ok(Some(ColumnOption::DialectSpecific(vec![
795                Token::Word(Word {
796                    value: "TIME".to_string(),
797                    quote_style: None,
798                    keyword: Keyword::TIME,
799                }),
800                Token::Word(Word {
801                    value: "INDEX".to_string(),
802                    quote_style: None,
803                    keyword: Keyword::INDEX,
804                }),
805            ])))
806        } else {
807            Ok(None)
808        }
809    }
810
811    /// Parse a column option extensions.
812    ///
813    /// This function will handle:
814    /// - Vector type
815    /// - Indexes
816    fn parse_column_extensions(
817        parser: &mut Parser<'_>,
818        column_name: &Ident,
819        column_type: &DataType,
820        column_extensions: &mut ColumnExtensions,
821    ) -> Result<bool> {
822        if let DataType::Custom(name, tokens) = column_type
823            && name.0.len() == 1
824            && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
825        {
826            ensure!(
827                tokens.len() == 1,
828                InvalidColumnOptionSnafu {
829                    name: column_name.to_string(),
830                    msg: "VECTOR type should have dimension",
831                }
832            );
833
834            let dimension =
835                tokens[0]
836                    .parse::<u32>()
837                    .ok()
838                    .with_context(|| InvalidColumnOptionSnafu {
839                        name: column_name.to_string(),
840                        msg: "dimension should be a positive integer",
841                    })?;
842
843            let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
844            column_extensions.vector_options = Some(options);
845        }
846
847        // parse index options in column definition
848        let mut is_index_declared = false;
849
850        // skipping index
851        if let Token::Word(word) = parser.peek_token().token
852            && word.value.eq_ignore_ascii_case(SKIPPING)
853        {
854            parser.next_token();
855            // Consume `INDEX` keyword
856            ensure!(
857                parser.parse_keyword(Keyword::INDEX),
858                InvalidColumnOptionSnafu {
859                    name: column_name.to_string(),
860                    msg: "expect INDEX after SKIPPING keyword",
861                }
862            );
863            ensure!(
864                column_extensions.skipping_index_options.is_none(),
865                InvalidColumnOptionSnafu {
866                    name: column_name.to_string(),
867                    msg: "duplicated SKIPPING index option",
868                }
869            );
870
871            let options = parser
872                .parse_options(Keyword::WITH)
873                .context(error::SyntaxSnafu)?
874                .into_iter()
875                .map(parse_option_string)
876                .collect::<Result<Vec<_>>>()?;
877
878            for (key, _) in options.iter() {
879                ensure!(
880                    validate_column_skipping_index_create_option(key),
881                    InvalidColumnOptionSnafu {
882                        name: column_name.to_string(),
883                        msg: format!("invalid SKIPPING INDEX option: {key}"),
884                    }
885                );
886            }
887
888            let options = OptionMap::new(options);
889            column_extensions.skipping_index_options = Some(options);
890            is_index_declared |= true;
891        }
892
893        // fulltext index
894        if parser.parse_keyword(Keyword::FULLTEXT) {
895            // Consume `INDEX` keyword
896            ensure!(
897                parser.parse_keyword(Keyword::INDEX),
898                InvalidColumnOptionSnafu {
899                    name: column_name.to_string(),
900                    msg: "expect INDEX after FULLTEXT keyword",
901                }
902            );
903
904            ensure!(
905                column_extensions.fulltext_index_options.is_none(),
906                InvalidColumnOptionSnafu {
907                    name: column_name.to_string(),
908                    msg: "duplicated FULLTEXT INDEX option",
909                }
910            );
911
912            let column_type = get_unalias_type(column_type);
913            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
914            ensure!(
915                data_type == ConcreteDataType::string_datatype(),
916                InvalidColumnOptionSnafu {
917                    name: column_name.to_string(),
918                    msg: "FULLTEXT index only supports string type",
919                }
920            );
921
922            let options = parser
923                .parse_options(Keyword::WITH)
924                .context(error::SyntaxSnafu)?
925                .into_iter()
926                .map(parse_option_string)
927                .collect::<Result<Vec<_>>>()?;
928
929            for (key, _) in options.iter() {
930                ensure!(
931                    validate_column_fulltext_create_option(key),
932                    InvalidColumnOptionSnafu {
933                        name: column_name.to_string(),
934                        msg: format!("invalid FULLTEXT INDEX option: {key}"),
935                    }
936                );
937            }
938
939            let options = OptionMap::new(options);
940            column_extensions.fulltext_index_options = Some(options);
941            is_index_declared |= true;
942        }
943
944        // inverted index
945        if let Token::Word(word) = parser.peek_token().token
946            && word.value.eq_ignore_ascii_case(INVERTED)
947        {
948            parser.next_token();
949            // Consume `INDEX` keyword
950            ensure!(
951                parser.parse_keyword(Keyword::INDEX),
952                InvalidColumnOptionSnafu {
953                    name: column_name.to_string(),
954                    msg: "expect INDEX after INVERTED keyword",
955                }
956            );
957
958            ensure!(
959                column_extensions.inverted_index_options.is_none(),
960                InvalidColumnOptionSnafu {
961                    name: column_name.to_string(),
962                    msg: "duplicated INVERTED index option",
963                }
964            );
965
966            // inverted index doesn't have options, skipping `WITH`
967            // try cache `WITH` and throw error
968            let with_token = parser.peek_token();
969            ensure!(
970                with_token.token
971                    != Token::Word(Word {
972                        value: "WITH".to_string(),
973                        keyword: Keyword::WITH,
974                        quote_style: None,
975                    }),
976                InvalidColumnOptionSnafu {
977                    name: column_name.to_string(),
978                    msg: "INVERTED index doesn't support options",
979                }
980            );
981
982            column_extensions.inverted_index_options = Some(OptionMap::default());
983            is_index_declared |= true;
984        }
985
986        // vector index
987        if let Token::Word(word) = parser.peek_token().token
988            && word.value.eq_ignore_ascii_case(VECTOR)
989        {
990            parser.next_token();
991            // Consume `INDEX` keyword
992            ensure!(
993                parser.parse_keyword(Keyword::INDEX),
994                InvalidColumnOptionSnafu {
995                    name: column_name.to_string(),
996                    msg: "expect INDEX after VECTOR keyword",
997                }
998            );
999
1000            ensure!(
1001                column_extensions.vector_index_options.is_none(),
1002                InvalidColumnOptionSnafu {
1003                    name: column_name.to_string(),
1004                    msg: "duplicated VECTOR INDEX option",
1005                }
1006            );
1007
1008            // Check that column is a vector type
1009            let column_type = get_unalias_type(column_type);
1010            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
1011            ensure!(
1012                matches!(data_type, ConcreteDataType::Vector(_)),
1013                InvalidColumnOptionSnafu {
1014                    name: column_name.to_string(),
1015                    msg: "VECTOR INDEX only supports Vector type columns",
1016                }
1017            );
1018
1019            let options = parser
1020                .parse_options(Keyword::WITH)
1021                .context(error::SyntaxSnafu)?
1022                .into_iter()
1023                .map(parse_option_string)
1024                .collect::<Result<Vec<_>>>()?;
1025
1026            for (key, _) in options.iter() {
1027                ensure!(
1028                    validate_column_vector_index_create_option(key),
1029                    InvalidColumnOptionSnafu {
1030                        name: column_name.to_string(),
1031                        msg: format!("invalid VECTOR INDEX option: {key}"),
1032                    }
1033                );
1034            }
1035
1036            let options = OptionMap::new(options);
1037            column_extensions.vector_index_options = Some(options);
1038            is_index_declared |= true;
1039        }
1040
1041        Ok(is_index_declared)
1042    }
1043
1044    fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
1045        match self.parser.next_token() {
1046            TokenWithSpan {
1047                token: Token::Word(w),
1048                ..
1049            } if w.keyword == Keyword::PRIMARY => {
1050                self.parser
1051                    .expect_keyword(Keyword::KEY)
1052                    .context(error::UnexpectedSnafu {
1053                        expected: "KEY",
1054                        actual: self.peek_token_as_string(),
1055                    })?;
1056                let raw_columns = self
1057                    .parser
1058                    .parse_parenthesized_column_list(Mandatory, false)
1059                    .context(error::SyntaxSnafu)?;
1060                let columns = raw_columns
1061                    .into_iter()
1062                    .map(Self::canonicalize_identifier)
1063                    .collect();
1064                Ok(Some(TableConstraint::PrimaryKey { columns }))
1065            }
1066            TokenWithSpan {
1067                token: Token::Word(w),
1068                ..
1069            } if w.keyword == Keyword::TIME => {
1070                self.parser
1071                    .expect_keyword(Keyword::INDEX)
1072                    .context(error::UnexpectedSnafu {
1073                        expected: "INDEX",
1074                        actual: self.peek_token_as_string(),
1075                    })?;
1076
1077                let raw_columns = self
1078                    .parser
1079                    .parse_parenthesized_column_list(Mandatory, false)
1080                    .context(error::SyntaxSnafu)?;
1081                let mut columns = raw_columns
1082                    .into_iter()
1083                    .map(Self::canonicalize_identifier)
1084                    .collect::<Vec<_>>();
1085
1086                ensure!(
1087                    columns.len() == 1,
1088                    InvalidTimeIndexSnafu {
1089                        msg: "it should contain only one column in time index",
1090                    }
1091                );
1092
1093                Ok(Some(TableConstraint::TimeIndex {
1094                    column: columns.pop().unwrap(),
1095                }))
1096            }
1097            _ => {
1098                self.parser.prev_token();
1099                Ok(None)
1100            }
1101        }
1102    }
1103
1104    /// Parses the set of valid formats
1105    fn parse_table_engine(&mut self, default: &str) -> Result<String> {
1106        if !self.consume_token(ENGINE) {
1107            return Ok(default.to_string());
1108        }
1109
1110        self.parser
1111            .expect_token(&Token::Eq)
1112            .context(error::UnexpectedSnafu {
1113                expected: "=",
1114                actual: self.peek_token_as_string(),
1115            })?;
1116
1117        let token = self.parser.next_token();
1118        if let Token::Word(w) = token.token {
1119            Ok(w.value)
1120        } else {
1121            self.expected("'Engine' is missing", token)
1122        }
1123    }
1124}
1125
1126fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1127    let time_index_constraints: Vec<_> = constraints
1128        .iter()
1129        .filter_map(|c| match c {
1130            TableConstraint::TimeIndex { column } => Some(column),
1131            _ => None,
1132        })
1133        .unique()
1134        .collect();
1135
1136    ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1137    ensure!(
1138        time_index_constraints.len() == 1,
1139        InvalidTimeIndexSnafu {
1140            msg: format!(
1141                "expected only one time index constraint but actual {}",
1142                time_index_constraints.len()
1143            ),
1144        }
1145    );
1146
1147    // It's safe to use time_index_constraints[0][0],
1148    // we already check the bound above.
1149    let time_index_column_ident = &time_index_constraints[0];
1150    let time_index_column = columns
1151        .iter()
1152        .find(|c| c.name().value == *time_index_column_ident.value)
1153        .with_context(|| InvalidTimeIndexSnafu {
1154            msg: format!(
1155                "time index column {} not found in columns",
1156                time_index_column_ident
1157            ),
1158        })?;
1159
1160    let time_index_data_type = get_unalias_type(time_index_column.data_type());
1161    ensure!(
1162        matches!(time_index_data_type, DataType::Timestamp(_, _)),
1163        InvalidColumnOptionSnafu {
1164            name: time_index_column.name().to_string(),
1165            msg: "time index column data type should be timestamp",
1166        }
1167    );
1168
1169    Ok(())
1170}
1171
1172fn get_unalias_type(data_type: &DataType) -> DataType {
1173    match data_type {
1174        DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1175            if let Some(real_type) =
1176                get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1177            {
1178                real_type
1179            } else {
1180                data_type.clone()
1181            }
1182        }
1183        _ => data_type.clone(),
1184    }
1185}
1186
1187fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1188    let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1189
1190    ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1191
1192    Ok(())
1193}
1194
1195/// Ensure all exprs are binary expr and all the columns are defined in the column list.
1196fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1197    for expr in exprs {
1198        // The first level must be binary expr
1199        if let Expr::BinaryOp { left, op: _, right } = expr {
1200            ensure_one_expr(left, columns)?;
1201            ensure_one_expr(right, columns)?;
1202        } else {
1203            return error::InvalidSqlSnafu {
1204                msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1205            }
1206            .fail();
1207        }
1208    }
1209    Ok(())
1210}
1211
1212/// Check if the expr is a binary expr, an ident or a literal value.
1213/// If is ident, then check it is in the column list.
1214/// This recursive function is intended to be used by [ensure_exprs_are_binary].
1215fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1216    match expr {
1217        Expr::BinaryOp { left, op: _, right } => {
1218            ensure_one_expr(left, columns)?;
1219            ensure_one_expr(right, columns)?;
1220            Ok(())
1221        }
1222        Expr::Identifier(ident) => {
1223            let column_name = &ident.value;
1224            ensure!(
1225                columns.iter().any(|c| &c.name().value == column_name),
1226                error::InvalidSqlSnafu {
1227                    msg: format!(
1228                        "Column {:?} in rule expr is not referenced in PARTITION ON",
1229                        column_name
1230                    ),
1231                }
1232            );
1233            Ok(())
1234        }
1235        Expr::Value(_) => Ok(()),
1236        Expr::UnaryOp { expr, .. } => {
1237            ensure_one_expr(expr, columns)?;
1238            Ok(())
1239        }
1240        _ => error::InvalidSqlSnafu {
1241            msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1242        }
1243        .fail(),
1244    }
1245}
1246
1247/// Ensure that all columns used in "PARTITION ON COLUMNS" are defined in create table.
1248fn ensure_partition_columns_defined<'a>(
1249    columns: &'a [Column],
1250    partitions: &'a Partitions,
1251) -> Result<Vec<&'a Column>> {
1252    partitions
1253        .column_list
1254        .iter()
1255        .map(|x| {
1256            let x = ParserContext::canonicalize_identifier(x.clone());
1257            // Normally the columns in "create table" won't be too many,
1258            // a linear search to find the target every time is fine.
1259            columns
1260                .iter()
1261                .find(|c| *c.name().value == x.value)
1262                .context(error::InvalidSqlSnafu {
1263                    msg: format!("Partition column {:?} not defined", x.value),
1264                })
1265        })
1266        .collect::<Result<Vec<&Column>>>()
1267}
1268
1269#[cfg(test)]
1270mod tests {
1271    use std::assert_matches;
1272    use std::collections::HashMap;
1273
1274    use common_catalog::consts::FILE_ENGINE;
1275    use common_error::ext::ErrorExt;
1276    use sqlparser::ast::ColumnOption::NotNull;
1277    use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1278    use sqlparser::dialect::GenericDialect;
1279    use sqlparser::tokenizer::Tokenizer;
1280
1281    use super::*;
1282    use crate::dialect::GreptimeDbDialect;
1283    use crate::parser::ParseOptions;
1284
1285    fn string_option_map(
1286        entries: impl IntoIterator<Item = (&'static str, &'static str)>,
1287    ) -> OptionMap {
1288        OptionMap::new(entries.into_iter().map(|(key, value)| {
1289            (
1290                key.to_string(),
1291                OptionValue::try_new(Expr::Value(
1292                    Value::SingleQuotedString(value.to_string()).into(),
1293                ))
1294                .unwrap(),
1295            )
1296        }))
1297    }
1298
1299    #[test]
1300    fn test_parse_create_table_like() {
1301        let sql = "CREATE TABLE t1 LIKE t2";
1302        let stmts =
1303            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1304                .unwrap();
1305
1306        assert_eq!(1, stmts.len());
1307        match &stmts[0] {
1308            Statement::CreateTableLike(c) => {
1309                assert_eq!(c.table_name.to_string(), "t1");
1310                assert_eq!(c.source_name.to_string(), "t2");
1311            }
1312            _ => unreachable!(),
1313        }
1314    }
1315
1316    #[test]
1317    fn test_validate_external_table_options() {
1318        let sql = "CREATE EXTERNAL TABLE city (
1319            host string,
1320            ts timestamp,
1321            cpu float64 default 0,
1322            memory float64,
1323            TIME INDEX (ts),
1324            PRIMARY KEY(ts, host)
1325        ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1326
1327        let result =
1328            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1329        assert!(matches!(
1330            result,
1331            Err(error::Error::InvalidTableOption { .. })
1332        ));
1333    }
1334
1335    #[test]
1336    fn test_parse_create_external_table() {
1337        struct Test<'a> {
1338            sql: &'a str,
1339            expected_table_name: &'a str,
1340            expected_options: HashMap<&'a str, &'a str>,
1341            expected_engine: &'a str,
1342            expected_if_not_exist: bool,
1343        }
1344
1345        let tests = [
1346            Test {
1347                sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1348                expected_table_name: "city",
1349                expected_options: HashMap::from([
1350                    ("location", "/var/data/city.csv"),
1351                    ("format", "csv"),
1352                ]),
1353                expected_engine: FILE_ENGINE,
1354                expected_if_not_exist: false,
1355            },
1356            Test {
1357                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1358                expected_table_name: "city",
1359                expected_options: HashMap::from([
1360                    ("location", "/var/data/city.csv"),
1361                    ("format", "csv"),
1362                ]),
1363                expected_engine: "foo",
1364                expected_if_not_exist: true,
1365            },
1366            Test {
1367                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1368                expected_table_name: "city",
1369                expected_options: HashMap::from([
1370                    ("location", "/var/data/city.csv"),
1371                    ("format", "csv"),
1372                    ("compaction.type", "bar"),
1373                ]),
1374                expected_engine: "foo",
1375                expected_if_not_exist: true,
1376            },
1377        ];
1378
1379        for test in tests {
1380            let stmts = ParserContext::create_with_dialect(
1381                test.sql,
1382                &GreptimeDbDialect {},
1383                ParseOptions::default(),
1384            )
1385            .unwrap();
1386            assert_eq!(1, stmts.len());
1387            match &stmts[0] {
1388                Statement::CreateExternalTable(c) => {
1389                    assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1390                    assert_eq!(c.options.to_str_map(), test.expected_options);
1391                    assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1392                    assert_eq!(c.engine, test.expected_engine);
1393                }
1394                _ => unreachable!(),
1395            }
1396        }
1397    }
1398
1399    #[test]
1400    fn test_parse_create_external_table_with_schema() {
1401        let sql = "CREATE EXTERNAL TABLE city (
1402            host string,
1403            ts timestamp,
1404            cpu float32 default 0,
1405            memory float64,
1406            TIME INDEX (ts),
1407            PRIMARY KEY(ts, host),
1408        ) with(location='/var/data/city.csv',format='csv');";
1409
1410        let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]);
1411
1412        let stmts =
1413            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1414                .unwrap();
1415        assert_eq!(1, stmts.len());
1416        match &stmts[0] {
1417            Statement::CreateExternalTable(c) => {
1418                assert_eq!(c.name.to_string(), "city");
1419                assert_eq!(c.options.to_str_map(), options);
1420
1421                let columns = &c.columns;
1422                assert_column_def(&columns[0].column_def, "host", "STRING");
1423                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1424                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1425                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1426
1427                let constraints = &c.constraints;
1428                assert_eq!(
1429                    &constraints[0],
1430                    &TableConstraint::TimeIndex {
1431                        column: Ident::new("ts"),
1432                    }
1433                );
1434                assert_eq!(
1435                    &constraints[1],
1436                    &TableConstraint::PrimaryKey {
1437                        columns: vec![Ident::new("ts"), Ident::new("host")]
1438                    }
1439                );
1440            }
1441            _ => unreachable!(),
1442        }
1443    }
1444
1445    #[test]
1446    fn test_parse_create_database() {
1447        let sql = "create database";
1448        let result =
1449            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1450        assert!(
1451            result
1452                .unwrap_err()
1453                .to_string()
1454                .contains("Unexpected token while parsing SQL statement")
1455        );
1456
1457        let sql = "create database prometheus";
1458        let stmts =
1459            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1460                .unwrap();
1461
1462        assert_eq!(1, stmts.len());
1463        match &stmts[0] {
1464            Statement::CreateDatabase(c) => {
1465                assert_eq!(c.name.to_string(), "prometheus");
1466                assert!(!c.if_not_exists);
1467            }
1468            _ => unreachable!(),
1469        }
1470
1471        let sql = "create database if not exists prometheus";
1472        let stmts =
1473            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1474                .unwrap();
1475
1476        assert_eq!(1, stmts.len());
1477        match &stmts[0] {
1478            Statement::CreateDatabase(c) => {
1479                assert_eq!(c.name.to_string(), "prometheus");
1480                assert!(c.if_not_exists);
1481            }
1482            _ => unreachable!(),
1483        }
1484
1485        let sql = "CREATE DATABASE `fOo`";
1486        let result =
1487            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1488        let stmts = result.unwrap();
1489        match &stmts.last().unwrap() {
1490            Statement::CreateDatabase(c) => {
1491                assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1492                assert!(!c.if_not_exists);
1493            }
1494            _ => unreachable!(),
1495        }
1496
1497        let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1498        let result =
1499            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1500        let stmts = result.unwrap();
1501        match &stmts[0] {
1502            Statement::CreateDatabase(c) => {
1503                assert_eq!(c.name.to_string(), "prometheus");
1504                assert!(!c.if_not_exists);
1505                assert_eq!(c.options.get("ttl").unwrap(), "1h");
1506            }
1507            _ => unreachable!(),
1508        }
1509    }
1510
1511    #[test]
1512    fn test_parse_create_flow_more_testcases() {
1513        use pretty_assertions::assert_eq;
1514        fn parse_create_flow(sql: &str) -> CreateFlow {
1515            let stmts = ParserContext::create_with_dialect(
1516                sql,
1517                &GreptimeDbDialect {},
1518                ParseOptions::default(),
1519            )
1520            .unwrap();
1521            assert_eq!(1, stmts.len());
1522            match &stmts[0] {
1523                Statement::CreateFlow(c) => c.clone(),
1524                _ => unreachable!(),
1525            }
1526        }
1527        struct CreateFlowWoutQuery {
1528            /// Flow name
1529            pub flow_name: ObjectName,
1530            /// Output (sink) table name
1531            pub sink_table_name: ObjectName,
1532            /// Whether to replace existing task
1533            pub or_replace: bool,
1534            /// Create if not exist
1535            pub if_not_exists: bool,
1536            /// `EXPIRE AFTER`
1537            /// Duration in second as `i64`
1538            pub expire_after: Option<i64>,
1539            /// Comment string
1540            pub comment: Option<String>,
1541            /// Flow creation options
1542            pub flow_options: OptionMap,
1543        }
1544        let testcases = vec![
1545            (
1546                r"
1547CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1548SINK TO schema_1.table_1
1549EXPIRE AFTER INTERVAL '5 minutes'
1550COMMENT 'test comment'
1551AS
1552SELECT max(c1), min(c2) FROM schema_2.table_2;",
1553                CreateFlowWoutQuery {
1554                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1555                    sink_table_name: ObjectName::from(vec![
1556                        Ident::new("schema_1"),
1557                        Ident::new("table_1"),
1558                    ]),
1559                    or_replace: true,
1560                    if_not_exists: true,
1561                    expire_after: Some(300),
1562                    comment: Some("test comment".to_string()),
1563                    flow_options: OptionMap::default(),
1564                },
1565            ),
1566            (
1567                r"
1568CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1569SINK TO schema_1.table_1
1570EXPIRE AFTER INTERVAL '300 s'
1571COMMENT 'test comment'
1572AS
1573SELECT max(c1), min(c2) FROM schema_2.table_2;",
1574                CreateFlowWoutQuery {
1575                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1576                    sink_table_name: ObjectName::from(vec![
1577                        Ident::new("schema_1"),
1578                        Ident::new("table_1"),
1579                    ]),
1580                    or_replace: true,
1581                    if_not_exists: true,
1582                    expire_after: Some(300),
1583                    comment: Some("test comment".to_string()),
1584                    flow_options: OptionMap::default(),
1585                },
1586            ),
1587            (
1588                r"
1589CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1590SINK TO schema_1.table_1
1591EXPIRE AFTER '5 minutes'
1592COMMENT 'test comment'
1593AS
1594SELECT max(c1), min(c2) FROM schema_2.table_2;",
1595                CreateFlowWoutQuery {
1596                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1597                    sink_table_name: ObjectName::from(vec![
1598                        Ident::new("schema_1"),
1599                        Ident::new("table_1"),
1600                    ]),
1601                    or_replace: true,
1602                    if_not_exists: true,
1603                    expire_after: Some(300),
1604                    comment: Some("test comment".to_string()),
1605                    flow_options: OptionMap::default(),
1606                },
1607            ),
1608            (
1609                r"
1610CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1611SINK TO schema_1.table_1
1612EXPIRE AFTER '300 s'
1613COMMENT 'test comment'
1614AS
1615SELECT max(c1), min(c2) FROM schema_2.table_2;",
1616                CreateFlowWoutQuery {
1617                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1618                    sink_table_name: ObjectName::from(vec![
1619                        Ident::new("schema_1"),
1620                        Ident::new("table_1"),
1621                    ]),
1622                    or_replace: true,
1623                    if_not_exists: true,
1624                    expire_after: Some(300),
1625                    comment: Some("test comment".to_string()),
1626                    flow_options: OptionMap::default(),
1627                },
1628            ),
1629            (
1630                r"
1631CREATE FLOW `task_2`
1632SINK TO schema_1.table_1
1633EXPIRE AFTER '2 days 1h 2 min'
1634AS
1635SELECT max(c1), min(c2) FROM schema_2.table_2;",
1636                CreateFlowWoutQuery {
1637                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1638                    sink_table_name: ObjectName::from(vec![
1639                        Ident::new("schema_1"),
1640                        Ident::new("table_1"),
1641                    ]),
1642                    or_replace: false,
1643                    if_not_exists: false,
1644                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1645                    comment: None,
1646                    flow_options: OptionMap::default(),
1647                },
1648            ),
1649            (
1650                r"
1651create flow `task_3`
1652sink to schema_1.table_1
1653expire after '10 minutes'
1654as
1655select max(c1), min(c2) from schema_2.table_2;",
1656                CreateFlowWoutQuery {
1657                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
1658                    sink_table_name: ObjectName::from(vec![
1659                        Ident::new("schema_1"),
1660                        Ident::new("table_1"),
1661                    ]),
1662                    or_replace: false,
1663                    if_not_exists: false,
1664                    expire_after: Some(600), // 10 minutes in seconds
1665                    comment: None,
1666                    flow_options: OptionMap::default(),
1667                },
1668            ),
1669            (
1670                r"
1671create or replace flow if not exists task_4
1672sink to schema_1.table_1
1673expire after interval '2 hours'
1674comment 'lowercase test'
1675as
1676select max(c1), min(c2) from schema_2.table_2;",
1677                CreateFlowWoutQuery {
1678                    flow_name: ObjectName::from(vec![Ident::new("task_4")]),
1679                    sink_table_name: ObjectName::from(vec![
1680                        Ident::new("schema_1"),
1681                        Ident::new("table_1"),
1682                    ]),
1683                    or_replace: true,
1684                    if_not_exists: true,
1685                    expire_after: Some(7200), // 2 hours in seconds
1686                    comment: Some("lowercase test".to_string()),
1687                    flow_options: OptionMap::default(),
1688                },
1689            ),
1690            (
1691                r"
1692CREATE FLOW task_5
1693SINK TO schema_1.table_1
1694WITH (defer_on_missing_source = 'true')
1695AS
1696SELECT max(c1), min(c2) FROM schema_2.table_2;",
1697                CreateFlowWoutQuery {
1698                    flow_name: ObjectName::from(vec![Ident::new("task_5")]),
1699                    sink_table_name: ObjectName::from(vec![
1700                        Ident::new("schema_1"),
1701                        Ident::new("table_1"),
1702                    ]),
1703                    or_replace: false,
1704                    if_not_exists: false,
1705                    expire_after: None,
1706                    comment: None,
1707                    flow_options: string_option_map([("defer_on_missing_source", "true")]),
1708                },
1709            ),
1710        ];
1711
1712        for (sql, expected) in testcases {
1713            let create_task = parse_create_flow(sql);
1714
1715            let expected = CreateFlow {
1716                flow_name: expected.flow_name,
1717                sink_table_name: expected.sink_table_name,
1718                or_replace: expected.or_replace,
1719                if_not_exists: expected.if_not_exists,
1720                expire_after: expected.expire_after,
1721                eval_interval: None,
1722                comment: expected.comment,
1723                flow_options: expected.flow_options,
1724                // ignore query parse result
1725                query: create_task.query.clone(),
1726            };
1727
1728            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1729            let show_create = create_task.to_string();
1730            let recreated = parse_create_flow(&show_create);
1731            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1732        }
1733    }
1734
1735    #[test]
1736    fn test_parse_create_flow() {
1737        use pretty_assertions::assert_eq;
1738        fn parse_create_flow(sql: &str) -> CreateFlow {
1739            let stmts = ParserContext::create_with_dialect(
1740                sql,
1741                &GreptimeDbDialect {},
1742                ParseOptions::default(),
1743            )
1744            .unwrap();
1745            assert_eq!(1, stmts.len());
1746            match &stmts[0] {
1747                Statement::CreateFlow(c) => c.clone(),
1748                _ => panic!("{:?}", stmts[0]),
1749            }
1750        }
1751        struct CreateFlowWoutQuery {
1752            /// Flow name
1753            pub flow_name: ObjectName,
1754            /// Output (sink) table name
1755            pub sink_table_name: ObjectName,
1756            /// Whether to replace existing task
1757            pub or_replace: bool,
1758            /// Create if not exist
1759            pub if_not_exists: bool,
1760            /// `EXPIRE AFTER`
1761            /// Duration in second as `i64`
1762            pub expire_after: Option<i64>,
1763            /// Duration for flow evaluation interval
1764            /// Duration in seconds as `i64`
1765            /// If not set, flow will be evaluated based on time window size and other args.
1766            pub eval_interval: Option<i64>,
1767            /// Comment string
1768            pub comment: Option<String>,
1769            /// Flow creation options
1770            pub flow_options: OptionMap,
1771        }
1772
1773        // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
1774        let testcases = vec![
1775            (
1776                r"
1777CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1778SINK TO schema_1.table_1
1779EXPIRE AFTER INTERVAL '5 minutes'
1780COMMENT 'test comment'
1781AS
1782SELECT max(c1), min(c2) FROM schema_2.table_2;",
1783                CreateFlowWoutQuery {
1784                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1785                    sink_table_name: ObjectName(vec![
1786                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1787                        ObjectNamePart::Identifier(Ident::new("table_1")),
1788                    ]),
1789                    or_replace: true,
1790                    if_not_exists: true,
1791                    expire_after: Some(300),
1792                    eval_interval: None,
1793                    comment: Some("test comment".to_string()),
1794                    flow_options: OptionMap::default(),
1795                },
1796            ),
1797            (
1798                r"
1799CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1800SINK TO schema_1.table_1
1801EXPIRE AFTER INTERVAL '300 s'
1802COMMENT 'test comment'
1803AS
1804SELECT max(c1), min(c2) FROM schema_2.table_2;",
1805                CreateFlowWoutQuery {
1806                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1807                    sink_table_name: ObjectName(vec![
1808                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1809                        ObjectNamePart::Identifier(Ident::new("table_1")),
1810                    ]),
1811                    or_replace: true,
1812                    if_not_exists: true,
1813                    expire_after: Some(300),
1814                    eval_interval: None,
1815                    comment: Some("test comment".to_string()),
1816                    flow_options: OptionMap::default(),
1817                },
1818            ),
1819            (
1820                r"
1821CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1822SINK TO schema_1.table_1
1823EXPIRE AFTER '5 minutes'
1824EVAL INTERVAL '10 seconds'
1825COMMENT 'test comment'
1826AS
1827SELECT max(c1), min(c2) FROM schema_2.table_2;",
1828                CreateFlowWoutQuery {
1829                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1830                    sink_table_name: ObjectName(vec![
1831                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1832                        ObjectNamePart::Identifier(Ident::new("table_1")),
1833                    ]),
1834                    or_replace: true,
1835                    if_not_exists: true,
1836                    expire_after: Some(300),
1837                    eval_interval: Some(10),
1838                    comment: Some("test comment".to_string()),
1839                    flow_options: OptionMap::default(),
1840                },
1841            ),
1842            (
1843                r"
1844CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1845SINK TO schema_1.table_1
1846EXPIRE AFTER '5 minutes'
1847EVAL INTERVAL INTERVAL '10 seconds'
1848COMMENT 'test comment'
1849AS
1850SELECT max(c1), min(c2) FROM schema_2.table_2;",
1851                CreateFlowWoutQuery {
1852                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1853                    sink_table_name: ObjectName(vec![
1854                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1855                        ObjectNamePart::Identifier(Ident::new("table_1")),
1856                    ]),
1857                    or_replace: true,
1858                    if_not_exists: true,
1859                    expire_after: Some(300),
1860                    eval_interval: Some(10),
1861                    comment: Some("test comment".to_string()),
1862                    flow_options: OptionMap::default(),
1863                },
1864            ),
1865            (
1866                r"
1867CREATE FLOW `task_2`
1868SINK TO schema_1.table_1
1869EXPIRE AFTER '2 days 1h 2 min'
1870AS
1871SELECT max(c1), min(c2) FROM schema_2.table_2;",
1872                CreateFlowWoutQuery {
1873                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1874                        '`', "task_2",
1875                    ))]),
1876                    sink_table_name: ObjectName(vec![
1877                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1878                        ObjectNamePart::Identifier(Ident::new("table_1")),
1879                    ]),
1880                    or_replace: false,
1881                    if_not_exists: false,
1882                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1883                    eval_interval: None,
1884                    comment: None,
1885                    flow_options: OptionMap::default(),
1886                },
1887            ),
1888            (
1889                r"
1890CREATE FLOW task_3
1891SINK TO schema_1.table_1
1892EVAL INTERVAL '10 seconds'
1893WITH (defer_on_missing_source = 'true', foo = 'bar')
1894AS
1895SELECT max(c1), min(c2) FROM schema_2.table_2;",
1896                CreateFlowWoutQuery {
1897                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_3"))]),
1898                    sink_table_name: ObjectName(vec![
1899                        ObjectNamePart::Identifier(Ident::new("schema_1")),
1900                        ObjectNamePart::Identifier(Ident::new("table_1")),
1901                    ]),
1902                    or_replace: false,
1903                    if_not_exists: false,
1904                    expire_after: None,
1905                    eval_interval: Some(10),
1906                    comment: None,
1907                    flow_options: string_option_map([
1908                        ("defer_on_missing_source", "true"),
1909                        ("foo", "bar"),
1910                    ]),
1911                },
1912            ),
1913        ];
1914
1915        for (sql, expected) in testcases {
1916            let create_task = parse_create_flow(sql);
1917
1918            let expected = CreateFlow {
1919                flow_name: expected.flow_name,
1920                sink_table_name: expected.sink_table_name,
1921                or_replace: expected.or_replace,
1922                if_not_exists: expected.if_not_exists,
1923                expire_after: expected.expire_after,
1924                eval_interval: expected.eval_interval,
1925                comment: expected.comment,
1926                flow_options: expected.flow_options,
1927                // ignore query parse result
1928                query: create_task.query.clone(),
1929            };
1930
1931            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1932            let show_create = create_task.to_string();
1933            let recreated = parse_create_flow(&show_create);
1934            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1935        }
1936    }
1937
1938    #[test]
1939    fn test_parse_create_flow_with_tql_cte_query() {
1940        let sql = r#"
1941CREATE FLOW calc_reqs_cte
1942SINK TO cnt_reqs_cte
1943EVAL INTERVAL '1m'
1944AS
1945WITH tql(the_timestamp, the_value) AS (
1946    TQL EVAL (now() - '1m'::interval, now(), '5s') metric
1947)
1948SELECT * FROM tql;
1949"#;
1950
1951        let stmts =
1952            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1953                .unwrap();
1954        assert_eq!(1, stmts.len());
1955        let Statement::CreateFlow(create_flow) = &stmts[0] else {
1956            panic!("unexpected stmt: {:?}", stmts[0]);
1957        };
1958
1959        let query = create_flow.query.to_string();
1960        assert!(query.to_uppercase().contains("WITH"));
1961        assert!(query.to_uppercase().contains("TQL EVAL"));
1962    }
1963
1964    #[test]
1965    fn test_parse_create_flow_with_sql_cte_is_unsupported() {
1966        let sql = r#"
1967CREATE FLOW f
1968SINK TO s
1969AS
1970WITH cte AS (SELECT 1) SELECT * FROM cte;
1971"#;
1972
1973        let err =
1974            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1975                .unwrap_err();
1976        let msg = err.to_string();
1977        assert!(msg.to_uppercase().contains("WITH"), "err: {msg}");
1978    }
1979
1980    #[test]
1981    fn test_parse_create_flow_with_tql_cte_requires_now_expr() {
1982        let sql = r#"
1983CREATE FLOW f
1984SINK TO s
1985EVAL INTERVAL '1m'
1986AS
1987WITH tql(ts, val) AS (
1988    TQL EVAL (0, 15, '5s') metric
1989)
1990SELECT * FROM tql;
1991"#;
1992
1993        let err =
1994            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1995                .unwrap_err();
1996
1997        let msg = format!("{err:?}");
1998        assert!(
1999            msg.contains("Expected expression containing `now()`"),
2000            "unexpected err: {msg}"
2001        );
2002    }
2003
2004    #[test]
2005    fn test_parse_create_flow_with_tql_cte_non_select_star_is_unsupported() {
2006        let sql = r#"
2007CREATE FLOW f
2008SINK TO s
2009AS
2010WITH tql(ts, val) AS (
2011    TQL EVAL (now() - '1m'::interval, now(), '5s') metric
2012)
2013SELECT ts FROM tql;
2014"#;
2015
2016        let err =
2017            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2018                .unwrap_err();
2019        assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2020    }
2021
2022    #[test]
2023    fn test_parse_create_flow_with_tql_cte_filter_is_unsupported() {
2024        let sql = r#"
2025CREATE FLOW f
2026SINK TO s
2027AS
2028WITH tql(ts, val) AS (
2029    TQL EVAL (now() - '1m'::interval, now(), '5s') metric
2030)
2031SELECT * FROM tql WHERE ts > 0;
2032"#;
2033
2034        let err =
2035            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2036                .unwrap_err();
2037        assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2038    }
2039
2040    #[test]
2041    fn test_parse_create_flow_with_mixed_sql_tql_cte_is_unsupported() {
2042        let sql = r#"
2043CREATE FLOW f
2044SINK TO s
2045AS
2046WITH s1 AS (SELECT 1),
2047     tql(ts, val) AS (TQL EVAL (now() - '1m'::interval, now(), '5s') metric)
2048SELECT * FROM tql;
2049"#;
2050
2051        let err =
2052            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2053                .unwrap_err();
2054        assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2055    }
2056
2057    #[test]
2058    fn test_create_flow_no_month() {
2059        let sql = r"
2060CREATE FLOW `task_2`
2061SINK TO schema_1.table_1
2062EXPIRE AFTER '1 month 2 days 1h 2 min'
2063AS
2064SELECT max(c1), min(c2) FROM schema_2.table_2;";
2065        let stmts =
2066            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2067
2068        assert!(
2069            stmts.is_err()
2070                && stmts
2071                    .unwrap_err()
2072                    .to_string()
2073                    .contains("Interval with months is not allowed")
2074        );
2075    }
2076
2077    #[test]
2078    fn test_validate_create() {
2079        let sql = r"
2080CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
2081PARTITION ON COLUMNS(c, a) (
2082    a < 10,
2083    a > 10 AND a < 20,
2084    a > 20 AND c < 100,
2085    a > 20 AND c >= 100
2086)
2087ENGINE=mito";
2088        let result =
2089            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2090        let _ = result.unwrap();
2091
2092        let sql = r"
2093CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2094PARTITION ON COLUMNS(x) ()
2095ENGINE=mito";
2096        let result =
2097            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2098        assert!(
2099            result
2100                .unwrap_err()
2101                .to_string()
2102                .contains("Partition column \"x\" not defined")
2103        );
2104    }
2105
2106    #[test]
2107    fn test_parse_create_table_with_partitions() {
2108        let sql = r"
2109CREATE TABLE monitor (
2110  host_id    INT,
2111  idc        STRING,
2112  ts         TIMESTAMP,
2113  cpu        DOUBLE DEFAULT 0,
2114  memory     DOUBLE,
2115  TIME INDEX (ts),
2116  PRIMARY KEY (host),
2117)
2118PARTITION ON COLUMNS(idc, host_id) (
2119  idc <= 'hz' AND host_id < 1000,
2120  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2121  idc > 'sh' AND host_id < 3000,
2122  idc > 'sh' AND host_id >= 3000,
2123)
2124ENGINE=mito";
2125        let result =
2126            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2127                .unwrap();
2128        assert_eq!(result.len(), 1);
2129        match &result[0] {
2130            Statement::CreateTable(c) => {
2131                assert!(c.partitions.is_some());
2132
2133                let partitions = c.partitions.as_ref().unwrap();
2134                let column_list = partitions
2135                    .column_list
2136                    .iter()
2137                    .map(|x| &x.value)
2138                    .collect::<Vec<&String>>();
2139                assert_eq!(column_list, vec!["idc", "host_id"]);
2140
2141                let exprs = &partitions.exprs;
2142
2143                assert_eq!(
2144                    exprs[0],
2145                    Expr::BinaryOp {
2146                        left: Box::new(Expr::BinaryOp {
2147                            left: Box::new(Expr::Identifier("idc".into())),
2148                            op: BinaryOperator::LtEq,
2149                            right: Box::new(Expr::Value(
2150                                Value::SingleQuotedString("hz".to_string()).into()
2151                            ))
2152                        }),
2153                        op: BinaryOperator::And,
2154                        right: Box::new(Expr::BinaryOp {
2155                            left: Box::new(Expr::Identifier("host_id".into())),
2156                            op: BinaryOperator::Lt,
2157                            right: Box::new(Expr::Value(
2158                                Value::Number("1000".to_string(), false).into()
2159                            ))
2160                        })
2161                    }
2162                );
2163                assert_eq!(
2164                    exprs[1],
2165                    Expr::BinaryOp {
2166                        left: Box::new(Expr::BinaryOp {
2167                            left: Box::new(Expr::BinaryOp {
2168                                left: Box::new(Expr::Identifier("idc".into())),
2169                                op: BinaryOperator::Gt,
2170                                right: Box::new(Expr::Value(
2171                                    Value::SingleQuotedString("hz".to_string()).into()
2172                                ))
2173                            }),
2174                            op: BinaryOperator::And,
2175                            right: Box::new(Expr::BinaryOp {
2176                                left: Box::new(Expr::Identifier("idc".into())),
2177                                op: BinaryOperator::LtEq,
2178                                right: Box::new(Expr::Value(
2179                                    Value::SingleQuotedString("sh".to_string()).into()
2180                                ))
2181                            })
2182                        }),
2183                        op: BinaryOperator::And,
2184                        right: Box::new(Expr::BinaryOp {
2185                            left: Box::new(Expr::Identifier("host_id".into())),
2186                            op: BinaryOperator::Lt,
2187                            right: Box::new(Expr::Value(
2188                                Value::Number("2000".to_string(), false).into()
2189                            ))
2190                        })
2191                    }
2192                );
2193                assert_eq!(
2194                    exprs[2],
2195                    Expr::BinaryOp {
2196                        left: Box::new(Expr::BinaryOp {
2197                            left: Box::new(Expr::Identifier("idc".into())),
2198                            op: BinaryOperator::Gt,
2199                            right: Box::new(Expr::Value(
2200                                Value::SingleQuotedString("sh".to_string()).into()
2201                            ))
2202                        }),
2203                        op: BinaryOperator::And,
2204                        right: Box::new(Expr::BinaryOp {
2205                            left: Box::new(Expr::Identifier("host_id".into())),
2206                            op: BinaryOperator::Lt,
2207                            right: Box::new(Expr::Value(
2208                                Value::Number("3000".to_string(), false).into()
2209                            ))
2210                        })
2211                    }
2212                );
2213                assert_eq!(
2214                    exprs[3],
2215                    Expr::BinaryOp {
2216                        left: Box::new(Expr::BinaryOp {
2217                            left: Box::new(Expr::Identifier("idc".into())),
2218                            op: BinaryOperator::Gt,
2219                            right: Box::new(Expr::Value(
2220                                Value::SingleQuotedString("sh".to_string()).into()
2221                            ))
2222                        }),
2223                        op: BinaryOperator::And,
2224                        right: Box::new(Expr::BinaryOp {
2225                            left: Box::new(Expr::Identifier("host_id".into())),
2226                            op: BinaryOperator::GtEq,
2227                            right: Box::new(Expr::Value(
2228                                Value::Number("3000".to_string(), false).into()
2229                            ))
2230                        })
2231                    }
2232                );
2233            }
2234            _ => unreachable!(),
2235        }
2236    }
2237
2238    #[test]
2239    fn test_parse_create_table_with_quoted_partitions() {
2240        let sql = r"
2241CREATE TABLE monitor (
2242  `host_id`    INT,
2243  idc        STRING,
2244  ts         TIMESTAMP,
2245  cpu        DOUBLE DEFAULT 0,
2246  memory     DOUBLE,
2247  TIME INDEX (ts),
2248  PRIMARY KEY (host),
2249)
2250PARTITION ON COLUMNS(IdC, host_id) (
2251  idc <= 'hz' AND host_id < 1000,
2252  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2253  idc > 'sh' AND host_id < 3000,
2254  idc > 'sh' AND host_id >= 3000,
2255)";
2256        let result =
2257            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2258                .unwrap();
2259        assert_eq!(result.len(), 1);
2260    }
2261
2262    #[test]
2263    fn test_parse_create_table_with_timestamp_index() {
2264        let sql1 = r"
2265CREATE TABLE monitor (
2266  host_id    INT,
2267  idc        STRING,
2268  ts         TIMESTAMP TIME INDEX,
2269  cpu        DOUBLE DEFAULT 0,
2270  memory     DOUBLE,
2271  PRIMARY KEY (host),
2272)
2273ENGINE=mito";
2274        let result1 = ParserContext::create_with_dialect(
2275            sql1,
2276            &GreptimeDbDialect {},
2277            ParseOptions::default(),
2278        )
2279        .unwrap();
2280
2281        if let Statement::CreateTable(c) = &result1[0] {
2282            assert_eq!(c.constraints.len(), 2);
2283            let tc = c.constraints[0].clone();
2284            match tc {
2285                TableConstraint::TimeIndex { column } => {
2286                    assert_eq!(&column.value, "ts");
2287                }
2288                _ => panic!("should be time index constraint"),
2289            };
2290        } else {
2291            panic!("should be create_table statement");
2292        }
2293
2294        // `TIME INDEX` should be in front of `PRIMARY KEY`
2295        // in order to equal the `TIMESTAMP TIME INDEX` constraint options vector
2296        let sql2 = r"
2297CREATE TABLE monitor (
2298  host_id    INT,
2299  idc        STRING,
2300  ts         TIMESTAMP NOT NULL,
2301  cpu        DOUBLE DEFAULT 0,
2302  memory     DOUBLE,
2303  TIME INDEX (ts),
2304  PRIMARY KEY (host),
2305)
2306ENGINE=mito";
2307        let result2 = ParserContext::create_with_dialect(
2308            sql2,
2309            &GreptimeDbDialect {},
2310            ParseOptions::default(),
2311        )
2312        .unwrap();
2313
2314        assert_eq!(result1, result2);
2315
2316        // TIMESTAMP can be NULL which is not equal to above
2317        let sql3 = r"
2318CREATE TABLE monitor (
2319  host_id    INT,
2320  idc        STRING,
2321  ts         TIMESTAMP,
2322  cpu        DOUBLE DEFAULT 0,
2323  memory     DOUBLE,
2324  TIME INDEX (ts),
2325  PRIMARY KEY (host),
2326)
2327ENGINE=mito";
2328
2329        let result3 = ParserContext::create_with_dialect(
2330            sql3,
2331            &GreptimeDbDialect {},
2332            ParseOptions::default(),
2333        )
2334        .unwrap();
2335
2336        assert_ne!(result1, result3);
2337
2338        // BIGINT can't be time index any more
2339        let sql1 = r"
2340CREATE TABLE monitor (
2341  host_id    INT,
2342  idc        STRING,
2343  b          bigint TIME INDEX,
2344  cpu        DOUBLE DEFAULT 0,
2345  memory     DOUBLE,
2346  PRIMARY KEY (host),
2347)
2348ENGINE=mito";
2349        let result1 = ParserContext::create_with_dialect(
2350            sql1,
2351            &GreptimeDbDialect {},
2352            ParseOptions::default(),
2353        );
2354
2355        assert!(
2356            result1
2357                .unwrap_err()
2358                .to_string()
2359                .contains("time index column data type should be timestamp")
2360        );
2361    }
2362
2363    #[test]
2364    fn test_parse_create_table_with_timestamp_index_not_null() {
2365        let sql = r"
2366CREATE TABLE monitor (
2367  host_id    INT,
2368  idc        STRING,
2369  ts         TIMESTAMP TIME INDEX,
2370  cpu        DOUBLE DEFAULT 0,
2371  memory     DOUBLE,
2372  TIME INDEX (ts),
2373  PRIMARY KEY (host),
2374)
2375ENGINE=mito";
2376        let result =
2377            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2378                .unwrap();
2379
2380        assert_eq!(result.len(), 1);
2381        if let Statement::CreateTable(c) = &result[0] {
2382            let ts = c.columns[2].clone();
2383            assert_eq!(ts.name().to_string(), "ts");
2384            assert_eq!(ts.options()[0].option, NotNull);
2385        } else {
2386            panic!("should be create table statement");
2387        }
2388
2389        let sql1 = r"
2390CREATE TABLE monitor (
2391  host_id    INT,
2392  idc        STRING,
2393  ts         TIMESTAMP NOT NULL TIME INDEX,
2394  cpu        DOUBLE DEFAULT 0,
2395  memory     DOUBLE,
2396  TIME INDEX (ts),
2397  PRIMARY KEY (host),
2398)
2399ENGINE=mito";
2400
2401        let result1 = ParserContext::create_with_dialect(
2402            sql1,
2403            &GreptimeDbDialect {},
2404            ParseOptions::default(),
2405        )
2406        .unwrap();
2407        assert_eq!(result, result1);
2408
2409        let sql2 = r"
2410CREATE TABLE monitor (
2411  host_id    INT,
2412  idc        STRING,
2413  ts         TIMESTAMP TIME INDEX NOT NULL,
2414  cpu        DOUBLE DEFAULT 0,
2415  memory     DOUBLE,
2416  TIME INDEX (ts),
2417  PRIMARY KEY (host),
2418)
2419ENGINE=mito";
2420
2421        let result2 = ParserContext::create_with_dialect(
2422            sql2,
2423            &GreptimeDbDialect {},
2424            ParseOptions::default(),
2425        )
2426        .unwrap();
2427        assert_eq!(result, result2);
2428
2429        let sql3 = r"
2430CREATE TABLE monitor (
2431  host_id    INT,
2432  idc        STRING,
2433  ts         TIMESTAMP TIME INDEX NULL NOT,
2434  cpu        DOUBLE DEFAULT 0,
2435  memory     DOUBLE,
2436  TIME INDEX (ts),
2437  PRIMARY KEY (host),
2438)
2439ENGINE=mito";
2440
2441        let result3 = ParserContext::create_with_dialect(
2442            sql3,
2443            &GreptimeDbDialect {},
2444            ParseOptions::default(),
2445        );
2446        assert!(result3.is_err());
2447
2448        let sql4 = r"
2449CREATE TABLE monitor (
2450  host_id    INT,
2451  idc        STRING,
2452  ts         TIMESTAMP TIME INDEX NOT NULL NULL,
2453  cpu        DOUBLE DEFAULT 0,
2454  memory     DOUBLE,
2455  TIME INDEX (ts),
2456  PRIMARY KEY (host),
2457)
2458ENGINE=mito";
2459
2460        let result4 = ParserContext::create_with_dialect(
2461            sql4,
2462            &GreptimeDbDialect {},
2463            ParseOptions::default(),
2464        );
2465        assert!(result4.is_err());
2466
2467        let sql = r"
2468CREATE TABLE monitor (
2469  host_id    INT,
2470  idc        STRING,
2471  ts         TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2472  cpu        DOUBLE DEFAULT 0,
2473  memory     DOUBLE,
2474  TIME INDEX (ts),
2475  PRIMARY KEY (host),
2476)
2477ENGINE=mito";
2478
2479        let result =
2480            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2481                .unwrap();
2482
2483        if let Statement::CreateTable(c) = &result[0] {
2484            let tc = c.constraints[0].clone();
2485            match tc {
2486                TableConstraint::TimeIndex { column } => {
2487                    assert_eq!(&column.value, "ts");
2488                }
2489                _ => panic!("should be time index constraint"),
2490            }
2491            let ts = c.columns[2].clone();
2492            assert_eq!(ts.name().to_string(), "ts");
2493            assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2494            assert_eq!(ts.options()[1].option, NotNull);
2495        } else {
2496            unreachable!("should be create table statement");
2497        }
2498    }
2499
2500    #[test]
2501    fn test_parse_partitions_with_error_syntax() {
2502        let sql = r"
2503CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2504PARTITION COLUMNS(c, a) (
2505    a < 10,
2506    a > 10 AND a < 20,
2507    a > 20 AND c < 100,
2508    a > 20 AND c >= 100
2509)
2510ENGINE=mito";
2511        let result =
2512            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2513        assert!(
2514            result
2515                .unwrap_err()
2516                .output_msg()
2517                .contains("sql parser error: Expected: ON, found: COLUMNS")
2518        );
2519    }
2520
2521    #[test]
2522    fn test_parse_partitions_without_rule() {
2523        let sql = r"
2524CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2525PARTITION ON COLUMNS(c, a) ()
2526ENGINE=mito";
2527        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2528            .unwrap();
2529    }
2530
2531    #[test]
2532    fn test_parse_partitions_unreferenced_column() {
2533        let sql = r"
2534CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2535PARTITION ON COLUMNS(c, a) (
2536    b = 'foo'
2537)
2538ENGINE=mito";
2539        let result =
2540            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2541        assert_eq!(
2542            result.unwrap_err().output_msg(),
2543            "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2544        );
2545    }
2546
2547    #[test]
2548    fn test_parse_partitions_not_binary_expr() {
2549        let sql = r"
2550CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2551PARTITION ON COLUMNS(c, a) (
2552    b
2553)
2554ENGINE=mito";
2555        let result =
2556            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2557        assert_eq!(
2558            result.unwrap_err().output_msg(),
2559            r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2560        );
2561    }
2562
2563    fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2564        assert_eq!(column.name.to_string(), name);
2565        assert_eq!(column.data_type.to_string(), data_type);
2566    }
2567
2568    #[test]
2569    pub fn test_parse_create_table() {
2570        let sql = r"create table demo(
2571                             host string,
2572                             ts timestamp,
2573                             cpu float32 default 0,
2574                             memory float64,
2575                             TIME INDEX (ts),
2576                             PRIMARY KEY(ts, host),
2577                             ) engine=mito
2578                             with(ttl='10s');
2579         ";
2580        let result =
2581            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2582                .unwrap();
2583        assert_eq!(1, result.len());
2584        match &result[0] {
2585            Statement::CreateTable(c) => {
2586                assert!(!c.if_not_exists);
2587                assert_eq!("demo", c.name.to_string());
2588                assert_eq!("mito", c.engine);
2589                assert_eq!(4, c.columns.len());
2590                let columns = &c.columns;
2591                assert_column_def(&columns[0].column_def, "host", "STRING");
2592                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2593                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2594                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2595
2596                let constraints = &c.constraints;
2597                assert_eq!(
2598                    &constraints[0],
2599                    &TableConstraint::TimeIndex {
2600                        column: Ident::new("ts"),
2601                    }
2602                );
2603                assert_eq!(
2604                    &constraints[1],
2605                    &TableConstraint::PrimaryKey {
2606                        columns: vec![Ident::new("ts"), Ident::new("host")]
2607                    }
2608                );
2609                // inverted index is merged into column options
2610                assert_eq!(1, c.options.len());
2611                assert_eq!(
2612                    [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2613                    c.options.to_str_map()
2614                );
2615            }
2616            _ => unreachable!(),
2617        }
2618    }
2619
2620    #[test]
2621    fn test_invalid_index_keys() {
2622        let sql = r"create table demo(
2623                             host string,
2624                             ts int64,
2625                             cpu float64 default 0,
2626                             memory float64,
2627                             TIME INDEX (ts, host),
2628                             PRIMARY KEY(ts, host)) engine=mito;
2629         ";
2630        let result =
2631            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2632        assert!(result.is_err());
2633        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2634    }
2635
2636    #[test]
2637    fn test_duplicated_time_index() {
2638        let sql = r"create table demo(
2639                             host string,
2640                             ts timestamp time index,
2641                             t timestamp time index,
2642                             cpu float64 default 0,
2643                             memory float64,
2644                             TIME INDEX (ts, host),
2645                             PRIMARY KEY(ts, host)) engine=mito;
2646         ";
2647        let result =
2648            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2649        assert!(result.is_err());
2650        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2651
2652        let sql = r"create table demo(
2653                             host string,
2654                             ts timestamp time index,
2655                             cpu float64 default 0,
2656                             t timestamp,
2657                             memory float64,
2658                             TIME INDEX (t),
2659                             PRIMARY KEY(ts, host)) engine=mito;
2660         ";
2661        let result =
2662            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2663        assert!(result.is_err());
2664        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2665    }
2666
2667    #[test]
2668    fn test_invalid_column_name() {
2669        let sql = "create table foo(user string, i timestamp time index)";
2670        let result =
2671            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2672        let err = result.unwrap_err().output_msg();
2673        assert!(err.contains("Cannot use keyword 'user' as column name"));
2674
2675        // If column name is quoted, it's valid even same with keyword.
2676        let sql = r#"
2677            create table foo("user" string, i timestamp time index)
2678        "#;
2679        let result =
2680            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2681        let _ = result.unwrap();
2682    }
2683
2684    #[test]
2685    fn test_incorrect_default_value_issue_3479() {
2686        let sql = r#"CREATE TABLE `ExcePTuRi`(
2687non TIMESTAMP(6) TIME INDEX,
2688`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2689)"#;
2690        let result =
2691            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2692                .unwrap();
2693        assert_eq!(1, result.len());
2694        match &result[0] {
2695            Statement::CreateTable(c) => {
2696                assert_eq!(
2697                    "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2698                    c.columns[1].to_string()
2699                );
2700            }
2701            _ => unreachable!(),
2702        }
2703    }
2704
2705    #[test]
2706    fn test_parse_create_view() {
2707        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
2708
2709        let result =
2710            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2711                .unwrap();
2712        match &result[0] {
2713            Statement::CreateView(c) => {
2714                assert_eq!(c.to_string(), sql);
2715                assert!(!c.or_replace);
2716                assert!(!c.if_not_exists);
2717                assert_eq!("test", c.name.to_string());
2718            }
2719            _ => unreachable!(),
2720        }
2721
2722        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
2723
2724        let result =
2725            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2726                .unwrap();
2727        match &result[0] {
2728            Statement::CreateView(c) => {
2729                assert_eq!(c.to_string(), sql);
2730                assert!(c.or_replace);
2731                assert!(c.if_not_exists);
2732                assert_eq!("test", c.name.to_string());
2733            }
2734            _ => unreachable!(),
2735        }
2736    }
2737
2738    #[test]
2739    fn test_parse_create_view_invalid_query() {
2740        let sql = "CREATE VIEW test AS DELETE from demo";
2741        let result =
2742            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2743        assert!(result.is_ok_and(|x| x.len() == 1));
2744    }
2745
2746    #[test]
2747    fn test_parse_create_table_fulltext_options() {
2748        let sql1 = r"
2749CREATE TABLE log (
2750    ts TIMESTAMP TIME INDEX,
2751    msg TEXT FULLTEXT INDEX,
2752)";
2753        let result1 = ParserContext::create_with_dialect(
2754            sql1,
2755            &GreptimeDbDialect {},
2756            ParseOptions::default(),
2757        )
2758        .unwrap();
2759
2760        if let Statement::CreateTable(c) = &result1[0] {
2761            c.columns.iter().for_each(|col| {
2762                if col.name().value == "msg" {
2763                    assert!(
2764                        col.extensions
2765                            .fulltext_index_options
2766                            .as_ref()
2767                            .unwrap()
2768                            .is_empty()
2769                    );
2770                }
2771            });
2772        } else {
2773            panic!("should be create_table statement");
2774        }
2775
2776        let sql2 = r"
2777CREATE TABLE log (
2778    ts TIMESTAMP TIME INDEX,
2779    msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2780)";
2781        let result2 = ParserContext::create_with_dialect(
2782            sql2,
2783            &GreptimeDbDialect {},
2784            ParseOptions::default(),
2785        )
2786        .unwrap();
2787
2788        if let Statement::CreateTable(c) = &result2[0] {
2789            c.columns.iter().for_each(|col| {
2790                if col.name().value == "msg" {
2791                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2792                    assert_eq!(options.len(), 2);
2793                    assert_eq!(options.get("analyzer").unwrap(), "English");
2794                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2795                }
2796            });
2797        } else {
2798            panic!("should be create_table statement");
2799        }
2800
2801        let sql3 = r"
2802CREATE TABLE log (
2803    ts TIMESTAMP TIME INDEX,
2804    msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2805    msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2806)";
2807        let result3 = ParserContext::create_with_dialect(
2808            sql3,
2809            &GreptimeDbDialect {},
2810            ParseOptions::default(),
2811        )
2812        .unwrap();
2813
2814        if let Statement::CreateTable(c) = &result3[0] {
2815            c.columns.iter().for_each(|col| {
2816                if col.name().value == "msg1" {
2817                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2818                    assert_eq!(options.len(), 2);
2819                    assert_eq!(options.get("analyzer").unwrap(), "English");
2820                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2821                } else if col.name().value == "msg2" {
2822                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2823                    assert_eq!(options.len(), 2);
2824                    assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2825                    assert_eq!(options.get("case_sensitive").unwrap(), "true");
2826                }
2827            });
2828        } else {
2829            panic!("should be create_table statement");
2830        }
2831    }
2832
2833    #[test]
2834    fn test_parse_create_table_fulltext_options_invalid_type() {
2835        let sql = r"
2836CREATE TABLE log (
2837    ts TIMESTAMP TIME INDEX,
2838    msg INT FULLTEXT INDEX,
2839)";
2840        let result =
2841            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2842        assert!(result.is_err());
2843        assert!(
2844            result
2845                .unwrap_err()
2846                .to_string()
2847                .contains("FULLTEXT index only supports string type")
2848        );
2849    }
2850
2851    #[test]
2852    fn test_parse_create_table_fulltext_options_duplicate() {
2853        let sql = r"
2854CREATE TABLE log (
2855    ts TIMESTAMP TIME INDEX,
2856    msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
2857)";
2858        let result =
2859            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2860        assert!(result.is_err());
2861        assert!(
2862            result
2863                .unwrap_err()
2864                .to_string()
2865                .contains("duplicated FULLTEXT INDEX option")
2866        );
2867    }
2868
2869    #[test]
2870    fn test_parse_create_table_fulltext_options_invalid_option() {
2871        let sql = r"
2872CREATE TABLE log (
2873    ts TIMESTAMP TIME INDEX,
2874    msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
2875)";
2876        let result =
2877            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2878        assert!(result.is_err());
2879        assert!(
2880            result
2881                .unwrap_err()
2882                .to_string()
2883                .contains("invalid FULLTEXT INDEX option")
2884        );
2885    }
2886
2887    #[test]
2888    fn test_parse_create_table_skip_options() {
2889        let sql = r"
2890CREATE TABLE log (
2891    ts TIMESTAMP TIME INDEX,
2892    msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
2893)";
2894        let result =
2895            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2896                .unwrap();
2897
2898        if let Statement::CreateTable(c) = &result[0] {
2899            c.columns.iter().for_each(|col| {
2900                if col.name().value == "msg" {
2901                    assert!(
2902                        !col.extensions
2903                            .skipping_index_options
2904                            .as_ref()
2905                            .unwrap()
2906                            .is_empty()
2907                    );
2908                }
2909            });
2910        } else {
2911            panic!("should be create_table statement");
2912        }
2913
2914        let sql = r"
2915        CREATE TABLE log (
2916            ts TIMESTAMP TIME INDEX,
2917            msg INT SKIPPING INDEX,
2918        )";
2919        let result =
2920            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2921                .unwrap();
2922
2923        if let Statement::CreateTable(c) = &result[0] {
2924            c.columns.iter().for_each(|col| {
2925                if col.name().value == "msg" {
2926                    assert!(
2927                        col.extensions
2928                            .skipping_index_options
2929                            .as_ref()
2930                            .unwrap()
2931                            .is_empty()
2932                    );
2933                }
2934            });
2935        } else {
2936            panic!("should be create_table statement");
2937        }
2938    }
2939
2940    #[test]
2941    fn test_parse_create_view_with_columns() {
2942        let sql = "CREATE VIEW test () AS SELECT * FROM NUMBERS";
2943        let result =
2944            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2945                .unwrap();
2946
2947        match &result[0] {
2948            Statement::CreateView(c) => {
2949                assert_eq!(c.to_string(), "CREATE VIEW test AS SELECT * FROM NUMBERS");
2950                assert!(!c.or_replace);
2951                assert!(!c.if_not_exists);
2952                assert_eq!("test", c.name.to_string());
2953            }
2954            _ => unreachable!(),
2955        }
2956        assert_eq!(
2957            "CREATE VIEW test AS SELECT * FROM NUMBERS",
2958            result[0].to_string()
2959        );
2960
2961        let sql = "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS";
2962        let result =
2963            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2964                .unwrap();
2965
2966        match &result[0] {
2967            Statement::CreateView(c) => {
2968                assert_eq!(c.to_string(), sql);
2969                assert!(!c.or_replace);
2970                assert!(!c.if_not_exists);
2971                assert_eq!("test", c.name.to_string());
2972            }
2973            _ => unreachable!(),
2974        }
2975        assert_eq!(sql, result[0].to_string());
2976
2977        let sql = "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS";
2978        let result =
2979            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2980                .unwrap();
2981
2982        match &result[0] {
2983            Statement::CreateView(c) => {
2984                assert_eq!(c.to_string(), sql);
2985                assert!(!c.or_replace);
2986                assert!(!c.if_not_exists);
2987                assert_eq!("test", c.name.to_string());
2988            }
2989            _ => unreachable!(),
2990        }
2991        assert_eq!(sql, result[0].to_string());
2992
2993        // Some invalid syntax cases
2994        let sql = "CREATE VIEW test (n1 AS select * from demo";
2995        let result =
2996            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2997        assert!(result.is_err());
2998
2999        let sql = "CREATE VIEW test (n1, AS select * from demo";
3000        let result =
3001            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3002        assert!(result.is_err());
3003
3004        let sql = "CREATE VIEW test n1,n2) AS select * from demo";
3005        let result =
3006            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3007        assert!(result.is_err());
3008
3009        let sql = "CREATE VIEW test (1) AS select * from demo";
3010        let result =
3011            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3012        assert!(result.is_err());
3013
3014        // keyword
3015        let sql = "CREATE VIEW test (n1, select) AS select * from demo";
3016        let result =
3017            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3018        assert!(result.is_err());
3019    }
3020
3021    #[test]
3022    fn test_parse_column_extensions_vector() {
3023        // Test that vector options are parsed from data_type (no additional SQL needed)
3024        let sql = "";
3025        let dialect = GenericDialect {};
3026        let mut tokenizer = Tokenizer::new(&dialect, sql);
3027        let tokens = tokenizer.tokenize().unwrap();
3028        let mut parser = Parser::new(&dialect).with_tokens(tokens);
3029        let name = Ident::new("vec_col");
3030        let data_type =
3031            DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
3032        let mut extensions = ColumnExtensions::default();
3033
3034        let result =
3035            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
3036        assert!(result.is_ok());
3037        assert!(extensions.vector_options.is_some());
3038        let vector_options = extensions.vector_options.unwrap();
3039        assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some("128"));
3040    }
3041
3042    #[test]
3043    fn test_parse_column_extensions_vector_invalid() {
3044        // Test that vector with no dimension fails
3045        let sql = "";
3046        let dialect = GenericDialect {};
3047        let mut tokenizer = Tokenizer::new(&dialect, sql);
3048        let tokens = tokenizer.tokenize().unwrap();
3049        let mut parser = Parser::new(&dialect).with_tokens(tokens);
3050        let name = Ident::new("vec_col");
3051        let data_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
3052        let mut extensions = ColumnExtensions::default();
3053
3054        let result =
3055            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
3056        assert!(result.is_err());
3057    }
3058
3059    #[test]
3060    fn test_parse_column_extensions_indices() {
3061        // Test skipping index
3062        {
3063            let sql = "SKIPPING INDEX";
3064            let dialect = GenericDialect {};
3065            let mut tokenizer = Tokenizer::new(&dialect, sql);
3066            let tokens = tokenizer.tokenize().unwrap();
3067            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3068            let name = Ident::new("col");
3069            let data_type = DataType::String(None);
3070            let mut extensions = ColumnExtensions::default();
3071            let result = ParserContext::parse_column_extensions(
3072                &mut parser,
3073                &name,
3074                &data_type,
3075                &mut extensions,
3076            );
3077            assert!(result.is_ok());
3078            assert!(extensions.skipping_index_options.is_some());
3079        }
3080
3081        // Test fulltext index with options
3082        {
3083            let sql = "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')";
3084            let dialect = GenericDialect {};
3085            let mut tokenizer = Tokenizer::new(&dialect, sql);
3086            let tokens = tokenizer.tokenize().unwrap();
3087            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3088            let name = Ident::new("text_col");
3089            let data_type = DataType::String(None);
3090            let mut extensions = ColumnExtensions::default();
3091            let result = ParserContext::parse_column_extensions(
3092                &mut parser,
3093                &name,
3094                &data_type,
3095                &mut extensions,
3096            );
3097            assert!(result.unwrap());
3098            assert!(extensions.fulltext_index_options.is_some());
3099            let fulltext_options = extensions.fulltext_index_options.unwrap();
3100            assert_eq!(fulltext_options.get("analyzer"), Some("English"));
3101            assert_eq!(fulltext_options.get("case_sensitive"), Some("true"));
3102        }
3103
3104        // Test fulltext index with invalid type (should fail)
3105        {
3106            let sql = "FULLTEXT INDEX WITH (analyzer = 'English')";
3107            let dialect = GenericDialect {};
3108            let mut tokenizer = Tokenizer::new(&dialect, sql);
3109            let tokens = tokenizer.tokenize().unwrap();
3110            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3111            let name = Ident::new("num_col");
3112            let data_type = DataType::Int(None); // Non-string type
3113            let mut extensions = ColumnExtensions::default();
3114            let result = ParserContext::parse_column_extensions(
3115                &mut parser,
3116                &name,
3117                &data_type,
3118                &mut extensions,
3119            );
3120            assert!(result.is_err());
3121            assert!(
3122                result
3123                    .unwrap_err()
3124                    .to_string()
3125                    .contains("FULLTEXT index only supports string type")
3126            );
3127        }
3128
3129        // Test fulltext index with invalid option (won't fail, the parser doesn't check the option's content)
3130        {
3131            let sql = "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')";
3132            let dialect = GenericDialect {};
3133            let mut tokenizer = Tokenizer::new(&dialect, sql);
3134            let tokens = tokenizer.tokenize().unwrap();
3135            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3136            let name = Ident::new("text_col");
3137            let data_type = DataType::String(None);
3138            let mut extensions = ColumnExtensions::default();
3139            let result = ParserContext::parse_column_extensions(
3140                &mut parser,
3141                &name,
3142                &data_type,
3143                &mut extensions,
3144            );
3145            assert!(result.unwrap());
3146        }
3147
3148        // Test inverted index
3149        {
3150            let sql = "INVERTED INDEX";
3151            let dialect = GenericDialect {};
3152            let mut tokenizer = Tokenizer::new(&dialect, sql);
3153            let tokens = tokenizer.tokenize().unwrap();
3154            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3155            let name = Ident::new("col");
3156            let data_type = DataType::String(None);
3157            let mut extensions = ColumnExtensions::default();
3158            let result = ParserContext::parse_column_extensions(
3159                &mut parser,
3160                &name,
3161                &data_type,
3162                &mut extensions,
3163            );
3164            assert!(result.is_ok());
3165            assert!(extensions.inverted_index_options.is_some());
3166        }
3167
3168        // Test inverted index with options (should fail)
3169        {
3170            let sql = "INVERTED INDEX WITH (analyzer = 'English')";
3171            let dialect = GenericDialect {};
3172            let mut tokenizer = Tokenizer::new(&dialect, sql);
3173            let tokens = tokenizer.tokenize().unwrap();
3174            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3175            let name = Ident::new("col");
3176            let data_type = DataType::String(None);
3177            let mut extensions = ColumnExtensions::default();
3178            let result = ParserContext::parse_column_extensions(
3179                &mut parser,
3180                &name,
3181                &data_type,
3182                &mut extensions,
3183            );
3184            assert!(result.is_err());
3185            assert!(
3186                result
3187                    .unwrap_err()
3188                    .to_string()
3189                    .contains("INVERTED index doesn't support options")
3190            );
3191        }
3192
3193        // Test multiple indices
3194        {
3195            let sql = "SKIPPING INDEX FULLTEXT INDEX";
3196            let dialect = GenericDialect {};
3197            let mut tokenizer = Tokenizer::new(&dialect, sql);
3198            let tokens = tokenizer.tokenize().unwrap();
3199            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3200            let name = Ident::new("col");
3201            let data_type = DataType::String(None);
3202            let mut extensions = ColumnExtensions::default();
3203            let result = ParserContext::parse_column_extensions(
3204                &mut parser,
3205                &name,
3206                &data_type,
3207                &mut extensions,
3208            );
3209            assert!(result.unwrap());
3210            assert!(extensions.skipping_index_options.is_some());
3211            assert!(extensions.fulltext_index_options.is_some());
3212        }
3213    }
3214
3215    #[test]
3216    fn test_parse_interval_cast() {
3217        let s = "select '10s'::INTERVAL";
3218        let stmts =
3219            ParserContext::create_with_dialect(s, &GreptimeDbDialect {}, ParseOptions::default())
3220                .unwrap();
3221        assert_eq!("SELECT '10 seconds'::INTERVAL", &stmts[0].to_string());
3222    }
3223
3224    #[test]
3225    fn test_parse_create_table_vector_index_options() {
3226        // Test basic vector index
3227        let sql = r"
3228CREATE TABLE vectors (
3229    ts TIMESTAMP TIME INDEX,
3230    vec VECTOR(128) VECTOR INDEX,
3231)";
3232        let result =
3233            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
3234                .unwrap();
3235
3236        if let Statement::CreateTable(c) = &result[0] {
3237            c.columns.iter().for_each(|col| {
3238                if col.name().value == "vec" {
3239                    assert!(
3240                        col.extensions
3241                            .vector_index_options
3242                            .as_ref()
3243                            .unwrap()
3244                            .is_empty()
3245                    );
3246                }
3247            });
3248        } else {
3249            panic!("should be create_table statement");
3250        }
3251
3252        // Test vector index with options
3253        let sql = r"
3254CREATE TABLE vectors (
3255    ts TIMESTAMP TIME INDEX,
3256    vec VECTOR(128) VECTOR INDEX WITH (metric='cosine', connectivity='32', expansion_add='256', expansion_search='128')
3257)";
3258        let result =
3259            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
3260                .unwrap();
3261
3262        if let Statement::CreateTable(c) = &result[0] {
3263            c.columns.iter().for_each(|col| {
3264                if col.name().value == "vec" {
3265                    let options = col.extensions.vector_index_options.as_ref().unwrap();
3266                    assert_eq!(options.len(), 4);
3267                    assert_eq!(options.get("metric").unwrap(), "cosine");
3268                    assert_eq!(options.get("connectivity").unwrap(), "32");
3269                    assert_eq!(options.get("expansion_add").unwrap(), "256");
3270                    assert_eq!(options.get("expansion_search").unwrap(), "128");
3271                }
3272            });
3273        } else {
3274            panic!("should be create_table statement");
3275        }
3276    }
3277
3278    #[test]
3279    fn test_parse_create_table_vector_index_invalid_type() {
3280        // Test vector index on non-vector type (should fail)
3281        let sql = r"
3282CREATE TABLE vectors (
3283    ts TIMESTAMP TIME INDEX,
3284    col INT VECTOR INDEX,
3285)";
3286        let result =
3287            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3288        assert!(result.is_err());
3289        assert!(
3290            result
3291                .unwrap_err()
3292                .to_string()
3293                .contains("VECTOR INDEX only supports Vector type columns")
3294        );
3295    }
3296
3297    #[test]
3298    fn test_parse_create_table_vector_index_duplicate() {
3299        // Test duplicate vector index (should fail)
3300        let sql = r"
3301CREATE TABLE vectors (
3302    ts TIMESTAMP TIME INDEX,
3303    vec VECTOR(128) VECTOR INDEX VECTOR INDEX,
3304)";
3305        let result =
3306            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3307        assert!(result.is_err());
3308        assert!(
3309            result
3310                .unwrap_err()
3311                .to_string()
3312                .contains("duplicated VECTOR INDEX option")
3313        );
3314    }
3315
3316    #[test]
3317    fn test_parse_create_table_vector_index_invalid_option() {
3318        // Test invalid option key (should fail)
3319        let sql = r"
3320CREATE TABLE vectors (
3321    ts TIMESTAMP TIME INDEX,
3322    vec VECTOR(128) VECTOR INDEX WITH (metric='l2sq', invalid_option='foo')
3323)";
3324        let result =
3325            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3326        assert!(result.is_err());
3327        assert!(
3328            result
3329                .unwrap_err()
3330                .to_string()
3331                .contains("invalid VECTOR INDEX option")
3332        );
3333    }
3334
3335    #[test]
3336    fn test_parse_column_extensions_vector_index() {
3337        // Test vector index on vector type
3338        {
3339            let sql = "VECTOR INDEX WITH (metric = 'l2sq')";
3340            let dialect = GenericDialect {};
3341            let mut tokenizer = Tokenizer::new(&dialect, sql);
3342            let tokens = tokenizer.tokenize().unwrap();
3343            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3344            let name = Ident::new("vec_col");
3345            let data_type =
3346                DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
3347            // First, parse the vector type to set vector_options
3348            let mut extensions = ColumnExtensions {
3349                vector_options: Some(OptionMap::from([(
3350                    VECTOR_OPT_DIM.to_string(),
3351                    "128".to_string(),
3352                )])),
3353                ..Default::default()
3354            };
3355
3356            let result = ParserContext::parse_column_extensions(
3357                &mut parser,
3358                &name,
3359                &data_type,
3360                &mut extensions,
3361            );
3362            assert!(result.is_ok());
3363            assert!(extensions.vector_index_options.is_some());
3364            let vi_options = extensions.vector_index_options.unwrap();
3365            assert_eq!(vi_options.get("metric"), Some("l2sq"));
3366        }
3367
3368        // Test vector index on non-vector type (should fail)
3369        {
3370            let sql = "VECTOR INDEX";
3371            let dialect = GenericDialect {};
3372            let mut tokenizer = Tokenizer::new(&dialect, sql);
3373            let tokens = tokenizer.tokenize().unwrap();
3374            let mut parser = Parser::new(&dialect).with_tokens(tokens);
3375            let name = Ident::new("num_col");
3376            let data_type = DataType::Int(None); // Non-vector type
3377            let mut extensions = ColumnExtensions::default();
3378            let result = ParserContext::parse_column_extensions(
3379                &mut parser,
3380                &name,
3381                &data_type,
3382                &mut extensions,
3383            );
3384            assert!(result.is_err());
3385            assert!(
3386                result
3387                    .unwrap_err()
3388                    .to_string()
3389                    .contains("VECTOR INDEX only supports Vector type columns")
3390            );
3391        }
3392    }
3393}