1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::HashMap;
19
20use arrow_buffer::IntervalMonthDayNano;
21use common_catalog::consts::default_engine;
22use datafusion_common::ScalarValue;
23use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
24use datatypes::data_type::ConcreteDataType;
25use itertools::Itertools;
26use snafu::{OptionExt, ResultExt, ensure};
27use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Expr};
28use sqlparser::dialect::keywords::Keyword;
29use sqlparser::keywords::ALL_KEYWORDS;
30use sqlparser::parser::IsOptional::Mandatory;
31use sqlparser::parser::{Parser, ParserError};
32use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
33use table::requests::{validate_database_option, validate_table_option};
34
35use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
36use crate::error::{
37 self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu,
38 InvalidSqlSnafu, InvalidTableOptionSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
39 SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
40};
41use crate::parser::{FLOW, ParserContext};
42use crate::parsers::tql_parser;
43use crate::parsers::utils::{
44 self, validate_column_fulltext_create_option, validate_column_skipping_index_create_option,
45};
46use crate::statements::create::{
47 Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
48 CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
49};
50use crate::statements::statement::Statement;
51use crate::statements::transform::type_alias::get_data_type_by_alias_name;
52use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
53use crate::util::{location_to_index, parse_option_string};
54
/// Keyword text of the optional `ENGINE = <name>` clause (see `parse_table_engine`).
pub const ENGINE: &str = "ENGINE";
/// `MAXVALUE` keyword text. Not referenced in the visible part of this file.
pub const MAXVALUE: &str = "MAXVALUE";
/// Word expected right after the flow name in `CREATE FLOW <name> SINK TO <table>`.
pub const SINK: &str = "SINK";
/// First word of the optional `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const EXPIRE: &str = "EXPIRE";
/// Second word of the optional `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const AFTER: &str = "AFTER";
/// Word that introduces an `INVERTED INDEX` column extension.
pub const INVERTED: &str = "INVERTED";
/// Word that introduces a `SKIPPING INDEX` column extension.
pub const SKIPPING: &str = "SKIPPING";

