1mod json;
16#[cfg(feature = "enterprise")]
17pub mod trigger;
18
19use std::collections::HashMap;
20
21use arrow_buffer::IntervalMonthDayNano;
22use common_catalog::consts::default_engine;
23use datafusion_common::ScalarValue;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
25use datatypes::data_type::ConcreteDataType;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28use sqlparser::ast::{
29 ColumnOption, ColumnOptionDef, DataType, Expr, KeyOrIndexDisplay, NullsDistinctOption,
30 PrimaryKeyConstraint, UniqueConstraint,
31};
32use sqlparser::dialect::keywords::Keyword;
33use sqlparser::keywords::ALL_KEYWORDS;
34use sqlparser::parser::IsOptional::Mandatory;
35use sqlparser::parser::{Parser, ParserError};
36use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
37use table::requests::validate_database_option;
38
39use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
40use crate::error::{
41 self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidFlowQuerySnafu,
42 InvalidIntervalSnafu, InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
43 SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
44};
45use crate::parser::{FLOW, ParserContext};
46use crate::parsers::tql_parser;
47use crate::parsers::utils::{
48 self, parse_with_options, validate_column_fulltext_create_option,
49 validate_column_skipping_index_create_option, validate_column_vector_index_create_option,
50};
51use crate::statements::create::{
52 Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
53 CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
54};
55use crate::statements::statement::Statement;
56use crate::statements::transform::type_alias::get_data_type_by_alias_name;
57use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
58use crate::util::{OptionValue, location_to_index, parse_option_string};
59
/// `ENGINE` word of the `ENGINE = <name>` clause in `CREATE [EXTERNAL] TABLE`.
pub const ENGINE: &str = "ENGINE";
/// `MAXVALUE` word. Not referenced in this section; presumably used by partition-bound
/// parsing elsewhere (TODO confirm).
pub const MAXVALUE: &str = "MAXVALUE";
/// `SINK` word of `CREATE FLOW <name> SINK TO <table>`.
pub const SINK: &str = "SINK";
/// `EXPIRE` word of the optional `EXPIRE AFTER <interval>` clause in `CREATE FLOW`.
pub const EXPIRE: &str = "EXPIRE";
/// `AFTER` word of the optional `EXPIRE AFTER <interval>` clause in `CREATE FLOW`.
pub const AFTER: &str = "AFTER";
/// `INVERTED` word of the `INVERTED INDEX` column extension.
pub const INVERTED: &str = "INVERTED";
/// `SKIPPING` word of the `SKIPPING INDEX [WITH (...)]` column extension.
pub const SKIPPING: &str = "SKIPPING";
/// `VECTOR` word of the `VECTOR INDEX [WITH (...)]` column extension.
pub const VECTOR: &str = "VECTOR";

