1mod json;
16#[cfg(feature = "enterprise")]
17pub mod trigger;
18
19use std::collections::HashMap;
20
21use arrow_buffer::IntervalMonthDayNano;
22use common_catalog::consts::default_engine;
23use datafusion_common::ScalarValue;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
25use datatypes::data_type::ConcreteDataType;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28use sqlparser::ast::{
29 ColumnOption, ColumnOptionDef, DataType, Expr, KeyOrIndexDisplay, NullsDistinctOption,
30 PrimaryKeyConstraint, UniqueConstraint,
31};
32use sqlparser::dialect::keywords::Keyword;
33use sqlparser::keywords::ALL_KEYWORDS;
34use sqlparser::parser::IsOptional::Mandatory;
35use sqlparser::parser::{Parser, ParserError};
36use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
37use table::requests::validate_database_option;
38
39use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
40use crate::error::{
41 self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidFlowQuerySnafu,
42 InvalidIntervalSnafu, InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
43 SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
44};
45use crate::parser::{FLOW, ParserContext};
46use crate::parsers::tql_parser;
47use crate::parsers::utils::{
48 self, parse_with_options, validate_column_fulltext_create_option,
49 validate_column_skipping_index_create_option, validate_column_vector_index_create_option,
50};
51use crate::statements::create::{
52 Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
53 CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
54};
55use crate::statements::statement::Statement;
56use crate::statements::transform::type_alias::get_data_type_by_alias_name;
57use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
58use crate::util::{OptionValue, location_to_index, parse_option_string};
59
/// Keyword that introduces the `ENGINE = <name>` clause of a `CREATE TABLE` statement.
pub const ENGINE: &str = "ENGINE";
/// The `MAXVALUE` keyword literal.
pub const MAXVALUE: &str = "MAXVALUE";
/// Keyword that precedes `TO <table>` in a `CREATE FLOW` statement.
pub const SINK: &str = "SINK";
/// First word of the `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const EXPIRE: &str = "EXPIRE";
/// Second word of the `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const AFTER: &str = "AFTER";
/// Keyword that introduces an `INVERTED INDEX` column extension.
pub const INVERTED: &str = "INVERTED";
/// Keyword that introduces a `SKIPPING INDEX` column extension.
pub const SKIPPING: &str = "SKIPPING";
/// Keyword that introduces a `VECTOR INDEX` column extension.
pub const VECTOR: &str = "VECTOR";

