1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::HashMap;
19
20use arrow_buffer::IntervalMonthDayNano;
21use common_catalog::consts::default_engine;
22use datafusion_common::ScalarValue;
23use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
24use datatypes::data_type::ConcreteDataType;
25use itertools::Itertools;
26use snafu::{ensure, OptionExt, ResultExt};
27use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Expr};
28use sqlparser::dialect::keywords::Keyword;
29use sqlparser::keywords::ALL_KEYWORDS;
30use sqlparser::parser::IsOptional::Mandatory;
31use sqlparser::parser::{Parser, ParserError};
32use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
33use table::requests::{validate_database_option, validate_table_option};
34
35use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
36use crate::error::{
37 self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu,
38 InvalidSqlSnafu, InvalidTableOptionSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
39 SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
40};
41use crate::parser::{ParserContext, FLOW};
42use crate::parsers::utils::{
43 self, validate_column_fulltext_create_option, validate_column_skipping_index_create_option,
44};
45use crate::statements::create::{
46 Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
47 CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
48};
49use crate::statements::statement::Statement;
50use crate::statements::transform::type_alias::get_data_type_by_alias_name;
51use crate::statements::{sql_data_type_to_concrete_data_type, OptionMap};
52use crate::util::{location_to_index, parse_option_string};
53
// Words matched by text while parsing `CREATE FLOW`.

/// Word that introduces the sink table of a flow (`SINK TO <table>`).
pub const SINK: &str = "SINK";
/// First word of the optional `EXPIRE AFTER <interval>` clause of a flow.
pub const EXPIRE: &str = "EXPIRE";
/// Second word of the optional `EXPIRE AFTER <interval>` clause of a flow.
pub const AFTER: &str = "AFTER";

// Words matched by text while parsing column index extensions.

/// Word that introduces an `INVERTED INDEX` column extension.
pub const INVERTED: &str = "INVERTED";
/// Word that introduces a `SKIPPING INDEX` column extension.
pub const SKIPPING: &str = "SKIPPING";

// Other words.

/// Word that introduces the `ENGINE = <name>` clause of a table definition.
pub const ENGINE: &str = "ENGINE";
/// The `MAXVALUE` word; not referenced by the parsing code in this part of the
/// file (presumably used by partition-related code elsewhere — TODO confirm).
pub const MAXVALUE: &str = "MAXVALUE";

