1mod json;
16#[cfg(feature = "enterprise")]
17pub mod trigger;
18
19use std::collections::HashMap;
20
21use arrow_buffer::IntervalMonthDayNano;
22use common_catalog::consts::default_engine;
23use datafusion_common::ScalarValue;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
25use datatypes::data_type::ConcreteDataType;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28use sqlparser::ast::{
29 ColumnOption, ColumnOptionDef, DataType, Expr, KeyOrIndexDisplay, NullsDistinctOption,
30 PrimaryKeyConstraint, UniqueConstraint,
31};
32use sqlparser::dialect::keywords::Keyword;
33use sqlparser::keywords::ALL_KEYWORDS;
34use sqlparser::parser::IsOptional::Mandatory;
35use sqlparser::parser::{Parser, ParserError};
36use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
37use table::requests::validate_database_option;
38
39use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
40use crate::error::{
41 self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidFlowQuerySnafu,
42 InvalidIntervalSnafu, InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
43 SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
44};
45use crate::parser::{FLOW, ParserContext};
46use crate::parsers::tql_parser;
47use crate::parsers::utils::{
48 self, parse_with_options, validate_column_fulltext_create_option,
49 validate_column_skipping_index_create_option, validate_column_vector_index_create_option,
50};
51use crate::statements::create::{
52 Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
53 CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
54};
55use crate::statements::statement::Statement;
56use crate::statements::transform::type_alias::get_data_type_by_alias_name;
57use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
58use crate::util::{OptionValue, location_to_index, parse_option_string};
59
/// Word introducing the table engine clause, i.e. `ENGINE = <name>` after the column list.
pub const ENGINE: &str = "ENGINE";
/// The `MAXVALUE` word. Not referenced by the parsing code in this module section;
/// presumably used by partition-related code elsewhere (TODO confirm).
pub const MAXVALUE: &str = "MAXVALUE";
/// Word in `CREATE FLOW <name> SINK TO <table>`; matched case-insensitively.
pub const SINK: &str = "SINK";
/// First word of the optional `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const EXPIRE: &str = "EXPIRE";
/// Second word of the optional `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const AFTER: &str = "AFTER";
/// Word of the `INVERTED INDEX` column extension.
pub const INVERTED: &str = "INVERTED";
/// Word of the `SKIPPING INDEX [WITH (...)]` column extension.
pub const SKIPPING: &str = "SKIPPING";
/// Word of the `VECTOR INDEX [WITH (...)]` column extension.
pub const VECTOR: &str = "VECTOR";

