sql/parsers/
create_parser.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::HashMap;
19
20use arrow_buffer::IntervalMonthDayNano;
21use common_catalog::consts::default_engine;
22use datafusion_common::ScalarValue;
23use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
24use datatypes::data_type::ConcreteDataType;
25use itertools::Itertools;
26use snafu::{ensure, OptionExt, ResultExt};
27use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Expr};
28use sqlparser::dialect::keywords::Keyword;
29use sqlparser::keywords::ALL_KEYWORDS;
30use sqlparser::parser::IsOptional::Mandatory;
31use sqlparser::parser::{Parser, ParserError};
32use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
33use table::requests::{validate_database_option, validate_table_option};
34
35use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
36use crate::error::{
37    self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu,
38    InvalidSqlSnafu, InvalidTableOptionSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
39    SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
40};
41use crate::parser::{ParserContext, FLOW};
42use crate::parsers::utils::{
43    self, validate_column_fulltext_create_option, validate_column_skipping_index_create_option,
44};
45use crate::statements::create::{
46    Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
47    CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
48};
49use crate::statements::statement::Statement;
50use crate::statements::transform::type_alias::get_data_type_by_alias_name;
51use crate::statements::{sql_data_type_to_concrete_data_type, OptionMap};
52use crate::util::{location_to_index, parse_option_string};
53
/// The `ENGINE` word.
pub const ENGINE: &str = "ENGINE";
/// The `MAXVALUE` word.
pub const MAXVALUE: &str = "MAXVALUE";

/// `SINK` in `CREATE FLOW <name> SINK TO <table>`.
pub const SINK: &str = "SINK";
/// `EXPIRE` in the optional `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const EXPIRE: &str = "EXPIRE";
/// `AFTER` in the optional `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const AFTER: &str = "AFTER";

/// `INVERTED` in the `INVERTED INDEX` column extension.
pub const INVERTED: &str = "INVERTED";
/// `SKIPPING` in the `SKIPPING INDEX` column extension.
pub const SKIPPING: &str = "SKIPPING";

