1mod json;
16#[cfg(feature = "enterprise")]
17pub mod trigger;
18
19use std::collections::HashMap;
20
21use arrow_buffer::IntervalMonthDayNano;
22use common_catalog::consts::default_engine;
23use datafusion_common::ScalarValue;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
25use datatypes::data_type::ConcreteDataType;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28use sqlparser::ast::{
29 ColumnOption, ColumnOptionDef, DataType, Expr, KeyOrIndexDisplay, NullsDistinctOption,
30 PrimaryKeyConstraint, UniqueConstraint,
31};
32use sqlparser::dialect::keywords::Keyword;
33use sqlparser::keywords::ALL_KEYWORDS;
34use sqlparser::parser::IsOptional::Mandatory;
35use sqlparser::parser::{Parser, ParserError};
36use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
37use table::requests::validate_database_option;
38
39use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
40use crate::error::{
41 self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidFlowQuerySnafu,
42 InvalidIntervalSnafu, InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
43 SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
44};
45use crate::parser::{FLOW, ParserContext};
46use crate::parsers::tql_parser;
47use crate::parsers::utils::{
48 self, parse_with_options, validate_column_fulltext_create_option,
49 validate_column_skipping_index_create_option, validate_column_vector_index_create_option,
50};
51use crate::statements::create::{
52 Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
53 CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
54};
55use crate::statements::statement::Statement;
56use crate::statements::transform::type_alias::get_data_type_by_alias_name;
57use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
58use crate::util::{OptionValue, location_to_index, parse_option_string};
59
/// Word that introduces the table engine clause (`ENGINE = <name>`), matched by text.
pub const ENGINE: &str = "ENGINE";
/// The `MAXVALUE` word.
/// NOTE(review): not referenced in this part of the file; presumably used for partition bounds — confirm.
pub const MAXVALUE: &str = "MAXVALUE";
/// Word that must follow the flow name in `CREATE FLOW` (`SINK TO <table>`).
pub const SINK: &str = "SINK";
/// First word of the optional `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const EXPIRE: &str = "EXPIRE";
/// Second word of the optional `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const AFTER: &str = "AFTER";
/// Word that introduces the `INVERTED INDEX` column extension.
pub const INVERTED: &str = "INVERTED";
/// Word that introduces the `SKIPPING INDEX` column extension.
pub const SKIPPING: &str = "SKIPPING";
/// Word that introduces the `VECTOR INDEX` column extension.
pub const VECTOR: &str = "VECTOR";

/// Textual form of an interval expression, as produced by `to_string()` on the parsed expression.
pub type RawIntervalExpr = String;
70
71fn flow_option_map(options: HashMap<String, OptionValue>) -> OptionMap {
75 let mut flow_options = OptionMap::default();
76 for (key, value) in options {
77 flow_options.insert_options(&key, value);
78 }
79 flow_options
80}
81
82impl<'a> ParserContext<'a> {
84 pub(crate) fn parse_create(&mut self) -> Result<Statement> {
85 match self.parser.peek_token().token {
86 Token::Word(w) => match w.keyword {
87 Keyword::TABLE => self.parse_create_table(),
88
89 Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
90
91 Keyword::EXTERNAL => self.parse_create_external_table(),
92
93 Keyword::OR => {
94 let _ = self.parser.next_token();
95 self.parser
96 .expect_keyword(Keyword::REPLACE)
97 .context(SyntaxSnafu)?;
98 match self.parser.next_token().token {
99 Token::Word(w) => match w.keyword {
100 Keyword::VIEW => self.parse_create_view(true),
101 Keyword::NoKeyword => {
102 let uppercase = w.value.to_uppercase();
103 match uppercase.as_str() {
104 FLOW => self.parse_create_flow(true),
105 _ => self.unsupported(w.to_string()),
106 }
107 }
108 _ => self.unsupported(w.to_string()),
109 },
110 _ => self.unsupported(w.to_string()),
111 }
112 }
113
114 Keyword::VIEW => {
115 let _ = self.parser.next_token();
116 self.parse_create_view(false)
117 }
118
119 #[cfg(feature = "enterprise")]
120 Keyword::TRIGGER => {
121 let _ = self.parser.next_token();
122 self.parse_create_trigger()
123 }
124
125 Keyword::NoKeyword => {
126 let _ = self.parser.next_token();
127 let uppercase = w.value.to_uppercase();
128 match uppercase.as_str() {
129 FLOW => self.parse_create_flow(false),
130 _ => self.unsupported(w.to_string()),
131 }
132 }
133 _ => self.unsupported(w.to_string()),
134 },
135 unexpected => self.unsupported(unexpected.to_string()),
136 }
137 }
138
139 fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
141 let if_not_exists = self.parse_if_not_exist()?;
142 let view_name = self.intern_parse_table_name()?;
143
144 let columns = self.parse_view_columns()?;
145
146 self.parser
147 .expect_keyword(Keyword::AS)
148 .context(SyntaxSnafu)?;
149
150 let query = self.parse_query()?;
151
152 Ok(Statement::CreateView(CreateView {
153 name: view_name,
154 columns,
155 or_replace,
156 query: Box::new(query),
157 if_not_exists,
158 }))
159 }
160
161 fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
162 let mut columns = vec![];
163 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
164 return Ok(columns);
165 }
166
167 loop {
168 let name = self.parse_column_name().context(SyntaxSnafu)?;
169
170 columns.push(name);
171
172 let comma = self.parser.consume_token(&Token::Comma);
173 if self.parser.consume_token(&Token::RParen) {
174 break;
176 } else if !comma {
177 return self.expected("',' or ')' after column name", self.parser.peek_token());
178 }
179 }
180
181 Ok(columns)
182 }
183
184 fn parse_create_external_table(&mut self) -> Result<Statement> {
185 let _ = self.parser.next_token();
186 self.parser
187 .expect_keyword(Keyword::TABLE)
188 .context(SyntaxSnafu)?;
189 let if_not_exists = self.parse_if_not_exist()?;
190 let table_name = self.intern_parse_table_name()?;
191 let (columns, constraints) = self.parse_columns()?;
192 if !columns.is_empty() {
193 validate_time_index(&columns, &constraints)?;
194 }
195
196 let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
197 let options = self.parse_create_table_options()?;
198 Ok(Statement::CreateExternalTable(CreateExternalTable {
199 name: table_name,
200 columns,
201 constraints,
202 options,
203 if_not_exists,
204 engine,
205 }))
206 }
207
208 fn parse_create_database(&mut self) -> Result<Statement> {
209 let _ = self.parser.next_token();
210 let if_not_exists = self.parse_if_not_exist()?;
211 let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
212 expected: "a database name",
213 actual: self.peek_token_as_string(),
214 })?;
215 let database_name = Self::canonicalize_object_name(database_name)?;
216
217 let options = self
218 .parser
219 .parse_options(Keyword::WITH)
220 .context(SyntaxSnafu)?
221 .into_iter()
222 .map(parse_option_string)
223 .collect::<Result<HashMap<String, OptionValue>>>()?;
224
225 for key in options.keys() {
226 ensure!(
227 validate_database_option(key),
228 InvalidDatabaseOptionSnafu { key: key.clone() }
229 );
230 }
231 if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string())
232 && append_mode == "true"
233 && options.contains_key("merge_mode")
234 {
235 return InvalidDatabaseOptionSnafu {
236 key: "merge_mode".to_string(),
237 }
238 .fail();
239 }
240
241 Ok(Statement::CreateDatabase(CreateDatabase {
242 name: database_name,
243 if_not_exists,
244 options: OptionMap::new(options),
245 }))
246 }
247
248 fn parse_create_table(&mut self) -> Result<Statement> {
249 let _ = self.parser.next_token();
250
251 let if_not_exists = self.parse_if_not_exist()?;
252
253 let table_name = self.intern_parse_table_name()?;
254
255 if self.parser.parse_keyword(Keyword::LIKE) {
256 let source_name = self.intern_parse_table_name()?;
257
258 return Ok(Statement::CreateTableLike(CreateTableLike {
259 table_name,
260 source_name,
261 }));
262 }
263
264 let (columns, constraints) = self.parse_columns()?;
265 validate_time_index(&columns, &constraints)?;
266
267 let partitions = self.parse_partitions()?;
268 if let Some(partitions) = &partitions {
269 validate_partitions(&columns, partitions)?;
270 }
271
272 let engine = self.parse_table_engine(default_engine())?;
273 let options = self.parse_create_table_options()?;
274 let create_table = CreateTable {
275 if_not_exists,
276 name: table_name,
277 columns,
278 engine,
279 constraints,
280 options,
281 table_id: 0, partitions,
283 };
284
285 Ok(Statement::CreateTable(create_table))
286 }
287
288 fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
290 let if_not_exists = self.parse_if_not_exist()?;
291
292 let flow_name = self.intern_parse_table_name()?;
293
294 if let Token::Word(word) = self.parser.peek_token().token
296 && word.value.eq_ignore_ascii_case(SINK)
297 {
298 self.parser.next_token();
299 } else {
300 Err(ParserError::ParserError(
301 "Expect `SINK` keyword".to_string(),
302 ))
303 .context(SyntaxSnafu)?
304 }
305 self.parser
306 .expect_keyword(Keyword::TO)
307 .context(SyntaxSnafu)?;
308
309 let output_table_name = self.intern_parse_table_name()?;
310
311 let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
312 && w1.value.eq_ignore_ascii_case(EXPIRE)
313 {
314 self.parser.next_token();
315 if let Token::Word(w2) = &self.parser.peek_token().token
316 && w2.value.eq_ignore_ascii_case(AFTER)
317 {
318 self.parser.next_token();
319 Some(self.parse_interval_no_month("EXPIRE AFTER")?)
320 } else {
321 None
322 }
323 } else {
324 None
325 };
326
327 let eval_interval = if self
328 .parser
329 .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
330 {
331 Some(self.parse_interval_no_month("EVAL INTERVAL")?)
332 } else {
333 None
334 };
335
336 let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
337 match self.parser.next_token() {
338 TokenWithSpan {
339 token: Token::SingleQuotedString(value, ..),
340 ..
341 } => Some(value),
342 unexpected => {
343 return self
344 .parser
345 .expected("string", unexpected)
346 .context(SyntaxSnafu);
347 }
348 }
349 } else {
350 None
351 };
352
353 let flow_options = self
354 .parser
355 .parse_options(Keyword::WITH)
356 .context(SyntaxSnafu)?
357 .into_iter()
358 .map(parse_option_string)
359 .collect::<Result<HashMap<String, OptionValue>>>()?;
360
361 self.parser
362 .expect_keyword(Keyword::AS)
363 .context(SyntaxSnafu)?;
364
365 let query = Box::new(self.parse_flow_sql_or_tql(true)?);
366
367 Ok(Statement::CreateFlow(CreateFlow {
368 flow_name,
369 sink_table_name: output_table_name,
370 or_replace,
371 if_not_exists,
372 expire_after,
373 eval_interval,
374 comment,
375 flow_options: flow_option_map(flow_options),
376 query,
377 }))
378 }
379
    /// Parses the query of a flow, which is either a SQL `SELECT`, a `WITH` query or a TQL
    /// statement, and pairs the parsed statement with its raw text from the input SQL.
    ///
    /// `require_now_expr` is forwarded unchanged to the `WITH` and TQL parsers.
    ///
    /// # Errors
    /// Returns an "unsupported" error when the next token starts none of the accepted forms,
    /// an invalid-flow-query error when a `WITH` query is not a plain query or is rejected by
    /// `utils::is_simple_tql_cte_query`, and otherwise propagates sub-parser errors.
    fn parse_flow_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
        // Remember where the query starts so that its raw text can be sliced out afterwards.
        let start_loc = self.parser.peek_token().span.start;
        let start_index = location_to_index(self.sql, &start_loc);

        // Must be captured before parsing, because parsing consumes the `WITH` token.
        let starts_with_with = matches!(
            self.parser.peek_token().token,
            Token::Word(w) if w.keyword == Keyword::WITH
        );

        let query = match self.parser.peek_token().token {
            Token::Word(w) => match w.keyword {
                Keyword::SELECT => self.parse_query(),
                Keyword::WITH => self.parse_with_tql_with_now(require_now_expr),
                // `TQL` is not a sqlparser keyword; only the unquoted word is accepted.
                Keyword::NoKeyword
                    if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
                {
                    self.parse_tql(require_now_expr)
                }

                _ => self.unsupported(self.peek_token_as_string()),
            },
            _ => self.unsupported(self.peek_token_as_string()),
        }?;

        if starts_with_with {
            let Statement::Query(query) = &query else {
                return InvalidFlowQuerySnafu {
                    reason: "Expect a query after WITH".to_string(),
                }
                .fail();
            };

            if !utils::is_simple_tql_cte_query(query) {
                return InvalidFlowQuerySnafu {
                    reason: "WITH is only supported for the simplest TQL CTE in CREATE FLOW"
                        .to_string(),
                }
                .fail();
            }
        }

        // The token following the parsed query marks where the raw text ends.
        let end_token = self.parser.peek_token();

        let raw_query = if end_token == Token::EOF {
            &self.sql[start_index..]
        } else {
            let end_loc = end_token.span.end;
            let end_index = location_to_index(self.sql, &end_loc);
            // Clamp to the input length so the slice can never run past the end.
            &self.sql[start_index..end_index.min(self.sql.len())]
        };
        // Drop any trailing statement terminators from the raw text.
        let raw_query = raw_query.trim_end_matches(";");

        let query = SqlOrTql::try_from_statement(query, raw_query)?;
        Ok(query)
    }