/// Textual rendering of an interval expression exactly as the parser printed it back.
pub type RawIntervalExpr = String;
70
71fn flow_option_map(options: HashMap<String, OptionValue>) -> OptionMap {
75 let mut flow_options = OptionMap::default();
76 for (key, value) in options {
77 flow_options.insert_options(&key, value);
78 }
79 flow_options
80}
81
82impl<'a> ParserContext<'a> {
84 pub(crate) fn parse_create(&mut self) -> Result<Statement> {
85 match self.parser.peek_token().token {
86 Token::Word(w) => match w.keyword {
87 Keyword::TABLE => self.parse_create_table(),
88
89 Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
90
91 Keyword::EXTERNAL => self.parse_create_external_table(),
92
93 Keyword::OR => {
94 let _ = self.parser.next_token();
95 self.parser
96 .expect_keyword(Keyword::REPLACE)
97 .context(SyntaxSnafu)?;
98 match self.parser.next_token().token {
99 Token::Word(w) => match w.keyword {
100 Keyword::VIEW => self.parse_create_view(true),
101 Keyword::NoKeyword => {
102 let uppercase = w.value.to_uppercase();
103 match uppercase.as_str() {
104 FLOW => self.parse_create_flow(true),
105 _ => self.unsupported(w.to_string()),
106 }
107 }
108 _ => self.unsupported(w.to_string()),
109 },
110 _ => self.unsupported(w.to_string()),
111 }
112 }
113
114 Keyword::VIEW => {
115 let _ = self.parser.next_token();
116 self.parse_create_view(false)
117 }
118
119 #[cfg(feature = "enterprise")]
120 Keyword::TRIGGER => {
121 let _ = self.parser.next_token();
122 self.parse_create_trigger()
123 }
124
125 Keyword::NoKeyword => {
126 let _ = self.parser.next_token();
127 let uppercase = w.value.to_uppercase();
128 match uppercase.as_str() {
129 FLOW => self.parse_create_flow(false),
130 _ => self.unsupported(w.to_string()),
131 }
132 }
133 _ => self.unsupported(w.to_string()),
134 },
135 unexpected => self.unsupported(unexpected.to_string()),
136 }
137 }
138
139 fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
141 let if_not_exists = self.parse_if_not_exist()?;
142 let view_name = self.intern_parse_table_name()?;
143
144 let columns = self.parse_view_columns()?;
145
146 self.parser
147 .expect_keyword(Keyword::AS)
148 .context(SyntaxSnafu)?;
149
150 let query = self.parse_query()?;
151
152 Ok(Statement::CreateView(CreateView {
153 name: view_name,
154 columns,
155 or_replace,
156 query: Box::new(query),
157 if_not_exists,
158 }))
159 }
160
161 fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
162 let mut columns = vec![];
163 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
164 return Ok(columns);
165 }
166
167 loop {
168 let name = self.parse_column_name().context(SyntaxSnafu)?;
169
170 columns.push(name);
171
172 let comma = self.parser.consume_token(&Token::Comma);
173 if self.parser.consume_token(&Token::RParen) {
174 break;
176 } else if !comma {
177 return self.expected("',' or ')' after column name", self.parser.peek_token());
178 }
179 }
180
181 Ok(columns)
182 }
183
184 fn parse_create_external_table(&mut self) -> Result<Statement> {
185 let _ = self.parser.next_token();
186 self.parser
187 .expect_keyword(Keyword::TABLE)
188 .context(SyntaxSnafu)?;
189 let if_not_exists = self.parse_if_not_exist()?;
190 let table_name = self.intern_parse_table_name()?;
191 let (columns, constraints) = self.parse_columns()?;
192 if !columns.is_empty() {
193 validate_time_index(&columns, &constraints)?;
194 }
195
196 let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
197 let options = self.parse_create_table_options()?;
198 Ok(Statement::CreateExternalTable(CreateExternalTable {
199 name: table_name,
200 columns,
201 constraints,
202 options,
203 if_not_exists,
204 engine,
205 }))
206 }
207
208 fn parse_create_database(&mut self) -> Result<Statement> {
209 let _ = self.parser.next_token();
210 let if_not_exists = self.parse_if_not_exist()?;
211 let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
212 expected: "a database name",
213 actual: self.peek_token_as_string(),
214 })?;
215 let database_name = Self::canonicalize_object_name(database_name)?;
216
217 let options = self
218 .parser
219 .parse_options(Keyword::WITH)
220 .context(SyntaxSnafu)?
221 .into_iter()
222 .map(parse_option_string)
223 .collect::<Result<HashMap<String, OptionValue>>>()?;
224
225 for key in options.keys() {
226 ensure!(
227 validate_database_option(key),
228 InvalidDatabaseOptionSnafu { key: key.clone() }
229 );
230 }
231 if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string())
232 && append_mode == "true"
233 && options.contains_key("merge_mode")
234 {
235 return InvalidDatabaseOptionSnafu {
236 key: "merge_mode".to_string(),
237 }
238 .fail();
239 }
240
241 Ok(Statement::CreateDatabase(CreateDatabase {
242 name: database_name,
243 if_not_exists,
244 options: OptionMap::new(options),
245 }))
246 }
247
248 fn parse_create_table(&mut self) -> Result<Statement> {
249 let _ = self.parser.next_token();
250
251 let if_not_exists = self.parse_if_not_exist()?;
252
253 let table_name = self.intern_parse_table_name()?;
254
255 if self.parser.parse_keyword(Keyword::LIKE) {
256 let source_name = self.intern_parse_table_name()?;
257
258 return Ok(Statement::CreateTableLike(CreateTableLike {
259 table_name,
260 source_name,
261 }));
262 }
263
264 let (columns, constraints) = self.parse_columns()?;
265 validate_time_index(&columns, &constraints)?;
266
267 let partitions = self.parse_partitions()?;
268 if let Some(partitions) = &partitions {
269 validate_partitions(&columns, partitions)?;
270 }
271
272 let engine = self.parse_table_engine(default_engine())?;
273 let options = self.parse_create_table_options()?;
274 let create_table = CreateTable {
275 if_not_exists,
276 name: table_name,
277 columns,
278 engine,
279 constraints,
280 options,
281 table_id: 0, partitions,
283 };
284
285 Ok(Statement::CreateTable(create_table))
286 }
287
288 fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
290 let if_not_exists = self.parse_if_not_exist()?;
291
292 let flow_name = self.intern_parse_table_name()?;
293
294 if let Token::Word(word) = self.parser.peek_token().token
296 && word.value.eq_ignore_ascii_case(SINK)
297 {
298 self.parser.next_token();
299 } else {
300 Err(ParserError::ParserError(
301 "Expect `SINK` keyword".to_string(),
302 ))
303 .context(SyntaxSnafu)?
304 }
305 self.parser
306 .expect_keyword(Keyword::TO)
307 .context(SyntaxSnafu)?;
308
309 let output_table_name = self.intern_parse_table_name()?;
310
311 let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
312 && w1.value.eq_ignore_ascii_case(EXPIRE)
313 {
314 self.parser.next_token();
315 if let Token::Word(w2) = &self.parser.peek_token().token
316 && w2.value.eq_ignore_ascii_case(AFTER)
317 {
318 self.parser.next_token();
319 Some(self.parse_interval_no_month("EXPIRE AFTER")?)
320 } else {
321 None
322 }
323 } else {
324 None
325 };
326
327 let eval_interval = if self
328 .parser
329 .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
330 {
331 Some(self.parse_interval_no_month("EVAL INTERVAL")?)
332 } else {
333 None
334 };
335
336 let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
337 match self.parser.next_token() {
338 TokenWithSpan {
339 token: Token::SingleQuotedString(value, ..),
340 ..
341 } => Some(value),
342 unexpected => {
343 return self
344 .parser
345 .expected("string", unexpected)
346 .context(SyntaxSnafu);
347 }
348 }
349 } else {
350 None
351 };
352
353 let flow_options = self
354 .parser
355 .parse_options(Keyword::WITH)
356 .context(SyntaxSnafu)?
357 .into_iter()
358 .map(parse_option_string)
359 .collect::<Result<HashMap<String, OptionValue>>>()?;
360
361 self.parser
362 .expect_keyword(Keyword::AS)
363 .context(SyntaxSnafu)?;
364
365 let query = Box::new(self.parse_flow_sql_or_tql(true)?);
366
367 Ok(Statement::CreateFlow(CreateFlow {
368 flow_name,
369 sink_table_name: output_table_name,
370 or_replace,
371 if_not_exists,
372 expire_after,
373 eval_interval,
374 comment,
375 flow_options: flow_option_map(flow_options),
376 query,
377 }))
378 }
379
380 fn parse_flow_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
381 let start_loc = self.parser.peek_token().span.start;
382 let start_index = location_to_index(self.sql, &start_loc);
383
384 let starts_with_with = matches!(
385 self.parser.peek_token().token,
386 Token::Word(w) if w.keyword == Keyword::WITH
387 );
388
389 let query = match self.parser.peek_token().token {
391 Token::Word(w) => match w.keyword {
392 Keyword::SELECT => self.parse_query(),
393 Keyword::WITH => self.parse_with_tql_with_now(require_now_expr),
394 Keyword::NoKeyword
395 if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
396 {
397 self.parse_tql(require_now_expr)
398 }
399
400 _ => self.unsupported(self.peek_token_as_string()),
401 },
402 _ => self.unsupported(self.peek_token_as_string()),
403 }?;
404
405 if starts_with_with {
406 let Statement::Query(query) = &query else {
407 return InvalidFlowQuerySnafu {
408 reason: "Expect a query after WITH".to_string(),
409 }
410 .fail();
411 };
412
413 if utils::has_tql_cte(query) && !utils::is_simple_tql_cte_query(query) {
414 return InvalidFlowQuerySnafu {
415 reason: "WITH is only supported for the simplest TQL CTE in CREATE FLOW"
416 .to_string(),
417 }
418 .fail();
419 }
420 }
421
422 let end_token = self.parser.peek_token();
423
424 let raw_query = if end_token == Token::EOF {
425 &self.sql[start_index..]
426 } else {
427 let end_loc = end_token.span.end;
428 let end_index = location_to_index(self.sql, &end_loc);
429 &self.sql[start_index..end_index.min(self.sql.len())]
430 };
431 let raw_query = raw_query.trim_end_matches(";");
432
433 let query = SqlOrTql::try_from_statement(query, raw_query)?;
434 Ok(query)
435 }
436
437 fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
439 let interval = self.parse_interval_month_day_nano()?.0;
440 if interval.months != 0 {
441 return InvalidIntervalSnafu {
442 reason: format!("Interval with months is not allowed in {context}"),
443 }
444 .fail();
445 }
446 Ok(
447 interval.nanoseconds / 1_000_000_000
448 + interval.days as i64 * 60 * 60 * 24
449 + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, )
453 }
454
455 fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
457 let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
458 let raw_interval_expr = interval_expr.to_string();
459 let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
460 .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
461 .ok()
462 .with_context(|| InvalidIntervalSnafu {
463 reason: format!("cannot cast {} to interval type", interval_expr),
464 })?;
465 if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
466 Ok((interval, raw_interval_expr))
467 } else {
468 unreachable!()
469 }
470 }
471
472 fn parse_if_not_exist(&mut self) -> Result<bool> {
473 match self.parser.peek_token().token {
474 Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
475 _ => {}
476 }
477
478 if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
479 return self
480 .parser
481 .expect_keyword(Keyword::EXISTS)
482 .map(|_| true)
483 .context(UnexpectedSnafu {
484 expected: "EXISTS",
485 actual: self.peek_token_as_string(),
486 });
487 }
488
489 if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
490 return UnsupportedSnafu { keyword: "EXISTS" }.fail();
491 }
492
493 Ok(false)
494 }
495
496 fn parse_create_table_options(&mut self) -> Result<OptionMap> {
497 parse_with_options(&mut self.parser)
498 }
499
500 fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
502 if !self.parser.parse_keyword(Keyword::PARTITION) {
503 return Ok(None);
504 }
505
506 self.parse_partition_on_columns().map(Some)
507 }
508
509 pub(crate) fn parse_partition_on_columns(&mut self) -> Result<Partitions> {
511 self.parser
512 .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
513 .context(error::UnexpectedSnafu {
514 expected: "ON, COLUMNS",
515 actual: self.peek_token_as_string(),
516 })?;
517
518 let raw_column_list = self
519 .parser
520 .parse_parenthesized_column_list(Mandatory, false)
521 .context(error::SyntaxSnafu)?;
522 let column_list = raw_column_list
523 .into_iter()
524 .map(Self::canonicalize_identifier)
525 .collect();
526
527 let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
528
529 Ok(Partitions { column_list, exprs })
530 }
531
532 fn parse_partition_entry(&mut self) -> Result<Expr> {
533 self.parser.parse_expr().context(error::SyntaxSnafu)
534 }
535
536 fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
538 where
539 F: FnMut(&mut ParserContext<'a>) -> Result<T>,
540 {
541 self.parser
542 .expect_token(&Token::LParen)
543 .context(error::UnexpectedSnafu {
544 expected: "(",
545 actual: self.peek_token_as_string(),
546 })?;
547
548 let mut values = vec![];
549 while self.parser.peek_token() != Token::RParen {
550 values.push(f(self)?);
551 if !self.parser.consume_token(&Token::Comma) {
552 break;
553 }
554 }
555
556 self.parser
557 .expect_token(&Token::RParen)
558 .context(error::UnexpectedSnafu {
559 expected: ")",
560 actual: self.peek_token_as_string(),
561 })?;
562
563 Ok(values)
564 }
565
566 fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
568 let mut columns = vec![];
569 let mut constraints = vec![];
570 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
571 return Ok((columns, constraints));
572 }
573
574 loop {
575 if let Some(constraint) = self.parse_optional_table_constraint()? {
576 constraints.push(constraint);
577 } else if let Token::Word(_) = self.parser.peek_token().token {
578 self.parse_column(&mut columns, &mut constraints)?;
579 } else {
580 return self.expected(
581 "column name or constraint definition",
582 self.parser.peek_token(),
583 );
584 }
585 let comma = self.parser.consume_token(&Token::Comma);
586 if self.parser.consume_token(&Token::RParen) {
587 break;
589 } else if !comma {
590 return self.expected(
591 "',' or ')' after column definition",
592 self.parser.peek_token(),
593 );
594 }
595 }
596
597 Ok((columns, constraints))
598 }
599
600 fn parse_column(
601 &mut self,
602 columns: &mut Vec<Column>,
603 constraints: &mut Vec<TableConstraint>,
604 ) -> Result<()> {
605 let mut column = self.parse_column_def()?;
606
607 let mut time_index_opt_idx = None;
608 for (index, opt) in column.options().iter().enumerate() {
609 if let ColumnOption::DialectSpecific(tokens) = &opt.option
610 && matches!(
611 &tokens[..],
612 [
613 Token::Word(Word {
614 keyword: Keyword::TIME,
615 ..
616 }),
617 Token::Word(Word {
618 keyword: Keyword::INDEX,
619 ..
620 })
621 ]
622 )
623 {
624 ensure!(
625 time_index_opt_idx.is_none(),
626 InvalidColumnOptionSnafu {
627 name: column.name().to_string(),
628 msg: "duplicated time index",
629 }
630 );
631 time_index_opt_idx = Some(index);
632
633 let constraint = TableConstraint::TimeIndex {
634 column: Ident::new(column.name().value.clone()),
635 };
636 constraints.push(constraint);
637 }
638 }
639
640 if let Some(index) = time_index_opt_idx {
641 ensure!(
642 !column.options().contains(&ColumnOptionDef {
643 option: ColumnOption::Null,
644 name: None,
645 }),
646 InvalidColumnOptionSnafu {
647 name: column.name().to_string(),
648 msg: "time index column can't be null",
649 }
650 );
651
652 let data_type = get_unalias_type(column.data_type());
654 ensure!(
655 matches!(data_type, DataType::Timestamp(_, _)),
656 InvalidColumnOptionSnafu {
657 name: column.name().to_string(),
658 msg: "time index column data type should be timestamp",
659 }
660 );
661
662 let not_null_opt = ColumnOptionDef {
663 option: ColumnOption::NotNull,
664 name: None,
665 };
666
667 if !column.options().contains(¬_null_opt) {
668 column.mut_options().push(not_null_opt);
669 }
670
671 let _ = column.mut_options().remove(index);
672 }
673
674 columns.push(column);
675
676 Ok(())
677 }
678
679 fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
681 let name = self.parser.parse_identifier()?;
682 if name.quote_style.is_none() &&
683 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
685 {
686 return Err(ParserError::ParserError(format!(
687 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
688 &name.value
689 )));
690 }
691
692 Ok(name)
693 }
694
695 pub fn parse_column_def(&mut self) -> Result<Column> {
696 let name = self.parse_column_name().context(SyntaxSnafu)?;
697 let parser = &mut self.parser;
698
699 ensure!(
700 !(name.quote_style.is_none() &&
701 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
703 InvalidSqlSnafu {
704 msg: format!(
705 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
706 &name.value
707 ),
708 }
709 );
710
711 let mut extensions = ColumnExtensions::default();
712
713 let data_type =
714 if let Some((data_type, type_hints)) = json::parse_json2_type_and_hints(parser)? {
715 extensions.json_type_hints = type_hints;
716 data_type
717 } else {
718 parser.parse_data_type().context(SyntaxSnafu)?
719 };
720
721 let mut options = vec![];
722 loop {
723 if parser.parse_keyword(Keyword::CONSTRAINT) {
724 let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
725 if let Some(option) = Self::parse_optional_column_option(parser)? {
726 options.push(ColumnOptionDef { name, option });
727 } else {
728 return parser
729 .expected(
730 "constraint details after CONSTRAINT <name>",
731 parser.peek_token(),
732 )
733 .context(SyntaxSnafu);
734 }
735 } else if let Some(option) = Self::parse_optional_column_option(parser)? {
736 options.push(ColumnOptionDef { name: None, option });
737 } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
738 break;
739 };
740 }
741
742 Ok(Column {
743 column_def: ColumnDef {
744 name: Self::canonicalize_identifier(name),
745 data_type,
746 options,
747 },
748 extensions,
749 })
750 }
751
752 fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
753 if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
754 Ok(Some(ColumnOption::CharacterSet(
755 parser.parse_object_name(false).context(SyntaxSnafu)?,
756 )))
757 } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
758 Ok(Some(ColumnOption::NotNull))
759 } else if parser.parse_keywords(&[Keyword::COMMENT]) {
760 match parser.next_token() {
761 TokenWithSpan {
762 token: Token::SingleQuotedString(value, ..),
763 ..
764 } => Ok(Some(ColumnOption::Comment(value))),
765 unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
766 }
767 } else if parser.parse_keyword(Keyword::NULL) {
768 Ok(Some(ColumnOption::Null))
769 } else if parser.parse_keyword(Keyword::DEFAULT) {
770 Ok(Some(ColumnOption::Default(
771 parser.parse_expr().context(SyntaxSnafu)?,
772 )))
773 } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
774 Ok(Some(ColumnOption::PrimaryKey(PrimaryKeyConstraint {
775 name: None,
776 index_name: None,
777 index_type: None,
778 columns: vec![],
779 index_options: vec![],
780 characteristics: None,
781 })))
782 } else if parser.parse_keyword(Keyword::UNIQUE) {
783 Ok(Some(ColumnOption::Unique(UniqueConstraint {
784 name: None,
785 index_name: None,
786 index_type_display: KeyOrIndexDisplay::None,
787 index_type: None,
788 columns: vec![],
789 index_options: vec![],
790 characteristics: None,
791 nulls_distinct: NullsDistinctOption::None,
792 })))
793 } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
794 Ok(Some(ColumnOption::DialectSpecific(vec![
796 Token::Word(Word {
797 value: "TIME".to_string(),
798 quote_style: None,
799 keyword: Keyword::TIME,
800 }),
801 Token::Word(Word {
802 value: "INDEX".to_string(),
803 quote_style: None,
804 keyword: Keyword::INDEX,
805 }),
806 ])))
807 } else {
808 Ok(None)
809 }
810 }
811
812 fn parse_column_extensions(
818 parser: &mut Parser<'_>,
819 column_name: &Ident,
820 column_type: &DataType,
821 column_extensions: &mut ColumnExtensions,
822 ) -> Result<bool> {
823 if let DataType::Custom(name, tokens) = column_type
824 && name.0.len() == 1
825 && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
826 {
827 ensure!(
828 tokens.len() == 1,
829 InvalidColumnOptionSnafu {
830 name: column_name.to_string(),
831 msg: "VECTOR type should have dimension",
832 }
833 );
834
835 let dimension =
836 tokens[0]
837 .parse::<u32>()
838 .ok()
839 .with_context(|| InvalidColumnOptionSnafu {
840 name: column_name.to_string(),
841 msg: "dimension should be a positive integer",
842 })?;
843
844 let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
845 column_extensions.vector_options = Some(options);
846 }
847
848 let mut is_index_declared = false;
850
851 if let Token::Word(word) = parser.peek_token().token
853 && word.value.eq_ignore_ascii_case(SKIPPING)
854 {
855 parser.next_token();
856 ensure!(
858 parser.parse_keyword(Keyword::INDEX),
859 InvalidColumnOptionSnafu {
860 name: column_name.to_string(),
861 msg: "expect INDEX after SKIPPING keyword",
862 }
863 );
864 ensure!(
865 column_extensions.skipping_index_options.is_none(),
866 InvalidColumnOptionSnafu {
867 name: column_name.to_string(),
868 msg: "duplicated SKIPPING index option",
869 }
870 );
871
872 let options = parser
873 .parse_options(Keyword::WITH)
874 .context(error::SyntaxSnafu)?
875 .into_iter()
876 .map(parse_option_string)
877 .collect::<Result<Vec<_>>>()?;
878
879 for (key, _) in options.iter() {
880 ensure!(
881 validate_column_skipping_index_create_option(key),
882 InvalidColumnOptionSnafu {
883 name: column_name.to_string(),
884 msg: format!("invalid SKIPPING INDEX option: {key}"),
885 }
886 );
887 }
888
889 let options = OptionMap::new(options);
890 column_extensions.skipping_index_options = Some(options);
891 is_index_declared |= true;
892 }
893
894 if parser.parse_keyword(Keyword::FULLTEXT) {
896 ensure!(
898 parser.parse_keyword(Keyword::INDEX),
899 InvalidColumnOptionSnafu {
900 name: column_name.to_string(),
901 msg: "expect INDEX after FULLTEXT keyword",
902 }
903 );
904
905 ensure!(
906 column_extensions.fulltext_index_options.is_none(),
907 InvalidColumnOptionSnafu {
908 name: column_name.to_string(),
909 msg: "duplicated FULLTEXT INDEX option",
910 }
911 );
912
913 let column_type = get_unalias_type(column_type);
914 let data_type = sql_data_type_to_concrete_data_type(&column_type)?;
915 ensure!(
916 data_type == ConcreteDataType::string_datatype(),
917 InvalidColumnOptionSnafu {
918 name: column_name.to_string(),
919 msg: "FULLTEXT index only supports string type",
920 }
921 );
922
923 let options = parser
924 .parse_options(Keyword::WITH)
925 .context(error::SyntaxSnafu)?
926 .into_iter()
927 .map(parse_option_string)
928 .collect::<Result<Vec<_>>>()?;
929
930 for (key, _) in options.iter() {
931 ensure!(
932 validate_column_fulltext_create_option(key),
933 InvalidColumnOptionSnafu {
934 name: column_name.to_string(),
935 msg: format!("invalid FULLTEXT INDEX option: {key}"),
936 }
937 );
938 }
939
940 let options = OptionMap::new(options);
941 column_extensions.fulltext_index_options = Some(options);
942 is_index_declared |= true;
943 }
944
945 if let Token::Word(word) = parser.peek_token().token
947 && word.value.eq_ignore_ascii_case(INVERTED)
948 {
949 parser.next_token();
950 ensure!(
952 parser.parse_keyword(Keyword::INDEX),
953 InvalidColumnOptionSnafu {
954 name: column_name.to_string(),
955 msg: "expect INDEX after INVERTED keyword",
956 }
957 );
958
959 ensure!(
960 column_extensions.inverted_index_options.is_none(),
961 InvalidColumnOptionSnafu {
962 name: column_name.to_string(),
963 msg: "duplicated INVERTED index option",
964 }
965 );
966
967 let with_token = parser.peek_token();
970 ensure!(
971 with_token.token
972 != Token::Word(Word {
973 value: "WITH".to_string(),
974 keyword: Keyword::WITH,
975 quote_style: None,
976 }),
977 InvalidColumnOptionSnafu {
978 name: column_name.to_string(),
979 msg: "INVERTED index doesn't support options",
980 }
981 );
982
983 column_extensions.inverted_index_options = Some(OptionMap::default());
984 is_index_declared |= true;
985 }
986
987 if let Token::Word(word) = parser.peek_token().token
989 && word.value.eq_ignore_ascii_case(VECTOR)
990 {
991 parser.next_token();
992 ensure!(
994 parser.parse_keyword(Keyword::INDEX),
995 InvalidColumnOptionSnafu {
996 name: column_name.to_string(),
997 msg: "expect INDEX after VECTOR keyword",
998 }
999 );
1000
1001 ensure!(
1002 column_extensions.vector_index_options.is_none(),
1003 InvalidColumnOptionSnafu {
1004 name: column_name.to_string(),
1005 msg: "duplicated VECTOR INDEX option",
1006 }
1007 );
1008
1009 let column_type = get_unalias_type(column_type);
1011 let data_type = sql_data_type_to_concrete_data_type(&column_type)?;
1012 ensure!(
1013 matches!(data_type, ConcreteDataType::Vector(_)),
1014 InvalidColumnOptionSnafu {
1015 name: column_name.to_string(),
1016 msg: "VECTOR INDEX only supports Vector type columns",
1017 }
1018 );
1019
1020 let options = parser
1021 .parse_options(Keyword::WITH)
1022 .context(error::SyntaxSnafu)?
1023 .into_iter()
1024 .map(parse_option_string)
1025 .collect::<Result<Vec<_>>>()?;
1026
1027 for (key, _) in options.iter() {
1028 ensure!(
1029 validate_column_vector_index_create_option(key),
1030 InvalidColumnOptionSnafu {
1031 name: column_name.to_string(),
1032 msg: format!("invalid VECTOR INDEX option: {key}"),
1033 }
1034 );
1035 }
1036
1037 let options = OptionMap::new(options);
1038 column_extensions.vector_index_options = Some(options);
1039 is_index_declared |= true;
1040 }
1041
1042 Ok(is_index_declared)
1043 }
1044
1045 fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
1046 match self.parser.next_token() {
1047 TokenWithSpan {
1048 token: Token::Word(w),
1049 ..
1050 } if w.keyword == Keyword::PRIMARY => {
1051 self.parser
1052 .expect_keyword(Keyword::KEY)
1053 .context(error::UnexpectedSnafu {
1054 expected: "KEY",
1055 actual: self.peek_token_as_string(),
1056 })?;
1057 let raw_columns = self
1058 .parser
1059 .parse_parenthesized_column_list(Mandatory, false)
1060 .context(error::SyntaxSnafu)?;
1061 let columns = raw_columns
1062 .into_iter()
1063 .map(Self::canonicalize_identifier)
1064 .collect();
1065 Ok(Some(TableConstraint::PrimaryKey { columns }))
1066 }
1067 TokenWithSpan {
1068 token: Token::Word(w),
1069 ..
1070 } if w.keyword == Keyword::TIME => {
1071 self.parser
1072 .expect_keyword(Keyword::INDEX)
1073 .context(error::UnexpectedSnafu {
1074 expected: "INDEX",
1075 actual: self.peek_token_as_string(),
1076 })?;
1077
1078 let raw_columns = self
1079 .parser
1080 .parse_parenthesized_column_list(Mandatory, false)
1081 .context(error::SyntaxSnafu)?;
1082 let mut columns = raw_columns
1083 .into_iter()
1084 .map(Self::canonicalize_identifier)
1085 .collect::<Vec<_>>();
1086
1087 ensure!(
1088 columns.len() == 1,
1089 InvalidTimeIndexSnafu {
1090 msg: "it should contain only one column in time index",
1091 }
1092 );
1093
1094 Ok(Some(TableConstraint::TimeIndex {
1095 column: columns.pop().unwrap(),
1096 }))
1097 }
1098 _ => {
1099 self.parser.prev_token();
1100 Ok(None)
1101 }
1102 }
1103 }
1104
1105 fn parse_table_engine(&mut self, default: &str) -> Result<String> {
1107 if !self.consume_token(ENGINE) {
1108 return Ok(default.to_string());
1109 }
1110
1111 self.parser
1112 .expect_token(&Token::Eq)
1113 .context(error::UnexpectedSnafu {
1114 expected: "=",
1115 actual: self.peek_token_as_string(),
1116 })?;
1117
1118 let token = self.parser.next_token();
1119 if let Token::Word(w) = token.token {
1120 Ok(w.value)
1121 } else {
1122 self.expected("'Engine' is missing", token)
1123 }
1124 }
1125}
1126
1127fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1128 let time_index_constraints: Vec<_> = constraints
1129 .iter()
1130 .filter_map(|c| match c {
1131 TableConstraint::TimeIndex { column } => Some(column),
1132 _ => None,
1133 })
1134 .unique()
1135 .collect();
1136
1137 ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1138 ensure!(
1139 time_index_constraints.len() == 1,
1140 InvalidTimeIndexSnafu {
1141 msg: format!(
1142 "expected only one time index constraint but actual {}",
1143 time_index_constraints.len()
1144 ),
1145 }
1146 );
1147
1148 let time_index_column_ident = &time_index_constraints[0];
1151 let time_index_column = columns
1152 .iter()
1153 .find(|c| c.name().value == *time_index_column_ident.value)
1154 .with_context(|| InvalidTimeIndexSnafu {
1155 msg: format!(
1156 "time index column {} not found in columns",
1157 time_index_column_ident
1158 ),
1159 })?;
1160
1161 let time_index_data_type = get_unalias_type(time_index_column.data_type());
1162 ensure!(
1163 matches!(time_index_data_type, DataType::Timestamp(_, _)),
1164 InvalidColumnOptionSnafu {
1165 name: time_index_column.name().to_string(),
1166 msg: "time index column data type should be timestamp",
1167 }
1168 );
1169
1170 Ok(())
1171}
1172
1173fn get_unalias_type(data_type: &DataType) -> DataType {
1174 match data_type {
1175 DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1176 if let Some(real_type) =
1177 get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1178 {
1179 real_type
1180 } else {
1181 data_type.clone()
1182 }
1183 }
1184 _ => data_type.clone(),
1185 }
1186}
1187
1188fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1189 let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1190
1191 ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1192
1193 Ok(())
1194}
1195
1196fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1198 for expr in exprs {
1199 if let Expr::BinaryOp { left, op: _, right } = expr {
1201 ensure_one_expr(left, columns)?;
1202 ensure_one_expr(right, columns)?;
1203 } else {
1204 return error::InvalidSqlSnafu {
1205 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1206 }
1207 .fail();
1208 }
1209 }
1210 Ok(())
1211}
1212
1213fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1217 match expr {
1218 Expr::BinaryOp { left, op: _, right } => {
1219 ensure_one_expr(left, columns)?;
1220 ensure_one_expr(right, columns)?;
1221 Ok(())
1222 }
1223 Expr::Identifier(ident) => {
1224 let column_name = &ident.value;
1225 ensure!(
1226 columns.iter().any(|c| &c.name().value == column_name),
1227 error::InvalidSqlSnafu {
1228 msg: format!(
1229 "Column {:?} in rule expr is not referenced in PARTITION ON",
1230 column_name
1231 ),
1232 }
1233 );
1234 Ok(())
1235 }
1236 Expr::Value(_) => Ok(()),
1237 Expr::UnaryOp { expr, .. } => {
1238 ensure_one_expr(expr, columns)?;
1239 Ok(())
1240 }
1241 _ => error::InvalidSqlSnafu {
1242 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1243 }
1244 .fail(),
1245 }
1246}
1247
1248fn ensure_partition_columns_defined<'a>(
1250 columns: &'a [Column],
1251 partitions: &'a Partitions,
1252) -> Result<Vec<&'a Column>> {
1253 partitions
1254 .column_list
1255 .iter()
1256 .map(|x| {
1257 let x = ParserContext::canonicalize_identifier(x.clone());
1258 columns
1261 .iter()
1262 .find(|c| *c.name().value == x.value)
1263 .context(error::InvalidSqlSnafu {
1264 msg: format!("Partition column {:?} not defined", x.value),
1265 })
1266 })
1267 .collect::<Result<Vec<&Column>>>()
1268}
1269
1270#[cfg(test)]
1271mod tests {
1272 use std::assert_matches;
1273 use std::collections::HashMap;
1274
1275 use common_catalog::consts::FILE_ENGINE;
1276 use common_error::ext::ErrorExt;
1277 use sqlparser::ast::ColumnOption::NotNull;
1278 use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1279 use sqlparser::dialect::GenericDialect;
1280 use sqlparser::tokenizer::Tokenizer;
1281
1282 use super::*;
1283 use crate::dialect::GreptimeDbDialect;
1284 use crate::parser::ParseOptions;
1285
1286 fn string_option_map(
1287 entries: impl IntoIterator<Item = (&'static str, &'static str)>,
1288 ) -> OptionMap {
1289 OptionMap::new(entries.into_iter().map(|(key, value)| {
1290 (
1291 key.to_string(),
1292 OptionValue::try_new(Expr::Value(
1293 Value::SingleQuotedString(value.to_string()).into(),
1294 ))
1295 .unwrap(),
1296 )
1297 }))
1298 }
1299
1300 #[test]
1301 fn test_parse_create_table_like() {
1302 let sql = "CREATE TABLE t1 LIKE t2";
1303 let stmts =
1304 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1305 .unwrap();
1306
1307 assert_eq!(1, stmts.len());
1308 match &stmts[0] {
1309 Statement::CreateTableLike(c) => {
1310 assert_eq!(c.table_name.to_string(), "t1");
1311 assert_eq!(c.source_name.to_string(), "t2");
1312 }
1313 _ => unreachable!(),
1314 }
1315 }
1316
1317 #[test]
1318 fn test_validate_external_table_options() {
1319 let sql = "CREATE EXTERNAL TABLE city (
1320 host string,
1321 ts timestamp,
1322 cpu float64 default 0,
1323 memory float64,
1324 TIME INDEX (ts),
1325 PRIMARY KEY(ts, host)
1326 ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1327
1328 let result =
1329 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1330 assert!(matches!(
1331 result,
1332 Err(error::Error::InvalidTableOption { .. })
1333 ));
1334 }
1335
1336 #[test]
1337 fn test_parse_create_external_table() {
1338 struct Test<'a> {
1339 sql: &'a str,
1340 expected_table_name: &'a str,
1341 expected_options: HashMap<&'a str, &'a str>,
1342 expected_engine: &'a str,
1343 expected_if_not_exist: bool,
1344 }
1345
1346 let tests = [
1347 Test {
1348 sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1349 expected_table_name: "city",
1350 expected_options: HashMap::from([
1351 ("location", "/var/data/city.csv"),
1352 ("format", "csv"),
1353 ]),
1354 expected_engine: FILE_ENGINE,
1355 expected_if_not_exist: false,
1356 },
1357 Test {
1358 sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1359 expected_table_name: "city",
1360 expected_options: HashMap::from([
1361 ("location", "/var/data/city.csv"),
1362 ("format", "csv"),
1363 ]),
1364 expected_engine: "foo",
1365 expected_if_not_exist: true,
1366 },
1367 Test {
1368 sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1369 expected_table_name: "city",
1370 expected_options: HashMap::from([
1371 ("location", "/var/data/city.csv"),
1372 ("format", "csv"),
1373 ("compaction.type", "bar"),
1374 ]),
1375 expected_engine: "foo",
1376 expected_if_not_exist: true,
1377 },
1378 ];
1379
1380 for test in tests {
1381 let stmts = ParserContext::create_with_dialect(
1382 test.sql,
1383 &GreptimeDbDialect {},
1384 ParseOptions::default(),
1385 )
1386 .unwrap();
1387 assert_eq!(1, stmts.len());
1388 match &stmts[0] {
1389 Statement::CreateExternalTable(c) => {
1390 assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1391 assert_eq!(c.options.to_str_map(), test.expected_options);
1392 assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1393 assert_eq!(c.engine, test.expected_engine);
1394 }
1395 _ => unreachable!(),
1396 }
1397 }
1398 }
1399
1400 #[test]
1401 fn test_parse_create_external_table_with_schema() {
1402 let sql = "CREATE EXTERNAL TABLE city (
1403 host string,
1404 ts timestamp,
1405 cpu float32 default 0,
1406 memory float64,
1407 TIME INDEX (ts),
1408 PRIMARY KEY(ts, host),
1409 ) with(location='/var/data/city.csv',format='csv');";
1410
1411 let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]);
1412
1413 let stmts =
1414 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1415 .unwrap();
1416 assert_eq!(1, stmts.len());
1417 match &stmts[0] {
1418 Statement::CreateExternalTable(c) => {
1419 assert_eq!(c.name.to_string(), "city");
1420 assert_eq!(c.options.to_str_map(), options);
1421
1422 let columns = &c.columns;
1423 assert_column_def(&columns[0].column_def, "host", "STRING");
1424 assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1425 assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1426 assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1427
1428 let constraints = &c.constraints;
1429 assert_eq!(
1430 &constraints[0],
1431 &TableConstraint::TimeIndex {
1432 column: Ident::new("ts"),
1433 }
1434 );
1435 assert_eq!(
1436 &constraints[1],
1437 &TableConstraint::PrimaryKey {
1438 columns: vec![Ident::new("ts"), Ident::new("host")]
1439 }
1440 );
1441 }
1442 _ => unreachable!(),
1443 }
1444 }
1445
1446 #[test]
1447 fn test_parse_create_database() {
1448 let sql = "create database";
1449 let result =
1450 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1451 assert!(
1452 result
1453 .unwrap_err()
1454 .to_string()
1455 .contains("Unexpected token while parsing SQL statement")
1456 );
1457
1458 let sql = "create database prometheus";
1459 let stmts =
1460 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1461 .unwrap();
1462
1463 assert_eq!(1, stmts.len());
1464 match &stmts[0] {
1465 Statement::CreateDatabase(c) => {
1466 assert_eq!(c.name.to_string(), "prometheus");
1467 assert!(!c.if_not_exists);
1468 }
1469 _ => unreachable!(),
1470 }
1471
1472 let sql = "create database if not exists prometheus";
1473 let stmts =
1474 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1475 .unwrap();
1476
1477 assert_eq!(1, stmts.len());
1478 match &stmts[0] {
1479 Statement::CreateDatabase(c) => {
1480 assert_eq!(c.name.to_string(), "prometheus");
1481 assert!(c.if_not_exists);
1482 }
1483 _ => unreachable!(),
1484 }
1485
1486 let sql = "CREATE DATABASE `fOo`";
1487 let result =
1488 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1489 let stmts = result.unwrap();
1490 match &stmts.last().unwrap() {
1491 Statement::CreateDatabase(c) => {
1492 assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1493 assert!(!c.if_not_exists);
1494 }
1495 _ => unreachable!(),
1496 }
1497
1498 let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1499 let result =
1500 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1501 let stmts = result.unwrap();
1502 match &stmts[0] {
1503 Statement::CreateDatabase(c) => {
1504 assert_eq!(c.name.to_string(), "prometheus");
1505 assert!(!c.if_not_exists);
1506 assert_eq!(c.options.get("ttl").unwrap(), "1h");
1507 }
1508 _ => unreachable!(),
1509 }
1510 }
1511
1512 #[test]
1513 fn test_parse_create_flow_more_testcases() {
1514 use pretty_assertions::assert_eq;
1515 fn parse_create_flow(sql: &str) -> CreateFlow {
1516 let stmts = ParserContext::create_with_dialect(
1517 sql,
1518 &GreptimeDbDialect {},
1519 ParseOptions::default(),
1520 )
1521 .unwrap();
1522 assert_eq!(1, stmts.len());
1523 match &stmts[0] {
1524 Statement::CreateFlow(c) => c.clone(),
1525 _ => unreachable!(),
1526 }
1527 }
1528 struct CreateFlowWoutQuery {
1529 pub flow_name: ObjectName,
1531 pub sink_table_name: ObjectName,
1533 pub or_replace: bool,
1535 pub if_not_exists: bool,
1537 pub expire_after: Option<i64>,
1540 pub comment: Option<String>,
1542 pub flow_options: OptionMap,
1544 }
1545 let testcases = vec![
1546 (
1547 r"
1548CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1549SINK TO schema_1.table_1
1550EXPIRE AFTER INTERVAL '5 minutes'
1551COMMENT 'test comment'
1552AS
1553SELECT max(c1), min(c2) FROM schema_2.table_2;",
1554 CreateFlowWoutQuery {
1555 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1556 sink_table_name: ObjectName::from(vec![
1557 Ident::new("schema_1"),
1558 Ident::new("table_1"),
1559 ]),
1560 or_replace: true,
1561 if_not_exists: true,
1562 expire_after: Some(300),
1563 comment: Some("test comment".to_string()),
1564 flow_options: OptionMap::default(),
1565 },
1566 ),
1567 (
1568 r"
1569CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1570SINK TO schema_1.table_1
1571EXPIRE AFTER INTERVAL '300 s'
1572COMMENT 'test comment'
1573AS
1574SELECT max(c1), min(c2) FROM schema_2.table_2;",
1575 CreateFlowWoutQuery {
1576 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1577 sink_table_name: ObjectName::from(vec![
1578 Ident::new("schema_1"),
1579 Ident::new("table_1"),
1580 ]),
1581 or_replace: true,
1582 if_not_exists: true,
1583 expire_after: Some(300),
1584 comment: Some("test comment".to_string()),
1585 flow_options: OptionMap::default(),
1586 },
1587 ),
1588 (
1589 r"
1590CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1591SINK TO schema_1.table_1
1592EXPIRE AFTER '5 minutes'
1593COMMENT 'test comment'
1594AS
1595SELECT max(c1), min(c2) FROM schema_2.table_2;",
1596 CreateFlowWoutQuery {
1597 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1598 sink_table_name: ObjectName::from(vec![
1599 Ident::new("schema_1"),
1600 Ident::new("table_1"),
1601 ]),
1602 or_replace: true,
1603 if_not_exists: true,
1604 expire_after: Some(300),
1605 comment: Some("test comment".to_string()),
1606 flow_options: OptionMap::default(),
1607 },
1608 ),
1609 (
1610 r"
1611CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1612SINK TO schema_1.table_1
1613EXPIRE AFTER '300 s'
1614COMMENT 'test comment'
1615AS
1616SELECT max(c1), min(c2) FROM schema_2.table_2;",
1617 CreateFlowWoutQuery {
1618 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1619 sink_table_name: ObjectName::from(vec![
1620 Ident::new("schema_1"),
1621 Ident::new("table_1"),
1622 ]),
1623 or_replace: true,
1624 if_not_exists: true,
1625 expire_after: Some(300),
1626 comment: Some("test comment".to_string()),
1627 flow_options: OptionMap::default(),
1628 },
1629 ),
1630 (
1631 r"
1632CREATE FLOW `task_2`
1633SINK TO schema_1.table_1
1634EXPIRE AFTER '2 days 1h 2 min'
1635AS
1636SELECT max(c1), min(c2) FROM schema_2.table_2;",
1637 CreateFlowWoutQuery {
1638 flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1639 sink_table_name: ObjectName::from(vec![
1640 Ident::new("schema_1"),
1641 Ident::new("table_1"),
1642 ]),
1643 or_replace: false,
1644 if_not_exists: false,
1645 expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1646 comment: None,
1647 flow_options: OptionMap::default(),
1648 },
1649 ),
1650 (
1651 r"
1652create flow `task_3`
1653sink to schema_1.table_1
1654expire after '10 minutes'
1655as
1656select max(c1), min(c2) from schema_2.table_2;",
1657 CreateFlowWoutQuery {
1658 flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
1659 sink_table_name: ObjectName::from(vec![
1660 Ident::new("schema_1"),
1661 Ident::new("table_1"),
1662 ]),
1663 or_replace: false,
1664 if_not_exists: false,
1665 expire_after: Some(600), comment: None,
1667 flow_options: OptionMap::default(),
1668 },
1669 ),
1670 (
1671 r"
1672create or replace flow if not exists task_4
1673sink to schema_1.table_1
1674expire after interval '2 hours'
1675comment 'lowercase test'
1676as
1677select max(c1), min(c2) from schema_2.table_2;",
1678 CreateFlowWoutQuery {
1679 flow_name: ObjectName::from(vec![Ident::new("task_4")]),
1680 sink_table_name: ObjectName::from(vec![
1681 Ident::new("schema_1"),
1682 Ident::new("table_1"),
1683 ]),
1684 or_replace: true,
1685 if_not_exists: true,
1686 expire_after: Some(7200), comment: Some("lowercase test".to_string()),
1688 flow_options: OptionMap::default(),
1689 },
1690 ),
1691 (
1692 r"
1693CREATE FLOW task_5
1694SINK TO schema_1.table_1
1695WITH (defer_on_missing_source = 'true')
1696AS
1697SELECT max(c1), min(c2) FROM schema_2.table_2;",
1698 CreateFlowWoutQuery {
1699 flow_name: ObjectName::from(vec![Ident::new("task_5")]),
1700 sink_table_name: ObjectName::from(vec![
1701 Ident::new("schema_1"),
1702 Ident::new("table_1"),
1703 ]),
1704 or_replace: false,
1705 if_not_exists: false,
1706 expire_after: None,
1707 comment: None,
1708 flow_options: string_option_map([("defer_on_missing_source", "true")]),
1709 },
1710 ),
1711 ];
1712
1713 for (sql, expected) in testcases {
1714 let create_task = parse_create_flow(sql);
1715
1716 let expected = CreateFlow {
1717 flow_name: expected.flow_name,
1718 sink_table_name: expected.sink_table_name,
1719 or_replace: expected.or_replace,
1720 if_not_exists: expected.if_not_exists,
1721 expire_after: expected.expire_after,
1722 eval_interval: None,
1723 comment: expected.comment,
1724 flow_options: expected.flow_options,
1725 query: create_task.query.clone(),
1727 };
1728
1729 assert_eq!(create_task, expected, "input sql is:\n{sql}");
1730 let show_create = create_task.to_string();
1731 let recreated = parse_create_flow(&show_create);
1732 assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1733 }
1734 }
1735
1736 #[test]
1737 fn test_parse_create_flow() {
1738 use pretty_assertions::assert_eq;
1739 fn parse_create_flow(sql: &str) -> CreateFlow {
1740 let stmts = ParserContext::create_with_dialect(
1741 sql,
1742 &GreptimeDbDialect {},
1743 ParseOptions::default(),
1744 )
1745 .unwrap();
1746 assert_eq!(1, stmts.len());
1747 match &stmts[0] {
1748 Statement::CreateFlow(c) => c.clone(),
1749 _ => panic!("{:?}", stmts[0]),
1750 }
1751 }
1752 struct CreateFlowWoutQuery {
1753 pub flow_name: ObjectName,
1755 pub sink_table_name: ObjectName,
1757 pub or_replace: bool,
1759 pub if_not_exists: bool,
1761 pub expire_after: Option<i64>,
1764 pub eval_interval: Option<i64>,
1768 pub comment: Option<String>,
1770 pub flow_options: OptionMap,
1772 }
1773
1774 let testcases = vec![
1776 (
1777 r"
1778CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1779SINK TO schema_1.table_1
1780EXPIRE AFTER INTERVAL '5 minutes'
1781COMMENT 'test comment'
1782AS
1783SELECT max(c1), min(c2) FROM schema_2.table_2;",
1784 CreateFlowWoutQuery {
1785 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1786 sink_table_name: ObjectName(vec![
1787 ObjectNamePart::Identifier(Ident::new("schema_1")),
1788 ObjectNamePart::Identifier(Ident::new("table_1")),
1789 ]),
1790 or_replace: true,
1791 if_not_exists: true,
1792 expire_after: Some(300),
1793 eval_interval: None,
1794 comment: Some("test comment".to_string()),
1795 flow_options: OptionMap::default(),
1796 },
1797 ),
1798 (
1799 r"
1800CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1801SINK TO schema_1.table_1
1802EXPIRE AFTER INTERVAL '300 s'
1803COMMENT 'test comment'
1804AS
1805SELECT max(c1), min(c2) FROM schema_2.table_2;",
1806 CreateFlowWoutQuery {
1807 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1808 sink_table_name: ObjectName(vec![
1809 ObjectNamePart::Identifier(Ident::new("schema_1")),
1810 ObjectNamePart::Identifier(Ident::new("table_1")),
1811 ]),
1812 or_replace: true,
1813 if_not_exists: true,
1814 expire_after: Some(300),
1815 eval_interval: None,
1816 comment: Some("test comment".to_string()),
1817 flow_options: OptionMap::default(),
1818 },
1819 ),
1820 (
1821 r"
1822CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1823SINK TO schema_1.table_1
1824EXPIRE AFTER '5 minutes'
1825EVAL INTERVAL '10 seconds'
1826COMMENT 'test comment'
1827AS
1828SELECT max(c1), min(c2) FROM schema_2.table_2;",
1829 CreateFlowWoutQuery {
1830 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1831 sink_table_name: ObjectName(vec![
1832 ObjectNamePart::Identifier(Ident::new("schema_1")),
1833 ObjectNamePart::Identifier(Ident::new("table_1")),
1834 ]),
1835 or_replace: true,
1836 if_not_exists: true,
1837 expire_after: Some(300),
1838 eval_interval: Some(10),
1839 comment: Some("test comment".to_string()),
1840 flow_options: OptionMap::default(),
1841 },
1842 ),
1843 (
1844 r"
1845CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1846SINK TO schema_1.table_1
1847EXPIRE AFTER '5 minutes'
1848EVAL INTERVAL INTERVAL '10 seconds'
1849COMMENT 'test comment'
1850AS
1851SELECT max(c1), min(c2) FROM schema_2.table_2;",
1852 CreateFlowWoutQuery {
1853 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1854 sink_table_name: ObjectName(vec![
1855 ObjectNamePart::Identifier(Ident::new("schema_1")),
1856 ObjectNamePart::Identifier(Ident::new("table_1")),
1857 ]),
1858 or_replace: true,
1859 if_not_exists: true,
1860 expire_after: Some(300),
1861 eval_interval: Some(10),
1862 comment: Some("test comment".to_string()),
1863 flow_options: OptionMap::default(),
1864 },
1865 ),
1866 (
1867 r"
1868CREATE FLOW `task_2`
1869SINK TO schema_1.table_1
1870EXPIRE AFTER '2 days 1h 2 min'
1871AS
1872SELECT max(c1), min(c2) FROM schema_2.table_2;",
1873 CreateFlowWoutQuery {
1874 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1875 '`', "task_2",
1876 ))]),
1877 sink_table_name: ObjectName(vec![
1878 ObjectNamePart::Identifier(Ident::new("schema_1")),
1879 ObjectNamePart::Identifier(Ident::new("table_1")),
1880 ]),
1881 or_replace: false,
1882 if_not_exists: false,
1883 expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1884 eval_interval: None,
1885 comment: None,
1886 flow_options: OptionMap::default(),
1887 },
1888 ),
1889 (
1890 r"
1891CREATE FLOW task_3
1892SINK TO schema_1.table_1
1893EVAL INTERVAL '10 seconds'
1894WITH (defer_on_missing_source = 'true', foo = 'bar')
1895AS
1896SELECT max(c1), min(c2) FROM schema_2.table_2;",
1897 CreateFlowWoutQuery {
1898 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_3"))]),
1899 sink_table_name: ObjectName(vec![
1900 ObjectNamePart::Identifier(Ident::new("schema_1")),
1901 ObjectNamePart::Identifier(Ident::new("table_1")),
1902 ]),
1903 or_replace: false,
1904 if_not_exists: false,
1905 expire_after: None,
1906 eval_interval: Some(10),
1907 comment: None,
1908 flow_options: string_option_map([
1909 ("defer_on_missing_source", "true"),
1910 ("foo", "bar"),
1911 ]),
1912 },
1913 ),
1914 ];
1915
1916 for (sql, expected) in testcases {
1917 let create_task = parse_create_flow(sql);
1918
1919 let expected = CreateFlow {
1920 flow_name: expected.flow_name,
1921 sink_table_name: expected.sink_table_name,
1922 or_replace: expected.or_replace,
1923 if_not_exists: expected.if_not_exists,
1924 expire_after: expected.expire_after,
1925 eval_interval: expected.eval_interval,
1926 comment: expected.comment,
1927 flow_options: expected.flow_options,
1928 query: create_task.query.clone(),
1930 };
1931
1932 assert_eq!(create_task, expected, "input sql is:\n{sql}");
1933 let show_create = create_task.to_string();
1934 let recreated = parse_create_flow(&show_create);
1935 assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1936 }
1937 }
1938
1939 #[test]
1940 fn test_parse_create_flow_with_tql_cte_query() {
1941 let sql = r#"
1942CREATE FLOW calc_reqs_cte
1943SINK TO cnt_reqs_cte
1944EVAL INTERVAL '1m'
1945AS
1946WITH tql(the_timestamp, the_value) AS (
1947 TQL EVAL (now() - '1m'::interval, now(), '5s') metric
1948)
1949SELECT * FROM tql;
1950"#;
1951
1952 let stmts =
1953 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1954 .unwrap();
1955 assert_eq!(1, stmts.len());
1956 let Statement::CreateFlow(create_flow) = &stmts[0] else {
1957 panic!("unexpected stmt: {:?}", stmts[0]);
1958 };
1959
1960 let query = create_flow.query.to_string();
1961 assert!(query.to_uppercase().contains("WITH"));
1962 assert!(query.to_uppercase().contains("TQL EVAL"));
1963 }
1964
1965 #[test]
1966 fn test_parse_create_flow_with_sql_cte_is_supported() {
1967 let sql = r#"
1968CREATE FLOW f
1969SINK TO s
1970AS
1971WITH cte AS (SELECT 1) SELECT * FROM cte;
1972"#;
1973
1974 let stmts =
1975 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1976 .unwrap();
1977 assert_eq!(1, stmts.len());
1978 let Statement::CreateFlow(create_flow) = &stmts[0] else {
1979 panic!("unexpected stmt: {:?}", stmts[0]);
1980 };
1981 assert_eq!(
1982 "WITH cte AS (SELECT 1) SELECT * FROM cte",
1983 create_flow.query.to_string()
1984 );
1985 }
1986
1987 #[test]
1988 fn test_parse_create_flow_with_tql_cte_requires_now_expr() {
1989 let sql = r#"
1990CREATE FLOW f
1991SINK TO s
1992EVAL INTERVAL '1m'
1993AS
1994WITH tql(ts, val) AS (
1995 TQL EVAL (0, 15, '5s') metric
1996)
1997SELECT * FROM tql;
1998"#;
1999
2000 let err =
2001 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2002 .unwrap_err();
2003
2004 let msg = format!("{err:?}");
2005 assert!(
2006 msg.contains("Expected expression containing `now()`"),
2007 "unexpected err: {msg}"
2008 );
2009 }
2010
2011 #[test]
2012 fn test_parse_create_flow_with_tql_cte_non_select_star_is_unsupported() {
2013 let sql = r#"
2014CREATE FLOW f
2015SINK TO s
2016AS
2017WITH tql(ts, val) AS (
2018 TQL EVAL (now() - '1m'::interval, now(), '5s') metric
2019)
2020SELECT ts FROM tql;
2021"#;
2022
2023 let err =
2024 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2025 .unwrap_err();
2026 assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2027 }
2028
2029 #[test]
2030 fn test_parse_create_flow_with_tql_cte_filter_is_unsupported() {
2031 let sql = r#"
2032CREATE FLOW f
2033SINK TO s
2034AS
2035WITH tql(ts, val) AS (
2036 TQL EVAL (now() - '1m'::interval, now(), '5s') metric
2037)
2038SELECT * FROM tql WHERE ts > 0;
2039"#;
2040
2041 let err =
2042 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2043 .unwrap_err();
2044 assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2045 }
2046
2047 #[test]
2048 fn test_parse_create_flow_with_mixed_sql_tql_cte_is_unsupported() {
2049 let sql = r#"
2050CREATE FLOW f
2051SINK TO s
2052AS
2053WITH s1 AS (SELECT 1),
2054 tql(ts, val) AS (TQL EVAL (now() - '1m'::interval, now(), '5s') metric)
2055SELECT * FROM tql;
2056"#;
2057
2058 let err =
2059 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2060 .unwrap_err();
2061 assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2062 }
2063
2064 #[test]
2065 fn test_create_flow_no_month() {
2066 let sql = r"
2067CREATE FLOW `task_2`
2068SINK TO schema_1.table_1
2069EXPIRE AFTER '1 month 2 days 1h 2 min'
2070AS
2071SELECT max(c1), min(c2) FROM schema_2.table_2;";
2072 let stmts =
2073 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2074
2075 assert!(
2076 stmts.is_err()
2077 && stmts
2078 .unwrap_err()
2079 .to_string()
2080 .contains("Interval with months is not allowed")
2081 );
2082 }
2083
2084 #[test]
2085 fn test_validate_create() {
2086 let sql = r"
2087CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
2088PARTITION ON COLUMNS(c, a) (
2089 a < 10,
2090 a > 10 AND a < 20,
2091 a > 20 AND c < 100,
2092 a > 20 AND c >= 100
2093)
2094ENGINE=mito";
2095 let result =
2096 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2097 let _ = result.unwrap();
2098
2099 let sql = r"
2100CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2101PARTITION ON COLUMNS(x) ()
2102ENGINE=mito";
2103 let result =
2104 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2105 assert!(
2106 result
2107 .unwrap_err()
2108 .to_string()
2109 .contains("Partition column \"x\" not defined")
2110 );
2111 }
2112
2113 #[test]
2114 fn test_parse_create_table_with_partitions() {
2115 let sql = r"
2116CREATE TABLE monitor (
2117 host_id INT,
2118 idc STRING,
2119 ts TIMESTAMP,
2120 cpu DOUBLE DEFAULT 0,
2121 memory DOUBLE,
2122 TIME INDEX (ts),
2123 PRIMARY KEY (host),
2124)
2125PARTITION ON COLUMNS(idc, host_id) (
2126 idc <= 'hz' AND host_id < 1000,
2127 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2128 idc > 'sh' AND host_id < 3000,
2129 idc > 'sh' AND host_id >= 3000,
2130)
2131ENGINE=mito";
2132 let result =
2133 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2134 .unwrap();
2135 assert_eq!(result.len(), 1);
2136 match &result[0] {
2137 Statement::CreateTable(c) => {
2138 assert!(c.partitions.is_some());
2139
2140 let partitions = c.partitions.as_ref().unwrap();
2141 let column_list = partitions
2142 .column_list
2143 .iter()
2144 .map(|x| &x.value)
2145 .collect::<Vec<&String>>();
2146 assert_eq!(column_list, vec!["idc", "host_id"]);
2147
2148 let exprs = &partitions.exprs;
2149
2150 assert_eq!(
2151 exprs[0],
2152 Expr::BinaryOp {
2153 left: Box::new(Expr::BinaryOp {
2154 left: Box::new(Expr::Identifier("idc".into())),
2155 op: BinaryOperator::LtEq,
2156 right: Box::new(Expr::Value(
2157 Value::SingleQuotedString("hz".to_string()).into()
2158 ))
2159 }),
2160 op: BinaryOperator::And,
2161 right: Box::new(Expr::BinaryOp {
2162 left: Box::new(Expr::Identifier("host_id".into())),
2163 op: BinaryOperator::Lt,
2164 right: Box::new(Expr::Value(
2165 Value::Number("1000".to_string(), false).into()
2166 ))
2167 })
2168 }
2169 );
2170 assert_eq!(
2171 exprs[1],
2172 Expr::BinaryOp {
2173 left: Box::new(Expr::BinaryOp {
2174 left: Box::new(Expr::BinaryOp {
2175 left: Box::new(Expr::Identifier("idc".into())),
2176 op: BinaryOperator::Gt,
2177 right: Box::new(Expr::Value(
2178 Value::SingleQuotedString("hz".to_string()).into()
2179 ))
2180 }),
2181 op: BinaryOperator::And,
2182 right: Box::new(Expr::BinaryOp {
2183 left: Box::new(Expr::Identifier("idc".into())),
2184 op: BinaryOperator::LtEq,
2185 right: Box::new(Expr::Value(
2186 Value::SingleQuotedString("sh".to_string()).into()
2187 ))
2188 })
2189 }),
2190 op: BinaryOperator::And,
2191 right: Box::new(Expr::BinaryOp {
2192 left: Box::new(Expr::Identifier("host_id".into())),
2193 op: BinaryOperator::Lt,
2194 right: Box::new(Expr::Value(
2195 Value::Number("2000".to_string(), false).into()
2196 ))
2197 })
2198 }
2199 );
2200 assert_eq!(
2201 exprs[2],
2202 Expr::BinaryOp {
2203 left: Box::new(Expr::BinaryOp {
2204 left: Box::new(Expr::Identifier("idc".into())),
2205 op: BinaryOperator::Gt,
2206 right: Box::new(Expr::Value(
2207 Value::SingleQuotedString("sh".to_string()).into()
2208 ))
2209 }),
2210 op: BinaryOperator::And,
2211 right: Box::new(Expr::BinaryOp {
2212 left: Box::new(Expr::Identifier("host_id".into())),
2213 op: BinaryOperator::Lt,
2214 right: Box::new(Expr::Value(
2215 Value::Number("3000".to_string(), false).into()
2216 ))
2217 })
2218 }
2219 );
2220 assert_eq!(
2221 exprs[3],
2222 Expr::BinaryOp {
2223 left: Box::new(Expr::BinaryOp {
2224 left: Box::new(Expr::Identifier("idc".into())),
2225 op: BinaryOperator::Gt,
2226 right: Box::new(Expr::Value(
2227 Value::SingleQuotedString("sh".to_string()).into()
2228 ))
2229 }),
2230 op: BinaryOperator::And,
2231 right: Box::new(Expr::BinaryOp {
2232 left: Box::new(Expr::Identifier("host_id".into())),
2233 op: BinaryOperator::GtEq,
2234 right: Box::new(Expr::Value(
2235 Value::Number("3000".to_string(), false).into()
2236 ))
2237 })
2238 }
2239 );
2240 }
2241 _ => unreachable!(),
2242 }
2243 }
2244
2245 #[test]
2246 fn test_parse_create_table_with_quoted_partitions() {
2247 let sql = r"
2248CREATE TABLE monitor (
2249 `host_id` INT,
2250 idc STRING,
2251 ts TIMESTAMP,
2252 cpu DOUBLE DEFAULT 0,
2253 memory DOUBLE,
2254 TIME INDEX (ts),
2255 PRIMARY KEY (host),
2256)
2257PARTITION ON COLUMNS(IdC, host_id) (
2258 idc <= 'hz' AND host_id < 1000,
2259 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2260 idc > 'sh' AND host_id < 3000,
2261 idc > 'sh' AND host_id >= 3000,
2262)";
2263 let result =
2264 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2265 .unwrap();
2266 assert_eq!(result.len(), 1);
2267 }
2268
/// Exercises time-index parsing for `CREATE TABLE`, both as an inline
/// `TIME INDEX` column option and as a table-level `TIME INDEX (..)` constraint.
#[test]
fn test_parse_create_table_with_timestamp_index() {
    let parse = |sql: &str| {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
    };

    // Case 1: the time index is declared inline on the `ts` column.
    let inline_sql = r"
CREATE TABLE monitor (
  host_id INT,
  idc STRING,
  ts TIMESTAMP TIME INDEX,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  PRIMARY KEY (host),
)
ENGINE=mito";
    let inline_stmts = parse(inline_sql).unwrap();

    let Statement::CreateTable(create) = &inline_stmts[0] else {
        panic!("should be create_table statement");
    };
    assert_eq!(create.constraints.len(), 2);
    // The first of the two constraints must be the time index on `ts`.
    match &create.constraints[0] {
        TableConstraint::TimeIndex { column } => assert_eq!(&column.value, "ts"),
        _ => panic!("should be time index constraint"),
    }

    // Case 2: a `NOT NULL` column plus a table-level `TIME INDEX (ts)` must
    // produce a statement equal to the inline form.
    let not_null_sql = r"
CREATE TABLE monitor (
  host_id INT,
  idc STRING,
  ts TIMESTAMP NOT NULL,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (host),
)
ENGINE=mito";
    let not_null_stmts = parse(not_null_sql).unwrap();
    assert_eq!(inline_stmts, not_null_stmts);

    // Case 3: the same table-level constraint without `NOT NULL` on the column
    // must NOT compare equal to the inline form.
    let plain_sql = r"
CREATE TABLE monitor (
  host_id INT,
  idc STRING,
  ts TIMESTAMP,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (host),
)
ENGINE=mito";
    let plain_stmts = parse(plain_sql).unwrap();
    assert_ne!(inline_stmts, plain_stmts);

    // Case 4: a `bigint` column marked as time index must be rejected.
    let bigint_sql = r"
CREATE TABLE monitor (
  host_id INT,
  idc STRING,
  b bigint TIME INDEX,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  PRIMARY KEY (host),
)
ENGINE=mito";
    let message = parse(bigint_sql).unwrap_err().to_string();
    assert!(message.contains("time index column data type should be timestamp"));
}
2369
/// Checks the `NOT NULL` option attached to a time-index column for several
/// spellings of the column definition, including two malformed ones.
#[test]
fn test_parse_create_table_with_timestamp_index_not_null() {
    let parse = |sql: &str| {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
    };

    // Baseline: no explicit `NOT NULL`; the parsed column still carries it.
    let baseline_sql = r"
CREATE TABLE monitor (
  host_id INT,
  idc STRING,
  ts TIMESTAMP TIME INDEX,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (host),
)
ENGINE=mito";
    let baseline = parse(baseline_sql).unwrap();

    assert_eq!(baseline.len(), 1);
    let Statement::CreateTable(create) = &baseline[0] else {
        panic!("should be create table statement");
    };
    let ts_column = &create.columns[2];
    assert_eq!(ts_column.name().to_string(), "ts");
    assert_eq!(ts_column.options()[0].option, NotNull);

    // `NOT NULL` written before `TIME INDEX` parses to the same statement.
    let not_null_first_sql = r"
CREATE TABLE monitor (
  host_id INT,
  idc STRING,
  ts TIMESTAMP NOT NULL TIME INDEX,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (host),
)
ENGINE=mito";
    let not_null_first = parse(not_null_first_sql).unwrap();
    assert_eq!(baseline, not_null_first);

    // `NOT NULL` written after `TIME INDEX` parses to the same statement too.
    let not_null_last_sql = r"
CREATE TABLE monitor (
  host_id INT,
  idc STRING,
  ts TIMESTAMP TIME INDEX NOT NULL,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (host),
)
ENGINE=mito";
    let not_null_last = parse(not_null_last_sql).unwrap();
    assert_eq!(baseline, not_null_last);

    // Reversed keywords (`NULL NOT`) must be rejected.
    let reversed_sql = r"
CREATE TABLE monitor (
  host_id INT,
  idc STRING,
  ts TIMESTAMP TIME INDEX NULL NOT,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (host),
)
ENGINE=mito";
    assert!(parse(reversed_sql).is_err());

    // `NOT NULL` followed by a stray `NULL` must be rejected.
    let conflicting_sql = r"
CREATE TABLE monitor (
  host_id INT,
  idc STRING,
  ts TIMESTAMP TIME INDEX NOT NULL NULL,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (host),
)
ENGINE=mito";
    assert!(parse(conflicting_sql).is_err());

    // With a default value, the expected option order is `DEFAULT` then `NOT NULL`.
    let default_sql = r"
CREATE TABLE monitor (
  host_id INT,
  idc STRING,
  ts TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (host),
)
ENGINE=mito";
    let with_default = parse(default_sql).unwrap();

    let Statement::CreateTable(create) = &with_default[0] else {
        unreachable!("should be create table statement");
    };
    match &create.constraints[0] {
        TableConstraint::TimeIndex { column } => assert_eq!(&column.value, "ts"),
        _ => panic!("should be time index constraint"),
    }
    let ts_column = &create.columns[2];
    assert_eq!(ts_column.name().to_string(), "ts");
    assert!(matches!(
        ts_column.options()[0].option,
        ColumnOption::Default(..)
    ));
    assert_eq!(ts_column.options()[1].option, NotNull);
}
2506
/// `PARTITION COLUMNS(..)` without the `ON` keyword must produce a parser error
/// that names the missing keyword.
#[test]
fn test_parse_partitions_with_error_syntax() {
    let missing_on_sql = r"
CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
PARTITION COLUMNS(c, a) (
  a < 10,
  a > 10 AND a < 20,
  a > 20 AND c < 100,
  a > 20 AND c >= 100
)
ENGINE=mito";
    let error = ParserContext::create_with_dialect(
        missing_on_sql,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap_err();
    let message = error.output_msg();
    assert!(message.contains("sql parser error: Expected: ON, found: COLUMNS"));
}
2527
/// A `PARTITION ON COLUMNS(..)` clause with an empty rule list must parse.
#[test]
fn test_parse_partitions_without_rule() {
    let empty_rules_sql = r"
CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
PARTITION ON COLUMNS(c, a) ()
ENGINE=mito";
    let outcome = ParserContext::create_with_dialect(
        empty_rules_sql,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );
    // Only success matters here; the parsed statements are not inspected.
    outcome.unwrap();
}
2537
/// A partition rule that uses a column missing from `PARTITION ON COLUMNS(..)`
/// must fail with the exact "not referenced" message.
#[test]
fn test_parse_partitions_unreferenced_column() {
    let unreferenced_sql = r"
CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
PARTITION ON COLUMNS(c, a) (
  b = 'foo'
)
ENGINE=mito";
    let error = ParserContext::create_with_dialect(
        unreferenced_sql,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap_err();
    assert_eq!(
        error.output_msg(),
        "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
    );
}
2553
/// A partition rule that is a bare identifier rather than a binary expression
/// must fail with a message that embeds the debug form of that expression.
#[test]
fn test_parse_partitions_not_binary_expr() {
    // The expected message pins the identifier at line 4, columns 5..6, so the
    // rule line below must keep exactly four leading spaces.
    let bare_ident_sql = r"
CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
PARTITION ON COLUMNS(c, a) (
    b
)
ENGINE=mito";
    let error = ParserContext::create_with_dialect(
        bare_ident_sql,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap_err();
    assert_eq!(
        error.output_msg(),
        r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
    );
}
2569
2570 fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2571 assert_eq!(column.name.to_string(), name);
2572 assert_eq!(column.data_type.to_string(), data_type);
2573 }
2574
/// Parses a complete `create table` statement and checks its name, engine,
/// columns, constraints and `with(..)` options.
#[test]
pub fn test_parse_create_table() {
    let sql = r"create table demo(
            host string,
            ts timestamp,
            cpu float32 default 0,
            memory float64,
            TIME INDEX (ts),
            PRIMARY KEY(ts, host),
            ) engine=mito
            with(ttl='10s');
            ";
    let stmts =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap();
    assert_eq!(1, stmts.len());

    let Statement::CreateTable(create) = &stmts[0] else {
        unreachable!()
    };
    assert!(!create.if_not_exists);
    assert_eq!("demo", create.name.to_string());
    assert_eq!("mito", create.engine);
    assert_eq!(4, create.columns.len());

    // Expected (name, rendered data type) per column, in declaration order.
    let expected_columns = [
        ("host", "STRING"),
        ("ts", "TIMESTAMP"),
        ("cpu", "FLOAT"),
        ("memory", "DOUBLE"),
    ];
    for (column, (name, data_type)) in create.columns.iter().zip(expected_columns) {
        assert_column_def(&column.column_def, name, data_type);
    }

    let expected_time_index = TableConstraint::TimeIndex {
        column: Ident::new("ts"),
    };
    assert_eq!(&create.constraints[0], &expected_time_index);

    let expected_primary_key = TableConstraint::PrimaryKey {
        columns: vec![Ident::new("ts"), Ident::new("host")],
    };
    assert_eq!(&create.constraints[1], &expected_primary_key);

    assert_eq!(1, create.options.len());
    assert_eq!(
        HashMap::from([("ttl", "10s")]),
        create.options.to_str_map()
    );
}
2626
/// A `TIME INDEX` constraint listing two columns must be reported as an
/// `InvalidTimeIndex` error.
#[test]
fn test_invalid_index_keys() {
    let two_column_time_index = r"create table demo(
            host string,
            ts int64,
            cpu float64 default 0,
            memory float64,
            TIME INDEX (ts, host),
            PRIMARY KEY(ts, host)) engine=mito;
            ";
    let outcome = ParserContext::create_with_dialect(
        two_column_time_index,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );
    assert!(outcome.is_err());
    assert_matches!(outcome, Err(crate::error::Error::InvalidTimeIndex { .. }));
}
2642
/// Statements that declare a time index on more than one column must be
/// reported as an `InvalidTimeIndex` error.
#[test]
fn test_duplicated_time_index() {
    let conflicting_statements = [
        // Two inline time indexes plus a two-column table-level constraint.
        r"create table demo(
            host string,
            ts timestamp time index,
            t timestamp time index,
            cpu float64 default 0,
            memory float64,
            TIME INDEX (ts, host),
            PRIMARY KEY(ts, host)) engine=mito;
            ",
        // Inline time index on `ts`, table-level time index on `t`.
        r"create table demo(
            host string,
            ts timestamp time index,
            cpu float64 default 0,
            t timestamp,
            memory float64,
            TIME INDEX (t),
            PRIMARY KEY(ts, host)) engine=mito;
            ",
    ];

    for sql in conflicting_statements {
        let outcome = ParserContext::create_with_dialect(
            sql,
            &GreptimeDbDialect {},
            ParseOptions::default(),
        );
        assert!(outcome.is_err());
        assert_matches!(outcome, Err(crate::error::Error::InvalidTimeIndex { .. }));
    }
}
2673
/// The keyword `user` is rejected as a bare column name but accepted when it
/// is double-quoted.
#[test]
fn test_invalid_column_name() {
    let parse = |sql: &str| {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
    };

    // Unquoted keyword: parsing fails and the message names the keyword.
    let unquoted = "create table foo(user string, i timestamp time index)";
    let message = parse(unquoted).unwrap_err().output_msg();
    assert!(message.contains("Cannot use keyword 'user' as column name"));

    // Quoted keyword: parsing succeeds.
    let quoted = r#"
            create table foo("user" string, i timestamp time index)
        "#;
    let _ = parse(quoted).unwrap();
}
2690
/// Regression test: a long floating-point `DEFAULT` literal must be rendered
/// back digit-for-digit when the column is displayed.
#[test]
fn test_incorrect_default_value_issue_3479() {
    let sql = r#"CREATE TABLE `ExcePTuRi`(
non TIMESTAMP(6) TIME INDEX,
`iUSTO` DOUBLE DEFAULT 0.047318541668048164
)"#;
    let stmts =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap();
    assert_eq!(1, stmts.len());

    let Statement::CreateTable(create) = &stmts[0] else {
        unreachable!()
    };
    let rendered_column = create.columns[1].to_string();
    assert_eq!(
        "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
        rendered_column
    );
}
2711
/// `CREATE VIEW` must round-trip through `to_string()` and expose the
/// `or_replace` / `if_not_exists` flags, with and without the optional clauses.
#[test]
fn test_parse_create_view() {
    let parse = |sql: &str| {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap()
    };

    // Plain form: both flags are off.
    let plain_sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
    let plain = parse(plain_sql);
    let Statement::CreateView(view) = &plain[0] else {
        unreachable!()
    };
    assert_eq!(view.to_string(), plain_sql);
    assert!(!view.or_replace);
    assert!(!view.if_not_exists);
    assert_eq!("test", view.name.to_string());

    // `OR REPLACE` together with `IF NOT EXISTS`: both flags are on.
    let full_sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
    let full = parse(full_sql);
    let Statement::CreateView(view) = &full[0] else {
        unreachable!()
    };
    assert_eq!(view.to_string(), full_sql);
    assert!(view.or_replace);
    assert!(view.if_not_exists);
    assert_eq!("test", view.name.to_string());
}
2744
/// A `CREATE VIEW` whose body is a `DELETE` is expected to parse into exactly
/// one statement.
#[test]
fn test_parse_create_view_invalid_query() {
    let delete_body_sql = "CREATE VIEW test AS DELETE from demo";
    let outcome = ParserContext::create_with_dialect(
        delete_body_sql,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );
    assert!(matches!(outcome, Ok(stmts) if stmts.len() == 1));
}
2752
/// Checks the `FULLTEXT INDEX` options recorded on a column, with and without
/// a `WITH (..)` clause and for several string-like column types.
#[test]
fn test_parse_create_table_fulltext_options() {
    let parse = |sql: &str| {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap()
    };

    // No `WITH (..)`: options are present but empty.
    let bare_sql = r"
CREATE TABLE log (
  ts TIMESTAMP TIME INDEX,
  msg TEXT FULLTEXT INDEX,
)";
    let bare = parse(bare_sql);
    let Statement::CreateTable(create) = &bare[0] else {
        panic!("should be create_table statement");
    };
    for col in create.columns.iter().filter(|col| col.name().value == "msg") {
        let options = col.extensions.fulltext_index_options.as_ref().unwrap();
        assert!(options.is_empty());
    }

    // Explicit options on a `STRING` column.
    let with_options_sql = r"
CREATE TABLE log (
  ts TIMESTAMP TIME INDEX,
  msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
)";
    let with_options = parse(with_options_sql);
    let Statement::CreateTable(create) = &with_options[0] else {
        panic!("should be create_table statement");
    };
    for col in create.columns.iter().filter(|col| col.name().value == "msg") {
        let options = col.extensions.fulltext_index_options.as_ref().unwrap();
        assert_eq!(options.len(), 2);
        assert_eq!(options.get("analyzer").unwrap(), "English");
        assert_eq!(options.get("case_sensitive").unwrap(), "false");
    }

    // Two indexed columns (`TINYTEXT` and `CHAR(20)`) with different options.
    let two_columns_sql = r"
CREATE TABLE log (
  ts TIMESTAMP TIME INDEX,
  msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
  msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
)";
    let two_columns = parse(two_columns_sql);
    let Statement::CreateTable(create) = &two_columns[0] else {
        panic!("should be create_table statement");
    };
    for col in &create.columns {
        match col.name().value.as_str() {
            "msg1" => {
                let options = col.extensions.fulltext_index_options.as_ref().unwrap();
                assert_eq!(options.len(), 2);
                assert_eq!(options.get("analyzer").unwrap(), "English");
                assert_eq!(options.get("case_sensitive").unwrap(), "false");
            }
            "msg2" => {
                let options = col.extensions.fulltext_index_options.as_ref().unwrap();
                assert_eq!(options.len(), 2);
                assert_eq!(options.get("analyzer").unwrap(), "Chinese");
                assert_eq!(options.get("case_sensitive").unwrap(), "true");
            }
            // Other columns (e.g. `ts`) are not inspected.
            _ => {}
        }
    }
}
2839
/// `FULLTEXT INDEX` on an `INT` column must be rejected with a message about
/// string-only support.
#[test]
fn test_parse_create_table_fulltext_options_invalid_type() {
    let int_column_sql = r"
CREATE TABLE log (
  ts TIMESTAMP TIME INDEX,
  msg INT FULLTEXT INDEX,
)";
    let outcome = ParserContext::create_with_dialect(
        int_column_sql,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("FULLTEXT index only supports string type"));
}
2857
/// Declaring `FULLTEXT INDEX` twice on one column must be rejected as a
/// duplicated option.
#[test]
fn test_parse_create_table_fulltext_options_duplicate() {
    let duplicated_sql = r"
CREATE TABLE log (
  ts TIMESTAMP TIME INDEX,
  msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
)";
    let outcome = ParserContext::create_with_dialect(
        duplicated_sql,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("duplicated FULLTEXT INDEX option"));
}
2875
/// An unknown key inside `FULLTEXT INDEX WITH (..)` must be rejected as an
/// invalid option.
#[test]
fn test_parse_create_table_fulltext_options_invalid_option() {
    let unknown_key_sql = r"
CREATE TABLE log (
  ts TIMESTAMP TIME INDEX,
  msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
)";
    let outcome = ParserContext::create_with_dialect(
        unknown_key_sql,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("invalid FULLTEXT INDEX option"));
}
2893
/// Checks the `SKIPPING INDEX` options recorded on a column, with and without
/// a `WITH (..)` clause.
#[test]
fn test_parse_create_table_skip_options() {
    let parse = |sql: &str| {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap()
    };

    // Explicit options: the recorded option map must be non-empty.
    let with_options_sql = r"
CREATE TABLE log (
  ts TIMESTAMP TIME INDEX,
  msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
)";
    let with_options = parse(with_options_sql);
    let Statement::CreateTable(create) = &with_options[0] else {
        panic!("should be create_table statement");
    };
    for col in create.columns.iter().filter(|col| col.name().value == "msg") {
        let options = col.extensions.skipping_index_options.as_ref().unwrap();
        assert!(!options.is_empty());
    }

    // No `WITH (..)`: options are present but empty.
    let bare_sql = r"
        CREATE TABLE log (
            ts TIMESTAMP TIME INDEX,
            msg INT SKIPPING INDEX,
        )";
    let bare = parse(bare_sql);
    let Statement::CreateTable(create) = &bare[0] else {
        panic!("should be create_table statement");
    };
    for col in create.columns.iter().filter(|col| col.name().value == "msg") {
        let options = col.extensions.skipping_index_options.as_ref().unwrap();
        assert!(options.is_empty());
    }
}
2946
/// Covers `CREATE VIEW` with an explicit column list: accepted forms must
/// render back as expected, malformed lists must fail to parse.
#[test]
fn test_parse_create_view_with_columns() {
    let parse = |sql: &str| {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
    };

    // (input, expected rendering). An empty column list is dropped on display;
    // non-empty lists round-trip unchanged.
    let accepted = [
        (
            "CREATE VIEW test () AS SELECT * FROM NUMBERS",
            "CREATE VIEW test AS SELECT * FROM NUMBERS",
        ),
        (
            "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS",
            "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS",
        ),
        (
            "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS",
            "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS",
        ),
    ];
    for (sql, rendered) in accepted {
        let stmts = parse(sql).unwrap();

        match &stmts[0] {
            Statement::CreateView(view) => {
                assert_eq!(view.to_string(), rendered);
                assert!(!view.or_replace);
                assert!(!view.if_not_exists);
                assert_eq!("test", view.name.to_string());
            }
            _ => unreachable!(),
        }
        // The enclosing statement renders the same text as the view itself.
        assert_eq!(rendered, stmts[0].to_string());
    }

    // Unbalanced parentheses, a trailing comma, a numeric column name and a
    // keyword column name are all expected to fail.
    let rejected = [
        "CREATE VIEW test (n1 AS select * from demo",
        "CREATE VIEW test (n1, AS select * from demo",
        "CREATE VIEW test n1,n2) AS select * from demo",
        "CREATE VIEW test (1) AS select * from demo",
        "CREATE VIEW test (n1, select) AS select * from demo",
    ];
    for sql in rejected {
        let outcome = parse(sql);
        assert!(outcome.is_err());
    }
}
3027
/// For a `VECTOR(128)` column type and an empty token stream, the parsed
/// extensions must carry the dimension `128` in the vector options.
#[test]
fn test_parse_column_extensions_vector() {
    let dialect = GenericDialect {};
    // No tokens are needed: only the column's data type is under test.
    let tokens = Tokenizer::new(&dialect, "").tokenize().unwrap();
    let mut parser = Parser::new(&dialect).with_tokens(tokens);

    let column_name = Ident::new("vec_col");
    let vector_type =
        DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
    let mut extensions = ColumnExtensions::default();

    let outcome = ParserContext::parse_column_extensions(
        &mut parser,
        &column_name,
        &vector_type,
        &mut extensions,
    );
    assert!(outcome.is_ok());
    assert!(extensions.vector_options.is_some());

    let vector_options = extensions.vector_options.unwrap();
    assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some("128"));
}
3048
/// A `VECTOR` column type without a dimension argument must be rejected.
#[test]
fn test_parse_column_extensions_vector_invalid() {
    let dialect = GenericDialect {};
    // No tokens are needed: only the column's data type is under test.
    let tokens = Tokenizer::new(&dialect, "").tokenize().unwrap();
    let mut parser = Parser::new(&dialect).with_tokens(tokens);

    let column_name = Ident::new("vec_col");
    let dimensionless_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
    let mut extensions = ColumnExtensions::default();

    let outcome = ParserContext::parse_column_extensions(
        &mut parser,
        &column_name,
        &dimensionless_type,
        &mut extensions,
    );
    assert!(outcome.is_err());
}
3065
/// Drives `ParserContext::parse_column_extensions` directly with index
/// clauses (`SKIPPING`, `FULLTEXT`, `INVERTED`) and checks which extension
/// options end up populated or which error is reported.
#[test]
fn test_parse_column_extensions_indices() {
    // Tokenizes `sql` with the generic dialect and parses the column
    // extensions for a column called `column` of type `data_type`.
    let parse_extensions = |sql: &str,
                            column: &str,
                            data_type: &DataType,
                            extensions: &mut ColumnExtensions| {
        let dialect = GenericDialect {};
        let tokens = Tokenizer::new(&dialect, sql).tokenize().unwrap();
        let mut parser = Parser::new(&dialect).with_tokens(tokens);
        ParserContext::parse_column_extensions(
            &mut parser,
            &Ident::new(column),
            data_type,
            extensions,
        )
    };

    // Skipping index without options.
    let mut extensions = ColumnExtensions::default();
    let outcome = parse_extensions(
        "SKIPPING INDEX",
        "col",
        &DataType::String(None),
        &mut extensions,
    );
    assert!(outcome.is_ok());
    assert!(extensions.skipping_index_options.is_some());

    // Fulltext index with both options on a string column.
    let mut extensions = ColumnExtensions::default();
    let outcome = parse_extensions(
        "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')",
        "text_col",
        &DataType::String(None),
        &mut extensions,
    );
    assert!(outcome.unwrap());
    assert!(extensions.fulltext_index_options.is_some());
    let fulltext_options = extensions.fulltext_index_options.unwrap();
    assert_eq!(fulltext_options.get("analyzer"), Some("English"));
    assert_eq!(fulltext_options.get("case_sensitive"), Some("true"));

    // Fulltext index on a non-string column is an error.
    let mut extensions = ColumnExtensions::default();
    let outcome = parse_extensions(
        "FULLTEXT INDEX WITH (analyzer = 'English')",
        "num_col",
        &DataType::Int(None),
        &mut extensions,
    );
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("FULLTEXT index only supports string type"));

    // An unrecognised analyzer value is still expected to parse successfully here.
    let mut extensions = ColumnExtensions::default();
    let outcome = parse_extensions(
        "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')",
        "text_col",
        &DataType::String(None),
        &mut extensions,
    );
    assert!(outcome.unwrap());

    // Inverted index without options.
    let mut extensions = ColumnExtensions::default();
    let outcome = parse_extensions(
        "INVERTED INDEX",
        "col",
        &DataType::String(None),
        &mut extensions,
    );
    assert!(outcome.is_ok());
    assert!(extensions.inverted_index_options.is_some());

    // Inverted index with options is an error.
    let mut extensions = ColumnExtensions::default();
    let outcome = parse_extensions(
        "INVERTED INDEX WITH (analyzer = 'English')",
        "col",
        &DataType::String(None),
        &mut extensions,
    );
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("INVERTED index doesn't support options"));

    // Two different index kinds declared on the same column.
    let mut extensions = ColumnExtensions::default();
    let outcome = parse_extensions(
        "SKIPPING INDEX FULLTEXT INDEX",
        "col",
        &DataType::String(None),
        &mut extensions,
    );
    assert!(outcome.unwrap());
    assert!(extensions.skipping_index_options.is_some());
    assert!(extensions.fulltext_index_options.is_some());
}
3221
/// A `'10s'::INTERVAL` cast must be rendered back with the interval literal
/// expanded to `'10 seconds'`.
#[test]
fn test_parse_interval_cast() {
    let query = "select '10s'::INTERVAL";
    let parsed =
        ParserContext::create_with_dialect(query, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap();
    let rendered = parsed[0].to_string();
    assert_eq!("SELECT '10 seconds'::INTERVAL", &rendered);
}
3230
/// Checks the `VECTOR INDEX` options recorded on a vector column, with and
/// without a `WITH (..)` clause.
#[test]
fn test_parse_create_table_vector_index_options() {
    let parse = |sql: &str| {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap()
    };

    // No `WITH (..)`: options are present but empty.
    let bare_sql = r"
CREATE TABLE vectors (
  ts TIMESTAMP TIME INDEX,
  vec VECTOR(128) VECTOR INDEX,
)";
    let bare = parse(bare_sql);
    let Statement::CreateTable(create) = &bare[0] else {
        panic!("should be create_table statement");
    };
    for col in create.columns.iter().filter(|col| col.name().value == "vec") {
        let options = col.extensions.vector_index_options.as_ref().unwrap();
        assert!(options.is_empty());
    }

    // All four options given explicitly.
    let with_options_sql = r"
CREATE TABLE vectors (
  ts TIMESTAMP TIME INDEX,
  vec VECTOR(128) VECTOR INDEX WITH (metric='cosine', connectivity='32', expansion_add='256', expansion_search='128')
)";
    let with_options = parse(with_options_sql);
    let Statement::CreateTable(create) = &with_options[0] else {
        panic!("should be create_table statement");
    };
    for col in create.columns.iter().filter(|col| col.name().value == "vec") {
        let options = col.extensions.vector_index_options.as_ref().unwrap();
        assert_eq!(options.len(), 4);
        assert_eq!(options.get("metric").unwrap(), "cosine");
        assert_eq!(options.get("connectivity").unwrap(), "32");
        assert_eq!(options.get("expansion_add").unwrap(), "256");
        assert_eq!(options.get("expansion_search").unwrap(), "128");
    }
}
3284
/// `VECTOR INDEX` on an `INT` column must be rejected with a message about
/// vector-only support.
#[test]
fn test_parse_create_table_vector_index_invalid_type() {
    let int_column_sql = r"
CREATE TABLE vectors (
  ts TIMESTAMP TIME INDEX,
  col INT VECTOR INDEX,
)";
    let outcome = ParserContext::create_with_dialect(
        int_column_sql,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("VECTOR INDEX only supports Vector type columns"));
}
3303
/// Declaring `VECTOR INDEX` twice on one column must be rejected as a
/// duplicated option.
#[test]
fn test_parse_create_table_vector_index_duplicate() {
    let duplicated_sql = r"
CREATE TABLE vectors (
  ts TIMESTAMP TIME INDEX,
  vec VECTOR(128) VECTOR INDEX VECTOR INDEX,
)";
    let outcome = ParserContext::create_with_dialect(
        duplicated_sql,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("duplicated VECTOR INDEX option"));
}
3322
/// An unknown key inside `VECTOR INDEX WITH (..)` must be rejected as an
/// invalid option.
#[test]
fn test_parse_create_table_vector_index_invalid_option() {
    let unknown_key_sql = r"
CREATE TABLE vectors (
  ts TIMESTAMP TIME INDEX,
  vec VECTOR(128) VECTOR INDEX WITH (metric='l2sq', invalid_option='foo')
)";
    let outcome = ParserContext::create_with_dialect(
        unknown_key_sql,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("invalid VECTOR INDEX option"));
}
3341
/// Drives `ParserContext::parse_column_extensions` directly with
/// `VECTOR INDEX` clauses, once on a vector column and once on an `INT` column.
#[test]
fn test_parse_column_extensions_vector_index() {
    // Tokenizes `sql` with the generic dialect and parses the column
    // extensions for a column called `column` of type `data_type`.
    let parse_extensions = |sql: &str,
                            column: &str,
                            data_type: &DataType,
                            extensions: &mut ColumnExtensions| {
        let dialect = GenericDialect {};
        let tokens = Tokenizer::new(&dialect, sql).tokenize().unwrap();
        let mut parser = Parser::new(&dialect).with_tokens(tokens);
        ParserContext::parse_column_extensions(
            &mut parser,
            &Ident::new(column),
            data_type,
            extensions,
        )
    };

    // Vector column: the extensions start with the dimension already recorded,
    // and the `metric` option must be captured afterwards.
    let vector_type =
        DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
    let preset_dimension = OptionMap::from([(VECTOR_OPT_DIM.to_string(), "128".to_string())]);
    let mut extensions = ColumnExtensions {
        vector_options: Some(preset_dimension),
        ..Default::default()
    };
    let outcome = parse_extensions(
        "VECTOR INDEX WITH (metric = 'l2sq')",
        "vec_col",
        &vector_type,
        &mut extensions,
    );
    assert!(outcome.is_ok());
    assert!(extensions.vector_index_options.is_some());
    let vi_options = extensions.vector_index_options.unwrap();
    assert_eq!(vi_options.get("metric"), Some("l2sq"));

    // Non-vector column: the clause must be rejected.
    let mut extensions = ColumnExtensions::default();
    let outcome = parse_extensions(
        "VECTOR INDEX",
        "num_col",
        &DataType::Int(None),
        &mut extensions,
    );
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("VECTOR INDEX only supports Vector type columns"));
}
3400}