/// Textual form of an interval expression, obtained by calling `to_string()` on the
/// parsed expression (see `parse_interval_month_day_nano`).
pub type RawIntervalExpr = String;
64
65impl<'a> ParserContext<'a> {
67 pub(crate) fn parse_create(&mut self) -> Result<Statement> {
68 match self.parser.peek_token().token {
69 Token::Word(w) => match w.keyword {
70 Keyword::TABLE => self.parse_create_table(),
71
72 Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
73
74 Keyword::EXTERNAL => self.parse_create_external_table(),
75
76 Keyword::OR => {
77 let _ = self.parser.next_token();
78 self.parser
79 .expect_keyword(Keyword::REPLACE)
80 .context(SyntaxSnafu)?;
81 match self.parser.next_token().token {
82 Token::Word(w) => match w.keyword {
83 Keyword::VIEW => self.parse_create_view(true),
84 Keyword::NoKeyword => {
85 let uppercase = w.value.to_uppercase();
86 match uppercase.as_str() {
87 FLOW => self.parse_create_flow(true),
88 _ => self.unsupported(w.to_string()),
89 }
90 }
91 _ => self.unsupported(w.to_string()),
92 },
93 _ => self.unsupported(w.to_string()),
94 }
95 }
96
97 Keyword::VIEW => {
98 let _ = self.parser.next_token();
99 self.parse_create_view(false)
100 }
101
102 #[cfg(feature = "enterprise")]
103 Keyword::TRIGGER => {
104 let _ = self.parser.next_token();
105 self.parse_create_trigger()
106 }
107
108 Keyword::NoKeyword => {
109 let _ = self.parser.next_token();
110 let uppercase = w.value.to_uppercase();
111 match uppercase.as_str() {
112 FLOW => self.parse_create_flow(false),
113 _ => self.unsupported(w.to_string()),
114 }
115 }
116 _ => self.unsupported(w.to_string()),
117 },
118 unexpected => self.unsupported(unexpected.to_string()),
119 }
120 }
121
122 fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
124 let if_not_exists = self.parse_if_not_exist()?;
125 let view_name = self.intern_parse_table_name()?;
126
127 let columns = self.parse_view_columns()?;
128
129 self.parser
130 .expect_keyword(Keyword::AS)
131 .context(SyntaxSnafu)?;
132
133 let query = self.parse_query()?;
134
135 Ok(Statement::CreateView(CreateView {
136 name: view_name,
137 columns,
138 or_replace,
139 query: Box::new(query),
140 if_not_exists,
141 }))
142 }
143
144 fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
145 let mut columns = vec![];
146 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
147 return Ok(columns);
148 }
149
150 loop {
151 let name = self.parse_column_name().context(SyntaxSnafu)?;
152
153 columns.push(name);
154
155 let comma = self.parser.consume_token(&Token::Comma);
156 if self.parser.consume_token(&Token::RParen) {
157 break;
159 } else if !comma {
160 return self.expected("',' or ')' after column name", self.parser.peek_token());
161 }
162 }
163
164 Ok(columns)
165 }
166
167 fn parse_create_external_table(&mut self) -> Result<Statement> {
168 let _ = self.parser.next_token();
169 self.parser
170 .expect_keyword(Keyword::TABLE)
171 .context(SyntaxSnafu)?;
172 let if_not_exists = self.parse_if_not_exist()?;
173 let table_name = self.intern_parse_table_name()?;
174 let (columns, constraints) = self.parse_columns()?;
175 if !columns.is_empty() {
176 validate_time_index(&columns, &constraints)?;
177 }
178
179 let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
180 let options = self.parse_create_table_options()?;
181 Ok(Statement::CreateExternalTable(CreateExternalTable {
182 name: table_name,
183 columns,
184 constraints,
185 options,
186 if_not_exists,
187 engine,
188 }))
189 }
190
191 fn parse_create_database(&mut self) -> Result<Statement> {
192 let _ = self.parser.next_token();
193 let if_not_exists = self.parse_if_not_exist()?;
194 let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
195 expected: "a database name",
196 actual: self.peek_token_as_string(),
197 })?;
198 let database_name = Self::canonicalize_object_name(database_name);
199
200 let options = self
201 .parser
202 .parse_options(Keyword::WITH)
203 .context(SyntaxSnafu)?
204 .into_iter()
205 .map(parse_option_string)
206 .collect::<Result<HashMap<String, String>>>()?;
207
208 for key in options.keys() {
209 ensure!(
210 validate_database_option(key),
211 InvalidDatabaseOptionSnafu {
212 key: key.to_string()
213 }
214 );
215 }
216 if let Some(append_mode) = options.get("append_mode")
217 && append_mode == "true"
218 && options.contains_key("merge_mode")
219 {
220 return InvalidDatabaseOptionSnafu {
221 key: "merge_mode".to_string(),
222 }
223 .fail();
224 }
225
226 Ok(Statement::CreateDatabase(CreateDatabase {
227 name: database_name,
228 if_not_exists,
229 options: options.into(),
230 }))
231 }
232
233 fn parse_create_table(&mut self) -> Result<Statement> {
234 let _ = self.parser.next_token();
235
236 let if_not_exists = self.parse_if_not_exist()?;
237
238 let table_name = self.intern_parse_table_name()?;
239
240 if self.parser.parse_keyword(Keyword::LIKE) {
241 let source_name = self.intern_parse_table_name()?;
242
243 return Ok(Statement::CreateTableLike(CreateTableLike {
244 table_name,
245 source_name,
246 }));
247 }
248
249 let (columns, constraints) = self.parse_columns()?;
250 validate_time_index(&columns, &constraints)?;
251
252 let partitions = self.parse_partitions()?;
253 if let Some(partitions) = &partitions {
254 validate_partitions(&columns, partitions)?;
255 }
256
257 let engine = self.parse_table_engine(default_engine())?;
258 let options = self.parse_create_table_options()?;
259 let create_table = CreateTable {
260 if_not_exists,
261 name: table_name,
262 columns,
263 engine,
264 constraints,
265 options,
266 table_id: 0, partitions,
268 };
269
270 Ok(Statement::CreateTable(create_table))
271 }
272
273 fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
275 let if_not_exists = self.parse_if_not_exist()?;
276
277 let flow_name = self.intern_parse_table_name()?;
278
279 if let Token::Word(word) = self.parser.peek_token().token
281 && word.value.eq_ignore_ascii_case(SINK)
282 {
283 self.parser.next_token();
284 } else {
285 Err(ParserError::ParserError(
286 "Expect `SINK` keyword".to_string(),
287 ))
288 .context(SyntaxSnafu)?
289 }
290 self.parser
291 .expect_keyword(Keyword::TO)
292 .context(SyntaxSnafu)?;
293
294 let output_table_name = self.intern_parse_table_name()?;
295
296 let expire_after = if self
297 .parser
298 .consume_tokens(&[Token::make_keyword(EXPIRE), Token::make_keyword(AFTER)])
299 {
300 Some(self.parse_interval_no_month("EXPIRE AFTER")?)
301 } else {
302 None
303 };
304
305 let eval_interval = if self
306 .parser
307 .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
308 {
309 Some(self.parse_interval_no_month("EVAL INTERVAL")?)
310 } else {
311 None
312 };
313
314 let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
315 match self.parser.next_token() {
316 TokenWithSpan {
317 token: Token::SingleQuotedString(value, ..),
318 ..
319 } => Some(value),
320 unexpected => {
321 return self
322 .parser
323 .expected("string", unexpected)
324 .context(SyntaxSnafu);
325 }
326 }
327 } else {
328 None
329 };
330
331 self.parser
332 .expect_keyword(Keyword::AS)
333 .context(SyntaxSnafu)?;
334
335 let query = Box::new(self.parse_sql_or_tql(true)?);
336
337 Ok(Statement::CreateFlow(CreateFlow {
338 flow_name,
339 sink_table_name: output_table_name,
340 or_replace,
341 if_not_exists,
342 expire_after,
343 eval_interval,
344 comment,
345 query,
346 }))
347 }
348
349 fn parse_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
350 let start_loc = self.parser.peek_token().span.start;
351 let start_index = location_to_index(self.sql, &start_loc);
352
353 let query = match self.parser.peek_token().token {
355 Token::Word(w) => match w.keyword {
356 Keyword::SELECT => self.parse_query(),
357 Keyword::NoKeyword
358 if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
359 {
360 self.parse_tql(require_now_expr)
361 }
362
363 _ => self.unsupported(self.peek_token_as_string()),
364 },
365 _ => self.unsupported(self.peek_token_as_string()),
366 }?;
367
368 let end_token = self.parser.peek_token();
369
370 let raw_query = if end_token == Token::EOF {
371 &self.sql[start_index..]
372 } else {
373 let end_loc = end_token.span.end;
374 let end_index = location_to_index(self.sql, &end_loc);
375 &self.sql[start_index..end_index.min(self.sql.len())]
376 };
377 let raw_query = raw_query.trim_end_matches(";");
378 let query = SqlOrTql::try_from_statement(query, raw_query)?;
379 Ok(query)
380 }
381
382 fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
384 let interval = self.parse_interval_month_day_nano()?.0;
385 if interval.months != 0 {
386 return InvalidIntervalSnafu {
387 reason: format!("Interval with months is not allowed in {context}"),
388 }
389 .fail();
390 }
391 Ok(
392 interval.nanoseconds / 1_000_000_000
393 + interval.days as i64 * 60 * 60 * 24
394 + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, )
398 }
399
400 fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
402 let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
403 let raw_interval_expr = interval_expr.to_string();
404 let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
405 .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
406 .ok()
407 .with_context(|| InvalidIntervalSnafu {
408 reason: format!("cannot cast {} to interval type", interval_expr),
409 })?;
410 if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
411 Ok((interval, raw_interval_expr))
412 } else {
413 unreachable!()
414 }
415 }
416
417 fn parse_if_not_exist(&mut self) -> Result<bool> {
418 match self.parser.peek_token().token {
419 Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
420 _ => {}
421 }
422
423 if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
424 return self
425 .parser
426 .expect_keyword(Keyword::EXISTS)
427 .map(|_| true)
428 .context(UnexpectedSnafu {
429 expected: "EXISTS",
430 actual: self.peek_token_as_string(),
431 });
432 }
433
434 if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
435 return UnsupportedSnafu { keyword: "EXISTS" }.fail();
436 }
437
438 Ok(false)
439 }
440
441 fn parse_create_table_options(&mut self) -> Result<OptionMap> {
442 let options = self
443 .parser
444 .parse_options(Keyword::WITH)
445 .context(SyntaxSnafu)?
446 .into_iter()
447 .map(parse_option_string)
448 .collect::<Result<HashMap<String, String>>>()?;
449 for key in options.keys() {
450 ensure!(validate_table_option(key), InvalidTableOptionSnafu { key });
451 }
452 Ok(options.into())
453 }
454
455 fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
457 if !self.parser.parse_keyword(Keyword::PARTITION) {
458 return Ok(None);
459 }
460 self.parser
461 .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
462 .context(error::UnexpectedSnafu {
463 expected: "ON, COLUMNS",
464 actual: self.peek_token_as_string(),
465 })?;
466
467 let raw_column_list = self
468 .parser
469 .parse_parenthesized_column_list(Mandatory, false)
470 .context(error::SyntaxSnafu)?;
471 let column_list = raw_column_list
472 .into_iter()
473 .map(Self::canonicalize_identifier)
474 .collect();
475
476 let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
477
478 Ok(Some(Partitions { column_list, exprs }))
479 }
480
481 fn parse_partition_entry(&mut self) -> Result<Expr> {
482 self.parser.parse_expr().context(error::SyntaxSnafu)
483 }
484
485 fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
487 where
488 F: FnMut(&mut ParserContext<'a>) -> Result<T>,
489 {
490 self.parser
491 .expect_token(&Token::LParen)
492 .context(error::UnexpectedSnafu {
493 expected: "(",
494 actual: self.peek_token_as_string(),
495 })?;
496
497 let mut values = vec![];
498 while self.parser.peek_token() != Token::RParen {
499 values.push(f(self)?);
500 if !self.parser.consume_token(&Token::Comma) {
501 break;
502 }
503 }
504
505 self.parser
506 .expect_token(&Token::RParen)
507 .context(error::UnexpectedSnafu {
508 expected: ")",
509 actual: self.peek_token_as_string(),
510 })?;
511
512 Ok(values)
513 }
514
515 fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
517 let mut columns = vec![];
518 let mut constraints = vec![];
519 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
520 return Ok((columns, constraints));
521 }
522
523 loop {
524 if let Some(constraint) = self.parse_optional_table_constraint()? {
525 constraints.push(constraint);
526 } else if let Token::Word(_) = self.parser.peek_token().token {
527 self.parse_column(&mut columns, &mut constraints)?;
528 } else {
529 return self.expected(
530 "column name or constraint definition",
531 self.parser.peek_token(),
532 );
533 }
534 let comma = self.parser.consume_token(&Token::Comma);
535 if self.parser.consume_token(&Token::RParen) {
536 break;
538 } else if !comma {
539 return self.expected(
540 "',' or ')' after column definition",
541 self.parser.peek_token(),
542 );
543 }
544 }
545
546 Ok((columns, constraints))
547 }
548
549 fn parse_column(
550 &mut self,
551 columns: &mut Vec<Column>,
552 constraints: &mut Vec<TableConstraint>,
553 ) -> Result<()> {
554 let mut column = self.parse_column_def()?;
555
556 let mut time_index_opt_idx = None;
557 for (index, opt) in column.options().iter().enumerate() {
558 if let ColumnOption::DialectSpecific(tokens) = &opt.option
559 && matches!(
560 &tokens[..],
561 [
562 Token::Word(Word {
563 keyword: Keyword::TIME,
564 ..
565 }),
566 Token::Word(Word {
567 keyword: Keyword::INDEX,
568 ..
569 })
570 ]
571 )
572 {
573 ensure!(
574 time_index_opt_idx.is_none(),
575 InvalidColumnOptionSnafu {
576 name: column.name().to_string(),
577 msg: "duplicated time index",
578 }
579 );
580 time_index_opt_idx = Some(index);
581
582 let constraint = TableConstraint::TimeIndex {
583 column: Ident::new(column.name().value.clone()),
584 };
585 constraints.push(constraint);
586 }
587 }
588
589 if let Some(index) = time_index_opt_idx {
590 ensure!(
591 !column.options().contains(&ColumnOptionDef {
592 option: ColumnOption::Null,
593 name: None,
594 }),
595 InvalidColumnOptionSnafu {
596 name: column.name().to_string(),
597 msg: "time index column can't be null",
598 }
599 );
600
601 let data_type = get_unalias_type(column.data_type());
603 ensure!(
604 matches!(data_type, DataType::Timestamp(_, _)),
605 InvalidColumnOptionSnafu {
606 name: column.name().to_string(),
607 msg: "time index column data type should be timestamp",
608 }
609 );
610
611 let not_null_opt = ColumnOptionDef {
612 option: ColumnOption::NotNull,
613 name: None,
614 };
615
616 if !column.options().contains(¬_null_opt) {
617 column.mut_options().push(not_null_opt);
618 }
619
620 let _ = column.mut_options().remove(index);
621 }
622
623 columns.push(column);
624
625 Ok(())
626 }
627
628 fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
630 let name = self.parser.parse_identifier()?;
631 if name.quote_style.is_none() &&
632 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
634 {
635 return Err(ParserError::ParserError(format!(
636 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
637 &name.value
638 )));
639 }
640
641 Ok(name)
642 }
643
644 pub fn parse_column_def(&mut self) -> Result<Column> {
645 let name = self.parse_column_name().context(SyntaxSnafu)?;
646 let parser = &mut self.parser;
647
648 ensure!(
649 !(name.quote_style.is_none() &&
650 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
652 InvalidSqlSnafu {
653 msg: format!(
654 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
655 &name.value
656 ),
657 }
658 );
659
660 let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
661 let mut options = vec![];
662 let mut extensions = ColumnExtensions::default();
663 loop {
664 if parser.parse_keyword(Keyword::CONSTRAINT) {
665 let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
666 if let Some(option) = Self::parse_optional_column_option(parser)? {
667 options.push(ColumnOptionDef { name, option });
668 } else {
669 return parser
670 .expected(
671 "constraint details after CONSTRAINT <name>",
672 parser.peek_token(),
673 )
674 .context(SyntaxSnafu);
675 }
676 } else if let Some(option) = Self::parse_optional_column_option(parser)? {
677 options.push(ColumnOptionDef { name: None, option });
678 } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
679 break;
680 };
681 }
682
683 Ok(Column {
684 column_def: ColumnDef {
685 name: Self::canonicalize_identifier(name),
686 data_type,
687 options,
688 },
689 extensions,
690 })
691 }
692
693 fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
694 if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
695 Ok(Some(ColumnOption::CharacterSet(
696 parser.parse_object_name(false).context(SyntaxSnafu)?,
697 )))
698 } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
699 Ok(Some(ColumnOption::NotNull))
700 } else if parser.parse_keywords(&[Keyword::COMMENT]) {
701 match parser.next_token() {
702 TokenWithSpan {
703 token: Token::SingleQuotedString(value, ..),
704 ..
705 } => Ok(Some(ColumnOption::Comment(value))),
706 unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
707 }
708 } else if parser.parse_keyword(Keyword::NULL) {
709 Ok(Some(ColumnOption::Null))
710 } else if parser.parse_keyword(Keyword::DEFAULT) {
711 Ok(Some(ColumnOption::Default(
712 parser.parse_expr().context(SyntaxSnafu)?,
713 )))
714 } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
715 Ok(Some(ColumnOption::Unique {
716 is_primary: true,
717 characteristics: None,
718 }))
719 } else if parser.parse_keyword(Keyword::UNIQUE) {
720 Ok(Some(ColumnOption::Unique {
721 is_primary: false,
722 characteristics: None,
723 }))
724 } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
725 Ok(Some(ColumnOption::DialectSpecific(vec![
727 Token::Word(Word {
728 value: "TIME".to_string(),
729 quote_style: None,
730 keyword: Keyword::TIME,
731 }),
732 Token::Word(Word {
733 value: "INDEX".to_string(),
734 quote_style: None,
735 keyword: Keyword::INDEX,
736 }),
737 ])))
738 } else {
739 Ok(None)
740 }
741 }
742
743 fn parse_column_extensions(
749 parser: &mut Parser<'_>,
750 column_name: &Ident,
751 column_type: &DataType,
752 column_extensions: &mut ColumnExtensions,
753 ) -> Result<bool> {
754 if let DataType::Custom(name, tokens) = column_type
755 && name.0.len() == 1
756 && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
757 {
758 ensure!(
759 tokens.len() == 1,
760 InvalidColumnOptionSnafu {
761 name: column_name.to_string(),
762 msg: "VECTOR type should have dimension",
763 }
764 );
765
766 let dimension =
767 tokens[0]
768 .parse::<u32>()
769 .ok()
770 .with_context(|| InvalidColumnOptionSnafu {
771 name: column_name.to_string(),
772 msg: "dimension should be a positive integer",
773 })?;
774
775 let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
776 column_extensions.vector_options = Some(options);
777 }
778
779 let mut is_index_declared = false;
781
782 if let Token::Word(word) = parser.peek_token().token
784 && word.value.eq_ignore_ascii_case(SKIPPING)
785 {
786 parser.next_token();
787 ensure!(
789 parser.parse_keyword(Keyword::INDEX),
790 InvalidColumnOptionSnafu {
791 name: column_name.to_string(),
792 msg: "expect INDEX after SKIPPING keyword",
793 }
794 );
795 ensure!(
796 column_extensions.skipping_index_options.is_none(),
797 InvalidColumnOptionSnafu {
798 name: column_name.to_string(),
799 msg: "duplicated SKIPPING index option",
800 }
801 );
802
803 let options = parser
804 .parse_options(Keyword::WITH)
805 .context(error::SyntaxSnafu)?
806 .into_iter()
807 .map(parse_option_string)
808 .collect::<Result<HashMap<String, String>>>()?;
809
810 for key in options.keys() {
811 ensure!(
812 validate_column_skipping_index_create_option(key),
813 InvalidColumnOptionSnafu {
814 name: column_name.to_string(),
815 msg: format!("invalid SKIPPING INDEX option: {key}"),
816 }
817 );
818 }
819
820 column_extensions.skipping_index_options = Some(options.into());
821 is_index_declared |= true;
822 }
823
824 if parser.parse_keyword(Keyword::FULLTEXT) {
826 ensure!(
828 parser.parse_keyword(Keyword::INDEX),
829 InvalidColumnOptionSnafu {
830 name: column_name.to_string(),
831 msg: "expect INDEX after FULLTEXT keyword",
832 }
833 );
834
835 ensure!(
836 column_extensions.fulltext_index_options.is_none(),
837 InvalidColumnOptionSnafu {
838 name: column_name.to_string(),
839 msg: "duplicated FULLTEXT INDEX option",
840 }
841 );
842
843 let column_type = get_unalias_type(column_type);
844 let data_type = sql_data_type_to_concrete_data_type(&column_type)?;
845 ensure!(
846 data_type == ConcreteDataType::string_datatype(),
847 InvalidColumnOptionSnafu {
848 name: column_name.to_string(),
849 msg: "FULLTEXT index only supports string type",
850 }
851 );
852
853 let options = parser
854 .parse_options(Keyword::WITH)
855 .context(error::SyntaxSnafu)?
856 .into_iter()
857 .map(parse_option_string)
858 .collect::<Result<HashMap<String, String>>>()?;
859
860 for key in options.keys() {
861 ensure!(
862 validate_column_fulltext_create_option(key),
863 InvalidColumnOptionSnafu {
864 name: column_name.to_string(),
865 msg: format!("invalid FULLTEXT INDEX option: {key}"),
866 }
867 );
868 }
869
870 column_extensions.fulltext_index_options = Some(options.into());
871 is_index_declared |= true;
872 }
873
874 if let Token::Word(word) = parser.peek_token().token
876 && word.value.eq_ignore_ascii_case(INVERTED)
877 {
878 parser.next_token();
879 ensure!(
881 parser.parse_keyword(Keyword::INDEX),
882 InvalidColumnOptionSnafu {
883 name: column_name.to_string(),
884 msg: "expect INDEX after INVERTED keyword",
885 }
886 );
887
888 ensure!(
889 column_extensions.inverted_index_options.is_none(),
890 InvalidColumnOptionSnafu {
891 name: column_name.to_string(),
892 msg: "duplicated INVERTED index option",
893 }
894 );
895
896 let with_token = parser.peek_token();
899 ensure!(
900 with_token.token
901 != Token::Word(Word {
902 value: "WITH".to_string(),
903 keyword: Keyword::WITH,
904 quote_style: None,
905 }),
906 InvalidColumnOptionSnafu {
907 name: column_name.to_string(),
908 msg: "INVERTED index doesn't support options",
909 }
910 );
911
912 column_extensions.inverted_index_options = Some(OptionMap::default());
913 is_index_declared |= true;
914 }
915
916 Ok(is_index_declared)
917 }
918
919 fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
920 match self.parser.next_token() {
921 TokenWithSpan {
922 token: Token::Word(w),
923 ..
924 } if w.keyword == Keyword::PRIMARY => {
925 self.parser
926 .expect_keyword(Keyword::KEY)
927 .context(error::UnexpectedSnafu {
928 expected: "KEY",
929 actual: self.peek_token_as_string(),
930 })?;
931 let raw_columns = self
932 .parser
933 .parse_parenthesized_column_list(Mandatory, false)
934 .context(error::SyntaxSnafu)?;
935 let columns = raw_columns
936 .into_iter()
937 .map(Self::canonicalize_identifier)
938 .collect();
939 Ok(Some(TableConstraint::PrimaryKey { columns }))
940 }
941 TokenWithSpan {
942 token: Token::Word(w),
943 ..
944 } if w.keyword == Keyword::TIME => {
945 self.parser
946 .expect_keyword(Keyword::INDEX)
947 .context(error::UnexpectedSnafu {
948 expected: "INDEX",
949 actual: self.peek_token_as_string(),
950 })?;
951
952 let raw_columns = self
953 .parser
954 .parse_parenthesized_column_list(Mandatory, false)
955 .context(error::SyntaxSnafu)?;
956 let mut columns = raw_columns
957 .into_iter()
958 .map(Self::canonicalize_identifier)
959 .collect::<Vec<_>>();
960
961 ensure!(
962 columns.len() == 1,
963 InvalidTimeIndexSnafu {
964 msg: "it should contain only one column in time index",
965 }
966 );
967
968 Ok(Some(TableConstraint::TimeIndex {
969 column: columns.pop().unwrap(),
970 }))
971 }
972 _ => {
973 self.parser.prev_token();
974 Ok(None)
975 }
976 }
977 }
978
979 fn parse_table_engine(&mut self, default: &str) -> Result<String> {
981 if !self.consume_token(ENGINE) {
982 return Ok(default.to_string());
983 }
984
985 self.parser
986 .expect_token(&Token::Eq)
987 .context(error::UnexpectedSnafu {
988 expected: "=",
989 actual: self.peek_token_as_string(),
990 })?;
991
992 let token = self.parser.next_token();
993 if let Token::Word(w) = token.token {
994 Ok(w.value)
995 } else {
996 self.expected("'Engine' is missing", token)
997 }
998 }
999}
1000
1001fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1002 let time_index_constraints: Vec<_> = constraints
1003 .iter()
1004 .filter_map(|c| match c {
1005 TableConstraint::TimeIndex { column } => Some(column),
1006 _ => None,
1007 })
1008 .unique()
1009 .collect();
1010
1011 ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1012 ensure!(
1013 time_index_constraints.len() == 1,
1014 InvalidTimeIndexSnafu {
1015 msg: format!(
1016 "expected only one time index constraint but actual {}",
1017 time_index_constraints.len()
1018 ),
1019 }
1020 );
1021
1022 let time_index_column_ident = &time_index_constraints[0];
1025 let time_index_column = columns
1026 .iter()
1027 .find(|c| c.name().value == *time_index_column_ident.value)
1028 .with_context(|| InvalidTimeIndexSnafu {
1029 msg: format!(
1030 "time index column {} not found in columns",
1031 time_index_column_ident
1032 ),
1033 })?;
1034
1035 let time_index_data_type = get_unalias_type(time_index_column.data_type());
1036 ensure!(
1037 matches!(time_index_data_type, DataType::Timestamp(_, _)),
1038 InvalidColumnOptionSnafu {
1039 name: time_index_column.name().to_string(),
1040 msg: "time index column data type should be timestamp",
1041 }
1042 );
1043
1044 Ok(())
1045}
1046
1047fn get_unalias_type(data_type: &DataType) -> DataType {
1048 match data_type {
1049 DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1050 if let Some(real_type) =
1051 get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1052 {
1053 real_type
1054 } else {
1055 data_type.clone()
1056 }
1057 }
1058 _ => data_type.clone(),
1059 }
1060}
1061
1062fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1063 let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1064
1065 ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1066
1067 Ok(())
1068}
1069
1070fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1072 for expr in exprs {
1073 if let Expr::BinaryOp { left, op: _, right } = expr {
1075 ensure_one_expr(left, columns)?;
1076 ensure_one_expr(right, columns)?;
1077 } else {
1078 return error::InvalidSqlSnafu {
1079 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1080 }
1081 .fail();
1082 }
1083 }
1084 Ok(())
1085}
1086
1087fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1091 match expr {
1092 Expr::BinaryOp { left, op: _, right } => {
1093 ensure_one_expr(left, columns)?;
1094 ensure_one_expr(right, columns)?;
1095 Ok(())
1096 }
1097 Expr::Identifier(ident) => {
1098 let column_name = &ident.value;
1099 ensure!(
1100 columns.iter().any(|c| &c.name().value == column_name),
1101 error::InvalidSqlSnafu {
1102 msg: format!(
1103 "Column {:?} in rule expr is not referenced in PARTITION ON",
1104 column_name
1105 ),
1106 }
1107 );
1108 Ok(())
1109 }
1110 Expr::Value(_) => Ok(()),
1111 Expr::UnaryOp { expr, .. } => {
1112 ensure_one_expr(expr, columns)?;
1113 Ok(())
1114 }
1115 _ => error::InvalidSqlSnafu {
1116 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1117 }
1118 .fail(),
1119 }
1120}
1121
1122fn ensure_partition_columns_defined<'a>(
1124 columns: &'a [Column],
1125 partitions: &'a Partitions,
1126) -> Result<Vec<&'a Column>> {
1127 partitions
1128 .column_list
1129 .iter()
1130 .map(|x| {
1131 let x = ParserContext::canonicalize_identifier(x.clone());
1132 columns
1135 .iter()
1136 .find(|c| *c.name().value == x.value)
1137 .context(error::InvalidSqlSnafu {
1138 msg: format!("Partition column {:?} not defined", x.value),
1139 })
1140 })
1141 .collect::<Result<Vec<&Column>>>()
1142}
1143
1144#[cfg(test)]
1145mod tests {
1146 use std::assert_matches::assert_matches;
1147 use std::collections::HashMap;
1148
1149 use common_catalog::consts::FILE_ENGINE;
1150 use common_error::ext::ErrorExt;
1151 use sqlparser::ast::ColumnOption::NotNull;
1152 use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1153 use sqlparser::dialect::GenericDialect;
1154 use sqlparser::tokenizer::Tokenizer;
1155
1156 use super::*;
1157 use crate::dialect::GreptimeDbDialect;
1158 use crate::parser::ParseOptions;
1159
1160 #[test]
1161 fn test_parse_create_table_like() {
1162 let sql = "CREATE TABLE t1 LIKE t2";
1163 let stmts =
1164 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1165 .unwrap();
1166
1167 assert_eq!(1, stmts.len());
1168 match &stmts[0] {
1169 Statement::CreateTableLike(c) => {
1170 assert_eq!(c.table_name.to_string(), "t1");
1171 assert_eq!(c.source_name.to_string(), "t2");
1172 }
1173 _ => unreachable!(),
1174 }
1175 }
1176
1177 #[test]
1178 fn test_validate_external_table_options() {
1179 let sql = "CREATE EXTERNAL TABLE city (
1180 host string,
1181 ts timestamp,
1182 cpu float64 default 0,
1183 memory float64,
1184 TIME INDEX (ts),
1185 PRIMARY KEY(ts, host)
1186 ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1187
1188 let result =
1189 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1190 assert!(matches!(
1191 result,
1192 Err(error::Error::InvalidTableOption { .. })
1193 ));
1194 }
1195
1196 #[test]
1197 fn test_parse_create_external_table() {
1198 struct Test<'a> {
1199 sql: &'a str,
1200 expected_table_name: &'a str,
1201 expected_options: HashMap<String, String>,
1202 expected_engine: &'a str,
1203 expected_if_not_exist: bool,
1204 }
1205
1206 let tests = [
1207 Test {
1208 sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1209 expected_table_name: "city",
1210 expected_options: HashMap::from([
1211 ("location".to_string(), "/var/data/city.csv".to_string()),
1212 ("format".to_string(), "csv".to_string()),
1213 ]),
1214 expected_engine: FILE_ENGINE,
1215 expected_if_not_exist: false,
1216 },
1217 Test {
1218 sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1219 expected_table_name: "city",
1220 expected_options: HashMap::from([
1221 ("location".to_string(), "/var/data/city.csv".to_string()),
1222 ("format".to_string(), "csv".to_string()),
1223 ]),
1224 expected_engine: "foo",
1225 expected_if_not_exist: true,
1226 },
1227 Test {
1228 sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1229 expected_table_name: "city",
1230 expected_options: HashMap::from([
1231 ("location".to_string(), "/var/data/city.csv".to_string()),
1232 ("format".to_string(), "csv".to_string()),
1233 ("compaction.type".to_string(), "bar".to_string()),
1234 ]),
1235 expected_engine: "foo",
1236 expected_if_not_exist: true,
1237 },
1238 ];
1239
1240 for test in tests {
1241 let stmts = ParserContext::create_with_dialect(
1242 test.sql,
1243 &GreptimeDbDialect {},
1244 ParseOptions::default(),
1245 )
1246 .unwrap();
1247 assert_eq!(1, stmts.len());
1248 match &stmts[0] {
1249 Statement::CreateExternalTable(c) => {
1250 assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1251 assert_eq!(c.options, test.expected_options.into());
1252 assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1253 assert_eq!(c.engine, test.expected_engine);
1254 }
1255 _ => unreachable!(),
1256 }
1257 }
1258 }
1259
1260 #[test]
1261 fn test_parse_create_external_table_with_schema() {
1262 let sql = "CREATE EXTERNAL TABLE city (
1263 host string,
1264 ts timestamp,
1265 cpu float32 default 0,
1266 memory float64,
1267 TIME INDEX (ts),
1268 PRIMARY KEY(ts, host),
1269 ) with(location='/var/data/city.csv',format='csv');";
1270
1271 let options = HashMap::from([
1272 ("location".to_string(), "/var/data/city.csv".to_string()),
1273 ("format".to_string(), "csv".to_string()),
1274 ]);
1275
1276 let stmts =
1277 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1278 .unwrap();
1279 assert_eq!(1, stmts.len());
1280 match &stmts[0] {
1281 Statement::CreateExternalTable(c) => {
1282 assert_eq!(c.name.to_string(), "city");
1283 assert_eq!(c.options, options.into());
1284
1285 let columns = &c.columns;
1286 assert_column_def(&columns[0].column_def, "host", "STRING");
1287 assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1288 assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1289 assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1290
1291 let constraints = &c.constraints;
1292 assert_eq!(
1293 &constraints[0],
1294 &TableConstraint::TimeIndex {
1295 column: Ident::new("ts"),
1296 }
1297 );
1298 assert_eq!(
1299 &constraints[1],
1300 &TableConstraint::PrimaryKey {
1301 columns: vec![Ident::new("ts"), Ident::new("host")]
1302 }
1303 );
1304 }
1305 _ => unreachable!(),
1306 }
1307 }
1308
1309 #[test]
1310 fn test_parse_create_database() {
1311 let sql = "create database";
1312 let result =
1313 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1314 assert!(
1315 result
1316 .unwrap_err()
1317 .to_string()
1318 .contains("Unexpected token while parsing SQL statement")
1319 );
1320
1321 let sql = "create database prometheus";
1322 let stmts =
1323 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1324 .unwrap();
1325
1326 assert_eq!(1, stmts.len());
1327 match &stmts[0] {
1328 Statement::CreateDatabase(c) => {
1329 assert_eq!(c.name.to_string(), "prometheus");
1330 assert!(!c.if_not_exists);
1331 }
1332 _ => unreachable!(),
1333 }
1334
1335 let sql = "create database if not exists prometheus";
1336 let stmts =
1337 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1338 .unwrap();
1339
1340 assert_eq!(1, stmts.len());
1341 match &stmts[0] {
1342 Statement::CreateDatabase(c) => {
1343 assert_eq!(c.name.to_string(), "prometheus");
1344 assert!(c.if_not_exists);
1345 }
1346 _ => unreachable!(),
1347 }
1348
1349 let sql = "CREATE DATABASE `fOo`";
1350 let result =
1351 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1352 let stmts = result.unwrap();
1353 match &stmts.last().unwrap() {
1354 Statement::CreateDatabase(c) => {
1355 assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1356 assert!(!c.if_not_exists);
1357 }
1358 _ => unreachable!(),
1359 }
1360
1361 let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1362 let result =
1363 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1364 let stmts = result.unwrap();
1365 match &stmts[0] {
1366 Statement::CreateDatabase(c) => {
1367 assert_eq!(c.name.to_string(), "prometheus");
1368 assert!(!c.if_not_exists);
1369 assert_eq!(c.options.get("ttl").unwrap(), "1h");
1370 }
1371 _ => unreachable!(),
1372 }
1373 }
1374
1375 #[test]
1376 fn test_parse_create_flow_more_testcases() {
1377 use pretty_assertions::assert_eq;
1378 fn parse_create_flow(sql: &str) -> CreateFlow {
1379 let stmts = ParserContext::create_with_dialect(
1380 sql,
1381 &GreptimeDbDialect {},
1382 ParseOptions::default(),
1383 )
1384 .unwrap();
1385 assert_eq!(1, stmts.len());
1386 match &stmts[0] {
1387 Statement::CreateFlow(c) => c.clone(),
1388 _ => unreachable!(),
1389 }
1390 }
1391 struct CreateFlowWoutQuery {
1392 pub flow_name: ObjectName,
1394 pub sink_table_name: ObjectName,
1396 pub or_replace: bool,
1398 pub if_not_exists: bool,
1400 pub expire_after: Option<i64>,
1403 pub comment: Option<String>,
1405 }
1406 let testcases = vec![
1407 (
1408 r"
1409CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1410SINK TO schema_1.table_1
1411EXPIRE AFTER INTERVAL '5 minutes'
1412COMMENT 'test comment'
1413AS
1414SELECT max(c1), min(c2) FROM schema_2.table_2;",
1415 CreateFlowWoutQuery {
1416 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1417 sink_table_name: ObjectName::from(vec![
1418 Ident::new("schema_1"),
1419 Ident::new("table_1"),
1420 ]),
1421 or_replace: true,
1422 if_not_exists: true,
1423 expire_after: Some(300),
1424 comment: Some("test comment".to_string()),
1425 },
1426 ),
1427 (
1428 r"
1429CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1430SINK TO schema_1.table_1
1431EXPIRE AFTER INTERVAL '300 s'
1432COMMENT 'test comment'
1433AS
1434SELECT max(c1), min(c2) FROM schema_2.table_2;",
1435 CreateFlowWoutQuery {
1436 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1437 sink_table_name: ObjectName::from(vec![
1438 Ident::new("schema_1"),
1439 Ident::new("table_1"),
1440 ]),
1441 or_replace: true,
1442 if_not_exists: true,
1443 expire_after: Some(300),
1444 comment: Some("test comment".to_string()),
1445 },
1446 ),
1447 (
1448 r"
1449CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1450SINK TO schema_1.table_1
1451EXPIRE AFTER '5 minutes'
1452COMMENT 'test comment'
1453AS
1454SELECT max(c1), min(c2) FROM schema_2.table_2;",
1455 CreateFlowWoutQuery {
1456 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1457 sink_table_name: ObjectName::from(vec![
1458 Ident::new("schema_1"),
1459 Ident::new("table_1"),
1460 ]),
1461 or_replace: true,
1462 if_not_exists: true,
1463 expire_after: Some(300),
1464 comment: Some("test comment".to_string()),
1465 },
1466 ),
1467 (
1468 r"
1469CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1470SINK TO schema_1.table_1
1471EXPIRE AFTER '300 s'
1472COMMENT 'test comment'
1473AS
1474SELECT max(c1), min(c2) FROM schema_2.table_2;",
1475 CreateFlowWoutQuery {
1476 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1477 sink_table_name: ObjectName::from(vec![
1478 Ident::new("schema_1"),
1479 Ident::new("table_1"),
1480 ]),
1481 or_replace: true,
1482 if_not_exists: true,
1483 expire_after: Some(300),
1484 comment: Some("test comment".to_string()),
1485 },
1486 ),
1487 (
1488 r"
1489CREATE FLOW `task_2`
1490SINK TO schema_1.table_1
1491EXPIRE AFTER '2 days 1h 2 min'
1492AS
1493SELECT max(c1), min(c2) FROM schema_2.table_2;",
1494 CreateFlowWoutQuery {
1495 flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1496 sink_table_name: ObjectName::from(vec![
1497 Ident::new("schema_1"),
1498 Ident::new("table_1"),
1499 ]),
1500 or_replace: false,
1501 if_not_exists: false,
1502 expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1503 comment: None,
1504 },
1505 ),
1506 ];
1507
1508 for (sql, expected) in testcases {
1509 let create_task = parse_create_flow(sql);
1510
1511 let expected = CreateFlow {
1512 flow_name: expected.flow_name,
1513 sink_table_name: expected.sink_table_name,
1514 or_replace: expected.or_replace,
1515 if_not_exists: expected.if_not_exists,
1516 expire_after: expected.expire_after,
1517 eval_interval: None,
1518 comment: expected.comment,
1519 query: create_task.query.clone(),
1521 };
1522
1523 assert_eq!(create_task, expected, "input sql is:\n{sql}");
1524 let show_create = create_task.to_string();
1525 let recreated = parse_create_flow(&show_create);
1526 assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1527 }
1528 }
1529
1530 #[test]
1531 fn test_parse_create_flow() {
1532 use pretty_assertions::assert_eq;
1533 fn parse_create_flow(sql: &str) -> CreateFlow {
1534 let stmts = ParserContext::create_with_dialect(
1535 sql,
1536 &GreptimeDbDialect {},
1537 ParseOptions::default(),
1538 )
1539 .unwrap();
1540 assert_eq!(1, stmts.len());
1541 match &stmts[0] {
1542 Statement::CreateFlow(c) => c.clone(),
1543 _ => panic!("{:?}", stmts[0]),
1544 }
1545 }
1546 struct CreateFlowWoutQuery {
1547 pub flow_name: ObjectName,
1549 pub sink_table_name: ObjectName,
1551 pub or_replace: bool,
1553 pub if_not_exists: bool,
1555 pub expire_after: Option<i64>,
1558 pub eval_interval: Option<i64>,
1562 pub comment: Option<String>,
1564 }
1565
1566 let testcases = vec![
1568 (
1569 r"
1570CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1571SINK TO schema_1.table_1
1572EXPIRE AFTER INTERVAL '5 minutes'
1573COMMENT 'test comment'
1574AS
1575SELECT max(c1), min(c2) FROM schema_2.table_2;",
1576 CreateFlowWoutQuery {
1577 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1578 sink_table_name: ObjectName(vec![
1579 ObjectNamePart::Identifier(Ident::new("schema_1")),
1580 ObjectNamePart::Identifier(Ident::new("table_1")),
1581 ]),
1582 or_replace: true,
1583 if_not_exists: true,
1584 expire_after: Some(300),
1585 eval_interval: None,
1586 comment: Some("test comment".to_string()),
1587 },
1588 ),
1589 (
1590 r"
1591CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1592SINK TO schema_1.table_1
1593EXPIRE AFTER INTERVAL '300 s'
1594COMMENT 'test comment'
1595AS
1596SELECT max(c1), min(c2) FROM schema_2.table_2;",
1597 CreateFlowWoutQuery {
1598 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1599 sink_table_name: ObjectName(vec![
1600 ObjectNamePart::Identifier(Ident::new("schema_1")),
1601 ObjectNamePart::Identifier(Ident::new("table_1")),
1602 ]),
1603 or_replace: true,
1604 if_not_exists: true,
1605 expire_after: Some(300),
1606 eval_interval: None,
1607 comment: Some("test comment".to_string()),
1608 },
1609 ),
1610 (
1611 r"
1612CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1613SINK TO schema_1.table_1
1614EXPIRE AFTER '5 minutes'
1615EVAL INTERVAL '10 seconds'
1616COMMENT 'test comment'
1617AS
1618SELECT max(c1), min(c2) FROM schema_2.table_2;",
1619 CreateFlowWoutQuery {
1620 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1621 sink_table_name: ObjectName(vec![
1622 ObjectNamePart::Identifier(Ident::new("schema_1")),
1623 ObjectNamePart::Identifier(Ident::new("table_1")),
1624 ]),
1625 or_replace: true,
1626 if_not_exists: true,
1627 expire_after: Some(300),
1628 eval_interval: Some(10),
1629 comment: Some("test comment".to_string()),
1630 },
1631 ),
1632 (
1633 r"
1634CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1635SINK TO schema_1.table_1
1636EXPIRE AFTER '5 minutes'
1637EVAL INTERVAL INTERVAL '10 seconds'
1638COMMENT 'test comment'
1639AS
1640SELECT max(c1), min(c2) FROM schema_2.table_2;",
1641 CreateFlowWoutQuery {
1642 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1643 sink_table_name: ObjectName(vec![
1644 ObjectNamePart::Identifier(Ident::new("schema_1")),
1645 ObjectNamePart::Identifier(Ident::new("table_1")),
1646 ]),
1647 or_replace: true,
1648 if_not_exists: true,
1649 expire_after: Some(300),
1650 eval_interval: Some(10),
1651 comment: Some("test comment".to_string()),
1652 },
1653 ),
1654 (
1655 r"
1656CREATE FLOW `task_2`
1657SINK TO schema_1.table_1
1658EXPIRE AFTER '2 days 1h 2 min'
1659AS
1660SELECT max(c1), min(c2) FROM schema_2.table_2;",
1661 CreateFlowWoutQuery {
1662 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1663 '`', "task_2",
1664 ))]),
1665 sink_table_name: ObjectName(vec![
1666 ObjectNamePart::Identifier(Ident::new("schema_1")),
1667 ObjectNamePart::Identifier(Ident::new("table_1")),
1668 ]),
1669 or_replace: false,
1670 if_not_exists: false,
1671 expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1672 eval_interval: None,
1673 comment: None,
1674 },
1675 ),
1676 ];
1677
1678 for (sql, expected) in testcases {
1679 let create_task = parse_create_flow(sql);
1680
1681 let expected = CreateFlow {
1682 flow_name: expected.flow_name,
1683 sink_table_name: expected.sink_table_name,
1684 or_replace: expected.or_replace,
1685 if_not_exists: expected.if_not_exists,
1686 expire_after: expected.expire_after,
1687 eval_interval: expected.eval_interval,
1688 comment: expected.comment,
1689 query: create_task.query.clone(),
1691 };
1692
1693 assert_eq!(create_task, expected, "input sql is:\n{sql}");
1694 let show_create = create_task.to_string();
1695 let recreated = parse_create_flow(&show_create);
1696 assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1697 }
1698 }
1699
1700 #[test]
1701 fn test_create_flow_no_month() {
1702 let sql = r"
1703CREATE FLOW `task_2`
1704SINK TO schema_1.table_1
1705EXPIRE AFTER '1 month 2 days 1h 2 min'
1706AS
1707SELECT max(c1), min(c2) FROM schema_2.table_2;";
1708 let stmts =
1709 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1710
1711 assert!(
1712 stmts.is_err()
1713 && stmts
1714 .unwrap_err()
1715 .to_string()
1716 .contains("Interval with months is not allowed")
1717 );
1718 }
1719
1720 #[test]
1721 fn test_validate_create() {
1722 let sql = r"
1723CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
1724PARTITION ON COLUMNS(c, a) (
1725 a < 10,
1726 a > 10 AND a < 20,
1727 a > 20 AND c < 100,
1728 a > 20 AND c >= 100
1729)
1730ENGINE=mito";
1731 let result =
1732 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1733 let _ = result.unwrap();
1734
1735 let sql = r"
1736CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1737PARTITION ON COLUMNS(x) ()
1738ENGINE=mito";
1739 let result =
1740 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1741 assert!(
1742 result
1743 .unwrap_err()
1744 .to_string()
1745 .contains("Partition column \"x\" not defined")
1746 );
1747 }
1748
1749 #[test]
1750 fn test_parse_create_table_with_partitions() {
1751 let sql = r"
1752CREATE TABLE monitor (
1753 host_id INT,
1754 idc STRING,
1755 ts TIMESTAMP,
1756 cpu DOUBLE DEFAULT 0,
1757 memory DOUBLE,
1758 TIME INDEX (ts),
1759 PRIMARY KEY (host),
1760)
1761PARTITION ON COLUMNS(idc, host_id) (
1762 idc <= 'hz' AND host_id < 1000,
1763 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1764 idc > 'sh' AND host_id < 3000,
1765 idc > 'sh' AND host_id >= 3000,
1766)
1767ENGINE=mito";
1768 let result =
1769 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1770 .unwrap();
1771 assert_eq!(result.len(), 1);
1772 match &result[0] {
1773 Statement::CreateTable(c) => {
1774 assert!(c.partitions.is_some());
1775
1776 let partitions = c.partitions.as_ref().unwrap();
1777 let column_list = partitions
1778 .column_list
1779 .iter()
1780 .map(|x| &x.value)
1781 .collect::<Vec<&String>>();
1782 assert_eq!(column_list, vec!["idc", "host_id"]);
1783
1784 let exprs = &partitions.exprs;
1785
1786 assert_eq!(
1787 exprs[0],
1788 Expr::BinaryOp {
1789 left: Box::new(Expr::BinaryOp {
1790 left: Box::new(Expr::Identifier("idc".into())),
1791 op: BinaryOperator::LtEq,
1792 right: Box::new(Expr::Value(
1793 Value::SingleQuotedString("hz".to_string()).into()
1794 ))
1795 }),
1796 op: BinaryOperator::And,
1797 right: Box::new(Expr::BinaryOp {
1798 left: Box::new(Expr::Identifier("host_id".into())),
1799 op: BinaryOperator::Lt,
1800 right: Box::new(Expr::Value(
1801 Value::Number("1000".to_string(), false).into()
1802 ))
1803 })
1804 }
1805 );
1806 assert_eq!(
1807 exprs[1],
1808 Expr::BinaryOp {
1809 left: Box::new(Expr::BinaryOp {
1810 left: Box::new(Expr::BinaryOp {
1811 left: Box::new(Expr::Identifier("idc".into())),
1812 op: BinaryOperator::Gt,
1813 right: Box::new(Expr::Value(
1814 Value::SingleQuotedString("hz".to_string()).into()
1815 ))
1816 }),
1817 op: BinaryOperator::And,
1818 right: Box::new(Expr::BinaryOp {
1819 left: Box::new(Expr::Identifier("idc".into())),
1820 op: BinaryOperator::LtEq,
1821 right: Box::new(Expr::Value(
1822 Value::SingleQuotedString("sh".to_string()).into()
1823 ))
1824 })
1825 }),
1826 op: BinaryOperator::And,
1827 right: Box::new(Expr::BinaryOp {
1828 left: Box::new(Expr::Identifier("host_id".into())),
1829 op: BinaryOperator::Lt,
1830 right: Box::new(Expr::Value(
1831 Value::Number("2000".to_string(), false).into()
1832 ))
1833 })
1834 }
1835 );
1836 assert_eq!(
1837 exprs[2],
1838 Expr::BinaryOp {
1839 left: Box::new(Expr::BinaryOp {
1840 left: Box::new(Expr::Identifier("idc".into())),
1841 op: BinaryOperator::Gt,
1842 right: Box::new(Expr::Value(
1843 Value::SingleQuotedString("sh".to_string()).into()
1844 ))
1845 }),
1846 op: BinaryOperator::And,
1847 right: Box::new(Expr::BinaryOp {
1848 left: Box::new(Expr::Identifier("host_id".into())),
1849 op: BinaryOperator::Lt,
1850 right: Box::new(Expr::Value(
1851 Value::Number("3000".to_string(), false).into()
1852 ))
1853 })
1854 }
1855 );
1856 assert_eq!(
1857 exprs[3],
1858 Expr::BinaryOp {
1859 left: Box::new(Expr::BinaryOp {
1860 left: Box::new(Expr::Identifier("idc".into())),
1861 op: BinaryOperator::Gt,
1862 right: Box::new(Expr::Value(
1863 Value::SingleQuotedString("sh".to_string()).into()
1864 ))
1865 }),
1866 op: BinaryOperator::And,
1867 right: Box::new(Expr::BinaryOp {
1868 left: Box::new(Expr::Identifier("host_id".into())),
1869 op: BinaryOperator::GtEq,
1870 right: Box::new(Expr::Value(
1871 Value::Number("3000".to_string(), false).into()
1872 ))
1873 })
1874 }
1875 );
1876 }
1877 _ => unreachable!(),
1878 }
1879 }
1880
1881 #[test]
1882 fn test_parse_create_table_with_quoted_partitions() {
1883 let sql = r"
1884CREATE TABLE monitor (
1885 `host_id` INT,
1886 idc STRING,
1887 ts TIMESTAMP,
1888 cpu DOUBLE DEFAULT 0,
1889 memory DOUBLE,
1890 TIME INDEX (ts),
1891 PRIMARY KEY (host),
1892)
1893PARTITION ON COLUMNS(IdC, host_id) (
1894 idc <= 'hz' AND host_id < 1000,
1895 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1896 idc > 'sh' AND host_id < 3000,
1897 idc > 'sh' AND host_id >= 3000,
1898)";
1899 let result =
1900 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1901 .unwrap();
1902 assert_eq!(result.len(), 1);
1903 }
1904
1905 #[test]
1906 fn test_parse_create_table_with_timestamp_index() {
1907 let sql1 = r"
1908CREATE TABLE monitor (
1909 host_id INT,
1910 idc STRING,
1911 ts TIMESTAMP TIME INDEX,
1912 cpu DOUBLE DEFAULT 0,
1913 memory DOUBLE,
1914 PRIMARY KEY (host),
1915)
1916ENGINE=mito";
1917 let result1 = ParserContext::create_with_dialect(
1918 sql1,
1919 &GreptimeDbDialect {},
1920 ParseOptions::default(),
1921 )
1922 .unwrap();
1923
1924 if let Statement::CreateTable(c) = &result1[0] {
1925 assert_eq!(c.constraints.len(), 2);
1926 let tc = c.constraints[0].clone();
1927 match tc {
1928 TableConstraint::TimeIndex { column } => {
1929 assert_eq!(&column.value, "ts");
1930 }
1931 _ => panic!("should be time index constraint"),
1932 };
1933 } else {
1934 panic!("should be create_table statement");
1935 }
1936
1937 let sql2 = r"
1940CREATE TABLE monitor (
1941 host_id INT,
1942 idc STRING,
1943 ts TIMESTAMP NOT NULL,
1944 cpu DOUBLE DEFAULT 0,
1945 memory DOUBLE,
1946 TIME INDEX (ts),
1947 PRIMARY KEY (host),
1948)
1949ENGINE=mito";
1950 let result2 = ParserContext::create_with_dialect(
1951 sql2,
1952 &GreptimeDbDialect {},
1953 ParseOptions::default(),
1954 )
1955 .unwrap();
1956
1957 assert_eq!(result1, result2);
1958
1959 let sql3 = r"
1961CREATE TABLE monitor (
1962 host_id INT,
1963 idc STRING,
1964 ts TIMESTAMP,
1965 cpu DOUBLE DEFAULT 0,
1966 memory DOUBLE,
1967 TIME INDEX (ts),
1968 PRIMARY KEY (host),
1969)
1970ENGINE=mito";
1971
1972 let result3 = ParserContext::create_with_dialect(
1973 sql3,
1974 &GreptimeDbDialect {},
1975 ParseOptions::default(),
1976 )
1977 .unwrap();
1978
1979 assert_ne!(result1, result3);
1980
1981 let sql1 = r"
1983CREATE TABLE monitor (
1984 host_id INT,
1985 idc STRING,
1986 b bigint TIME INDEX,
1987 cpu DOUBLE DEFAULT 0,
1988 memory DOUBLE,
1989 PRIMARY KEY (host),
1990)
1991ENGINE=mito";
1992 let result1 = ParserContext::create_with_dialect(
1993 sql1,
1994 &GreptimeDbDialect {},
1995 ParseOptions::default(),
1996 );
1997
1998 assert!(
1999 result1
2000 .unwrap_err()
2001 .to_string()
2002 .contains("time index column data type should be timestamp")
2003 );
2004 }
2005
2006 #[test]
2007 fn test_parse_create_table_with_timestamp_index_not_null() {
2008 let sql = r"
2009CREATE TABLE monitor (
2010 host_id INT,
2011 idc STRING,
2012 ts TIMESTAMP TIME INDEX,
2013 cpu DOUBLE DEFAULT 0,
2014 memory DOUBLE,
2015 TIME INDEX (ts),
2016 PRIMARY KEY (host),
2017)
2018ENGINE=mito";
2019 let result =
2020 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2021 .unwrap();
2022
2023 assert_eq!(result.len(), 1);
2024 if let Statement::CreateTable(c) = &result[0] {
2025 let ts = c.columns[2].clone();
2026 assert_eq!(ts.name().to_string(), "ts");
2027 assert_eq!(ts.options()[0].option, NotNull);
2028 } else {
2029 panic!("should be create table statement");
2030 }
2031
2032 let sql1 = r"
2033CREATE TABLE monitor (
2034 host_id INT,
2035 idc STRING,
2036 ts TIMESTAMP NOT NULL TIME INDEX,
2037 cpu DOUBLE DEFAULT 0,
2038 memory DOUBLE,
2039 TIME INDEX (ts),
2040 PRIMARY KEY (host),
2041)
2042ENGINE=mito";
2043
2044 let result1 = ParserContext::create_with_dialect(
2045 sql1,
2046 &GreptimeDbDialect {},
2047 ParseOptions::default(),
2048 )
2049 .unwrap();
2050 assert_eq!(result, result1);
2051
2052 let sql2 = r"
2053CREATE TABLE monitor (
2054 host_id INT,
2055 idc STRING,
2056 ts TIMESTAMP TIME INDEX NOT NULL,
2057 cpu DOUBLE DEFAULT 0,
2058 memory DOUBLE,
2059 TIME INDEX (ts),
2060 PRIMARY KEY (host),
2061)
2062ENGINE=mito";
2063
2064 let result2 = ParserContext::create_with_dialect(
2065 sql2,
2066 &GreptimeDbDialect {},
2067 ParseOptions::default(),
2068 )
2069 .unwrap();
2070 assert_eq!(result, result2);
2071
2072 let sql3 = r"
2073CREATE TABLE monitor (
2074 host_id INT,
2075 idc STRING,
2076 ts TIMESTAMP TIME INDEX NULL NOT,
2077 cpu DOUBLE DEFAULT 0,
2078 memory DOUBLE,
2079 TIME INDEX (ts),
2080 PRIMARY KEY (host),
2081)
2082ENGINE=mito";
2083
2084 let result3 = ParserContext::create_with_dialect(
2085 sql3,
2086 &GreptimeDbDialect {},
2087 ParseOptions::default(),
2088 );
2089 assert!(result3.is_err());
2090
2091 let sql4 = r"
2092CREATE TABLE monitor (
2093 host_id INT,
2094 idc STRING,
2095 ts TIMESTAMP TIME INDEX NOT NULL NULL,
2096 cpu DOUBLE DEFAULT 0,
2097 memory DOUBLE,
2098 TIME INDEX (ts),
2099 PRIMARY KEY (host),
2100)
2101ENGINE=mito";
2102
2103 let result4 = ParserContext::create_with_dialect(
2104 sql4,
2105 &GreptimeDbDialect {},
2106 ParseOptions::default(),
2107 );
2108 assert!(result4.is_err());
2109
2110 let sql = r"
2111CREATE TABLE monitor (
2112 host_id INT,
2113 idc STRING,
2114 ts TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2115 cpu DOUBLE DEFAULT 0,
2116 memory DOUBLE,
2117 TIME INDEX (ts),
2118 PRIMARY KEY (host),
2119)
2120ENGINE=mito";
2121
2122 let result =
2123 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2124 .unwrap();
2125
2126 if let Statement::CreateTable(c) = &result[0] {
2127 let tc = c.constraints[0].clone();
2128 match tc {
2129 TableConstraint::TimeIndex { column } => {
2130 assert_eq!(&column.value, "ts");
2131 }
2132 _ => panic!("should be time index constraint"),
2133 }
2134 let ts = c.columns[2].clone();
2135 assert_eq!(ts.name().to_string(), "ts");
2136 assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2137 assert_eq!(ts.options()[1].option, NotNull);
2138 } else {
2139 unreachable!("should be create table statement");
2140 }
2141 }
2142
2143 #[test]
2144 fn test_parse_partitions_with_error_syntax() {
2145 let sql = r"
2146CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2147PARTITION COLUMNS(c, a) (
2148 a < 10,
2149 a > 10 AND a < 20,
2150 a > 20 AND c < 100,
2151 a > 20 AND c >= 100
2152)
2153ENGINE=mito";
2154 let result =
2155 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2156 assert!(
2157 result
2158 .unwrap_err()
2159 .output_msg()
2160 .contains("sql parser error: Expected: ON, found: COLUMNS")
2161 );
2162 }
2163
2164 #[test]
2165 fn test_parse_partitions_without_rule() {
2166 let sql = r"
2167CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2168PARTITION ON COLUMNS(c, a) ()
2169ENGINE=mito";
2170 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2171 .unwrap();
2172 }
2173
2174 #[test]
2175 fn test_parse_partitions_unreferenced_column() {
2176 let sql = r"
2177CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2178PARTITION ON COLUMNS(c, a) (
2179 b = 'foo'
2180)
2181ENGINE=mito";
2182 let result =
2183 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2184 assert_eq!(
2185 result.unwrap_err().output_msg(),
2186 "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2187 );
2188 }
2189
2190 #[test]
2191 fn test_parse_partitions_not_binary_expr() {
2192 let sql = r"
2193CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2194PARTITION ON COLUMNS(c, a) (
2195 b
2196)
2197ENGINE=mito";
2198 let result =
2199 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2200 assert_eq!(
2201 result.unwrap_err().output_msg(),
2202 r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2203 );
2204 }
2205
2206 fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2207 assert_eq!(column.name.to_string(), name);
2208 assert_eq!(column.data_type.to_string(), data_type);
2209 }
2210
2211 #[test]
2212 pub fn test_parse_create_table() {
2213 let sql = r"create table demo(
2214 host string,
2215 ts timestamp,
2216 cpu float32 default 0,
2217 memory float64,
2218 TIME INDEX (ts),
2219 PRIMARY KEY(ts, host),
2220 ) engine=mito
2221 with(ttl='10s');
2222 ";
2223 let result =
2224 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2225 .unwrap();
2226 assert_eq!(1, result.len());
2227 match &result[0] {
2228 Statement::CreateTable(c) => {
2229 assert!(!c.if_not_exists);
2230 assert_eq!("demo", c.name.to_string());
2231 assert_eq!("mito", c.engine);
2232 assert_eq!(4, c.columns.len());
2233 let columns = &c.columns;
2234 assert_column_def(&columns[0].column_def, "host", "STRING");
2235 assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2236 assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2237 assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2238
2239 let constraints = &c.constraints;
2240 assert_eq!(
2241 &constraints[0],
2242 &TableConstraint::TimeIndex {
2243 column: Ident::new("ts"),
2244 }
2245 );
2246 assert_eq!(
2247 &constraints[1],
2248 &TableConstraint::PrimaryKey {
2249 columns: vec![Ident::new("ts"), Ident::new("host")]
2250 }
2251 );
2252 assert_eq!(1, c.options.len());
2254 assert_eq!(
2255 [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2256 c.options.to_str_map()
2257 );
2258 }
2259 _ => unreachable!(),
2260 }
2261 }
2262
2263 #[test]
2264 fn test_invalid_index_keys() {
2265 let sql = r"create table demo(
2266 host string,
2267 ts int64,
2268 cpu float64 default 0,
2269 memory float64,
2270 TIME INDEX (ts, host),
2271 PRIMARY KEY(ts, host)) engine=mito;
2272 ";
2273 let result =
2274 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2275 assert!(result.is_err());
2276 assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2277 }
2278
2279 #[test]
2280 fn test_duplicated_time_index() {
2281 let sql = r"create table demo(
2282 host string,
2283 ts timestamp time index,
2284 t timestamp time index,
2285 cpu float64 default 0,
2286 memory float64,
2287 TIME INDEX (ts, host),
2288 PRIMARY KEY(ts, host)) engine=mito;
2289 ";
2290 let result =
2291 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2292 assert!(result.is_err());
2293 assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2294
2295 let sql = r"create table demo(
2296 host string,
2297 ts timestamp time index,
2298 cpu float64 default 0,
2299 t timestamp,
2300 memory float64,
2301 TIME INDEX (t),
2302 PRIMARY KEY(ts, host)) engine=mito;
2303 ";
2304 let result =
2305 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2306 assert!(result.is_err());
2307 assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2308 }
2309
2310 #[test]
2311 fn test_invalid_column_name() {
2312 let sql = "create table foo(user string, i timestamp time index)";
2313 let result =
2314 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2315 let err = result.unwrap_err().output_msg();
2316 assert!(err.contains("Cannot use keyword 'user' as column name"));
2317
2318 let sql = r#"
2320 create table foo("user" string, i timestamp time index)
2321 "#;
2322 let result =
2323 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2324 let _ = result.unwrap();
2325 }
2326
2327 #[test]
2328 fn test_incorrect_default_value_issue_3479() {
2329 let sql = r#"CREATE TABLE `ExcePTuRi`(
2330non TIMESTAMP(6) TIME INDEX,
2331`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2332)"#;
2333 let result =
2334 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2335 .unwrap();
2336 assert_eq!(1, result.len());
2337 match &result[0] {
2338 Statement::CreateTable(c) => {
2339 assert_eq!(
2340 "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2341 c.columns[1].to_string()
2342 );
2343 }
2344 _ => unreachable!(),
2345 }
2346 }
2347
#[test]
fn test_parse_create_view() {
    // Each case: (statement, expected `or_replace`, expected `if_not_exists`).
    // Both statements are expected to round-trip through `to_string` unchanged.
    let cases = [
        ("CREATE VIEW test AS SELECT * FROM NUMBERS", false, false),
        (
            "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS",
            true,
            true,
        ),
    ];

    for (sql, or_replace, if_not_exists) in cases {
        let statements =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap();

        let Statement::CreateView(view) = &statements[0] else {
            unreachable!()
        };
        assert_eq!(view.to_string(), sql);
        assert_eq!(view.or_replace, or_replace);
        assert_eq!(view.if_not_exists, if_not_exists);
        assert_eq!("test", view.name.to_string());
    }
}
2380
#[test]
fn test_parse_create_view_invalid_query() {
    // A `DELETE` after `AS` must be rejected with a syntax error.
    let parsed = ParserContext::create_with_dialect(
        "CREATE VIEW test AS DELETE from demo",
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );
    assert!(parsed.is_err());
    assert_matches!(parsed, Err(crate::error::Error::Syntax { .. }));
}
2389
#[test]
fn test_parse_create_table_fulltext_options() {
    // Columns are looked up with `find(..).expect(..)` rather than a
    // `for_each` + `if name == ..` filter, so that a missing column fails the
    // test instead of silently skipping every assertion.

    // Case 1: bare `FULLTEXT INDEX` yields an option map that exists but is empty.
    let sql1 = r"
CREATE TABLE log (
    ts TIMESTAMP TIME INDEX,
    msg TEXT FULLTEXT INDEX,
)";
    let result1 = ParserContext::create_with_dialect(
        sql1,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();

    let Statement::CreateTable(c) = &result1[0] else {
        panic!("should be create_table statement");
    };
    let msg = c
        .columns
        .iter()
        .find(|col| col.name().value == "msg")
        .expect("column `msg` should be present");
    assert!(
        msg.extensions
            .fulltext_index_options
            .as_ref()
            .unwrap()
            .is_empty()
    );

    // Case 2: explicit options on a STRING column are kept verbatim.
    let sql2 = r"
CREATE TABLE log (
    ts TIMESTAMP TIME INDEX,
    msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
)";
    let result2 = ParserContext::create_with_dialect(
        sql2,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();

    let Statement::CreateTable(c) = &result2[0] else {
        panic!("should be create_table statement");
    };
    let msg = c
        .columns
        .iter()
        .find(|col| col.name().value == "msg")
        .expect("column `msg` should be present");
    let options = msg.extensions.fulltext_index_options.as_ref().unwrap();
    assert_eq!(options.len(), 2);
    assert_eq!(options.get("analyzer").unwrap(), "English");
    assert_eq!(options.get("case_sensitive").unwrap(), "false");

    // Case 3: two columns declared with other string type names (TINYTEXT,
    // CHAR(n)), each carrying its own independent set of options.
    let sql3 = r"
CREATE TABLE log (
    ts TIMESTAMP TIME INDEX,
    msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
    msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
)";
    let result3 = ParserContext::create_with_dialect(
        sql3,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();

    let Statement::CreateTable(c) = &result3[0] else {
        panic!("should be create_table statement");
    };

    let msg1 = c
        .columns
        .iter()
        .find(|col| col.name().value == "msg1")
        .expect("column `msg1` should be present");
    let options = msg1.extensions.fulltext_index_options.as_ref().unwrap();
    assert_eq!(options.len(), 2);
    assert_eq!(options.get("analyzer").unwrap(), "English");
    assert_eq!(options.get("case_sensitive").unwrap(), "false");

    let msg2 = c
        .columns
        .iter()
        .find(|col| col.name().value == "msg2")
        .expect("column `msg2` should be present");
    let options = msg2.extensions.fulltext_index_options.as_ref().unwrap();
    assert_eq!(options.len(), 2);
    assert_eq!(options.get("analyzer").unwrap(), "Chinese");
    assert_eq!(options.get("case_sensitive").unwrap(), "true");
}
2476
#[test]
fn test_parse_create_table_fulltext_options_invalid_type() {
    // FULLTEXT INDEX on an INT column must be rejected.
    let ddl = r"
CREATE TABLE log (
    ts TIMESTAMP TIME INDEX,
    msg INT FULLTEXT INDEX,
)";
    let outcome =
        ParserContext::create_with_dialect(ddl, &GreptimeDbDialect {}, ParseOptions::default());
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("FULLTEXT index only supports string type"));
}
2494
#[test]
fn test_parse_create_table_fulltext_options_duplicate() {
    // Declaring FULLTEXT INDEX twice on one column must be rejected.
    let ddl = r"
CREATE TABLE log (
    ts TIMESTAMP TIME INDEX,
    msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
)";
    let outcome =
        ParserContext::create_with_dialect(ddl, &GreptimeDbDialect {}, ParseOptions::default());
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("duplicated FULLTEXT INDEX option"));
}
2512
#[test]
fn test_parse_create_table_fulltext_options_invalid_option() {
    // An unknown key inside the FULLTEXT INDEX option list must be rejected.
    let ddl = r"
CREATE TABLE log (
    ts TIMESTAMP TIME INDEX,
    msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
)";
    let outcome =
        ParserContext::create_with_dialect(ddl, &GreptimeDbDialect {}, ParseOptions::default());
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("invalid FULLTEXT INDEX option"));
}
2530
#[test]
fn test_parse_create_table_skip_options() {
    // Columns are looked up with `find(..).expect(..)` rather than a
    // `for_each` + `if name == ..` filter, so that a missing column fails the
    // test instead of silently skipping every assertion.

    // Case 1: SKIPPING INDEX with explicit options yields a non-empty option map.
    let sql = r"
CREATE TABLE log (
    ts TIMESTAMP TIME INDEX,
    msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
)";
    let result =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap();

    let Statement::CreateTable(c) = &result[0] else {
        panic!("should be create_table statement");
    };
    let msg = c
        .columns
        .iter()
        .find(|col| col.name().value == "msg")
        .expect("column `msg` should be present");
    assert!(
        !msg.extensions
            .skipping_index_options
            .as_ref()
            .unwrap()
            .is_empty()
    );

    // Case 2: bare SKIPPING INDEX yields an option map that exists but is empty.
    let sql = r"
    CREATE TABLE log (
        ts TIMESTAMP TIME INDEX,
        msg INT SKIPPING INDEX,
    )";
    let result =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap();

    let Statement::CreateTable(c) = &result[0] else {
        panic!("should be create_table statement");
    };
    let msg = c
        .columns
        .iter()
        .find(|col| col.name().value == "msg")
        .expect("column `msg` should be present");
    assert!(
        msg.extensions
            .skipping_index_options
            .as_ref()
            .unwrap()
            .is_empty()
    );
}
2583
#[test]
fn test_parse_create_view_with_columns() {
    // Each case: (input statement, expected display form). An empty column
    // list is dropped when the statement is displayed; non-empty lists
    // round-trip unchanged.
    let valid_cases = [
        (
            "CREATE VIEW test () AS SELECT * FROM NUMBERS",
            "CREATE VIEW test AS SELECT * FROM NUMBERS",
        ),
        (
            "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS",
            "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS",
        ),
        (
            "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS",
            "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS",
        ),
    ];

    for (sql, expected) in valid_cases {
        let statements =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap();

        let Statement::CreateView(view) = &statements[0] else {
            unreachable!()
        };
        assert_eq!(view.to_string(), expected);
        assert!(!view.or_replace);
        assert!(!view.if_not_exists);
        assert_eq!("test", view.name.to_string());

        // The statement-level display must agree with the view-level one.
        assert_eq!(expected, statements[0].to_string());
    }

    // Malformed column lists: every one of these must fail to parse.
    let invalid_cases = [
        "CREATE VIEW test (n1 AS select * from demo",
        "CREATE VIEW test (n1, AS select * from demo",
        "CREATE VIEW test n1,n2) AS select * from demo",
        "CREATE VIEW test (1) AS select * from demo",
        "CREATE VIEW test (n1, select) AS select * from demo",
    ];

    for sql in invalid_cases {
        let outcome =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
        assert!(outcome.is_err());
    }
}
2664
#[test]
fn test_parse_column_extensions_vector() {
    // Build a raw sqlparser `Parser` over the type text.
    let dialect = GenericDialect {};
    let tokens = Tokenizer::new(&dialect, "VECTOR(128)")
        .tokenize()
        .unwrap();
    let mut parser = Parser::new(&dialect).with_tokens(tokens);

    let column_name = Ident::new("vec_col");
    let vector_type =
        DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
    let mut extensions = ColumnExtensions::default();

    let outcome = ParserContext::parse_column_extensions(
        &mut parser,
        &column_name,
        &vector_type,
        &mut extensions,
    );
    assert!(outcome.is_ok());

    // The dimension given in the custom type must land in the vector options.
    assert!(extensions.vector_options.is_some());
    let vector_options = extensions.vector_options.unwrap();
    assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some(&"128".to_string()));
}
2684
#[test]
fn test_parse_column_extensions_vector_invalid() {
    let dialect = GenericDialect {};
    let tokens = Tokenizer::new(&dialect, "VECTOR()").tokenize().unwrap();
    let mut parser = Parser::new(&dialect).with_tokens(tokens);

    // A VECTOR type with no dimension argument must be rejected.
    let column_name = Ident::new("vec_col");
    let dimensionless = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
    let mut extensions = ColumnExtensions::default();

    let outcome = ParserContext::parse_column_extensions(
        &mut parser,
        &column_name,
        &dimensionless,
        &mut extensions,
    );
    assert!(outcome.is_err());
}
2700
#[test]
fn test_parse_column_extensions_indices() {
    // Tokenizes `sql` with the generic dialect, runs `parse_column_extensions`
    // for a column called `column` of type `data_type`, and hands back both the
    // parse outcome and the extensions it filled in.
    let parse = |sql: &str, column: &str, data_type: DataType| {
        let dialect = GenericDialect {};
        let tokens = Tokenizer::new(&dialect, sql).tokenize().unwrap();
        let mut parser = Parser::new(&dialect).with_tokens(tokens);
        let mut extensions = ColumnExtensions::default();
        let outcome = ParserContext::parse_column_extensions(
            &mut parser,
            &Ident::new(column),
            &data_type,
            &mut extensions,
        );
        (outcome, extensions)
    };

    // Bare skipping index.
    {
        let (outcome, extensions) = parse("SKIPPING INDEX", "col", DataType::String(None));
        assert!(outcome.is_ok());
        assert!(extensions.skipping_index_options.is_some());
    }

    // Fulltext index with options on a string column.
    {
        let (outcome, extensions) = parse(
            "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')",
            "text_col",
            DataType::String(None),
        );
        assert!(outcome.unwrap());
        assert!(extensions.fulltext_index_options.is_some());
        let fulltext_options = extensions.fulltext_index_options.unwrap();
        assert_eq!(
            fulltext_options.get("analyzer"),
            Some(&"English".to_string())
        );
        assert_eq!(
            fulltext_options.get("case_sensitive"),
            Some(&"true".to_string())
        );
    }

    // Fulltext index on a non-string column is an error.
    {
        let (outcome, _) = parse(
            "FULLTEXT INDEX WITH (analyzer = 'English')",
            "num_col",
            DataType::Int(None),
        );
        assert!(outcome.is_err());
        assert!(
            outcome
                .unwrap_err()
                .to_string()
                .contains("FULLTEXT index only supports string type")
        );
    }

    // Fulltext index with an unrecognized analyzer value: this entry point is
    // expected to accept it.
    {
        let (outcome, _) = parse(
            "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')",
            "text_col",
            DataType::String(None),
        );
        assert!(outcome.unwrap());
    }

    // Bare inverted index.
    {
        let (outcome, extensions) = parse("INVERTED INDEX", "col", DataType::String(None));
        assert!(outcome.is_ok());
        assert!(extensions.inverted_index_options.is_some());
    }

    // Inverted index does not take options.
    {
        let (outcome, _) = parse(
            "INVERTED INDEX WITH (analyzer = 'English')",
            "col",
            DataType::String(None),
        );
        assert!(outcome.is_err());
        assert!(
            outcome
                .unwrap_err()
                .to_string()
                .contains("INVERTED index doesn't support options")
        );
    }

    // Several index kinds on the same column.
    {
        let (outcome, extensions) = parse(
            "SKIPPING INDEX FULLTEXT INDEX",
            "col",
            DataType::String(None),
        );
        assert!(outcome.unwrap());
        assert!(extensions.skipping_index_options.is_some());
        assert!(extensions.fulltext_index_options.is_some());
    }
}
2862
#[test]
fn test_parse_interval_cast() {
    // The shorthand interval literal is expected to be expanded to its
    // long form when the statement is displayed.
    let parsed = ParserContext::create_with_dialect(
        "select '10s'::INTERVAL",
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();
    assert_eq!("SELECT '10 seconds'::INTERVAL", &parsed[0].to_string());
}
2871}