436
437 fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
439 let interval = self.parse_interval_month_day_nano()?.0;
440 if interval.months != 0 {
441 return InvalidIntervalSnafu {
442 reason: format!("Interval with months is not allowed in {context}"),
443 }
444 .fail();
445 }
446 Ok(
447 interval.nanoseconds / 1_000_000_000
448 + interval.days as i64 * 60 * 60 * 24
449 + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, )
453 }
454
455 fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
457 let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
458 let raw_interval_expr = interval_expr.to_string();
459 let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
460 .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
461 .ok()
462 .with_context(|| InvalidIntervalSnafu {
463 reason: format!("cannot cast {} to interval type", interval_expr),
464 })?;
465 if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
466 Ok((interval, raw_interval_expr))
467 } else {
468 unreachable!()
469 }
470 }
471
472 fn parse_if_not_exist(&mut self) -> Result<bool> {
473 match self.parser.peek_token().token {
474 Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
475 _ => {}
476 }
477
478 if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
479 return self
480 .parser
481 .expect_keyword(Keyword::EXISTS)
482 .map(|_| true)
483 .context(UnexpectedSnafu {
484 expected: "EXISTS",
485 actual: self.peek_token_as_string(),
486 });
487 }
488
489 if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
490 return UnsupportedSnafu { keyword: "EXISTS" }.fail();
491 }
492
493 Ok(false)
494 }
495
    /// Parses the table options by delegating to `parse_with_options` on the underlying parser.
    fn parse_create_table_options(&mut self) -> Result<OptionMap> {
        parse_with_options(&mut self.parser)
    }
499
500 fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
502 if !self.parser.parse_keyword(Keyword::PARTITION) {
503 return Ok(None);
504 }
505
506 self.parse_partition_on_columns().map(Some)
507 }
508
509 pub(crate) fn parse_partition_on_columns(&mut self) -> Result<Partitions> {
511 self.parser
512 .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
513 .context(error::UnexpectedSnafu {
514 expected: "ON, COLUMNS",
515 actual: self.peek_token_as_string(),
516 })?;
517
518 let raw_column_list = self
519 .parser
520 .parse_parenthesized_column_list(Mandatory, false)
521 .context(error::SyntaxSnafu)?;
522 let column_list = raw_column_list
523 .into_iter()
524 .map(Self::canonicalize_identifier)
525 .collect();
526
527 let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
528
529 Ok(Partitions { column_list, exprs })
530 }
531
    /// Parses a single partition rule as a general expression, mapping parser failures to a
    /// syntax error.
    fn parse_partition_entry(&mut self) -> Result<Expr> {
        self.parser.parse_expr().context(error::SyntaxSnafu)
    }
535
536 fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
538 where
539 F: FnMut(&mut ParserContext<'a>) -> Result<T>,
540 {
541 self.parser
542 .expect_token(&Token::LParen)
543 .context(error::UnexpectedSnafu {
544 expected: "(",
545 actual: self.peek_token_as_string(),
546 })?;
547
548 let mut values = vec![];
549 while self.parser.peek_token() != Token::RParen {
550 values.push(f(self)?);
551 if !self.parser.consume_token(&Token::Comma) {
552 break;
553 }
554 }
555
556 self.parser
557 .expect_token(&Token::RParen)
558 .context(error::UnexpectedSnafu {
559 expected: ")",
560 actual: self.peek_token_as_string(),
561 })?;
562
563 Ok(values)
564 }
565
566 fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
568 let mut columns = vec![];
569 let mut constraints = vec![];
570 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
571 return Ok((columns, constraints));
572 }
573
574 loop {
575 if let Some(constraint) = self.parse_optional_table_constraint()? {
576 constraints.push(constraint);
577 } else if let Token::Word(_) = self.parser.peek_token().token {
578 self.parse_column(&mut columns, &mut constraints)?;
579 } else {
580 return self.expected(
581 "column name or constraint definition",
582 self.parser.peek_token(),
583 );
584 }
585 let comma = self.parser.consume_token(&Token::Comma);
586 if self.parser.consume_token(&Token::RParen) {
587 break;
589 } else if !comma {
590 return self.expected(
591 "',' or ')' after column definition",
592 self.parser.peek_token(),
593 );
594 }
595 }
596
597 Ok((columns, constraints))
598 }
599
600 fn parse_column(
601 &mut self,
602 columns: &mut Vec<Column>,
603 constraints: &mut Vec<TableConstraint>,
604 ) -> Result<()> {
605 let mut column = self.parse_column_def()?;
606
607 let mut time_index_opt_idx = None;
608 for (index, opt) in column.options().iter().enumerate() {
609 if let ColumnOption::DialectSpecific(tokens) = &opt.option
610 && matches!(
611 &tokens[..],
612 [
613 Token::Word(Word {
614 keyword: Keyword::TIME,
615 ..
616 }),
617 Token::Word(Word {
618 keyword: Keyword::INDEX,
619 ..
620 })
621 ]
622 )
623 {
624 ensure!(
625 time_index_opt_idx.is_none(),
626 InvalidColumnOptionSnafu {
627 name: column.name().to_string(),
628 msg: "duplicated time index",
629 }
630 );
631 time_index_opt_idx = Some(index);
632
633 let constraint = TableConstraint::TimeIndex {
634 column: Ident::new(column.name().value.clone()),
635 };
636 constraints.push(constraint);
637 }
638 }
639
640 if let Some(index) = time_index_opt_idx {
641 ensure!(
642 !column.options().contains(&ColumnOptionDef {
643 option: ColumnOption::Null,
644 name: None,
645 }),
646 InvalidColumnOptionSnafu {
647 name: column.name().to_string(),
648 msg: "time index column can't be null",
649 }
650 );
651
652 let data_type = get_unalias_type(column.data_type());
654 ensure!(
655 matches!(data_type, DataType::Timestamp(_, _)),
656 InvalidColumnOptionSnafu {
657 name: column.name().to_string(),
658 msg: "time index column data type should be timestamp",
659 }
660 );
661
662 let not_null_opt = ColumnOptionDef {
663 option: ColumnOption::NotNull,
664 name: None,
665 };
666
667 if !column.options().contains(¬_null_opt) {
668 column.mut_options().push(not_null_opt);
669 }
670
671 let _ = column.mut_options().remove(index);
672 }
673
674 columns.push(column);
675
676 Ok(())
677 }
678
679 fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
681 let name = self.parser.parse_identifier()?;
682 if name.quote_style.is_none() &&
683 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
685 {
686 return Err(ParserError::ParserError(format!(
687 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
688 &name.value
689 )));
690 }
691
692 Ok(name)
693 }
694
695 pub fn parse_column_def(&mut self) -> Result<Column> {
696 let name = self.parse_column_name().context(SyntaxSnafu)?;
697 let parser = &mut self.parser;
698
699 ensure!(
700 !(name.quote_style.is_none() &&
701 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
703 InvalidSqlSnafu {
704 msg: format!(
705 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
706 &name.value
707 ),
708 }
709 );
710
711 let mut extensions = ColumnExtensions::default();
712
713 let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
714 if matches!(data_type, DataType::JSON) {
717 extensions.json_datatype_options = json::parse_json_datatype_options(parser)?;
718 }
719
720 let mut options = vec![];
721 loop {
722 if parser.parse_keyword(Keyword::CONSTRAINT) {
723 let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
724 if let Some(option) = Self::parse_optional_column_option(parser)? {
725 options.push(ColumnOptionDef { name, option });
726 } else {
727 return parser
728 .expected(
729 "constraint details after CONSTRAINT <name>",
730 parser.peek_token(),
731 )
732 .context(SyntaxSnafu);
733 }
734 } else if let Some(option) = Self::parse_optional_column_option(parser)? {
735 options.push(ColumnOptionDef { name: None, option });
736 } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
737 break;
738 };
739 }
740
741 Ok(Column {
742 column_def: ColumnDef {
743 name: Self::canonicalize_identifier(name),
744 data_type,
745 options,
746 },
747 extensions,
748 })
749 }
750
751 fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
752 if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
753 Ok(Some(ColumnOption::CharacterSet(
754 parser.parse_object_name(false).context(SyntaxSnafu)?,
755 )))
756 } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
757 Ok(Some(ColumnOption::NotNull))
758 } else if parser.parse_keywords(&[Keyword::COMMENT]) {
759 match parser.next_token() {
760 TokenWithSpan {
761 token: Token::SingleQuotedString(value, ..),
762 ..
763 } => Ok(Some(ColumnOption::Comment(value))),
764 unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
765 }
766 } else if parser.parse_keyword(Keyword::NULL) {
767 Ok(Some(ColumnOption::Null))
768 } else if parser.parse_keyword(Keyword::DEFAULT) {
769 Ok(Some(ColumnOption::Default(
770 parser.parse_expr().context(SyntaxSnafu)?,
771 )))
772 } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
773 Ok(Some(ColumnOption::PrimaryKey(PrimaryKeyConstraint {
774 name: None,
775 index_name: None,
776 index_type: None,
777 columns: vec![],
778 index_options: vec![],
779 characteristics: None,
780 })))
781 } else if parser.parse_keyword(Keyword::UNIQUE) {
782 Ok(Some(ColumnOption::Unique(UniqueConstraint {
783 name: None,
784 index_name: None,
785 index_type_display: KeyOrIndexDisplay::None,
786 index_type: None,
787 columns: vec![],
788 index_options: vec![],
789 characteristics: None,
790 nulls_distinct: NullsDistinctOption::None,
791 })))
792 } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
793 Ok(Some(ColumnOption::DialectSpecific(vec![
795 Token::Word(Word {
796 value: "TIME".to_string(),
797 quote_style: None,
798 keyword: Keyword::TIME,
799 }),
800 Token::Word(Word {
801 value: "INDEX".to_string(),
802 quote_style: None,
803 keyword: Keyword::INDEX,
804 }),
805 ])))
806 } else {
807 Ok(None)
808 }
809 }
810
    /// Parses the column extensions at the current position and records them in
    /// `column_extensions`.
    ///
    /// Handled here, in this order: the dimension of a `VECTOR(<dim>)` column type, then the
    /// `SKIPPING INDEX`, `FULLTEXT INDEX`, `INVERTED INDEX` and `VECTOR INDEX` clauses.
    ///
    /// Returns `true` when at least one index clause was parsed by this call; the caller in
    /// `parse_column_def` keeps looping while this is `true`.
    ///
    /// # Errors
    /// Returns an invalid-column-option error (naming `column_name`) when `INDEX` is missing
    /// after an index word, when an index kind is declared twice, when an option key is
    /// rejected by the corresponding validator, when `INVERTED INDEX` is followed by `WITH`,
    /// or when the column type does not fit the index kind. Option parsing failures surface
    /// as syntax errors.
    fn parse_column_extensions(
        parser: &mut Parser<'_>,
        column_name: &Ident,
        column_type: &DataType,
        column_extensions: &mut ColumnExtensions,
    ) -> Result<bool> {
        // A custom type named `VECTOR` carries its dimension as the single type argument.
        if let DataType::Custom(name, tokens) = column_type
            && name.0.len() == 1
            && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
        {
            ensure!(
                tokens.len() == 1,
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "VECTOR type should have dimension",
                }
            );

            // NOTE(review): `u32` parsing also accepts `0`, although the message says
            // "positive" — confirm whether zero is rejected elsewhere.
            let dimension =
                tokens[0]
                    .parse::<u32>()
                    .ok()
                    .with_context(|| InvalidColumnOptionSnafu {
                        name: column_name.to_string(),
                        msg: "dimension should be a positive integer",
                    })?;

            let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
            column_extensions.vector_options = Some(options);
        }

        // Set as soon as any of the index clauses below is parsed.
        let mut is_index_declared = false;

        // `SKIPPING` is matched by its text, case-insensitively.
        if let Token::Word(word) = parser.peek_token().token
            && word.value.eq_ignore_ascii_case(SKIPPING)
        {
            parser.next_token();
            ensure!(
                parser.parse_keyword(Keyword::INDEX),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "expect INDEX after SKIPPING keyword",
                }
            );
            ensure!(
                column_extensions.skipping_index_options.is_none(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "duplicated SKIPPING index option",
                }
            );

            let options = parser
                .parse_options(Keyword::WITH)
                .context(error::SyntaxSnafu)?
                .into_iter()
                .map(parse_option_string)
                .collect::<Result<Vec<_>>>()?;

            for (key, _) in options.iter() {
                ensure!(
                    validate_column_skipping_index_create_option(key),
                    InvalidColumnOptionSnafu {
                        name: column_name.to_string(),
                        msg: format!("invalid SKIPPING INDEX option: {key}"),
                    }
                );
            }

            let options = OptionMap::new(options);
            column_extensions.skipping_index_options = Some(options);
            is_index_declared |= true;
        }

        if parser.parse_keyword(Keyword::FULLTEXT) {
            ensure!(
                parser.parse_keyword(Keyword::INDEX),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "expect INDEX after FULLTEXT keyword",
                }
            );

            ensure!(
                column_extensions.fulltext_index_options.is_none(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "duplicated FULLTEXT INDEX option",
                }
            );

            // Resolve type aliases, then require the concrete type to be a string.
            let column_type = get_unalias_type(column_type);
            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
            ensure!(
                data_type == ConcreteDataType::string_datatype(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "FULLTEXT index only supports string type",
                }
            );

            let options = parser
                .parse_options(Keyword::WITH)
                .context(error::SyntaxSnafu)?
                .into_iter()
                .map(parse_option_string)
                .collect::<Result<Vec<_>>>()?;

            for (key, _) in options.iter() {
                ensure!(
                    validate_column_fulltext_create_option(key),
                    InvalidColumnOptionSnafu {
                        name: column_name.to_string(),
                        msg: format!("invalid FULLTEXT INDEX option: {key}"),
                    }
                );
            }

            let options = OptionMap::new(options);
            column_extensions.fulltext_index_options = Some(options);
            is_index_declared |= true;
        }

        // `INVERTED` is matched by its text, case-insensitively.
        if let Token::Word(word) = parser.peek_token().token
            && word.value.eq_ignore_ascii_case(INVERTED)
        {
            parser.next_token();
            ensure!(
                parser.parse_keyword(Keyword::INDEX),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "expect INDEX after INVERTED keyword",
                }
            );

            ensure!(
                column_extensions.inverted_index_options.is_none(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "duplicated INVERTED index option",
                }
            );

            // The inverted index takes no options: an unquoted `WITH` right after it is
            // rejected (the token is only peeked, not consumed).
            let with_token = parser.peek_token();
            ensure!(
                with_token.token
                    != Token::Word(Word {
                        value: "WITH".to_string(),
                        keyword: Keyword::WITH,
                        quote_style: None,
                    }),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "INVERTED index doesn't support options",
                }
            );

            column_extensions.inverted_index_options = Some(OptionMap::default());
            is_index_declared |= true;
        }

        // `VECTOR` is matched by its text, case-insensitively.
        if let Token::Word(word) = parser.peek_token().token
            && word.value.eq_ignore_ascii_case(VECTOR)
        {
            parser.next_token();
            ensure!(
                parser.parse_keyword(Keyword::INDEX),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "expect INDEX after VECTOR keyword",
                }
            );

            ensure!(
                column_extensions.vector_index_options.is_none(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "duplicated VECTOR INDEX option",
                }
            );

            // Resolve type aliases, then require the concrete type to be a vector.
            let column_type = get_unalias_type(column_type);
            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
            ensure!(
                matches!(data_type, ConcreteDataType::Vector(_)),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "VECTOR INDEX only supports Vector type columns",
                }
            );

            let options = parser
                .parse_options(Keyword::WITH)
                .context(error::SyntaxSnafu)?
                .into_iter()
                .map(parse_option_string)
                .collect::<Result<Vec<_>>>()?;

            for (key, _) in options.iter() {
                ensure!(
                    validate_column_vector_index_create_option(key),
                    InvalidColumnOptionSnafu {
                        name: column_name.to_string(),
                        msg: format!("invalid VECTOR INDEX option: {key}"),
                    }
                );
            }

            let options = OptionMap::new(options);
            column_extensions.vector_index_options = Some(options);
            is_index_declared |= true;
        }

        Ok(is_index_declared)
    }