/// Textual (`Display`) form of a parsed interval expression.
pub type RawIntervalExpr = String;
70
71fn flow_option_map(options: HashMap<String, OptionValue>) -> OptionMap {
75 let mut flow_options = OptionMap::default();
76 for (key, value) in options {
77 flow_options.insert_options(&key, value);
78 }
79 flow_options
80}
81
82impl<'a> ParserContext<'a> {
84 pub(crate) fn parse_create(&mut self) -> Result<Statement> {
85 match self.parser.peek_token().token {
86 Token::Word(w) => match w.keyword {
87 Keyword::TABLE => self.parse_create_table(),
88
89 Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
90
91 Keyword::EXTERNAL => self.parse_create_external_table(),
92
93 Keyword::OR => {
94 let _ = self.parser.next_token();
95 self.parser
96 .expect_keyword(Keyword::REPLACE)
97 .context(SyntaxSnafu)?;
98 match self.parser.next_token().token {
99 Token::Word(w) => match w.keyword {
100 Keyword::VIEW => self.parse_create_view(true),
101 Keyword::NoKeyword => {
102 let uppercase = w.value.to_uppercase();
103 match uppercase.as_str() {
104 FLOW => self.parse_create_flow(true),
105 _ => self.unsupported(w.to_string()),
106 }
107 }
108 _ => self.unsupported(w.to_string()),
109 },
110 _ => self.unsupported(w.to_string()),
111 }
112 }
113
114 Keyword::VIEW => {
115 let _ = self.parser.next_token();
116 self.parse_create_view(false)
117 }
118
119 #[cfg(feature = "enterprise")]
120 Keyword::TRIGGER => {
121 let _ = self.parser.next_token();
122 self.parse_create_trigger()
123 }
124
125 Keyword::NoKeyword => {
126 let _ = self.parser.next_token();
127 let uppercase = w.value.to_uppercase();
128 match uppercase.as_str() {
129 FLOW => self.parse_create_flow(false),
130 _ => self.unsupported(w.to_string()),
131 }
132 }
133 _ => self.unsupported(w.to_string()),
134 },
135 unexpected => self.unsupported(unexpected.to_string()),
136 }
137 }
138
139 fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
141 let if_not_exists = self.parse_if_not_exist()?;
142 let view_name = self.intern_parse_table_name()?;
143
144 let columns = self.parse_view_columns()?;
145
146 self.parser
147 .expect_keyword(Keyword::AS)
148 .context(SyntaxSnafu)?;
149
150 let query = self.parse_query()?;
151
152 Ok(Statement::CreateView(CreateView {
153 name: view_name,
154 columns,
155 or_replace,
156 query: Box::new(query),
157 if_not_exists,
158 }))
159 }
160
161 fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
162 let mut columns = vec![];
163 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
164 return Ok(columns);
165 }
166
167 loop {
168 let name = self.parse_column_name().context(SyntaxSnafu)?;
169
170 columns.push(name);
171
172 let comma = self.parser.consume_token(&Token::Comma);
173 if self.parser.consume_token(&Token::RParen) {
174 break;
176 } else if !comma {
177 return self.expected("',' or ')' after column name", self.parser.peek_token());
178 }
179 }
180
181 Ok(columns)
182 }
183
184 fn parse_create_external_table(&mut self) -> Result<Statement> {
185 let _ = self.parser.next_token();
186 self.parser
187 .expect_keyword(Keyword::TABLE)
188 .context(SyntaxSnafu)?;
189 let if_not_exists = self.parse_if_not_exist()?;
190 let table_name = self.intern_parse_table_name()?;
191 let (columns, constraints) = self.parse_columns()?;
192 if !columns.is_empty() {
193 validate_time_index(&columns, &constraints)?;
194 }
195
196 let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
197 let options = self.parse_create_table_options()?;
198 Ok(Statement::CreateExternalTable(CreateExternalTable {
199 name: table_name,
200 columns,
201 constraints,
202 options,
203 if_not_exists,
204 engine,
205 }))
206 }
207
208 fn parse_create_database(&mut self) -> Result<Statement> {
209 let _ = self.parser.next_token();
210 let if_not_exists = self.parse_if_not_exist()?;
211 let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
212 expected: "a database name",
213 actual: self.peek_token_as_string(),
214 })?;
215 let database_name = Self::canonicalize_object_name(database_name)?;
216
217 let options = self
218 .parser
219 .parse_options(Keyword::WITH)
220 .context(SyntaxSnafu)?
221 .into_iter()
222 .map(parse_option_string)
223 .collect::<Result<HashMap<String, OptionValue>>>()?;
224
225 for key in options.keys() {
226 ensure!(
227 validate_database_option(key),
228 InvalidDatabaseOptionSnafu { key: key.clone() }
229 );
230 }
231 if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string())
232 && append_mode == "true"
233 && options.contains_key("merge_mode")
234 {
235 return InvalidDatabaseOptionSnafu {
236 key: "merge_mode".to_string(),
237 }
238 .fail();
239 }
240
241 Ok(Statement::CreateDatabase(CreateDatabase {
242 name: database_name,
243 if_not_exists,
244 options: OptionMap::new(options),
245 }))
246 }
247
248 fn parse_create_table(&mut self) -> Result<Statement> {
249 let _ = self.parser.next_token();
250
251 let if_not_exists = self.parse_if_not_exist()?;
252
253 let table_name = self.intern_parse_table_name()?;
254
255 if self.parser.parse_keyword(Keyword::LIKE) {
256 let source_name = self.intern_parse_table_name()?;
257
258 return Ok(Statement::CreateTableLike(CreateTableLike {
259 table_name,
260 source_name,
261 }));
262 }
263
264 let (columns, constraints) = self.parse_columns()?;
265 validate_time_index(&columns, &constraints)?;
266
267 let partitions = self.parse_partitions()?;
268 if let Some(partitions) = &partitions {
269 validate_partitions(&columns, partitions)?;
270 }
271
272 let engine = self.parse_table_engine(default_engine())?;
273 let options = self.parse_create_table_options()?;
274 let create_table = CreateTable {
275 if_not_exists,
276 name: table_name,
277 columns,
278 engine,
279 constraints,
280 options,
281 table_id: 0, partitions,
283 };
284
285 Ok(Statement::CreateTable(create_table))
286 }
287
288 fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
290 let if_not_exists = self.parse_if_not_exist()?;
291
292 let flow_name = self.intern_parse_table_name()?;
293
294 if let Token::Word(word) = self.parser.peek_token().token
296 && word.value.eq_ignore_ascii_case(SINK)
297 {
298 self.parser.next_token();
299 } else {
300 Err(ParserError::ParserError(
301 "Expect `SINK` keyword".to_string(),
302 ))
303 .context(SyntaxSnafu)?
304 }
305 self.parser
306 .expect_keyword(Keyword::TO)
307 .context(SyntaxSnafu)?;
308
309 let output_table_name = self.intern_parse_table_name()?;
310
311 let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
312 && w1.value.eq_ignore_ascii_case(EXPIRE)
313 {
314 self.parser.next_token();
315 if let Token::Word(w2) = &self.parser.peek_token().token
316 && w2.value.eq_ignore_ascii_case(AFTER)
317 {
318 self.parser.next_token();
319 Some(self.parse_interval_no_month("EXPIRE AFTER")?)
320 } else {
321 None
322 }
323 } else {
324 None
325 };
326
327 let eval_interval = if self
328 .parser
329 .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
330 {
331 Some(self.parse_interval_no_month("EVAL INTERVAL")?)
332 } else {
333 None
334 };
335
336 let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
337 match self.parser.next_token() {
338 TokenWithSpan {
339 token: Token::SingleQuotedString(value, ..),
340 ..
341 } => Some(value),
342 unexpected => {
343 return self
344 .parser
345 .expected("string", unexpected)
346 .context(SyntaxSnafu);
347 }
348 }
349 } else {
350 None
351 };
352
353 let flow_options = self
354 .parser
355 .parse_options(Keyword::WITH)
356 .context(SyntaxSnafu)?
357 .into_iter()
358 .map(parse_option_string)
359 .collect::<Result<HashMap<String, OptionValue>>>()?;
360
361 self.parser
362 .expect_keyword(Keyword::AS)
363 .context(SyntaxSnafu)?;
364
365 let query = Box::new(self.parse_flow_sql_or_tql(true)?);
366
367 Ok(Statement::CreateFlow(CreateFlow {
368 flow_name,
369 sink_table_name: output_table_name,
370 or_replace,
371 if_not_exists,
372 expire_after,
373 eval_interval,
374 comment,
375 flow_options: flow_option_map(flow_options),
376 query,
377 }))
378 }
379
380 fn parse_flow_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
381 let start_loc = self.parser.peek_token().span.start;
382 let start_index = location_to_index(self.sql, &start_loc);
383
384 let starts_with_with = matches!(
385 self.parser.peek_token().token,
386 Token::Word(w) if w.keyword == Keyword::WITH
387 );
388
389 let query = match self.parser.peek_token().token {
391 Token::Word(w) => match w.keyword {
392 Keyword::SELECT => self.parse_query(),
393 Keyword::WITH => self.parse_with_tql_with_now(require_now_expr),
394 Keyword::NoKeyword
395 if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
396 {
397 self.parse_tql(require_now_expr)
398 }
399
400 _ => self.unsupported(self.peek_token_as_string()),
401 },
402 _ => self.unsupported(self.peek_token_as_string()),
403 }?;
404
405 if starts_with_with {
406 let Statement::Query(query) = &query else {
407 return InvalidFlowQuerySnafu {
408 reason: "Expect a query after WITH".to_string(),
409 }
410 .fail();
411 };
412
413 if utils::has_tql_cte(query) && !utils::is_simple_tql_cte_query(query) {
414 return InvalidFlowQuerySnafu {
415 reason: "WITH is only supported for the simplest TQL CTE in CREATE FLOW"
416 .to_string(),
417 }
418 .fail();
419 }
420 }
421
422 let end_token = self.parser.peek_token();
423
424 let raw_query = if end_token == Token::EOF {
425 &self.sql[start_index..]
426 } else {
427 let end_loc = end_token.span.end;
428 let end_index = location_to_index(self.sql, &end_loc);
429 &self.sql[start_index..end_index.min(self.sql.len())]
430 };
431 let raw_query = raw_query.trim_end_matches(";");
432
433 let query = SqlOrTql::try_from_statement(query, raw_query)?;
434 Ok(query)
435 }
436
437 fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
439 let interval = self.parse_interval_month_day_nano()?.0;
440 if interval.months != 0 {
441 return InvalidIntervalSnafu {
442 reason: format!("Interval with months is not allowed in {context}"),
443 }
444 .fail();
445 }
446 Ok(
447 interval.nanoseconds / 1_000_000_000
448 + interval.days as i64 * 60 * 60 * 24
449 + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, )
453 }
454
455 fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
457 let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
458 let raw_interval_expr = interval_expr.to_string();
459 let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
460 .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
461 .ok()
462 .with_context(|| InvalidIntervalSnafu {
463 reason: format!("cannot cast {} to interval type", interval_expr),
464 })?;
465 if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
466 Ok((interval, raw_interval_expr))
467 } else {
468 unreachable!()
469 }
470 }
471
472 fn parse_if_not_exist(&mut self) -> Result<bool> {
473 match self.parser.peek_token().token {
474 Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
475 _ => {}
476 }
477
478 if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
479 return self
480 .parser
481 .expect_keyword(Keyword::EXISTS)
482 .map(|_| true)
483 .context(UnexpectedSnafu {
484 expected: "EXISTS",
485 actual: self.peek_token_as_string(),
486 });
487 }
488
489 if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
490 return UnsupportedSnafu { keyword: "EXISTS" }.fail();
491 }
492
493 Ok(false)
494 }
495
496 fn parse_create_table_options(&mut self) -> Result<OptionMap> {
497 parse_with_options(&mut self.parser)
498 }
499
500 fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
502 if !self.parser.parse_keyword(Keyword::PARTITION) {
503 return Ok(None);
504 }
505
506 self.parse_partition_on_columns().map(Some)
507 }
508
509 pub(crate) fn parse_partition_on_columns(&mut self) -> Result<Partitions> {
511 self.parser
512 .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
513 .context(error::UnexpectedSnafu {
514 expected: "ON, COLUMNS",
515 actual: self.peek_token_as_string(),
516 })?;
517
518 let raw_column_list = self
519 .parser
520 .parse_parenthesized_column_list(Mandatory, false)
521 .context(error::SyntaxSnafu)?;
522 let column_list = raw_column_list
523 .into_iter()
524 .map(Self::canonicalize_identifier)
525 .collect();
526
527 let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
528
529 Ok(Partitions { column_list, exprs })
530 }
531
532 fn parse_partition_entry(&mut self) -> Result<Expr> {
533 self.parser.parse_expr().context(error::SyntaxSnafu)
534 }
535
536 fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
538 where
539 F: FnMut(&mut ParserContext<'a>) -> Result<T>,
540 {
541 self.parser
542 .expect_token(&Token::LParen)
543 .context(error::UnexpectedSnafu {
544 expected: "(",
545 actual: self.peek_token_as_string(),
546 })?;
547
548 let mut values = vec![];
549 while self.parser.peek_token() != Token::RParen {
550 values.push(f(self)?);
551 if !self.parser.consume_token(&Token::Comma) {
552 break;
553 }
554 }
555
556 self.parser
557 .expect_token(&Token::RParen)
558 .context(error::UnexpectedSnafu {
559 expected: ")",
560 actual: self.peek_token_as_string(),
561 })?;
562
563 Ok(values)
564 }
565
566 fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
568 let mut columns = vec![];
569 let mut constraints = vec![];
570 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
571 return Ok((columns, constraints));
572 }
573
574 loop {
575 if let Some(constraint) = self.parse_optional_table_constraint()? {
576 constraints.push(constraint);
577 } else if let Token::Word(_) = self.parser.peek_token().token {
578 self.parse_column(&mut columns, &mut constraints)?;
579 } else {
580 return self.expected(
581 "column name or constraint definition",
582 self.parser.peek_token(),
583 );
584 }
585 let comma = self.parser.consume_token(&Token::Comma);
586 if self.parser.consume_token(&Token::RParen) {
587 break;
589 } else if !comma {
590 return self.expected(
591 "',' or ')' after column definition",
592 self.parser.peek_token(),
593 );
594 }
595 }
596
597 Ok((columns, constraints))
598 }
599
600 fn parse_column(
601 &mut self,
602 columns: &mut Vec<Column>,
603 constraints: &mut Vec<TableConstraint>,
604 ) -> Result<()> {
605 let mut column = self.parse_column_def()?;
606
607 let mut time_index_opt_idx = None;
608 for (index, opt) in column.options().iter().enumerate() {
609 if let ColumnOption::DialectSpecific(tokens) = &opt.option
610 && matches!(
611 &tokens[..],
612 [
613 Token::Word(Word {
614 keyword: Keyword::TIME,
615 ..
616 }),
617 Token::Word(Word {
618 keyword: Keyword::INDEX,
619 ..
620 })
621 ]
622 )
623 {
624 ensure!(
625 time_index_opt_idx.is_none(),
626 InvalidColumnOptionSnafu {
627 name: column.name().to_string(),
628 msg: "duplicated time index",
629 }
630 );
631 time_index_opt_idx = Some(index);
632
633 let constraint = TableConstraint::TimeIndex {
634 column: Ident::new(column.name().value.clone()),
635 };
636 constraints.push(constraint);
637 }
638 }
639
640 if let Some(index) = time_index_opt_idx {
641 ensure!(
642 !column.options().contains(&ColumnOptionDef {
643 option: ColumnOption::Null,
644 name: None,
645 }),
646 InvalidColumnOptionSnafu {
647 name: column.name().to_string(),
648 msg: "time index column can't be null",
649 }
650 );
651
652 let data_type = get_unalias_type(column.data_type());
654 ensure!(
655 matches!(data_type, DataType::Timestamp(_, _)),
656 InvalidColumnOptionSnafu {
657 name: column.name().to_string(),
658 msg: "time index column data type should be timestamp",
659 }
660 );
661
662 let not_null_opt = ColumnOptionDef {
663 option: ColumnOption::NotNull,
664 name: None,
665 };
666
667 if !column.options().contains(¬_null_opt) {
668 column.mut_options().push(not_null_opt);
669 }
670
671 let _ = column.mut_options().remove(index);
672 }
673
674 columns.push(column);
675
676 Ok(())
677 }
678
679 fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
681 let name = self.parser.parse_identifier()?;
682 if name.quote_style.is_none() &&
683 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
685 {
686 return Err(ParserError::ParserError(format!(
687 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
688 &name.value
689 )));
690 }
691
692 Ok(name)
693 }
694
695 pub fn parse_column_def(&mut self) -> Result<Column> {
696 let name = self.parse_column_name().context(SyntaxSnafu)?;
697 let parser = &mut self.parser;
698
699 ensure!(
700 !(name.quote_style.is_none() &&
701 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
703 InvalidSqlSnafu {
704 msg: format!(
705 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
706 &name.value
707 ),
708 }
709 );
710
711 let mut extensions = ColumnExtensions::default();
712
713 let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
714 if matches!(data_type, DataType::JSON) {
717 extensions.json_datatype_options = json::parse_json_datatype_options(parser)?;
718 }
719
720 let mut options = vec![];
721 loop {
722 if parser.parse_keyword(Keyword::CONSTRAINT) {
723 let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
724 if let Some(option) = Self::parse_optional_column_option(parser)? {
725 options.push(ColumnOptionDef { name, option });
726 } else {
727 return parser
728 .expected(
729 "constraint details after CONSTRAINT <name>",
730 parser.peek_token(),
731 )
732 .context(SyntaxSnafu);
733 }
734 } else if let Some(option) = Self::parse_optional_column_option(parser)? {
735 options.push(ColumnOptionDef { name: None, option });
736 } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
737 break;
738 };
739 }
740
741 Ok(Column {
742 column_def: ColumnDef {
743 name: Self::canonicalize_identifier(name),
744 data_type,
745 options,
746 },
747 extensions,
748 })
749 }
750
751 fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
752 if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
753 Ok(Some(ColumnOption::CharacterSet(
754 parser.parse_object_name(false).context(SyntaxSnafu)?,
755 )))
756 } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
757 Ok(Some(ColumnOption::NotNull))
758 } else if parser.parse_keywords(&[Keyword::COMMENT]) {
759 match parser.next_token() {
760 TokenWithSpan {
761 token: Token::SingleQuotedString(value, ..),
762 ..
763 } => Ok(Some(ColumnOption::Comment(value))),
764 unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
765 }
766 } else if parser.parse_keyword(Keyword::NULL) {
767 Ok(Some(ColumnOption::Null))
768 } else if parser.parse_keyword(Keyword::DEFAULT) {
769 Ok(Some(ColumnOption::Default(
770 parser.parse_expr().context(SyntaxSnafu)?,
771 )))
772 } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
773 Ok(Some(ColumnOption::PrimaryKey(PrimaryKeyConstraint {
774 name: None,
775 index_name: None,
776 index_type: None,
777 columns: vec![],
778 index_options: vec![],
779 characteristics: None,
780 })))
781 } else if parser.parse_keyword(Keyword::UNIQUE) {
782 Ok(Some(ColumnOption::Unique(UniqueConstraint {
783 name: None,
784 index_name: None,
785 index_type_display: KeyOrIndexDisplay::None,
786 index_type: None,
787 columns: vec![],
788 index_options: vec![],
789 characteristics: None,
790 nulls_distinct: NullsDistinctOption::None,
791 })))
792 } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
793 Ok(Some(ColumnOption::DialectSpecific(vec![
795 Token::Word(Word {
796 value: "TIME".to_string(),
797 quote_style: None,
798 keyword: Keyword::TIME,
799 }),
800 Token::Word(Word {
801 value: "INDEX".to_string(),
802 quote_style: None,
803 keyword: Keyword::INDEX,
804 }),
805 ])))
806 } else {
807 Ok(None)
808 }
809 }
810
811 fn parse_column_extensions(
817 parser: &mut Parser<'_>,
818 column_name: &Ident,
819 column_type: &DataType,
820 column_extensions: &mut ColumnExtensions,
821 ) -> Result<bool> {
822 if let DataType::Custom(name, tokens) = column_type
823 && name.0.len() == 1
824 && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
825 {
826 ensure!(
827 tokens.len() == 1,
828 InvalidColumnOptionSnafu {
829 name: column_name.to_string(),
830 msg: "VECTOR type should have dimension",
831 }
832 );
833
834 let dimension =
835 tokens[0]
836 .parse::<u32>()
837 .ok()
838 .with_context(|| InvalidColumnOptionSnafu {
839 name: column_name.to_string(),
840 msg: "dimension should be a positive integer",
841 })?;
842
843 let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
844 column_extensions.vector_options = Some(options);
845 }
846
847 let mut is_index_declared = false;
849
850 if let Token::Word(word) = parser.peek_token().token
852 && word.value.eq_ignore_ascii_case(SKIPPING)
853 {
854 parser.next_token();
855 ensure!(
857 parser.parse_keyword(Keyword::INDEX),
858 InvalidColumnOptionSnafu {
859 name: column_name.to_string(),
860 msg: "expect INDEX after SKIPPING keyword",
861 }
862 );
863 ensure!(
864 column_extensions.skipping_index_options.is_none(),
865 InvalidColumnOptionSnafu {
866 name: column_name.to_string(),
867 msg: "duplicated SKIPPING index option",
868 }
869 );
870
871 let options = parser
872 .parse_options(Keyword::WITH)
873 .context(error::SyntaxSnafu)?
874 .into_iter()
875 .map(parse_option_string)
876 .collect::<Result<Vec<_>>>()?;
877
878 for (key, _) in options.iter() {
879 ensure!(
880 validate_column_skipping_index_create_option(key),
881 InvalidColumnOptionSnafu {
882 name: column_name.to_string(),
883 msg: format!("invalid SKIPPING INDEX option: {key}"),
884 }
885 );
886 }
887
888 let options = OptionMap::new(options);
889 column_extensions.skipping_index_options = Some(options);
890 is_index_declared |= true;
891 }
892
893 if parser.parse_keyword(Keyword::FULLTEXT) {
895 ensure!(
897 parser.parse_keyword(Keyword::INDEX),
898 InvalidColumnOptionSnafu {
899 name: column_name.to_string(),
900 msg: "expect INDEX after FULLTEXT keyword",
901 }
902 );
903
904 ensure!(
905 column_extensions.fulltext_index_options.is_none(),
906 InvalidColumnOptionSnafu {
907 name: column_name.to_string(),
908 msg: "duplicated FULLTEXT INDEX option",
909 }
910 );
911
912 let column_type = get_unalias_type(column_type);
913 let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
914 ensure!(
915 data_type == ConcreteDataType::string_datatype(),
916 InvalidColumnOptionSnafu {
917 name: column_name.to_string(),
918 msg: "FULLTEXT index only supports string type",
919 }
920 );
921
922 let options = parser
923 .parse_options(Keyword::WITH)
924 .context(error::SyntaxSnafu)?
925 .into_iter()
926 .map(parse_option_string)
927 .collect::<Result<Vec<_>>>()?;
928
929 for (key, _) in options.iter() {
930 ensure!(
931 validate_column_fulltext_create_option(key),
932 InvalidColumnOptionSnafu {
933 name: column_name.to_string(),
934 msg: format!("invalid FULLTEXT INDEX option: {key}"),
935 }
936 );
937 }
938
939 let options = OptionMap::new(options);
940 column_extensions.fulltext_index_options = Some(options);
941 is_index_declared |= true;
942 }
943
944 if let Token::Word(word) = parser.peek_token().token
946 && word.value.eq_ignore_ascii_case(INVERTED)
947 {
948 parser.next_token();
949 ensure!(
951 parser.parse_keyword(Keyword::INDEX),
952 InvalidColumnOptionSnafu {
953 name: column_name.to_string(),
954 msg: "expect INDEX after INVERTED keyword",
955 }
956 );
957
958 ensure!(
959 column_extensions.inverted_index_options.is_none(),
960 InvalidColumnOptionSnafu {
961 name: column_name.to_string(),
962 msg: "duplicated INVERTED index option",
963 }
964 );
965
966 let with_token = parser.peek_token();
969 ensure!(
970 with_token.token
971 != Token::Word(Word {
972 value: "WITH".to_string(),
973 keyword: Keyword::WITH,
974 quote_style: None,
975 }),
976 InvalidColumnOptionSnafu {
977 name: column_name.to_string(),
978 msg: "INVERTED index doesn't support options",
979 }
980 );
981
982 column_extensions.inverted_index_options = Some(OptionMap::default());
983 is_index_declared |= true;
984 }
985
986 if let Token::Word(word) = parser.peek_token().token
988 && word.value.eq_ignore_ascii_case(VECTOR)
989 {
990 parser.next_token();
991 ensure!(
993 parser.parse_keyword(Keyword::INDEX),
994 InvalidColumnOptionSnafu {
995 name: column_name.to_string(),
996 msg: "expect INDEX after VECTOR keyword",
997 }
998 );
999
1000 ensure!(
1001 column_extensions.vector_index_options.is_none(),
1002 InvalidColumnOptionSnafu {
1003 name: column_name.to_string(),
1004 msg: "duplicated VECTOR INDEX option",
1005 }
1006 );
1007
1008 let column_type = get_unalias_type(column_type);
1010 let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
1011 ensure!(
1012 matches!(data_type, ConcreteDataType::Vector(_)),
1013 InvalidColumnOptionSnafu {
1014 name: column_name.to_string(),
1015 msg: "VECTOR INDEX only supports Vector type columns",
1016 }
1017 );
1018
1019 let options = parser
1020 .parse_options(Keyword::WITH)
1021 .context(error::SyntaxSnafu)?
1022 .into_iter()
1023 .map(parse_option_string)
1024 .collect::<Result<Vec<_>>>()?;
1025
1026 for (key, _) in options.iter() {
1027 ensure!(
1028 validate_column_vector_index_create_option(key),
1029 InvalidColumnOptionSnafu {
1030 name: column_name.to_string(),
1031 msg: format!("invalid VECTOR INDEX option: {key}"),
1032 }
1033 );
1034 }
1035
1036 let options = OptionMap::new(options);
1037 column_extensions.vector_index_options = Some(options);
1038 is_index_declared |= true;
1039 }
1040
1041 Ok(is_index_declared)
1042 }
1043
1044 fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
1045 match self.parser.next_token() {
1046 TokenWithSpan {
1047 token: Token::Word(w),
1048 ..
1049 } if w.keyword == Keyword::PRIMARY => {
1050 self.parser
1051 .expect_keyword(Keyword::KEY)
1052 .context(error::UnexpectedSnafu {
1053 expected: "KEY",
1054 actual: self.peek_token_as_string(),
1055 })?;
1056 let raw_columns = self
1057 .parser
1058 .parse_parenthesized_column_list(Mandatory, false)
1059 .context(error::SyntaxSnafu)?;
1060 let columns = raw_columns
1061 .into_iter()
1062 .map(Self::canonicalize_identifier)
1063 .collect();
1064 Ok(Some(TableConstraint::PrimaryKey { columns }))
1065 }
1066 TokenWithSpan {
1067 token: Token::Word(w),
1068 ..
1069 } if w.keyword == Keyword::TIME => {
1070 self.parser
1071 .expect_keyword(Keyword::INDEX)
1072 .context(error::UnexpectedSnafu {
1073 expected: "INDEX",
1074 actual: self.peek_token_as_string(),
1075 })?;
1076
1077 let raw_columns = self
1078 .parser
1079 .parse_parenthesized_column_list(Mandatory, false)
1080 .context(error::SyntaxSnafu)?;
1081 let mut columns = raw_columns
1082 .into_iter()
1083 .map(Self::canonicalize_identifier)
1084 .collect::<Vec<_>>();
1085
1086 ensure!(
1087 columns.len() == 1,
1088 InvalidTimeIndexSnafu {
1089 msg: "it should contain only one column in time index",
1090 }
1091 );
1092
1093 Ok(Some(TableConstraint::TimeIndex {
1094 column: columns.pop().unwrap(),
1095 }))
1096 }
1097 _ => {
1098 self.parser.prev_token();
1099 Ok(None)
1100 }
1101 }
1102 }
1103
1104 fn parse_table_engine(&mut self, default: &str) -> Result<String> {
1106 if !self.consume_token(ENGINE) {
1107 return Ok(default.to_string());
1108 }
1109
1110 self.parser
1111 .expect_token(&Token::Eq)
1112 .context(error::UnexpectedSnafu {
1113 expected: "=",
1114 actual: self.peek_token_as_string(),
1115 })?;
1116
1117 let token = self.parser.next_token();
1118 if let Token::Word(w) = token.token {
1119 Ok(w.value)
1120 } else {
1121 self.expected("'Engine' is missing", token)
1122 }
1123 }
1124}
1125
1126fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1127 let time_index_constraints: Vec<_> = constraints
1128 .iter()
1129 .filter_map(|c| match c {
1130 TableConstraint::TimeIndex { column } => Some(column),
1131 _ => None,
1132 })
1133 .unique()
1134 .collect();
1135
1136 ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1137 ensure!(
1138 time_index_constraints.len() == 1,
1139 InvalidTimeIndexSnafu {
1140 msg: format!(
1141 "expected only one time index constraint but actual {}",
1142 time_index_constraints.len()
1143 ),
1144 }
1145 );
1146
1147 let time_index_column_ident = &time_index_constraints[0];
1150 let time_index_column = columns
1151 .iter()
1152 .find(|c| c.name().value == *time_index_column_ident.value)
1153 .with_context(|| InvalidTimeIndexSnafu {
1154 msg: format!(
1155 "time index column {} not found in columns",
1156 time_index_column_ident
1157 ),
1158 })?;
1159
1160 let time_index_data_type = get_unalias_type(time_index_column.data_type());
1161 ensure!(
1162 matches!(time_index_data_type, DataType::Timestamp(_, _)),
1163 InvalidColumnOptionSnafu {
1164 name: time_index_column.name().to_string(),
1165 msg: "time index column data type should be timestamp",
1166 }
1167 );
1168
1169 Ok(())
1170}
1171
1172fn get_unalias_type(data_type: &DataType) -> DataType {
1173 match data_type {
1174 DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1175 if let Some(real_type) =
1176 get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1177 {
1178 real_type
1179 } else {
1180 data_type.clone()
1181 }
1182 }
1183 _ => data_type.clone(),
1184 }
1185}
1186
1187fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1188 let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1189
1190 ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1191
1192 Ok(())
1193}
1194
1195fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1197 for expr in exprs {
1198 if let Expr::BinaryOp { left, op: _, right } = expr {
1200 ensure_one_expr(left, columns)?;
1201 ensure_one_expr(right, columns)?;
1202 } else {
1203 return error::InvalidSqlSnafu {
1204 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1205 }
1206 .fail();
1207 }
1208 }
1209 Ok(())
1210}
1211
1212fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1216 match expr {
1217 Expr::BinaryOp { left, op: _, right } => {
1218 ensure_one_expr(left, columns)?;
1219 ensure_one_expr(right, columns)?;
1220 Ok(())
1221 }
1222 Expr::Identifier(ident) => {
1223 let column_name = &ident.value;
1224 ensure!(
1225 columns.iter().any(|c| &c.name().value == column_name),
1226 error::InvalidSqlSnafu {
1227 msg: format!(
1228 "Column {:?} in rule expr is not referenced in PARTITION ON",
1229 column_name
1230 ),
1231 }
1232 );
1233 Ok(())
1234 }
1235 Expr::Value(_) => Ok(()),
1236 Expr::UnaryOp { expr, .. } => {
1237 ensure_one_expr(expr, columns)?;
1238 Ok(())
1239 }
1240 _ => error::InvalidSqlSnafu {
1241 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1242 }
1243 .fail(),
1244 }
1245}
1246
1247fn ensure_partition_columns_defined<'a>(
1249 columns: &'a [Column],
1250 partitions: &'a Partitions,
1251) -> Result<Vec<&'a Column>> {
1252 partitions
1253 .column_list
1254 .iter()
1255 .map(|x| {
1256 let x = ParserContext::canonicalize_identifier(x.clone());
1257 columns
1260 .iter()
1261 .find(|c| *c.name().value == x.value)
1262 .context(error::InvalidSqlSnafu {
1263 msg: format!("Partition column {:?} not defined", x.value),
1264 })
1265 })
1266 .collect::<Result<Vec<&Column>>>()
1267}
1268
1269#[cfg(test)]
1270mod tests {
1271 use std::assert_matches;
1272 use std::collections::HashMap;
1273
1274 use common_catalog::consts::FILE_ENGINE;
1275 use common_error::ext::ErrorExt;
1276 use sqlparser::ast::ColumnOption::NotNull;
1277 use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1278 use sqlparser::dialect::GenericDialect;
1279 use sqlparser::tokenizer::Tokenizer;
1280
1281 use super::*;
1282 use crate::dialect::GreptimeDbDialect;
1283 use crate::parser::ParseOptions;
1284
1285 fn string_option_map(
1286 entries: impl IntoIterator<Item = (&'static str, &'static str)>,
1287 ) -> OptionMap {
1288 OptionMap::new(entries.into_iter().map(|(key, value)| {
1289 (
1290 key.to_string(),
1291 OptionValue::try_new(Expr::Value(
1292 Value::SingleQuotedString(value.to_string()).into(),
1293 ))
1294 .unwrap(),
1295 )
1296 }))
1297 }
1298
1299 #[test]
1300 fn test_parse_create_table_like() {
1301 let sql = "CREATE TABLE t1 LIKE t2";
1302 let stmts =
1303 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1304 .unwrap();
1305
1306 assert_eq!(1, stmts.len());
1307 match &stmts[0] {
1308 Statement::CreateTableLike(c) => {
1309 assert_eq!(c.table_name.to_string(), "t1");
1310 assert_eq!(c.source_name.to_string(), "t2");
1311 }
1312 _ => unreachable!(),
1313 }
1314 }
1315
1316 #[test]
1317 fn test_validate_external_table_options() {
1318 let sql = "CREATE EXTERNAL TABLE city (
1319 host string,
1320 ts timestamp,
1321 cpu float64 default 0,
1322 memory float64,
1323 TIME INDEX (ts),
1324 PRIMARY KEY(ts, host)
1325 ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1326
1327 let result =
1328 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1329 assert!(matches!(
1330 result,
1331 Err(error::Error::InvalidTableOption { .. })
1332 ));
1333 }
1334
1335 #[test]
1336 fn test_parse_create_external_table() {
1337 struct Test<'a> {
1338 sql: &'a str,
1339 expected_table_name: &'a str,
1340 expected_options: HashMap<&'a str, &'a str>,
1341 expected_engine: &'a str,
1342 expected_if_not_exist: bool,
1343 }
1344
1345 let tests = [
1346 Test {
1347 sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1348 expected_table_name: "city",
1349 expected_options: HashMap::from([
1350 ("location", "/var/data/city.csv"),
1351 ("format", "csv"),
1352 ]),
1353 expected_engine: FILE_ENGINE,
1354 expected_if_not_exist: false,
1355 },
1356 Test {
1357 sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1358 expected_table_name: "city",
1359 expected_options: HashMap::from([
1360 ("location", "/var/data/city.csv"),
1361 ("format", "csv"),
1362 ]),
1363 expected_engine: "foo",
1364 expected_if_not_exist: true,
1365 },
1366 Test {
1367 sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1368 expected_table_name: "city",
1369 expected_options: HashMap::from([
1370 ("location", "/var/data/city.csv"),
1371 ("format", "csv"),
1372 ("compaction.type", "bar"),
1373 ]),
1374 expected_engine: "foo",
1375 expected_if_not_exist: true,
1376 },
1377 ];
1378
1379 for test in tests {
1380 let stmts = ParserContext::create_with_dialect(
1381 test.sql,
1382 &GreptimeDbDialect {},
1383 ParseOptions::default(),
1384 )
1385 .unwrap();
1386 assert_eq!(1, stmts.len());
1387 match &stmts[0] {
1388 Statement::CreateExternalTable(c) => {
1389 assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1390 assert_eq!(c.options.to_str_map(), test.expected_options);
1391 assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1392 assert_eq!(c.engine, test.expected_engine);
1393 }
1394 _ => unreachable!(),
1395 }
1396 }
1397 }
1398
1399 #[test]
1400 fn test_parse_create_external_table_with_schema() {
1401 let sql = "CREATE EXTERNAL TABLE city (
1402 host string,
1403 ts timestamp,
1404 cpu float32 default 0,
1405 memory float64,
1406 TIME INDEX (ts),
1407 PRIMARY KEY(ts, host),
1408 ) with(location='/var/data/city.csv',format='csv');";
1409
1410 let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]);
1411
1412 let stmts =
1413 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1414 .unwrap();
1415 assert_eq!(1, stmts.len());
1416 match &stmts[0] {
1417 Statement::CreateExternalTable(c) => {
1418 assert_eq!(c.name.to_string(), "city");
1419 assert_eq!(c.options.to_str_map(), options);
1420
1421 let columns = &c.columns;
1422 assert_column_def(&columns[0].column_def, "host", "STRING");
1423 assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1424 assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1425 assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1426
1427 let constraints = &c.constraints;
1428 assert_eq!(
1429 &constraints[0],
1430 &TableConstraint::TimeIndex {
1431 column: Ident::new("ts"),
1432 }
1433 );
1434 assert_eq!(
1435 &constraints[1],
1436 &TableConstraint::PrimaryKey {
1437 columns: vec![Ident::new("ts"), Ident::new("host")]
1438 }
1439 );
1440 }
1441 _ => unreachable!(),
1442 }
1443 }
1444
1445 #[test]
1446 fn test_parse_create_database() {
1447 let sql = "create database";
1448 let result =
1449 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1450 assert!(
1451 result
1452 .unwrap_err()
1453 .to_string()
1454 .contains("Unexpected token while parsing SQL statement")
1455 );
1456
1457 let sql = "create database prometheus";
1458 let stmts =
1459 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1460 .unwrap();
1461
1462 assert_eq!(1, stmts.len());
1463 match &stmts[0] {
1464 Statement::CreateDatabase(c) => {
1465 assert_eq!(c.name.to_string(), "prometheus");
1466 assert!(!c.if_not_exists);
1467 }
1468 _ => unreachable!(),
1469 }
1470
1471 let sql = "create database if not exists prometheus";
1472 let stmts =
1473 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1474 .unwrap();
1475
1476 assert_eq!(1, stmts.len());
1477 match &stmts[0] {
1478 Statement::CreateDatabase(c) => {
1479 assert_eq!(c.name.to_string(), "prometheus");
1480 assert!(c.if_not_exists);
1481 }
1482 _ => unreachable!(),
1483 }
1484
1485 let sql = "CREATE DATABASE `fOo`";
1486 let result =
1487 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1488 let stmts = result.unwrap();
1489 match &stmts.last().unwrap() {
1490 Statement::CreateDatabase(c) => {
1491 assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1492 assert!(!c.if_not_exists);
1493 }
1494 _ => unreachable!(),
1495 }
1496
1497 let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1498 let result =
1499 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1500 let stmts = result.unwrap();
1501 match &stmts[0] {
1502 Statement::CreateDatabase(c) => {
1503 assert_eq!(c.name.to_string(), "prometheus");
1504 assert!(!c.if_not_exists);
1505 assert_eq!(c.options.get("ttl").unwrap(), "1h");
1506 }
1507 _ => unreachable!(),
1508 }
1509 }
1510
1511 #[test]
1512 fn test_parse_create_flow_more_testcases() {
1513 use pretty_assertions::assert_eq;
1514 fn parse_create_flow(sql: &str) -> CreateFlow {
1515 let stmts = ParserContext::create_with_dialect(
1516 sql,
1517 &GreptimeDbDialect {},
1518 ParseOptions::default(),
1519 )
1520 .unwrap();
1521 assert_eq!(1, stmts.len());
1522 match &stmts[0] {
1523 Statement::CreateFlow(c) => c.clone(),
1524 _ => unreachable!(),
1525 }
1526 }
1527 struct CreateFlowWoutQuery {
1528 pub flow_name: ObjectName,
1530 pub sink_table_name: ObjectName,
1532 pub or_replace: bool,
1534 pub if_not_exists: bool,
1536 pub expire_after: Option<i64>,
1539 pub comment: Option<String>,
1541 pub flow_options: OptionMap,
1543 }
1544 let testcases = vec![
1545 (
1546 r"
1547CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1548SINK TO schema_1.table_1
1549EXPIRE AFTER INTERVAL '5 minutes'
1550COMMENT 'test comment'
1551AS
1552SELECT max(c1), min(c2) FROM schema_2.table_2;",
1553 CreateFlowWoutQuery {
1554 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1555 sink_table_name: ObjectName::from(vec![
1556 Ident::new("schema_1"),
1557 Ident::new("table_1"),
1558 ]),
1559 or_replace: true,
1560 if_not_exists: true,
1561 expire_after: Some(300),
1562 comment: Some("test comment".to_string()),
1563 flow_options: OptionMap::default(),
1564 },
1565 ),
1566 (
1567 r"
1568CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1569SINK TO schema_1.table_1
1570EXPIRE AFTER INTERVAL '300 s'
1571COMMENT 'test comment'
1572AS
1573SELECT max(c1), min(c2) FROM schema_2.table_2;",
1574 CreateFlowWoutQuery {
1575 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1576 sink_table_name: ObjectName::from(vec![
1577 Ident::new("schema_1"),
1578 Ident::new("table_1"),
1579 ]),
1580 or_replace: true,
1581 if_not_exists: true,
1582 expire_after: Some(300),
1583 comment: Some("test comment".to_string()),
1584 flow_options: OptionMap::default(),
1585 },
1586 ),
1587 (
1588 r"
1589CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1590SINK TO schema_1.table_1
1591EXPIRE AFTER '5 minutes'
1592COMMENT 'test comment'
1593AS
1594SELECT max(c1), min(c2) FROM schema_2.table_2;",
1595 CreateFlowWoutQuery {
1596 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1597 sink_table_name: ObjectName::from(vec![
1598 Ident::new("schema_1"),
1599 Ident::new("table_1"),
1600 ]),
1601 or_replace: true,
1602 if_not_exists: true,
1603 expire_after: Some(300),
1604 comment: Some("test comment".to_string()),
1605 flow_options: OptionMap::default(),
1606 },
1607 ),
1608 (
1609 r"
1610CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1611SINK TO schema_1.table_1
1612EXPIRE AFTER '300 s'
1613COMMENT 'test comment'
1614AS
1615SELECT max(c1), min(c2) FROM schema_2.table_2;",
1616 CreateFlowWoutQuery {
1617 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1618 sink_table_name: ObjectName::from(vec![
1619 Ident::new("schema_1"),
1620 Ident::new("table_1"),
1621 ]),
1622 or_replace: true,
1623 if_not_exists: true,
1624 expire_after: Some(300),
1625 comment: Some("test comment".to_string()),
1626 flow_options: OptionMap::default(),
1627 },
1628 ),
1629 (
1630 r"
1631CREATE FLOW `task_2`
1632SINK TO schema_1.table_1
1633EXPIRE AFTER '2 days 1h 2 min'
1634AS
1635SELECT max(c1), min(c2) FROM schema_2.table_2;",
1636 CreateFlowWoutQuery {
1637 flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1638 sink_table_name: ObjectName::from(vec![
1639 Ident::new("schema_1"),
1640 Ident::new("table_1"),
1641 ]),
1642 or_replace: false,
1643 if_not_exists: false,
1644 expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1645 comment: None,
1646 flow_options: OptionMap::default(),
1647 },
1648 ),
1649 (
1650 r"
1651create flow `task_3`
1652sink to schema_1.table_1
1653expire after '10 minutes'
1654as
1655select max(c1), min(c2) from schema_2.table_2;",
1656 CreateFlowWoutQuery {
1657 flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
1658 sink_table_name: ObjectName::from(vec![
1659 Ident::new("schema_1"),
1660 Ident::new("table_1"),
1661 ]),
1662 or_replace: false,
1663 if_not_exists: false,
1664 expire_after: Some(600), comment: None,
1666 flow_options: OptionMap::default(),
1667 },
1668 ),
1669 (
1670 r"
1671create or replace flow if not exists task_4
1672sink to schema_1.table_1
1673expire after interval '2 hours'
1674comment 'lowercase test'
1675as
1676select max(c1), min(c2) from schema_2.table_2;",
1677 CreateFlowWoutQuery {
1678 flow_name: ObjectName::from(vec![Ident::new("task_4")]),
1679 sink_table_name: ObjectName::from(vec![
1680 Ident::new("schema_1"),
1681 Ident::new("table_1"),
1682 ]),
1683 or_replace: true,
1684 if_not_exists: true,
1685 expire_after: Some(7200), comment: Some("lowercase test".to_string()),
1687 flow_options: OptionMap::default(),
1688 },
1689 ),
1690 (
1691 r"
1692CREATE FLOW task_5
1693SINK TO schema_1.table_1
1694WITH (defer_on_missing_source = 'true')
1695AS
1696SELECT max(c1), min(c2) FROM schema_2.table_2;",
1697 CreateFlowWoutQuery {
1698 flow_name: ObjectName::from(vec![Ident::new("task_5")]),
1699 sink_table_name: ObjectName::from(vec![
1700 Ident::new("schema_1"),
1701 Ident::new("table_1"),
1702 ]),
1703 or_replace: false,
1704 if_not_exists: false,
1705 expire_after: None,
1706 comment: None,
1707 flow_options: string_option_map([("defer_on_missing_source", "true")]),
1708 },
1709 ),
1710 ];
1711
1712 for (sql, expected) in testcases {
1713 let create_task = parse_create_flow(sql);
1714
1715 let expected = CreateFlow {
1716 flow_name: expected.flow_name,
1717 sink_table_name: expected.sink_table_name,
1718 or_replace: expected.or_replace,
1719 if_not_exists: expected.if_not_exists,
1720 expire_after: expected.expire_after,
1721 eval_interval: None,
1722 comment: expected.comment,
1723 flow_options: expected.flow_options,
1724 query: create_task.query.clone(),
1726 };
1727
1728 assert_eq!(create_task, expected, "input sql is:\n{sql}");
1729 let show_create = create_task.to_string();
1730 let recreated = parse_create_flow(&show_create);
1731 assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1732 }
1733 }
1734
1735 #[test]
1736 fn test_parse_create_flow() {
1737 use pretty_assertions::assert_eq;
1738 fn parse_create_flow(sql: &str) -> CreateFlow {
1739 let stmts = ParserContext::create_with_dialect(
1740 sql,
1741 &GreptimeDbDialect {},
1742 ParseOptions::default(),
1743 )
1744 .unwrap();
1745 assert_eq!(1, stmts.len());
1746 match &stmts[0] {
1747 Statement::CreateFlow(c) => c.clone(),
1748 _ => panic!("{:?}", stmts[0]),
1749 }
1750 }
1751 struct CreateFlowWoutQuery {
1752 pub flow_name: ObjectName,
1754 pub sink_table_name: ObjectName,
1756 pub or_replace: bool,
1758 pub if_not_exists: bool,
1760 pub expire_after: Option<i64>,
1763 pub eval_interval: Option<i64>,
1767 pub comment: Option<String>,
1769 pub flow_options: OptionMap,
1771 }
1772
1773 let testcases = vec![
1775 (
1776 r"
1777CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1778SINK TO schema_1.table_1
1779EXPIRE AFTER INTERVAL '5 minutes'
1780COMMENT 'test comment'
1781AS
1782SELECT max(c1), min(c2) FROM schema_2.table_2;",
1783 CreateFlowWoutQuery {
1784 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1785 sink_table_name: ObjectName(vec![
1786 ObjectNamePart::Identifier(Ident::new("schema_1")),
1787 ObjectNamePart::Identifier(Ident::new("table_1")),
1788 ]),
1789 or_replace: true,
1790 if_not_exists: true,
1791 expire_after: Some(300),
1792 eval_interval: None,
1793 comment: Some("test comment".to_string()),
1794 flow_options: OptionMap::default(),
1795 },
1796 ),
1797 (
1798 r"
1799CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1800SINK TO schema_1.table_1
1801EXPIRE AFTER INTERVAL '300 s'
1802COMMENT 'test comment'
1803AS
1804SELECT max(c1), min(c2) FROM schema_2.table_2;",
1805 CreateFlowWoutQuery {
1806 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1807 sink_table_name: ObjectName(vec![
1808 ObjectNamePart::Identifier(Ident::new("schema_1")),
1809 ObjectNamePart::Identifier(Ident::new("table_1")),
1810 ]),
1811 or_replace: true,
1812 if_not_exists: true,
1813 expire_after: Some(300),
1814 eval_interval: None,
1815 comment: Some("test comment".to_string()),
1816 flow_options: OptionMap::default(),
1817 },
1818 ),
1819 (
1820 r"
1821CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1822SINK TO schema_1.table_1
1823EXPIRE AFTER '5 minutes'
1824EVAL INTERVAL '10 seconds'
1825COMMENT 'test comment'
1826AS
1827SELECT max(c1), min(c2) FROM schema_2.table_2;",
1828 CreateFlowWoutQuery {
1829 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1830 sink_table_name: ObjectName(vec![
1831 ObjectNamePart::Identifier(Ident::new("schema_1")),
1832 ObjectNamePart::Identifier(Ident::new("table_1")),
1833 ]),
1834 or_replace: true,
1835 if_not_exists: true,
1836 expire_after: Some(300),
1837 eval_interval: Some(10),
1838 comment: Some("test comment".to_string()),
1839 flow_options: OptionMap::default(),
1840 },
1841 ),
1842 (
1843 r"
1844CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1845SINK TO schema_1.table_1
1846EXPIRE AFTER '5 minutes'
1847EVAL INTERVAL INTERVAL '10 seconds'
1848COMMENT 'test comment'
1849AS
1850SELECT max(c1), min(c2) FROM schema_2.table_2;",
1851 CreateFlowWoutQuery {
1852 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1853 sink_table_name: ObjectName(vec![
1854 ObjectNamePart::Identifier(Ident::new("schema_1")),
1855 ObjectNamePart::Identifier(Ident::new("table_1")),
1856 ]),
1857 or_replace: true,
1858 if_not_exists: true,
1859 expire_after: Some(300),
1860 eval_interval: Some(10),
1861 comment: Some("test comment".to_string()),
1862 flow_options: OptionMap::default(),
1863 },
1864 ),
1865 (
1866 r"
1867CREATE FLOW `task_2`
1868SINK TO schema_1.table_1
1869EXPIRE AFTER '2 days 1h 2 min'
1870AS
1871SELECT max(c1), min(c2) FROM schema_2.table_2;",
1872 CreateFlowWoutQuery {
1873 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1874 '`', "task_2",
1875 ))]),
1876 sink_table_name: ObjectName(vec![
1877 ObjectNamePart::Identifier(Ident::new("schema_1")),
1878 ObjectNamePart::Identifier(Ident::new("table_1")),
1879 ]),
1880 or_replace: false,
1881 if_not_exists: false,
1882 expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1883 eval_interval: None,
1884 comment: None,
1885 flow_options: OptionMap::default(),
1886 },
1887 ),
1888 (
1889 r"
1890CREATE FLOW task_3
1891SINK TO schema_1.table_1
1892EVAL INTERVAL '10 seconds'
1893WITH (defer_on_missing_source = 'true', foo = 'bar')
1894AS
1895SELECT max(c1), min(c2) FROM schema_2.table_2;",
1896 CreateFlowWoutQuery {
1897 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_3"))]),
1898 sink_table_name: ObjectName(vec![
1899 ObjectNamePart::Identifier(Ident::new("schema_1")),
1900 ObjectNamePart::Identifier(Ident::new("table_1")),
1901 ]),
1902 or_replace: false,
1903 if_not_exists: false,
1904 expire_after: None,
1905 eval_interval: Some(10),
1906 comment: None,
1907 flow_options: string_option_map([
1908 ("defer_on_missing_source", "true"),
1909 ("foo", "bar"),
1910 ]),
1911 },
1912 ),
1913 ];
1914
1915 for (sql, expected) in testcases {
1916 let create_task = parse_create_flow(sql);
1917
1918 let expected = CreateFlow {
1919 flow_name: expected.flow_name,
1920 sink_table_name: expected.sink_table_name,
1921 or_replace: expected.or_replace,
1922 if_not_exists: expected.if_not_exists,
1923 expire_after: expected.expire_after,
1924 eval_interval: expected.eval_interval,
1925 comment: expected.comment,
1926 flow_options: expected.flow_options,
1927 query: create_task.query.clone(),
1929 };
1930
1931 assert_eq!(create_task, expected, "input sql is:\n{sql}");
1932 let show_create = create_task.to_string();
1933 let recreated = parse_create_flow(&show_create);
1934 assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1935 }
1936 }
1937
1938 #[test]
1939 fn test_parse_create_flow_with_tql_cte_query() {
1940 let sql = r#"
1941CREATE FLOW calc_reqs_cte
1942SINK TO cnt_reqs_cte
1943EVAL INTERVAL '1m'
1944AS
1945WITH tql(the_timestamp, the_value) AS (
1946 TQL EVAL (now() - '1m'::interval, now(), '5s') metric
1947)
1948SELECT * FROM tql;
1949"#;
1950
1951 let stmts =
1952 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1953 .unwrap();
1954 assert_eq!(1, stmts.len());
1955 let Statement::CreateFlow(create_flow) = &stmts[0] else {
1956 panic!("unexpected stmt: {:?}", stmts[0]);
1957 };
1958
1959 let query = create_flow.query.to_string();
1960 assert!(query.to_uppercase().contains("WITH"));
1961 assert!(query.to_uppercase().contains("TQL EVAL"));
1962 }
1963
1964 #[test]
1965 fn test_parse_create_flow_with_sql_cte_is_supported() {
1966 let sql = r#"
1967CREATE FLOW f
1968SINK TO s
1969AS
1970WITH cte AS (SELECT 1) SELECT * FROM cte;
1971"#;
1972
1973 let stmts =
1974 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1975 .unwrap();
1976 assert_eq!(1, stmts.len());
1977 let Statement::CreateFlow(create_flow) = &stmts[0] else {
1978 panic!("unexpected stmt: {:?}", stmts[0]);
1979 };
1980 assert_eq!(
1981 "WITH cte AS (SELECT 1) SELECT * FROM cte",
1982 create_flow.query.to_string()
1983 );
1984 }
1985
1986 #[test]
1987 fn test_parse_create_flow_with_tql_cte_requires_now_expr() {
1988 let sql = r#"
1989CREATE FLOW f
1990SINK TO s
1991EVAL INTERVAL '1m'
1992AS
1993WITH tql(ts, val) AS (
1994 TQL EVAL (0, 15, '5s') metric
1995)
1996SELECT * FROM tql;
1997"#;
1998
1999 let err =
2000 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2001 .unwrap_err();
2002
2003 let msg = format!("{err:?}");
2004 assert!(
2005 msg.contains("Expected expression containing `now()`"),
2006 "unexpected err: {msg}"
2007 );
2008 }
2009
2010 #[test]
2011 fn test_parse_create_flow_with_tql_cte_non_select_star_is_unsupported() {
2012 let sql = r#"
2013CREATE FLOW f
2014SINK TO s
2015AS
2016WITH tql(ts, val) AS (
2017 TQL EVAL (now() - '1m'::interval, now(), '5s') metric
2018)
2019SELECT ts FROM tql;
2020"#;
2021
2022 let err =
2023 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2024 .unwrap_err();
2025 assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2026 }
2027
2028 #[test]
2029 fn test_parse_create_flow_with_tql_cte_filter_is_unsupported() {
2030 let sql = r#"
2031CREATE FLOW f
2032SINK TO s
2033AS
2034WITH tql(ts, val) AS (
2035 TQL EVAL (now() - '1m'::interval, now(), '5s') metric
2036)
2037SELECT * FROM tql WHERE ts > 0;
2038"#;
2039
2040 let err =
2041 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2042 .unwrap_err();
2043 assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2044 }
2045
2046 #[test]
2047 fn test_parse_create_flow_with_mixed_sql_tql_cte_is_unsupported() {
2048 let sql = r#"
2049CREATE FLOW f
2050SINK TO s
2051AS
2052WITH s1 AS (SELECT 1),
2053 tql(ts, val) AS (TQL EVAL (now() - '1m'::interval, now(), '5s') metric)
2054SELECT * FROM tql;
2055"#;
2056
2057 let err =
2058 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2059 .unwrap_err();
2060 assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2061 }
2062
2063 #[test]
2064 fn test_create_flow_no_month() {
2065 let sql = r"
2066CREATE FLOW `task_2`
2067SINK TO schema_1.table_1
2068EXPIRE AFTER '1 month 2 days 1h 2 min'
2069AS
2070SELECT max(c1), min(c2) FROM schema_2.table_2;";
2071 let stmts =
2072 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2073
2074 assert!(
2075 stmts.is_err()
2076 && stmts
2077 .unwrap_err()
2078 .to_string()
2079 .contains("Interval with months is not allowed")
2080 );
2081 }
2082
2083 #[test]
2084 fn test_validate_create() {
2085 let sql = r"
2086CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
2087PARTITION ON COLUMNS(c, a) (
2088 a < 10,
2089 a > 10 AND a < 20,
2090 a > 20 AND c < 100,
2091 a > 20 AND c >= 100
2092)
2093ENGINE=mito";
2094 let result =
2095 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2096 let _ = result.unwrap();
2097
2098 let sql = r"
2099CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2100PARTITION ON COLUMNS(x) ()
2101ENGINE=mito";
2102 let result =
2103 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2104 assert!(
2105 result
2106 .unwrap_err()
2107 .to_string()
2108 .contains("Partition column \"x\" not defined")
2109 );
2110 }
2111
2112 #[test]
2113 fn test_parse_create_table_with_partitions() {
2114 let sql = r"
2115CREATE TABLE monitor (
2116 host_id INT,
2117 idc STRING,
2118 ts TIMESTAMP,
2119 cpu DOUBLE DEFAULT 0,
2120 memory DOUBLE,
2121 TIME INDEX (ts),
2122 PRIMARY KEY (host),
2123)
2124PARTITION ON COLUMNS(idc, host_id) (
2125 idc <= 'hz' AND host_id < 1000,
2126 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2127 idc > 'sh' AND host_id < 3000,
2128 idc > 'sh' AND host_id >= 3000,
2129)
2130ENGINE=mito";
2131 let result =
2132 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2133 .unwrap();
2134 assert_eq!(result.len(), 1);
2135 match &result[0] {
2136 Statement::CreateTable(c) => {
2137 assert!(c.partitions.is_some());
2138
2139 let partitions = c.partitions.as_ref().unwrap();
2140 let column_list = partitions
2141 .column_list
2142 .iter()
2143 .map(|x| &x.value)
2144 .collect::<Vec<&String>>();
2145 assert_eq!(column_list, vec!["idc", "host_id"]);
2146
2147 let exprs = &partitions.exprs;
2148
2149 assert_eq!(
2150 exprs[0],
2151 Expr::BinaryOp {
2152 left: Box::new(Expr::BinaryOp {
2153 left: Box::new(Expr::Identifier("idc".into())),
2154 op: BinaryOperator::LtEq,
2155 right: Box::new(Expr::Value(
2156 Value::SingleQuotedString("hz".to_string()).into()
2157 ))
2158 }),
2159 op: BinaryOperator::And,
2160 right: Box::new(Expr::BinaryOp {
2161 left: Box::new(Expr::Identifier("host_id".into())),
2162 op: BinaryOperator::Lt,
2163 right: Box::new(Expr::Value(
2164 Value::Number("1000".to_string(), false).into()
2165 ))
2166 })
2167 }
2168 );
2169 assert_eq!(
2170 exprs[1],
2171 Expr::BinaryOp {
2172 left: Box::new(Expr::BinaryOp {
2173 left: Box::new(Expr::BinaryOp {
2174 left: Box::new(Expr::Identifier("idc".into())),
2175 op: BinaryOperator::Gt,
2176 right: Box::new(Expr::Value(
2177 Value::SingleQuotedString("hz".to_string()).into()
2178 ))
2179 }),
2180 op: BinaryOperator::And,
2181 right: Box::new(Expr::BinaryOp {
2182 left: Box::new(Expr::Identifier("idc".into())),
2183 op: BinaryOperator::LtEq,
2184 right: Box::new(Expr::Value(
2185 Value::SingleQuotedString("sh".to_string()).into()
2186 ))
2187 })
2188 }),
2189 op: BinaryOperator::And,
2190 right: Box::new(Expr::BinaryOp {
2191 left: Box::new(Expr::Identifier("host_id".into())),
2192 op: BinaryOperator::Lt,
2193 right: Box::new(Expr::Value(
2194 Value::Number("2000".to_string(), false).into()
2195 ))
2196 })
2197 }
2198 );
2199 assert_eq!(
2200 exprs[2],
2201 Expr::BinaryOp {
2202 left: Box::new(Expr::BinaryOp {
2203 left: Box::new(Expr::Identifier("idc".into())),
2204 op: BinaryOperator::Gt,
2205 right: Box::new(Expr::Value(
2206 Value::SingleQuotedString("sh".to_string()).into()
2207 ))
2208 }),
2209 op: BinaryOperator::And,
2210 right: Box::new(Expr::BinaryOp {
2211 left: Box::new(Expr::Identifier("host_id".into())),
2212 op: BinaryOperator::Lt,
2213 right: Box::new(Expr::Value(
2214 Value::Number("3000".to_string(), false).into()
2215 ))
2216 })
2217 }
2218 );
2219 assert_eq!(
2220 exprs[3],
2221 Expr::BinaryOp {
2222 left: Box::new(Expr::BinaryOp {
2223 left: Box::new(Expr::Identifier("idc".into())),
2224 op: BinaryOperator::Gt,
2225 right: Box::new(Expr::Value(
2226 Value::SingleQuotedString("sh".to_string()).into()
2227 ))
2228 }),
2229 op: BinaryOperator::And,
2230 right: Box::new(Expr::BinaryOp {
2231 left: Box::new(Expr::Identifier("host_id".into())),
2232 op: BinaryOperator::GtEq,
2233 right: Box::new(Expr::Value(
2234 Value::Number("3000".to_string(), false).into()
2235 ))
2236 })
2237 }
2238 );
2239 }
2240 _ => unreachable!(),
2241 }
2242 }
2243
2244 #[test]
2245 fn test_parse_create_table_with_quoted_partitions() {
2246 let sql = r"
2247CREATE TABLE monitor (
2248 `host_id` INT,
2249 idc STRING,
2250 ts TIMESTAMP,
2251 cpu DOUBLE DEFAULT 0,
2252 memory DOUBLE,
2253 TIME INDEX (ts),
2254 PRIMARY KEY (host),
2255)
2256PARTITION ON COLUMNS(IdC, host_id) (
2257 idc <= 'hz' AND host_id < 1000,
2258 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2259 idc > 'sh' AND host_id < 3000,
2260 idc > 'sh' AND host_id >= 3000,
2261)";
2262 let result =
2263 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2264 .unwrap();
2265 assert_eq!(result.len(), 1);
2266 }
2267
2268 #[test]
2269 fn test_parse_create_table_with_timestamp_index() {
2270 let sql1 = r"
2271CREATE TABLE monitor (
2272 host_id INT,
2273 idc STRING,
2274 ts TIMESTAMP TIME INDEX,
2275 cpu DOUBLE DEFAULT 0,
2276 memory DOUBLE,
2277 PRIMARY KEY (host),
2278)
2279ENGINE=mito";
2280 let result1 = ParserContext::create_with_dialect(
2281 sql1,
2282 &GreptimeDbDialect {},
2283 ParseOptions::default(),
2284 )
2285 .unwrap();
2286
2287 if let Statement::CreateTable(c) = &result1[0] {
2288 assert_eq!(c.constraints.len(), 2);
2289 let tc = c.constraints[0].clone();
2290 match tc {
2291 TableConstraint::TimeIndex { column } => {
2292 assert_eq!(&column.value, "ts");
2293 }
2294 _ => panic!("should be time index constraint"),
2295 };
2296 } else {
2297 panic!("should be create_table statement");
2298 }
2299
2300 let sql2 = r"
2303CREATE TABLE monitor (
2304 host_id INT,
2305 idc STRING,
2306 ts TIMESTAMP NOT NULL,
2307 cpu DOUBLE DEFAULT 0,
2308 memory DOUBLE,
2309 TIME INDEX (ts),
2310 PRIMARY KEY (host),
2311)
2312ENGINE=mito";
2313 let result2 = ParserContext::create_with_dialect(
2314 sql2,
2315 &GreptimeDbDialect {},
2316 ParseOptions::default(),
2317 )
2318 .unwrap();
2319
2320 assert_eq!(result1, result2);
2321
2322 let sql3 = r"
2324CREATE TABLE monitor (
2325 host_id INT,
2326 idc STRING,
2327 ts TIMESTAMP,
2328 cpu DOUBLE DEFAULT 0,
2329 memory DOUBLE,
2330 TIME INDEX (ts),
2331 PRIMARY KEY (host),
2332)
2333ENGINE=mito";
2334
2335 let result3 = ParserContext::create_with_dialect(
2336 sql3,
2337 &GreptimeDbDialect {},
2338 ParseOptions::default(),
2339 )
2340 .unwrap();
2341
2342 assert_ne!(result1, result3);
2343
2344 let sql1 = r"
2346CREATE TABLE monitor (
2347 host_id INT,
2348 idc STRING,
2349 b bigint TIME INDEX,
2350 cpu DOUBLE DEFAULT 0,
2351 memory DOUBLE,
2352 PRIMARY KEY (host),
2353)
2354ENGINE=mito";
2355 let result1 = ParserContext::create_with_dialect(
2356 sql1,
2357 &GreptimeDbDialect {},
2358 ParseOptions::default(),
2359 );
2360
2361 assert!(
2362 result1
2363 .unwrap_err()
2364 .to_string()
2365 .contains("time index column data type should be timestamp")
2366 );
2367 }
2368
2369 #[test]
2370 fn test_parse_create_table_with_timestamp_index_not_null() {
2371 let sql = r"
2372CREATE TABLE monitor (
2373 host_id INT,
2374 idc STRING,
2375 ts TIMESTAMP TIME INDEX,
2376 cpu DOUBLE DEFAULT 0,
2377 memory DOUBLE,
2378 TIME INDEX (ts),
2379 PRIMARY KEY (host),
2380)
2381ENGINE=mito";
2382 let result =
2383 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2384 .unwrap();
2385
2386 assert_eq!(result.len(), 1);
2387 if let Statement::CreateTable(c) = &result[0] {
2388 let ts = c.columns[2].clone();
2389 assert_eq!(ts.name().to_string(), "ts");
2390 assert_eq!(ts.options()[0].option, NotNull);
2391 } else {
2392 panic!("should be create table statement");
2393 }
2394
2395 let sql1 = r"
2396CREATE TABLE monitor (
2397 host_id INT,
2398 idc STRING,
2399 ts TIMESTAMP NOT NULL TIME INDEX,
2400 cpu DOUBLE DEFAULT 0,
2401 memory DOUBLE,
2402 TIME INDEX (ts),
2403 PRIMARY KEY (host),
2404)
2405ENGINE=mito";
2406
2407 let result1 = ParserContext::create_with_dialect(
2408 sql1,
2409 &GreptimeDbDialect {},
2410 ParseOptions::default(),
2411 )
2412 .unwrap();
2413 assert_eq!(result, result1);
2414
2415 let sql2 = r"
2416CREATE TABLE monitor (
2417 host_id INT,
2418 idc STRING,
2419 ts TIMESTAMP TIME INDEX NOT NULL,
2420 cpu DOUBLE DEFAULT 0,
2421 memory DOUBLE,
2422 TIME INDEX (ts),
2423 PRIMARY KEY (host),
2424)
2425ENGINE=mito";
2426
2427 let result2 = ParserContext::create_with_dialect(
2428 sql2,
2429 &GreptimeDbDialect {},
2430 ParseOptions::default(),
2431 )
2432 .unwrap();
2433 assert_eq!(result, result2);
2434
2435 let sql3 = r"
2436CREATE TABLE monitor (
2437 host_id INT,
2438 idc STRING,
2439 ts TIMESTAMP TIME INDEX NULL NOT,
2440 cpu DOUBLE DEFAULT 0,
2441 memory DOUBLE,
2442 TIME INDEX (ts),
2443 PRIMARY KEY (host),
2444)
2445ENGINE=mito";
2446
2447 let result3 = ParserContext::create_with_dialect(
2448 sql3,
2449 &GreptimeDbDialect {},
2450 ParseOptions::default(),
2451 );
2452 assert!(result3.is_err());
2453
2454 let sql4 = r"
2455CREATE TABLE monitor (
2456 host_id INT,
2457 idc STRING,
2458 ts TIMESTAMP TIME INDEX NOT NULL NULL,
2459 cpu DOUBLE DEFAULT 0,
2460 memory DOUBLE,
2461 TIME INDEX (ts),
2462 PRIMARY KEY (host),
2463)
2464ENGINE=mito";
2465
2466 let result4 = ParserContext::create_with_dialect(
2467 sql4,
2468 &GreptimeDbDialect {},
2469 ParseOptions::default(),
2470 );
2471 assert!(result4.is_err());
2472
2473 let sql = r"
2474CREATE TABLE monitor (
2475 host_id INT,
2476 idc STRING,
2477 ts TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2478 cpu DOUBLE DEFAULT 0,
2479 memory DOUBLE,
2480 TIME INDEX (ts),
2481 PRIMARY KEY (host),
2482)
2483ENGINE=mito";
2484
2485 let result =
2486 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2487 .unwrap();
2488
2489 if let Statement::CreateTable(c) = &result[0] {
2490 let tc = c.constraints[0].clone();
2491 match tc {
2492 TableConstraint::TimeIndex { column } => {
2493 assert_eq!(&column.value, "ts");
2494 }
2495 _ => panic!("should be time index constraint"),
2496 }
2497 let ts = c.columns[2].clone();
2498 assert_eq!(ts.name().to_string(), "ts");
2499 assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2500 assert_eq!(ts.options()[1].option, NotNull);
2501 } else {
2502 unreachable!("should be create table statement");
2503 }
2504 }
2505
2506 #[test]
2507 fn test_parse_partitions_with_error_syntax() {
2508 let sql = r"
2509CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2510PARTITION COLUMNS(c, a) (
2511 a < 10,
2512 a > 10 AND a < 20,
2513 a > 20 AND c < 100,
2514 a > 20 AND c >= 100
2515)
2516ENGINE=mito";
2517 let result =
2518 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2519 assert!(
2520 result
2521 .unwrap_err()
2522 .output_msg()
2523 .contains("sql parser error: Expected: ON, found: COLUMNS")
2524 );
2525 }
2526
2527 #[test]
2528 fn test_parse_partitions_without_rule() {
2529 let sql = r"
2530CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2531PARTITION ON COLUMNS(c, a) ()
2532ENGINE=mito";
2533 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2534 .unwrap();
2535 }
2536
2537 #[test]
2538 fn test_parse_partitions_unreferenced_column() {
2539 let sql = r"
2540CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2541PARTITION ON COLUMNS(c, a) (
2542 b = 'foo'
2543)
2544ENGINE=mito";
2545 let result =
2546 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2547 assert_eq!(
2548 result.unwrap_err().output_msg(),
2549 "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2550 );
2551 }
2552
    /// A partition rule that is a bare identifier (not a binary expression) is rejected, and
    /// the error message includes the identifier's debug form with its source span.
    #[test]
    fn test_parse_partitions_not_binary_expr() {
        // NOTE: the expected span `Location(4,5)..Location(4,6)` below depends on the exact
        // layout of this raw string (line 4, after four leading spaces); keep it byte-exact.
        let sql = r"
CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
PARTITION ON COLUMNS(c, a) (
    b
)
ENGINE=mito";
        let result =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
        assert_eq!(
            result.unwrap_err().output_msg(),
            r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
        );
    }
2568
2569 fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2570 assert_eq!(column.name.to_string(), name);
2571 assert_eq!(column.data_type.to_string(), data_type);
2572 }
2573
2574 #[test]
2575 pub fn test_parse_create_table() {
2576 let sql = r"create table demo(
2577 host string,
2578 ts timestamp,
2579 cpu float32 default 0,
2580 memory float64,
2581 TIME INDEX (ts),
2582 PRIMARY KEY(ts, host),
2583 ) engine=mito
2584 with(ttl='10s');
2585 ";
2586 let result =
2587 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2588 .unwrap();
2589 assert_eq!(1, result.len());
2590 match &result[0] {
2591 Statement::CreateTable(c) => {
2592 assert!(!c.if_not_exists);
2593 assert_eq!("demo", c.name.to_string());
2594 assert_eq!("mito", c.engine);
2595 assert_eq!(4, c.columns.len());
2596 let columns = &c.columns;
2597 assert_column_def(&columns[0].column_def, "host", "STRING");
2598 assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2599 assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2600 assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2601
2602 let constraints = &c.constraints;
2603 assert_eq!(
2604 &constraints[0],
2605 &TableConstraint::TimeIndex {
2606 column: Ident::new("ts"),
2607 }
2608 );
2609 assert_eq!(
2610 &constraints[1],
2611 &TableConstraint::PrimaryKey {
2612 columns: vec![Ident::new("ts"), Ident::new("host")]
2613 }
2614 );
2615 assert_eq!(1, c.options.len());
2617 assert_eq!(
2618 [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2619 c.options.to_str_map()
2620 );
2621 }
2622 _ => unreachable!(),
2623 }
2624 }
2625
2626 #[test]
2627 fn test_invalid_index_keys() {
2628 let sql = r"create table demo(
2629 host string,
2630 ts int64,
2631 cpu float64 default 0,
2632 memory float64,
2633 TIME INDEX (ts, host),
2634 PRIMARY KEY(ts, host)) engine=mito;
2635 ";
2636 let result =
2637 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2638 assert!(result.is_err());
2639 assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2640 }
2641
2642 #[test]
2643 fn test_duplicated_time_index() {
2644 let sql = r"create table demo(
2645 host string,
2646 ts timestamp time index,
2647 t timestamp time index,
2648 cpu float64 default 0,
2649 memory float64,
2650 TIME INDEX (ts, host),
2651 PRIMARY KEY(ts, host)) engine=mito;
2652 ";
2653 let result =
2654 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2655 assert!(result.is_err());
2656 assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2657
2658 let sql = r"create table demo(
2659 host string,
2660 ts timestamp time index,
2661 cpu float64 default 0,
2662 t timestamp,
2663 memory float64,
2664 TIME INDEX (t),
2665 PRIMARY KEY(ts, host)) engine=mito;
2666 ";
2667 let result =
2668 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2669 assert!(result.is_err());
2670 assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2671 }
2672
2673 #[test]
2674 fn test_invalid_column_name() {
2675 let sql = "create table foo(user string, i timestamp time index)";
2676 let result =
2677 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2678 let err = result.unwrap_err().output_msg();
2679 assert!(err.contains("Cannot use keyword 'user' as column name"));
2680
2681 let sql = r#"
2683 create table foo("user" string, i timestamp time index)
2684 "#;
2685 let result =
2686 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2687 let _ = result.unwrap();
2688 }
2689
2690 #[test]
2691 fn test_incorrect_default_value_issue_3479() {
2692 let sql = r#"CREATE TABLE `ExcePTuRi`(
2693non TIMESTAMP(6) TIME INDEX,
2694`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2695)"#;
2696 let result =
2697 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2698 .unwrap();
2699 assert_eq!(1, result.len());
2700 match &result[0] {
2701 Statement::CreateTable(c) => {
2702 assert_eq!(
2703 "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2704 c.columns[1].to_string()
2705 );
2706 }
2707 _ => unreachable!(),
2708 }
2709 }
2710
2711 #[test]
2712 fn test_parse_create_view() {
2713 let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
2714
2715 let result =
2716 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2717 .unwrap();
2718 match &result[0] {
2719 Statement::CreateView(c) => {
2720 assert_eq!(c.to_string(), sql);
2721 assert!(!c.or_replace);
2722 assert!(!c.if_not_exists);
2723 assert_eq!("test", c.name.to_string());
2724 }
2725 _ => unreachable!(),
2726 }
2727
2728 let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
2729
2730 let result =
2731 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2732 .unwrap();
2733 match &result[0] {
2734 Statement::CreateView(c) => {
2735 assert_eq!(c.to_string(), sql);
2736 assert!(c.or_replace);
2737 assert!(c.if_not_exists);
2738 assert_eq!("test", c.name.to_string());
2739 }
2740 _ => unreachable!(),
2741 }
2742 }
2743
2744 #[test]
2745 fn test_parse_create_view_invalid_query() {
2746 let sql = "CREATE VIEW test AS DELETE from demo";
2747 let result =
2748 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2749 assert!(result.is_ok_and(|x| x.len() == 1));
2750 }
2751
2752 #[test]
2753 fn test_parse_create_table_fulltext_options() {
2754 let sql1 = r"
2755CREATE TABLE log (
2756 ts TIMESTAMP TIME INDEX,
2757 msg TEXT FULLTEXT INDEX,
2758)";
2759 let result1 = ParserContext::create_with_dialect(
2760 sql1,
2761 &GreptimeDbDialect {},
2762 ParseOptions::default(),
2763 )
2764 .unwrap();
2765
2766 if let Statement::CreateTable(c) = &result1[0] {
2767 c.columns.iter().for_each(|col| {
2768 if col.name().value == "msg" {
2769 assert!(
2770 col.extensions
2771 .fulltext_index_options
2772 .as_ref()
2773 .unwrap()
2774 .is_empty()
2775 );
2776 }
2777 });
2778 } else {
2779 panic!("should be create_table statement");
2780 }
2781
2782 let sql2 = r"
2783CREATE TABLE log (
2784 ts TIMESTAMP TIME INDEX,
2785 msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2786)";
2787 let result2 = ParserContext::create_with_dialect(
2788 sql2,
2789 &GreptimeDbDialect {},
2790 ParseOptions::default(),
2791 )
2792 .unwrap();
2793
2794 if let Statement::CreateTable(c) = &result2[0] {
2795 c.columns.iter().for_each(|col| {
2796 if col.name().value == "msg" {
2797 let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2798 assert_eq!(options.len(), 2);
2799 assert_eq!(options.get("analyzer").unwrap(), "English");
2800 assert_eq!(options.get("case_sensitive").unwrap(), "false");
2801 }
2802 });
2803 } else {
2804 panic!("should be create_table statement");
2805 }
2806
2807 let sql3 = r"
2808CREATE TABLE log (
2809 ts TIMESTAMP TIME INDEX,
2810 msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2811 msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2812)";
2813 let result3 = ParserContext::create_with_dialect(
2814 sql3,
2815 &GreptimeDbDialect {},
2816 ParseOptions::default(),
2817 )
2818 .unwrap();
2819
2820 if let Statement::CreateTable(c) = &result3[0] {
2821 c.columns.iter().for_each(|col| {
2822 if col.name().value == "msg1" {
2823 let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2824 assert_eq!(options.len(), 2);
2825 assert_eq!(options.get("analyzer").unwrap(), "English");
2826 assert_eq!(options.get("case_sensitive").unwrap(), "false");
2827 } else if col.name().value == "msg2" {
2828 let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2829 assert_eq!(options.len(), 2);
2830 assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2831 assert_eq!(options.get("case_sensitive").unwrap(), "true");
2832 }
2833 });
2834 } else {
2835 panic!("should be create_table statement");
2836 }
2837 }
2838
2839 #[test]
2840 fn test_parse_create_table_fulltext_options_invalid_type() {
2841 let sql = r"
2842CREATE TABLE log (
2843 ts TIMESTAMP TIME INDEX,
2844 msg INT FULLTEXT INDEX,
2845)";
2846 let result =
2847 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2848 assert!(result.is_err());
2849 assert!(
2850 result
2851 .unwrap_err()
2852 .to_string()
2853 .contains("FULLTEXT index only supports string type")
2854 );
2855 }
2856
2857 #[test]
2858 fn test_parse_create_table_fulltext_options_duplicate() {
2859 let sql = r"
2860CREATE TABLE log (
2861 ts TIMESTAMP TIME INDEX,
2862 msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
2863)";
2864 let result =
2865 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2866 assert!(result.is_err());
2867 assert!(
2868 result
2869 .unwrap_err()
2870 .to_string()
2871 .contains("duplicated FULLTEXT INDEX option")
2872 );
2873 }
2874
2875 #[test]
2876 fn test_parse_create_table_fulltext_options_invalid_option() {
2877 let sql = r"
2878CREATE TABLE log (
2879 ts TIMESTAMP TIME INDEX,
2880 msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
2881)";
2882 let result =
2883 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2884 assert!(result.is_err());
2885 assert!(
2886 result
2887 .unwrap_err()
2888 .to_string()
2889 .contains("invalid FULLTEXT INDEX option")
2890 );
2891 }
2892
2893 #[test]
2894 fn test_parse_create_table_skip_options() {
2895 let sql = r"
2896CREATE TABLE log (
2897 ts TIMESTAMP TIME INDEX,
2898 msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
2899)";
2900 let result =
2901 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2902 .unwrap();
2903
2904 if let Statement::CreateTable(c) = &result[0] {
2905 c.columns.iter().for_each(|col| {
2906 if col.name().value == "msg" {
2907 assert!(
2908 !col.extensions
2909 .skipping_index_options
2910 .as_ref()
2911 .unwrap()
2912 .is_empty()
2913 );
2914 }
2915 });
2916 } else {
2917 panic!("should be create_table statement");
2918 }
2919
2920 let sql = r"
2921 CREATE TABLE log (
2922 ts TIMESTAMP TIME INDEX,
2923 msg INT SKIPPING INDEX,
2924 )";
2925 let result =
2926 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2927 .unwrap();
2928
2929 if let Statement::CreateTable(c) = &result[0] {
2930 c.columns.iter().for_each(|col| {
2931 if col.name().value == "msg" {
2932 assert!(
2933 col.extensions
2934 .skipping_index_options
2935 .as_ref()
2936 .unwrap()
2937 .is_empty()
2938 );
2939 }
2940 });
2941 } else {
2942 panic!("should be create_table statement");
2943 }
2944 }
2945
2946 #[test]
2947 fn test_parse_create_view_with_columns() {
2948 let sql = "CREATE VIEW test () AS SELECT * FROM NUMBERS";
2949 let result =
2950 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2951 .unwrap();
2952
2953 match &result[0] {
2954 Statement::CreateView(c) => {
2955 assert_eq!(c.to_string(), "CREATE VIEW test AS SELECT * FROM NUMBERS");
2956 assert!(!c.or_replace);
2957 assert!(!c.if_not_exists);
2958 assert_eq!("test", c.name.to_string());
2959 }
2960 _ => unreachable!(),
2961 }
2962 assert_eq!(
2963 "CREATE VIEW test AS SELECT * FROM NUMBERS",
2964 result[0].to_string()
2965 );
2966
2967 let sql = "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS";
2968 let result =
2969 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2970 .unwrap();
2971
2972 match &result[0] {
2973 Statement::CreateView(c) => {
2974 assert_eq!(c.to_string(), sql);
2975 assert!(!c.or_replace);
2976 assert!(!c.if_not_exists);
2977 assert_eq!("test", c.name.to_string());
2978 }
2979 _ => unreachable!(),
2980 }
2981 assert_eq!(sql, result[0].to_string());
2982
2983 let sql = "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS";
2984 let result =
2985 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2986 .unwrap();
2987
2988 match &result[0] {
2989 Statement::CreateView(c) => {
2990 assert_eq!(c.to_string(), sql);
2991 assert!(!c.or_replace);
2992 assert!(!c.if_not_exists);
2993 assert_eq!("test", c.name.to_string());
2994 }
2995 _ => unreachable!(),
2996 }
2997 assert_eq!(sql, result[0].to_string());
2998
2999 let sql = "CREATE VIEW test (n1 AS select * from demo";
3001 let result =
3002 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3003 assert!(result.is_err());
3004
3005 let sql = "CREATE VIEW test (n1, AS select * from demo";
3006 let result =
3007 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3008 assert!(result.is_err());
3009
3010 let sql = "CREATE VIEW test n1,n2) AS select * from demo";
3011 let result =
3012 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3013 assert!(result.is_err());
3014
3015 let sql = "CREATE VIEW test (1) AS select * from demo";
3016 let result =
3017 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3018 assert!(result.is_err());
3019
3020 let sql = "CREATE VIEW test (n1, select) AS select * from demo";
3022 let result =
3023 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3024 assert!(result.is_err());
3025 }
3026
3027 #[test]
3028 fn test_parse_column_extensions_vector() {
3029 let sql = "";
3031 let dialect = GenericDialect {};
3032 let mut tokenizer = Tokenizer::new(&dialect, sql);
3033 let tokens = tokenizer.tokenize().unwrap();
3034 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3035 let name = Ident::new("vec_col");
3036 let data_type =
3037 DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
3038 let mut extensions = ColumnExtensions::default();
3039
3040 let result =
3041 ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
3042 assert!(result.is_ok());
3043 assert!(extensions.vector_options.is_some());
3044 let vector_options = extensions.vector_options.unwrap();
3045 assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some("128"));
3046 }
3047
3048 #[test]
3049 fn test_parse_column_extensions_vector_invalid() {
3050 let sql = "";
3052 let dialect = GenericDialect {};
3053 let mut tokenizer = Tokenizer::new(&dialect, sql);
3054 let tokens = tokenizer.tokenize().unwrap();
3055 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3056 let name = Ident::new("vec_col");
3057 let data_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
3058 let mut extensions = ColumnExtensions::default();
3059
3060 let result =
3061 ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
3062 assert!(result.is_err());
3063 }
3064
3065 #[test]
3066 fn test_parse_column_extensions_indices() {
3067 {
3069 let sql = "SKIPPING INDEX";
3070 let dialect = GenericDialect {};
3071 let mut tokenizer = Tokenizer::new(&dialect, sql);
3072 let tokens = tokenizer.tokenize().unwrap();
3073 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3074 let name = Ident::new("col");
3075 let data_type = DataType::String(None);
3076 let mut extensions = ColumnExtensions::default();
3077 let result = ParserContext::parse_column_extensions(
3078 &mut parser,
3079 &name,
3080 &data_type,
3081 &mut extensions,
3082 );
3083 assert!(result.is_ok());
3084 assert!(extensions.skipping_index_options.is_some());
3085 }
3086
3087 {
3089 let sql = "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')";
3090 let dialect = GenericDialect {};
3091 let mut tokenizer = Tokenizer::new(&dialect, sql);
3092 let tokens = tokenizer.tokenize().unwrap();
3093 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3094 let name = Ident::new("text_col");
3095 let data_type = DataType::String(None);
3096 let mut extensions = ColumnExtensions::default();
3097 let result = ParserContext::parse_column_extensions(
3098 &mut parser,
3099 &name,
3100 &data_type,
3101 &mut extensions,
3102 );
3103 assert!(result.unwrap());
3104 assert!(extensions.fulltext_index_options.is_some());
3105 let fulltext_options = extensions.fulltext_index_options.unwrap();
3106 assert_eq!(fulltext_options.get("analyzer"), Some("English"));
3107 assert_eq!(fulltext_options.get("case_sensitive"), Some("true"));
3108 }
3109
3110 {
3112 let sql = "FULLTEXT INDEX WITH (analyzer = 'English')";
3113 let dialect = GenericDialect {};
3114 let mut tokenizer = Tokenizer::new(&dialect, sql);
3115 let tokens = tokenizer.tokenize().unwrap();
3116 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3117 let name = Ident::new("num_col");
3118 let data_type = DataType::Int(None); let mut extensions = ColumnExtensions::default();
3120 let result = ParserContext::parse_column_extensions(
3121 &mut parser,
3122 &name,
3123 &data_type,
3124 &mut extensions,
3125 );
3126 assert!(result.is_err());
3127 assert!(
3128 result
3129 .unwrap_err()
3130 .to_string()
3131 .contains("FULLTEXT index only supports string type")
3132 );
3133 }
3134
3135 {
3137 let sql = "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')";
3138 let dialect = GenericDialect {};
3139 let mut tokenizer = Tokenizer::new(&dialect, sql);
3140 let tokens = tokenizer.tokenize().unwrap();
3141 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3142 let name = Ident::new("text_col");
3143 let data_type = DataType::String(None);
3144 let mut extensions = ColumnExtensions::default();
3145 let result = ParserContext::parse_column_extensions(
3146 &mut parser,
3147 &name,
3148 &data_type,
3149 &mut extensions,
3150 );
3151 assert!(result.unwrap());
3152 }
3153
3154 {
3156 let sql = "INVERTED INDEX";
3157 let dialect = GenericDialect {};
3158 let mut tokenizer = Tokenizer::new(&dialect, sql);
3159 let tokens = tokenizer.tokenize().unwrap();
3160 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3161 let name = Ident::new("col");
3162 let data_type = DataType::String(None);
3163 let mut extensions = ColumnExtensions::default();
3164 let result = ParserContext::parse_column_extensions(
3165 &mut parser,
3166 &name,
3167 &data_type,
3168 &mut extensions,
3169 );
3170 assert!(result.is_ok());
3171 assert!(extensions.inverted_index_options.is_some());
3172 }
3173
3174 {
3176 let sql = "INVERTED INDEX WITH (analyzer = 'English')";
3177 let dialect = GenericDialect {};
3178 let mut tokenizer = Tokenizer::new(&dialect, sql);
3179 let tokens = tokenizer.tokenize().unwrap();
3180 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3181 let name = Ident::new("col");
3182 let data_type = DataType::String(None);
3183 let mut extensions = ColumnExtensions::default();
3184 let result = ParserContext::parse_column_extensions(
3185 &mut parser,
3186 &name,
3187 &data_type,
3188 &mut extensions,
3189 );
3190 assert!(result.is_err());
3191 assert!(
3192 result
3193 .unwrap_err()
3194 .to_string()
3195 .contains("INVERTED index doesn't support options")
3196 );
3197 }
3198
3199 {
3201 let sql = "SKIPPING INDEX FULLTEXT INDEX";
3202 let dialect = GenericDialect {};
3203 let mut tokenizer = Tokenizer::new(&dialect, sql);
3204 let tokens = tokenizer.tokenize().unwrap();
3205 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3206 let name = Ident::new("col");
3207 let data_type = DataType::String(None);
3208 let mut extensions = ColumnExtensions::default();
3209 let result = ParserContext::parse_column_extensions(
3210 &mut parser,
3211 &name,
3212 &data_type,
3213 &mut extensions,
3214 );
3215 assert!(result.unwrap());
3216 assert!(extensions.skipping_index_options.is_some());
3217 assert!(extensions.fulltext_index_options.is_some());
3218 }
3219 }
3220
3221 #[test]
3222 fn test_parse_interval_cast() {
3223 let s = "select '10s'::INTERVAL";
3224 let stmts =
3225 ParserContext::create_with_dialect(s, &GreptimeDbDialect {}, ParseOptions::default())
3226 .unwrap();
3227 assert_eq!("SELECT '10 seconds'::INTERVAL", &stmts[0].to_string());
3228 }
3229
3230 #[test]
3231 fn test_parse_create_table_vector_index_options() {
3232 let sql = r"
3234CREATE TABLE vectors (
3235 ts TIMESTAMP TIME INDEX,
3236 vec VECTOR(128) VECTOR INDEX,
3237)";
3238 let result =
3239 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
3240 .unwrap();
3241
3242 if let Statement::CreateTable(c) = &result[0] {
3243 c.columns.iter().for_each(|col| {
3244 if col.name().value == "vec" {
3245 assert!(
3246 col.extensions
3247 .vector_index_options
3248 .as_ref()
3249 .unwrap()
3250 .is_empty()
3251 );
3252 }
3253 });
3254 } else {
3255 panic!("should be create_table statement");
3256 }
3257
3258 let sql = r"
3260CREATE TABLE vectors (
3261 ts TIMESTAMP TIME INDEX,
3262 vec VECTOR(128) VECTOR INDEX WITH (metric='cosine', connectivity='32', expansion_add='256', expansion_search='128')
3263)";
3264 let result =
3265 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
3266 .unwrap();
3267
3268 if let Statement::CreateTable(c) = &result[0] {
3269 c.columns.iter().for_each(|col| {
3270 if col.name().value == "vec" {
3271 let options = col.extensions.vector_index_options.as_ref().unwrap();
3272 assert_eq!(options.len(), 4);
3273 assert_eq!(options.get("metric").unwrap(), "cosine");
3274 assert_eq!(options.get("connectivity").unwrap(), "32");
3275 assert_eq!(options.get("expansion_add").unwrap(), "256");
3276 assert_eq!(options.get("expansion_search").unwrap(), "128");
3277 }
3278 });
3279 } else {
3280 panic!("should be create_table statement");
3281 }
3282 }
3283
3284 #[test]
3285 fn test_parse_create_table_vector_index_invalid_type() {
3286 let sql = r"
3288CREATE TABLE vectors (
3289 ts TIMESTAMP TIME INDEX,
3290 col INT VECTOR INDEX,
3291)";
3292 let result =
3293 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3294 assert!(result.is_err());
3295 assert!(
3296 result
3297 .unwrap_err()
3298 .to_string()
3299 .contains("VECTOR INDEX only supports Vector type columns")
3300 );
3301 }
3302
3303 #[test]
3304 fn test_parse_create_table_vector_index_duplicate() {
3305 let sql = r"
3307CREATE TABLE vectors (
3308 ts TIMESTAMP TIME INDEX,
3309 vec VECTOR(128) VECTOR INDEX VECTOR INDEX,
3310)";
3311 let result =
3312 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3313 assert!(result.is_err());
3314 assert!(
3315 result
3316 .unwrap_err()
3317 .to_string()
3318 .contains("duplicated VECTOR INDEX option")
3319 );
3320 }
3321
3322 #[test]
3323 fn test_parse_create_table_vector_index_invalid_option() {
3324 let sql = r"
3326CREATE TABLE vectors (
3327 ts TIMESTAMP TIME INDEX,
3328 vec VECTOR(128) VECTOR INDEX WITH (metric='l2sq', invalid_option='foo')
3329)";
3330 let result =
3331 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3332 assert!(result.is_err());
3333 assert!(
3334 result
3335 .unwrap_err()
3336 .to_string()
3337 .contains("invalid VECTOR INDEX option")
3338 );
3339 }
3340
3341 #[test]
3342 fn test_parse_column_extensions_vector_index() {
3343 {
3345 let sql = "VECTOR INDEX WITH (metric = 'l2sq')";
3346 let dialect = GenericDialect {};
3347 let mut tokenizer = Tokenizer::new(&dialect, sql);
3348 let tokens = tokenizer.tokenize().unwrap();
3349 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3350 let name = Ident::new("vec_col");
3351 let data_type =
3352 DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
3353 let mut extensions = ColumnExtensions {
3355 vector_options: Some(OptionMap::from([(
3356 VECTOR_OPT_DIM.to_string(),
3357 "128".to_string(),
3358 )])),
3359 ..Default::default()
3360 };
3361
3362 let result = ParserContext::parse_column_extensions(
3363 &mut parser,
3364 &name,
3365 &data_type,
3366 &mut extensions,
3367 );
3368 assert!(result.is_ok());
3369 assert!(extensions.vector_index_options.is_some());
3370 let vi_options = extensions.vector_index_options.unwrap();
3371 assert_eq!(vi_options.get("metric"), Some("l2sq"));
3372 }
3373
3374 {
3376 let sql = "VECTOR INDEX";
3377 let dialect = GenericDialect {};
3378 let mut tokenizer = Tokenizer::new(&dialect, sql);
3379 let tokens = tokenizer.tokenize().unwrap();
3380 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3381 let name = Ident::new("num_col");
3382 let data_type = DataType::Int(None); let mut extensions = ColumnExtensions::default();
3384 let result = ParserContext::parse_column_extensions(
3385 &mut parser,
3386 &name,
3387 &data_type,
3388 &mut extensions,
3389 );
3390 assert!(result.is_err());
3391 assert!(
3392 result
3393 .unwrap_err()
3394 .to_string()
3395 .contains("VECTOR INDEX only supports Vector type columns")
3396 );
3397 }
3398 }
3399}