/// The textual (`Display`) form of an interval expression exactly as parsed.
pub type RawIntervalExpr = String;
63
64/// Parses create [table] statement
65impl<'a> ParserContext<'a> {
66    pub(crate) fn parse_create(&mut self) -> Result<Statement> {
67        match self.parser.peek_token().token {
68            Token::Word(w) => match w.keyword {
69                Keyword::TABLE => self.parse_create_table(),
70
71                Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
72
73                Keyword::EXTERNAL => self.parse_create_external_table(),
74
75                Keyword::OR => {
76                    let _ = self.parser.next_token();
77                    self.parser
78                        .expect_keyword(Keyword::REPLACE)
79                        .context(SyntaxSnafu)?;
80                    match self.parser.next_token().token {
81                        Token::Word(w) => match w.keyword {
82                            Keyword::VIEW => self.parse_create_view(true),
83                            Keyword::NoKeyword => {
84                                let uppercase = w.value.to_uppercase();
85                                match uppercase.as_str() {
86                                    FLOW => self.parse_create_flow(true),
87                                    _ => self.unsupported(w.to_string()),
88                                }
89                            }
90                            _ => self.unsupported(w.to_string()),
91                        },
92                        _ => self.unsupported(w.to_string()),
93                    }
94                }
95
96                Keyword::VIEW => {
97                    let _ = self.parser.next_token();
98                    self.parse_create_view(false)
99                }
100
101                #[cfg(feature = "enterprise")]
102                Keyword::TRIGGER => {
103                    let _ = self.parser.next_token();
104                    self.parse_create_trigger()
105                }
106
107                Keyword::NoKeyword => {
108                    let _ = self.parser.next_token();
109                    let uppercase = w.value.to_uppercase();
110                    match uppercase.as_str() {
111                        FLOW => self.parse_create_flow(false),
112                        _ => self.unsupported(w.to_string()),
113                    }
114                }
115                _ => self.unsupported(w.to_string()),
116            },
117            unexpected => self.unsupported(unexpected.to_string()),
118        }
119    }
120
121    /// Parse `CREAVE VIEW` statement.
122    fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
123        let if_not_exists = self.parse_if_not_exist()?;
124        let view_name = self.intern_parse_table_name()?;
125
126        let columns = self.parse_view_columns()?;
127
128        self.parser
129            .expect_keyword(Keyword::AS)
130            .context(SyntaxSnafu)?;
131
132        let query = self.parse_query()?;
133
134        Ok(Statement::CreateView(CreateView {
135            name: view_name,
136            columns,
137            or_replace,
138            query: Box::new(query),
139            if_not_exists,
140        }))
141    }
142
143    fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
144        let mut columns = vec![];
145        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
146            return Ok(columns);
147        }
148
149        loop {
150            let name = self.parse_column_name().context(SyntaxSnafu)?;
151
152            columns.push(name);
153
154            let comma = self.parser.consume_token(&Token::Comma);
155            if self.parser.consume_token(&Token::RParen) {
156                // allow a trailing comma, even though it's not in standard
157                break;
158            } else if !comma {
159                return self.expected("',' or ')' after column name", self.parser.peek_token());
160            }
161        }
162
163        Ok(columns)
164    }
165
166    fn parse_create_external_table(&mut self) -> Result<Statement> {
167        let _ = self.parser.next_token();
168        self.parser
169            .expect_keyword(Keyword::TABLE)
170            .context(SyntaxSnafu)?;
171        let if_not_exists = self.parse_if_not_exist()?;
172        let table_name = self.intern_parse_table_name()?;
173        let (columns, constraints) = self.parse_columns()?;
174        if !columns.is_empty() {
175            validate_time_index(&columns, &constraints)?;
176        }
177
178        let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
179        let options = self.parse_create_table_options()?;
180        Ok(Statement::CreateExternalTable(CreateExternalTable {
181            name: table_name,
182            columns,
183            constraints,
184            options,
185            if_not_exists,
186            engine,
187        }))
188    }
189
190    fn parse_create_database(&mut self) -> Result<Statement> {
191        let _ = self.parser.next_token();
192        let if_not_exists = self.parse_if_not_exist()?;
193        let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
194            expected: "a database name",
195            actual: self.peek_token_as_string(),
196        })?;
197        let database_name = Self::canonicalize_object_name(database_name);
198
199        let options = self
200            .parser
201            .parse_options(Keyword::WITH)
202            .context(SyntaxSnafu)?
203            .into_iter()
204            .map(parse_option_string)
205            .collect::<Result<HashMap<String, String>>>()?;
206
207        for key in options.keys() {
208            ensure!(
209                validate_database_option(key),
210                InvalidDatabaseOptionSnafu {
211                    key: key.to_string()
212                }
213            );
214        }
215        if let Some(append_mode) = options.get("append_mode") {
216            if append_mode == "true" && options.contains_key("merge_mode") {
217                return InvalidDatabaseOptionSnafu {
218                    key: "merge_mode".to_string(),
219                }
220                .fail();
221            }
222        }
223
224        Ok(Statement::CreateDatabase(CreateDatabase {
225            name: database_name,
226            if_not_exists,
227            options: options.into(),
228        }))
229    }
230
231    fn parse_create_table(&mut self) -> Result<Statement> {
232        let _ = self.parser.next_token();
233
234        let if_not_exists = self.parse_if_not_exist()?;
235
236        let table_name = self.intern_parse_table_name()?;
237
238        if self.parser.parse_keyword(Keyword::LIKE) {
239            let source_name = self.intern_parse_table_name()?;
240
241            return Ok(Statement::CreateTableLike(CreateTableLike {
242                table_name,
243                source_name,
244            }));
245        }
246
247        let (columns, constraints) = self.parse_columns()?;
248        validate_time_index(&columns, &constraints)?;
249
250        let partitions = self.parse_partitions()?;
251        if let Some(partitions) = &partitions {
252            validate_partitions(&columns, partitions)?;
253        }
254
255        let engine = self.parse_table_engine(default_engine())?;
256        let options = self.parse_create_table_options()?;
257        let create_table = CreateTable {
258            if_not_exists,
259            name: table_name,
260            columns,
261            engine,
262            constraints,
263            options,
264            table_id: 0, // table id is assigned by catalog manager
265            partitions,
266        };
267
268        Ok(Statement::CreateTable(create_table))
269    }
270
271    /// "CREATE FLOW" clause
272    fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
273        let if_not_exists = self.parse_if_not_exist()?;
274
275        let flow_name = self.intern_parse_table_name()?;
276
277        // make `SINK` case in-sensitive
278        if let Token::Word(word) = self.parser.peek_token().token
279            && word.value.eq_ignore_ascii_case(SINK)
280        {
281            self.parser.next_token();
282        } else {
283            Err(ParserError::ParserError(
284                "Expect `SINK` keyword".to_string(),
285            ))
286            .context(SyntaxSnafu)?
287        }
288        self.parser
289            .expect_keyword(Keyword::TO)
290            .context(SyntaxSnafu)?;
291
292        let output_table_name = self.intern_parse_table_name()?;
293
294        let expire_after = if self
295            .parser
296            .consume_tokens(&[Token::make_keyword(EXPIRE), Token::make_keyword(AFTER)])
297        {
298            Some(self.parse_interval()?)
299        } else {
300            None
301        };
302
303        let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
304            match self.parser.next_token() {
305                TokenWithSpan {
306                    token: Token::SingleQuotedString(value, ..),
307                    ..
308                } => Some(value),
309                unexpected => {
310                    return self
311                        .parser
312                        .expected("string", unexpected)
313                        .context(SyntaxSnafu)
314                }
315            }
316        } else {
317            None
318        };
319
320        self.parser
321            .expect_keyword(Keyword::AS)
322            .context(SyntaxSnafu)?;
323
324        let start_loc = self.parser.peek_token().span.start;
325        let start_index = location_to_index(self.sql, &start_loc);
326
327        let query = self.parse_statement()?;
328        let end_token = self.parser.peek_token();
329
330        let raw_query = if end_token == Token::EOF {
331            &self.sql[start_index..]
332        } else {
333            let end_loc = end_token.span.end;
334            let end_index = location_to_index(self.sql, &end_loc);
335            &self.sql[start_index..end_index.min(self.sql.len())]
336        };
337        let raw_query = raw_query.trim_end_matches(";");
338
339        let query = Box::new(SqlOrTql::try_from_statement(query, raw_query)?);
340
341        Ok(Statement::CreateFlow(CreateFlow {
342            flow_name,
343            sink_table_name: output_table_name,
344            or_replace,
345            if_not_exists,
346            expire_after,
347            comment,
348            query,
349        }))
350    }
351
352    /// Parse the interval expr to duration in seconds.
353    fn parse_interval(&mut self) -> Result<i64> {
354        let interval = self.parse_interval_month_day_nano()?.0;
355        Ok(
356            interval.nanoseconds / 1_000_000_000
357                + interval.days as i64 * 60 * 60 * 24
358                + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, // 1 month=365.25/12=30.44 days
359                                                                       // this is to keep the same as https://docs.rs/humantime/latest/humantime/fn.parse_duration.html
360                                                                       // which we use in database to parse i.e. ttl interval and many other intervals
361        )
362    }
363
364    /// Parse interval expr to [`IntervalMonthDayNano`].
365    fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
366        let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
367        let raw_interval_expr = interval_expr.to_string();
368        let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone())?
369            .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
370            .ok()
371            .with_context(|| InvalidIntervalSnafu {
372                reason: format!("cannot cast {} to interval type", interval_expr),
373            })?;
374        if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
375            Ok((interval, raw_interval_expr))
376        } else {
377            unreachable!()
378        }
379    }
380
381    fn parse_if_not_exist(&mut self) -> Result<bool> {
382        match self.parser.peek_token().token {
383            Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
384            _ => {}
385        }
386
387        if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
388            return self
389                .parser
390                .expect_keyword(Keyword::EXISTS)
391                .map(|_| true)
392                .context(UnexpectedSnafu {
393                    expected: "EXISTS",
394                    actual: self.peek_token_as_string(),
395                });
396        }
397
398        if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
399            return UnsupportedSnafu { keyword: "EXISTS" }.fail();
400        }
401
402        Ok(false)
403    }
404
405    fn parse_create_table_options(&mut self) -> Result<OptionMap> {
406        let options = self
407            .parser
408            .parse_options(Keyword::WITH)
409            .context(SyntaxSnafu)?
410            .into_iter()
411            .map(parse_option_string)
412            .collect::<Result<HashMap<String, String>>>()?;
413        for key in options.keys() {
414            ensure!(validate_table_option(key), InvalidTableOptionSnafu { key });
415        }
416        Ok(options.into())
417    }
418
419    /// "PARTITION ON COLUMNS (...)" clause
420    fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
421        if !self.parser.parse_keyword(Keyword::PARTITION) {
422            return Ok(None);
423        }
424        self.parser
425            .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
426            .context(error::UnexpectedSnafu {
427                expected: "ON, COLUMNS",
428                actual: self.peek_token_as_string(),
429            })?;
430
431        let raw_column_list = self
432            .parser
433            .parse_parenthesized_column_list(Mandatory, false)
434            .context(error::SyntaxSnafu)?;
435        let column_list = raw_column_list
436            .into_iter()
437            .map(Self::canonicalize_identifier)
438            .collect();
439
440        let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
441
442        Ok(Some(Partitions { column_list, exprs }))
443    }
444
445    fn parse_partition_entry(&mut self) -> Result<Expr> {
446        self.parser.parse_expr().context(error::SyntaxSnafu)
447    }
448
449    /// Parse a comma-separated list wrapped by "()", and of which all items accepted by `F`
450    fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
451    where
452        F: FnMut(&mut ParserContext<'a>) -> Result<T>,
453    {
454        self.parser
455            .expect_token(&Token::LParen)
456            .context(error::UnexpectedSnafu {
457                expected: "(",
458                actual: self.peek_token_as_string(),
459            })?;
460
461        let mut values = vec![];
462        while self.parser.peek_token() != Token::RParen {
463            values.push(f(self)?);
464            if !self.parser.consume_token(&Token::Comma) {
465                break;
466            }
467        }
468
469        self.parser
470            .expect_token(&Token::RParen)
471            .context(error::UnexpectedSnafu {
472                expected: ")",
473                actual: self.peek_token_as_string(),
474            })?;
475
476        Ok(values)
477    }
478
479    /// Parse the columns and constraints.
480    fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
481        let mut columns = vec![];
482        let mut constraints = vec![];
483        if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
484            return Ok((columns, constraints));
485        }
486
487        loop {
488            if let Some(constraint) = self.parse_optional_table_constraint()? {
489                constraints.push(constraint);
490            } else if let Token::Word(_) = self.parser.peek_token().token {
491                self.parse_column(&mut columns, &mut constraints)?;
492            } else {
493                return self.expected(
494                    "column name or constraint definition",
495                    self.parser.peek_token(),
496                );
497            }
498            let comma = self.parser.consume_token(&Token::Comma);
499            if self.parser.consume_token(&Token::RParen) {
500                // allow a trailing comma, even though it's not in standard
501                break;
502            } else if !comma {
503                return self.expected(
504                    "',' or ')' after column definition",
505                    self.parser.peek_token(),
506                );
507            }
508        }
509
510        Ok((columns, constraints))
511    }
512
513    fn parse_column(
514        &mut self,
515        columns: &mut Vec<Column>,
516        constraints: &mut Vec<TableConstraint>,
517    ) -> Result<()> {
518        let mut column = self.parse_column_def()?;
519
520        let mut time_index_opt_idx = None;
521        for (index, opt) in column.options().iter().enumerate() {
522            if let ColumnOption::DialectSpecific(tokens) = &opt.option {
523                if matches!(
524                    &tokens[..],
525                    [
526                        Token::Word(Word {
527                            keyword: Keyword::TIME,
528                            ..
529                        }),
530                        Token::Word(Word {
531                            keyword: Keyword::INDEX,
532                            ..
533                        })
534                    ]
535                ) {
536                    ensure!(
537                        time_index_opt_idx.is_none(),
538                        InvalidColumnOptionSnafu {
539                            name: column.name().to_string(),
540                            msg: "duplicated time index",
541                        }
542                    );
543                    time_index_opt_idx = Some(index);
544
545                    let constraint = TableConstraint::TimeIndex {
546                        column: Ident::new(column.name().value.clone()),
547                    };
548                    constraints.push(constraint);
549                }
550            }
551        }
552
553        if let Some(index) = time_index_opt_idx {
554            ensure!(
555                !column.options().contains(&ColumnOptionDef {
556                    option: ColumnOption::Null,
557                    name: None,
558                }),
559                InvalidColumnOptionSnafu {
560                    name: column.name().to_string(),
561                    msg: "time index column can't be null",
562                }
563            );
564
565            // The timestamp type may be an alias type, we have to retrieve the actual type.
566            let data_type = get_unalias_type(column.data_type());
567            ensure!(
568                matches!(data_type, DataType::Timestamp(_, _)),
569                InvalidColumnOptionSnafu {
570                    name: column.name().to_string(),
571                    msg: "time index column data type should be timestamp",
572                }
573            );
574
575            let not_null_opt = ColumnOptionDef {
576                option: ColumnOption::NotNull,
577                name: None,
578            };
579
580            if !column.options().contains(&not_null_opt) {
581                column.mut_options().push(not_null_opt);
582            }
583
584            let _ = column.mut_options().remove(index);
585        }
586
587        columns.push(column);
588
589        Ok(())
590    }
591
592    /// Parse the column name and check if it's valid.
593    fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
594        let name = self.parser.parse_identifier()?;
595        if name.quote_style.is_none() &&
596        // "ALL_KEYWORDS" are sorted.
597            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
598        {
599            return Err(ParserError::ParserError(format!(
600                "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
601                &name.value
602            )));
603        }
604
605        Ok(name)
606    }
607
608    pub fn parse_column_def(&mut self) -> Result<Column> {
609        let name = self.parse_column_name().context(SyntaxSnafu)?;
610        let parser = &mut self.parser;
611
612        ensure!(
613            !(name.quote_style.is_none() &&
614            // "ALL_KEYWORDS" are sorted.
615            ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
616            InvalidSqlSnafu {
617                msg: format!(
618                    "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
619                    &name.value
620                ),
621            }
622        );
623
624        let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
625        let mut options = vec![];
626        let mut extensions = ColumnExtensions::default();
627        loop {
628            if parser.parse_keyword(Keyword::CONSTRAINT) {
629                let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
630                if let Some(option) = Self::parse_optional_column_option(parser)? {
631                    options.push(ColumnOptionDef { name, option });
632                } else {
633                    return parser
634                        .expected(
635                            "constraint details after CONSTRAINT <name>",
636                            parser.peek_token(),
637                        )
638                        .context(SyntaxSnafu);
639                }
640            } else if let Some(option) = Self::parse_optional_column_option(parser)? {
641                options.push(ColumnOptionDef { name: None, option });
642            } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
643                break;
644            };
645        }
646
647        Ok(Column {
648            column_def: ColumnDef {
649                name: Self::canonicalize_identifier(name),
650                data_type,
651                options,
652            },
653            extensions,
654        })
655    }
656
657    fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
658        if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
659            Ok(Some(ColumnOption::CharacterSet(
660                parser.parse_object_name(false).context(SyntaxSnafu)?,
661            )))
662        } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
663            Ok(Some(ColumnOption::NotNull))
664        } else if parser.parse_keywords(&[Keyword::COMMENT]) {
665            match parser.next_token() {
666                TokenWithSpan {
667                    token: Token::SingleQuotedString(value, ..),
668                    ..
669                } => Ok(Some(ColumnOption::Comment(value))),
670                unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
671            }
672        } else if parser.parse_keyword(Keyword::NULL) {
673            Ok(Some(ColumnOption::Null))
674        } else if parser.parse_keyword(Keyword::DEFAULT) {
675            Ok(Some(ColumnOption::Default(
676                parser.parse_expr().context(SyntaxSnafu)?,
677            )))
678        } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
679            Ok(Some(ColumnOption::Unique {
680                is_primary: true,
681                characteristics: None,
682            }))
683        } else if parser.parse_keyword(Keyword::UNIQUE) {
684            Ok(Some(ColumnOption::Unique {
685                is_primary: false,
686                characteristics: None,
687            }))
688        } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
689            // Use a DialectSpecific option for time index
690            Ok(Some(ColumnOption::DialectSpecific(vec![
691                Token::Word(Word {
692                    value: "TIME".to_string(),
693                    quote_style: None,
694                    keyword: Keyword::TIME,
695                }),
696                Token::Word(Word {
697                    value: "INDEX".to_string(),
698                    quote_style: None,
699                    keyword: Keyword::INDEX,
700                }),
701            ])))
702        } else {
703            Ok(None)
704        }
705    }
706
707    /// Parse a column option extensions.
708    ///
709    /// This function will handle:
710    /// - Vector type
711    /// - Indexes
712    fn parse_column_extensions(
713        parser: &mut Parser<'_>,
714        column_name: &Ident,
715        column_type: &DataType,
716        column_extensions: &mut ColumnExtensions,
717    ) -> Result<bool> {
718        if let DataType::Custom(name, tokens) = column_type
719            && name.0.len() == 1
720            && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
721        {
722            ensure!(
723                tokens.len() == 1,
724                InvalidColumnOptionSnafu {
725                    name: column_name.to_string(),
726                    msg: "VECTOR type should have dimension",
727                }
728            );
729
730            let dimension =
731                tokens[0]
732                    .parse::<u32>()
733                    .ok()
734                    .with_context(|| InvalidColumnOptionSnafu {
735                        name: column_name.to_string(),
736                        msg: "dimension should be a positive integer",
737                    })?;
738
739            let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
740            column_extensions.vector_options = Some(options);
741        }
742
743        // parse index options in column definition
744        let mut is_index_declared = false;
745
746        // skipping index
747        if let Token::Word(word) = parser.peek_token().token
748            && word.value.eq_ignore_ascii_case(SKIPPING)
749        {
750            parser.next_token();
751            // Consume `INDEX` keyword
752            ensure!(
753                parser.parse_keyword(Keyword::INDEX),
754                InvalidColumnOptionSnafu {
755                    name: column_name.to_string(),
756                    msg: "expect INDEX after SKIPPING keyword",
757                }
758            );
759            ensure!(
760                column_extensions.skipping_index_options.is_none(),
761                InvalidColumnOptionSnafu {
762                    name: column_name.to_string(),
763                    msg: "duplicated SKIPPING index option",
764                }
765            );
766
767            let options = parser
768                .parse_options(Keyword::WITH)
769                .context(error::SyntaxSnafu)?
770                .into_iter()
771                .map(parse_option_string)
772                .collect::<Result<HashMap<String, String>>>()?;
773
774            for key in options.keys() {
775                ensure!(
776                    validate_column_skipping_index_create_option(key),
777                    InvalidColumnOptionSnafu {
778                        name: column_name.to_string(),
779                        msg: format!("invalid SKIPPING INDEX option: {key}"),
780                    }
781                );
782            }
783
784            column_extensions.skipping_index_options = Some(options.into());
785            is_index_declared |= true;
786        }
787
788        // fulltext index
789        if parser.parse_keyword(Keyword::FULLTEXT) {
790            // Consume `INDEX` keyword
791            ensure!(
792                parser.parse_keyword(Keyword::INDEX),
793                InvalidColumnOptionSnafu {
794                    name: column_name.to_string(),
795                    msg: "expect INDEX after FULLTEXT keyword",
796                }
797            );
798
799            ensure!(
800                column_extensions.fulltext_index_options.is_none(),
801                InvalidColumnOptionSnafu {
802                    name: column_name.to_string(),
803                    msg: "duplicated FULLTEXT INDEX option",
804                }
805            );
806
807            let column_type = get_unalias_type(column_type);
808            let data_type = sql_data_type_to_concrete_data_type(&column_type)?;
809            ensure!(
810                data_type == ConcreteDataType::string_datatype(),
811                InvalidColumnOptionSnafu {
812                    name: column_name.to_string(),
813                    msg: "FULLTEXT index only supports string type",
814                }
815            );
816
817            let options = parser
818                .parse_options(Keyword::WITH)
819                .context(error::SyntaxSnafu)?
820                .into_iter()
821                .map(parse_option_string)
822                .collect::<Result<HashMap<String, String>>>()?;
823
824            for key in options.keys() {
825                ensure!(
826                    validate_column_fulltext_create_option(key),
827                    InvalidColumnOptionSnafu {
828                        name: column_name.to_string(),
829                        msg: format!("invalid FULLTEXT INDEX option: {key}"),
830                    }
831                );
832            }
833
834            column_extensions.fulltext_index_options = Some(options.into());
835            is_index_declared |= true;
836        }
837
838        // inverted index
839        if let Token::Word(word) = parser.peek_token().token
840            && word.value.eq_ignore_ascii_case(INVERTED)
841        {
842            parser.next_token();
843            // Consume `INDEX` keyword
844            ensure!(
845                parser.parse_keyword(Keyword::INDEX),
846                InvalidColumnOptionSnafu {
847                    name: column_name.to_string(),
848                    msg: "expect INDEX after INVERTED keyword",
849                }
850            );
851
852            ensure!(
853                column_extensions.inverted_index_options.is_none(),
854                InvalidColumnOptionSnafu {
855                    name: column_name.to_string(),
856                    msg: "duplicated INVERTED index option",
857                }
858            );
859
860            // inverted index doesn't have options, skipping `WITH`
861            // try cache `WITH` and throw error
862            let with_token = parser.peek_token();
863            ensure!(
864                with_token.token
865                    != Token::Word(Word {
866                        value: "WITH".to_string(),
867                        keyword: Keyword::WITH,
868                        quote_style: None,
869                    }),
870                InvalidColumnOptionSnafu {
871                    name: column_name.to_string(),
872                    msg: "INVERTED index doesn't support options",
873                }
874            );
875
876            column_extensions.inverted_index_options = Some(OptionMap::default());
877            is_index_declared |= true;
878        }
879
880        Ok(is_index_declared)
881    }
882
883    fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
884        match self.parser.next_token() {
885            TokenWithSpan {
886                token: Token::Word(w),
887                ..
888            } if w.keyword == Keyword::PRIMARY => {
889                self.parser
890                    .expect_keyword(Keyword::KEY)
891                    .context(error::UnexpectedSnafu {
892                        expected: "KEY",
893                        actual: self.peek_token_as_string(),
894                    })?;
895                let raw_columns = self
896                    .parser
897                    .parse_parenthesized_column_list(Mandatory, false)
898                    .context(error::SyntaxSnafu)?;
899                let columns = raw_columns
900                    .into_iter()
901                    .map(Self::canonicalize_identifier)
902                    .collect();
903                Ok(Some(TableConstraint::PrimaryKey { columns }))
904            }
905            TokenWithSpan {
906                token: Token::Word(w),
907                ..
908            } if w.keyword == Keyword::TIME => {
909                self.parser
910                    .expect_keyword(Keyword::INDEX)
911                    .context(error::UnexpectedSnafu {
912                        expected: "INDEX",
913                        actual: self.peek_token_as_string(),
914                    })?;
915
916                let raw_columns = self
917                    .parser
918                    .parse_parenthesized_column_list(Mandatory, false)
919                    .context(error::SyntaxSnafu)?;
920                let mut columns = raw_columns
921                    .into_iter()
922                    .map(Self::canonicalize_identifier)
923                    .collect::<Vec<_>>();
924
925                ensure!(
926                    columns.len() == 1,
927                    InvalidTimeIndexSnafu {
928                        msg: "it should contain only one column in time index",
929                    }
930                );
931
932                Ok(Some(TableConstraint::TimeIndex {
933                    column: columns.pop().unwrap(),
934                }))
935            }
936            _ => {
937                self.parser.prev_token();
938                Ok(None)
939            }
940        }
941    }
942
943    /// Parses the set of valid formats
944    fn parse_table_engine(&mut self, default: &str) -> Result<String> {
945        if !self.consume_token(ENGINE) {
946            return Ok(default.to_string());
947        }
948
949        self.parser
950            .expect_token(&Token::Eq)
951            .context(error::UnexpectedSnafu {
952                expected: "=",
953                actual: self.peek_token_as_string(),
954            })?;
955
956        let token = self.parser.next_token();
957        if let Token::Word(w) = token.token {
958            Ok(w.value)
959        } else {
960            self.expected("'Engine' is missing", token)
961        }
962    }
963}
964
965fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
966    let time_index_constraints: Vec<_> = constraints
967        .iter()
968        .filter_map(|c| match c {
969            TableConstraint::TimeIndex { column } => Some(column),
970            _ => None,
971        })
972        .unique()
973        .collect();
974
975    ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
976    ensure!(
977        time_index_constraints.len() == 1,
978        InvalidTimeIndexSnafu {
979            msg: format!(
980                "expected only one time index constraint but actual {}",
981                time_index_constraints.len()
982            ),
983        }
984    );
985
986    // It's safe to use time_index_constraints[0][0],
987    // we already check the bound above.
988    let time_index_column_ident = &time_index_constraints[0];
989    let time_index_column = columns
990        .iter()
991        .find(|c| c.name().value == *time_index_column_ident.value)
992        .with_context(|| InvalidTimeIndexSnafu {
993            msg: format!(
994                "time index column {} not found in columns",
995                time_index_column_ident
996            ),
997        })?;
998
999    let time_index_data_type = get_unalias_type(time_index_column.data_type());
1000    ensure!(
1001        matches!(time_index_data_type, DataType::Timestamp(_, _)),
1002        InvalidColumnOptionSnafu {
1003            name: time_index_column.name().to_string(),
1004            msg: "time index column data type should be timestamp",
1005        }
1006    );
1007
1008    Ok(())
1009}
1010
1011fn get_unalias_type(data_type: &DataType) -> DataType {
1012    match data_type {
1013        DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1014            if let Some(real_type) =
1015                get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1016            {
1017                real_type
1018            } else {
1019                data_type.clone()
1020            }
1021        }
1022        _ => data_type.clone(),
1023    }
1024}
1025
1026fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1027    let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1028
1029    ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1030
1031    Ok(())
1032}
1033
1034/// Ensure all exprs are binary expr and all the columns are defined in the column list.
1035fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1036    for expr in exprs {
1037        // The first level must be binary expr
1038        if let Expr::BinaryOp { left, op: _, right } = expr {
1039            ensure_one_expr(left, columns)?;
1040            ensure_one_expr(right, columns)?;
1041        } else {
1042            return error::InvalidSqlSnafu {
1043                msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1044            }
1045            .fail();
1046        }
1047    }
1048    Ok(())
1049}
1050
1051/// Check if the expr is a binary expr, an ident or a literal value.
1052/// If is ident, then check it is in the column list.
1053/// This recursive function is intended to be used by [ensure_exprs_are_binary].
1054fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1055    match expr {
1056        Expr::BinaryOp { left, op: _, right } => {
1057            ensure_one_expr(left, columns)?;
1058            ensure_one_expr(right, columns)?;
1059            Ok(())
1060        }
1061        Expr::Identifier(ident) => {
1062            let column_name = &ident.value;
1063            ensure!(
1064                columns.iter().any(|c| &c.name().value == column_name),
1065                error::InvalidSqlSnafu {
1066                    msg: format!(
1067                        "Column {:?} in rule expr is not referenced in PARTITION ON",
1068                        column_name
1069                    ),
1070                }
1071            );
1072            Ok(())
1073        }
1074        Expr::Value(_) => Ok(()),
1075        Expr::UnaryOp { expr, .. } => {
1076            ensure_one_expr(expr, columns)?;
1077            Ok(())
1078        }
1079        _ => error::InvalidSqlSnafu {
1080            msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1081        }
1082        .fail(),
1083    }
1084}
1085
1086/// Ensure that all columns used in "PARTITION ON COLUMNS" are defined in create table.
1087fn ensure_partition_columns_defined<'a>(
1088    columns: &'a [Column],
1089    partitions: &'a Partitions,
1090) -> Result<Vec<&'a Column>> {
1091    partitions
1092        .column_list
1093        .iter()
1094        .map(|x| {
1095            let x = ParserContext::canonicalize_identifier(x.clone());
1096            // Normally the columns in "create table" won't be too many,
1097            // a linear search to find the target every time is fine.
1098            columns
1099                .iter()
1100                .find(|c| *c.name().value == x.value)
1101                .context(error::InvalidSqlSnafu {
1102                    msg: format!("Partition column {:?} not defined", x.value),
1103                })
1104        })
1105        .collect::<Result<Vec<&Column>>>()
1106}
1107
1108#[cfg(test)]
1109mod tests {
1110    use std::assert_matches::assert_matches;
1111    use std::collections::HashMap;
1112
1113    use common_catalog::consts::FILE_ENGINE;
1114    use common_error::ext::ErrorExt;
1115    use sqlparser::ast::ColumnOption::NotNull;
1116    use sqlparser::ast::{BinaryOperator, Expr, ObjectName, Value};
1117    use sqlparser::dialect::GenericDialect;
1118    use sqlparser::tokenizer::Tokenizer;
1119
1120    use super::*;
1121    use crate::dialect::GreptimeDbDialect;
1122    use crate::parser::ParseOptions;
1123
1124    #[test]
1125    fn test_parse_create_table_like() {
1126        let sql = "CREATE TABLE t1 LIKE t2";
1127        let stmts =
1128            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1129                .unwrap();
1130
1131        assert_eq!(1, stmts.len());
1132        match &stmts[0] {
1133            Statement::CreateTableLike(c) => {
1134                assert_eq!(c.table_name.to_string(), "t1");
1135                assert_eq!(c.source_name.to_string(), "t2");
1136            }
1137            _ => unreachable!(),
1138        }
1139    }
1140
1141    #[test]
1142    fn test_validate_external_table_options() {
1143        let sql = "CREATE EXTERNAL TABLE city (
1144            host string,
1145            ts timestamp,
1146            cpu float64 default 0,
1147            memory float64,
1148            TIME INDEX (ts),
1149            PRIMARY KEY(ts, host)
1150        ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1151
1152        let result =
1153            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1154        assert!(matches!(
1155            result,
1156            Err(error::Error::InvalidTableOption { .. })
1157        ));
1158    }
1159
1160    #[test]
1161    fn test_parse_create_external_table() {
1162        struct Test<'a> {
1163            sql: &'a str,
1164            expected_table_name: &'a str,
1165            expected_options: HashMap<String, String>,
1166            expected_engine: &'a str,
1167            expected_if_not_exist: bool,
1168        }
1169
1170        let tests = [
1171            Test {
1172                sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1173                expected_table_name: "city",
1174                expected_options: HashMap::from([
1175                    ("location".to_string(), "/var/data/city.csv".to_string()),
1176                    ("format".to_string(), "csv".to_string()),
1177                ]),
1178                expected_engine: FILE_ENGINE,
1179                expected_if_not_exist: false,
1180            },
1181            Test {
1182                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1183                expected_table_name: "city",
1184                expected_options: HashMap::from([
1185                    ("location".to_string(), "/var/data/city.csv".to_string()),
1186                    ("format".to_string(), "csv".to_string()),
1187                ]),
1188                expected_engine: "foo",
1189                expected_if_not_exist: true,
1190            },
1191            Test {
1192                sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1193                expected_table_name: "city",
1194                expected_options: HashMap::from([
1195                    ("location".to_string(), "/var/data/city.csv".to_string()),
1196                    ("format".to_string(), "csv".to_string()),
1197                    ("compaction.type".to_string(), "bar".to_string()),
1198                ]),
1199                expected_engine: "foo",
1200                expected_if_not_exist: true,
1201            },
1202        ];
1203
1204        for test in tests {
1205            let stmts = ParserContext::create_with_dialect(
1206                test.sql,
1207                &GreptimeDbDialect {},
1208                ParseOptions::default(),
1209            )
1210            .unwrap();
1211            assert_eq!(1, stmts.len());
1212            match &stmts[0] {
1213                Statement::CreateExternalTable(c) => {
1214                    assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1215                    assert_eq!(c.options, test.expected_options.into());
1216                    assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1217                    assert_eq!(c.engine, test.expected_engine);
1218                }
1219                _ => unreachable!(),
1220            }
1221        }
1222    }
1223
1224    #[test]
1225    fn test_parse_create_external_table_with_schema() {
1226        let sql = "CREATE EXTERNAL TABLE city (
1227            host string,
1228            ts timestamp,
1229            cpu float32 default 0,
1230            memory float64,
1231            TIME INDEX (ts),
1232            PRIMARY KEY(ts, host),
1233        ) with(location='/var/data/city.csv',format='csv');";
1234
1235        let options = HashMap::from([
1236            ("location".to_string(), "/var/data/city.csv".to_string()),
1237            ("format".to_string(), "csv".to_string()),
1238        ]);
1239
1240        let stmts =
1241            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1242                .unwrap();
1243        assert_eq!(1, stmts.len());
1244        match &stmts[0] {
1245            Statement::CreateExternalTable(c) => {
1246                assert_eq!(c.name.to_string(), "city");
1247                assert_eq!(c.options, options.into());
1248
1249                let columns = &c.columns;
1250                assert_column_def(&columns[0].column_def, "host", "STRING");
1251                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1252                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1253                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1254
1255                let constraints = &c.constraints;
1256                assert_eq!(
1257                    &constraints[0],
1258                    &TableConstraint::TimeIndex {
1259                        column: Ident::new("ts"),
1260                    }
1261                );
1262                assert_eq!(
1263                    &constraints[1],
1264                    &TableConstraint::PrimaryKey {
1265                        columns: vec![Ident::new("ts"), Ident::new("host")]
1266                    }
1267                );
1268            }
1269            _ => unreachable!(),
1270        }
1271    }
1272
1273    #[test]
1274    fn test_parse_create_database() {
1275        let sql = "create database";
1276        let result =
1277            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1278        assert!(result
1279            .unwrap_err()
1280            .to_string()
1281            .contains("Unexpected token while parsing SQL statement"));
1282
1283        let sql = "create database prometheus";
1284        let stmts =
1285            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1286                .unwrap();
1287
1288        assert_eq!(1, stmts.len());
1289        match &stmts[0] {
1290            Statement::CreateDatabase(c) => {
1291                assert_eq!(c.name.to_string(), "prometheus");
1292                assert!(!c.if_not_exists);
1293            }
1294            _ => unreachable!(),
1295        }
1296
1297        let sql = "create database if not exists prometheus";
1298        let stmts =
1299            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1300                .unwrap();
1301
1302        assert_eq!(1, stmts.len());
1303        match &stmts[0] {
1304            Statement::CreateDatabase(c) => {
1305                assert_eq!(c.name.to_string(), "prometheus");
1306                assert!(c.if_not_exists);
1307            }
1308            _ => unreachable!(),
1309        }
1310
1311        let sql = "CREATE DATABASE `fOo`";
1312        let result =
1313            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1314        let stmts = result.unwrap();
1315        match &stmts.last().unwrap() {
1316            Statement::CreateDatabase(c) => {
1317                assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1318                assert!(!c.if_not_exists);
1319            }
1320            _ => unreachable!(),
1321        }
1322
1323        let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1324        let result =
1325            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1326        let stmts = result.unwrap();
1327        match &stmts[0] {
1328            Statement::CreateDatabase(c) => {
1329                assert_eq!(c.name.to_string(), "prometheus");
1330                assert!(!c.if_not_exists);
1331                assert_eq!(c.options.get("ttl").unwrap(), "1h");
1332            }
1333            _ => unreachable!(),
1334        }
1335    }
1336
1337    #[test]
1338    fn test_parse_create_flow_more_testcases() {
1339        use pretty_assertions::assert_eq;
1340        fn parse_create_flow(sql: &str) -> CreateFlow {
1341            let stmts = ParserContext::create_with_dialect(
1342                sql,
1343                &GreptimeDbDialect {},
1344                ParseOptions::default(),
1345            )
1346            .unwrap();
1347            assert_eq!(1, stmts.len());
1348            match &stmts[0] {
1349                Statement::CreateFlow(c) => c.clone(),
1350                _ => unreachable!(),
1351            }
1352        }
1353        struct CreateFlowWoutQuery {
1354            /// Flow name
1355            pub flow_name: ObjectName,
1356            /// Output (sink) table name
1357            pub sink_table_name: ObjectName,
1358            /// Whether to replace existing task
1359            pub or_replace: bool,
1360            /// Create if not exist
1361            pub if_not_exists: bool,
1362            /// `EXPIRE AFTER`
1363            /// Duration in second as `i64`
1364            pub expire_after: Option<i64>,
1365            /// Comment string
1366            pub comment: Option<String>,
1367        }
1368        let testcases = vec![
1369            (
1370                r"
1371CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1372SINK TO schema_1.table_1
1373EXPIRE AFTER INTERVAL '5 minutes'
1374COMMENT 'test comment'
1375AS
1376SELECT max(c1), min(c2) FROM schema_2.table_2;",
1377                CreateFlowWoutQuery {
1378                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1379                    sink_table_name: ObjectName::from(vec![
1380                        Ident::new("schema_1"),
1381                        Ident::new("table_1"),
1382                    ]),
1383                    or_replace: true,
1384                    if_not_exists: true,
1385                    expire_after: Some(300),
1386                    comment: Some("test comment".to_string()),
1387                },
1388            ),
1389            (
1390                r"
1391CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1392SINK TO schema_1.table_1
1393EXPIRE AFTER INTERVAL '300 s'
1394COMMENT 'test comment'
1395AS
1396SELECT max(c1), min(c2) FROM schema_2.table_2;",
1397                CreateFlowWoutQuery {
1398                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1399                    sink_table_name: ObjectName::from(vec![
1400                        Ident::new("schema_1"),
1401                        Ident::new("table_1"),
1402                    ]),
1403                    or_replace: true,
1404                    if_not_exists: true,
1405                    expire_after: Some(300),
1406                    comment: Some("test comment".to_string()),
1407                },
1408            ),
1409            (
1410                r"
1411CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1412SINK TO schema_1.table_1
1413EXPIRE AFTER '5 minutes'
1414COMMENT 'test comment'
1415AS
1416SELECT max(c1), min(c2) FROM schema_2.table_2;",
1417                CreateFlowWoutQuery {
1418                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1419                    sink_table_name: ObjectName::from(vec![
1420                        Ident::new("schema_1"),
1421                        Ident::new("table_1"),
1422                    ]),
1423                    or_replace: true,
1424                    if_not_exists: true,
1425                    expire_after: Some(300),
1426                    comment: Some("test comment".to_string()),
1427                },
1428            ),
1429            (
1430                r"
1431CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1432SINK TO schema_1.table_1
1433EXPIRE AFTER '300 s'
1434COMMENT 'test comment'
1435AS
1436SELECT max(c1), min(c2) FROM schema_2.table_2;",
1437                CreateFlowWoutQuery {
1438                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1439                    sink_table_name: ObjectName::from(vec![
1440                        Ident::new("schema_1"),
1441                        Ident::new("table_1"),
1442                    ]),
1443                    or_replace: true,
1444                    if_not_exists: true,
1445                    expire_after: Some(300),
1446                    comment: Some("test comment".to_string()),
1447                },
1448            ),
1449            (
1450                r"
1451CREATE FLOW `task_2`
1452SINK TO schema_1.table_1
1453EXPIRE AFTER '1 month 2 days 1h 2 min'
1454AS
1455SELECT max(c1), min(c2) FROM schema_2.table_2;",
1456                CreateFlowWoutQuery {
1457                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1458                    sink_table_name: ObjectName::from(vec![
1459                        Ident::new("schema_1"),
1460                        Ident::new("table_1"),
1461                    ]),
1462                    or_replace: false,
1463                    if_not_exists: false,
1464                    expire_after: Some(86400 * 3044 / 1000 + 2 * 86400 + 3600 + 2 * 60),
1465                    comment: None,
1466                },
1467            ),
1468        ];
1469
1470        for (sql, expected) in testcases {
1471            let create_task = parse_create_flow(sql);
1472
1473            let expected = CreateFlow {
1474                flow_name: expected.flow_name,
1475                sink_table_name: expected.sink_table_name,
1476                or_replace: expected.or_replace,
1477                if_not_exists: expected.if_not_exists,
1478                expire_after: expected.expire_after,
1479                comment: expected.comment,
1480                // ignore query parse result
1481                query: create_task.query.clone(),
1482            };
1483
1484            assert_eq!(create_task, expected, "input sql is:\n{sql}");
1485            let show_create = create_task.to_string();
1486            let recreated = parse_create_flow(&show_create);
1487            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1488        }
1489    }
1490
1491    #[test]
1492    fn test_parse_create_flow() {
1493        let sql = r"
1494CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1495SINK TO schema_1.table_1
1496EXPIRE AFTER INTERVAL '5 minutes'
1497COMMENT 'test comment'
1498AS
1499SELECT max(c1), min(c2) FROM schema_2.table_2;";
1500        let stmts =
1501            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1502                .unwrap();
1503        assert_eq!(1, stmts.len());
1504        let create_task = match &stmts[0] {
1505            Statement::CreateFlow(c) => c,
1506            _ => unreachable!(),
1507        };
1508
1509        let expected = CreateFlow {
1510            flow_name: vec![Ident::new("task_1")].into(),
1511            sink_table_name: vec![Ident::new("schema_1"), Ident::new("table_1")].into(),
1512            or_replace: true,
1513            if_not_exists: true,
1514            expire_after: Some(300),
1515            comment: Some("test comment".to_string()),
1516            // ignore query parse result
1517            query: create_task.query.clone(),
1518        };
1519        assert_eq!(create_task, &expected);
1520
1521        // create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
1522        let sql = r"
1523CREATE FLOW `task_2`
1524SINK TO schema_1.table_1
1525EXPIRE AFTER '1 month 2 days 1h 2 min'
1526AS
1527SELECT max(c1), min(c2) FROM schema_2.table_2;";
1528        let stmts =
1529            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1530                .unwrap();
1531        assert_eq!(1, stmts.len());
1532        let create_task = match &stmts[0] {
1533            Statement::CreateFlow(c) => c,
1534            _ => unreachable!(),
1535        };
1536        assert!(!create_task.or_replace);
1537        assert!(!create_task.if_not_exists);
1538        assert_eq!(
1539            create_task.expire_after,
1540            Some(86400 * 3044 / 1000 + 2 * 86400 + 3600 + 2 * 60)
1541        );
1542        assert!(create_task.comment.is_none());
1543        assert_eq!(create_task.flow_name.to_string(), "`task_2`");
1544    }
1545
1546    #[test]
1547    fn test_validate_create() {
1548        let sql = r"
1549CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
1550PARTITION ON COLUMNS(c, a) (
1551    a < 10,
1552    a > 10 AND a < 20,
1553    a > 20 AND c < 100,
1554    a > 20 AND c >= 100
1555)
1556ENGINE=mito";
1557        let result =
1558            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1559        let _ = result.unwrap();
1560
1561        let sql = r"
1562CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1563PARTITION ON COLUMNS(x) ()
1564ENGINE=mito";
1565        let result =
1566            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1567        assert!(result
1568            .unwrap_err()
1569            .to_string()
1570            .contains("Partition column \"x\" not defined"));
1571    }
1572
1573    #[test]
1574    fn test_parse_create_table_with_partitions() {
1575        let sql = r"
1576CREATE TABLE monitor (
1577  host_id    INT,
1578  idc        STRING,
1579  ts         TIMESTAMP,
1580  cpu        DOUBLE DEFAULT 0,
1581  memory     DOUBLE,
1582  TIME INDEX (ts),
1583  PRIMARY KEY (host),
1584)
1585PARTITION ON COLUMNS(idc, host_id) (
1586  idc <= 'hz' AND host_id < 1000,
1587  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1588  idc > 'sh' AND host_id < 3000,
1589  idc > 'sh' AND host_id >= 3000,
1590)
1591ENGINE=mito";
1592        let result =
1593            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1594                .unwrap();
1595        assert_eq!(result.len(), 1);
1596        match &result[0] {
1597            Statement::CreateTable(c) => {
1598                assert!(c.partitions.is_some());
1599
1600                let partitions = c.partitions.as_ref().unwrap();
1601                let column_list = partitions
1602                    .column_list
1603                    .iter()
1604                    .map(|x| &x.value)
1605                    .collect::<Vec<&String>>();
1606                assert_eq!(column_list, vec!["idc", "host_id"]);
1607
1608                let exprs = &partitions.exprs;
1609
1610                assert_eq!(
1611                    exprs[0],
1612                    Expr::BinaryOp {
1613                        left: Box::new(Expr::BinaryOp {
1614                            left: Box::new(Expr::Identifier("idc".into())),
1615                            op: BinaryOperator::LtEq,
1616                            right: Box::new(Expr::Value(
1617                                Value::SingleQuotedString("hz".to_string()).into()
1618                            ))
1619                        }),
1620                        op: BinaryOperator::And,
1621                        right: Box::new(Expr::BinaryOp {
1622                            left: Box::new(Expr::Identifier("host_id".into())),
1623                            op: BinaryOperator::Lt,
1624                            right: Box::new(Expr::Value(
1625                                Value::Number("1000".to_string(), false).into()
1626                            ))
1627                        })
1628                    }
1629                );
1630                assert_eq!(
1631                    exprs[1],
1632                    Expr::BinaryOp {
1633                        left: Box::new(Expr::BinaryOp {
1634                            left: Box::new(Expr::BinaryOp {
1635                                left: Box::new(Expr::Identifier("idc".into())),
1636                                op: BinaryOperator::Gt,
1637                                right: Box::new(Expr::Value(
1638                                    Value::SingleQuotedString("hz".to_string()).into()
1639                                ))
1640                            }),
1641                            op: BinaryOperator::And,
1642                            right: Box::new(Expr::BinaryOp {
1643                                left: Box::new(Expr::Identifier("idc".into())),
1644                                op: BinaryOperator::LtEq,
1645                                right: Box::new(Expr::Value(
1646                                    Value::SingleQuotedString("sh".to_string()).into()
1647                                ))
1648                            })
1649                        }),
1650                        op: BinaryOperator::And,
1651                        right: Box::new(Expr::BinaryOp {
1652                            left: Box::new(Expr::Identifier("host_id".into())),
1653                            op: BinaryOperator::Lt,
1654                            right: Box::new(Expr::Value(
1655                                Value::Number("2000".to_string(), false).into()
1656                            ))
1657                        })
1658                    }
1659                );
1660                assert_eq!(
1661                    exprs[2],
1662                    Expr::BinaryOp {
1663                        left: Box::new(Expr::BinaryOp {
1664                            left: Box::new(Expr::Identifier("idc".into())),
1665                            op: BinaryOperator::Gt,
1666                            right: Box::new(Expr::Value(
1667                                Value::SingleQuotedString("sh".to_string()).into()
1668                            ))
1669                        }),
1670                        op: BinaryOperator::And,
1671                        right: Box::new(Expr::BinaryOp {
1672                            left: Box::new(Expr::Identifier("host_id".into())),
1673                            op: BinaryOperator::Lt,
1674                            right: Box::new(Expr::Value(
1675                                Value::Number("3000".to_string(), false).into()
1676                            ))
1677                        })
1678                    }
1679                );
1680                assert_eq!(
1681                    exprs[3],
1682                    Expr::BinaryOp {
1683                        left: Box::new(Expr::BinaryOp {
1684                            left: Box::new(Expr::Identifier("idc".into())),
1685                            op: BinaryOperator::Gt,
1686                            right: Box::new(Expr::Value(
1687                                Value::SingleQuotedString("sh".to_string()).into()
1688                            ))
1689                        }),
1690                        op: BinaryOperator::And,
1691                        right: Box::new(Expr::BinaryOp {
1692                            left: Box::new(Expr::Identifier("host_id".into())),
1693                            op: BinaryOperator::GtEq,
1694                            right: Box::new(Expr::Value(
1695                                Value::Number("3000".to_string(), false).into()
1696                            ))
1697                        })
1698                    }
1699                );
1700            }
1701            _ => unreachable!(),
1702        }
1703    }
1704
1705    #[test]
1706    fn test_parse_create_table_with_quoted_partitions() {
1707        let sql = r"
1708CREATE TABLE monitor (
1709  `host_id`    INT,
1710  idc        STRING,
1711  ts         TIMESTAMP,
1712  cpu        DOUBLE DEFAULT 0,
1713  memory     DOUBLE,
1714  TIME INDEX (ts),
1715  PRIMARY KEY (host),
1716)
1717PARTITION ON COLUMNS(IdC, host_id) (
1718  idc <= 'hz' AND host_id < 1000,
1719  idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1720  idc > 'sh' AND host_id < 3000,
1721  idc > 'sh' AND host_id >= 3000,
1722)";
1723        let result =
1724            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1725                .unwrap();
1726        assert_eq!(result.len(), 1);
1727    }
1728
1729    #[test]
1730    fn test_parse_create_table_with_timestamp_index() {
1731        let sql1 = r"
1732CREATE TABLE monitor (
1733  host_id    INT,
1734  idc        STRING,
1735  ts         TIMESTAMP TIME INDEX,
1736  cpu        DOUBLE DEFAULT 0,
1737  memory     DOUBLE,
1738  PRIMARY KEY (host),
1739)
1740ENGINE=mito";
1741        let result1 = ParserContext::create_with_dialect(
1742            sql1,
1743            &GreptimeDbDialect {},
1744            ParseOptions::default(),
1745        )
1746        .unwrap();
1747
1748        if let Statement::CreateTable(c) = &result1[0] {
1749            assert_eq!(c.constraints.len(), 2);
1750            let tc = c.constraints[0].clone();
1751            match tc {
1752                TableConstraint::TimeIndex { column } => {
1753                    assert_eq!(&column.value, "ts");
1754                }
1755                _ => panic!("should be time index constraint"),
1756            };
1757        } else {
1758            panic!("should be create_table statement");
1759        }
1760
1761        // `TIME INDEX` should be in front of `PRIMARY KEY`
1762        // in order to equal the `TIMESTAMP TIME INDEX` constraint options vector
1763        let sql2 = r"
1764CREATE TABLE monitor (
1765  host_id    INT,
1766  idc        STRING,
1767  ts         TIMESTAMP NOT NULL,
1768  cpu        DOUBLE DEFAULT 0,
1769  memory     DOUBLE,
1770  TIME INDEX (ts),
1771  PRIMARY KEY (host),
1772)
1773ENGINE=mito";
1774        let result2 = ParserContext::create_with_dialect(
1775            sql2,
1776            &GreptimeDbDialect {},
1777            ParseOptions::default(),
1778        )
1779        .unwrap();
1780
1781        assert_eq!(result1, result2);
1782
1783        // TIMESTAMP can be NULL which is not equal to above
1784        let sql3 = r"
1785CREATE TABLE monitor (
1786  host_id    INT,
1787  idc        STRING,
1788  ts         TIMESTAMP,
1789  cpu        DOUBLE DEFAULT 0,
1790  memory     DOUBLE,
1791  TIME INDEX (ts),
1792  PRIMARY KEY (host),
1793)
1794ENGINE=mito";
1795
1796        let result3 = ParserContext::create_with_dialect(
1797            sql3,
1798            &GreptimeDbDialect {},
1799            ParseOptions::default(),
1800        )
1801        .unwrap();
1802
1803        assert_ne!(result1, result3);
1804
1805        // BIGINT can't be time index any more
1806        let sql1 = r"
1807CREATE TABLE monitor (
1808  host_id    INT,
1809  idc        STRING,
1810  b          bigint TIME INDEX,
1811  cpu        DOUBLE DEFAULT 0,
1812  memory     DOUBLE,
1813  PRIMARY KEY (host),
1814)
1815ENGINE=mito";
1816        let result1 = ParserContext::create_with_dialect(
1817            sql1,
1818            &GreptimeDbDialect {},
1819            ParseOptions::default(),
1820        );
1821
1822        assert!(result1
1823            .unwrap_err()
1824            .to_string()
1825            .contains("time index column data type should be timestamp"));
1826    }
1827
1828    #[test]
1829    fn test_parse_create_table_with_timestamp_index_not_null() {
1830        let sql = r"
1831CREATE TABLE monitor (
1832  host_id    INT,
1833  idc        STRING,
1834  ts         TIMESTAMP TIME INDEX,
1835  cpu        DOUBLE DEFAULT 0,
1836  memory     DOUBLE,
1837  TIME INDEX (ts),
1838  PRIMARY KEY (host),
1839)
1840ENGINE=mito";
1841        let result =
1842            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1843                .unwrap();
1844
1845        assert_eq!(result.len(), 1);
1846        if let Statement::CreateTable(c) = &result[0] {
1847            let ts = c.columns[2].clone();
1848            assert_eq!(ts.name().to_string(), "ts");
1849            assert_eq!(ts.options()[0].option, NotNull);
1850        } else {
1851            panic!("should be create table statement");
1852        }
1853
1854        let sql1 = r"
1855CREATE TABLE monitor (
1856  host_id    INT,
1857  idc        STRING,
1858  ts         TIMESTAMP NOT NULL TIME INDEX,
1859  cpu        DOUBLE DEFAULT 0,
1860  memory     DOUBLE,
1861  TIME INDEX (ts),
1862  PRIMARY KEY (host),
1863)
1864ENGINE=mito";
1865
1866        let result1 = ParserContext::create_with_dialect(
1867            sql1,
1868            &GreptimeDbDialect {},
1869            ParseOptions::default(),
1870        )
1871        .unwrap();
1872        assert_eq!(result, result1);
1873
1874        let sql2 = r"
1875CREATE TABLE monitor (
1876  host_id    INT,
1877  idc        STRING,
1878  ts         TIMESTAMP TIME INDEX NOT NULL,
1879  cpu        DOUBLE DEFAULT 0,
1880  memory     DOUBLE,
1881  TIME INDEX (ts),
1882  PRIMARY KEY (host),
1883)
1884ENGINE=mito";
1885
1886        let result2 = ParserContext::create_with_dialect(
1887            sql2,
1888            &GreptimeDbDialect {},
1889            ParseOptions::default(),
1890        )
1891        .unwrap();
1892        assert_eq!(result, result2);
1893
1894        let sql3 = r"
1895CREATE TABLE monitor (
1896  host_id    INT,
1897  idc        STRING,
1898  ts         TIMESTAMP TIME INDEX NULL NOT,
1899  cpu        DOUBLE DEFAULT 0,
1900  memory     DOUBLE,
1901  TIME INDEX (ts),
1902  PRIMARY KEY (host),
1903)
1904ENGINE=mito";
1905
1906        let result3 = ParserContext::create_with_dialect(
1907            sql3,
1908            &GreptimeDbDialect {},
1909            ParseOptions::default(),
1910        );
1911        assert!(result3.is_err());
1912
1913        let sql4 = r"
1914CREATE TABLE monitor (
1915  host_id    INT,
1916  idc        STRING,
1917  ts         TIMESTAMP TIME INDEX NOT NULL NULL,
1918  cpu        DOUBLE DEFAULT 0,
1919  memory     DOUBLE,
1920  TIME INDEX (ts),
1921  PRIMARY KEY (host),
1922)
1923ENGINE=mito";
1924
1925        let result4 = ParserContext::create_with_dialect(
1926            sql4,
1927            &GreptimeDbDialect {},
1928            ParseOptions::default(),
1929        );
1930        assert!(result4.is_err());
1931
1932        let sql = r"
1933CREATE TABLE monitor (
1934  host_id    INT,
1935  idc        STRING,
1936  ts         TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
1937  cpu        DOUBLE DEFAULT 0,
1938  memory     DOUBLE,
1939  TIME INDEX (ts),
1940  PRIMARY KEY (host),
1941)
1942ENGINE=mito";
1943
1944        let result =
1945            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1946                .unwrap();
1947
1948        if let Statement::CreateTable(c) = &result[0] {
1949            let tc = c.constraints[0].clone();
1950            match tc {
1951                TableConstraint::TimeIndex { column } => {
1952                    assert_eq!(&column.value, "ts");
1953                }
1954                _ => panic!("should be time index constraint"),
1955            }
1956            let ts = c.columns[2].clone();
1957            assert_eq!(ts.name().to_string(), "ts");
1958            assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
1959            assert_eq!(ts.options()[1].option, NotNull);
1960        } else {
1961            unreachable!("should be create table statement");
1962        }
1963    }
1964
1965    #[test]
1966    fn test_parse_partitions_with_error_syntax() {
1967        let sql = r"
1968CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1969PARTITION COLUMNS(c, a) (
1970    a < 10,
1971    a > 10 AND a < 20,
1972    a > 20 AND c < 100,
1973    a > 20 AND c >= 100
1974)
1975ENGINE=mito";
1976        let result =
1977            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1978        assert!(result
1979            .unwrap_err()
1980            .output_msg()
1981            .contains("sql parser error: Expected: ON, found: COLUMNS"));
1982    }
1983
1984    #[test]
1985    fn test_parse_partitions_without_rule() {
1986        let sql = r"
1987CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
1988PARTITION ON COLUMNS(c, a) ()
1989ENGINE=mito";
1990        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1991            .unwrap();
1992    }
1993
1994    #[test]
1995    fn test_parse_partitions_unreferenced_column() {
1996        let sql = r"
1997CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1998PARTITION ON COLUMNS(c, a) (
1999    b = 'foo'
2000)
2001ENGINE=mito";
2002        let result =
2003            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2004        assert_eq!(
2005            result.unwrap_err().output_msg(),
2006            "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2007        );
2008    }
2009
2010    #[test]
2011    fn test_parse_partitions_not_binary_expr() {
2012        let sql = r"
2013CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2014PARTITION ON COLUMNS(c, a) (
2015    b
2016)
2017ENGINE=mito";
2018        let result =
2019            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2020        assert_eq!(
2021            result.unwrap_err().output_msg(),
2022            r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2023        );
2024    }
2025
2026    fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2027        assert_eq!(column.name.to_string(), name);
2028        assert_eq!(column.data_type.to_string(), data_type);
2029    }
2030
2031    #[test]
2032    pub fn test_parse_create_table() {
2033        let sql = r"create table demo(
2034                             host string,
2035                             ts timestamp,
2036                             cpu float32 default 0,
2037                             memory float64,
2038                             TIME INDEX (ts),
2039                             PRIMARY KEY(ts, host),
2040                             ) engine=mito
2041                             with(ttl='10s');
2042         ";
2043        let result =
2044            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2045                .unwrap();
2046        assert_eq!(1, result.len());
2047        match &result[0] {
2048            Statement::CreateTable(c) => {
2049                assert!(!c.if_not_exists);
2050                assert_eq!("demo", c.name.to_string());
2051                assert_eq!("mito", c.engine);
2052                assert_eq!(4, c.columns.len());
2053                let columns = &c.columns;
2054                assert_column_def(&columns[0].column_def, "host", "STRING");
2055                assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2056                assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2057                assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2058
2059                let constraints = &c.constraints;
2060                assert_eq!(
2061                    &constraints[0],
2062                    &TableConstraint::TimeIndex {
2063                        column: Ident::new("ts"),
2064                    }
2065                );
2066                assert_eq!(
2067                    &constraints[1],
2068                    &TableConstraint::PrimaryKey {
2069                        columns: vec![Ident::new("ts"), Ident::new("host")]
2070                    }
2071                );
2072                // inverted index is merged into column options
2073                assert_eq!(1, c.options.len());
2074                assert_eq!(
2075                    [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2076                    c.options.to_str_map()
2077                );
2078            }
2079            _ => unreachable!(),
2080        }
2081    }
2082
2083    #[test]
2084    fn test_invalid_index_keys() {
2085        let sql = r"create table demo(
2086                             host string,
2087                             ts int64,
2088                             cpu float64 default 0,
2089                             memory float64,
2090                             TIME INDEX (ts, host),
2091                             PRIMARY KEY(ts, host)) engine=mito;
2092         ";
2093        let result =
2094            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2095        assert!(result.is_err());
2096        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2097    }
2098
2099    #[test]
2100    fn test_duplicated_time_index() {
2101        let sql = r"create table demo(
2102                             host string,
2103                             ts timestamp time index,
2104                             t timestamp time index,
2105                             cpu float64 default 0,
2106                             memory float64,
2107                             TIME INDEX (ts, host),
2108                             PRIMARY KEY(ts, host)) engine=mito;
2109         ";
2110        let result =
2111            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2112        assert!(result.is_err());
2113        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2114
2115        let sql = r"create table demo(
2116                             host string,
2117                             ts timestamp time index,
2118                             cpu float64 default 0,
2119                             t timestamp,
2120                             memory float64,
2121                             TIME INDEX (t),
2122                             PRIMARY KEY(ts, host)) engine=mito;
2123         ";
2124        let result =
2125            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2126        assert!(result.is_err());
2127        assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2128    }
2129
2130    #[test]
2131    fn test_invalid_column_name() {
2132        let sql = "create table foo(user string, i timestamp time index)";
2133        let result =
2134            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2135        let err = result.unwrap_err().output_msg();
2136        assert!(err.contains("Cannot use keyword 'user' as column name"));
2137
2138        // If column name is quoted, it's valid even same with keyword.
2139        let sql = r#"
2140            create table foo("user" string, i timestamp time index)
2141        "#;
2142        let result =
2143            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2144        let _ = result.unwrap();
2145    }
2146
2147    #[test]
2148    fn test_incorrect_default_value_issue_3479() {
2149        let sql = r#"CREATE TABLE `ExcePTuRi`(
2150non TIMESTAMP(6) TIME INDEX,
2151`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2152)"#;
2153        let result =
2154            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2155                .unwrap();
2156        assert_eq!(1, result.len());
2157        match &result[0] {
2158            Statement::CreateTable(c) => {
2159                assert_eq!(
2160                    "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2161                    c.columns[1].to_string()
2162                );
2163            }
2164            _ => unreachable!(),
2165        }
2166    }
2167
2168    #[test]
2169    fn test_parse_create_view() {
2170        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
2171
2172        let result =
2173            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2174                .unwrap();
2175        match &result[0] {
2176            Statement::CreateView(c) => {
2177                assert_eq!(c.to_string(), sql);
2178                assert!(!c.or_replace);
2179                assert!(!c.if_not_exists);
2180                assert_eq!("test", c.name.to_string());
2181            }
2182            _ => unreachable!(),
2183        }
2184
2185        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
2186
2187        let result =
2188            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2189                .unwrap();
2190        match &result[0] {
2191            Statement::CreateView(c) => {
2192                assert_eq!(c.to_string(), sql);
2193                assert!(c.or_replace);
2194                assert!(c.if_not_exists);
2195                assert_eq!("test", c.name.to_string());
2196            }
2197            _ => unreachable!(),
2198        }
2199    }
2200
2201    #[test]
2202    fn test_parse_create_view_invalid_query() {
2203        let sql = "CREATE VIEW test AS DELETE from demo";
2204        let result =
2205            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2206        assert!(result.is_err());
2207        assert_matches!(result, Err(crate::error::Error::Syntax { .. }));
2208    }
2209
2210    #[test]
2211    fn test_parse_create_table_fulltext_options() {
2212        let sql1 = r"
2213CREATE TABLE log (
2214    ts TIMESTAMP TIME INDEX,
2215    msg TEXT FULLTEXT INDEX,
2216)";
2217        let result1 = ParserContext::create_with_dialect(
2218            sql1,
2219            &GreptimeDbDialect {},
2220            ParseOptions::default(),
2221        )
2222        .unwrap();
2223
2224        if let Statement::CreateTable(c) = &result1[0] {
2225            c.columns.iter().for_each(|col| {
2226                if col.name().value == "msg" {
2227                    assert!(col
2228                        .extensions
2229                        .fulltext_index_options
2230                        .as_ref()
2231                        .unwrap()
2232                        .is_empty());
2233                }
2234            });
2235        } else {
2236            panic!("should be create_table statement");
2237        }
2238
2239        let sql2 = r"
2240CREATE TABLE log (
2241    ts TIMESTAMP TIME INDEX,
2242    msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2243)";
2244        let result2 = ParserContext::create_with_dialect(
2245            sql2,
2246            &GreptimeDbDialect {},
2247            ParseOptions::default(),
2248        )
2249        .unwrap();
2250
2251        if let Statement::CreateTable(c) = &result2[0] {
2252            c.columns.iter().for_each(|col| {
2253                if col.name().value == "msg" {
2254                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2255                    assert_eq!(options.len(), 2);
2256                    assert_eq!(options.get("analyzer").unwrap(), "English");
2257                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2258                }
2259            });
2260        } else {
2261            panic!("should be create_table statement");
2262        }
2263
2264        let sql3 = r"
2265CREATE TABLE log (
2266    ts TIMESTAMP TIME INDEX,
2267    msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2268    msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2269)";
2270        let result3 = ParserContext::create_with_dialect(
2271            sql3,
2272            &GreptimeDbDialect {},
2273            ParseOptions::default(),
2274        )
2275        .unwrap();
2276
2277        if let Statement::CreateTable(c) = &result3[0] {
2278            c.columns.iter().for_each(|col| {
2279                if col.name().value == "msg1" {
2280                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2281                    assert_eq!(options.len(), 2);
2282                    assert_eq!(options.get("analyzer").unwrap(), "English");
2283                    assert_eq!(options.get("case_sensitive").unwrap(), "false");
2284                } else if col.name().value == "msg2" {
2285                    let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2286                    assert_eq!(options.len(), 2);
2287                    assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2288                    assert_eq!(options.get("case_sensitive").unwrap(), "true");
2289                }
2290            });
2291        } else {
2292            panic!("should be create_table statement");
2293        }
2294    }
2295
2296    #[test]
2297    fn test_parse_create_table_fulltext_options_invalid_type() {
2298        let sql = r"
2299CREATE TABLE log (
2300    ts TIMESTAMP TIME INDEX,
2301    msg INT FULLTEXT INDEX,
2302)";
2303        let result =
2304            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2305        assert!(result.is_err());
2306        assert!(result
2307            .unwrap_err()
2308            .to_string()
2309            .contains("FULLTEXT index only supports string type"));
2310    }
2311
2312    #[test]
2313    fn test_parse_create_table_fulltext_options_duplicate() {
2314        let sql = r"
2315CREATE TABLE log (
2316    ts TIMESTAMP TIME INDEX,
2317    msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
2318)";
2319        let result =
2320            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2321        assert!(result.is_err());
2322        assert!(result
2323            .unwrap_err()
2324            .to_string()
2325            .contains("duplicated FULLTEXT INDEX option"));
2326    }
2327
2328    #[test]
2329    fn test_parse_create_table_fulltext_options_invalid_option() {
2330        let sql = r"
2331CREATE TABLE log (
2332    ts TIMESTAMP TIME INDEX,
2333    msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
2334)";
2335        let result =
2336            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2337        assert!(result.is_err());
2338        assert!(result
2339            .unwrap_err()
2340            .to_string()
2341            .contains("invalid FULLTEXT INDEX option"));
2342    }
2343
2344    #[test]
2345    fn test_parse_create_table_skip_options() {
2346        let sql = r"
2347CREATE TABLE log (
2348    ts TIMESTAMP TIME INDEX,
2349    msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
2350)";
2351        let result =
2352            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2353                .unwrap();
2354
2355        if let Statement::CreateTable(c) = &result[0] {
2356            c.columns.iter().for_each(|col| {
2357                if col.name().value == "msg" {
2358                    assert!(!col
2359                        .extensions
2360                        .skipping_index_options
2361                        .as_ref()
2362                        .unwrap()
2363                        .is_empty());
2364                }
2365            });
2366        } else {
2367            panic!("should be create_table statement");
2368        }
2369
2370        let sql = r"
2371        CREATE TABLE log (
2372            ts TIMESTAMP TIME INDEX,
2373            msg INT SKIPPING INDEX,
2374        )";
2375        let result =
2376            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2377                .unwrap();
2378
2379        if let Statement::CreateTable(c) = &result[0] {
2380            c.columns.iter().for_each(|col| {
2381                if col.name().value == "msg" {
2382                    assert!(col
2383                        .extensions
2384                        .skipping_index_options
2385                        .as_ref()
2386                        .unwrap()
2387                        .is_empty());
2388                }
2389            });
2390        } else {
2391            panic!("should be create_table statement");
2392        }
2393    }
2394
2395    #[test]
2396    fn test_parse_create_view_with_columns() {
2397        let sql = "CREATE VIEW test () AS SELECT * FROM NUMBERS";
2398        let result =
2399            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2400                .unwrap();
2401
2402        match &result[0] {
2403            Statement::CreateView(c) => {
2404                assert_eq!(c.to_string(), "CREATE VIEW test AS SELECT * FROM NUMBERS");
2405                assert!(!c.or_replace);
2406                assert!(!c.if_not_exists);
2407                assert_eq!("test", c.name.to_string());
2408            }
2409            _ => unreachable!(),
2410        }
2411        assert_eq!(
2412            "CREATE VIEW test AS SELECT * FROM NUMBERS",
2413            result[0].to_string()
2414        );
2415
2416        let sql = "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS";
2417        let result =
2418            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2419                .unwrap();
2420
2421        match &result[0] {
2422            Statement::CreateView(c) => {
2423                assert_eq!(c.to_string(), sql);
2424                assert!(!c.or_replace);
2425                assert!(!c.if_not_exists);
2426                assert_eq!("test", c.name.to_string());
2427            }
2428            _ => unreachable!(),
2429        }
2430        assert_eq!(sql, result[0].to_string());
2431
2432        let sql = "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS";
2433        let result =
2434            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2435                .unwrap();
2436
2437        match &result[0] {
2438            Statement::CreateView(c) => {
2439                assert_eq!(c.to_string(), sql);
2440                assert!(!c.or_replace);
2441                assert!(!c.if_not_exists);
2442                assert_eq!("test", c.name.to_string());
2443            }
2444            _ => unreachable!(),
2445        }
2446        assert_eq!(sql, result[0].to_string());
2447
2448        // Some invalid syntax cases
2449        let sql = "CREATE VIEW test (n1 AS select * from demo";
2450        let result =
2451            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2452        assert!(result.is_err());
2453
2454        let sql = "CREATE VIEW test (n1, AS select * from demo";
2455        let result =
2456            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2457        assert!(result.is_err());
2458
2459        let sql = "CREATE VIEW test n1,n2) AS select * from demo";
2460        let result =
2461            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2462        assert!(result.is_err());
2463
2464        let sql = "CREATE VIEW test (1) AS select * from demo";
2465        let result =
2466            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2467        assert!(result.is_err());
2468
2469        // keyword
2470        let sql = "CREATE VIEW test (n1, select) AS select * from demo";
2471        let result =
2472            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2473        assert!(result.is_err());
2474    }
2475
2476    #[test]
2477    fn test_parse_column_extensions_vector() {
2478        let sql = "VECTOR(128)";
2479        let dialect = GenericDialect {};
2480        let mut tokenizer = Tokenizer::new(&dialect, sql);
2481        let tokens = tokenizer.tokenize().unwrap();
2482        let mut parser = Parser::new(&dialect).with_tokens(tokens);
2483        let name = Ident::new("vec_col");
2484        let data_type =
2485            DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
2486        let mut extensions = ColumnExtensions::default();
2487
2488        let result =
2489            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2490        assert!(result.is_ok());
2491        assert!(extensions.vector_options.is_some());
2492        let vector_options = extensions.vector_options.unwrap();
2493        assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some(&"128".to_string()));
2494    }
2495
2496    #[test]
2497    fn test_parse_column_extensions_vector_invalid() {
2498        let sql = "VECTOR()";
2499        let dialect = GenericDialect {};
2500        let mut tokenizer = Tokenizer::new(&dialect, sql);
2501        let tokens = tokenizer.tokenize().unwrap();
2502        let mut parser = Parser::new(&dialect).with_tokens(tokens);
2503        let name = Ident::new("vec_col");
2504        let data_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
2505        let mut extensions = ColumnExtensions::default();
2506
2507        let result =
2508            ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2509        assert!(result.is_err());
2510    }
2511
2512    #[test]
2513    fn test_parse_column_extensions_indices() {
2514        // Test skipping index
2515        {
2516            let sql = "SKIPPING INDEX";
2517            let dialect = GenericDialect {};
2518            let mut tokenizer = Tokenizer::new(&dialect, sql);
2519            let tokens = tokenizer.tokenize().unwrap();
2520            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2521            let name = Ident::new("col");
2522            let data_type = DataType::String(None);
2523            let mut extensions = ColumnExtensions::default();
2524            let result = ParserContext::parse_column_extensions(
2525                &mut parser,
2526                &name,
2527                &data_type,
2528                &mut extensions,
2529            );
2530            assert!(result.is_ok());
2531            assert!(extensions.skipping_index_options.is_some());
2532        }
2533
2534        // Test fulltext index with options
2535        {
2536            let sql = "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')";
2537            let dialect = GenericDialect {};
2538            let mut tokenizer = Tokenizer::new(&dialect, sql);
2539            let tokens = tokenizer.tokenize().unwrap();
2540            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2541            let name = Ident::new("text_col");
2542            let data_type = DataType::String(None);
2543            let mut extensions = ColumnExtensions::default();
2544            let result = ParserContext::parse_column_extensions(
2545                &mut parser,
2546                &name,
2547                &data_type,
2548                &mut extensions,
2549            );
2550            assert!(result.unwrap());
2551            assert!(extensions.fulltext_index_options.is_some());
2552            let fulltext_options = extensions.fulltext_index_options.unwrap();
2553            assert_eq!(
2554                fulltext_options.get("analyzer"),
2555                Some(&"English".to_string())
2556            );
2557            assert_eq!(
2558                fulltext_options.get("case_sensitive"),
2559                Some(&"true".to_string())
2560            );
2561        }
2562
2563        // Test fulltext index with invalid type (should fail)
2564        {
2565            let sql = "FULLTEXT INDEX WITH (analyzer = 'English')";
2566            let dialect = GenericDialect {};
2567            let mut tokenizer = Tokenizer::new(&dialect, sql);
2568            let tokens = tokenizer.tokenize().unwrap();
2569            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2570            let name = Ident::new("num_col");
2571            let data_type = DataType::Int(None); // Non-string type
2572            let mut extensions = ColumnExtensions::default();
2573            let result = ParserContext::parse_column_extensions(
2574                &mut parser,
2575                &name,
2576                &data_type,
2577                &mut extensions,
2578            );
2579            assert!(result.is_err());
2580            assert!(result
2581                .unwrap_err()
2582                .to_string()
2583                .contains("FULLTEXT index only supports string type"));
2584        }
2585
2586        // Test fulltext index with invalid option (won't fail, the parser doesn't check the option's content)
2587        {
2588            let sql = "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')";
2589            let dialect = GenericDialect {};
2590            let mut tokenizer = Tokenizer::new(&dialect, sql);
2591            let tokens = tokenizer.tokenize().unwrap();
2592            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2593            let name = Ident::new("text_col");
2594            let data_type = DataType::String(None);
2595            let mut extensions = ColumnExtensions::default();
2596            let result = ParserContext::parse_column_extensions(
2597                &mut parser,
2598                &name,
2599                &data_type,
2600                &mut extensions,
2601            );
2602            assert!(result.unwrap());
2603        }
2604
2605        // Test inverted index
2606        {
2607            let sql = "INVERTED INDEX";
2608            let dialect = GenericDialect {};
2609            let mut tokenizer = Tokenizer::new(&dialect, sql);
2610            let tokens = tokenizer.tokenize().unwrap();
2611            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2612            let name = Ident::new("col");
2613            let data_type = DataType::String(None);
2614            let mut extensions = ColumnExtensions::default();
2615            let result = ParserContext::parse_column_extensions(
2616                &mut parser,
2617                &name,
2618                &data_type,
2619                &mut extensions,
2620            );
2621            assert!(result.is_ok());
2622            assert!(extensions.inverted_index_options.is_some());
2623        }
2624
2625        // Test inverted index with options (should fail)
2626        {
2627            let sql = "INVERTED INDEX WITH (analyzer = 'English')";
2628            let dialect = GenericDialect {};
2629            let mut tokenizer = Tokenizer::new(&dialect, sql);
2630            let tokens = tokenizer.tokenize().unwrap();
2631            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2632            let name = Ident::new("col");
2633            let data_type = DataType::String(None);
2634            let mut extensions = ColumnExtensions::default();
2635            let result = ParserContext::parse_column_extensions(
2636                &mut parser,
2637                &name,
2638                &data_type,
2639                &mut extensions,
2640            );
2641            assert!(result.is_err());
2642            assert!(result
2643                .unwrap_err()
2644                .to_string()
2645                .contains("INVERTED index doesn't support options"));
2646        }
2647
2648        // Test multiple indices
2649        {
2650            let sql = "SKIPPING INDEX FULLTEXT INDEX";
2651            let dialect = GenericDialect {};
2652            let mut tokenizer = Tokenizer::new(&dialect, sql);
2653            let tokens = tokenizer.tokenize().unwrap();
2654            let mut parser = Parser::new(&dialect).with_tokens(tokens);
2655            let name = Ident::new("col");
2656            let data_type = DataType::String(None);
2657            let mut extensions = ColumnExtensions::default();
2658            let result = ParserContext::parse_column_extensions(
2659                &mut parser,
2660                &name,
2661                &data_type,
2662                &mut extensions,
2663            );
2664            assert!(result.unwrap());
2665            assert!(extensions.skipping_index_options.is_some());
2666            assert!(extensions.fulltext_index_options.is_some());
2667        }
2668    }
2669
2670    #[test]
2671    fn test_parse_interval_cast() {
2672        let s = "select '10s'::INTERVAL";
2673        let stmts =
2674            ParserContext::create_with_dialect(s, &GreptimeDbDialect {}, ParseOptions::default())
2675                .unwrap();
2676        assert_eq!("SELECT '10 seconds'::INTERVAL", &stmts[0].to_string());
2677    }
2678}