1043
1044 fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
1045 match self.parser.next_token() {
1046 TokenWithSpan {
1047 token: Token::Word(w),
1048 ..
1049 } if w.keyword == Keyword::PRIMARY => {
1050 self.parser
1051 .expect_keyword(Keyword::KEY)
1052 .context(error::UnexpectedSnafu {
1053 expected: "KEY",
1054 actual: self.peek_token_as_string(),
1055 })?;
1056 let raw_columns = self
1057 .parser
1058 .parse_parenthesized_column_list(Mandatory, false)
1059 .context(error::SyntaxSnafu)?;
1060 let columns = raw_columns
1061 .into_iter()
1062 .map(Self::canonicalize_identifier)
1063 .collect();
1064 Ok(Some(TableConstraint::PrimaryKey { columns }))
1065 }
1066 TokenWithSpan {
1067 token: Token::Word(w),
1068 ..
1069 } if w.keyword == Keyword::TIME => {
1070 self.parser
1071 .expect_keyword(Keyword::INDEX)
1072 .context(error::UnexpectedSnafu {
1073 expected: "INDEX",
1074 actual: self.peek_token_as_string(),
1075 })?;
1076
1077 let raw_columns = self
1078 .parser
1079 .parse_parenthesized_column_list(Mandatory, false)
1080 .context(error::SyntaxSnafu)?;
1081 let mut columns = raw_columns
1082 .into_iter()
1083 .map(Self::canonicalize_identifier)
1084 .collect::<Vec<_>>();
1085
1086 ensure!(
1087 columns.len() == 1,
1088 InvalidTimeIndexSnafu {
1089 msg: "it should contain only one column in time index",
1090 }
1091 );
1092
1093 Ok(Some(TableConstraint::TimeIndex {
1094 column: columns.pop().unwrap(),
1095 }))
1096 }
1097 _ => {
1098 self.parser.prev_token();
1099 Ok(None)
1100 }
1101 }
1102 }
1103
1104 fn parse_table_engine(&mut self, default: &str) -> Result<String> {
1106 if !self.consume_token(ENGINE) {
1107 return Ok(default.to_string());
1108 }
1109
1110 self.parser
1111 .expect_token(&Token::Eq)
1112 .context(error::UnexpectedSnafu {
1113 expected: "=",
1114 actual: self.peek_token_as_string(),
1115 })?;
1116
1117 let token = self.parser.next_token();
1118 if let Token::Word(w) = token.token {
1119 Ok(w.value)
1120 } else {
1121 self.expected("'Engine' is missing", token)
1122 }
1123 }
1124}
1125
1126fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1127 let time_index_constraints: Vec<_> = constraints
1128 .iter()
1129 .filter_map(|c| match c {
1130 TableConstraint::TimeIndex { column } => Some(column),
1131 _ => None,
1132 })
1133 .unique()
1134 .collect();
1135
1136 ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1137 ensure!(
1138 time_index_constraints.len() == 1,
1139 InvalidTimeIndexSnafu {
1140 msg: format!(
1141 "expected only one time index constraint but actual {}",
1142 time_index_constraints.len()
1143 ),
1144 }
1145 );
1146
1147 let time_index_column_ident = &time_index_constraints[0];
1150 let time_index_column = columns
1151 .iter()
1152 .find(|c| c.name().value == *time_index_column_ident.value)
1153 .with_context(|| InvalidTimeIndexSnafu {
1154 msg: format!(
1155 "time index column {} not found in columns",
1156 time_index_column_ident
1157 ),
1158 })?;
1159
1160 let time_index_data_type = get_unalias_type(time_index_column.data_type());
1161 ensure!(
1162 matches!(time_index_data_type, DataType::Timestamp(_, _)),
1163 InvalidColumnOptionSnafu {
1164 name: time_index_column.name().to_string(),
1165 msg: "time index column data type should be timestamp",
1166 }
1167 );
1168
1169 Ok(())
1170}
1171
1172fn get_unalias_type(data_type: &DataType) -> DataType {
1173 match data_type {
1174 DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1175 if let Some(real_type) =
1176 get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1177 {
1178 real_type
1179 } else {
1180 data_type.clone()
1181 }
1182 }
1183 _ => data_type.clone(),
1184 }
1185}
1186
1187fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1188 let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1189
1190 ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1191
1192 Ok(())
1193}
1194
1195fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1197 for expr in exprs {
1198 if let Expr::BinaryOp { left, op: _, right } = expr {
1200 ensure_one_expr(left, columns)?;
1201 ensure_one_expr(right, columns)?;
1202 } else {
1203 return error::InvalidSqlSnafu {
1204 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1205 }
1206 .fail();
1207 }
1208 }
1209 Ok(())
1210}
1211
1212fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1216 match expr {
1217 Expr::BinaryOp { left, op: _, right } => {
1218 ensure_one_expr(left, columns)?;
1219 ensure_one_expr(right, columns)?;
1220 Ok(())
1221 }
1222 Expr::Identifier(ident) => {
1223 let column_name = &ident.value;
1224 ensure!(
1225 columns.iter().any(|c| &c.name().value == column_name),
1226 error::InvalidSqlSnafu {
1227 msg: format!(
1228 "Column {:?} in rule expr is not referenced in PARTITION ON",
1229 column_name
1230 ),
1231 }
1232 );
1233 Ok(())
1234 }
1235 Expr::Value(_) => Ok(()),
1236 Expr::UnaryOp { expr, .. } => {
1237 ensure_one_expr(expr, columns)?;
1238 Ok(())
1239 }
1240 _ => error::InvalidSqlSnafu {
1241 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1242 }
1243 .fail(),
1244 }
1245}
1246
1247fn ensure_partition_columns_defined<'a>(
1249 columns: &'a [Column],
1250 partitions: &'a Partitions,
1251) -> Result<Vec<&'a Column>> {
1252 partitions
1253 .column_list
1254 .iter()
1255 .map(|x| {
1256 let x = ParserContext::canonicalize_identifier(x.clone());
1257 columns
1260 .iter()
1261 .find(|c| *c.name().value == x.value)
1262 .context(error::InvalidSqlSnafu {
1263 msg: format!("Partition column {:?} not defined", x.value),
1264 })
1265 })
1266 .collect::<Result<Vec<&Column>>>()
1267}
1268
1269#[cfg(test)]
1270mod tests {
1271 use std::assert_matches;
1272 use std::collections::HashMap;
1273
1274 use common_catalog::consts::FILE_ENGINE;
1275 use common_error::ext::ErrorExt;
1276 use sqlparser::ast::ColumnOption::NotNull;
1277 use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1278 use sqlparser::dialect::GenericDialect;
1279 use sqlparser::tokenizer::Tokenizer;
1280
1281 use super::*;
1282 use crate::dialect::GreptimeDbDialect;
1283 use crate::parser::ParseOptions;
1284
1285 fn string_option_map(
1286 entries: impl IntoIterator<Item = (&'static str, &'static str)>,
1287 ) -> OptionMap {
1288 OptionMap::new(entries.into_iter().map(|(key, value)| {
1289 (
1290 key.to_string(),
1291 OptionValue::try_new(Expr::Value(
1292 Value::SingleQuotedString(value.to_string()).into(),
1293 ))
1294 .unwrap(),
1295 )
1296 }))
1297 }
1298
1299 #[test]
1300 fn test_parse_create_table_like() {
1301 let sql = "CREATE TABLE t1 LIKE t2";
1302 let stmts =
1303 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1304 .unwrap();
1305
1306 assert_eq!(1, stmts.len());
1307 match &stmts[0] {
1308 Statement::CreateTableLike(c) => {
1309 assert_eq!(c.table_name.to_string(), "t1");
1310 assert_eq!(c.source_name.to_string(), "t2");
1311 }
1312 _ => unreachable!(),
1313 }
1314 }
1315
1316 #[test]
1317 fn test_validate_external_table_options() {
1318 let sql = "CREATE EXTERNAL TABLE city (
1319 host string,
1320 ts timestamp,
1321 cpu float64 default 0,
1322 memory float64,
1323 TIME INDEX (ts),
1324 PRIMARY KEY(ts, host)
1325 ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1326
1327 let result =
1328 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1329 assert!(matches!(
1330 result,
1331 Err(error::Error::InvalidTableOption { .. })
1332 ));
1333 }
1334
1335 #[test]
1336 fn test_parse_create_external_table() {
1337 struct Test<'a> {
1338 sql: &'a str,
1339 expected_table_name: &'a str,
1340 expected_options: HashMap<&'a str, &'a str>,
1341 expected_engine: &'a str,
1342 expected_if_not_exist: bool,
1343 }
1344
1345 let tests = [
1346 Test {
1347 sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1348 expected_table_name: "city",
1349 expected_options: HashMap::from([
1350 ("location", "/var/data/city.csv"),
1351 ("format", "csv"),
1352 ]),
1353 expected_engine: FILE_ENGINE,
1354 expected_if_not_exist: false,
1355 },
1356 Test {
1357 sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1358 expected_table_name: "city",
1359 expected_options: HashMap::from([
1360 ("location", "/var/data/city.csv"),
1361 ("format", "csv"),
1362 ]),
1363 expected_engine: "foo",
1364 expected_if_not_exist: true,
1365 },
1366 Test {
1367 sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1368 expected_table_name: "city",
1369 expected_options: HashMap::from([
1370 ("location", "/var/data/city.csv"),
1371 ("format", "csv"),
1372 ("compaction.type", "bar"),
1373 ]),
1374 expected_engine: "foo",
1375 expected_if_not_exist: true,
1376 },
1377 ];
1378
1379 for test in tests {
1380 let stmts = ParserContext::create_with_dialect(
1381 test.sql,
1382 &GreptimeDbDialect {},
1383 ParseOptions::default(),
1384 )
1385 .unwrap();
1386 assert_eq!(1, stmts.len());
1387 match &stmts[0] {
1388 Statement::CreateExternalTable(c) => {
1389 assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1390 assert_eq!(c.options.to_str_map(), test.expected_options);
1391 assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1392 assert_eq!(c.engine, test.expected_engine);
1393 }
1394 _ => unreachable!(),
1395 }
1396 }
1397 }
1398
1399 #[test]
1400 fn test_parse_create_external_table_with_schema() {
1401 let sql = "CREATE EXTERNAL TABLE city (
1402 host string,
1403 ts timestamp,
1404 cpu float32 default 0,
1405 memory float64,
1406 TIME INDEX (ts),
1407 PRIMARY KEY(ts, host),
1408 ) with(location='/var/data/city.csv',format='csv');";
1409
1410 let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]);
1411
1412 let stmts =
1413 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1414 .unwrap();
1415 assert_eq!(1, stmts.len());
1416 match &stmts[0] {
1417 Statement::CreateExternalTable(c) => {
1418 assert_eq!(c.name.to_string(), "city");
1419 assert_eq!(c.options.to_str_map(), options);
1420
1421 let columns = &c.columns;
1422 assert_column_def(&columns[0].column_def, "host", "STRING");
1423 assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1424 assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1425 assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1426
1427 let constraints = &c.constraints;
1428 assert_eq!(
1429 &constraints[0],
1430 &TableConstraint::TimeIndex {
1431 column: Ident::new("ts"),
1432 }
1433 );
1434 assert_eq!(
1435 &constraints[1],
1436 &TableConstraint::PrimaryKey {
1437 columns: vec![Ident::new("ts"), Ident::new("host")]
1438 }
1439 );
1440 }
1441 _ => unreachable!(),
1442 }
1443 }
1444
1445 #[test]
1446 fn test_parse_create_database() {
1447 let sql = "create database";
1448 let result =
1449 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1450 assert!(
1451 result
1452 .unwrap_err()
1453 .to_string()
1454 .contains("Unexpected token while parsing SQL statement")
1455 );
1456
1457 let sql = "create database prometheus";
1458 let stmts =
1459 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1460 .unwrap();
1461
1462 assert_eq!(1, stmts.len());
1463 match &stmts[0] {
1464 Statement::CreateDatabase(c) => {
1465 assert_eq!(c.name.to_string(), "prometheus");
1466 assert!(!c.if_not_exists);
1467 }
1468 _ => unreachable!(),
1469 }
1470
1471 let sql = "create database if not exists prometheus";
1472 let stmts =
1473 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1474 .unwrap();
1475
1476 assert_eq!(1, stmts.len());
1477 match &stmts[0] {
1478 Statement::CreateDatabase(c) => {
1479 assert_eq!(c.name.to_string(), "prometheus");
1480 assert!(c.if_not_exists);
1481 }
1482 _ => unreachable!(),
1483 }
1484
1485 let sql = "CREATE DATABASE `fOo`";
1486 let result =
1487 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1488 let stmts = result.unwrap();
1489 match &stmts.last().unwrap() {
1490 Statement::CreateDatabase(c) => {
1491 assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1492 assert!(!c.if_not_exists);
1493 }
1494 _ => unreachable!(),
1495 }
1496
1497 let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1498 let result =
1499 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1500 let stmts = result.unwrap();
1501 match &stmts[0] {
1502 Statement::CreateDatabase(c) => {
1503 assert_eq!(c.name.to_string(), "prometheus");
1504 assert!(!c.if_not_exists);
1505 assert_eq!(c.options.get("ttl").unwrap(), "1h");
1506 }
1507 _ => unreachable!(),
1508 }
1509 }
1510
1511 #[test]
1512 fn test_parse_create_flow_more_testcases() {
1513 use pretty_assertions::assert_eq;
1514 fn parse_create_flow(sql: &str) -> CreateFlow {
1515 let stmts = ParserContext::create_with_dialect(
1516 sql,
1517 &GreptimeDbDialect {},
1518 ParseOptions::default(),
1519 )
1520 .unwrap();
1521 assert_eq!(1, stmts.len());
1522 match &stmts[0] {
1523 Statement::CreateFlow(c) => c.clone(),
1524 _ => unreachable!(),
1525 }
1526 }
1527 struct CreateFlowWoutQuery {
1528 pub flow_name: ObjectName,
1530 pub sink_table_name: ObjectName,
1532 pub or_replace: bool,
1534 pub if_not_exists: bool,
1536 pub expire_after: Option<i64>,
1539 pub comment: Option<String>,
1541 pub flow_options: OptionMap,
1543 }
1544 let testcases = vec![
1545 (
1546 r"
1547CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1548SINK TO schema_1.table_1
1549EXPIRE AFTER INTERVAL '5 minutes'
1550COMMENT 'test comment'
1551AS
1552SELECT max(c1), min(c2) FROM schema_2.table_2;",
1553 CreateFlowWoutQuery {
1554 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1555 sink_table_name: ObjectName::from(vec![
1556 Ident::new("schema_1"),
1557 Ident::new("table_1"),
1558 ]),
1559 or_replace: true,
1560 if_not_exists: true,
1561 expire_after: Some(300),
1562 comment: Some("test comment".to_string()),
1563 flow_options: OptionMap::default(),
1564 },
1565 ),
1566 (
1567 r"
1568CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1569SINK TO schema_1.table_1
1570EXPIRE AFTER INTERVAL '300 s'
1571COMMENT 'test comment'
1572AS
1573SELECT max(c1), min(c2) FROM schema_2.table_2;",
1574 CreateFlowWoutQuery {
1575 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1576 sink_table_name: ObjectName::from(vec![
1577 Ident::new("schema_1"),
1578 Ident::new("table_1"),
1579 ]),
1580 or_replace: true,
1581 if_not_exists: true,
1582 expire_after: Some(300),
1583 comment: Some("test comment".to_string()),
1584 flow_options: OptionMap::default(),
1585 },
1586 ),
1587 (
1588 r"
1589CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1590SINK TO schema_1.table_1
1591EXPIRE AFTER '5 minutes'
1592COMMENT 'test comment'
1593AS
1594SELECT max(c1), min(c2) FROM schema_2.table_2;",
1595 CreateFlowWoutQuery {
1596 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1597 sink_table_name: ObjectName::from(vec![
1598 Ident::new("schema_1"),
1599 Ident::new("table_1"),
1600 ]),
1601 or_replace: true,
1602 if_not_exists: true,
1603 expire_after: Some(300),
1604 comment: Some("test comment".to_string()),
1605 flow_options: OptionMap::default(),
1606 },
1607 ),
1608 (
1609 r"
1610CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1611SINK TO schema_1.table_1
1612EXPIRE AFTER '300 s'
1613COMMENT 'test comment'
1614AS
1615SELECT max(c1), min(c2) FROM schema_2.table_2;",
1616 CreateFlowWoutQuery {
1617 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1618 sink_table_name: ObjectName::from(vec![
1619 Ident::new("schema_1"),
1620 Ident::new("table_1"),
1621 ]),
1622 or_replace: true,
1623 if_not_exists: true,
1624 expire_after: Some(300),
1625 comment: Some("test comment".to_string()),
1626 flow_options: OptionMap::default(),
1627 },
1628 ),
1629 (
1630 r"
1631CREATE FLOW `task_2`
1632SINK TO schema_1.table_1
1633EXPIRE AFTER '2 days 1h 2 min'
1634AS
1635SELECT max(c1), min(c2) FROM schema_2.table_2;",
1636 CreateFlowWoutQuery {
1637 flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1638 sink_table_name: ObjectName::from(vec![
1639 Ident::new("schema_1"),
1640 Ident::new("table_1"),
1641 ]),
1642 or_replace: false,
1643 if_not_exists: false,
1644 expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1645 comment: None,
1646 flow_options: OptionMap::default(),
1647 },
1648 ),
1649 (
1650 r"
1651create flow `task_3`
1652sink to schema_1.table_1
1653expire after '10 minutes'
1654as
1655select max(c1), min(c2) from schema_2.table_2;",
1656 CreateFlowWoutQuery {
1657 flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
1658 sink_table_name: ObjectName::from(vec![
1659 Ident::new("schema_1"),
1660 Ident::new("table_1"),
1661 ]),
1662 or_replace: false,
1663 if_not_exists: false,
1664 expire_after: Some(600), comment: None,
1666 flow_options: OptionMap::default(),
1667 },
1668 ),
1669 (
1670 r"
1671create or replace flow if not exists task_4
1672sink to schema_1.table_1
1673expire after interval '2 hours'
1674comment 'lowercase test'
1675as
1676select max(c1), min(c2) from schema_2.table_2;",
1677 CreateFlowWoutQuery {
1678 flow_name: ObjectName::from(vec![Ident::new("task_4")]),
1679 sink_table_name: ObjectName::from(vec![
1680 Ident::new("schema_1"),
1681 Ident::new("table_1"),
1682 ]),
1683 or_replace: true,
1684 if_not_exists: true,
1685 expire_after: Some(7200), comment: Some("lowercase test".to_string()),
1687 flow_options: OptionMap::default(),
1688 },
1689 ),
1690 (
1691 r"
1692CREATE FLOW task_5
1693SINK TO schema_1.table_1
1694WITH (defer_on_missing_source = 'true')
1695AS
1696SELECT max(c1), min(c2) FROM schema_2.table_2;",
1697 CreateFlowWoutQuery {
1698 flow_name: ObjectName::from(vec![Ident::new("task_5")]),
1699 sink_table_name: ObjectName::from(vec![
1700 Ident::new("schema_1"),
1701 Ident::new("table_1"),
1702 ]),
1703 or_replace: false,
1704 if_not_exists: false,
1705 expire_after: None,
1706 comment: None,
1707 flow_options: string_option_map([("defer_on_missing_source", "true")]),
1708 },
1709 ),
1710 ];
1711
1712 for (sql, expected) in testcases {
1713 let create_task = parse_create_flow(sql);
1714
1715 let expected = CreateFlow {
1716 flow_name: expected.flow_name,
1717 sink_table_name: expected.sink_table_name,
1718 or_replace: expected.or_replace,
1719 if_not_exists: expected.if_not_exists,
1720 expire_after: expected.expire_after,
1721 eval_interval: None,
1722 comment: expected.comment,
1723 flow_options: expected.flow_options,
1724 query: create_task.query.clone(),
1726 };
1727
1728 assert_eq!(create_task, expected, "input sql is:\n{sql}");
1729 let show_create = create_task.to_string();
1730 let recreated = parse_create_flow(&show_create);
1731 assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1732 }
1733 }
1734
1735 #[test]
1736 fn test_parse_create_flow() {
1737 use pretty_assertions::assert_eq;
1738 fn parse_create_flow(sql: &str) -> CreateFlow {
1739 let stmts = ParserContext::create_with_dialect(
1740 sql,
1741 &GreptimeDbDialect {},
1742 ParseOptions::default(),
1743 )
1744 .unwrap();
1745 assert_eq!(1, stmts.len());
1746 match &stmts[0] {
1747 Statement::CreateFlow(c) => c.clone(),
1748 _ => panic!("{:?}", stmts[0]),
1749 }
1750 }
1751 struct CreateFlowWoutQuery {
1752 pub flow_name: ObjectName,
1754 pub sink_table_name: ObjectName,
1756 pub or_replace: bool,
1758 pub if_not_exists: bool,
1760 pub expire_after: Option<i64>,
1763 pub eval_interval: Option<i64>,
1767 pub comment: Option<String>,
1769 pub flow_options: OptionMap,
1771 }
1772
1773 let testcases = vec![
1775 (
1776 r"
1777CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1778SINK TO schema_1.table_1
1779EXPIRE AFTER INTERVAL '5 minutes'
1780COMMENT 'test comment'
1781AS
1782SELECT max(c1), min(c2) FROM schema_2.table_2;",
1783 CreateFlowWoutQuery {
1784 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1785 sink_table_name: ObjectName(vec![
1786 ObjectNamePart::Identifier(Ident::new("schema_1")),
1787 ObjectNamePart::Identifier(Ident::new("table_1")),
1788 ]),
1789 or_replace: true,
1790 if_not_exists: true,
1791 expire_after: Some(300),
1792 eval_interval: None,
1793 comment: Some("test comment".to_string()),
1794 flow_options: OptionMap::default(),
1795 },
1796 ),
1797 (
1798 r"
1799CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1800SINK TO schema_1.table_1
1801EXPIRE AFTER INTERVAL '300 s'
1802COMMENT 'test comment'
1803AS
1804SELECT max(c1), min(c2) FROM schema_2.table_2;",
1805 CreateFlowWoutQuery {
1806 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1807 sink_table_name: ObjectName(vec![
1808 ObjectNamePart::Identifier(Ident::new("schema_1")),
1809 ObjectNamePart::Identifier(Ident::new("table_1")),
1810 ]),
1811 or_replace: true,
1812 if_not_exists: true,
1813 expire_after: Some(300),
1814 eval_interval: None,
1815 comment: Some("test comment".to_string()),
1816 flow_options: OptionMap::default(),
1817 },
1818 ),
1819 (
1820 r"
1821CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1822SINK TO schema_1.table_1
1823EXPIRE AFTER '5 minutes'
1824EVAL INTERVAL '10 seconds'
1825COMMENT 'test comment'
1826AS
1827SELECT max(c1), min(c2) FROM schema_2.table_2;",
1828 CreateFlowWoutQuery {
1829 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1830 sink_table_name: ObjectName(vec![
1831 ObjectNamePart::Identifier(Ident::new("schema_1")),
1832 ObjectNamePart::Identifier(Ident::new("table_1")),
1833 ]),
1834 or_replace: true,
1835 if_not_exists: true,
1836 expire_after: Some(300),
1837 eval_interval: Some(10),
1838 comment: Some("test comment".to_string()),
1839 flow_options: OptionMap::default(),
1840 },
1841 ),
1842 (
1843 r"
1844CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1845SINK TO schema_1.table_1
1846EXPIRE AFTER '5 minutes'
1847EVAL INTERVAL INTERVAL '10 seconds'
1848COMMENT 'test comment'
1849AS
1850SELECT max(c1), min(c2) FROM schema_2.table_2;",
1851 CreateFlowWoutQuery {
1852 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1853 sink_table_name: ObjectName(vec![
1854 ObjectNamePart::Identifier(Ident::new("schema_1")),
1855 ObjectNamePart::Identifier(Ident::new("table_1")),
1856 ]),
1857 or_replace: true,
1858 if_not_exists: true,
1859 expire_after: Some(300),
1860 eval_interval: Some(10),
1861 comment: Some("test comment".to_string()),
1862 flow_options: OptionMap::default(),
1863 },
1864 ),
1865 (
1866 r"
1867CREATE FLOW `task_2`
1868SINK TO schema_1.table_1
1869EXPIRE AFTER '2 days 1h 2 min'
1870AS
1871SELECT max(c1), min(c2) FROM schema_2.table_2;",
1872 CreateFlowWoutQuery {
1873 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1874 '`', "task_2",
1875 ))]),
1876 sink_table_name: ObjectName(vec![
1877 ObjectNamePart::Identifier(Ident::new("schema_1")),
1878 ObjectNamePart::Identifier(Ident::new("table_1")),
1879 ]),
1880 or_replace: false,
1881 if_not_exists: false,
1882 expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1883 eval_interval: None,
1884 comment: None,
1885 flow_options: OptionMap::default(),
1886 },
1887 ),
1888 (
1889 r"
1890CREATE FLOW task_3
1891SINK TO schema_1.table_1
1892EVAL INTERVAL '10 seconds'
1893WITH (defer_on_missing_source = 'true', foo = 'bar')
1894AS
1895SELECT max(c1), min(c2) FROM schema_2.table_2;",
1896 CreateFlowWoutQuery {
1897 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_3"))]),
1898 sink_table_name: ObjectName(vec![
1899 ObjectNamePart::Identifier(Ident::new("schema_1")),
1900 ObjectNamePart::Identifier(Ident::new("table_1")),
1901 ]),
1902 or_replace: false,
1903 if_not_exists: false,
1904 expire_after: None,
1905 eval_interval: Some(10),
1906 comment: None,
1907 flow_options: string_option_map([
1908 ("defer_on_missing_source", "true"),
1909 ("foo", "bar"),
1910 ]),
1911 },
1912 ),
1913 ];
1914
1915 for (sql, expected) in testcases {
1916 let create_task = parse_create_flow(sql);
1917
1918 let expected = CreateFlow {
1919 flow_name: expected.flow_name,
1920 sink_table_name: expected.sink_table_name,
1921 or_replace: expected.or_replace,
1922 if_not_exists: expected.if_not_exists,
1923 expire_after: expected.expire_after,
1924 eval_interval: expected.eval_interval,
1925 comment: expected.comment,
1926 flow_options: expected.flow_options,
1927 query: create_task.query.clone(),
1929 };
1930
1931 assert_eq!(create_task, expected, "input sql is:\n{sql}");
1932 let show_create = create_task.to_string();
1933 let recreated = parse_create_flow(&show_create);
1934 assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1935 }
1936 }
1937
1938 #[test]
1939 fn test_parse_create_flow_with_tql_cte_query() {
1940 let sql = r#"
1941CREATE FLOW calc_reqs_cte
1942SINK TO cnt_reqs_cte
1943EVAL INTERVAL '1m'
1944AS
1945WITH tql(the_timestamp, the_value) AS (
1946 TQL EVAL (now() - '1m'::interval, now(), '5s') metric
1947)
1948SELECT * FROM tql;
1949"#;
1950
1951 let stmts =
1952 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1953 .unwrap();
1954 assert_eq!(1, stmts.len());
1955 let Statement::CreateFlow(create_flow) = &stmts[0] else {
1956 panic!("unexpected stmt: {:?}", stmts[0]);
1957 };
1958
1959 let query = create_flow.query.to_string();
1960 assert!(query.to_uppercase().contains("WITH"));
1961 assert!(query.to_uppercase().contains("TQL EVAL"));
1962 }
1963
1964 #[test]
1965 fn test_parse_create_flow_with_sql_cte_is_unsupported() {
1966 let sql = r#"
1967CREATE FLOW f
1968SINK TO s
1969AS
1970WITH cte AS (SELECT 1) SELECT * FROM cte;
1971"#;
1972
1973 let err =
1974 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1975 .unwrap_err();
1976 let msg = err.to_string();
1977 assert!(msg.to_uppercase().contains("WITH"), "err: {msg}");
1978 }
1979
1980 #[test]
1981 fn test_parse_create_flow_with_tql_cte_requires_now_expr() {
1982 let sql = r#"
1983CREATE FLOW f
1984SINK TO s
1985EVAL INTERVAL '1m'
1986AS
1987WITH tql(ts, val) AS (
1988 TQL EVAL (0, 15, '5s') metric
1989)
1990SELECT * FROM tql;
1991"#;
1992
1993 let err =
1994 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1995 .unwrap_err();
1996
1997 let msg = format!("{err:?}");
1998 assert!(
1999 msg.contains("Expected expression containing `now()`"),
2000 "unexpected err: {msg}"
2001 );
2002 }
2003
2004 #[test]
2005 fn test_parse_create_flow_with_tql_cte_non_select_star_is_unsupported() {
2006 let sql = r#"
2007CREATE FLOW f
2008SINK TO s
2009AS
2010WITH tql(ts, val) AS (
2011 TQL EVAL (now() - '1m'::interval, now(), '5s') metric
2012)
2013SELECT ts FROM tql;
2014"#;
2015
2016 let err =
2017 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2018 .unwrap_err();
2019 assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2020 }
2021
2022 #[test]
2023 fn test_parse_create_flow_with_tql_cte_filter_is_unsupported() {
2024 let sql = r#"
2025CREATE FLOW f
2026SINK TO s
2027AS
2028WITH tql(ts, val) AS (
2029 TQL EVAL (now() - '1m'::interval, now(), '5s') metric
2030)
2031SELECT * FROM tql WHERE ts > 0;
2032"#;
2033
2034 let err =
2035 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2036 .unwrap_err();
2037 assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2038 }
2039
2040 #[test]
2041 fn test_parse_create_flow_with_mixed_sql_tql_cte_is_unsupported() {
2042 let sql = r#"
2043CREATE FLOW f
2044SINK TO s
2045AS
2046WITH s1 AS (SELECT 1),
2047 tql(ts, val) AS (TQL EVAL (now() - '1m'::interval, now(), '5s') metric)
2048SELECT * FROM tql;
2049"#;
2050
2051 let err =
2052 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2053 .unwrap_err();
2054 assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2055 }
2056
2057 #[test]
2058 fn test_create_flow_no_month() {
2059 let sql = r"
2060CREATE FLOW `task_2`
2061SINK TO schema_1.table_1
2062EXPIRE AFTER '1 month 2 days 1h 2 min'
2063AS
2064SELECT max(c1), min(c2) FROM schema_2.table_2;";
2065 let stmts =
2066 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2067
2068 assert!(
2069 stmts.is_err()
2070 && stmts
2071 .unwrap_err()
2072 .to_string()
2073 .contains("Interval with months is not allowed")
2074 );
2075 }
2076
2077 #[test]
2078 fn test_validate_create() {
2079 let sql = r"
2080CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
2081PARTITION ON COLUMNS(c, a) (
2082 a < 10,
2083 a > 10 AND a < 20,
2084 a > 20 AND c < 100,
2085 a > 20 AND c >= 100
2086)
2087ENGINE=mito";
2088 let result =
2089 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2090 let _ = result.unwrap();
2091
2092 let sql = r"
2093CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2094PARTITION ON COLUMNS(x) ()
2095ENGINE=mito";
2096 let result =
2097 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2098 assert!(
2099 result
2100 .unwrap_err()
2101 .to_string()
2102 .contains("Partition column \"x\" not defined")
2103 );
2104 }
2105
2106 #[test]
2107 fn test_parse_create_table_with_partitions() {
2108 let sql = r"
2109CREATE TABLE monitor (
2110 host_id INT,
2111 idc STRING,
2112 ts TIMESTAMP,
2113 cpu DOUBLE DEFAULT 0,
2114 memory DOUBLE,
2115 TIME INDEX (ts),
2116 PRIMARY KEY (host),
2117)
2118PARTITION ON COLUMNS(idc, host_id) (
2119 idc <= 'hz' AND host_id < 1000,
2120 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2121 idc > 'sh' AND host_id < 3000,
2122 idc > 'sh' AND host_id >= 3000,
2123)
2124ENGINE=mito";
2125 let result =
2126 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2127 .unwrap();
2128 assert_eq!(result.len(), 1);
2129 match &result[0] {
2130 Statement::CreateTable(c) => {
2131 assert!(c.partitions.is_some());
2132
2133 let partitions = c.partitions.as_ref().unwrap();
2134 let column_list = partitions
2135 .column_list
2136 .iter()
2137 .map(|x| &x.value)
2138 .collect::<Vec<&String>>();
2139 assert_eq!(column_list, vec!["idc", "host_id"]);
2140
2141 let exprs = &partitions.exprs;
2142
2143 assert_eq!(
2144 exprs[0],
2145 Expr::BinaryOp {
2146 left: Box::new(Expr::BinaryOp {
2147 left: Box::new(Expr::Identifier("idc".into())),
2148 op: BinaryOperator::LtEq,
2149 right: Box::new(Expr::Value(
2150 Value::SingleQuotedString("hz".to_string()).into()
2151 ))
2152 }),
2153 op: BinaryOperator::And,
2154 right: Box::new(Expr::BinaryOp {
2155 left: Box::new(Expr::Identifier("host_id".into())),
2156 op: BinaryOperator::Lt,
2157 right: Box::new(Expr::Value(
2158 Value::Number("1000".to_string(), false).into()
2159 ))
2160 })
2161 }
2162 );
2163 assert_eq!(
2164 exprs[1],
2165 Expr::BinaryOp {
2166 left: Box::new(Expr::BinaryOp {
2167 left: Box::new(Expr::BinaryOp {
2168 left: Box::new(Expr::Identifier("idc".into())),
2169 op: BinaryOperator::Gt,
2170 right: Box::new(Expr::Value(
2171 Value::SingleQuotedString("hz".to_string()).into()
2172 ))
2173 }),
2174 op: BinaryOperator::And,
2175 right: Box::new(Expr::BinaryOp {
2176 left: Box::new(Expr::Identifier("idc".into())),
2177 op: BinaryOperator::LtEq,
2178 right: Box::new(Expr::Value(
2179 Value::SingleQuotedString("sh".to_string()).into()
2180 ))
2181 })
2182 }),
2183 op: BinaryOperator::And,
2184 right: Box::new(Expr::BinaryOp {
2185 left: Box::new(Expr::Identifier("host_id".into())),
2186 op: BinaryOperator::Lt,
2187 right: Box::new(Expr::Value(
2188 Value::Number("2000".to_string(), false).into()
2189 ))
2190 })
2191 }
2192 );
2193 assert_eq!(
2194 exprs[2],
2195 Expr::BinaryOp {
2196 left: Box::new(Expr::BinaryOp {
2197 left: Box::new(Expr::Identifier("idc".into())),
2198 op: BinaryOperator::Gt,
2199 right: Box::new(Expr::Value(
2200 Value::SingleQuotedString("sh".to_string()).into()
2201 ))
2202 }),
2203 op: BinaryOperator::And,
2204 right: Box::new(Expr::BinaryOp {
2205 left: Box::new(Expr::Identifier("host_id".into())),
2206 op: BinaryOperator::Lt,
2207 right: Box::new(Expr::Value(
2208 Value::Number("3000".to_string(), false).into()
2209 ))
2210 })
2211 }
2212 );
2213 assert_eq!(
2214 exprs[3],
2215 Expr::BinaryOp {
2216 left: Box::new(Expr::BinaryOp {
2217 left: Box::new(Expr::Identifier("idc".into())),
2218 op: BinaryOperator::Gt,
2219 right: Box::new(Expr::Value(
2220 Value::SingleQuotedString("sh".to_string()).into()
2221 ))
2222 }),
2223 op: BinaryOperator::And,
2224 right: Box::new(Expr::BinaryOp {
2225 left: Box::new(Expr::Identifier("host_id".into())),
2226 op: BinaryOperator::GtEq,
2227 right: Box::new(Expr::Value(
2228 Value::Number("3000".to_string(), false).into()
2229 ))
2230 })
2231 }
2232 );
2233 }
2234 _ => unreachable!(),
2235 }
2236 }
2237
2238 #[test]
2239 fn test_parse_create_table_with_quoted_partitions() {
2240 let sql = r"
2241CREATE TABLE monitor (
2242 `host_id` INT,
2243 idc STRING,
2244 ts TIMESTAMP,
2245 cpu DOUBLE DEFAULT 0,
2246 memory DOUBLE,
2247 TIME INDEX (ts),
2248 PRIMARY KEY (host),
2249)
2250PARTITION ON COLUMNS(IdC, host_id) (
2251 idc <= 'hz' AND host_id < 1000,
2252 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2253 idc > 'sh' AND host_id < 3000,
2254 idc > 'sh' AND host_id >= 3000,
2255)";
2256 let result =
2257 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2258 .unwrap();
2259 assert_eq!(result.len(), 1);
2260 }
2261
2262 #[test]
2263 fn test_parse_create_table_with_timestamp_index() {
2264 let sql1 = r"
2265CREATE TABLE monitor (
2266 host_id INT,
2267 idc STRING,
2268 ts TIMESTAMP TIME INDEX,
2269 cpu DOUBLE DEFAULT 0,
2270 memory DOUBLE,
2271 PRIMARY KEY (host),
2272)
2273ENGINE=mito";
2274 let result1 = ParserContext::create_with_dialect(
2275 sql1,
2276 &GreptimeDbDialect {},
2277 ParseOptions::default(),
2278 )
2279 .unwrap();
2280
2281 if let Statement::CreateTable(c) = &result1[0] {
2282 assert_eq!(c.constraints.len(), 2);
2283 let tc = c.constraints[0].clone();
2284 match tc {
2285 TableConstraint::TimeIndex { column } => {
2286 assert_eq!(&column.value, "ts");
2287 }
2288 _ => panic!("should be time index constraint"),
2289 };
2290 } else {
2291 panic!("should be create_table statement");
2292 }
2293
2294 let sql2 = r"
2297CREATE TABLE monitor (
2298 host_id INT,
2299 idc STRING,
2300 ts TIMESTAMP NOT NULL,
2301 cpu DOUBLE DEFAULT 0,
2302 memory DOUBLE,
2303 TIME INDEX (ts),
2304 PRIMARY KEY (host),
2305)
2306ENGINE=mito";
2307 let result2 = ParserContext::create_with_dialect(
2308 sql2,
2309 &GreptimeDbDialect {},
2310 ParseOptions::default(),
2311 )
2312 .unwrap();
2313
2314 assert_eq!(result1, result2);
2315
2316 let sql3 = r"
2318CREATE TABLE monitor (
2319 host_id INT,
2320 idc STRING,
2321 ts TIMESTAMP,
2322 cpu DOUBLE DEFAULT 0,
2323 memory DOUBLE,
2324 TIME INDEX (ts),
2325 PRIMARY KEY (host),
2326)
2327ENGINE=mito";
2328
2329 let result3 = ParserContext::create_with_dialect(
2330 sql3,
2331 &GreptimeDbDialect {},
2332 ParseOptions::default(),
2333 )
2334 .unwrap();
2335
2336 assert_ne!(result1, result3);
2337
2338 let sql1 = r"
2340CREATE TABLE monitor (
2341 host_id INT,
2342 idc STRING,
2343 b bigint TIME INDEX,
2344 cpu DOUBLE DEFAULT 0,
2345 memory DOUBLE,
2346 PRIMARY KEY (host),
2347)
2348ENGINE=mito";
2349 let result1 = ParserContext::create_with_dialect(
2350 sql1,
2351 &GreptimeDbDialect {},
2352 ParseOptions::default(),
2353 );
2354
2355 assert!(
2356 result1
2357 .unwrap_err()
2358 .to_string()
2359 .contains("time index column data type should be timestamp")
2360 );
2361 }
2362
2363 #[test]
2364 fn test_parse_create_table_with_timestamp_index_not_null() {
2365 let sql = r"
2366CREATE TABLE monitor (
2367 host_id INT,
2368 idc STRING,
2369 ts TIMESTAMP TIME INDEX,
2370 cpu DOUBLE DEFAULT 0,
2371 memory DOUBLE,
2372 TIME INDEX (ts),
2373 PRIMARY KEY (host),
2374)
2375ENGINE=mito";
2376 let result =
2377 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2378 .unwrap();
2379
2380 assert_eq!(result.len(), 1);
2381 if let Statement::CreateTable(c) = &result[0] {
2382 let ts = c.columns[2].clone();
2383 assert_eq!(ts.name().to_string(), "ts");
2384 assert_eq!(ts.options()[0].option, NotNull);
2385 } else {
2386 panic!("should be create table statement");
2387 }
2388
2389 let sql1 = r"
2390CREATE TABLE monitor (
2391 host_id INT,
2392 idc STRING,
2393 ts TIMESTAMP NOT NULL TIME INDEX,
2394 cpu DOUBLE DEFAULT 0,
2395 memory DOUBLE,
2396 TIME INDEX (ts),
2397 PRIMARY KEY (host),
2398)
2399ENGINE=mito";
2400
2401 let result1 = ParserContext::create_with_dialect(
2402 sql1,
2403 &GreptimeDbDialect {},
2404 ParseOptions::default(),
2405 )
2406 .unwrap();
2407 assert_eq!(result, result1);
2408
2409 let sql2 = r"
2410CREATE TABLE monitor (
2411 host_id INT,
2412 idc STRING,
2413 ts TIMESTAMP TIME INDEX NOT NULL,
2414 cpu DOUBLE DEFAULT 0,
2415 memory DOUBLE,
2416 TIME INDEX (ts),
2417 PRIMARY KEY (host),
2418)
2419ENGINE=mito";
2420
2421 let result2 = ParserContext::create_with_dialect(
2422 sql2,
2423 &GreptimeDbDialect {},
2424 ParseOptions::default(),
2425 )
2426 .unwrap();
2427 assert_eq!(result, result2);
2428
2429 let sql3 = r"
2430CREATE TABLE monitor (
2431 host_id INT,
2432 idc STRING,
2433 ts TIMESTAMP TIME INDEX NULL NOT,
2434 cpu DOUBLE DEFAULT 0,
2435 memory DOUBLE,
2436 TIME INDEX (ts),
2437 PRIMARY KEY (host),
2438)
2439ENGINE=mito";
2440
2441 let result3 = ParserContext::create_with_dialect(
2442 sql3,
2443 &GreptimeDbDialect {},
2444 ParseOptions::default(),
2445 );
2446 assert!(result3.is_err());
2447
2448 let sql4 = r"
2449CREATE TABLE monitor (
2450 host_id INT,
2451 idc STRING,
2452 ts TIMESTAMP TIME INDEX NOT NULL NULL,
2453 cpu DOUBLE DEFAULT 0,
2454 memory DOUBLE,
2455 TIME INDEX (ts),
2456 PRIMARY KEY (host),
2457)
2458ENGINE=mito";
2459
2460 let result4 = ParserContext::create_with_dialect(
2461 sql4,
2462 &GreptimeDbDialect {},
2463 ParseOptions::default(),
2464 );
2465 assert!(result4.is_err());
2466
2467 let sql = r"
2468CREATE TABLE monitor (
2469 host_id INT,
2470 idc STRING,
2471 ts TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2472 cpu DOUBLE DEFAULT 0,
2473 memory DOUBLE,
2474 TIME INDEX (ts),
2475 PRIMARY KEY (host),
2476)
2477ENGINE=mito";
2478
2479 let result =
2480 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2481 .unwrap();
2482
2483 if let Statement::CreateTable(c) = &result[0] {
2484 let tc = c.constraints[0].clone();
2485 match tc {
2486 TableConstraint::TimeIndex { column } => {
2487 assert_eq!(&column.value, "ts");
2488 }
2489 _ => panic!("should be time index constraint"),
2490 }
2491 let ts = c.columns[2].clone();
2492 assert_eq!(ts.name().to_string(), "ts");
2493 assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2494 assert_eq!(ts.options()[1].option, NotNull);
2495 } else {
2496 unreachable!("should be create table statement");
2497 }
2498 }
2499
2500 #[test]
2501 fn test_parse_partitions_with_error_syntax() {
2502 let sql = r"
2503CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2504PARTITION COLUMNS(c, a) (
2505 a < 10,
2506 a > 10 AND a < 20,
2507 a > 20 AND c < 100,
2508 a > 20 AND c >= 100
2509)
2510ENGINE=mito";
2511 let result =
2512 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2513 assert!(
2514 result
2515 .unwrap_err()
2516 .output_msg()
2517 .contains("sql parser error: Expected: ON, found: COLUMNS")
2518 );
2519 }
2520
2521 #[test]
2522 fn test_parse_partitions_without_rule() {
2523 let sql = r"
2524CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2525PARTITION ON COLUMNS(c, a) ()
2526ENGINE=mito";
2527 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2528 .unwrap();
2529 }
2530
2531 #[test]
2532 fn test_parse_partitions_unreferenced_column() {
2533 let sql = r"
2534CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2535PARTITION ON COLUMNS(c, a) (
2536 b = 'foo'
2537)
2538ENGINE=mito";
2539 let result =
2540 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2541 assert_eq!(
2542 result.unwrap_err().output_msg(),
2543 "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2544 );
2545 }
2546
2547 #[test]
2548 fn test_parse_partitions_not_binary_expr() {
2549 let sql = r"
2550CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2551PARTITION ON COLUMNS(c, a) (
2552 b
2553)
2554ENGINE=mito";
2555 let result =
2556 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2557 assert_eq!(
2558 result.unwrap_err().output_msg(),
2559 r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2560 );
2561 }
2562
2563 fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2564 assert_eq!(column.name.to_string(), name);
2565 assert_eq!(column.data_type.to_string(), data_type);
2566 }
2567
2568 #[test]
2569 pub fn test_parse_create_table() {
2570 let sql = r"create table demo(
2571 host string,
2572 ts timestamp,
2573 cpu float32 default 0,
2574 memory float64,
2575 TIME INDEX (ts),
2576 PRIMARY KEY(ts, host),
2577 ) engine=mito
2578 with(ttl='10s');
2579 ";
2580 let result =
2581 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2582 .unwrap();
2583 assert_eq!(1, result.len());
2584 match &result[0] {
2585 Statement::CreateTable(c) => {
2586 assert!(!c.if_not_exists);
2587 assert_eq!("demo", c.name.to_string());
2588 assert_eq!("mito", c.engine);
2589 assert_eq!(4, c.columns.len());
2590 let columns = &c.columns;
2591 assert_column_def(&columns[0].column_def, "host", "STRING");
2592 assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2593 assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2594 assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2595
2596 let constraints = &c.constraints;
2597 assert_eq!(
2598 &constraints[0],
2599 &TableConstraint::TimeIndex {
2600 column: Ident::new("ts"),
2601 }
2602 );
2603 assert_eq!(
2604 &constraints[1],
2605 &TableConstraint::PrimaryKey {
2606 columns: vec![Ident::new("ts"), Ident::new("host")]
2607 }
2608 );
2609 assert_eq!(1, c.options.len());
2611 assert_eq!(
2612 [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2613 c.options.to_str_map()
2614 );
2615 }
2616 _ => unreachable!(),
2617 }
2618 }
2619
2620 #[test]
2621 fn test_invalid_index_keys() {
2622 let sql = r"create table demo(
2623 host string,
2624 ts int64,
2625 cpu float64 default 0,
2626 memory float64,
2627 TIME INDEX (ts, host),
2628 PRIMARY KEY(ts, host)) engine=mito;
2629 ";
2630 let result =
2631 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2632 assert!(result.is_err());
2633 assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2634 }
2635
2636 #[test]
2637 fn test_duplicated_time_index() {
2638 let sql = r"create table demo(
2639 host string,
2640 ts timestamp time index,
2641 t timestamp time index,
2642 cpu float64 default 0,
2643 memory float64,
2644 TIME INDEX (ts, host),
2645 PRIMARY KEY(ts, host)) engine=mito;
2646 ";
2647 let result =
2648 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2649 assert!(result.is_err());
2650 assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2651
2652 let sql = r"create table demo(
2653 host string,
2654 ts timestamp time index,
2655 cpu float64 default 0,
2656 t timestamp,
2657 memory float64,
2658 TIME INDEX (t),
2659 PRIMARY KEY(ts, host)) engine=mito;
2660 ";
2661 let result =
2662 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2663 assert!(result.is_err());
2664 assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2665 }
2666
2667 #[test]
2668 fn test_invalid_column_name() {
2669 let sql = "create table foo(user string, i timestamp time index)";
2670 let result =
2671 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2672 let err = result.unwrap_err().output_msg();
2673 assert!(err.contains("Cannot use keyword 'user' as column name"));
2674
2675 let sql = r#"
2677 create table foo("user" string, i timestamp time index)
2678 "#;
2679 let result =
2680 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2681 let _ = result.unwrap();
2682 }
2683
2684 #[test]
2685 fn test_incorrect_default_value_issue_3479() {
2686 let sql = r#"CREATE TABLE `ExcePTuRi`(
2687non TIMESTAMP(6) TIME INDEX,
2688`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2689)"#;
2690 let result =
2691 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2692 .unwrap();
2693 assert_eq!(1, result.len());
2694 match &result[0] {
2695 Statement::CreateTable(c) => {
2696 assert_eq!(
2697 "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2698 c.columns[1].to_string()
2699 );
2700 }
2701 _ => unreachable!(),
2702 }
2703 }
2704
2705 #[test]
2706 fn test_parse_create_view() {
2707 let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
2708
2709 let result =
2710 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2711 .unwrap();
2712 match &result[0] {
2713 Statement::CreateView(c) => {
2714 assert_eq!(c.to_string(), sql);
2715 assert!(!c.or_replace);
2716 assert!(!c.if_not_exists);
2717 assert_eq!("test", c.name.to_string());
2718 }
2719 _ => unreachable!(),
2720 }
2721
2722 let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
2723
2724 let result =
2725 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2726 .unwrap();
2727 match &result[0] {
2728 Statement::CreateView(c) => {
2729 assert_eq!(c.to_string(), sql);
2730 assert!(c.or_replace);
2731 assert!(c.if_not_exists);
2732 assert_eq!("test", c.name.to_string());
2733 }
2734 _ => unreachable!(),
2735 }
2736 }
2737
2738 #[test]
2739 fn test_parse_create_view_invalid_query() {
2740 let sql = "CREATE VIEW test AS DELETE from demo";
2741 let result =
2742 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2743 assert!(result.is_ok_and(|x| x.len() == 1));
2744 }
2745
2746 #[test]
2747 fn test_parse_create_table_fulltext_options() {
2748 let sql1 = r"
2749CREATE TABLE log (
2750 ts TIMESTAMP TIME INDEX,
2751 msg TEXT FULLTEXT INDEX,
2752)";
2753 let result1 = ParserContext::create_with_dialect(
2754 sql1,
2755 &GreptimeDbDialect {},
2756 ParseOptions::default(),
2757 )
2758 .unwrap();
2759
2760 if let Statement::CreateTable(c) = &result1[0] {
2761 c.columns.iter().for_each(|col| {
2762 if col.name().value == "msg" {
2763 assert!(
2764 col.extensions
2765 .fulltext_index_options
2766 .as_ref()
2767 .unwrap()
2768 .is_empty()
2769 );
2770 }
2771 });
2772 } else {
2773 panic!("should be create_table statement");
2774 }
2775
2776 let sql2 = r"
2777CREATE TABLE log (
2778 ts TIMESTAMP TIME INDEX,
2779 msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2780)";
2781 let result2 = ParserContext::create_with_dialect(
2782 sql2,
2783 &GreptimeDbDialect {},
2784 ParseOptions::default(),
2785 )
2786 .unwrap();
2787
2788 if let Statement::CreateTable(c) = &result2[0] {
2789 c.columns.iter().for_each(|col| {
2790 if col.name().value == "msg" {
2791 let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2792 assert_eq!(options.len(), 2);
2793 assert_eq!(options.get("analyzer").unwrap(), "English");
2794 assert_eq!(options.get("case_sensitive").unwrap(), "false");
2795 }
2796 });
2797 } else {
2798 panic!("should be create_table statement");
2799 }
2800
2801 let sql3 = r"
2802CREATE TABLE log (
2803 ts TIMESTAMP TIME INDEX,
2804 msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2805 msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2806)";
2807 let result3 = ParserContext::create_with_dialect(
2808 sql3,
2809 &GreptimeDbDialect {},
2810 ParseOptions::default(),
2811 )
2812 .unwrap();
2813
2814 if let Statement::CreateTable(c) = &result3[0] {
2815 c.columns.iter().for_each(|col| {
2816 if col.name().value == "msg1" {
2817 let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2818 assert_eq!(options.len(), 2);
2819 assert_eq!(options.get("analyzer").unwrap(), "English");
2820 assert_eq!(options.get("case_sensitive").unwrap(), "false");
2821 } else if col.name().value == "msg2" {
2822 let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2823 assert_eq!(options.len(), 2);
2824 assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2825 assert_eq!(options.get("case_sensitive").unwrap(), "true");
2826 }
2827 });
2828 } else {
2829 panic!("should be create_table statement");
2830 }
2831 }
2832
2833 #[test]
2834 fn test_parse_create_table_fulltext_options_invalid_type() {
2835 let sql = r"
2836CREATE TABLE log (
2837 ts TIMESTAMP TIME INDEX,
2838 msg INT FULLTEXT INDEX,
2839)";
2840 let result =
2841 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2842 assert!(result.is_err());
2843 assert!(
2844 result
2845 .unwrap_err()
2846 .to_string()
2847 .contains("FULLTEXT index only supports string type")
2848 );
2849 }
2850
2851 #[test]
2852 fn test_parse_create_table_fulltext_options_duplicate() {
2853 let sql = r"
2854CREATE TABLE log (
2855 ts TIMESTAMP TIME INDEX,
2856 msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
2857)";
2858 let result =
2859 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2860 assert!(result.is_err());
2861 assert!(
2862 result
2863 .unwrap_err()
2864 .to_string()
2865 .contains("duplicated FULLTEXT INDEX option")
2866 );
2867 }
2868
2869 #[test]
2870 fn test_parse_create_table_fulltext_options_invalid_option() {
2871 let sql = r"
2872CREATE TABLE log (
2873 ts TIMESTAMP TIME INDEX,
2874 msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
2875)";
2876 let result =
2877 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2878 assert!(result.is_err());
2879 assert!(
2880 result
2881 .unwrap_err()
2882 .to_string()
2883 .contains("invalid FULLTEXT INDEX option")
2884 );
2885 }
2886
2887 #[test]
2888 fn test_parse_create_table_skip_options() {
2889 let sql = r"
2890CREATE TABLE log (
2891 ts TIMESTAMP TIME INDEX,
2892 msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
2893)";
2894 let result =
2895 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2896 .unwrap();
2897
2898 if let Statement::CreateTable(c) = &result[0] {
2899 c.columns.iter().for_each(|col| {
2900 if col.name().value == "msg" {
2901 assert!(
2902 !col.extensions
2903 .skipping_index_options
2904 .as_ref()
2905 .unwrap()
2906 .is_empty()
2907 );
2908 }
2909 });
2910 } else {
2911 panic!("should be create_table statement");
2912 }
2913
2914 let sql = r"
2915 CREATE TABLE log (
2916 ts TIMESTAMP TIME INDEX,
2917 msg INT SKIPPING INDEX,
2918 )";
2919 let result =
2920 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2921 .unwrap();
2922
2923 if let Statement::CreateTable(c) = &result[0] {
2924 c.columns.iter().for_each(|col| {
2925 if col.name().value == "msg" {
2926 assert!(
2927 col.extensions
2928 .skipping_index_options
2929 .as_ref()
2930 .unwrap()
2931 .is_empty()
2932 );
2933 }
2934 });
2935 } else {
2936 panic!("should be create_table statement");
2937 }
2938 }
2939
2940 #[test]
2941 fn test_parse_create_view_with_columns() {
2942 let sql = "CREATE VIEW test () AS SELECT * FROM NUMBERS";
2943 let result =
2944 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2945 .unwrap();
2946
2947 match &result[0] {
2948 Statement::CreateView(c) => {
2949 assert_eq!(c.to_string(), "CREATE VIEW test AS SELECT * FROM NUMBERS");
2950 assert!(!c.or_replace);
2951 assert!(!c.if_not_exists);
2952 assert_eq!("test", c.name.to_string());
2953 }
2954 _ => unreachable!(),
2955 }
2956 assert_eq!(
2957 "CREATE VIEW test AS SELECT * FROM NUMBERS",
2958 result[0].to_string()
2959 );
2960
2961 let sql = "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS";
2962 let result =
2963 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2964 .unwrap();
2965
2966 match &result[0] {
2967 Statement::CreateView(c) => {
2968 assert_eq!(c.to_string(), sql);
2969 assert!(!c.or_replace);
2970 assert!(!c.if_not_exists);
2971 assert_eq!("test", c.name.to_string());
2972 }
2973 _ => unreachable!(),
2974 }
2975 assert_eq!(sql, result[0].to_string());
2976
2977 let sql = "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS";
2978 let result =
2979 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2980 .unwrap();
2981
2982 match &result[0] {
2983 Statement::CreateView(c) => {
2984 assert_eq!(c.to_string(), sql);
2985 assert!(!c.or_replace);
2986 assert!(!c.if_not_exists);
2987 assert_eq!("test", c.name.to_string());
2988 }
2989 _ => unreachable!(),
2990 }
2991 assert_eq!(sql, result[0].to_string());
2992
2993 let sql = "CREATE VIEW test (n1 AS select * from demo";
2995 let result =
2996 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2997 assert!(result.is_err());
2998
2999 let sql = "CREATE VIEW test (n1, AS select * from demo";
3000 let result =
3001 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3002 assert!(result.is_err());
3003
3004 let sql = "CREATE VIEW test n1,n2) AS select * from demo";
3005 let result =
3006 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3007 assert!(result.is_err());
3008
3009 let sql = "CREATE VIEW test (1) AS select * from demo";
3010 let result =
3011 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3012 assert!(result.is_err());
3013
3014 let sql = "CREATE VIEW test (n1, select) AS select * from demo";
3016 let result =
3017 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3018 assert!(result.is_err());
3019 }
3020
3021 #[test]
3022 fn test_parse_column_extensions_vector() {
3023 let sql = "";
3025 let dialect = GenericDialect {};
3026 let mut tokenizer = Tokenizer::new(&dialect, sql);
3027 let tokens = tokenizer.tokenize().unwrap();
3028 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3029 let name = Ident::new("vec_col");
3030 let data_type =
3031 DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
3032 let mut extensions = ColumnExtensions::default();
3033
3034 let result =
3035 ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
3036 assert!(result.is_ok());
3037 assert!(extensions.vector_options.is_some());
3038 let vector_options = extensions.vector_options.unwrap();
3039 assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some("128"));
3040 }
3041
3042 #[test]
3043 fn test_parse_column_extensions_vector_invalid() {
3044 let sql = "";
3046 let dialect = GenericDialect {};
3047 let mut tokenizer = Tokenizer::new(&dialect, sql);
3048 let tokens = tokenizer.tokenize().unwrap();
3049 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3050 let name = Ident::new("vec_col");
3051 let data_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
3052 let mut extensions = ColumnExtensions::default();
3053
3054 let result =
3055 ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
3056 assert!(result.is_err());
3057 }
3058
3059 #[test]
3060 fn test_parse_column_extensions_indices() {
3061 {
3063 let sql = "SKIPPING INDEX";
3064 let dialect = GenericDialect {};
3065 let mut tokenizer = Tokenizer::new(&dialect, sql);
3066 let tokens = tokenizer.tokenize().unwrap();
3067 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3068 let name = Ident::new("col");
3069 let data_type = DataType::String(None);
3070 let mut extensions = ColumnExtensions::default();
3071 let result = ParserContext::parse_column_extensions(
3072 &mut parser,
3073 &name,
3074 &data_type,
3075 &mut extensions,
3076 );
3077 assert!(result.is_ok());
3078 assert!(extensions.skipping_index_options.is_some());
3079 }
3080
3081 {
3083 let sql = "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')";
3084 let dialect = GenericDialect {};
3085 let mut tokenizer = Tokenizer::new(&dialect, sql);
3086 let tokens = tokenizer.tokenize().unwrap();
3087 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3088 let name = Ident::new("text_col");
3089 let data_type = DataType::String(None);
3090 let mut extensions = ColumnExtensions::default();
3091 let result = ParserContext::parse_column_extensions(
3092 &mut parser,
3093 &name,
3094 &data_type,
3095 &mut extensions,
3096 );
3097 assert!(result.unwrap());
3098 assert!(extensions.fulltext_index_options.is_some());
3099 let fulltext_options = extensions.fulltext_index_options.unwrap();
3100 assert_eq!(fulltext_options.get("analyzer"), Some("English"));
3101 assert_eq!(fulltext_options.get("case_sensitive"), Some("true"));
3102 }
3103
3104 {
3106 let sql = "FULLTEXT INDEX WITH (analyzer = 'English')";
3107 let dialect = GenericDialect {};
3108 let mut tokenizer = Tokenizer::new(&dialect, sql);
3109 let tokens = tokenizer.tokenize().unwrap();
3110 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3111 let name = Ident::new("num_col");
3112 let data_type = DataType::Int(None); let mut extensions = ColumnExtensions::default();
3114 let result = ParserContext::parse_column_extensions(
3115 &mut parser,
3116 &name,
3117 &data_type,
3118 &mut extensions,
3119 );
3120 assert!(result.is_err());
3121 assert!(
3122 result
3123 .unwrap_err()
3124 .to_string()
3125 .contains("FULLTEXT index only supports string type")
3126 );
3127 }
3128
3129 {
3131 let sql = "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')";
3132 let dialect = GenericDialect {};
3133 let mut tokenizer = Tokenizer::new(&dialect, sql);
3134 let tokens = tokenizer.tokenize().unwrap();
3135 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3136 let name = Ident::new("text_col");
3137 let data_type = DataType::String(None);
3138 let mut extensions = ColumnExtensions::default();
3139 let result = ParserContext::parse_column_extensions(
3140 &mut parser,
3141 &name,
3142 &data_type,
3143 &mut extensions,
3144 );
3145 assert!(result.unwrap());
3146 }
3147
3148 {
3150 let sql = "INVERTED INDEX";
3151 let dialect = GenericDialect {};
3152 let mut tokenizer = Tokenizer::new(&dialect, sql);
3153 let tokens = tokenizer.tokenize().unwrap();
3154 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3155 let name = Ident::new("col");
3156 let data_type = DataType::String(None);
3157 let mut extensions = ColumnExtensions::default();
3158 let result = ParserContext::parse_column_extensions(
3159 &mut parser,
3160 &name,
3161 &data_type,
3162 &mut extensions,
3163 );
3164 assert!(result.is_ok());
3165 assert!(extensions.inverted_index_options.is_some());
3166 }
3167
3168 {
3170 let sql = "INVERTED INDEX WITH (analyzer = 'English')";
3171 let dialect = GenericDialect {};
3172 let mut tokenizer = Tokenizer::new(&dialect, sql);
3173 let tokens = tokenizer.tokenize().unwrap();
3174 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3175 let name = Ident::new("col");
3176 let data_type = DataType::String(None);
3177 let mut extensions = ColumnExtensions::default();
3178 let result = ParserContext::parse_column_extensions(
3179 &mut parser,
3180 &name,
3181 &data_type,
3182 &mut extensions,
3183 );
3184 assert!(result.is_err());
3185 assert!(
3186 result
3187 .unwrap_err()
3188 .to_string()
3189 .contains("INVERTED index doesn't support options")
3190 );
3191 }
3192
3193 {
3195 let sql = "SKIPPING INDEX FULLTEXT INDEX";
3196 let dialect = GenericDialect {};
3197 let mut tokenizer = Tokenizer::new(&dialect, sql);
3198 let tokens = tokenizer.tokenize().unwrap();
3199 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3200 let name = Ident::new("col");
3201 let data_type = DataType::String(None);
3202 let mut extensions = ColumnExtensions::default();
3203 let result = ParserContext::parse_column_extensions(
3204 &mut parser,
3205 &name,
3206 &data_type,
3207 &mut extensions,
3208 );
3209 assert!(result.unwrap());
3210 assert!(extensions.skipping_index_options.is_some());
3211 assert!(extensions.fulltext_index_options.is_some());
3212 }
3213 }
3214
3215 #[test]
3216 fn test_parse_interval_cast() {
3217 let s = "select '10s'::INTERVAL";
3218 let stmts =
3219 ParserContext::create_with_dialect(s, &GreptimeDbDialect {}, ParseOptions::default())
3220 .unwrap();
3221 assert_eq!("SELECT '10 seconds'::INTERVAL", &stmts[0].to_string());
3222 }
3223
3224 #[test]
3225 fn test_parse_create_table_vector_index_options() {
3226 let sql = r"
3228CREATE TABLE vectors (
3229 ts TIMESTAMP TIME INDEX,
3230 vec VECTOR(128) VECTOR INDEX,
3231)";
3232 let result =
3233 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
3234 .unwrap();
3235
3236 if let Statement::CreateTable(c) = &result[0] {
3237 c.columns.iter().for_each(|col| {
3238 if col.name().value == "vec" {
3239 assert!(
3240 col.extensions
3241 .vector_index_options
3242 .as_ref()
3243 .unwrap()
3244 .is_empty()
3245 );
3246 }
3247 });
3248 } else {
3249 panic!("should be create_table statement");
3250 }
3251
3252 let sql = r"
3254CREATE TABLE vectors (
3255 ts TIMESTAMP TIME INDEX,
3256 vec VECTOR(128) VECTOR INDEX WITH (metric='cosine', connectivity='32', expansion_add='256', expansion_search='128')
3257)";
3258 let result =
3259 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
3260 .unwrap();
3261
3262 if let Statement::CreateTable(c) = &result[0] {
3263 c.columns.iter().for_each(|col| {
3264 if col.name().value == "vec" {
3265 let options = col.extensions.vector_index_options.as_ref().unwrap();
3266 assert_eq!(options.len(), 4);
3267 assert_eq!(options.get("metric").unwrap(), "cosine");
3268 assert_eq!(options.get("connectivity").unwrap(), "32");
3269 assert_eq!(options.get("expansion_add").unwrap(), "256");
3270 assert_eq!(options.get("expansion_search").unwrap(), "128");
3271 }
3272 });
3273 } else {
3274 panic!("should be create_table statement");
3275 }
3276 }
3277
3278 #[test]
3279 fn test_parse_create_table_vector_index_invalid_type() {
3280 let sql = r"
3282CREATE TABLE vectors (
3283 ts TIMESTAMP TIME INDEX,
3284 col INT VECTOR INDEX,
3285)";
3286 let result =
3287 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3288 assert!(result.is_err());
3289 assert!(
3290 result
3291 .unwrap_err()
3292 .to_string()
3293 .contains("VECTOR INDEX only supports Vector type columns")
3294 );
3295 }
3296
3297 #[test]
3298 fn test_parse_create_table_vector_index_duplicate() {
3299 let sql = r"
3301CREATE TABLE vectors (
3302 ts TIMESTAMP TIME INDEX,
3303 vec VECTOR(128) VECTOR INDEX VECTOR INDEX,
3304)";
3305 let result =
3306 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3307 assert!(result.is_err());
3308 assert!(
3309 result
3310 .unwrap_err()
3311 .to_string()
3312 .contains("duplicated VECTOR INDEX option")
3313 );
3314 }
3315
3316 #[test]
3317 fn test_parse_create_table_vector_index_invalid_option() {
3318 let sql = r"
3320CREATE TABLE vectors (
3321 ts TIMESTAMP TIME INDEX,
3322 vec VECTOR(128) VECTOR INDEX WITH (metric='l2sq', invalid_option='foo')
3323)";
3324 let result =
3325 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3326 assert!(result.is_err());
3327 assert!(
3328 result
3329 .unwrap_err()
3330 .to_string()
3331 .contains("invalid VECTOR INDEX option")
3332 );
3333 }
3334
3335 #[test]
3336 fn test_parse_column_extensions_vector_index() {
3337 {
3339 let sql = "VECTOR INDEX WITH (metric = 'l2sq')";
3340 let dialect = GenericDialect {};
3341 let mut tokenizer = Tokenizer::new(&dialect, sql);
3342 let tokens = tokenizer.tokenize().unwrap();
3343 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3344 let name = Ident::new("vec_col");
3345 let data_type =
3346 DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
3347 let mut extensions = ColumnExtensions {
3349 vector_options: Some(OptionMap::from([(
3350 VECTOR_OPT_DIM.to_string(),
3351 "128".to_string(),
3352 )])),
3353 ..Default::default()
3354 };
3355
3356 let result = ParserContext::parse_column_extensions(
3357 &mut parser,
3358 &name,
3359 &data_type,
3360 &mut extensions,
3361 );
3362 assert!(result.is_ok());
3363 assert!(extensions.vector_index_options.is_some());
3364 let vi_options = extensions.vector_index_options.unwrap();
3365 assert_eq!(vi_options.get("metric"), Some("l2sq"));
3366 }
3367
3368 {
3370 let sql = "VECTOR INDEX";
3371 let dialect = GenericDialect {};
3372 let mut tokenizer = Tokenizer::new(&dialect, sql);
3373 let tokens = tokenizer.tokenize().unwrap();
3374 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3375 let name = Ident::new("num_col");
3376 let data_type = DataType::Int(None); let mut extensions = ColumnExtensions::default();
3378 let result = ParserContext::parse_column_extensions(
3379 &mut parser,
3380 &name,
3381 &data_type,
3382 &mut extensions,
3383 );
3384 assert!(result.is_err());
3385 assert!(
3386 result
3387 .unwrap_err()
3388 .to_string()
3389 .contains("VECTOR INDEX only supports Vector type columns")
3390 );
3391 }
3392 }
3393}