/// Textual form of an interval expression as it was rendered back from the
/// parsed expression.
pub type RawIntervalExpr = String;
63
64impl<'a> ParserContext<'a> {
66 pub(crate) fn parse_create(&mut self) -> Result<Statement> {
67 match self.parser.peek_token().token {
68 Token::Word(w) => match w.keyword {
69 Keyword::TABLE => self.parse_create_table(),
70
71 Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
72
73 Keyword::EXTERNAL => self.parse_create_external_table(),
74
75 Keyword::OR => {
76 let _ = self.parser.next_token();
77 self.parser
78 .expect_keyword(Keyword::REPLACE)
79 .context(SyntaxSnafu)?;
80 match self.parser.next_token().token {
81 Token::Word(w) => match w.keyword {
82 Keyword::VIEW => self.parse_create_view(true),
83 Keyword::NoKeyword => {
84 let uppercase = w.value.to_uppercase();
85 match uppercase.as_str() {
86 FLOW => self.parse_create_flow(true),
87 _ => self.unsupported(w.to_string()),
88 }
89 }
90 _ => self.unsupported(w.to_string()),
91 },
92 _ => self.unsupported(w.to_string()),
93 }
94 }
95
96 Keyword::VIEW => {
97 let _ = self.parser.next_token();
98 self.parse_create_view(false)
99 }
100
101 #[cfg(feature = "enterprise")]
102 Keyword::TRIGGER => {
103 let _ = self.parser.next_token();
104 self.parse_create_trigger()
105 }
106
107 Keyword::NoKeyword => {
108 let _ = self.parser.next_token();
109 let uppercase = w.value.to_uppercase();
110 match uppercase.as_str() {
111 FLOW => self.parse_create_flow(false),
112 _ => self.unsupported(w.to_string()),
113 }
114 }
115 _ => self.unsupported(w.to_string()),
116 },
117 unexpected => self.unsupported(unexpected.to_string()),
118 }
119 }
120
121 fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
123 let if_not_exists = self.parse_if_not_exist()?;
124 let view_name = self.intern_parse_table_name()?;
125
126 let columns = self.parse_view_columns()?;
127
128 self.parser
129 .expect_keyword(Keyword::AS)
130 .context(SyntaxSnafu)?;
131
132 let query = self.parse_query()?;
133
134 Ok(Statement::CreateView(CreateView {
135 name: view_name,
136 columns,
137 or_replace,
138 query: Box::new(query),
139 if_not_exists,
140 }))
141 }
142
143 fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
144 let mut columns = vec![];
145 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
146 return Ok(columns);
147 }
148
149 loop {
150 let name = self.parse_column_name().context(SyntaxSnafu)?;
151
152 columns.push(name);
153
154 let comma = self.parser.consume_token(&Token::Comma);
155 if self.parser.consume_token(&Token::RParen) {
156 break;
158 } else if !comma {
159 return self.expected("',' or ')' after column name", self.parser.peek_token());
160 }
161 }
162
163 Ok(columns)
164 }
165
166 fn parse_create_external_table(&mut self) -> Result<Statement> {
167 let _ = self.parser.next_token();
168 self.parser
169 .expect_keyword(Keyword::TABLE)
170 .context(SyntaxSnafu)?;
171 let if_not_exists = self.parse_if_not_exist()?;
172 let table_name = self.intern_parse_table_name()?;
173 let (columns, constraints) = self.parse_columns()?;
174 if !columns.is_empty() {
175 validate_time_index(&columns, &constraints)?;
176 }
177
178 let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
179 let options = self.parse_create_table_options()?;
180 Ok(Statement::CreateExternalTable(CreateExternalTable {
181 name: table_name,
182 columns,
183 constraints,
184 options,
185 if_not_exists,
186 engine,
187 }))
188 }
189
190 fn parse_create_database(&mut self) -> Result<Statement> {
191 let _ = self.parser.next_token();
192 let if_not_exists = self.parse_if_not_exist()?;
193 let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
194 expected: "a database name",
195 actual: self.peek_token_as_string(),
196 })?;
197 let database_name = Self::canonicalize_object_name(database_name);
198
199 let options = self
200 .parser
201 .parse_options(Keyword::WITH)
202 .context(SyntaxSnafu)?
203 .into_iter()
204 .map(parse_option_string)
205 .collect::<Result<HashMap<String, String>>>()?;
206
207 for key in options.keys() {
208 ensure!(
209 validate_database_option(key),
210 InvalidDatabaseOptionSnafu {
211 key: key.to_string()
212 }
213 );
214 }
215 if let Some(append_mode) = options.get("append_mode") {
216 if append_mode == "true" && options.contains_key("merge_mode") {
217 return InvalidDatabaseOptionSnafu {
218 key: "merge_mode".to_string(),
219 }
220 .fail();
221 }
222 }
223
224 Ok(Statement::CreateDatabase(CreateDatabase {
225 name: database_name,
226 if_not_exists,
227 options: options.into(),
228 }))
229 }
230
231 fn parse_create_table(&mut self) -> Result<Statement> {
232 let _ = self.parser.next_token();
233
234 let if_not_exists = self.parse_if_not_exist()?;
235
236 let table_name = self.intern_parse_table_name()?;
237
238 if self.parser.parse_keyword(Keyword::LIKE) {
239 let source_name = self.intern_parse_table_name()?;
240
241 return Ok(Statement::CreateTableLike(CreateTableLike {
242 table_name,
243 source_name,
244 }));
245 }
246
247 let (columns, constraints) = self.parse_columns()?;
248 validate_time_index(&columns, &constraints)?;
249
250 let partitions = self.parse_partitions()?;
251 if let Some(partitions) = &partitions {
252 validate_partitions(&columns, partitions)?;
253 }
254
255 let engine = self.parse_table_engine(default_engine())?;
256 let options = self.parse_create_table_options()?;
257 let create_table = CreateTable {
258 if_not_exists,
259 name: table_name,
260 columns,
261 engine,
262 constraints,
263 options,
264 table_id: 0, partitions,
266 };
267
268 Ok(Statement::CreateTable(create_table))
269 }
270
271 fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
273 let if_not_exists = self.parse_if_not_exist()?;
274
275 let flow_name = self.intern_parse_table_name()?;
276
277 if let Token::Word(word) = self.parser.peek_token().token
279 && word.value.eq_ignore_ascii_case(SINK)
280 {
281 self.parser.next_token();
282 } else {
283 Err(ParserError::ParserError(
284 "Expect `SINK` keyword".to_string(),
285 ))
286 .context(SyntaxSnafu)?
287 }
288 self.parser
289 .expect_keyword(Keyword::TO)
290 .context(SyntaxSnafu)?;
291
292 let output_table_name = self.intern_parse_table_name()?;
293
294 let expire_after = if self
295 .parser
296 .consume_tokens(&[Token::make_keyword(EXPIRE), Token::make_keyword(AFTER)])
297 {
298 Some(self.parse_interval()?)
299 } else {
300 None
301 };
302
303 let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
304 match self.parser.next_token() {
305 TokenWithSpan {
306 token: Token::SingleQuotedString(value, ..),
307 ..
308 } => Some(value),
309 unexpected => {
310 return self
311 .parser
312 .expected("string", unexpected)
313 .context(SyntaxSnafu)
314 }
315 }
316 } else {
317 None
318 };
319
320 self.parser
321 .expect_keyword(Keyword::AS)
322 .context(SyntaxSnafu)?;
323
324 let start_loc = self.parser.peek_token().span.start;
325 let start_index = location_to_index(self.sql, &start_loc);
326
327 let query = self.parse_statement()?;
328 let end_token = self.parser.peek_token();
329
330 let raw_query = if end_token == Token::EOF {
331 &self.sql[start_index..]
332 } else {
333 let end_loc = end_token.span.end;
334 let end_index = location_to_index(self.sql, &end_loc);
335 &self.sql[start_index..end_index.min(self.sql.len())]
336 };
337 let raw_query = raw_query.trim_end_matches(";");
338
339 let query = Box::new(SqlOrTql::try_from_statement(query, raw_query)?);
340
341 Ok(Statement::CreateFlow(CreateFlow {
342 flow_name,
343 sink_table_name: output_table_name,
344 or_replace,
345 if_not_exists,
346 expire_after,
347 comment,
348 query,
349 }))
350 }
351
352 fn parse_interval(&mut self) -> Result<i64> {
354 let interval = self.parse_interval_month_day_nano()?.0;
355 Ok(
356 interval.nanoseconds / 1_000_000_000
357 + interval.days as i64 * 60 * 60 * 24
358 + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, )
362 }
363
364 fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
366 let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
367 let raw_interval_expr = interval_expr.to_string();
368 let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone())?
369 .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
370 .ok()
371 .with_context(|| InvalidIntervalSnafu {
372 reason: format!("cannot cast {} to interval type", interval_expr),
373 })?;
374 if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
375 Ok((interval, raw_interval_expr))
376 } else {
377 unreachable!()
378 }
379 }
380
381 fn parse_if_not_exist(&mut self) -> Result<bool> {
382 match self.parser.peek_token().token {
383 Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
384 _ => {}
385 }
386
387 if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
388 return self
389 .parser
390 .expect_keyword(Keyword::EXISTS)
391 .map(|_| true)
392 .context(UnexpectedSnafu {
393 expected: "EXISTS",
394 actual: self.peek_token_as_string(),
395 });
396 }
397
398 if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
399 return UnsupportedSnafu { keyword: "EXISTS" }.fail();
400 }
401
402 Ok(false)
403 }
404
405 fn parse_create_table_options(&mut self) -> Result<OptionMap> {
406 let options = self
407 .parser
408 .parse_options(Keyword::WITH)
409 .context(SyntaxSnafu)?
410 .into_iter()
411 .map(parse_option_string)
412 .collect::<Result<HashMap<String, String>>>()?;
413 for key in options.keys() {
414 ensure!(validate_table_option(key), InvalidTableOptionSnafu { key });
415 }
416 Ok(options.into())
417 }
418
419 fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
421 if !self.parser.parse_keyword(Keyword::PARTITION) {
422 return Ok(None);
423 }
424 self.parser
425 .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
426 .context(error::UnexpectedSnafu {
427 expected: "ON, COLUMNS",
428 actual: self.peek_token_as_string(),
429 })?;
430
431 let raw_column_list = self
432 .parser
433 .parse_parenthesized_column_list(Mandatory, false)
434 .context(error::SyntaxSnafu)?;
435 let column_list = raw_column_list
436 .into_iter()
437 .map(Self::canonicalize_identifier)
438 .collect();
439
440 let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
441
442 Ok(Some(Partitions { column_list, exprs }))
443 }
444
445 fn parse_partition_entry(&mut self) -> Result<Expr> {
446 self.parser.parse_expr().context(error::SyntaxSnafu)
447 }
448
449 fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
451 where
452 F: FnMut(&mut ParserContext<'a>) -> Result<T>,
453 {
454 self.parser
455 .expect_token(&Token::LParen)
456 .context(error::UnexpectedSnafu {
457 expected: "(",
458 actual: self.peek_token_as_string(),
459 })?;
460
461 let mut values = vec![];
462 while self.parser.peek_token() != Token::RParen {
463 values.push(f(self)?);
464 if !self.parser.consume_token(&Token::Comma) {
465 break;
466 }
467 }
468
469 self.parser
470 .expect_token(&Token::RParen)
471 .context(error::UnexpectedSnafu {
472 expected: ")",
473 actual: self.peek_token_as_string(),
474 })?;
475
476 Ok(values)
477 }
478
479 fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
481 let mut columns = vec![];
482 let mut constraints = vec![];
483 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
484 return Ok((columns, constraints));
485 }
486
487 loop {
488 if let Some(constraint) = self.parse_optional_table_constraint()? {
489 constraints.push(constraint);
490 } else if let Token::Word(_) = self.parser.peek_token().token {
491 self.parse_column(&mut columns, &mut constraints)?;
492 } else {
493 return self.expected(
494 "column name or constraint definition",
495 self.parser.peek_token(),
496 );
497 }
498 let comma = self.parser.consume_token(&Token::Comma);
499 if self.parser.consume_token(&Token::RParen) {
500 break;
502 } else if !comma {
503 return self.expected(
504 "',' or ')' after column definition",
505 self.parser.peek_token(),
506 );
507 }
508 }
509
510 Ok((columns, constraints))
511 }
512
513 fn parse_column(
514 &mut self,
515 columns: &mut Vec<Column>,
516 constraints: &mut Vec<TableConstraint>,
517 ) -> Result<()> {
518 let mut column = self.parse_column_def()?;
519
520 let mut time_index_opt_idx = None;
521 for (index, opt) in column.options().iter().enumerate() {
522 if let ColumnOption::DialectSpecific(tokens) = &opt.option {
523 if matches!(
524 &tokens[..],
525 [
526 Token::Word(Word {
527 keyword: Keyword::TIME,
528 ..
529 }),
530 Token::Word(Word {
531 keyword: Keyword::INDEX,
532 ..
533 })
534 ]
535 ) {
536 ensure!(
537 time_index_opt_idx.is_none(),
538 InvalidColumnOptionSnafu {
539 name: column.name().to_string(),
540 msg: "duplicated time index",
541 }
542 );
543 time_index_opt_idx = Some(index);
544
545 let constraint = TableConstraint::TimeIndex {
546 column: Ident::new(column.name().value.clone()),
547 };
548 constraints.push(constraint);
549 }
550 }
551 }
552
553 if let Some(index) = time_index_opt_idx {
554 ensure!(
555 !column.options().contains(&ColumnOptionDef {
556 option: ColumnOption::Null,
557 name: None,
558 }),
559 InvalidColumnOptionSnafu {
560 name: column.name().to_string(),
561 msg: "time index column can't be null",
562 }
563 );
564
565 let data_type = get_unalias_type(column.data_type());
567 ensure!(
568 matches!(data_type, DataType::Timestamp(_, _)),
569 InvalidColumnOptionSnafu {
570 name: column.name().to_string(),
571 msg: "time index column data type should be timestamp",
572 }
573 );
574
575 let not_null_opt = ColumnOptionDef {
576 option: ColumnOption::NotNull,
577 name: None,
578 };
579
580 if !column.options().contains(¬_null_opt) {
581 column.mut_options().push(not_null_opt);
582 }
583
584 let _ = column.mut_options().remove(index);
585 }
586
587 columns.push(column);
588
589 Ok(())
590 }
591
592 fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
594 let name = self.parser.parse_identifier()?;
595 if name.quote_style.is_none() &&
596 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
598 {
599 return Err(ParserError::ParserError(format!(
600 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
601 &name.value
602 )));
603 }
604
605 Ok(name)
606 }
607
608 pub fn parse_column_def(&mut self) -> Result<Column> {
609 let name = self.parse_column_name().context(SyntaxSnafu)?;
610 let parser = &mut self.parser;
611
612 ensure!(
613 !(name.quote_style.is_none() &&
614 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
616 InvalidSqlSnafu {
617 msg: format!(
618 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
619 &name.value
620 ),
621 }
622 );
623
624 let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
625 let mut options = vec![];
626 let mut extensions = ColumnExtensions::default();
627 loop {
628 if parser.parse_keyword(Keyword::CONSTRAINT) {
629 let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
630 if let Some(option) = Self::parse_optional_column_option(parser)? {
631 options.push(ColumnOptionDef { name, option });
632 } else {
633 return parser
634 .expected(
635 "constraint details after CONSTRAINT <name>",
636 parser.peek_token(),
637 )
638 .context(SyntaxSnafu);
639 }
640 } else if let Some(option) = Self::parse_optional_column_option(parser)? {
641 options.push(ColumnOptionDef { name: None, option });
642 } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
643 break;
644 };
645 }
646
647 Ok(Column {
648 column_def: ColumnDef {
649 name: Self::canonicalize_identifier(name),
650 data_type,
651 options,
652 },
653 extensions,
654 })
655 }
656
657 fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
658 if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
659 Ok(Some(ColumnOption::CharacterSet(
660 parser.parse_object_name(false).context(SyntaxSnafu)?,
661 )))
662 } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
663 Ok(Some(ColumnOption::NotNull))
664 } else if parser.parse_keywords(&[Keyword::COMMENT]) {
665 match parser.next_token() {
666 TokenWithSpan {
667 token: Token::SingleQuotedString(value, ..),
668 ..
669 } => Ok(Some(ColumnOption::Comment(value))),
670 unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
671 }
672 } else if parser.parse_keyword(Keyword::NULL) {
673 Ok(Some(ColumnOption::Null))
674 } else if parser.parse_keyword(Keyword::DEFAULT) {
675 Ok(Some(ColumnOption::Default(
676 parser.parse_expr().context(SyntaxSnafu)?,
677 )))
678 } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
679 Ok(Some(ColumnOption::Unique {
680 is_primary: true,
681 characteristics: None,
682 }))
683 } else if parser.parse_keyword(Keyword::UNIQUE) {
684 Ok(Some(ColumnOption::Unique {
685 is_primary: false,
686 characteristics: None,
687 }))
688 } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
689 Ok(Some(ColumnOption::DialectSpecific(vec![
691 Token::Word(Word {
692 value: "TIME".to_string(),
693 quote_style: None,
694 keyword: Keyword::TIME,
695 }),
696 Token::Word(Word {
697 value: "INDEX".to_string(),
698 quote_style: None,
699 keyword: Keyword::INDEX,
700 }),
701 ])))
702 } else {
703 Ok(None)
704 }
705 }
706
707 fn parse_column_extensions(
713 parser: &mut Parser<'_>,
714 column_name: &Ident,
715 column_type: &DataType,
716 column_extensions: &mut ColumnExtensions,
717 ) -> Result<bool> {
718 if let DataType::Custom(name, tokens) = column_type
719 && name.0.len() == 1
720 && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
721 {
722 ensure!(
723 tokens.len() == 1,
724 InvalidColumnOptionSnafu {
725 name: column_name.to_string(),
726 msg: "VECTOR type should have dimension",
727 }
728 );
729
730 let dimension =
731 tokens[0]
732 .parse::<u32>()
733 .ok()
734 .with_context(|| InvalidColumnOptionSnafu {
735 name: column_name.to_string(),
736 msg: "dimension should be a positive integer",
737 })?;
738
739 let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
740 column_extensions.vector_options = Some(options);
741 }
742
743 let mut is_index_declared = false;
745
746 if let Token::Word(word) = parser.peek_token().token
748 && word.value.eq_ignore_ascii_case(SKIPPING)
749 {
750 parser.next_token();
751 ensure!(
753 parser.parse_keyword(Keyword::INDEX),
754 InvalidColumnOptionSnafu {
755 name: column_name.to_string(),
756 msg: "expect INDEX after SKIPPING keyword",
757 }
758 );
759 ensure!(
760 column_extensions.skipping_index_options.is_none(),
761 InvalidColumnOptionSnafu {
762 name: column_name.to_string(),
763 msg: "duplicated SKIPPING index option",
764 }
765 );
766
767 let options = parser
768 .parse_options(Keyword::WITH)
769 .context(error::SyntaxSnafu)?
770 .into_iter()
771 .map(parse_option_string)
772 .collect::<Result<HashMap<String, String>>>()?;
773
774 for key in options.keys() {
775 ensure!(
776 validate_column_skipping_index_create_option(key),
777 InvalidColumnOptionSnafu {
778 name: column_name.to_string(),
779 msg: format!("invalid SKIPPING INDEX option: {key}"),
780 }
781 );
782 }
783
784 column_extensions.skipping_index_options = Some(options.into());
785 is_index_declared |= true;
786 }
787
788 if parser.parse_keyword(Keyword::FULLTEXT) {
790 ensure!(
792 parser.parse_keyword(Keyword::INDEX),
793 InvalidColumnOptionSnafu {
794 name: column_name.to_string(),
795 msg: "expect INDEX after FULLTEXT keyword",
796 }
797 );
798
799 ensure!(
800 column_extensions.fulltext_index_options.is_none(),
801 InvalidColumnOptionSnafu {
802 name: column_name.to_string(),
803 msg: "duplicated FULLTEXT INDEX option",
804 }
805 );
806
807 let column_type = get_unalias_type(column_type);
808 let data_type = sql_data_type_to_concrete_data_type(&column_type)?;
809 ensure!(
810 data_type == ConcreteDataType::string_datatype(),
811 InvalidColumnOptionSnafu {
812 name: column_name.to_string(),
813 msg: "FULLTEXT index only supports string type",
814 }
815 );
816
817 let options = parser
818 .parse_options(Keyword::WITH)
819 .context(error::SyntaxSnafu)?
820 .into_iter()
821 .map(parse_option_string)
822 .collect::<Result<HashMap<String, String>>>()?;
823
824 for key in options.keys() {
825 ensure!(
826 validate_column_fulltext_create_option(key),
827 InvalidColumnOptionSnafu {
828 name: column_name.to_string(),
829 msg: format!("invalid FULLTEXT INDEX option: {key}"),
830 }
831 );
832 }
833
834 column_extensions.fulltext_index_options = Some(options.into());
835 is_index_declared |= true;
836 }
837
838 if let Token::Word(word) = parser.peek_token().token
840 && word.value.eq_ignore_ascii_case(INVERTED)
841 {
842 parser.next_token();
843 ensure!(
845 parser.parse_keyword(Keyword::INDEX),
846 InvalidColumnOptionSnafu {
847 name: column_name.to_string(),
848 msg: "expect INDEX after INVERTED keyword",
849 }
850 );
851
852 ensure!(
853 column_extensions.inverted_index_options.is_none(),
854 InvalidColumnOptionSnafu {
855 name: column_name.to_string(),
856 msg: "duplicated INVERTED index option",
857 }
858 );
859
860 let with_token = parser.peek_token();
863 ensure!(
864 with_token.token
865 != Token::Word(Word {
866 value: "WITH".to_string(),
867 keyword: Keyword::WITH,
868 quote_style: None,
869 }),
870 InvalidColumnOptionSnafu {
871 name: column_name.to_string(),
872 msg: "INVERTED index doesn't support options",
873 }
874 );
875
876 column_extensions.inverted_index_options = Some(OptionMap::default());
877 is_index_declared |= true;
878 }
879
880 Ok(is_index_declared)
881 }
882
883 fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
884 match self.parser.next_token() {
885 TokenWithSpan {
886 token: Token::Word(w),
887 ..
888 } if w.keyword == Keyword::PRIMARY => {
889 self.parser
890 .expect_keyword(Keyword::KEY)
891 .context(error::UnexpectedSnafu {
892 expected: "KEY",
893 actual: self.peek_token_as_string(),
894 })?;
895 let raw_columns = self
896 .parser
897 .parse_parenthesized_column_list(Mandatory, false)
898 .context(error::SyntaxSnafu)?;
899 let columns = raw_columns
900 .into_iter()
901 .map(Self::canonicalize_identifier)
902 .collect();
903 Ok(Some(TableConstraint::PrimaryKey { columns }))
904 }
905 TokenWithSpan {
906 token: Token::Word(w),
907 ..
908 } if w.keyword == Keyword::TIME => {
909 self.parser
910 .expect_keyword(Keyword::INDEX)
911 .context(error::UnexpectedSnafu {
912 expected: "INDEX",
913 actual: self.peek_token_as_string(),
914 })?;
915
916 let raw_columns = self
917 .parser
918 .parse_parenthesized_column_list(Mandatory, false)
919 .context(error::SyntaxSnafu)?;
920 let mut columns = raw_columns
921 .into_iter()
922 .map(Self::canonicalize_identifier)
923 .collect::<Vec<_>>();
924
925 ensure!(
926 columns.len() == 1,
927 InvalidTimeIndexSnafu {
928 msg: "it should contain only one column in time index",
929 }
930 );
931
932 Ok(Some(TableConstraint::TimeIndex {
933 column: columns.pop().unwrap(),
934 }))
935 }
936 _ => {
937 self.parser.prev_token();
938 Ok(None)
939 }
940 }
941 }
942
943 fn parse_table_engine(&mut self, default: &str) -> Result<String> {
945 if !self.consume_token(ENGINE) {
946 return Ok(default.to_string());
947 }
948
949 self.parser
950 .expect_token(&Token::Eq)
951 .context(error::UnexpectedSnafu {
952 expected: "=",
953 actual: self.peek_token_as_string(),
954 })?;
955
956 let token = self.parser.next_token();
957 if let Token::Word(w) = token.token {
958 Ok(w.value)
959 } else {
960 self.expected("'Engine' is missing", token)
961 }
962 }
963}
964
965fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
966 let time_index_constraints: Vec<_> = constraints
967 .iter()
968 .filter_map(|c| match c {
969 TableConstraint::TimeIndex { column } => Some(column),
970 _ => None,
971 })
972 .unique()
973 .collect();
974
975 ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
976 ensure!(
977 time_index_constraints.len() == 1,
978 InvalidTimeIndexSnafu {
979 msg: format!(
980 "expected only one time index constraint but actual {}",
981 time_index_constraints.len()
982 ),
983 }
984 );
985
986 let time_index_column_ident = &time_index_constraints[0];
989 let time_index_column = columns
990 .iter()
991 .find(|c| c.name().value == *time_index_column_ident.value)
992 .with_context(|| InvalidTimeIndexSnafu {
993 msg: format!(
994 "time index column {} not found in columns",
995 time_index_column_ident
996 ),
997 })?;
998
999 let time_index_data_type = get_unalias_type(time_index_column.data_type());
1000 ensure!(
1001 matches!(time_index_data_type, DataType::Timestamp(_, _)),
1002 InvalidColumnOptionSnafu {
1003 name: time_index_column.name().to_string(),
1004 msg: "time index column data type should be timestamp",
1005 }
1006 );
1007
1008 Ok(())
1009}
1010
1011fn get_unalias_type(data_type: &DataType) -> DataType {
1012 match data_type {
1013 DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1014 if let Some(real_type) =
1015 get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1016 {
1017 real_type
1018 } else {
1019 data_type.clone()
1020 }
1021 }
1022 _ => data_type.clone(),
1023 }
1024}
1025
1026fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1027 let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1028
1029 ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1030
1031 Ok(())
1032}
1033
1034fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1036 for expr in exprs {
1037 if let Expr::BinaryOp { left, op: _, right } = expr {
1039 ensure_one_expr(left, columns)?;
1040 ensure_one_expr(right, columns)?;
1041 } else {
1042 return error::InvalidSqlSnafu {
1043 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1044 }
1045 .fail();
1046 }
1047 }
1048 Ok(())
1049}
1050
1051fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1055 match expr {
1056 Expr::BinaryOp { left, op: _, right } => {
1057 ensure_one_expr(left, columns)?;
1058 ensure_one_expr(right, columns)?;
1059 Ok(())
1060 }
1061 Expr::Identifier(ident) => {
1062 let column_name = &ident.value;
1063 ensure!(
1064 columns.iter().any(|c| &c.name().value == column_name),
1065 error::InvalidSqlSnafu {
1066 msg: format!(
1067 "Column {:?} in rule expr is not referenced in PARTITION ON",
1068 column_name
1069 ),
1070 }
1071 );
1072 Ok(())
1073 }
1074 Expr::Value(_) => Ok(()),
1075 Expr::UnaryOp { expr, .. } => {
1076 ensure_one_expr(expr, columns)?;
1077 Ok(())
1078 }
1079 _ => error::InvalidSqlSnafu {
1080 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1081 }
1082 .fail(),
1083 }
1084}
1085
1086fn ensure_partition_columns_defined<'a>(
1088 columns: &'a [Column],
1089 partitions: &'a Partitions,
1090) -> Result<Vec<&'a Column>> {
1091 partitions
1092 .column_list
1093 .iter()
1094 .map(|x| {
1095 let x = ParserContext::canonicalize_identifier(x.clone());
1096 columns
1099 .iter()
1100 .find(|c| *c.name().value == x.value)
1101 .context(error::InvalidSqlSnafu {
1102 msg: format!("Partition column {:?} not defined", x.value),
1103 })
1104 })
1105 .collect::<Result<Vec<&Column>>>()
1106}
1107
1108#[cfg(test)]
1109mod tests {
1110 use std::assert_matches::assert_matches;
1111 use std::collections::HashMap;
1112
1113 use common_catalog::consts::FILE_ENGINE;
1114 use common_error::ext::ErrorExt;
1115 use sqlparser::ast::ColumnOption::NotNull;
1116 use sqlparser::ast::{BinaryOperator, Expr, ObjectName, Value};
1117 use sqlparser::dialect::GenericDialect;
1118 use sqlparser::tokenizer::Tokenizer;
1119
1120 use super::*;
1121 use crate::dialect::GreptimeDbDialect;
1122 use crate::parser::ParseOptions;
1123
/// `CREATE TABLE ... LIKE ...` yields a single `CreateTableLike` statement
/// carrying both table names.
#[test]
fn test_parse_create_table_like() {
    let sql = "CREATE TABLE t1 LIKE t2";
    let dialect = GreptimeDbDialect {};
    let stmts =
        ParserContext::create_with_dialect(sql, &dialect, ParseOptions::default()).unwrap();

    assert_eq!(1, stmts.len());
    let Statement::CreateTableLike(like) = &stmts[0] else {
        unreachable!()
    };
    assert_eq!(like.table_name.to_string(), "t1");
    assert_eq!(like.source_name.to_string(), "t2");
}
1140
/// An option key that is not a valid table option (`foo`) makes parsing an
/// external table fail with `InvalidTableOption`.
#[test]
fn test_validate_external_table_options() {
    let sql = "CREATE EXTERNAL TABLE city (
            host string,
            ts timestamp,
            cpu float64 default 0,
            memory float64,
            TIME INDEX (ts),
            PRIMARY KEY(ts, host)
        ) with(location='/var/data/city.csv',format='csv',foo='bar');";

    let dialect = GreptimeDbDialect {};
    let result = ParserContext::create_with_dialect(sql, &dialect, ParseOptions::default());

    assert_matches!(result, Err(error::Error::InvalidTableOption { .. }));
}
1159
/// Table-driven check of `CREATE EXTERNAL TABLE` without a column list:
/// name, options, engine and `IF NOT EXISTS` flag are compared per case.
#[test]
fn test_parse_create_external_table() {
    struct Test<'a> {
        sql: &'a str,
        expected_table_name: &'a str,
        expected_options: HashMap<String, String>,
        expected_engine: &'a str,
        expected_if_not_exist: bool,
    }

    // Options shared by every case.
    let csv_options = || {
        HashMap::from([
            ("location".to_string(), "/var/data/city.csv".to_string()),
            ("format".to_string(), "csv".to_string()),
        ])
    };
    let mut csv_options_with_compaction = csv_options();
    csv_options_with_compaction.insert("compaction.type".to_string(), "bar".to_string());

    let tests = [
        Test {
            sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
            expected_table_name: "city",
            expected_options: csv_options(),
            expected_engine: FILE_ENGINE,
            expected_if_not_exist: false,
        },
        Test {
            sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
            expected_table_name: "city",
            expected_options: csv_options(),
            expected_engine: "foo",
            expected_if_not_exist: true,
        },
        Test {
            sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
            expected_table_name: "city",
            expected_options: csv_options_with_compaction,
            expected_engine: "foo",
            expected_if_not_exist: true,
        },
    ];

    for test in tests {
        let stmts = ParserContext::create_with_dialect(
            test.sql,
            &GreptimeDbDialect {},
            ParseOptions::default(),
        )
        .unwrap();
        assert_eq!(1, stmts.len());

        let Statement::CreateExternalTable(c) = &stmts[0] else {
            unreachable!()
        };
        assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
        assert_eq!(c.options, test.expected_options.into());
        assert_eq!(c.if_not_exists, test.expected_if_not_exist);
        assert_eq!(c.engine, test.expected_engine);
    }
}
1223
1224 #[test]
1225 fn test_parse_create_external_table_with_schema() {
1226 let sql = "CREATE EXTERNAL TABLE city (
1227 host string,
1228 ts timestamp,
1229 cpu float32 default 0,
1230 memory float64,
1231 TIME INDEX (ts),
1232 PRIMARY KEY(ts, host),
1233 ) with(location='/var/data/city.csv',format='csv');";
1234
1235 let options = HashMap::from([
1236 ("location".to_string(), "/var/data/city.csv".to_string()),
1237 ("format".to_string(), "csv".to_string()),
1238 ]);
1239
1240 let stmts =
1241 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1242 .unwrap();
1243 assert_eq!(1, stmts.len());
1244 match &stmts[0] {
1245 Statement::CreateExternalTable(c) => {
1246 assert_eq!(c.name.to_string(), "city");
1247 assert_eq!(c.options, options.into());
1248
1249 let columns = &c.columns;
1250 assert_column_def(&columns[0].column_def, "host", "STRING");
1251 assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1252 assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1253 assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1254
1255 let constraints = &c.constraints;
1256 assert_eq!(
1257 &constraints[0],
1258 &TableConstraint::TimeIndex {
1259 column: Ident::new("ts"),
1260 }
1261 );
1262 assert_eq!(
1263 &constraints[1],
1264 &TableConstraint::PrimaryKey {
1265 columns: vec![Ident::new("ts"), Ident::new("host")]
1266 }
1267 );
1268 }
1269 _ => unreachable!(),
1270 }
1271 }
1272
1273 #[test]
1274 fn test_parse_create_database() {
1275 let sql = "create database";
1276 let result =
1277 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1278 assert!(result
1279 .unwrap_err()
1280 .to_string()
1281 .contains("Unexpected token while parsing SQL statement"));
1282
1283 let sql = "create database prometheus";
1284 let stmts =
1285 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1286 .unwrap();
1287
1288 assert_eq!(1, stmts.len());
1289 match &stmts[0] {
1290 Statement::CreateDatabase(c) => {
1291 assert_eq!(c.name.to_string(), "prometheus");
1292 assert!(!c.if_not_exists);
1293 }
1294 _ => unreachable!(),
1295 }
1296
1297 let sql = "create database if not exists prometheus";
1298 let stmts =
1299 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1300 .unwrap();
1301
1302 assert_eq!(1, stmts.len());
1303 match &stmts[0] {
1304 Statement::CreateDatabase(c) => {
1305 assert_eq!(c.name.to_string(), "prometheus");
1306 assert!(c.if_not_exists);
1307 }
1308 _ => unreachable!(),
1309 }
1310
1311 let sql = "CREATE DATABASE `fOo`";
1312 let result =
1313 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1314 let stmts = result.unwrap();
1315 match &stmts.last().unwrap() {
1316 Statement::CreateDatabase(c) => {
1317 assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1318 assert!(!c.if_not_exists);
1319 }
1320 _ => unreachable!(),
1321 }
1322
1323 let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1324 let result =
1325 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1326 let stmts = result.unwrap();
1327 match &stmts[0] {
1328 Statement::CreateDatabase(c) => {
1329 assert_eq!(c.name.to_string(), "prometheus");
1330 assert!(!c.if_not_exists);
1331 assert_eq!(c.options.get("ttl").unwrap(), "1h");
1332 }
1333 _ => unreachable!(),
1334 }
1335 }
1336
1337 #[test]
1338 fn test_parse_create_flow_more_testcases() {
1339 use pretty_assertions::assert_eq;
1340 fn parse_create_flow(sql: &str) -> CreateFlow {
1341 let stmts = ParserContext::create_with_dialect(
1342 sql,
1343 &GreptimeDbDialect {},
1344 ParseOptions::default(),
1345 )
1346 .unwrap();
1347 assert_eq!(1, stmts.len());
1348 match &stmts[0] {
1349 Statement::CreateFlow(c) => c.clone(),
1350 _ => unreachable!(),
1351 }
1352 }
1353 struct CreateFlowWoutQuery {
1354 pub flow_name: ObjectName,
1356 pub sink_table_name: ObjectName,
1358 pub or_replace: bool,
1360 pub if_not_exists: bool,
1362 pub expire_after: Option<i64>,
1365 pub comment: Option<String>,
1367 }
1368 let testcases = vec![
1369 (
1370 r"
1371CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1372SINK TO schema_1.table_1
1373EXPIRE AFTER INTERVAL '5 minutes'
1374COMMENT 'test comment'
1375AS
1376SELECT max(c1), min(c2) FROM schema_2.table_2;",
1377 CreateFlowWoutQuery {
1378 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1379 sink_table_name: ObjectName::from(vec![
1380 Ident::new("schema_1"),
1381 Ident::new("table_1"),
1382 ]),
1383 or_replace: true,
1384 if_not_exists: true,
1385 expire_after: Some(300),
1386 comment: Some("test comment".to_string()),
1387 },
1388 ),
1389 (
1390 r"
1391CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1392SINK TO schema_1.table_1
1393EXPIRE AFTER INTERVAL '300 s'
1394COMMENT 'test comment'
1395AS
1396SELECT max(c1), min(c2) FROM schema_2.table_2;",
1397 CreateFlowWoutQuery {
1398 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1399 sink_table_name: ObjectName::from(vec![
1400 Ident::new("schema_1"),
1401 Ident::new("table_1"),
1402 ]),
1403 or_replace: true,
1404 if_not_exists: true,
1405 expire_after: Some(300),
1406 comment: Some("test comment".to_string()),
1407 },
1408 ),
1409 (
1410 r"
1411CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1412SINK TO schema_1.table_1
1413EXPIRE AFTER '5 minutes'
1414COMMENT 'test comment'
1415AS
1416SELECT max(c1), min(c2) FROM schema_2.table_2;",
1417 CreateFlowWoutQuery {
1418 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1419 sink_table_name: ObjectName::from(vec![
1420 Ident::new("schema_1"),
1421 Ident::new("table_1"),
1422 ]),
1423 or_replace: true,
1424 if_not_exists: true,
1425 expire_after: Some(300),
1426 comment: Some("test comment".to_string()),
1427 },
1428 ),
1429 (
1430 r"
1431CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1432SINK TO schema_1.table_1
1433EXPIRE AFTER '300 s'
1434COMMENT 'test comment'
1435AS
1436SELECT max(c1), min(c2) FROM schema_2.table_2;",
1437 CreateFlowWoutQuery {
1438 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1439 sink_table_name: ObjectName::from(vec![
1440 Ident::new("schema_1"),
1441 Ident::new("table_1"),
1442 ]),
1443 or_replace: true,
1444 if_not_exists: true,
1445 expire_after: Some(300),
1446 comment: Some("test comment".to_string()),
1447 },
1448 ),
1449 (
1450 r"
1451CREATE FLOW `task_2`
1452SINK TO schema_1.table_1
1453EXPIRE AFTER '1 month 2 days 1h 2 min'
1454AS
1455SELECT max(c1), min(c2) FROM schema_2.table_2;",
1456 CreateFlowWoutQuery {
1457 flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1458 sink_table_name: ObjectName::from(vec![
1459 Ident::new("schema_1"),
1460 Ident::new("table_1"),
1461 ]),
1462 or_replace: false,
1463 if_not_exists: false,
1464 expire_after: Some(86400 * 3044 / 1000 + 2 * 86400 + 3600 + 2 * 60),
1465 comment: None,
1466 },
1467 ),
1468 ];
1469
1470 for (sql, expected) in testcases {
1471 let create_task = parse_create_flow(sql);
1472
1473 let expected = CreateFlow {
1474 flow_name: expected.flow_name,
1475 sink_table_name: expected.sink_table_name,
1476 or_replace: expected.or_replace,
1477 if_not_exists: expected.if_not_exists,
1478 expire_after: expected.expire_after,
1479 comment: expected.comment,
1480 query: create_task.query.clone(),
1482 };
1483
1484 assert_eq!(create_task, expected, "input sql is:\n{sql}");
1485 let show_create = create_task.to_string();
1486 let recreated = parse_create_flow(&show_create);
1487 assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1488 }
1489 }
1490
1491 #[test]
1492 fn test_parse_create_flow() {
1493 let sql = r"
1494CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1495SINK TO schema_1.table_1
1496EXPIRE AFTER INTERVAL '5 minutes'
1497COMMENT 'test comment'
1498AS
1499SELECT max(c1), min(c2) FROM schema_2.table_2;";
1500 let stmts =
1501 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1502 .unwrap();
1503 assert_eq!(1, stmts.len());
1504 let create_task = match &stmts[0] {
1505 Statement::CreateFlow(c) => c,
1506 _ => unreachable!(),
1507 };
1508
1509 let expected = CreateFlow {
1510 flow_name: vec![Ident::new("task_1")].into(),
1511 sink_table_name: vec![Ident::new("schema_1"), Ident::new("table_1")].into(),
1512 or_replace: true,
1513 if_not_exists: true,
1514 expire_after: Some(300),
1515 comment: Some("test comment".to_string()),
1516 query: create_task.query.clone(),
1518 };
1519 assert_eq!(create_task, &expected);
1520
1521 let sql = r"
1523CREATE FLOW `task_2`
1524SINK TO schema_1.table_1
1525EXPIRE AFTER '1 month 2 days 1h 2 min'
1526AS
1527SELECT max(c1), min(c2) FROM schema_2.table_2;";
1528 let stmts =
1529 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1530 .unwrap();
1531 assert_eq!(1, stmts.len());
1532 let create_task = match &stmts[0] {
1533 Statement::CreateFlow(c) => c,
1534 _ => unreachable!(),
1535 };
1536 assert!(!create_task.or_replace);
1537 assert!(!create_task.if_not_exists);
1538 assert_eq!(
1539 create_task.expire_after,
1540 Some(86400 * 3044 / 1000 + 2 * 86400 + 3600 + 2 * 60)
1541 );
1542 assert!(create_task.comment.is_none());
1543 assert_eq!(create_task.flow_name.to_string(), "`task_2`");
1544 }
1545
1546 #[test]
1547 fn test_validate_create() {
1548 let sql = r"
1549CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
1550PARTITION ON COLUMNS(c, a) (
1551 a < 10,
1552 a > 10 AND a < 20,
1553 a > 20 AND c < 100,
1554 a > 20 AND c >= 100
1555)
1556ENGINE=mito";
1557 let result =
1558 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1559 let _ = result.unwrap();
1560
1561 let sql = r"
1562CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1563PARTITION ON COLUMNS(x) ()
1564ENGINE=mito";
1565 let result =
1566 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1567 assert!(result
1568 .unwrap_err()
1569 .to_string()
1570 .contains("Partition column \"x\" not defined"));
1571 }
1572
1573 #[test]
1574 fn test_parse_create_table_with_partitions() {
1575 let sql = r"
1576CREATE TABLE monitor (
1577 host_id INT,
1578 idc STRING,
1579 ts TIMESTAMP,
1580 cpu DOUBLE DEFAULT 0,
1581 memory DOUBLE,
1582 TIME INDEX (ts),
1583 PRIMARY KEY (host),
1584)
1585PARTITION ON COLUMNS(idc, host_id) (
1586 idc <= 'hz' AND host_id < 1000,
1587 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1588 idc > 'sh' AND host_id < 3000,
1589 idc > 'sh' AND host_id >= 3000,
1590)
1591ENGINE=mito";
1592 let result =
1593 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1594 .unwrap();
1595 assert_eq!(result.len(), 1);
1596 match &result[0] {
1597 Statement::CreateTable(c) => {
1598 assert!(c.partitions.is_some());
1599
1600 let partitions = c.partitions.as_ref().unwrap();
1601 let column_list = partitions
1602 .column_list
1603 .iter()
1604 .map(|x| &x.value)
1605 .collect::<Vec<&String>>();
1606 assert_eq!(column_list, vec!["idc", "host_id"]);
1607
1608 let exprs = &partitions.exprs;
1609
1610 assert_eq!(
1611 exprs[0],
1612 Expr::BinaryOp {
1613 left: Box::new(Expr::BinaryOp {
1614 left: Box::new(Expr::Identifier("idc".into())),
1615 op: BinaryOperator::LtEq,
1616 right: Box::new(Expr::Value(
1617 Value::SingleQuotedString("hz".to_string()).into()
1618 ))
1619 }),
1620 op: BinaryOperator::And,
1621 right: Box::new(Expr::BinaryOp {
1622 left: Box::new(Expr::Identifier("host_id".into())),
1623 op: BinaryOperator::Lt,
1624 right: Box::new(Expr::Value(
1625 Value::Number("1000".to_string(), false).into()
1626 ))
1627 })
1628 }
1629 );
1630 assert_eq!(
1631 exprs[1],
1632 Expr::BinaryOp {
1633 left: Box::new(Expr::BinaryOp {
1634 left: Box::new(Expr::BinaryOp {
1635 left: Box::new(Expr::Identifier("idc".into())),
1636 op: BinaryOperator::Gt,
1637 right: Box::new(Expr::Value(
1638 Value::SingleQuotedString("hz".to_string()).into()
1639 ))
1640 }),
1641 op: BinaryOperator::And,
1642 right: Box::new(Expr::BinaryOp {
1643 left: Box::new(Expr::Identifier("idc".into())),
1644 op: BinaryOperator::LtEq,
1645 right: Box::new(Expr::Value(
1646 Value::SingleQuotedString("sh".to_string()).into()
1647 ))
1648 })
1649 }),
1650 op: BinaryOperator::And,
1651 right: Box::new(Expr::BinaryOp {
1652 left: Box::new(Expr::Identifier("host_id".into())),
1653 op: BinaryOperator::Lt,
1654 right: Box::new(Expr::Value(
1655 Value::Number("2000".to_string(), false).into()
1656 ))
1657 })
1658 }
1659 );
1660 assert_eq!(
1661 exprs[2],
1662 Expr::BinaryOp {
1663 left: Box::new(Expr::BinaryOp {
1664 left: Box::new(Expr::Identifier("idc".into())),
1665 op: BinaryOperator::Gt,
1666 right: Box::new(Expr::Value(
1667 Value::SingleQuotedString("sh".to_string()).into()
1668 ))
1669 }),
1670 op: BinaryOperator::And,
1671 right: Box::new(Expr::BinaryOp {
1672 left: Box::new(Expr::Identifier("host_id".into())),
1673 op: BinaryOperator::Lt,
1674 right: Box::new(Expr::Value(
1675 Value::Number("3000".to_string(), false).into()
1676 ))
1677 })
1678 }
1679 );
1680 assert_eq!(
1681 exprs[3],
1682 Expr::BinaryOp {
1683 left: Box::new(Expr::BinaryOp {
1684 left: Box::new(Expr::Identifier("idc".into())),
1685 op: BinaryOperator::Gt,
1686 right: Box::new(Expr::Value(
1687 Value::SingleQuotedString("sh".to_string()).into()
1688 ))
1689 }),
1690 op: BinaryOperator::And,
1691 right: Box::new(Expr::BinaryOp {
1692 left: Box::new(Expr::Identifier("host_id".into())),
1693 op: BinaryOperator::GtEq,
1694 right: Box::new(Expr::Value(
1695 Value::Number("3000".to_string(), false).into()
1696 ))
1697 })
1698 }
1699 );
1700 }
1701 _ => unreachable!(),
1702 }
1703 }
1704
1705 #[test]
1706 fn test_parse_create_table_with_quoted_partitions() {
1707 let sql = r"
1708CREATE TABLE monitor (
1709 `host_id` INT,
1710 idc STRING,
1711 ts TIMESTAMP,
1712 cpu DOUBLE DEFAULT 0,
1713 memory DOUBLE,
1714 TIME INDEX (ts),
1715 PRIMARY KEY (host),
1716)
1717PARTITION ON COLUMNS(IdC, host_id) (
1718 idc <= 'hz' AND host_id < 1000,
1719 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1720 idc > 'sh' AND host_id < 3000,
1721 idc > 'sh' AND host_id >= 3000,
1722)";
1723 let result =
1724 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1725 .unwrap();
1726 assert_eq!(result.len(), 1);
1727 }
1728
1729 #[test]
1730 fn test_parse_create_table_with_timestamp_index() {
1731 let sql1 = r"
1732CREATE TABLE monitor (
1733 host_id INT,
1734 idc STRING,
1735 ts TIMESTAMP TIME INDEX,
1736 cpu DOUBLE DEFAULT 0,
1737 memory DOUBLE,
1738 PRIMARY KEY (host),
1739)
1740ENGINE=mito";
1741 let result1 = ParserContext::create_with_dialect(
1742 sql1,
1743 &GreptimeDbDialect {},
1744 ParseOptions::default(),
1745 )
1746 .unwrap();
1747
1748 if let Statement::CreateTable(c) = &result1[0] {
1749 assert_eq!(c.constraints.len(), 2);
1750 let tc = c.constraints[0].clone();
1751 match tc {
1752 TableConstraint::TimeIndex { column } => {
1753 assert_eq!(&column.value, "ts");
1754 }
1755 _ => panic!("should be time index constraint"),
1756 };
1757 } else {
1758 panic!("should be create_table statement");
1759 }
1760
1761 let sql2 = r"
1764CREATE TABLE monitor (
1765 host_id INT,
1766 idc STRING,
1767 ts TIMESTAMP NOT NULL,
1768 cpu DOUBLE DEFAULT 0,
1769 memory DOUBLE,
1770 TIME INDEX (ts),
1771 PRIMARY KEY (host),
1772)
1773ENGINE=mito";
1774 let result2 = ParserContext::create_with_dialect(
1775 sql2,
1776 &GreptimeDbDialect {},
1777 ParseOptions::default(),
1778 )
1779 .unwrap();
1780
1781 assert_eq!(result1, result2);
1782
1783 let sql3 = r"
1785CREATE TABLE monitor (
1786 host_id INT,
1787 idc STRING,
1788 ts TIMESTAMP,
1789 cpu DOUBLE DEFAULT 0,
1790 memory DOUBLE,
1791 TIME INDEX (ts),
1792 PRIMARY KEY (host),
1793)
1794ENGINE=mito";
1795
1796 let result3 = ParserContext::create_with_dialect(
1797 sql3,
1798 &GreptimeDbDialect {},
1799 ParseOptions::default(),
1800 )
1801 .unwrap();
1802
1803 assert_ne!(result1, result3);
1804
1805 let sql1 = r"
1807CREATE TABLE monitor (
1808 host_id INT,
1809 idc STRING,
1810 b bigint TIME INDEX,
1811 cpu DOUBLE DEFAULT 0,
1812 memory DOUBLE,
1813 PRIMARY KEY (host),
1814)
1815ENGINE=mito";
1816 let result1 = ParserContext::create_with_dialect(
1817 sql1,
1818 &GreptimeDbDialect {},
1819 ParseOptions::default(),
1820 );
1821
1822 assert!(result1
1823 .unwrap_err()
1824 .to_string()
1825 .contains("time index column data type should be timestamp"));
1826 }
1827
1828 #[test]
1829 fn test_parse_create_table_with_timestamp_index_not_null() {
1830 let sql = r"
1831CREATE TABLE monitor (
1832 host_id INT,
1833 idc STRING,
1834 ts TIMESTAMP TIME INDEX,
1835 cpu DOUBLE DEFAULT 0,
1836 memory DOUBLE,
1837 TIME INDEX (ts),
1838 PRIMARY KEY (host),
1839)
1840ENGINE=mito";
1841 let result =
1842 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1843 .unwrap();
1844
1845 assert_eq!(result.len(), 1);
1846 if let Statement::CreateTable(c) = &result[0] {
1847 let ts = c.columns[2].clone();
1848 assert_eq!(ts.name().to_string(), "ts");
1849 assert_eq!(ts.options()[0].option, NotNull);
1850 } else {
1851 panic!("should be create table statement");
1852 }
1853
1854 let sql1 = r"
1855CREATE TABLE monitor (
1856 host_id INT,
1857 idc STRING,
1858 ts TIMESTAMP NOT NULL TIME INDEX,
1859 cpu DOUBLE DEFAULT 0,
1860 memory DOUBLE,
1861 TIME INDEX (ts),
1862 PRIMARY KEY (host),
1863)
1864ENGINE=mito";
1865
1866 let result1 = ParserContext::create_with_dialect(
1867 sql1,
1868 &GreptimeDbDialect {},
1869 ParseOptions::default(),
1870 )
1871 .unwrap();
1872 assert_eq!(result, result1);
1873
1874 let sql2 = r"
1875CREATE TABLE monitor (
1876 host_id INT,
1877 idc STRING,
1878 ts TIMESTAMP TIME INDEX NOT NULL,
1879 cpu DOUBLE DEFAULT 0,
1880 memory DOUBLE,
1881 TIME INDEX (ts),
1882 PRIMARY KEY (host),
1883)
1884ENGINE=mito";
1885
1886 let result2 = ParserContext::create_with_dialect(
1887 sql2,
1888 &GreptimeDbDialect {},
1889 ParseOptions::default(),
1890 )
1891 .unwrap();
1892 assert_eq!(result, result2);
1893
1894 let sql3 = r"
1895CREATE TABLE monitor (
1896 host_id INT,
1897 idc STRING,
1898 ts TIMESTAMP TIME INDEX NULL NOT,
1899 cpu DOUBLE DEFAULT 0,
1900 memory DOUBLE,
1901 TIME INDEX (ts),
1902 PRIMARY KEY (host),
1903)
1904ENGINE=mito";
1905
1906 let result3 = ParserContext::create_with_dialect(
1907 sql3,
1908 &GreptimeDbDialect {},
1909 ParseOptions::default(),
1910 );
1911 assert!(result3.is_err());
1912
1913 let sql4 = r"
1914CREATE TABLE monitor (
1915 host_id INT,
1916 idc STRING,
1917 ts TIMESTAMP TIME INDEX NOT NULL NULL,
1918 cpu DOUBLE DEFAULT 0,
1919 memory DOUBLE,
1920 TIME INDEX (ts),
1921 PRIMARY KEY (host),
1922)
1923ENGINE=mito";
1924
1925 let result4 = ParserContext::create_with_dialect(
1926 sql4,
1927 &GreptimeDbDialect {},
1928 ParseOptions::default(),
1929 );
1930 assert!(result4.is_err());
1931
1932 let sql = r"
1933CREATE TABLE monitor (
1934 host_id INT,
1935 idc STRING,
1936 ts TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
1937 cpu DOUBLE DEFAULT 0,
1938 memory DOUBLE,
1939 TIME INDEX (ts),
1940 PRIMARY KEY (host),
1941)
1942ENGINE=mito";
1943
1944 let result =
1945 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1946 .unwrap();
1947
1948 if let Statement::CreateTable(c) = &result[0] {
1949 let tc = c.constraints[0].clone();
1950 match tc {
1951 TableConstraint::TimeIndex { column } => {
1952 assert_eq!(&column.value, "ts");
1953 }
1954 _ => panic!("should be time index constraint"),
1955 }
1956 let ts = c.columns[2].clone();
1957 assert_eq!(ts.name().to_string(), "ts");
1958 assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
1959 assert_eq!(ts.options()[1].option, NotNull);
1960 } else {
1961 unreachable!("should be create table statement");
1962 }
1963 }
1964
1965 #[test]
1966 fn test_parse_partitions_with_error_syntax() {
1967 let sql = r"
1968CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1969PARTITION COLUMNS(c, a) (
1970 a < 10,
1971 a > 10 AND a < 20,
1972 a > 20 AND c < 100,
1973 a > 20 AND c >= 100
1974)
1975ENGINE=mito";
1976 let result =
1977 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1978 assert!(result
1979 .unwrap_err()
1980 .output_msg()
1981 .contains("sql parser error: Expected: ON, found: COLUMNS"));
1982 }
1983
1984 #[test]
1985 fn test_parse_partitions_without_rule() {
1986 let sql = r"
1987CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
1988PARTITION ON COLUMNS(c, a) ()
1989ENGINE=mito";
1990 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1991 .unwrap();
1992 }
1993
1994 #[test]
1995 fn test_parse_partitions_unreferenced_column() {
1996 let sql = r"
1997CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1998PARTITION ON COLUMNS(c, a) (
1999 b = 'foo'
2000)
2001ENGINE=mito";
2002 let result =
2003 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2004 assert_eq!(
2005 result.unwrap_err().output_msg(),
2006 "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2007 );
2008 }
2009
    /// A partition rule consisting of a bare identifier (`b`) must be rejected
    /// because it is not a binary expression.
    ///
    /// NOTE(review): the expected message embeds the identifier's source span
    /// (`Location(4,5)..Location(4,6)`), so this assertion depends on the exact
    /// line and column of `b` inside the SQL literal; re-indenting the literal
    /// changes the expected text.
    #[test]
    fn test_parse_partitions_not_binary_expr() {
        let sql = r"
CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
PARTITION ON COLUMNS(c, a) (
    b
)
ENGINE=mito";
        let result =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
        // The whole message is compared, including the Debug rendering of the
        // offending expression.
        assert_eq!(
            result.unwrap_err().output_msg(),
            r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
        );
    }
2025
2026 fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2027 assert_eq!(column.name.to_string(), name);
2028 assert_eq!(column.data_type.to_string(), data_type);
2029 }
2030
2031 #[test]
2032 pub fn test_parse_create_table() {
2033 let sql = r"create table demo(
2034 host string,
2035 ts timestamp,
2036 cpu float32 default 0,
2037 memory float64,
2038 TIME INDEX (ts),
2039 PRIMARY KEY(ts, host),
2040 ) engine=mito
2041 with(ttl='10s');
2042 ";
2043 let result =
2044 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2045 .unwrap();
2046 assert_eq!(1, result.len());
2047 match &result[0] {
2048 Statement::CreateTable(c) => {
2049 assert!(!c.if_not_exists);
2050 assert_eq!("demo", c.name.to_string());
2051 assert_eq!("mito", c.engine);
2052 assert_eq!(4, c.columns.len());
2053 let columns = &c.columns;
2054 assert_column_def(&columns[0].column_def, "host", "STRING");
2055 assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2056 assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2057 assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2058
2059 let constraints = &c.constraints;
2060 assert_eq!(
2061 &constraints[0],
2062 &TableConstraint::TimeIndex {
2063 column: Ident::new("ts"),
2064 }
2065 );
2066 assert_eq!(
2067 &constraints[1],
2068 &TableConstraint::PrimaryKey {
2069 columns: vec![Ident::new("ts"), Ident::new("host")]
2070 }
2071 );
2072 assert_eq!(1, c.options.len());
2074 assert_eq!(
2075 [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2076 c.options.to_str_map()
2077 );
2078 }
2079 _ => unreachable!(),
2080 }
2081 }
2082
2083 #[test]
2084 fn test_invalid_index_keys() {
2085 let sql = r"create table demo(
2086 host string,
2087 ts int64,
2088 cpu float64 default 0,
2089 memory float64,
2090 TIME INDEX (ts, host),
2091 PRIMARY KEY(ts, host)) engine=mito;
2092 ";
2093 let result =
2094 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2095 assert!(result.is_err());
2096 assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2097 }
2098
2099 #[test]
2100 fn test_duplicated_time_index() {
2101 let sql = r"create table demo(
2102 host string,
2103 ts timestamp time index,
2104 t timestamp time index,
2105 cpu float64 default 0,
2106 memory float64,
2107 TIME INDEX (ts, host),
2108 PRIMARY KEY(ts, host)) engine=mito;
2109 ";
2110 let result =
2111 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2112 assert!(result.is_err());
2113 assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2114
2115 let sql = r"create table demo(
2116 host string,
2117 ts timestamp time index,
2118 cpu float64 default 0,
2119 t timestamp,
2120 memory float64,
2121 TIME INDEX (t),
2122 PRIMARY KEY(ts, host)) engine=mito;
2123 ";
2124 let result =
2125 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2126 assert!(result.is_err());
2127 assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2128 }
2129
2130 #[test]
2131 fn test_invalid_column_name() {
2132 let sql = "create table foo(user string, i timestamp time index)";
2133 let result =
2134 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2135 let err = result.unwrap_err().output_msg();
2136 assert!(err.contains("Cannot use keyword 'user' as column name"));
2137
2138 let sql = r#"
2140 create table foo("user" string, i timestamp time index)
2141 "#;
2142 let result =
2143 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2144 let _ = result.unwrap();
2145 }
2146
2147 #[test]
2148 fn test_incorrect_default_value_issue_3479() {
2149 let sql = r#"CREATE TABLE `ExcePTuRi`(
2150non TIMESTAMP(6) TIME INDEX,
2151`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2152)"#;
2153 let result =
2154 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2155 .unwrap();
2156 assert_eq!(1, result.len());
2157 match &result[0] {
2158 Statement::CreateTable(c) => {
2159 assert_eq!(
2160 "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2161 c.columns[1].to_string()
2162 );
2163 }
2164 _ => unreachable!(),
2165 }
2166 }
2167
2168 #[test]
2169 fn test_parse_create_view() {
2170 let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
2171
2172 let result =
2173 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2174 .unwrap();
2175 match &result[0] {
2176 Statement::CreateView(c) => {
2177 assert_eq!(c.to_string(), sql);
2178 assert!(!c.or_replace);
2179 assert!(!c.if_not_exists);
2180 assert_eq!("test", c.name.to_string());
2181 }
2182 _ => unreachable!(),
2183 }
2184
2185 let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
2186
2187 let result =
2188 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2189 .unwrap();
2190 match &result[0] {
2191 Statement::CreateView(c) => {
2192 assert_eq!(c.to_string(), sql);
2193 assert!(c.or_replace);
2194 assert!(c.if_not_exists);
2195 assert_eq!("test", c.name.to_string());
2196 }
2197 _ => unreachable!(),
2198 }
2199 }
2200
2201 #[test]
2202 fn test_parse_create_view_invalid_query() {
2203 let sql = "CREATE VIEW test AS DELETE from demo";
2204 let result =
2205 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2206 assert!(result.is_err());
2207 assert_matches!(result, Err(crate::error::Error::Syntax { .. }));
2208 }
2209
2210 #[test]
2211 fn test_parse_create_table_fulltext_options() {
2212 let sql1 = r"
2213CREATE TABLE log (
2214 ts TIMESTAMP TIME INDEX,
2215 msg TEXT FULLTEXT INDEX,
2216)";
2217 let result1 = ParserContext::create_with_dialect(
2218 sql1,
2219 &GreptimeDbDialect {},
2220 ParseOptions::default(),
2221 )
2222 .unwrap();
2223
2224 if let Statement::CreateTable(c) = &result1[0] {
2225 c.columns.iter().for_each(|col| {
2226 if col.name().value == "msg" {
2227 assert!(col
2228 .extensions
2229 .fulltext_index_options
2230 .as_ref()
2231 .unwrap()
2232 .is_empty());
2233 }
2234 });
2235 } else {
2236 panic!("should be create_table statement");
2237 }
2238
2239 let sql2 = r"
2240CREATE TABLE log (
2241 ts TIMESTAMP TIME INDEX,
2242 msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2243)";
2244 let result2 = ParserContext::create_with_dialect(
2245 sql2,
2246 &GreptimeDbDialect {},
2247 ParseOptions::default(),
2248 )
2249 .unwrap();
2250
2251 if let Statement::CreateTable(c) = &result2[0] {
2252 c.columns.iter().for_each(|col| {
2253 if col.name().value == "msg" {
2254 let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2255 assert_eq!(options.len(), 2);
2256 assert_eq!(options.get("analyzer").unwrap(), "English");
2257 assert_eq!(options.get("case_sensitive").unwrap(), "false");
2258 }
2259 });
2260 } else {
2261 panic!("should be create_table statement");
2262 }
2263
2264 let sql3 = r"
2265CREATE TABLE log (
2266 ts TIMESTAMP TIME INDEX,
2267 msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2268 msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2269)";
2270 let result3 = ParserContext::create_with_dialect(
2271 sql3,
2272 &GreptimeDbDialect {},
2273 ParseOptions::default(),
2274 )
2275 .unwrap();
2276
2277 if let Statement::CreateTable(c) = &result3[0] {
2278 c.columns.iter().for_each(|col| {
2279 if col.name().value == "msg1" {
2280 let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2281 assert_eq!(options.len(), 2);
2282 assert_eq!(options.get("analyzer").unwrap(), "English");
2283 assert_eq!(options.get("case_sensitive").unwrap(), "false");
2284 } else if col.name().value == "msg2" {
2285 let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2286 assert_eq!(options.len(), 2);
2287 assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2288 assert_eq!(options.get("case_sensitive").unwrap(), "true");
2289 }
2290 });
2291 } else {
2292 panic!("should be create_table statement");
2293 }
2294 }
2295
2296 #[test]
2297 fn test_parse_create_table_fulltext_options_invalid_type() {
2298 let sql = r"
2299CREATE TABLE log (
2300 ts TIMESTAMP TIME INDEX,
2301 msg INT FULLTEXT INDEX,
2302)";
2303 let result =
2304 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2305 assert!(result.is_err());
2306 assert!(result
2307 .unwrap_err()
2308 .to_string()
2309 .contains("FULLTEXT index only supports string type"));
2310 }
2311
2312 #[test]
2313 fn test_parse_create_table_fulltext_options_duplicate() {
2314 let sql = r"
2315CREATE TABLE log (
2316 ts TIMESTAMP TIME INDEX,
2317 msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
2318)";
2319 let result =
2320 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2321 assert!(result.is_err());
2322 assert!(result
2323 .unwrap_err()
2324 .to_string()
2325 .contains("duplicated FULLTEXT INDEX option"));
2326 }
2327
2328 #[test]
2329 fn test_parse_create_table_fulltext_options_invalid_option() {
2330 let sql = r"
2331CREATE TABLE log (
2332 ts TIMESTAMP TIME INDEX,
2333 msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
2334)";
2335 let result =
2336 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2337 assert!(result.is_err());
2338 assert!(result
2339 .unwrap_err()
2340 .to_string()
2341 .contains("invalid FULLTEXT INDEX option"));
2342 }
2343
2344 #[test]
2345 fn test_parse_create_table_skip_options() {
2346 let sql = r"
2347CREATE TABLE log (
2348 ts TIMESTAMP TIME INDEX,
2349 msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
2350)";
2351 let result =
2352 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2353 .unwrap();
2354
2355 if let Statement::CreateTable(c) = &result[0] {
2356 c.columns.iter().for_each(|col| {
2357 if col.name().value == "msg" {
2358 assert!(!col
2359 .extensions
2360 .skipping_index_options
2361 .as_ref()
2362 .unwrap()
2363 .is_empty());
2364 }
2365 });
2366 } else {
2367 panic!("should be create_table statement");
2368 }
2369
2370 let sql = r"
2371 CREATE TABLE log (
2372 ts TIMESTAMP TIME INDEX,
2373 msg INT SKIPPING INDEX,
2374 )";
2375 let result =
2376 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2377 .unwrap();
2378
2379 if let Statement::CreateTable(c) = &result[0] {
2380 c.columns.iter().for_each(|col| {
2381 if col.name().value == "msg" {
2382 assert!(col
2383 .extensions
2384 .skipping_index_options
2385 .as_ref()
2386 .unwrap()
2387 .is_empty());
2388 }
2389 });
2390 } else {
2391 panic!("should be create_table statement");
2392 }
2393 }
2394
2395 #[test]
2396 fn test_parse_create_view_with_columns() {
2397 let sql = "CREATE VIEW test () AS SELECT * FROM NUMBERS";
2398 let result =
2399 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2400 .unwrap();
2401
2402 match &result[0] {
2403 Statement::CreateView(c) => {
2404 assert_eq!(c.to_string(), "CREATE VIEW test AS SELECT * FROM NUMBERS");
2405 assert!(!c.or_replace);
2406 assert!(!c.if_not_exists);
2407 assert_eq!("test", c.name.to_string());
2408 }
2409 _ => unreachable!(),
2410 }
2411 assert_eq!(
2412 "CREATE VIEW test AS SELECT * FROM NUMBERS",
2413 result[0].to_string()
2414 );
2415
2416 let sql = "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS";
2417 let result =
2418 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2419 .unwrap();
2420
2421 match &result[0] {
2422 Statement::CreateView(c) => {
2423 assert_eq!(c.to_string(), sql);
2424 assert!(!c.or_replace);
2425 assert!(!c.if_not_exists);
2426 assert_eq!("test", c.name.to_string());
2427 }
2428 _ => unreachable!(),
2429 }
2430 assert_eq!(sql, result[0].to_string());
2431
2432 let sql = "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS";
2433 let result =
2434 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2435 .unwrap();
2436
2437 match &result[0] {
2438 Statement::CreateView(c) => {
2439 assert_eq!(c.to_string(), sql);
2440 assert!(!c.or_replace);
2441 assert!(!c.if_not_exists);
2442 assert_eq!("test", c.name.to_string());
2443 }
2444 _ => unreachable!(),
2445 }
2446 assert_eq!(sql, result[0].to_string());
2447
2448 let sql = "CREATE VIEW test (n1 AS select * from demo";
2450 let result =
2451 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2452 assert!(result.is_err());
2453
2454 let sql = "CREATE VIEW test (n1, AS select * from demo";
2455 let result =
2456 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2457 assert!(result.is_err());
2458
2459 let sql = "CREATE VIEW test n1,n2) AS select * from demo";
2460 let result =
2461 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2462 assert!(result.is_err());
2463
2464 let sql = "CREATE VIEW test (1) AS select * from demo";
2465 let result =
2466 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2467 assert!(result.is_err());
2468
2469 let sql = "CREATE VIEW test (n1, select) AS select * from demo";
2471 let result =
2472 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2473 assert!(result.is_err());
2474 }
2475
2476 #[test]
2477 fn test_parse_column_extensions_vector() {
2478 let sql = "VECTOR(128)";
2479 let dialect = GenericDialect {};
2480 let mut tokenizer = Tokenizer::new(&dialect, sql);
2481 let tokens = tokenizer.tokenize().unwrap();
2482 let mut parser = Parser::new(&dialect).with_tokens(tokens);
2483 let name = Ident::new("vec_col");
2484 let data_type =
2485 DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
2486 let mut extensions = ColumnExtensions::default();
2487
2488 let result =
2489 ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2490 assert!(result.is_ok());
2491 assert!(extensions.vector_options.is_some());
2492 let vector_options = extensions.vector_options.unwrap();
2493 assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some(&"128".to_string()));
2494 }
2495
2496 #[test]
2497 fn test_parse_column_extensions_vector_invalid() {
2498 let sql = "VECTOR()";
2499 let dialect = GenericDialect {};
2500 let mut tokenizer = Tokenizer::new(&dialect, sql);
2501 let tokens = tokenizer.tokenize().unwrap();
2502 let mut parser = Parser::new(&dialect).with_tokens(tokens);
2503 let name = Ident::new("vec_col");
2504 let data_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
2505 let mut extensions = ColumnExtensions::default();
2506
2507 let result =
2508 ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2509 assert!(result.is_err());
2510 }
2511
2512 #[test]
2513 fn test_parse_column_extensions_indices() {
2514 {
2516 let sql = "SKIPPING INDEX";
2517 let dialect = GenericDialect {};
2518 let mut tokenizer = Tokenizer::new(&dialect, sql);
2519 let tokens = tokenizer.tokenize().unwrap();
2520 let mut parser = Parser::new(&dialect).with_tokens(tokens);
2521 let name = Ident::new("col");
2522 let data_type = DataType::String(None);
2523 let mut extensions = ColumnExtensions::default();
2524 let result = ParserContext::parse_column_extensions(
2525 &mut parser,
2526 &name,
2527 &data_type,
2528 &mut extensions,
2529 );
2530 assert!(result.is_ok());
2531 assert!(extensions.skipping_index_options.is_some());
2532 }
2533
2534 {
2536 let sql = "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')";
2537 let dialect = GenericDialect {};
2538 let mut tokenizer = Tokenizer::new(&dialect, sql);
2539 let tokens = tokenizer.tokenize().unwrap();
2540 let mut parser = Parser::new(&dialect).with_tokens(tokens);
2541 let name = Ident::new("text_col");
2542 let data_type = DataType::String(None);
2543 let mut extensions = ColumnExtensions::default();
2544 let result = ParserContext::parse_column_extensions(
2545 &mut parser,
2546 &name,
2547 &data_type,
2548 &mut extensions,
2549 );
2550 assert!(result.unwrap());
2551 assert!(extensions.fulltext_index_options.is_some());
2552 let fulltext_options = extensions.fulltext_index_options.unwrap();
2553 assert_eq!(
2554 fulltext_options.get("analyzer"),
2555 Some(&"English".to_string())
2556 );
2557 assert_eq!(
2558 fulltext_options.get("case_sensitive"),
2559 Some(&"true".to_string())
2560 );
2561 }
2562
2563 {
2565 let sql = "FULLTEXT INDEX WITH (analyzer = 'English')";
2566 let dialect = GenericDialect {};
2567 let mut tokenizer = Tokenizer::new(&dialect, sql);
2568 let tokens = tokenizer.tokenize().unwrap();
2569 let mut parser = Parser::new(&dialect).with_tokens(tokens);
2570 let name = Ident::new("num_col");
2571 let data_type = DataType::Int(None); let mut extensions = ColumnExtensions::default();
2573 let result = ParserContext::parse_column_extensions(
2574 &mut parser,
2575 &name,
2576 &data_type,
2577 &mut extensions,
2578 );
2579 assert!(result.is_err());
2580 assert!(result
2581 .unwrap_err()
2582 .to_string()
2583 .contains("FULLTEXT index only supports string type"));
2584 }
2585
2586 {
2588 let sql = "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')";
2589 let dialect = GenericDialect {};
2590 let mut tokenizer = Tokenizer::new(&dialect, sql);
2591 let tokens = tokenizer.tokenize().unwrap();
2592 let mut parser = Parser::new(&dialect).with_tokens(tokens);
2593 let name = Ident::new("text_col");
2594 let data_type = DataType::String(None);
2595 let mut extensions = ColumnExtensions::default();
2596 let result = ParserContext::parse_column_extensions(
2597 &mut parser,
2598 &name,
2599 &data_type,
2600 &mut extensions,
2601 );
2602 assert!(result.unwrap());
2603 }
2604
2605 {
2607 let sql = "INVERTED INDEX";
2608 let dialect = GenericDialect {};
2609 let mut tokenizer = Tokenizer::new(&dialect, sql);
2610 let tokens = tokenizer.tokenize().unwrap();
2611 let mut parser = Parser::new(&dialect).with_tokens(tokens);
2612 let name = Ident::new("col");
2613 let data_type = DataType::String(None);
2614 let mut extensions = ColumnExtensions::default();
2615 let result = ParserContext::parse_column_extensions(
2616 &mut parser,
2617 &name,
2618 &data_type,
2619 &mut extensions,
2620 );
2621 assert!(result.is_ok());
2622 assert!(extensions.inverted_index_options.is_some());
2623 }
2624
2625 {
2627 let sql = "INVERTED INDEX WITH (analyzer = 'English')";
2628 let dialect = GenericDialect {};
2629 let mut tokenizer = Tokenizer::new(&dialect, sql);
2630 let tokens = tokenizer.tokenize().unwrap();
2631 let mut parser = Parser::new(&dialect).with_tokens(tokens);
2632 let name = Ident::new("col");
2633 let data_type = DataType::String(None);
2634 let mut extensions = ColumnExtensions::default();
2635 let result = ParserContext::parse_column_extensions(
2636 &mut parser,
2637 &name,
2638 &data_type,
2639 &mut extensions,
2640 );
2641 assert!(result.is_err());
2642 assert!(result
2643 .unwrap_err()
2644 .to_string()
2645 .contains("INVERTED index doesn't support options"));
2646 }
2647
2648 {
2650 let sql = "SKIPPING INDEX FULLTEXT INDEX";
2651 let dialect = GenericDialect {};
2652 let mut tokenizer = Tokenizer::new(&dialect, sql);
2653 let tokens = tokenizer.tokenize().unwrap();
2654 let mut parser = Parser::new(&dialect).with_tokens(tokens);
2655 let name = Ident::new("col");
2656 let data_type = DataType::String(None);
2657 let mut extensions = ColumnExtensions::default();
2658 let result = ParserContext::parse_column_extensions(
2659 &mut parser,
2660 &name,
2661 &data_type,
2662 &mut extensions,
2663 );
2664 assert!(result.unwrap());
2665 assert!(extensions.skipping_index_options.is_some());
2666 assert!(extensions.fulltext_index_options.is_some());
2667 }
2668 }
2669
2670 #[test]
2671 fn test_parse_interval_cast() {
2672 let s = "select '10s'::INTERVAL";
2673 let stmts =
2674 ParserContext::create_with_dialect(s, &GreptimeDbDialect {}, ParseOptions::default())
2675 .unwrap();
2676 assert_eq!("SELECT '10 seconds'::INTERVAL", &stmts[0].to_string());
2677 }
2678}