/// Textual form of an interval expression, as rendered by the parsed `Expr`'s `Display`.
pub type RawIntervalExpr = String;
70
71fn flow_option_map(options: HashMap<String, OptionValue>) -> OptionMap {
75 let mut flow_options = OptionMap::default();
76 for (key, value) in options {
77 flow_options.insert_options(&key, value);
78 }
79 flow_options
80}
81
82impl<'a> ParserContext<'a> {
84 pub(crate) fn parse_create(&mut self) -> Result<Statement> {
85 match self.parser.peek_token().token {
86 Token::Word(w) => match w.keyword {
87 Keyword::TABLE => self.parse_create_table(),
88
89 Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
90
91 Keyword::EXTERNAL => self.parse_create_external_table(),
92
93 Keyword::OR => {
94 let _ = self.parser.next_token();
95 self.parser
96 .expect_keyword(Keyword::REPLACE)
97 .context(SyntaxSnafu)?;
98 match self.parser.next_token().token {
99 Token::Word(w) => match w.keyword {
100 Keyword::VIEW => self.parse_create_view(true),
101 Keyword::NoKeyword => {
102 let uppercase = w.value.to_uppercase();
103 match uppercase.as_str() {
104 FLOW => self.parse_create_flow(true),
105 _ => self.unsupported(w.to_string()),
106 }
107 }
108 _ => self.unsupported(w.to_string()),
109 },
110 _ => self.unsupported(w.to_string()),
111 }
112 }
113
114 Keyword::VIEW => {
115 let _ = self.parser.next_token();
116 self.parse_create_view(false)
117 }
118
119 #[cfg(feature = "enterprise")]
120 Keyword::TRIGGER => {
121 let _ = self.parser.next_token();
122 self.parse_create_trigger()
123 }
124
125 Keyword::NoKeyword => {
126 let _ = self.parser.next_token();
127 let uppercase = w.value.to_uppercase();
128 match uppercase.as_str() {
129 FLOW => self.parse_create_flow(false),
130 _ => self.unsupported(w.to_string()),
131 }
132 }
133 _ => self.unsupported(w.to_string()),
134 },
135 unexpected => self.unsupported(unexpected.to_string()),
136 }
137 }
138
139 fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
141 let if_not_exists = self.parse_if_not_exist()?;
142 let view_name = self.intern_parse_table_name()?;
143
144 let columns = self.parse_view_columns()?;
145
146 self.parser
147 .expect_keyword(Keyword::AS)
148 .context(SyntaxSnafu)?;
149
150 let query = self.parse_query()?;
151
152 Ok(Statement::CreateView(CreateView {
153 name: view_name,
154 columns,
155 or_replace,
156 query: Box::new(query),
157 if_not_exists,
158 }))
159 }
160
161 fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
162 let mut columns = vec![];
163 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
164 return Ok(columns);
165 }
166
167 loop {
168 let name = self.parse_column_name().context(SyntaxSnafu)?;
169
170 columns.push(name);
171
172 let comma = self.parser.consume_token(&Token::Comma);
173 if self.parser.consume_token(&Token::RParen) {
174 break;
176 } else if !comma {
177 return self.expected("',' or ')' after column name", self.parser.peek_token());
178 }
179 }
180
181 Ok(columns)
182 }
183
184 fn parse_create_external_table(&mut self) -> Result<Statement> {
185 let _ = self.parser.next_token();
186 self.parser
187 .expect_keyword(Keyword::TABLE)
188 .context(SyntaxSnafu)?;
189 let if_not_exists = self.parse_if_not_exist()?;
190 let table_name = self.intern_parse_table_name()?;
191 let (columns, constraints) = self.parse_columns()?;
192 if !columns.is_empty() {
193 validate_time_index(&columns, &constraints)?;
194 }
195
196 let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
197 let options = self.parse_create_table_options()?;
198 Ok(Statement::CreateExternalTable(CreateExternalTable {
199 name: table_name,
200 columns,
201 constraints,
202 options,
203 if_not_exists,
204 engine,
205 }))
206 }
207
208 fn parse_create_database(&mut self) -> Result<Statement> {
209 let _ = self.parser.next_token();
210 let if_not_exists = self.parse_if_not_exist()?;
211 let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
212 expected: "a database name",
213 actual: self.peek_token_as_string(),
214 })?;
215 let database_name = Self::canonicalize_object_name(database_name)?;
216
217 let options = self
218 .parser
219 .parse_options(Keyword::WITH)
220 .context(SyntaxSnafu)?
221 .into_iter()
222 .map(parse_option_string)
223 .collect::<Result<HashMap<String, OptionValue>>>()?;
224
225 for key in options.keys() {
226 ensure!(
227 validate_database_option(key),
228 InvalidDatabaseOptionSnafu { key: key.clone() }
229 );
230 }
231 if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string())
232 && append_mode == "true"
233 && options.contains_key("merge_mode")
234 {
235 return InvalidDatabaseOptionSnafu {
236 key: "merge_mode".to_string(),
237 }
238 .fail();
239 }
240
241 Ok(Statement::CreateDatabase(CreateDatabase {
242 name: database_name,
243 if_not_exists,
244 options: OptionMap::new(options),
245 }))
246 }
247
248 fn parse_create_table(&mut self) -> Result<Statement> {
249 let _ = self.parser.next_token();
250
251 let if_not_exists = self.parse_if_not_exist()?;
252
253 let table_name = self.intern_parse_table_name()?;
254
255 if self.parser.parse_keyword(Keyword::LIKE) {
256 let source_name = self.intern_parse_table_name()?;
257
258 return Ok(Statement::CreateTableLike(CreateTableLike {
259 table_name,
260 source_name,
261 }));
262 }
263
264 let (columns, constraints) = self.parse_columns()?;
265 validate_time_index(&columns, &constraints)?;
266
267 let partitions = self.parse_partitions()?;
268 if let Some(partitions) = &partitions {
269 validate_partitions(&columns, partitions)?;
270 }
271
272 let engine = self.parse_table_engine(default_engine())?;
273 let options = self.parse_create_table_options()?;
274 let create_table = CreateTable {
275 if_not_exists,
276 name: table_name,
277 columns,
278 engine,
279 constraints,
280 options,
281 table_id: 0, partitions,
283 };
284
285 Ok(Statement::CreateTable(create_table))
286 }
287
288 fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
290 let if_not_exists = self.parse_if_not_exist()?;
291
292 let flow_name = self.intern_parse_table_name()?;
293
294 if let Token::Word(word) = self.parser.peek_token().token
296 && word.value.eq_ignore_ascii_case(SINK)
297 {
298 self.parser.next_token();
299 } else {
300 Err(ParserError::ParserError(
301 "Expect `SINK` keyword".to_string(),
302 ))
303 .context(SyntaxSnafu)?
304 }
305 self.parser
306 .expect_keyword(Keyword::TO)
307 .context(SyntaxSnafu)?;
308
309 let output_table_name = self.intern_parse_table_name()?;
310
311 let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
312 && w1.value.eq_ignore_ascii_case(EXPIRE)
313 {
314 self.parser.next_token();
315 if let Token::Word(w2) = &self.parser.peek_token().token
316 && w2.value.eq_ignore_ascii_case(AFTER)
317 {
318 self.parser.next_token();
319 Some(self.parse_interval_no_month("EXPIRE AFTER")?)
320 } else {
321 None
322 }
323 } else {
324 None
325 };
326
327 let eval_interval = if self
328 .parser
329 .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
330 {
331 Some(self.parse_interval_no_month("EVAL INTERVAL")?)
332 } else {
333 None
334 };
335
336 let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
337 match self.parser.next_token() {
338 TokenWithSpan {
339 token: Token::SingleQuotedString(value, ..),
340 ..
341 } => Some(value),
342 unexpected => {
343 return self
344 .parser
345 .expected("string", unexpected)
346 .context(SyntaxSnafu);
347 }
348 }
349 } else {
350 None
351 };
352
353 let flow_options = self
354 .parser
355 .parse_options(Keyword::WITH)
356 .context(SyntaxSnafu)?
357 .into_iter()
358 .map(parse_option_string)
359 .collect::<Result<HashMap<String, OptionValue>>>()?;
360
361 self.parser
362 .expect_keyword(Keyword::AS)
363 .context(SyntaxSnafu)?;
364
365 let query = Box::new(self.parse_flow_sql_or_tql(true)?);
366
367 Ok(Statement::CreateFlow(CreateFlow {
368 flow_name,
369 sink_table_name: output_table_name,
370 or_replace,
371 if_not_exists,
372 expire_after,
373 eval_interval,
374 comment,
375 flow_options: flow_option_map(flow_options),
376 query,
377 }))
378 }
379
380 fn parse_flow_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
381 let start_loc = self.parser.peek_token().span.start;
382 let start_index = location_to_index(self.sql, &start_loc);
383
384 let starts_with_with = matches!(
385 self.parser.peek_token().token,
386 Token::Word(w) if w.keyword == Keyword::WITH
387 );
388
389 let query = match self.parser.peek_token().token {
391 Token::Word(w) => match w.keyword {
392 Keyword::SELECT => self.parse_query(),
393 Keyword::WITH => self.parse_with_tql_with_now(require_now_expr),
394 Keyword::NoKeyword
395 if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
396 {
397 self.parse_tql(require_now_expr)
398 }
399
400 _ => self.unsupported(self.peek_token_as_string()),
401 },
402 _ => self.unsupported(self.peek_token_as_string()),
403 }?;
404
405 if starts_with_with {
406 let Statement::Query(query) = &query else {
407 return InvalidFlowQuerySnafu {
408 reason: "Expect a query after WITH".to_string(),
409 }
410 .fail();
411 };
412
413 if !utils::is_simple_tql_cte_query(query) {
414 return InvalidFlowQuerySnafu {
415 reason: "WITH is only supported for the simplest TQL CTE in CREATE FLOW"
416 .to_string(),
417 }
418 .fail();
419 }
420 }
421
422 let end_token = self.parser.peek_token();
423
424 let raw_query = if end_token == Token::EOF {
425 &self.sql[start_index..]
426 } else {
427 let end_loc = end_token.span.end;
428 let end_index = location_to_index(self.sql, &end_loc);
429 &self.sql[start_index..end_index.min(self.sql.len())]
430 };
431 let raw_query = raw_query.trim_end_matches(";");
432
433 let query = SqlOrTql::try_from_statement(query, raw_query)?;
434 Ok(query)
435 }
436
437 fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
439 let interval = self.parse_interval_month_day_nano()?.0;
440 if interval.months != 0 {
441 return InvalidIntervalSnafu {
442 reason: format!("Interval with months is not allowed in {context}"),
443 }
444 .fail();
445 }
446 Ok(
447 interval.nanoseconds / 1_000_000_000
448 + interval.days as i64 * 60 * 60 * 24
449 + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, )
453 }
454
455 fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
457 let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
458 let raw_interval_expr = interval_expr.to_string();
459 let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
460 .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
461 .ok()
462 .with_context(|| InvalidIntervalSnafu {
463 reason: format!("cannot cast {} to interval type", interval_expr),
464 })?;
465 if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
466 Ok((interval, raw_interval_expr))
467 } else {
468 unreachable!()
469 }
470 }
471
472 fn parse_if_not_exist(&mut self) -> Result<bool> {
473 match self.parser.peek_token().token {
474 Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
475 _ => {}
476 }
477
478 if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
479 return self
480 .parser
481 .expect_keyword(Keyword::EXISTS)
482 .map(|_| true)
483 .context(UnexpectedSnafu {
484 expected: "EXISTS",
485 actual: self.peek_token_as_string(),
486 });
487 }
488
489 if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
490 return UnsupportedSnafu { keyword: "EXISTS" }.fail();
491 }
492
493 Ok(false)
494 }
495
496 fn parse_create_table_options(&mut self) -> Result<OptionMap> {
497 parse_with_options(&mut self.parser)
498 }
499
500 fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
502 if !self.parser.parse_keyword(Keyword::PARTITION) {
503 return Ok(None);
504 }
505 self.parser
506 .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
507 .context(error::UnexpectedSnafu {
508 expected: "ON, COLUMNS",
509 actual: self.peek_token_as_string(),
510 })?;
511
512 let raw_column_list = self
513 .parser
514 .parse_parenthesized_column_list(Mandatory, false)
515 .context(error::SyntaxSnafu)?;
516 let column_list = raw_column_list
517 .into_iter()
518 .map(Self::canonicalize_identifier)
519 .collect();
520
521 let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
522
523 Ok(Some(Partitions { column_list, exprs }))
524 }
525
526 fn parse_partition_entry(&mut self) -> Result<Expr> {
527 self.parser.parse_expr().context(error::SyntaxSnafu)
528 }
529
530 fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
532 where
533 F: FnMut(&mut ParserContext<'a>) -> Result<T>,
534 {
535 self.parser
536 .expect_token(&Token::LParen)
537 .context(error::UnexpectedSnafu {
538 expected: "(",
539 actual: self.peek_token_as_string(),
540 })?;
541
542 let mut values = vec![];
543 while self.parser.peek_token() != Token::RParen {
544 values.push(f(self)?);
545 if !self.parser.consume_token(&Token::Comma) {
546 break;
547 }
548 }
549
550 self.parser
551 .expect_token(&Token::RParen)
552 .context(error::UnexpectedSnafu {
553 expected: ")",
554 actual: self.peek_token_as_string(),
555 })?;
556
557 Ok(values)
558 }
559
560 fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
562 let mut columns = vec![];
563 let mut constraints = vec![];
564 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
565 return Ok((columns, constraints));
566 }
567
568 loop {
569 if let Some(constraint) = self.parse_optional_table_constraint()? {
570 constraints.push(constraint);
571 } else if let Token::Word(_) = self.parser.peek_token().token {
572 self.parse_column(&mut columns, &mut constraints)?;
573 } else {
574 return self.expected(
575 "column name or constraint definition",
576 self.parser.peek_token(),
577 );
578 }
579 let comma = self.parser.consume_token(&Token::Comma);
580 if self.parser.consume_token(&Token::RParen) {
581 break;
583 } else if !comma {
584 return self.expected(
585 "',' or ')' after column definition",
586 self.parser.peek_token(),
587 );
588 }
589 }
590
591 Ok((columns, constraints))
592 }
593
594 fn parse_column(
595 &mut self,
596 columns: &mut Vec<Column>,
597 constraints: &mut Vec<TableConstraint>,
598 ) -> Result<()> {
599 let mut column = self.parse_column_def()?;
600
601 let mut time_index_opt_idx = None;
602 for (index, opt) in column.options().iter().enumerate() {
603 if let ColumnOption::DialectSpecific(tokens) = &opt.option
604 && matches!(
605 &tokens[..],
606 [
607 Token::Word(Word {
608 keyword: Keyword::TIME,
609 ..
610 }),
611 Token::Word(Word {
612 keyword: Keyword::INDEX,
613 ..
614 })
615 ]
616 )
617 {
618 ensure!(
619 time_index_opt_idx.is_none(),
620 InvalidColumnOptionSnafu {
621 name: column.name().to_string(),
622 msg: "duplicated time index",
623 }
624 );
625 time_index_opt_idx = Some(index);
626
627 let constraint = TableConstraint::TimeIndex {
628 column: Ident::new(column.name().value.clone()),
629 };
630 constraints.push(constraint);
631 }
632 }
633
634 if let Some(index) = time_index_opt_idx {
635 ensure!(
636 !column.options().contains(&ColumnOptionDef {
637 option: ColumnOption::Null,
638 name: None,
639 }),
640 InvalidColumnOptionSnafu {
641 name: column.name().to_string(),
642 msg: "time index column can't be null",
643 }
644 );
645
646 let data_type = get_unalias_type(column.data_type());
648 ensure!(
649 matches!(data_type, DataType::Timestamp(_, _)),
650 InvalidColumnOptionSnafu {
651 name: column.name().to_string(),
652 msg: "time index column data type should be timestamp",
653 }
654 );
655
656 let not_null_opt = ColumnOptionDef {
657 option: ColumnOption::NotNull,
658 name: None,
659 };
660
661 if !column.options().contains(¬_null_opt) {
662 column.mut_options().push(not_null_opt);
663 }
664
665 let _ = column.mut_options().remove(index);
666 }
667
668 columns.push(column);
669
670 Ok(())
671 }
672
673 fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
675 let name = self.parser.parse_identifier()?;
676 if name.quote_style.is_none() &&
677 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
679 {
680 return Err(ParserError::ParserError(format!(
681 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
682 &name.value
683 )));
684 }
685
686 Ok(name)
687 }
688
689 pub fn parse_column_def(&mut self) -> Result<Column> {
690 let name = self.parse_column_name().context(SyntaxSnafu)?;
691 let parser = &mut self.parser;
692
693 ensure!(
694 !(name.quote_style.is_none() &&
695 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
697 InvalidSqlSnafu {
698 msg: format!(
699 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
700 &name.value
701 ),
702 }
703 );
704
705 let mut extensions = ColumnExtensions::default();
706
707 let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
708 if matches!(data_type, DataType::JSON) {
711 extensions.json_datatype_options = json::parse_json_datatype_options(parser)?;
712 }
713
714 let mut options = vec![];
715 loop {
716 if parser.parse_keyword(Keyword::CONSTRAINT) {
717 let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
718 if let Some(option) = Self::parse_optional_column_option(parser)? {
719 options.push(ColumnOptionDef { name, option });
720 } else {
721 return parser
722 .expected(
723 "constraint details after CONSTRAINT <name>",
724 parser.peek_token(),
725 )
726 .context(SyntaxSnafu);
727 }
728 } else if let Some(option) = Self::parse_optional_column_option(parser)? {
729 options.push(ColumnOptionDef { name: None, option });
730 } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
731 break;
732 };
733 }
734
735 Ok(Column {
736 column_def: ColumnDef {
737 name: Self::canonicalize_identifier(name),
738 data_type,
739 options,
740 },
741 extensions,
742 })
743 }
744
745 fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
746 if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
747 Ok(Some(ColumnOption::CharacterSet(
748 parser.parse_object_name(false).context(SyntaxSnafu)?,
749 )))
750 } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
751 Ok(Some(ColumnOption::NotNull))
752 } else if parser.parse_keywords(&[Keyword::COMMENT]) {
753 match parser.next_token() {
754 TokenWithSpan {
755 token: Token::SingleQuotedString(value, ..),
756 ..
757 } => Ok(Some(ColumnOption::Comment(value))),
758 unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
759 }
760 } else if parser.parse_keyword(Keyword::NULL) {
761 Ok(Some(ColumnOption::Null))
762 } else if parser.parse_keyword(Keyword::DEFAULT) {
763 Ok(Some(ColumnOption::Default(
764 parser.parse_expr().context(SyntaxSnafu)?,
765 )))
766 } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
767 Ok(Some(ColumnOption::PrimaryKey(PrimaryKeyConstraint {
768 name: None,
769 index_name: None,
770 index_type: None,
771 columns: vec![],
772 index_options: vec![],
773 characteristics: None,
774 })))
775 } else if parser.parse_keyword(Keyword::UNIQUE) {
776 Ok(Some(ColumnOption::Unique(UniqueConstraint {
777 name: None,
778 index_name: None,
779 index_type_display: KeyOrIndexDisplay::None,
780 index_type: None,
781 columns: vec![],
782 index_options: vec![],
783 characteristics: None,
784 nulls_distinct: NullsDistinctOption::None,
785 })))
786 } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
787 Ok(Some(ColumnOption::DialectSpecific(vec![
789 Token::Word(Word {
790 value: "TIME".to_string(),
791 quote_style: None,
792 keyword: Keyword::TIME,
793 }),
794 Token::Word(Word {
795 value: "INDEX".to_string(),
796 quote_style: None,
797 keyword: Keyword::INDEX,
798 }),
799 ])))
800 } else {
801 Ok(None)
802 }
803 }
804
805 fn parse_column_extensions(
811 parser: &mut Parser<'_>,
812 column_name: &Ident,
813 column_type: &DataType,
814 column_extensions: &mut ColumnExtensions,
815 ) -> Result<bool> {
816 if let DataType::Custom(name, tokens) = column_type
817 && name.0.len() == 1
818 && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
819 {
820 ensure!(
821 tokens.len() == 1,
822 InvalidColumnOptionSnafu {
823 name: column_name.to_string(),
824 msg: "VECTOR type should have dimension",
825 }
826 );
827
828 let dimension =
829 tokens[0]
830 .parse::<u32>()
831 .ok()
832 .with_context(|| InvalidColumnOptionSnafu {
833 name: column_name.to_string(),
834 msg: "dimension should be a positive integer",
835 })?;
836
837 let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
838 column_extensions.vector_options = Some(options);
839 }
840
841 let mut is_index_declared = false;
843
844 if let Token::Word(word) = parser.peek_token().token
846 && word.value.eq_ignore_ascii_case(SKIPPING)
847 {
848 parser.next_token();
849 ensure!(
851 parser.parse_keyword(Keyword::INDEX),
852 InvalidColumnOptionSnafu {
853 name: column_name.to_string(),
854 msg: "expect INDEX after SKIPPING keyword",
855 }
856 );
857 ensure!(
858 column_extensions.skipping_index_options.is_none(),
859 InvalidColumnOptionSnafu {
860 name: column_name.to_string(),
861 msg: "duplicated SKIPPING index option",
862 }
863 );
864
865 let options = parser
866 .parse_options(Keyword::WITH)
867 .context(error::SyntaxSnafu)?
868 .into_iter()
869 .map(parse_option_string)
870 .collect::<Result<Vec<_>>>()?;
871
872 for (key, _) in options.iter() {
873 ensure!(
874 validate_column_skipping_index_create_option(key),
875 InvalidColumnOptionSnafu {
876 name: column_name.to_string(),
877 msg: format!("invalid SKIPPING INDEX option: {key}"),
878 }
879 );
880 }
881
882 let options = OptionMap::new(options);
883 column_extensions.skipping_index_options = Some(options);
884 is_index_declared |= true;
885 }
886
887 if parser.parse_keyword(Keyword::FULLTEXT) {
889 ensure!(
891 parser.parse_keyword(Keyword::INDEX),
892 InvalidColumnOptionSnafu {
893 name: column_name.to_string(),
894 msg: "expect INDEX after FULLTEXT keyword",
895 }
896 );
897
898 ensure!(
899 column_extensions.fulltext_index_options.is_none(),
900 InvalidColumnOptionSnafu {
901 name: column_name.to_string(),
902 msg: "duplicated FULLTEXT INDEX option",
903 }
904 );
905
906 let column_type = get_unalias_type(column_type);
907 let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
908 ensure!(
909 data_type == ConcreteDataType::string_datatype(),
910 InvalidColumnOptionSnafu {
911 name: column_name.to_string(),
912 msg: "FULLTEXT index only supports string type",
913 }
914 );
915
916 let options = parser
917 .parse_options(Keyword::WITH)
918 .context(error::SyntaxSnafu)?
919 .into_iter()
920 .map(parse_option_string)
921 .collect::<Result<Vec<_>>>()?;
922
923 for (key, _) in options.iter() {
924 ensure!(
925 validate_column_fulltext_create_option(key),
926 InvalidColumnOptionSnafu {
927 name: column_name.to_string(),
928 msg: format!("invalid FULLTEXT INDEX option: {key}"),
929 }
930 );
931 }
932
933 let options = OptionMap::new(options);
934 column_extensions.fulltext_index_options = Some(options);
935 is_index_declared |= true;
936 }
937
938 if let Token::Word(word) = parser.peek_token().token
940 && word.value.eq_ignore_ascii_case(INVERTED)
941 {
942 parser.next_token();
943 ensure!(
945 parser.parse_keyword(Keyword::INDEX),
946 InvalidColumnOptionSnafu {
947 name: column_name.to_string(),
948 msg: "expect INDEX after INVERTED keyword",
949 }
950 );
951
952 ensure!(
953 column_extensions.inverted_index_options.is_none(),
954 InvalidColumnOptionSnafu {
955 name: column_name.to_string(),
956 msg: "duplicated INVERTED index option",
957 }
958 );
959
960 let with_token = parser.peek_token();
963 ensure!(
964 with_token.token
965 != Token::Word(Word {
966 value: "WITH".to_string(),
967 keyword: Keyword::WITH,
968 quote_style: None,
969 }),
970 InvalidColumnOptionSnafu {
971 name: column_name.to_string(),
972 msg: "INVERTED index doesn't support options",
973 }
974 );
975
976 column_extensions.inverted_index_options = Some(OptionMap::default());
977 is_index_declared |= true;
978 }
979
980 if let Token::Word(word) = parser.peek_token().token
982 && word.value.eq_ignore_ascii_case(VECTOR)
983 {
984 parser.next_token();
985 ensure!(
987 parser.parse_keyword(Keyword::INDEX),
988 InvalidColumnOptionSnafu {
989 name: column_name.to_string(),
990 msg: "expect INDEX after VECTOR keyword",
991 }
992 );
993
994 ensure!(
995 column_extensions.vector_index_options.is_none(),
996 InvalidColumnOptionSnafu {
997 name: column_name.to_string(),
998 msg: "duplicated VECTOR INDEX option",
999 }
1000 );
1001
1002 let column_type = get_unalias_type(column_type);
1004 let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
1005 ensure!(
1006 matches!(data_type, ConcreteDataType::Vector(_)),
1007 InvalidColumnOptionSnafu {
1008 name: column_name.to_string(),
1009 msg: "VECTOR INDEX only supports Vector type columns",
1010 }
1011 );
1012
1013 let options = parser
1014 .parse_options(Keyword::WITH)
1015 .context(error::SyntaxSnafu)?
1016 .into_iter()
1017 .map(parse_option_string)
1018 .collect::<Result<Vec<_>>>()?;
1019
1020 for (key, _) in options.iter() {
1021 ensure!(
1022 validate_column_vector_index_create_option(key),
1023 InvalidColumnOptionSnafu {
1024 name: column_name.to_string(),
1025 msg: format!("invalid VECTOR INDEX option: {key}"),
1026 }
1027 );
1028 }
1029
1030 let options = OptionMap::new(options);
1031 column_extensions.vector_index_options = Some(options);
1032 is_index_declared |= true;
1033 }
1034
1035 Ok(is_index_declared)
1036 }
1037
1038 fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
1039 match self.parser.next_token() {
1040 TokenWithSpan {
1041 token: Token::Word(w),
1042 ..
1043 } if w.keyword == Keyword::PRIMARY => {
1044 self.parser
1045 .expect_keyword(Keyword::KEY)
1046 .context(error::UnexpectedSnafu {
1047 expected: "KEY",
1048 actual: self.peek_token_as_string(),
1049 })?;
1050 let raw_columns = self
1051 .parser
1052 .parse_parenthesized_column_list(Mandatory, false)
1053 .context(error::SyntaxSnafu)?;
1054 let columns = raw_columns
1055 .into_iter()
1056 .map(Self::canonicalize_identifier)
1057 .collect();
1058 Ok(Some(TableConstraint::PrimaryKey { columns }))
1059 }
1060 TokenWithSpan {
1061 token: Token::Word(w),
1062 ..
1063 } if w.keyword == Keyword::TIME => {
1064 self.parser
1065 .expect_keyword(Keyword::INDEX)
1066 .context(error::UnexpectedSnafu {
1067 expected: "INDEX",
1068 actual: self.peek_token_as_string(),
1069 })?;
1070
1071 let raw_columns = self
1072 .parser
1073 .parse_parenthesized_column_list(Mandatory, false)
1074 .context(error::SyntaxSnafu)?;
1075 let mut columns = raw_columns
1076 .into_iter()
1077 .map(Self::canonicalize_identifier)
1078 .collect::<Vec<_>>();
1079
1080 ensure!(
1081 columns.len() == 1,
1082 InvalidTimeIndexSnafu {
1083 msg: "it should contain only one column in time index",
1084 }
1085 );
1086
1087 Ok(Some(TableConstraint::TimeIndex {
1088 column: columns.pop().unwrap(),
1089 }))
1090 }
1091 _ => {
1092 self.parser.prev_token();
1093 Ok(None)
1094 }
1095 }
1096 }
1097
1098 fn parse_table_engine(&mut self, default: &str) -> Result<String> {
1100 if !self.consume_token(ENGINE) {
1101 return Ok(default.to_string());
1102 }
1103
1104 self.parser
1105 .expect_token(&Token::Eq)
1106 .context(error::UnexpectedSnafu {
1107 expected: "=",
1108 actual: self.peek_token_as_string(),
1109 })?;
1110
1111 let token = self.parser.next_token();
1112 if let Token::Word(w) = token.token {
1113 Ok(w.value)
1114 } else {
1115 self.expected("'Engine' is missing", token)
1116 }
1117 }
1118}
1119
1120fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1121 let time_index_constraints: Vec<_> = constraints
1122 .iter()
1123 .filter_map(|c| match c {
1124 TableConstraint::TimeIndex { column } => Some(column),
1125 _ => None,
1126 })
1127 .unique()
1128 .collect();
1129
1130 ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1131 ensure!(
1132 time_index_constraints.len() == 1,
1133 InvalidTimeIndexSnafu {
1134 msg: format!(
1135 "expected only one time index constraint but actual {}",
1136 time_index_constraints.len()
1137 ),
1138 }
1139 );
1140
1141 let time_index_column_ident = &time_index_constraints[0];
1144 let time_index_column = columns
1145 .iter()
1146 .find(|c| c.name().value == *time_index_column_ident.value)
1147 .with_context(|| InvalidTimeIndexSnafu {
1148 msg: format!(
1149 "time index column {} not found in columns",
1150 time_index_column_ident
1151 ),
1152 })?;
1153
1154 let time_index_data_type = get_unalias_type(time_index_column.data_type());
1155 ensure!(
1156 matches!(time_index_data_type, DataType::Timestamp(_, _)),
1157 InvalidColumnOptionSnafu {
1158 name: time_index_column.name().to_string(),
1159 msg: "time index column data type should be timestamp",
1160 }
1161 );
1162
1163 Ok(())
1164}
1165
1166fn get_unalias_type(data_type: &DataType) -> DataType {
1167 match data_type {
1168 DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1169 if let Some(real_type) =
1170 get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1171 {
1172 real_type
1173 } else {
1174 data_type.clone()
1175 }
1176 }
1177 _ => data_type.clone(),
1178 }
1179}
1180
1181fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1182 let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1183
1184 ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1185
1186 Ok(())
1187}
1188
1189fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1191 for expr in exprs {
1192 if let Expr::BinaryOp { left, op: _, right } = expr {
1194 ensure_one_expr(left, columns)?;
1195 ensure_one_expr(right, columns)?;
1196 } else {
1197 return error::InvalidSqlSnafu {
1198 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1199 }
1200 .fail();
1201 }
1202 }
1203 Ok(())
1204}
1205
1206fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1210 match expr {
1211 Expr::BinaryOp { left, op: _, right } => {
1212 ensure_one_expr(left, columns)?;
1213 ensure_one_expr(right, columns)?;
1214 Ok(())
1215 }
1216 Expr::Identifier(ident) => {
1217 let column_name = &ident.value;
1218 ensure!(
1219 columns.iter().any(|c| &c.name().value == column_name),
1220 error::InvalidSqlSnafu {
1221 msg: format!(
1222 "Column {:?} in rule expr is not referenced in PARTITION ON",
1223 column_name
1224 ),
1225 }
1226 );
1227 Ok(())
1228 }
1229 Expr::Value(_) => Ok(()),
1230 Expr::UnaryOp { expr, .. } => {
1231 ensure_one_expr(expr, columns)?;
1232 Ok(())
1233 }
1234 _ => error::InvalidSqlSnafu {
1235 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1236 }
1237 .fail(),
1238 }
1239}
1240
1241fn ensure_partition_columns_defined<'a>(
1243 columns: &'a [Column],
1244 partitions: &'a Partitions,
1245) -> Result<Vec<&'a Column>> {
1246 partitions
1247 .column_list
1248 .iter()
1249 .map(|x| {
1250 let x = ParserContext::canonicalize_identifier(x.clone());
1251 columns
1254 .iter()
1255 .find(|c| *c.name().value == x.value)
1256 .context(error::InvalidSqlSnafu {
1257 msg: format!("Partition column {:?} not defined", x.value),
1258 })
1259 })
1260 .collect::<Result<Vec<&Column>>>()
1261}
1262
1263#[cfg(test)]
1264mod tests {
1265 use std::assert_matches;
1266 use std::collections::HashMap;
1267
1268 use common_catalog::consts::FILE_ENGINE;
1269 use common_error::ext::ErrorExt;
1270 use sqlparser::ast::ColumnOption::NotNull;
1271 use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1272 use sqlparser::dialect::GenericDialect;
1273 use sqlparser::tokenizer::Tokenizer;
1274
1275 use super::*;
1276 use crate::dialect::GreptimeDbDialect;
1277 use crate::parser::ParseOptions;
1278
1279 fn string_option_map(
1280 entries: impl IntoIterator<Item = (&'static str, &'static str)>,
1281 ) -> OptionMap {
1282 OptionMap::new(entries.into_iter().map(|(key, value)| {
1283 (
1284 key.to_string(),
1285 OptionValue::try_new(Expr::Value(
1286 Value::SingleQuotedString(value.to_string()).into(),
1287 ))
1288 .unwrap(),
1289 )
1290 }))
1291 }
1292
1293 #[test]
1294 fn test_parse_create_table_like() {
1295 let sql = "CREATE TABLE t1 LIKE t2";
1296 let stmts =
1297 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1298 .unwrap();
1299
1300 assert_eq!(1, stmts.len());
1301 match &stmts[0] {
1302 Statement::CreateTableLike(c) => {
1303 assert_eq!(c.table_name.to_string(), "t1");
1304 assert_eq!(c.source_name.to_string(), "t2");
1305 }
1306 _ => unreachable!(),
1307 }
1308 }
1309
1310 #[test]
1311 fn test_validate_external_table_options() {
1312 let sql = "CREATE EXTERNAL TABLE city (
1313 host string,
1314 ts timestamp,
1315 cpu float64 default 0,
1316 memory float64,
1317 TIME INDEX (ts),
1318 PRIMARY KEY(ts, host)
1319 ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1320
1321 let result =
1322 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1323 assert!(matches!(
1324 result,
1325 Err(error::Error::InvalidTableOption { .. })
1326 ));
1327 }
1328
1329 #[test]
1330 fn test_parse_create_external_table() {
1331 struct Test<'a> {
1332 sql: &'a str,
1333 expected_table_name: &'a str,
1334 expected_options: HashMap<&'a str, &'a str>,
1335 expected_engine: &'a str,
1336 expected_if_not_exist: bool,
1337 }
1338
1339 let tests = [
1340 Test {
1341 sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1342 expected_table_name: "city",
1343 expected_options: HashMap::from([
1344 ("location", "/var/data/city.csv"),
1345 ("format", "csv"),
1346 ]),
1347 expected_engine: FILE_ENGINE,
1348 expected_if_not_exist: false,
1349 },
1350 Test {
1351 sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1352 expected_table_name: "city",
1353 expected_options: HashMap::from([
1354 ("location", "/var/data/city.csv"),
1355 ("format", "csv"),
1356 ]),
1357 expected_engine: "foo",
1358 expected_if_not_exist: true,
1359 },
1360 Test {
1361 sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1362 expected_table_name: "city",
1363 expected_options: HashMap::from([
1364 ("location", "/var/data/city.csv"),
1365 ("format", "csv"),
1366 ("compaction.type", "bar"),
1367 ]),
1368 expected_engine: "foo",
1369 expected_if_not_exist: true,
1370 },
1371 ];
1372
1373 for test in tests {
1374 let stmts = ParserContext::create_with_dialect(
1375 test.sql,
1376 &GreptimeDbDialect {},
1377 ParseOptions::default(),
1378 )
1379 .unwrap();
1380 assert_eq!(1, stmts.len());
1381 match &stmts[0] {
1382 Statement::CreateExternalTable(c) => {
1383 assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1384 assert_eq!(c.options.to_str_map(), test.expected_options);
1385 assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1386 assert_eq!(c.engine, test.expected_engine);
1387 }
1388 _ => unreachable!(),
1389 }
1390 }
1391 }
1392
1393 #[test]
1394 fn test_parse_create_external_table_with_schema() {
1395 let sql = "CREATE EXTERNAL TABLE city (
1396 host string,
1397 ts timestamp,
1398 cpu float32 default 0,
1399 memory float64,
1400 TIME INDEX (ts),
1401 PRIMARY KEY(ts, host),
1402 ) with(location='/var/data/city.csv',format='csv');";
1403
1404 let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]);
1405
1406 let stmts =
1407 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1408 .unwrap();
1409 assert_eq!(1, stmts.len());
1410 match &stmts[0] {
1411 Statement::CreateExternalTable(c) => {
1412 assert_eq!(c.name.to_string(), "city");
1413 assert_eq!(c.options.to_str_map(), options);
1414
1415 let columns = &c.columns;
1416 assert_column_def(&columns[0].column_def, "host", "STRING");
1417 assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1418 assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1419 assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1420
1421 let constraints = &c.constraints;
1422 assert_eq!(
1423 &constraints[0],
1424 &TableConstraint::TimeIndex {
1425 column: Ident::new("ts"),
1426 }
1427 );
1428 assert_eq!(
1429 &constraints[1],
1430 &TableConstraint::PrimaryKey {
1431 columns: vec![Ident::new("ts"), Ident::new("host")]
1432 }
1433 );
1434 }
1435 _ => unreachable!(),
1436 }
1437 }
1438
1439 #[test]
1440 fn test_parse_create_database() {
1441 let sql = "create database";
1442 let result =
1443 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1444 assert!(
1445 result
1446 .unwrap_err()
1447 .to_string()
1448 .contains("Unexpected token while parsing SQL statement")
1449 );
1450
1451 let sql = "create database prometheus";
1452 let stmts =
1453 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1454 .unwrap();
1455
1456 assert_eq!(1, stmts.len());
1457 match &stmts[0] {
1458 Statement::CreateDatabase(c) => {
1459 assert_eq!(c.name.to_string(), "prometheus");
1460 assert!(!c.if_not_exists);
1461 }
1462 _ => unreachable!(),
1463 }
1464
1465 let sql = "create database if not exists prometheus";
1466 let stmts =
1467 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1468 .unwrap();
1469
1470 assert_eq!(1, stmts.len());
1471 match &stmts[0] {
1472 Statement::CreateDatabase(c) => {
1473 assert_eq!(c.name.to_string(), "prometheus");
1474 assert!(c.if_not_exists);
1475 }
1476 _ => unreachable!(),
1477 }
1478
1479 let sql = "CREATE DATABASE `fOo`";
1480 let result =
1481 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1482 let stmts = result.unwrap();
1483 match &stmts.last().unwrap() {
1484 Statement::CreateDatabase(c) => {
1485 assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1486 assert!(!c.if_not_exists);
1487 }
1488 _ => unreachable!(),
1489 }
1490
1491 let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1492 let result =
1493 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1494 let stmts = result.unwrap();
1495 match &stmts[0] {
1496 Statement::CreateDatabase(c) => {
1497 assert_eq!(c.name.to_string(), "prometheus");
1498 assert!(!c.if_not_exists);
1499 assert_eq!(c.options.get("ttl").unwrap(), "1h");
1500 }
1501 _ => unreachable!(),
1502 }
1503 }
1504
1505 #[test]
1506 fn test_parse_create_flow_more_testcases() {
1507 use pretty_assertions::assert_eq;
1508 fn parse_create_flow(sql: &str) -> CreateFlow {
1509 let stmts = ParserContext::create_with_dialect(
1510 sql,
1511 &GreptimeDbDialect {},
1512 ParseOptions::default(),
1513 )
1514 .unwrap();
1515 assert_eq!(1, stmts.len());
1516 match &stmts[0] {
1517 Statement::CreateFlow(c) => c.clone(),
1518 _ => unreachable!(),
1519 }
1520 }
1521 struct CreateFlowWoutQuery {
1522 pub flow_name: ObjectName,
1524 pub sink_table_name: ObjectName,
1526 pub or_replace: bool,
1528 pub if_not_exists: bool,
1530 pub expire_after: Option<i64>,
1533 pub comment: Option<String>,
1535 pub flow_options: OptionMap,
1537 }
1538 let testcases = vec![
1539 (
1540 r"
1541CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1542SINK TO schema_1.table_1
1543EXPIRE AFTER INTERVAL '5 minutes'
1544COMMENT 'test comment'
1545AS
1546SELECT max(c1), min(c2) FROM schema_2.table_2;",
1547 CreateFlowWoutQuery {
1548 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1549 sink_table_name: ObjectName::from(vec![
1550 Ident::new("schema_1"),
1551 Ident::new("table_1"),
1552 ]),
1553 or_replace: true,
1554 if_not_exists: true,
1555 expire_after: Some(300),
1556 comment: Some("test comment".to_string()),
1557 flow_options: OptionMap::default(),
1558 },
1559 ),
1560 (
1561 r"
1562CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1563SINK TO schema_1.table_1
1564EXPIRE AFTER INTERVAL '300 s'
1565COMMENT 'test comment'
1566AS
1567SELECT max(c1), min(c2) FROM schema_2.table_2;",
1568 CreateFlowWoutQuery {
1569 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1570 sink_table_name: ObjectName::from(vec![
1571 Ident::new("schema_1"),
1572 Ident::new("table_1"),
1573 ]),
1574 or_replace: true,
1575 if_not_exists: true,
1576 expire_after: Some(300),
1577 comment: Some("test comment".to_string()),
1578 flow_options: OptionMap::default(),
1579 },
1580 ),
1581 (
1582 r"
1583CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1584SINK TO schema_1.table_1
1585EXPIRE AFTER '5 minutes'
1586COMMENT 'test comment'
1587AS
1588SELECT max(c1), min(c2) FROM schema_2.table_2;",
1589 CreateFlowWoutQuery {
1590 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1591 sink_table_name: ObjectName::from(vec![
1592 Ident::new("schema_1"),
1593 Ident::new("table_1"),
1594 ]),
1595 or_replace: true,
1596 if_not_exists: true,
1597 expire_after: Some(300),
1598 comment: Some("test comment".to_string()),
1599 flow_options: OptionMap::default(),
1600 },
1601 ),
1602 (
1603 r"
1604CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1605SINK TO schema_1.table_1
1606EXPIRE AFTER '300 s'
1607COMMENT 'test comment'
1608AS
1609SELECT max(c1), min(c2) FROM schema_2.table_2;",
1610 CreateFlowWoutQuery {
1611 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1612 sink_table_name: ObjectName::from(vec![
1613 Ident::new("schema_1"),
1614 Ident::new("table_1"),
1615 ]),
1616 or_replace: true,
1617 if_not_exists: true,
1618 expire_after: Some(300),
1619 comment: Some("test comment".to_string()),
1620 flow_options: OptionMap::default(),
1621 },
1622 ),
1623 (
1624 r"
1625CREATE FLOW `task_2`
1626SINK TO schema_1.table_1
1627EXPIRE AFTER '2 days 1h 2 min'
1628AS
1629SELECT max(c1), min(c2) FROM schema_2.table_2;",
1630 CreateFlowWoutQuery {
1631 flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1632 sink_table_name: ObjectName::from(vec![
1633 Ident::new("schema_1"),
1634 Ident::new("table_1"),
1635 ]),
1636 or_replace: false,
1637 if_not_exists: false,
1638 expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1639 comment: None,
1640 flow_options: OptionMap::default(),
1641 },
1642 ),
1643 (
1644 r"
1645create flow `task_3`
1646sink to schema_1.table_1
1647expire after '10 minutes'
1648as
1649select max(c1), min(c2) from schema_2.table_2;",
1650 CreateFlowWoutQuery {
1651 flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
1652 sink_table_name: ObjectName::from(vec![
1653 Ident::new("schema_1"),
1654 Ident::new("table_1"),
1655 ]),
1656 or_replace: false,
1657 if_not_exists: false,
1658 expire_after: Some(600), comment: None,
1660 flow_options: OptionMap::default(),
1661 },
1662 ),
1663 (
1664 r"
1665create or replace flow if not exists task_4
1666sink to schema_1.table_1
1667expire after interval '2 hours'
1668comment 'lowercase test'
1669as
1670select max(c1), min(c2) from schema_2.table_2;",
1671 CreateFlowWoutQuery {
1672 flow_name: ObjectName::from(vec![Ident::new("task_4")]),
1673 sink_table_name: ObjectName::from(vec![
1674 Ident::new("schema_1"),
1675 Ident::new("table_1"),
1676 ]),
1677 or_replace: true,
1678 if_not_exists: true,
1679 expire_after: Some(7200), comment: Some("lowercase test".to_string()),
1681 flow_options: OptionMap::default(),
1682 },
1683 ),
1684 (
1685 r"
1686CREATE FLOW task_5
1687SINK TO schema_1.table_1
1688WITH (defer_on_missing_source = 'true')
1689AS
1690SELECT max(c1), min(c2) FROM schema_2.table_2;",
1691 CreateFlowWoutQuery {
1692 flow_name: ObjectName::from(vec![Ident::new("task_5")]),
1693 sink_table_name: ObjectName::from(vec![
1694 Ident::new("schema_1"),
1695 Ident::new("table_1"),
1696 ]),
1697 or_replace: false,
1698 if_not_exists: false,
1699 expire_after: None,
1700 comment: None,
1701 flow_options: string_option_map([("defer_on_missing_source", "true")]),
1702 },
1703 ),
1704 ];
1705
1706 for (sql, expected) in testcases {
1707 let create_task = parse_create_flow(sql);
1708
1709 let expected = CreateFlow {
1710 flow_name: expected.flow_name,
1711 sink_table_name: expected.sink_table_name,
1712 or_replace: expected.or_replace,
1713 if_not_exists: expected.if_not_exists,
1714 expire_after: expected.expire_after,
1715 eval_interval: None,
1716 comment: expected.comment,
1717 flow_options: expected.flow_options,
1718 query: create_task.query.clone(),
1720 };
1721
1722 assert_eq!(create_task, expected, "input sql is:\n{sql}");
1723 let show_create = create_task.to_string();
1724 let recreated = parse_create_flow(&show_create);
1725 assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1726 }
1727 }
1728
1729 #[test]
1730 fn test_parse_create_flow() {
1731 use pretty_assertions::assert_eq;
1732 fn parse_create_flow(sql: &str) -> CreateFlow {
1733 let stmts = ParserContext::create_with_dialect(
1734 sql,
1735 &GreptimeDbDialect {},
1736 ParseOptions::default(),
1737 )
1738 .unwrap();
1739 assert_eq!(1, stmts.len());
1740 match &stmts[0] {
1741 Statement::CreateFlow(c) => c.clone(),
1742 _ => panic!("{:?}", stmts[0]),
1743 }
1744 }
1745 struct CreateFlowWoutQuery {
1746 pub flow_name: ObjectName,
1748 pub sink_table_name: ObjectName,
1750 pub or_replace: bool,
1752 pub if_not_exists: bool,
1754 pub expire_after: Option<i64>,
1757 pub eval_interval: Option<i64>,
1761 pub comment: Option<String>,
1763 pub flow_options: OptionMap,
1765 }
1766
1767 let testcases = vec![
1769 (
1770 r"
1771CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1772SINK TO schema_1.table_1
1773EXPIRE AFTER INTERVAL '5 minutes'
1774COMMENT 'test comment'
1775AS
1776SELECT max(c1), min(c2) FROM schema_2.table_2;",
1777 CreateFlowWoutQuery {
1778 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1779 sink_table_name: ObjectName(vec![
1780 ObjectNamePart::Identifier(Ident::new("schema_1")),
1781 ObjectNamePart::Identifier(Ident::new("table_1")),
1782 ]),
1783 or_replace: true,
1784 if_not_exists: true,
1785 expire_after: Some(300),
1786 eval_interval: None,
1787 comment: Some("test comment".to_string()),
1788 flow_options: OptionMap::default(),
1789 },
1790 ),
1791 (
1792 r"
1793CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1794SINK TO schema_1.table_1
1795EXPIRE AFTER INTERVAL '300 s'
1796COMMENT 'test comment'
1797AS
1798SELECT max(c1), min(c2) FROM schema_2.table_2;",
1799 CreateFlowWoutQuery {
1800 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1801 sink_table_name: ObjectName(vec![
1802 ObjectNamePart::Identifier(Ident::new("schema_1")),
1803 ObjectNamePart::Identifier(Ident::new("table_1")),
1804 ]),
1805 or_replace: true,
1806 if_not_exists: true,
1807 expire_after: Some(300),
1808 eval_interval: None,
1809 comment: Some("test comment".to_string()),
1810 flow_options: OptionMap::default(),
1811 },
1812 ),
1813 (
1814 r"
1815CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1816SINK TO schema_1.table_1
1817EXPIRE AFTER '5 minutes'
1818EVAL INTERVAL '10 seconds'
1819COMMENT 'test comment'
1820AS
1821SELECT max(c1), min(c2) FROM schema_2.table_2;",
1822 CreateFlowWoutQuery {
1823 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1824 sink_table_name: ObjectName(vec![
1825 ObjectNamePart::Identifier(Ident::new("schema_1")),
1826 ObjectNamePart::Identifier(Ident::new("table_1")),
1827 ]),
1828 or_replace: true,
1829 if_not_exists: true,
1830 expire_after: Some(300),
1831 eval_interval: Some(10),
1832 comment: Some("test comment".to_string()),
1833 flow_options: OptionMap::default(),
1834 },
1835 ),
1836 (
1837 r"
1838CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1839SINK TO schema_1.table_1
1840EXPIRE AFTER '5 minutes'
1841EVAL INTERVAL INTERVAL '10 seconds'
1842COMMENT 'test comment'
1843AS
1844SELECT max(c1), min(c2) FROM schema_2.table_2;",
1845 CreateFlowWoutQuery {
1846 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1847 sink_table_name: ObjectName(vec![
1848 ObjectNamePart::Identifier(Ident::new("schema_1")),
1849 ObjectNamePart::Identifier(Ident::new("table_1")),
1850 ]),
1851 or_replace: true,
1852 if_not_exists: true,
1853 expire_after: Some(300),
1854 eval_interval: Some(10),
1855 comment: Some("test comment".to_string()),
1856 flow_options: OptionMap::default(),
1857 },
1858 ),
1859 (
1860 r"
1861CREATE FLOW `task_2`
1862SINK TO schema_1.table_1
1863EXPIRE AFTER '2 days 1h 2 min'
1864AS
1865SELECT max(c1), min(c2) FROM schema_2.table_2;",
1866 CreateFlowWoutQuery {
1867 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1868 '`', "task_2",
1869 ))]),
1870 sink_table_name: ObjectName(vec![
1871 ObjectNamePart::Identifier(Ident::new("schema_1")),
1872 ObjectNamePart::Identifier(Ident::new("table_1")),
1873 ]),
1874 or_replace: false,
1875 if_not_exists: false,
1876 expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1877 eval_interval: None,
1878 comment: None,
1879 flow_options: OptionMap::default(),
1880 },
1881 ),
1882 (
1883 r"
1884CREATE FLOW task_3
1885SINK TO schema_1.table_1
1886EVAL INTERVAL '10 seconds'
1887WITH (defer_on_missing_source = 'true', foo = 'bar')
1888AS
1889SELECT max(c1), min(c2) FROM schema_2.table_2;",
1890 CreateFlowWoutQuery {
1891 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_3"))]),
1892 sink_table_name: ObjectName(vec![
1893 ObjectNamePart::Identifier(Ident::new("schema_1")),
1894 ObjectNamePart::Identifier(Ident::new("table_1")),
1895 ]),
1896 or_replace: false,
1897 if_not_exists: false,
1898 expire_after: None,
1899 eval_interval: Some(10),
1900 comment: None,
1901 flow_options: string_option_map([
1902 ("defer_on_missing_source", "true"),
1903 ("foo", "bar"),
1904 ]),
1905 },
1906 ),
1907 ];
1908
1909 for (sql, expected) in testcases {
1910 let create_task = parse_create_flow(sql);
1911
1912 let expected = CreateFlow {
1913 flow_name: expected.flow_name,
1914 sink_table_name: expected.sink_table_name,
1915 or_replace: expected.or_replace,
1916 if_not_exists: expected.if_not_exists,
1917 expire_after: expected.expire_after,
1918 eval_interval: expected.eval_interval,
1919 comment: expected.comment,
1920 flow_options: expected.flow_options,
1921 query: create_task.query.clone(),
1923 };
1924
1925 assert_eq!(create_task, expected, "input sql is:\n{sql}");
1926 let show_create = create_task.to_string();
1927 let recreated = parse_create_flow(&show_create);
1928 assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1929 }
1930 }
1931
1932 #[test]
1933 fn test_parse_create_flow_with_tql_cte_query() {
1934 let sql = r#"
1935CREATE FLOW calc_reqs_cte
1936SINK TO cnt_reqs_cte
1937EVAL INTERVAL '1m'
1938AS
1939WITH tql(the_timestamp, the_value) AS (
1940 TQL EVAL (now() - '1m'::interval, now(), '5s') metric
1941)
1942SELECT * FROM tql;
1943"#;
1944
1945 let stmts =
1946 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1947 .unwrap();
1948 assert_eq!(1, stmts.len());
1949 let Statement::CreateFlow(create_flow) = &stmts[0] else {
1950 panic!("unexpected stmt: {:?}", stmts[0]);
1951 };
1952
1953 let query = create_flow.query.to_string();
1954 assert!(query.to_uppercase().contains("WITH"));
1955 assert!(query.to_uppercase().contains("TQL EVAL"));
1956 }
1957
1958 #[test]
1959 fn test_parse_create_flow_with_sql_cte_is_unsupported() {
1960 let sql = r#"
1961CREATE FLOW f
1962SINK TO s
1963AS
1964WITH cte AS (SELECT 1) SELECT * FROM cte;
1965"#;
1966
1967 let err =
1968 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1969 .unwrap_err();
1970 let msg = err.to_string();
1971 assert!(msg.to_uppercase().contains("WITH"), "err: {msg}");
1972 }
1973
1974 #[test]
1975 fn test_parse_create_flow_with_tql_cte_requires_now_expr() {
1976 let sql = r#"
1977CREATE FLOW f
1978SINK TO s
1979EVAL INTERVAL '1m'
1980AS
1981WITH tql(ts, val) AS (
1982 TQL EVAL (0, 15, '5s') metric
1983)
1984SELECT * FROM tql;
1985"#;
1986
1987 let err =
1988 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1989 .unwrap_err();
1990
1991 let msg = format!("{err:?}");
1992 assert!(
1993 msg.contains("Expected expression containing `now()`"),
1994 "unexpected err: {msg}"
1995 );
1996 }
1997
1998 #[test]
1999 fn test_parse_create_flow_with_tql_cte_non_select_star_is_unsupported() {
2000 let sql = r#"
2001CREATE FLOW f
2002SINK TO s
2003AS
2004WITH tql(ts, val) AS (
2005 TQL EVAL (now() - '1m'::interval, now(), '5s') metric
2006)
2007SELECT ts FROM tql;
2008"#;
2009
2010 let err =
2011 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2012 .unwrap_err();
2013 assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2014 }
2015
2016 #[test]
2017 fn test_parse_create_flow_with_tql_cte_filter_is_unsupported() {
2018 let sql = r#"
2019CREATE FLOW f
2020SINK TO s
2021AS
2022WITH tql(ts, val) AS (
2023 TQL EVAL (now() - '1m'::interval, now(), '5s') metric
2024)
2025SELECT * FROM tql WHERE ts > 0;
2026"#;
2027
2028 let err =
2029 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2030 .unwrap_err();
2031 assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2032 }
2033
2034 #[test]
2035 fn test_parse_create_flow_with_mixed_sql_tql_cte_is_unsupported() {
2036 let sql = r#"
2037CREATE FLOW f
2038SINK TO s
2039AS
2040WITH s1 AS (SELECT 1),
2041 tql(ts, val) AS (TQL EVAL (now() - '1m'::interval, now(), '5s') metric)
2042SELECT * FROM tql;
2043"#;
2044
2045 let err =
2046 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2047 .unwrap_err();
2048 assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
2049 }
2050
2051 #[test]
2052 fn test_create_flow_no_month() {
2053 let sql = r"
2054CREATE FLOW `task_2`
2055SINK TO schema_1.table_1
2056EXPIRE AFTER '1 month 2 days 1h 2 min'
2057AS
2058SELECT max(c1), min(c2) FROM schema_2.table_2;";
2059 let stmts =
2060 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2061
2062 assert!(
2063 stmts.is_err()
2064 && stmts
2065 .unwrap_err()
2066 .to_string()
2067 .contains("Interval with months is not allowed")
2068 );
2069 }
2070
2071 #[test]
2072 fn test_validate_create() {
2073 let sql = r"
2074CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
2075PARTITION ON COLUMNS(c, a) (
2076 a < 10,
2077 a > 10 AND a < 20,
2078 a > 20 AND c < 100,
2079 a > 20 AND c >= 100
2080)
2081ENGINE=mito";
2082 let result =
2083 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2084 let _ = result.unwrap();
2085
2086 let sql = r"
2087CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2088PARTITION ON COLUMNS(x) ()
2089ENGINE=mito";
2090 let result =
2091 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2092 assert!(
2093 result
2094 .unwrap_err()
2095 .to_string()
2096 .contains("Partition column \"x\" not defined")
2097 );
2098 }
2099
2100 #[test]
2101 fn test_parse_create_table_with_partitions() {
2102 let sql = r"
2103CREATE TABLE monitor (
2104 host_id INT,
2105 idc STRING,
2106 ts TIMESTAMP,
2107 cpu DOUBLE DEFAULT 0,
2108 memory DOUBLE,
2109 TIME INDEX (ts),
2110 PRIMARY KEY (host),
2111)
2112PARTITION ON COLUMNS(idc, host_id) (
2113 idc <= 'hz' AND host_id < 1000,
2114 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2115 idc > 'sh' AND host_id < 3000,
2116 idc > 'sh' AND host_id >= 3000,
2117)
2118ENGINE=mito";
2119 let result =
2120 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2121 .unwrap();
2122 assert_eq!(result.len(), 1);
2123 match &result[0] {
2124 Statement::CreateTable(c) => {
2125 assert!(c.partitions.is_some());
2126
2127 let partitions = c.partitions.as_ref().unwrap();
2128 let column_list = partitions
2129 .column_list
2130 .iter()
2131 .map(|x| &x.value)
2132 .collect::<Vec<&String>>();
2133 assert_eq!(column_list, vec!["idc", "host_id"]);
2134
2135 let exprs = &partitions.exprs;
2136
2137 assert_eq!(
2138 exprs[0],
2139 Expr::BinaryOp {
2140 left: Box::new(Expr::BinaryOp {
2141 left: Box::new(Expr::Identifier("idc".into())),
2142 op: BinaryOperator::LtEq,
2143 right: Box::new(Expr::Value(
2144 Value::SingleQuotedString("hz".to_string()).into()
2145 ))
2146 }),
2147 op: BinaryOperator::And,
2148 right: Box::new(Expr::BinaryOp {
2149 left: Box::new(Expr::Identifier("host_id".into())),
2150 op: BinaryOperator::Lt,
2151 right: Box::new(Expr::Value(
2152 Value::Number("1000".to_string(), false).into()
2153 ))
2154 })
2155 }
2156 );
2157 assert_eq!(
2158 exprs[1],
2159 Expr::BinaryOp {
2160 left: Box::new(Expr::BinaryOp {
2161 left: Box::new(Expr::BinaryOp {
2162 left: Box::new(Expr::Identifier("idc".into())),
2163 op: BinaryOperator::Gt,
2164 right: Box::new(Expr::Value(
2165 Value::SingleQuotedString("hz".to_string()).into()
2166 ))
2167 }),
2168 op: BinaryOperator::And,
2169 right: Box::new(Expr::BinaryOp {
2170 left: Box::new(Expr::Identifier("idc".into())),
2171 op: BinaryOperator::LtEq,
2172 right: Box::new(Expr::Value(
2173 Value::SingleQuotedString("sh".to_string()).into()
2174 ))
2175 })
2176 }),
2177 op: BinaryOperator::And,
2178 right: Box::new(Expr::BinaryOp {
2179 left: Box::new(Expr::Identifier("host_id".into())),
2180 op: BinaryOperator::Lt,
2181 right: Box::new(Expr::Value(
2182 Value::Number("2000".to_string(), false).into()
2183 ))
2184 })
2185 }
2186 );
2187 assert_eq!(
2188 exprs[2],
2189 Expr::BinaryOp {
2190 left: Box::new(Expr::BinaryOp {
2191 left: Box::new(Expr::Identifier("idc".into())),
2192 op: BinaryOperator::Gt,
2193 right: Box::new(Expr::Value(
2194 Value::SingleQuotedString("sh".to_string()).into()
2195 ))
2196 }),
2197 op: BinaryOperator::And,
2198 right: Box::new(Expr::BinaryOp {
2199 left: Box::new(Expr::Identifier("host_id".into())),
2200 op: BinaryOperator::Lt,
2201 right: Box::new(Expr::Value(
2202 Value::Number("3000".to_string(), false).into()
2203 ))
2204 })
2205 }
2206 );
2207 assert_eq!(
2208 exprs[3],
2209 Expr::BinaryOp {
2210 left: Box::new(Expr::BinaryOp {
2211 left: Box::new(Expr::Identifier("idc".into())),
2212 op: BinaryOperator::Gt,
2213 right: Box::new(Expr::Value(
2214 Value::SingleQuotedString("sh".to_string()).into()
2215 ))
2216 }),
2217 op: BinaryOperator::And,
2218 right: Box::new(Expr::BinaryOp {
2219 left: Box::new(Expr::Identifier("host_id".into())),
2220 op: BinaryOperator::GtEq,
2221 right: Box::new(Expr::Value(
2222 Value::Number("3000".to_string(), false).into()
2223 ))
2224 })
2225 }
2226 );
2227 }
2228 _ => unreachable!(),
2229 }
2230 }
2231
2232 #[test]
2233 fn test_parse_create_table_with_quoted_partitions() {
2234 let sql = r"
2235CREATE TABLE monitor (
2236 `host_id` INT,
2237 idc STRING,
2238 ts TIMESTAMP,
2239 cpu DOUBLE DEFAULT 0,
2240 memory DOUBLE,
2241 TIME INDEX (ts),
2242 PRIMARY KEY (host),
2243)
2244PARTITION ON COLUMNS(IdC, host_id) (
2245 idc <= 'hz' AND host_id < 1000,
2246 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2247 idc > 'sh' AND host_id < 3000,
2248 idc > 'sh' AND host_id >= 3000,
2249)";
2250 let result =
2251 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2252 .unwrap();
2253 assert_eq!(result.len(), 1);
2254 }
2255
2256 #[test]
2257 fn test_parse_create_table_with_timestamp_index() {
2258 let sql1 = r"
2259CREATE TABLE monitor (
2260 host_id INT,
2261 idc STRING,
2262 ts TIMESTAMP TIME INDEX,
2263 cpu DOUBLE DEFAULT 0,
2264 memory DOUBLE,
2265 PRIMARY KEY (host),
2266)
2267ENGINE=mito";
2268 let result1 = ParserContext::create_with_dialect(
2269 sql1,
2270 &GreptimeDbDialect {},
2271 ParseOptions::default(),
2272 )
2273 .unwrap();
2274
2275 if let Statement::CreateTable(c) = &result1[0] {
2276 assert_eq!(c.constraints.len(), 2);
2277 let tc = c.constraints[0].clone();
2278 match tc {
2279 TableConstraint::TimeIndex { column } => {
2280 assert_eq!(&column.value, "ts");
2281 }
2282 _ => panic!("should be time index constraint"),
2283 };
2284 } else {
2285 panic!("should be create_table statement");
2286 }
2287
2288 let sql2 = r"
2291CREATE TABLE monitor (
2292 host_id INT,
2293 idc STRING,
2294 ts TIMESTAMP NOT NULL,
2295 cpu DOUBLE DEFAULT 0,
2296 memory DOUBLE,
2297 TIME INDEX (ts),
2298 PRIMARY KEY (host),
2299)
2300ENGINE=mito";
2301 let result2 = ParserContext::create_with_dialect(
2302 sql2,
2303 &GreptimeDbDialect {},
2304 ParseOptions::default(),
2305 )
2306 .unwrap();
2307
2308 assert_eq!(result1, result2);
2309
2310 let sql3 = r"
2312CREATE TABLE monitor (
2313 host_id INT,
2314 idc STRING,
2315 ts TIMESTAMP,
2316 cpu DOUBLE DEFAULT 0,
2317 memory DOUBLE,
2318 TIME INDEX (ts),
2319 PRIMARY KEY (host),
2320)
2321ENGINE=mito";
2322
2323 let result3 = ParserContext::create_with_dialect(
2324 sql3,
2325 &GreptimeDbDialect {},
2326 ParseOptions::default(),
2327 )
2328 .unwrap();
2329
2330 assert_ne!(result1, result3);
2331
2332 let sql1 = r"
2334CREATE TABLE monitor (
2335 host_id INT,
2336 idc STRING,
2337 b bigint TIME INDEX,
2338 cpu DOUBLE DEFAULT 0,
2339 memory DOUBLE,
2340 PRIMARY KEY (host),
2341)
2342ENGINE=mito";
2343 let result1 = ParserContext::create_with_dialect(
2344 sql1,
2345 &GreptimeDbDialect {},
2346 ParseOptions::default(),
2347 );
2348
2349 assert!(
2350 result1
2351 .unwrap_err()
2352 .to_string()
2353 .contains("time index column data type should be timestamp")
2354 );
2355 }
2356
2357 #[test]
2358 fn test_parse_create_table_with_timestamp_index_not_null() {
2359 let sql = r"
2360CREATE TABLE monitor (
2361 host_id INT,
2362 idc STRING,
2363 ts TIMESTAMP TIME INDEX,
2364 cpu DOUBLE DEFAULT 0,
2365 memory DOUBLE,
2366 TIME INDEX (ts),
2367 PRIMARY KEY (host),
2368)
2369ENGINE=mito";
2370 let result =
2371 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2372 .unwrap();
2373
2374 assert_eq!(result.len(), 1);
2375 if let Statement::CreateTable(c) = &result[0] {
2376 let ts = c.columns[2].clone();
2377 assert_eq!(ts.name().to_string(), "ts");
2378 assert_eq!(ts.options()[0].option, NotNull);
2379 } else {
2380 panic!("should be create table statement");
2381 }
2382
2383 let sql1 = r"
2384CREATE TABLE monitor (
2385 host_id INT,
2386 idc STRING,
2387 ts TIMESTAMP NOT NULL TIME INDEX,
2388 cpu DOUBLE DEFAULT 0,
2389 memory DOUBLE,
2390 TIME INDEX (ts),
2391 PRIMARY KEY (host),
2392)
2393ENGINE=mito";
2394
2395 let result1 = ParserContext::create_with_dialect(
2396 sql1,
2397 &GreptimeDbDialect {},
2398 ParseOptions::default(),
2399 )
2400 .unwrap();
2401 assert_eq!(result, result1);
2402
2403 let sql2 = r"
2404CREATE TABLE monitor (
2405 host_id INT,
2406 idc STRING,
2407 ts TIMESTAMP TIME INDEX NOT NULL,
2408 cpu DOUBLE DEFAULT 0,
2409 memory DOUBLE,
2410 TIME INDEX (ts),
2411 PRIMARY KEY (host),
2412)
2413ENGINE=mito";
2414
2415 let result2 = ParserContext::create_with_dialect(
2416 sql2,
2417 &GreptimeDbDialect {},
2418 ParseOptions::default(),
2419 )
2420 .unwrap();
2421 assert_eq!(result, result2);
2422
2423 let sql3 = r"
2424CREATE TABLE monitor (
2425 host_id INT,
2426 idc STRING,
2427 ts TIMESTAMP TIME INDEX NULL NOT,
2428 cpu DOUBLE DEFAULT 0,
2429 memory DOUBLE,
2430 TIME INDEX (ts),
2431 PRIMARY KEY (host),
2432)
2433ENGINE=mito";
2434
2435 let result3 = ParserContext::create_with_dialect(
2436 sql3,
2437 &GreptimeDbDialect {},
2438 ParseOptions::default(),
2439 );
2440 assert!(result3.is_err());
2441
2442 let sql4 = r"
2443CREATE TABLE monitor (
2444 host_id INT,
2445 idc STRING,
2446 ts TIMESTAMP TIME INDEX NOT NULL NULL,
2447 cpu DOUBLE DEFAULT 0,
2448 memory DOUBLE,
2449 TIME INDEX (ts),
2450 PRIMARY KEY (host),
2451)
2452ENGINE=mito";
2453
2454 let result4 = ParserContext::create_with_dialect(
2455 sql4,
2456 &GreptimeDbDialect {},
2457 ParseOptions::default(),
2458 );
2459 assert!(result4.is_err());
2460
2461 let sql = r"
2462CREATE TABLE monitor (
2463 host_id INT,
2464 idc STRING,
2465 ts TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2466 cpu DOUBLE DEFAULT 0,
2467 memory DOUBLE,
2468 TIME INDEX (ts),
2469 PRIMARY KEY (host),
2470)
2471ENGINE=mito";
2472
2473 let result =
2474 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2475 .unwrap();
2476
2477 if let Statement::CreateTable(c) = &result[0] {
2478 let tc = c.constraints[0].clone();
2479 match tc {
2480 TableConstraint::TimeIndex { column } => {
2481 assert_eq!(&column.value, "ts");
2482 }
2483 _ => panic!("should be time index constraint"),
2484 }
2485 let ts = c.columns[2].clone();
2486 assert_eq!(ts.name().to_string(), "ts");
2487 assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2488 assert_eq!(ts.options()[1].option, NotNull);
2489 } else {
2490 unreachable!("should be create table statement");
2491 }
2492 }
2493
2494 #[test]
2495 fn test_parse_partitions_with_error_syntax() {
2496 let sql = r"
2497CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2498PARTITION COLUMNS(c, a) (
2499 a < 10,
2500 a > 10 AND a < 20,
2501 a > 20 AND c < 100,
2502 a > 20 AND c >= 100
2503)
2504ENGINE=mito";
2505 let result =
2506 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2507 assert!(
2508 result
2509 .unwrap_err()
2510 .output_msg()
2511 .contains("sql parser error: Expected: ON, found: COLUMNS")
2512 );
2513 }
2514
2515 #[test]
2516 fn test_parse_partitions_without_rule() {
2517 let sql = r"
2518CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2519PARTITION ON COLUMNS(c, a) ()
2520ENGINE=mito";
2521 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2522 .unwrap();
2523 }
2524
2525 #[test]
2526 fn test_parse_partitions_unreferenced_column() {
2527 let sql = r"
2528CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2529PARTITION ON COLUMNS(c, a) (
2530 b = 'foo'
2531)
2532ENGINE=mito";
2533 let result =
2534 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2535 assert_eq!(
2536 result.unwrap_err().output_msg(),
2537 "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2538 );
2539 }
2540
2541 #[test]
2542 fn test_parse_partitions_not_binary_expr() {
2543 let sql = r"
2544CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2545PARTITION ON COLUMNS(c, a) (
2546 b
2547)
2548ENGINE=mito";
2549 let result =
2550 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2551 assert_eq!(
2552 result.unwrap_err().output_msg(),
2553 r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2554 );
2555 }
2556
2557 fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2558 assert_eq!(column.name.to_string(), name);
2559 assert_eq!(column.data_type.to_string(), data_type);
2560 }
2561
2562 #[test]
2563 pub fn test_parse_create_table() {
2564 let sql = r"create table demo(
2565 host string,
2566 ts timestamp,
2567 cpu float32 default 0,
2568 memory float64,
2569 TIME INDEX (ts),
2570 PRIMARY KEY(ts, host),
2571 ) engine=mito
2572 with(ttl='10s');
2573 ";
2574 let result =
2575 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2576 .unwrap();
2577 assert_eq!(1, result.len());
2578 match &result[0] {
2579 Statement::CreateTable(c) => {
2580 assert!(!c.if_not_exists);
2581 assert_eq!("demo", c.name.to_string());
2582 assert_eq!("mito", c.engine);
2583 assert_eq!(4, c.columns.len());
2584 let columns = &c.columns;
2585 assert_column_def(&columns[0].column_def, "host", "STRING");
2586 assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2587 assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2588 assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2589
2590 let constraints = &c.constraints;
2591 assert_eq!(
2592 &constraints[0],
2593 &TableConstraint::TimeIndex {
2594 column: Ident::new("ts"),
2595 }
2596 );
2597 assert_eq!(
2598 &constraints[1],
2599 &TableConstraint::PrimaryKey {
2600 columns: vec![Ident::new("ts"), Ident::new("host")]
2601 }
2602 );
2603 assert_eq!(1, c.options.len());
2605 assert_eq!(
2606 [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2607 c.options.to_str_map()
2608 );
2609 }
2610 _ => unreachable!(),
2611 }
2612 }
2613
2614 #[test]
2615 fn test_invalid_index_keys() {
2616 let sql = r"create table demo(
2617 host string,
2618 ts int64,
2619 cpu float64 default 0,
2620 memory float64,
2621 TIME INDEX (ts, host),
2622 PRIMARY KEY(ts, host)) engine=mito;
2623 ";
2624 let result =
2625 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2626 assert!(result.is_err());
2627 assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2628 }
2629
2630 #[test]
2631 fn test_duplicated_time_index() {
2632 let sql = r"create table demo(
2633 host string,
2634 ts timestamp time index,
2635 t timestamp time index,
2636 cpu float64 default 0,
2637 memory float64,
2638 TIME INDEX (ts, host),
2639 PRIMARY KEY(ts, host)) engine=mito;
2640 ";
2641 let result =
2642 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2643 assert!(result.is_err());
2644 assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2645
2646 let sql = r"create table demo(
2647 host string,
2648 ts timestamp time index,
2649 cpu float64 default 0,
2650 t timestamp,
2651 memory float64,
2652 TIME INDEX (t),
2653 PRIMARY KEY(ts, host)) engine=mito;
2654 ";
2655 let result =
2656 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2657 assert!(result.is_err());
2658 assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2659 }
2660
2661 #[test]
2662 fn test_invalid_column_name() {
2663 let sql = "create table foo(user string, i timestamp time index)";
2664 let result =
2665 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2666 let err = result.unwrap_err().output_msg();
2667 assert!(err.contains("Cannot use keyword 'user' as column name"));
2668
2669 let sql = r#"
2671 create table foo("user" string, i timestamp time index)
2672 "#;
2673 let result =
2674 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2675 let _ = result.unwrap();
2676 }
2677
2678 #[test]
2679 fn test_incorrect_default_value_issue_3479() {
2680 let sql = r#"CREATE TABLE `ExcePTuRi`(
2681non TIMESTAMP(6) TIME INDEX,
2682`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2683)"#;
2684 let result =
2685 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2686 .unwrap();
2687 assert_eq!(1, result.len());
2688 match &result[0] {
2689 Statement::CreateTable(c) => {
2690 assert_eq!(
2691 "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2692 c.columns[1].to_string()
2693 );
2694 }
2695 _ => unreachable!(),
2696 }
2697 }
2698
2699 #[test]
2700 fn test_parse_create_view() {
2701 let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
2702
2703 let result =
2704 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2705 .unwrap();
2706 match &result[0] {
2707 Statement::CreateView(c) => {
2708 assert_eq!(c.to_string(), sql);
2709 assert!(!c.or_replace);
2710 assert!(!c.if_not_exists);
2711 assert_eq!("test", c.name.to_string());
2712 }
2713 _ => unreachable!(),
2714 }
2715
2716 let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
2717
2718 let result =
2719 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2720 .unwrap();
2721 match &result[0] {
2722 Statement::CreateView(c) => {
2723 assert_eq!(c.to_string(), sql);
2724 assert!(c.or_replace);
2725 assert!(c.if_not_exists);
2726 assert_eq!("test", c.name.to_string());
2727 }
2728 _ => unreachable!(),
2729 }
2730 }
2731
2732 #[test]
2733 fn test_parse_create_view_invalid_query() {
2734 let sql = "CREATE VIEW test AS DELETE from demo";
2735 let result =
2736 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2737 assert!(result.is_ok_and(|x| x.len() == 1));
2738 }
2739
2740 #[test]
2741 fn test_parse_create_table_fulltext_options() {
2742 let sql1 = r"
2743CREATE TABLE log (
2744 ts TIMESTAMP TIME INDEX,
2745 msg TEXT FULLTEXT INDEX,
2746)";
2747 let result1 = ParserContext::create_with_dialect(
2748 sql1,
2749 &GreptimeDbDialect {},
2750 ParseOptions::default(),
2751 )
2752 .unwrap();
2753
2754 if let Statement::CreateTable(c) = &result1[0] {
2755 c.columns.iter().for_each(|col| {
2756 if col.name().value == "msg" {
2757 assert!(
2758 col.extensions
2759 .fulltext_index_options
2760 .as_ref()
2761 .unwrap()
2762 .is_empty()
2763 );
2764 }
2765 });
2766 } else {
2767 panic!("should be create_table statement");
2768 }
2769
2770 let sql2 = r"
2771CREATE TABLE log (
2772 ts TIMESTAMP TIME INDEX,
2773 msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2774)";
2775 let result2 = ParserContext::create_with_dialect(
2776 sql2,
2777 &GreptimeDbDialect {},
2778 ParseOptions::default(),
2779 )
2780 .unwrap();
2781
2782 if let Statement::CreateTable(c) = &result2[0] {
2783 c.columns.iter().for_each(|col| {
2784 if col.name().value == "msg" {
2785 let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2786 assert_eq!(options.len(), 2);
2787 assert_eq!(options.get("analyzer").unwrap(), "English");
2788 assert_eq!(options.get("case_sensitive").unwrap(), "false");
2789 }
2790 });
2791 } else {
2792 panic!("should be create_table statement");
2793 }
2794
2795 let sql3 = r"
2796CREATE TABLE log (
2797 ts TIMESTAMP TIME INDEX,
2798 msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2799 msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2800)";
2801 let result3 = ParserContext::create_with_dialect(
2802 sql3,
2803 &GreptimeDbDialect {},
2804 ParseOptions::default(),
2805 )
2806 .unwrap();
2807
2808 if let Statement::CreateTable(c) = &result3[0] {
2809 c.columns.iter().for_each(|col| {
2810 if col.name().value == "msg1" {
2811 let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2812 assert_eq!(options.len(), 2);
2813 assert_eq!(options.get("analyzer").unwrap(), "English");
2814 assert_eq!(options.get("case_sensitive").unwrap(), "false");
2815 } else if col.name().value == "msg2" {
2816 let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2817 assert_eq!(options.len(), 2);
2818 assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2819 assert_eq!(options.get("case_sensitive").unwrap(), "true");
2820 }
2821 });
2822 } else {
2823 panic!("should be create_table statement");
2824 }
2825 }
2826
2827 #[test]
2828 fn test_parse_create_table_fulltext_options_invalid_type() {
2829 let sql = r"
2830CREATE TABLE log (
2831 ts TIMESTAMP TIME INDEX,
2832 msg INT FULLTEXT INDEX,
2833)";
2834 let result =
2835 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2836 assert!(result.is_err());
2837 assert!(
2838 result
2839 .unwrap_err()
2840 .to_string()
2841 .contains("FULLTEXT index only supports string type")
2842 );
2843 }
2844
2845 #[test]
2846 fn test_parse_create_table_fulltext_options_duplicate() {
2847 let sql = r"
2848CREATE TABLE log (
2849 ts TIMESTAMP TIME INDEX,
2850 msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
2851)";
2852 let result =
2853 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2854 assert!(result.is_err());
2855 assert!(
2856 result
2857 .unwrap_err()
2858 .to_string()
2859 .contains("duplicated FULLTEXT INDEX option")
2860 );
2861 }
2862
2863 #[test]
2864 fn test_parse_create_table_fulltext_options_invalid_option() {
2865 let sql = r"
2866CREATE TABLE log (
2867 ts TIMESTAMP TIME INDEX,
2868 msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
2869)";
2870 let result =
2871 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2872 assert!(result.is_err());
2873 assert!(
2874 result
2875 .unwrap_err()
2876 .to_string()
2877 .contains("invalid FULLTEXT INDEX option")
2878 );
2879 }
2880
2881 #[test]
2882 fn test_parse_create_table_skip_options() {
2883 let sql = r"
2884CREATE TABLE log (
2885 ts TIMESTAMP TIME INDEX,
2886 msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
2887)";
2888 let result =
2889 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2890 .unwrap();
2891
2892 if let Statement::CreateTable(c) = &result[0] {
2893 c.columns.iter().for_each(|col| {
2894 if col.name().value == "msg" {
2895 assert!(
2896 !col.extensions
2897 .skipping_index_options
2898 .as_ref()
2899 .unwrap()
2900 .is_empty()
2901 );
2902 }
2903 });
2904 } else {
2905 panic!("should be create_table statement");
2906 }
2907
2908 let sql = r"
2909 CREATE TABLE log (
2910 ts TIMESTAMP TIME INDEX,
2911 msg INT SKIPPING INDEX,
2912 )";
2913 let result =
2914 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2915 .unwrap();
2916
2917 if let Statement::CreateTable(c) = &result[0] {
2918 c.columns.iter().for_each(|col| {
2919 if col.name().value == "msg" {
2920 assert!(
2921 col.extensions
2922 .skipping_index_options
2923 .as_ref()
2924 .unwrap()
2925 .is_empty()
2926 );
2927 }
2928 });
2929 } else {
2930 panic!("should be create_table statement");
2931 }
2932 }
2933
2934 #[test]
2935 fn test_parse_create_view_with_columns() {
2936 let sql = "CREATE VIEW test () AS SELECT * FROM NUMBERS";
2937 let result =
2938 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2939 .unwrap();
2940
2941 match &result[0] {
2942 Statement::CreateView(c) => {
2943 assert_eq!(c.to_string(), "CREATE VIEW test AS SELECT * FROM NUMBERS");
2944 assert!(!c.or_replace);
2945 assert!(!c.if_not_exists);
2946 assert_eq!("test", c.name.to_string());
2947 }
2948 _ => unreachable!(),
2949 }
2950 assert_eq!(
2951 "CREATE VIEW test AS SELECT * FROM NUMBERS",
2952 result[0].to_string()
2953 );
2954
2955 let sql = "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS";
2956 let result =
2957 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2958 .unwrap();
2959
2960 match &result[0] {
2961 Statement::CreateView(c) => {
2962 assert_eq!(c.to_string(), sql);
2963 assert!(!c.or_replace);
2964 assert!(!c.if_not_exists);
2965 assert_eq!("test", c.name.to_string());
2966 }
2967 _ => unreachable!(),
2968 }
2969 assert_eq!(sql, result[0].to_string());
2970
2971 let sql = "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS";
2972 let result =
2973 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2974 .unwrap();
2975
2976 match &result[0] {
2977 Statement::CreateView(c) => {
2978 assert_eq!(c.to_string(), sql);
2979 assert!(!c.or_replace);
2980 assert!(!c.if_not_exists);
2981 assert_eq!("test", c.name.to_string());
2982 }
2983 _ => unreachable!(),
2984 }
2985 assert_eq!(sql, result[0].to_string());
2986
2987 let sql = "CREATE VIEW test (n1 AS select * from demo";
2989 let result =
2990 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2991 assert!(result.is_err());
2992
2993 let sql = "CREATE VIEW test (n1, AS select * from demo";
2994 let result =
2995 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2996 assert!(result.is_err());
2997
2998 let sql = "CREATE VIEW test n1,n2) AS select * from demo";
2999 let result =
3000 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3001 assert!(result.is_err());
3002
3003 let sql = "CREATE VIEW test (1) AS select * from demo";
3004 let result =
3005 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3006 assert!(result.is_err());
3007
3008 let sql = "CREATE VIEW test (n1, select) AS select * from demo";
3010 let result =
3011 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3012 assert!(result.is_err());
3013 }
3014
3015 #[test]
3016 fn test_parse_column_extensions_vector() {
3017 let sql = "";
3019 let dialect = GenericDialect {};
3020 let mut tokenizer = Tokenizer::new(&dialect, sql);
3021 let tokens = tokenizer.tokenize().unwrap();
3022 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3023 let name = Ident::new("vec_col");
3024 let data_type =
3025 DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
3026 let mut extensions = ColumnExtensions::default();
3027
3028 let result =
3029 ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
3030 assert!(result.is_ok());
3031 assert!(extensions.vector_options.is_some());
3032 let vector_options = extensions.vector_options.unwrap();
3033 assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some("128"));
3034 }
3035
3036 #[test]
3037 fn test_parse_column_extensions_vector_invalid() {
3038 let sql = "";
3040 let dialect = GenericDialect {};
3041 let mut tokenizer = Tokenizer::new(&dialect, sql);
3042 let tokens = tokenizer.tokenize().unwrap();
3043 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3044 let name = Ident::new("vec_col");
3045 let data_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
3046 let mut extensions = ColumnExtensions::default();
3047
3048 let result =
3049 ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
3050 assert!(result.is_err());
3051 }
3052
3053 #[test]
3054 fn test_parse_column_extensions_indices() {
3055 {
3057 let sql = "SKIPPING INDEX";
3058 let dialect = GenericDialect {};
3059 let mut tokenizer = Tokenizer::new(&dialect, sql);
3060 let tokens = tokenizer.tokenize().unwrap();
3061 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3062 let name = Ident::new("col");
3063 let data_type = DataType::String(None);
3064 let mut extensions = ColumnExtensions::default();
3065 let result = ParserContext::parse_column_extensions(
3066 &mut parser,
3067 &name,
3068 &data_type,
3069 &mut extensions,
3070 );
3071 assert!(result.is_ok());
3072 assert!(extensions.skipping_index_options.is_some());
3073 }
3074
3075 {
3077 let sql = "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')";
3078 let dialect = GenericDialect {};
3079 let mut tokenizer = Tokenizer::new(&dialect, sql);
3080 let tokens = tokenizer.tokenize().unwrap();
3081 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3082 let name = Ident::new("text_col");
3083 let data_type = DataType::String(None);
3084 let mut extensions = ColumnExtensions::default();
3085 let result = ParserContext::parse_column_extensions(
3086 &mut parser,
3087 &name,
3088 &data_type,
3089 &mut extensions,
3090 );
3091 assert!(result.unwrap());
3092 assert!(extensions.fulltext_index_options.is_some());
3093 let fulltext_options = extensions.fulltext_index_options.unwrap();
3094 assert_eq!(fulltext_options.get("analyzer"), Some("English"));
3095 assert_eq!(fulltext_options.get("case_sensitive"), Some("true"));
3096 }
3097
3098 {
3100 let sql = "FULLTEXT INDEX WITH (analyzer = 'English')";
3101 let dialect = GenericDialect {};
3102 let mut tokenizer = Tokenizer::new(&dialect, sql);
3103 let tokens = tokenizer.tokenize().unwrap();
3104 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3105 let name = Ident::new("num_col");
3106 let data_type = DataType::Int(None); let mut extensions = ColumnExtensions::default();
3108 let result = ParserContext::parse_column_extensions(
3109 &mut parser,
3110 &name,
3111 &data_type,
3112 &mut extensions,
3113 );
3114 assert!(result.is_err());
3115 assert!(
3116 result
3117 .unwrap_err()
3118 .to_string()
3119 .contains("FULLTEXT index only supports string type")
3120 );
3121 }
3122
3123 {
3125 let sql = "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')";
3126 let dialect = GenericDialect {};
3127 let mut tokenizer = Tokenizer::new(&dialect, sql);
3128 let tokens = tokenizer.tokenize().unwrap();
3129 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3130 let name = Ident::new("text_col");
3131 let data_type = DataType::String(None);
3132 let mut extensions = ColumnExtensions::default();
3133 let result = ParserContext::parse_column_extensions(
3134 &mut parser,
3135 &name,
3136 &data_type,
3137 &mut extensions,
3138 );
3139 assert!(result.unwrap());
3140 }
3141
3142 {
3144 let sql = "INVERTED INDEX";
3145 let dialect = GenericDialect {};
3146 let mut tokenizer = Tokenizer::new(&dialect, sql);
3147 let tokens = tokenizer.tokenize().unwrap();
3148 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3149 let name = Ident::new("col");
3150 let data_type = DataType::String(None);
3151 let mut extensions = ColumnExtensions::default();
3152 let result = ParserContext::parse_column_extensions(
3153 &mut parser,
3154 &name,
3155 &data_type,
3156 &mut extensions,
3157 );
3158 assert!(result.is_ok());
3159 assert!(extensions.inverted_index_options.is_some());
3160 }
3161
3162 {
3164 let sql = "INVERTED INDEX WITH (analyzer = 'English')";
3165 let dialect = GenericDialect {};
3166 let mut tokenizer = Tokenizer::new(&dialect, sql);
3167 let tokens = tokenizer.tokenize().unwrap();
3168 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3169 let name = Ident::new("col");
3170 let data_type = DataType::String(None);
3171 let mut extensions = ColumnExtensions::default();
3172 let result = ParserContext::parse_column_extensions(
3173 &mut parser,
3174 &name,
3175 &data_type,
3176 &mut extensions,
3177 );
3178 assert!(result.is_err());
3179 assert!(
3180 result
3181 .unwrap_err()
3182 .to_string()
3183 .contains("INVERTED index doesn't support options")
3184 );
3185 }
3186
3187 {
3189 let sql = "SKIPPING INDEX FULLTEXT INDEX";
3190 let dialect = GenericDialect {};
3191 let mut tokenizer = Tokenizer::new(&dialect, sql);
3192 let tokens = tokenizer.tokenize().unwrap();
3193 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3194 let name = Ident::new("col");
3195 let data_type = DataType::String(None);
3196 let mut extensions = ColumnExtensions::default();
3197 let result = ParserContext::parse_column_extensions(
3198 &mut parser,
3199 &name,
3200 &data_type,
3201 &mut extensions,
3202 );
3203 assert!(result.unwrap());
3204 assert!(extensions.skipping_index_options.is_some());
3205 assert!(extensions.fulltext_index_options.is_some());
3206 }
3207 }
3208
3209 #[test]
3210 fn test_parse_interval_cast() {
3211 let s = "select '10s'::INTERVAL";
3212 let stmts =
3213 ParserContext::create_with_dialect(s, &GreptimeDbDialect {}, ParseOptions::default())
3214 .unwrap();
3215 assert_eq!("SELECT '10 seconds'::INTERVAL", &stmts[0].to_string());
3216 }
3217
3218 #[test]
3219 fn test_parse_create_table_vector_index_options() {
3220 let sql = r"
3222CREATE TABLE vectors (
3223 ts TIMESTAMP TIME INDEX,
3224 vec VECTOR(128) VECTOR INDEX,
3225)";
3226 let result =
3227 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
3228 .unwrap();
3229
3230 if let Statement::CreateTable(c) = &result[0] {
3231 c.columns.iter().for_each(|col| {
3232 if col.name().value == "vec" {
3233 assert!(
3234 col.extensions
3235 .vector_index_options
3236 .as_ref()
3237 .unwrap()
3238 .is_empty()
3239 );
3240 }
3241 });
3242 } else {
3243 panic!("should be create_table statement");
3244 }
3245
3246 let sql = r"
3248CREATE TABLE vectors (
3249 ts TIMESTAMP TIME INDEX,
3250 vec VECTOR(128) VECTOR INDEX WITH (metric='cosine', connectivity='32', expansion_add='256', expansion_search='128')
3251)";
3252 let result =
3253 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
3254 .unwrap();
3255
3256 if let Statement::CreateTable(c) = &result[0] {
3257 c.columns.iter().for_each(|col| {
3258 if col.name().value == "vec" {
3259 let options = col.extensions.vector_index_options.as_ref().unwrap();
3260 assert_eq!(options.len(), 4);
3261 assert_eq!(options.get("metric").unwrap(), "cosine");
3262 assert_eq!(options.get("connectivity").unwrap(), "32");
3263 assert_eq!(options.get("expansion_add").unwrap(), "256");
3264 assert_eq!(options.get("expansion_search").unwrap(), "128");
3265 }
3266 });
3267 } else {
3268 panic!("should be create_table statement");
3269 }
3270 }
3271
3272 #[test]
3273 fn test_parse_create_table_vector_index_invalid_type() {
3274 let sql = r"
3276CREATE TABLE vectors (
3277 ts TIMESTAMP TIME INDEX,
3278 col INT VECTOR INDEX,
3279)";
3280 let result =
3281 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3282 assert!(result.is_err());
3283 assert!(
3284 result
3285 .unwrap_err()
3286 .to_string()
3287 .contains("VECTOR INDEX only supports Vector type columns")
3288 );
3289 }
3290
3291 #[test]
3292 fn test_parse_create_table_vector_index_duplicate() {
3293 let sql = r"
3295CREATE TABLE vectors (
3296 ts TIMESTAMP TIME INDEX,
3297 vec VECTOR(128) VECTOR INDEX VECTOR INDEX,
3298)";
3299 let result =
3300 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3301 assert!(result.is_err());
3302 assert!(
3303 result
3304 .unwrap_err()
3305 .to_string()
3306 .contains("duplicated VECTOR INDEX option")
3307 );
3308 }
3309
3310 #[test]
3311 fn test_parse_create_table_vector_index_invalid_option() {
3312 let sql = r"
3314CREATE TABLE vectors (
3315 ts TIMESTAMP TIME INDEX,
3316 vec VECTOR(128) VECTOR INDEX WITH (metric='l2sq', invalid_option='foo')
3317)";
3318 let result =
3319 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3320 assert!(result.is_err());
3321 assert!(
3322 result
3323 .unwrap_err()
3324 .to_string()
3325 .contains("invalid VECTOR INDEX option")
3326 );
3327 }
3328
3329 #[test]
3330 fn test_parse_column_extensions_vector_index() {
3331 {
3333 let sql = "VECTOR INDEX WITH (metric = 'l2sq')";
3334 let dialect = GenericDialect {};
3335 let mut tokenizer = Tokenizer::new(&dialect, sql);
3336 let tokens = tokenizer.tokenize().unwrap();
3337 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3338 let name = Ident::new("vec_col");
3339 let data_type =
3340 DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
3341 let mut extensions = ColumnExtensions {
3343 vector_options: Some(OptionMap::from([(
3344 VECTOR_OPT_DIM.to_string(),
3345 "128".to_string(),
3346 )])),
3347 ..Default::default()
3348 };
3349
3350 let result = ParserContext::parse_column_extensions(
3351 &mut parser,
3352 &name,
3353 &data_type,
3354 &mut extensions,
3355 );
3356 assert!(result.is_ok());
3357 assert!(extensions.vector_index_options.is_some());
3358 let vi_options = extensions.vector_index_options.unwrap();
3359 assert_eq!(vi_options.get("metric"), Some("l2sq"));
3360 }
3361
3362 {
3364 let sql = "VECTOR INDEX";
3365 let dialect = GenericDialect {};
3366 let mut tokenizer = Tokenizer::new(&dialect, sql);
3367 let tokens = tokenizer.tokenize().unwrap();
3368 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3369 let name = Ident::new("num_col");
3370 let data_type = DataType::Int(None); let mut extensions = ColumnExtensions::default();
3372 let result = ParserContext::parse_column_extensions(
3373 &mut parser,
3374 &name,
3375 &data_type,
3376 &mut extensions,
3377 );
3378 assert!(result.is_err());
3379 assert!(
3380 result
3381 .unwrap_err()
3382 .to_string()
3383 .contains("VECTOR INDEX only supports Vector type columns")
3384 );
3385 }
3386 }
3387}