1mod json;
16#[cfg(feature = "enterprise")]
17pub mod trigger;
18
19use std::collections::HashMap;
20
21use arrow_buffer::IntervalMonthDayNano;
22use common_catalog::consts::default_engine;
23use datafusion_common::ScalarValue;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
25use datatypes::data_type::ConcreteDataType;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28use sqlparser::ast::{
29 ColumnOption, ColumnOptionDef, DataType, Expr, KeyOrIndexDisplay, NullsDistinctOption,
30 PrimaryKeyConstraint, UniqueConstraint,
31};
32use sqlparser::dialect::keywords::Keyword;
33use sqlparser::keywords::ALL_KEYWORDS;
34use sqlparser::parser::IsOptional::Mandatory;
35use sqlparser::parser::{Parser, ParserError};
36use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
37use table::requests::validate_database_option;
38
39use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
40use crate::error::{
41 self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidFlowQuerySnafu,
42 InvalidIntervalSnafu, InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
43 SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
44};
45use crate::parser::{FLOW, ParserContext};
46use crate::parsers::tql_parser;
47use crate::parsers::utils::{
48 self, parse_with_options, validate_column_fulltext_create_option,
49 validate_column_skipping_index_create_option, validate_column_vector_index_create_option,
50};
51use crate::statements::create::{
52 Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
53 CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
54};
55use crate::statements::statement::Statement;
56use crate::statements::transform::type_alias::get_data_type_by_alias_name;
57use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
58use crate::util::{OptionValue, location_to_index, parse_option_string};
59
/// Word that introduces the `ENGINE = <name>` clause of a `CREATE TABLE` statement.
pub const ENGINE: &str = "ENGINE";
/// NOTE(review): not referenced in the visible part of this file; presumably the
/// unbounded upper-limit marker of partition rules — confirm against its users.
pub const MAXVALUE: &str = "MAXVALUE";
/// Word expected before `TO <table>` in `CREATE FLOW ... SINK TO ...`.
pub const SINK: &str = "SINK";
/// First word of the optional `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const EXPIRE: &str = "EXPIRE";
/// Second word of the optional `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const AFTER: &str = "AFTER";
/// Word that introduces the `INVERTED INDEX` column extension.
pub const INVERTED: &str = "INVERTED";
/// Word that introduces the `SKIPPING INDEX` column extension.
pub const SKIPPING: &str = "SKIPPING";
/// Word that introduces the `VECTOR INDEX` column extension.
pub const VECTOR: &str = "VECTOR";

/// Textual (`Display`) form of a parsed interval expression.
pub type RawIntervalExpr = String;
70
71impl<'a> ParserContext<'a> {
73 pub(crate) fn parse_create(&mut self) -> Result<Statement> {
74 match self.parser.peek_token().token {
75 Token::Word(w) => match w.keyword {
76 Keyword::TABLE => self.parse_create_table(),
77
78 Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
79
80 Keyword::EXTERNAL => self.parse_create_external_table(),
81
82 Keyword::OR => {
83 let _ = self.parser.next_token();
84 self.parser
85 .expect_keyword(Keyword::REPLACE)
86 .context(SyntaxSnafu)?;
87 match self.parser.next_token().token {
88 Token::Word(w) => match w.keyword {
89 Keyword::VIEW => self.parse_create_view(true),
90 Keyword::NoKeyword => {
91 let uppercase = w.value.to_uppercase();
92 match uppercase.as_str() {
93 FLOW => self.parse_create_flow(true),
94 _ => self.unsupported(w.to_string()),
95 }
96 }
97 _ => self.unsupported(w.to_string()),
98 },
99 _ => self.unsupported(w.to_string()),
100 }
101 }
102
103 Keyword::VIEW => {
104 let _ = self.parser.next_token();
105 self.parse_create_view(false)
106 }
107
108 #[cfg(feature = "enterprise")]
109 Keyword::TRIGGER => {
110 let _ = self.parser.next_token();
111 self.parse_create_trigger()
112 }
113
114 Keyword::NoKeyword => {
115 let _ = self.parser.next_token();
116 let uppercase = w.value.to_uppercase();
117 match uppercase.as_str() {
118 FLOW => self.parse_create_flow(false),
119 _ => self.unsupported(w.to_string()),
120 }
121 }
122 _ => self.unsupported(w.to_string()),
123 },
124 unexpected => self.unsupported(unexpected.to_string()),
125 }
126 }
127
128 fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
130 let if_not_exists = self.parse_if_not_exist()?;
131 let view_name = self.intern_parse_table_name()?;
132
133 let columns = self.parse_view_columns()?;
134
135 self.parser
136 .expect_keyword(Keyword::AS)
137 .context(SyntaxSnafu)?;
138
139 let query = self.parse_query()?;
140
141 Ok(Statement::CreateView(CreateView {
142 name: view_name,
143 columns,
144 or_replace,
145 query: Box::new(query),
146 if_not_exists,
147 }))
148 }
149
150 fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
151 let mut columns = vec![];
152 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
153 return Ok(columns);
154 }
155
156 loop {
157 let name = self.parse_column_name().context(SyntaxSnafu)?;
158
159 columns.push(name);
160
161 let comma = self.parser.consume_token(&Token::Comma);
162 if self.parser.consume_token(&Token::RParen) {
163 break;
165 } else if !comma {
166 return self.expected("',' or ')' after column name", self.parser.peek_token());
167 }
168 }
169
170 Ok(columns)
171 }
172
173 fn parse_create_external_table(&mut self) -> Result<Statement> {
174 let _ = self.parser.next_token();
175 self.parser
176 .expect_keyword(Keyword::TABLE)
177 .context(SyntaxSnafu)?;
178 let if_not_exists = self.parse_if_not_exist()?;
179 let table_name = self.intern_parse_table_name()?;
180 let (columns, constraints) = self.parse_columns()?;
181 if !columns.is_empty() {
182 validate_time_index(&columns, &constraints)?;
183 }
184
185 let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
186 let options = self.parse_create_table_options()?;
187 Ok(Statement::CreateExternalTable(CreateExternalTable {
188 name: table_name,
189 columns,
190 constraints,
191 options,
192 if_not_exists,
193 engine,
194 }))
195 }
196
197 fn parse_create_database(&mut self) -> Result<Statement> {
198 let _ = self.parser.next_token();
199 let if_not_exists = self.parse_if_not_exist()?;
200 let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
201 expected: "a database name",
202 actual: self.peek_token_as_string(),
203 })?;
204 let database_name = Self::canonicalize_object_name(database_name)?;
205
206 let options = self
207 .parser
208 .parse_options(Keyword::WITH)
209 .context(SyntaxSnafu)?
210 .into_iter()
211 .map(parse_option_string)
212 .collect::<Result<HashMap<String, OptionValue>>>()?;
213
214 for key in options.keys() {
215 ensure!(
216 validate_database_option(key),
217 InvalidDatabaseOptionSnafu { key: key.clone() }
218 );
219 }
220 if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string())
221 && append_mode == "true"
222 && options.contains_key("merge_mode")
223 {
224 return InvalidDatabaseOptionSnafu {
225 key: "merge_mode".to_string(),
226 }
227 .fail();
228 }
229
230 Ok(Statement::CreateDatabase(CreateDatabase {
231 name: database_name,
232 if_not_exists,
233 options: OptionMap::new(options),
234 }))
235 }
236
237 fn parse_create_table(&mut self) -> Result<Statement> {
238 let _ = self.parser.next_token();
239
240 let if_not_exists = self.parse_if_not_exist()?;
241
242 let table_name = self.intern_parse_table_name()?;
243
244 if self.parser.parse_keyword(Keyword::LIKE) {
245 let source_name = self.intern_parse_table_name()?;
246
247 return Ok(Statement::CreateTableLike(CreateTableLike {
248 table_name,
249 source_name,
250 }));
251 }
252
253 let (columns, constraints) = self.parse_columns()?;
254 validate_time_index(&columns, &constraints)?;
255
256 let partitions = self.parse_partitions()?;
257 if let Some(partitions) = &partitions {
258 validate_partitions(&columns, partitions)?;
259 }
260
261 let engine = self.parse_table_engine(default_engine())?;
262 let options = self.parse_create_table_options()?;
263 let create_table = CreateTable {
264 if_not_exists,
265 name: table_name,
266 columns,
267 engine,
268 constraints,
269 options,
270 table_id: 0, partitions,
272 };
273
274 Ok(Statement::CreateTable(create_table))
275 }
276
277 fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
279 let if_not_exists = self.parse_if_not_exist()?;
280
281 let flow_name = self.intern_parse_table_name()?;
282
283 if let Token::Word(word) = self.parser.peek_token().token
285 && word.value.eq_ignore_ascii_case(SINK)
286 {
287 self.parser.next_token();
288 } else {
289 Err(ParserError::ParserError(
290 "Expect `SINK` keyword".to_string(),
291 ))
292 .context(SyntaxSnafu)?
293 }
294 self.parser
295 .expect_keyword(Keyword::TO)
296 .context(SyntaxSnafu)?;
297
298 let output_table_name = self.intern_parse_table_name()?;
299
300 let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
301 && w1.value.eq_ignore_ascii_case(EXPIRE)
302 {
303 self.parser.next_token();
304 if let Token::Word(w2) = &self.parser.peek_token().token
305 && w2.value.eq_ignore_ascii_case(AFTER)
306 {
307 self.parser.next_token();
308 Some(self.parse_interval_no_month("EXPIRE AFTER")?)
309 } else {
310 None
311 }
312 } else {
313 None
314 };
315
316 let eval_interval = if self
317 .parser
318 .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
319 {
320 Some(self.parse_interval_no_month("EVAL INTERVAL")?)
321 } else {
322 None
323 };
324
325 let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
326 match self.parser.next_token() {
327 TokenWithSpan {
328 token: Token::SingleQuotedString(value, ..),
329 ..
330 } => Some(value),
331 unexpected => {
332 return self
333 .parser
334 .expected("string", unexpected)
335 .context(SyntaxSnafu);
336 }
337 }
338 } else {
339 None
340 };
341
342 self.parser
343 .expect_keyword(Keyword::AS)
344 .context(SyntaxSnafu)?;
345
346 let query = Box::new(self.parse_flow_sql_or_tql(true)?);
347
348 Ok(Statement::CreateFlow(CreateFlow {
349 flow_name,
350 sink_table_name: output_table_name,
351 or_replace,
352 if_not_exists,
353 expire_after,
354 eval_interval,
355 comment,
356 query,
357 }))
358 }
359
360 fn parse_flow_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
361 let start_loc = self.parser.peek_token().span.start;
362 let start_index = location_to_index(self.sql, &start_loc);
363
364 let starts_with_with = matches!(
365 self.parser.peek_token().token,
366 Token::Word(w) if w.keyword == Keyword::WITH
367 );
368
369 let query = match self.parser.peek_token().token {
371 Token::Word(w) => match w.keyword {
372 Keyword::SELECT => self.parse_query(),
373 Keyword::WITH => self.parse_with_tql_with_now(require_now_expr),
374 Keyword::NoKeyword
375 if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
376 {
377 self.parse_tql(require_now_expr)
378 }
379
380 _ => self.unsupported(self.peek_token_as_string()),
381 },
382 _ => self.unsupported(self.peek_token_as_string()),
383 }?;
384
385 if starts_with_with {
386 let Statement::Query(query) = &query else {
387 return InvalidFlowQuerySnafu {
388 reason: "Expect a query after WITH".to_string(),
389 }
390 .fail();
391 };
392
393 if !utils::is_simple_tql_cte_query(query) {
394 return InvalidFlowQuerySnafu {
395 reason: "WITH is only supported for the simplest TQL CTE in CREATE FLOW"
396 .to_string(),
397 }
398 .fail();
399 }
400 }
401
402 let end_token = self.parser.peek_token();
403
404 let raw_query = if end_token == Token::EOF {
405 &self.sql[start_index..]
406 } else {
407 let end_loc = end_token.span.end;
408 let end_index = location_to_index(self.sql, &end_loc);
409 &self.sql[start_index..end_index.min(self.sql.len())]
410 };
411 let raw_query = raw_query.trim_end_matches(";");
412
413 let query = SqlOrTql::try_from_statement(query, raw_query)?;
414 Ok(query)
415 }
416
417 fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
419 let interval = self.parse_interval_month_day_nano()?.0;
420 if interval.months != 0 {
421 return InvalidIntervalSnafu {
422 reason: format!("Interval with months is not allowed in {context}"),
423 }
424 .fail();
425 }
426 Ok(
427 interval.nanoseconds / 1_000_000_000
428 + interval.days as i64 * 60 * 60 * 24
429 + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, )
433 }
434
435 fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
437 let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
438 let raw_interval_expr = interval_expr.to_string();
439 let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
440 .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
441 .ok()
442 .with_context(|| InvalidIntervalSnafu {
443 reason: format!("cannot cast {} to interval type", interval_expr),
444 })?;
445 if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
446 Ok((interval, raw_interval_expr))
447 } else {
448 unreachable!()
449 }
450 }
451
452 fn parse_if_not_exist(&mut self) -> Result<bool> {
453 match self.parser.peek_token().token {
454 Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
455 _ => {}
456 }
457
458 if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
459 return self
460 .parser
461 .expect_keyword(Keyword::EXISTS)
462 .map(|_| true)
463 .context(UnexpectedSnafu {
464 expected: "EXISTS",
465 actual: self.peek_token_as_string(),
466 });
467 }
468
469 if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
470 return UnsupportedSnafu { keyword: "EXISTS" }.fail();
471 }
472
473 Ok(false)
474 }
475
476 fn parse_create_table_options(&mut self) -> Result<OptionMap> {
477 parse_with_options(&mut self.parser)
478 }
479
480 fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
482 if !self.parser.parse_keyword(Keyword::PARTITION) {
483 return Ok(None);
484 }
485 self.parser
486 .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
487 .context(error::UnexpectedSnafu {
488 expected: "ON, COLUMNS",
489 actual: self.peek_token_as_string(),
490 })?;
491
492 let raw_column_list = self
493 .parser
494 .parse_parenthesized_column_list(Mandatory, false)
495 .context(error::SyntaxSnafu)?;
496 let column_list = raw_column_list
497 .into_iter()
498 .map(Self::canonicalize_identifier)
499 .collect();
500
501 let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
502
503 Ok(Some(Partitions { column_list, exprs }))
504 }
505
506 fn parse_partition_entry(&mut self) -> Result<Expr> {
507 self.parser.parse_expr().context(error::SyntaxSnafu)
508 }
509
510 fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
512 where
513 F: FnMut(&mut ParserContext<'a>) -> Result<T>,
514 {
515 self.parser
516 .expect_token(&Token::LParen)
517 .context(error::UnexpectedSnafu {
518 expected: "(",
519 actual: self.peek_token_as_string(),
520 })?;
521
522 let mut values = vec![];
523 while self.parser.peek_token() != Token::RParen {
524 values.push(f(self)?);
525 if !self.parser.consume_token(&Token::Comma) {
526 break;
527 }
528 }
529
530 self.parser
531 .expect_token(&Token::RParen)
532 .context(error::UnexpectedSnafu {
533 expected: ")",
534 actual: self.peek_token_as_string(),
535 })?;
536
537 Ok(values)
538 }
539
540 fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
542 let mut columns = vec![];
543 let mut constraints = vec![];
544 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
545 return Ok((columns, constraints));
546 }
547
548 loop {
549 if let Some(constraint) = self.parse_optional_table_constraint()? {
550 constraints.push(constraint);
551 } else if let Token::Word(_) = self.parser.peek_token().token {
552 self.parse_column(&mut columns, &mut constraints)?;
553 } else {
554 return self.expected(
555 "column name or constraint definition",
556 self.parser.peek_token(),
557 );
558 }
559 let comma = self.parser.consume_token(&Token::Comma);
560 if self.parser.consume_token(&Token::RParen) {
561 break;
563 } else if !comma {
564 return self.expected(
565 "',' or ')' after column definition",
566 self.parser.peek_token(),
567 );
568 }
569 }
570
571 Ok((columns, constraints))
572 }
573
574 fn parse_column(
575 &mut self,
576 columns: &mut Vec<Column>,
577 constraints: &mut Vec<TableConstraint>,
578 ) -> Result<()> {
579 let mut column = self.parse_column_def()?;
580
581 let mut time_index_opt_idx = None;
582 for (index, opt) in column.options().iter().enumerate() {
583 if let ColumnOption::DialectSpecific(tokens) = &opt.option
584 && matches!(
585 &tokens[..],
586 [
587 Token::Word(Word {
588 keyword: Keyword::TIME,
589 ..
590 }),
591 Token::Word(Word {
592 keyword: Keyword::INDEX,
593 ..
594 })
595 ]
596 )
597 {
598 ensure!(
599 time_index_opt_idx.is_none(),
600 InvalidColumnOptionSnafu {
601 name: column.name().to_string(),
602 msg: "duplicated time index",
603 }
604 );
605 time_index_opt_idx = Some(index);
606
607 let constraint = TableConstraint::TimeIndex {
608 column: Ident::new(column.name().value.clone()),
609 };
610 constraints.push(constraint);
611 }
612 }
613
614 if let Some(index) = time_index_opt_idx {
615 ensure!(
616 !column.options().contains(&ColumnOptionDef {
617 option: ColumnOption::Null,
618 name: None,
619 }),
620 InvalidColumnOptionSnafu {
621 name: column.name().to_string(),
622 msg: "time index column can't be null",
623 }
624 );
625
626 let data_type = get_unalias_type(column.data_type());
628 ensure!(
629 matches!(data_type, DataType::Timestamp(_, _)),
630 InvalidColumnOptionSnafu {
631 name: column.name().to_string(),
632 msg: "time index column data type should be timestamp",
633 }
634 );
635
636 let not_null_opt = ColumnOptionDef {
637 option: ColumnOption::NotNull,
638 name: None,
639 };
640
641 if !column.options().contains(¬_null_opt) {
642 column.mut_options().push(not_null_opt);
643 }
644
645 let _ = column.mut_options().remove(index);
646 }
647
648 columns.push(column);
649
650 Ok(())
651 }
652
653 fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
655 let name = self.parser.parse_identifier()?;
656 if name.quote_style.is_none() &&
657 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
659 {
660 return Err(ParserError::ParserError(format!(
661 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
662 &name.value
663 )));
664 }
665
666 Ok(name)
667 }
668
669 pub fn parse_column_def(&mut self) -> Result<Column> {
670 let name = self.parse_column_name().context(SyntaxSnafu)?;
671 let parser = &mut self.parser;
672
673 ensure!(
674 !(name.quote_style.is_none() &&
675 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
677 InvalidSqlSnafu {
678 msg: format!(
679 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
680 &name.value
681 ),
682 }
683 );
684
685 let mut extensions = ColumnExtensions::default();
686
687 let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
688 if matches!(data_type, DataType::JSON) {
691 extensions.json_datatype_options = json::parse_json_datatype_options(parser)?;
692 }
693
694 let mut options = vec![];
695 loop {
696 if parser.parse_keyword(Keyword::CONSTRAINT) {
697 let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
698 if let Some(option) = Self::parse_optional_column_option(parser)? {
699 options.push(ColumnOptionDef { name, option });
700 } else {
701 return parser
702 .expected(
703 "constraint details after CONSTRAINT <name>",
704 parser.peek_token(),
705 )
706 .context(SyntaxSnafu);
707 }
708 } else if let Some(option) = Self::parse_optional_column_option(parser)? {
709 options.push(ColumnOptionDef { name: None, option });
710 } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
711 break;
712 };
713 }
714
715 Ok(Column {
716 column_def: ColumnDef {
717 name: Self::canonicalize_identifier(name),
718 data_type,
719 options,
720 },
721 extensions,
722 })
723 }
724
725 fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
726 if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
727 Ok(Some(ColumnOption::CharacterSet(
728 parser.parse_object_name(false).context(SyntaxSnafu)?,
729 )))
730 } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
731 Ok(Some(ColumnOption::NotNull))
732 } else if parser.parse_keywords(&[Keyword::COMMENT]) {
733 match parser.next_token() {
734 TokenWithSpan {
735 token: Token::SingleQuotedString(value, ..),
736 ..
737 } => Ok(Some(ColumnOption::Comment(value))),
738 unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
739 }
740 } else if parser.parse_keyword(Keyword::NULL) {
741 Ok(Some(ColumnOption::Null))
742 } else if parser.parse_keyword(Keyword::DEFAULT) {
743 Ok(Some(ColumnOption::Default(
744 parser.parse_expr().context(SyntaxSnafu)?,
745 )))
746 } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
747 Ok(Some(ColumnOption::PrimaryKey(PrimaryKeyConstraint {
748 name: None,
749 index_name: None,
750 index_type: None,
751 columns: vec![],
752 index_options: vec![],
753 characteristics: None,
754 })))
755 } else if parser.parse_keyword(Keyword::UNIQUE) {
756 Ok(Some(ColumnOption::Unique(UniqueConstraint {
757 name: None,
758 index_name: None,
759 index_type_display: KeyOrIndexDisplay::None,
760 index_type: None,
761 columns: vec![],
762 index_options: vec![],
763 characteristics: None,
764 nulls_distinct: NullsDistinctOption::None,
765 })))
766 } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
767 Ok(Some(ColumnOption::DialectSpecific(vec![
769 Token::Word(Word {
770 value: "TIME".to_string(),
771 quote_style: None,
772 keyword: Keyword::TIME,
773 }),
774 Token::Word(Word {
775 value: "INDEX".to_string(),
776 quote_style: None,
777 keyword: Keyword::INDEX,
778 }),
779 ])))
780 } else {
781 Ok(None)
782 }
783 }
784
785 fn parse_column_extensions(
791 parser: &mut Parser<'_>,
792 column_name: &Ident,
793 column_type: &DataType,
794 column_extensions: &mut ColumnExtensions,
795 ) -> Result<bool> {
796 if let DataType::Custom(name, tokens) = column_type
797 && name.0.len() == 1
798 && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
799 {
800 ensure!(
801 tokens.len() == 1,
802 InvalidColumnOptionSnafu {
803 name: column_name.to_string(),
804 msg: "VECTOR type should have dimension",
805 }
806 );
807
808 let dimension =
809 tokens[0]
810 .parse::<u32>()
811 .ok()
812 .with_context(|| InvalidColumnOptionSnafu {
813 name: column_name.to_string(),
814 msg: "dimension should be a positive integer",
815 })?;
816
817 let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
818 column_extensions.vector_options = Some(options);
819 }
820
821 let mut is_index_declared = false;
823
824 if let Token::Word(word) = parser.peek_token().token
826 && word.value.eq_ignore_ascii_case(SKIPPING)
827 {
828 parser.next_token();
829 ensure!(
831 parser.parse_keyword(Keyword::INDEX),
832 InvalidColumnOptionSnafu {
833 name: column_name.to_string(),
834 msg: "expect INDEX after SKIPPING keyword",
835 }
836 );
837 ensure!(
838 column_extensions.skipping_index_options.is_none(),
839 InvalidColumnOptionSnafu {
840 name: column_name.to_string(),
841 msg: "duplicated SKIPPING index option",
842 }
843 );
844
845 let options = parser
846 .parse_options(Keyword::WITH)
847 .context(error::SyntaxSnafu)?
848 .into_iter()
849 .map(parse_option_string)
850 .collect::<Result<Vec<_>>>()?;
851
852 for (key, _) in options.iter() {
853 ensure!(
854 validate_column_skipping_index_create_option(key),
855 InvalidColumnOptionSnafu {
856 name: column_name.to_string(),
857 msg: format!("invalid SKIPPING INDEX option: {key}"),
858 }
859 );
860 }
861
862 let options = OptionMap::new(options);
863 column_extensions.skipping_index_options = Some(options);
864 is_index_declared |= true;
865 }
866
867 if parser.parse_keyword(Keyword::FULLTEXT) {
869 ensure!(
871 parser.parse_keyword(Keyword::INDEX),
872 InvalidColumnOptionSnafu {
873 name: column_name.to_string(),
874 msg: "expect INDEX after FULLTEXT keyword",
875 }
876 );
877
878 ensure!(
879 column_extensions.fulltext_index_options.is_none(),
880 InvalidColumnOptionSnafu {
881 name: column_name.to_string(),
882 msg: "duplicated FULLTEXT INDEX option",
883 }
884 );
885
886 let column_type = get_unalias_type(column_type);
887 let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
888 ensure!(
889 data_type == ConcreteDataType::string_datatype(),
890 InvalidColumnOptionSnafu {
891 name: column_name.to_string(),
892 msg: "FULLTEXT index only supports string type",
893 }
894 );
895
896 let options = parser
897 .parse_options(Keyword::WITH)
898 .context(error::SyntaxSnafu)?
899 .into_iter()
900 .map(parse_option_string)
901 .collect::<Result<Vec<_>>>()?;
902
903 for (key, _) in options.iter() {
904 ensure!(
905 validate_column_fulltext_create_option(key),
906 InvalidColumnOptionSnafu {
907 name: column_name.to_string(),
908 msg: format!("invalid FULLTEXT INDEX option: {key}"),
909 }
910 );
911 }
912
913 let options = OptionMap::new(options);
914 column_extensions.fulltext_index_options = Some(options);
915 is_index_declared |= true;
916 }
917
918 if let Token::Word(word) = parser.peek_token().token
920 && word.value.eq_ignore_ascii_case(INVERTED)
921 {
922 parser.next_token();
923 ensure!(
925 parser.parse_keyword(Keyword::INDEX),
926 InvalidColumnOptionSnafu {
927 name: column_name.to_string(),
928 msg: "expect INDEX after INVERTED keyword",
929 }
930 );
931
932 ensure!(
933 column_extensions.inverted_index_options.is_none(),
934 InvalidColumnOptionSnafu {
935 name: column_name.to_string(),
936 msg: "duplicated INVERTED index option",
937 }
938 );
939
940 let with_token = parser.peek_token();
943 ensure!(
944 with_token.token
945 != Token::Word(Word {
946 value: "WITH".to_string(),
947 keyword: Keyword::WITH,
948 quote_style: None,
949 }),
950 InvalidColumnOptionSnafu {
951 name: column_name.to_string(),
952 msg: "INVERTED index doesn't support options",
953 }
954 );
955
956 column_extensions.inverted_index_options = Some(OptionMap::default());
957 is_index_declared |= true;
958 }
959
960 if let Token::Word(word) = parser.peek_token().token
962 && word.value.eq_ignore_ascii_case(VECTOR)
963 {
964 parser.next_token();
965 ensure!(
967 parser.parse_keyword(Keyword::INDEX),
968 InvalidColumnOptionSnafu {
969 name: column_name.to_string(),
970 msg: "expect INDEX after VECTOR keyword",
971 }
972 );
973
974 ensure!(
975 column_extensions.vector_index_options.is_none(),
976 InvalidColumnOptionSnafu {
977 name: column_name.to_string(),
978 msg: "duplicated VECTOR INDEX option",
979 }
980 );
981
982 let column_type = get_unalias_type(column_type);
984 let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
985 ensure!(
986 matches!(data_type, ConcreteDataType::Vector(_)),
987 InvalidColumnOptionSnafu {
988 name: column_name.to_string(),
989 msg: "VECTOR INDEX only supports Vector type columns",
990 }
991 );
992
993 let options = parser
994 .parse_options(Keyword::WITH)
995 .context(error::SyntaxSnafu)?
996 .into_iter()
997 .map(parse_option_string)
998 .collect::<Result<Vec<_>>>()?;
999
1000 for (key, _) in options.iter() {
1001 ensure!(
1002 validate_column_vector_index_create_option(key),
1003 InvalidColumnOptionSnafu {
1004 name: column_name.to_string(),
1005 msg: format!("invalid VECTOR INDEX option: {key}"),
1006 }
1007 );
1008 }
1009
1010 let options = OptionMap::new(options);
1011 column_extensions.vector_index_options = Some(options);
1012 is_index_declared |= true;
1013 }
1014
1015 Ok(is_index_declared)
1016 }
1017
1018 fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
1019 match self.parser.next_token() {
1020 TokenWithSpan {
1021 token: Token::Word(w),
1022 ..
1023 } if w.keyword == Keyword::PRIMARY => {
1024 self.parser
1025 .expect_keyword(Keyword::KEY)
1026 .context(error::UnexpectedSnafu {
1027 expected: "KEY",
1028 actual: self.peek_token_as_string(),
1029 })?;
1030 let raw_columns = self
1031 .parser
1032 .parse_parenthesized_column_list(Mandatory, false)
1033 .context(error::SyntaxSnafu)?;
1034 let columns = raw_columns
1035 .into_iter()
1036 .map(Self::canonicalize_identifier)
1037 .collect();
1038 Ok(Some(TableConstraint::PrimaryKey { columns }))
1039 }
1040 TokenWithSpan {
1041 token: Token::Word(w),
1042 ..
1043 } if w.keyword == Keyword::TIME => {
1044 self.parser
1045 .expect_keyword(Keyword::INDEX)
1046 .context(error::UnexpectedSnafu {
1047 expected: "INDEX",
1048 actual: self.peek_token_as_string(),
1049 })?;
1050
1051 let raw_columns = self
1052 .parser
1053 .parse_parenthesized_column_list(Mandatory, false)
1054 .context(error::SyntaxSnafu)?;
1055 let mut columns = raw_columns
1056 .into_iter()
1057 .map(Self::canonicalize_identifier)
1058 .collect::<Vec<_>>();
1059
1060 ensure!(
1061 columns.len() == 1,
1062 InvalidTimeIndexSnafu {
1063 msg: "it should contain only one column in time index",
1064 }
1065 );
1066
1067 Ok(Some(TableConstraint::TimeIndex {
1068 column: columns.pop().unwrap(),
1069 }))
1070 }
1071 _ => {
1072 self.parser.prev_token();
1073 Ok(None)
1074 }
1075 }
1076 }
1077
1078 fn parse_table_engine(&mut self, default: &str) -> Result<String> {
1080 if !self.consume_token(ENGINE) {
1081 return Ok(default.to_string());
1082 }
1083
1084 self.parser
1085 .expect_token(&Token::Eq)
1086 .context(error::UnexpectedSnafu {
1087 expected: "=",
1088 actual: self.peek_token_as_string(),
1089 })?;
1090
1091 let token = self.parser.next_token();
1092 if let Token::Word(w) = token.token {
1093 Ok(w.value)
1094 } else {
1095 self.expected("'Engine' is missing", token)
1096 }
1097 }
1098}
1099
1100fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1101 let time_index_constraints: Vec<_> = constraints
1102 .iter()
1103 .filter_map(|c| match c {
1104 TableConstraint::TimeIndex { column } => Some(column),
1105 _ => None,
1106 })
1107 .unique()
1108 .collect();
1109
1110 ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1111 ensure!(
1112 time_index_constraints.len() == 1,
1113 InvalidTimeIndexSnafu {
1114 msg: format!(
1115 "expected only one time index constraint but actual {}",
1116 time_index_constraints.len()
1117 ),
1118 }
1119 );
1120
1121 let time_index_column_ident = &time_index_constraints[0];
1124 let time_index_column = columns
1125 .iter()
1126 .find(|c| c.name().value == *time_index_column_ident.value)
1127 .with_context(|| InvalidTimeIndexSnafu {
1128 msg: format!(
1129 "time index column {} not found in columns",
1130 time_index_column_ident
1131 ),
1132 })?;
1133
1134 let time_index_data_type = get_unalias_type(time_index_column.data_type());
1135 ensure!(
1136 matches!(time_index_data_type, DataType::Timestamp(_, _)),
1137 InvalidColumnOptionSnafu {
1138 name: time_index_column.name().to_string(),
1139 msg: "time index column data type should be timestamp",
1140 }
1141 );
1142
1143 Ok(())
1144}
1145
1146fn get_unalias_type(data_type: &DataType) -> DataType {
1147 match data_type {
1148 DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1149 if let Some(real_type) =
1150 get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1151 {
1152 real_type
1153 } else {
1154 data_type.clone()
1155 }
1156 }
1157 _ => data_type.clone(),
1158 }
1159}
1160
1161fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1162 let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1163
1164 ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1165
1166 Ok(())
1167}
1168
1169fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1171 for expr in exprs {
1172 if let Expr::BinaryOp { left, op: _, right } = expr {
1174 ensure_one_expr(left, columns)?;
1175 ensure_one_expr(right, columns)?;
1176 } else {
1177 return error::InvalidSqlSnafu {
1178 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1179 }
1180 .fail();
1181 }
1182 }
1183 Ok(())
1184}
1185
1186fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1190 match expr {
1191 Expr::BinaryOp { left, op: _, right } => {
1192 ensure_one_expr(left, columns)?;
1193 ensure_one_expr(right, columns)?;
1194 Ok(())
1195 }
1196 Expr::Identifier(ident) => {
1197 let column_name = &ident.value;
1198 ensure!(
1199 columns.iter().any(|c| &c.name().value == column_name),
1200 error::InvalidSqlSnafu {
1201 msg: format!(
1202 "Column {:?} in rule expr is not referenced in PARTITION ON",
1203 column_name
1204 ),
1205 }
1206 );
1207 Ok(())
1208 }
1209 Expr::Value(_) => Ok(()),
1210 Expr::UnaryOp { expr, .. } => {
1211 ensure_one_expr(expr, columns)?;
1212 Ok(())
1213 }
1214 _ => error::InvalidSqlSnafu {
1215 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1216 }
1217 .fail(),
1218 }
1219}
1220
1221fn ensure_partition_columns_defined<'a>(
1223 columns: &'a [Column],
1224 partitions: &'a Partitions,
1225) -> Result<Vec<&'a Column>> {
1226 partitions
1227 .column_list
1228 .iter()
1229 .map(|x| {
1230 let x = ParserContext::canonicalize_identifier(x.clone());
1231 columns
1234 .iter()
1235 .find(|c| *c.name().value == x.value)
1236 .context(error::InvalidSqlSnafu {
1237 msg: format!("Partition column {:?} not defined", x.value),
1238 })
1239 })
1240 .collect::<Result<Vec<&Column>>>()
1241}
1242
1243#[cfg(test)]
1244mod tests {
1245 use std::assert_matches::assert_matches;
1246 use std::collections::HashMap;
1247
1248 use common_catalog::consts::FILE_ENGINE;
1249 use common_error::ext::ErrorExt;
1250 use sqlparser::ast::ColumnOption::NotNull;
1251 use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1252 use sqlparser::dialect::GenericDialect;
1253 use sqlparser::tokenizer::Tokenizer;
1254
1255 use super::*;
1256 use crate::dialect::GreptimeDbDialect;
1257 use crate::parser::ParseOptions;
1258
1259 #[test]
1260 fn test_parse_create_table_like() {
1261 let sql = "CREATE TABLE t1 LIKE t2";
1262 let stmts =
1263 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1264 .unwrap();
1265
1266 assert_eq!(1, stmts.len());
1267 match &stmts[0] {
1268 Statement::CreateTableLike(c) => {
1269 assert_eq!(c.table_name.to_string(), "t1");
1270 assert_eq!(c.source_name.to_string(), "t2");
1271 }
1272 _ => unreachable!(),
1273 }
1274 }
1275
1276 #[test]
1277 fn test_validate_external_table_options() {
1278 let sql = "CREATE EXTERNAL TABLE city (
1279 host string,
1280 ts timestamp,
1281 cpu float64 default 0,
1282 memory float64,
1283 TIME INDEX (ts),
1284 PRIMARY KEY(ts, host)
1285 ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1286
1287 let result =
1288 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1289 assert!(matches!(
1290 result,
1291 Err(error::Error::InvalidTableOption { .. })
1292 ));
1293 }
1294
1295 #[test]
1296 fn test_parse_create_external_table() {
1297 struct Test<'a> {
1298 sql: &'a str,
1299 expected_table_name: &'a str,
1300 expected_options: HashMap<&'a str, &'a str>,
1301 expected_engine: &'a str,
1302 expected_if_not_exist: bool,
1303 }
1304
1305 let tests = [
1306 Test {
1307 sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1308 expected_table_name: "city",
1309 expected_options: HashMap::from([
1310 ("location", "/var/data/city.csv"),
1311 ("format", "csv"),
1312 ]),
1313 expected_engine: FILE_ENGINE,
1314 expected_if_not_exist: false,
1315 },
1316 Test {
1317 sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1318 expected_table_name: "city",
1319 expected_options: HashMap::from([
1320 ("location", "/var/data/city.csv"),
1321 ("format", "csv"),
1322 ]),
1323 expected_engine: "foo",
1324 expected_if_not_exist: true,
1325 },
1326 Test {
1327 sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1328 expected_table_name: "city",
1329 expected_options: HashMap::from([
1330 ("location", "/var/data/city.csv"),
1331 ("format", "csv"),
1332 ("compaction.type", "bar"),
1333 ]),
1334 expected_engine: "foo",
1335 expected_if_not_exist: true,
1336 },
1337 ];
1338
1339 for test in tests {
1340 let stmts = ParserContext::create_with_dialect(
1341 test.sql,
1342 &GreptimeDbDialect {},
1343 ParseOptions::default(),
1344 )
1345 .unwrap();
1346 assert_eq!(1, stmts.len());
1347 match &stmts[0] {
1348 Statement::CreateExternalTable(c) => {
1349 assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1350 assert_eq!(c.options.to_str_map(), test.expected_options);
1351 assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1352 assert_eq!(c.engine, test.expected_engine);
1353 }
1354 _ => unreachable!(),
1355 }
1356 }
1357 }
1358
1359 #[test]
1360 fn test_parse_create_external_table_with_schema() {
1361 let sql = "CREATE EXTERNAL TABLE city (
1362 host string,
1363 ts timestamp,
1364 cpu float32 default 0,
1365 memory float64,
1366 TIME INDEX (ts),
1367 PRIMARY KEY(ts, host),
1368 ) with(location='/var/data/city.csv',format='csv');";
1369
1370 let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]);
1371
1372 let stmts =
1373 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1374 .unwrap();
1375 assert_eq!(1, stmts.len());
1376 match &stmts[0] {
1377 Statement::CreateExternalTable(c) => {
1378 assert_eq!(c.name.to_string(), "city");
1379 assert_eq!(c.options.to_str_map(), options);
1380
1381 let columns = &c.columns;
1382 assert_column_def(&columns[0].column_def, "host", "STRING");
1383 assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1384 assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1385 assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1386
1387 let constraints = &c.constraints;
1388 assert_eq!(
1389 &constraints[0],
1390 &TableConstraint::TimeIndex {
1391 column: Ident::new("ts"),
1392 }
1393 );
1394 assert_eq!(
1395 &constraints[1],
1396 &TableConstraint::PrimaryKey {
1397 columns: vec![Ident::new("ts"), Ident::new("host")]
1398 }
1399 );
1400 }
1401 _ => unreachable!(),
1402 }
1403 }
1404
1405 #[test]
1406 fn test_parse_create_database() {
1407 let sql = "create database";
1408 let result =
1409 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1410 assert!(
1411 result
1412 .unwrap_err()
1413 .to_string()
1414 .contains("Unexpected token while parsing SQL statement")
1415 );
1416
1417 let sql = "create database prometheus";
1418 let stmts =
1419 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1420 .unwrap();
1421
1422 assert_eq!(1, stmts.len());
1423 match &stmts[0] {
1424 Statement::CreateDatabase(c) => {
1425 assert_eq!(c.name.to_string(), "prometheus");
1426 assert!(!c.if_not_exists);
1427 }
1428 _ => unreachable!(),
1429 }
1430
1431 let sql = "create database if not exists prometheus";
1432 let stmts =
1433 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1434 .unwrap();
1435
1436 assert_eq!(1, stmts.len());
1437 match &stmts[0] {
1438 Statement::CreateDatabase(c) => {
1439 assert_eq!(c.name.to_string(), "prometheus");
1440 assert!(c.if_not_exists);
1441 }
1442 _ => unreachable!(),
1443 }
1444
1445 let sql = "CREATE DATABASE `fOo`";
1446 let result =
1447 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1448 let stmts = result.unwrap();
1449 match &stmts.last().unwrap() {
1450 Statement::CreateDatabase(c) => {
1451 assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1452 assert!(!c.if_not_exists);
1453 }
1454 _ => unreachable!(),
1455 }
1456
1457 let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1458 let result =
1459 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1460 let stmts = result.unwrap();
1461 match &stmts[0] {
1462 Statement::CreateDatabase(c) => {
1463 assert_eq!(c.name.to_string(), "prometheus");
1464 assert!(!c.if_not_exists);
1465 assert_eq!(c.options.get("ttl").unwrap(), "1h");
1466 }
1467 _ => unreachable!(),
1468 }
1469 }
1470
1471 #[test]
1472 fn test_parse_create_flow_more_testcases() {
1473 use pretty_assertions::assert_eq;
1474 fn parse_create_flow(sql: &str) -> CreateFlow {
1475 let stmts = ParserContext::create_with_dialect(
1476 sql,
1477 &GreptimeDbDialect {},
1478 ParseOptions::default(),
1479 )
1480 .unwrap();
1481 assert_eq!(1, stmts.len());
1482 match &stmts[0] {
1483 Statement::CreateFlow(c) => c.clone(),
1484 _ => unreachable!(),
1485 }
1486 }
1487 struct CreateFlowWoutQuery {
1488 pub flow_name: ObjectName,
1490 pub sink_table_name: ObjectName,
1492 pub or_replace: bool,
1494 pub if_not_exists: bool,
1496 pub expire_after: Option<i64>,
1499 pub comment: Option<String>,
1501 }
1502 let testcases = vec![
1503 (
1504 r"
1505CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1506SINK TO schema_1.table_1
1507EXPIRE AFTER INTERVAL '5 minutes'
1508COMMENT 'test comment'
1509AS
1510SELECT max(c1), min(c2) FROM schema_2.table_2;",
1511 CreateFlowWoutQuery {
1512 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1513 sink_table_name: ObjectName::from(vec![
1514 Ident::new("schema_1"),
1515 Ident::new("table_1"),
1516 ]),
1517 or_replace: true,
1518 if_not_exists: true,
1519 expire_after: Some(300),
1520 comment: Some("test comment".to_string()),
1521 },
1522 ),
1523 (
1524 r"
1525CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1526SINK TO schema_1.table_1
1527EXPIRE AFTER INTERVAL '300 s'
1528COMMENT 'test comment'
1529AS
1530SELECT max(c1), min(c2) FROM schema_2.table_2;",
1531 CreateFlowWoutQuery {
1532 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1533 sink_table_name: ObjectName::from(vec![
1534 Ident::new("schema_1"),
1535 Ident::new("table_1"),
1536 ]),
1537 or_replace: true,
1538 if_not_exists: true,
1539 expire_after: Some(300),
1540 comment: Some("test comment".to_string()),
1541 },
1542 ),
1543 (
1544 r"
1545CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1546SINK TO schema_1.table_1
1547EXPIRE AFTER '5 minutes'
1548COMMENT 'test comment'
1549AS
1550SELECT max(c1), min(c2) FROM schema_2.table_2;",
1551 CreateFlowWoutQuery {
1552 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1553 sink_table_name: ObjectName::from(vec![
1554 Ident::new("schema_1"),
1555 Ident::new("table_1"),
1556 ]),
1557 or_replace: true,
1558 if_not_exists: true,
1559 expire_after: Some(300),
1560 comment: Some("test comment".to_string()),
1561 },
1562 ),
1563 (
1564 r"
1565CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1566SINK TO schema_1.table_1
1567EXPIRE AFTER '300 s'
1568COMMENT 'test comment'
1569AS
1570SELECT max(c1), min(c2) FROM schema_2.table_2;",
1571 CreateFlowWoutQuery {
1572 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1573 sink_table_name: ObjectName::from(vec![
1574 Ident::new("schema_1"),
1575 Ident::new("table_1"),
1576 ]),
1577 or_replace: true,
1578 if_not_exists: true,
1579 expire_after: Some(300),
1580 comment: Some("test comment".to_string()),
1581 },
1582 ),
1583 (
1584 r"
1585CREATE FLOW `task_2`
1586SINK TO schema_1.table_1
1587EXPIRE AFTER '2 days 1h 2 min'
1588AS
1589SELECT max(c1), min(c2) FROM schema_2.table_2;",
1590 CreateFlowWoutQuery {
1591 flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1592 sink_table_name: ObjectName::from(vec![
1593 Ident::new("schema_1"),
1594 Ident::new("table_1"),
1595 ]),
1596 or_replace: false,
1597 if_not_exists: false,
1598 expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1599 comment: None,
1600 },
1601 ),
1602 (
1603 r"
1604create flow `task_3`
1605sink to schema_1.table_1
1606expire after '10 minutes'
1607as
1608select max(c1), min(c2) from schema_2.table_2;",
1609 CreateFlowWoutQuery {
1610 flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
1611 sink_table_name: ObjectName::from(vec![
1612 Ident::new("schema_1"),
1613 Ident::new("table_1"),
1614 ]),
1615 or_replace: false,
1616 if_not_exists: false,
1617 expire_after: Some(600), comment: None,
1619 },
1620 ),
1621 (
1622 r"
1623create or replace flow if not exists task_4
1624sink to schema_1.table_1
1625expire after interval '2 hours'
1626comment 'lowercase test'
1627as
1628select max(c1), min(c2) from schema_2.table_2;",
1629 CreateFlowWoutQuery {
1630 flow_name: ObjectName::from(vec![Ident::new("task_4")]),
1631 sink_table_name: ObjectName::from(vec![
1632 Ident::new("schema_1"),
1633 Ident::new("table_1"),
1634 ]),
1635 or_replace: true,
1636 if_not_exists: true,
1637 expire_after: Some(7200), comment: Some("lowercase test".to_string()),
1639 },
1640 ),
1641 ];
1642
1643 for (sql, expected) in testcases {
1644 let create_task = parse_create_flow(sql);
1645
1646 let expected = CreateFlow {
1647 flow_name: expected.flow_name,
1648 sink_table_name: expected.sink_table_name,
1649 or_replace: expected.or_replace,
1650 if_not_exists: expected.if_not_exists,
1651 expire_after: expected.expire_after,
1652 eval_interval: None,
1653 comment: expected.comment,
1654 query: create_task.query.clone(),
1656 };
1657
1658 assert_eq!(create_task, expected, "input sql is:\n{sql}");
1659 let show_create = create_task.to_string();
1660 let recreated = parse_create_flow(&show_create);
1661 assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1662 }
1663 }
1664
1665 #[test]
1666 fn test_parse_create_flow() {
1667 use pretty_assertions::assert_eq;
1668 fn parse_create_flow(sql: &str) -> CreateFlow {
1669 let stmts = ParserContext::create_with_dialect(
1670 sql,
1671 &GreptimeDbDialect {},
1672 ParseOptions::default(),
1673 )
1674 .unwrap();
1675 assert_eq!(1, stmts.len());
1676 match &stmts[0] {
1677 Statement::CreateFlow(c) => c.clone(),
1678 _ => panic!("{:?}", stmts[0]),
1679 }
1680 }
1681 struct CreateFlowWoutQuery {
1682 pub flow_name: ObjectName,
1684 pub sink_table_name: ObjectName,
1686 pub or_replace: bool,
1688 pub if_not_exists: bool,
1690 pub expire_after: Option<i64>,
1693 pub eval_interval: Option<i64>,
1697 pub comment: Option<String>,
1699 }
1700
1701 let testcases = vec![
1703 (
1704 r"
1705CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1706SINK TO schema_1.table_1
1707EXPIRE AFTER INTERVAL '5 minutes'
1708COMMENT 'test comment'
1709AS
1710SELECT max(c1), min(c2) FROM schema_2.table_2;",
1711 CreateFlowWoutQuery {
1712 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1713 sink_table_name: ObjectName(vec![
1714 ObjectNamePart::Identifier(Ident::new("schema_1")),
1715 ObjectNamePart::Identifier(Ident::new("table_1")),
1716 ]),
1717 or_replace: true,
1718 if_not_exists: true,
1719 expire_after: Some(300),
1720 eval_interval: None,
1721 comment: Some("test comment".to_string()),
1722 },
1723 ),
1724 (
1725 r"
1726CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1727SINK TO schema_1.table_1
1728EXPIRE AFTER INTERVAL '300 s'
1729COMMENT 'test comment'
1730AS
1731SELECT max(c1), min(c2) FROM schema_2.table_2;",
1732 CreateFlowWoutQuery {
1733 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1734 sink_table_name: ObjectName(vec![
1735 ObjectNamePart::Identifier(Ident::new("schema_1")),
1736 ObjectNamePart::Identifier(Ident::new("table_1")),
1737 ]),
1738 or_replace: true,
1739 if_not_exists: true,
1740 expire_after: Some(300),
1741 eval_interval: None,
1742 comment: Some("test comment".to_string()),
1743 },
1744 ),
1745 (
1746 r"
1747CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1748SINK TO schema_1.table_1
1749EXPIRE AFTER '5 minutes'
1750EVAL INTERVAL '10 seconds'
1751COMMENT 'test comment'
1752AS
1753SELECT max(c1), min(c2) FROM schema_2.table_2;",
1754 CreateFlowWoutQuery {
1755 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1756 sink_table_name: ObjectName(vec![
1757 ObjectNamePart::Identifier(Ident::new("schema_1")),
1758 ObjectNamePart::Identifier(Ident::new("table_1")),
1759 ]),
1760 or_replace: true,
1761 if_not_exists: true,
1762 expire_after: Some(300),
1763 eval_interval: Some(10),
1764 comment: Some("test comment".to_string()),
1765 },
1766 ),
1767 (
1768 r"
1769CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1770SINK TO schema_1.table_1
1771EXPIRE AFTER '5 minutes'
1772EVAL INTERVAL INTERVAL '10 seconds'
1773COMMENT 'test comment'
1774AS
1775SELECT max(c1), min(c2) FROM schema_2.table_2;",
1776 CreateFlowWoutQuery {
1777 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1778 sink_table_name: ObjectName(vec![
1779 ObjectNamePart::Identifier(Ident::new("schema_1")),
1780 ObjectNamePart::Identifier(Ident::new("table_1")),
1781 ]),
1782 or_replace: true,
1783 if_not_exists: true,
1784 expire_after: Some(300),
1785 eval_interval: Some(10),
1786 comment: Some("test comment".to_string()),
1787 },
1788 ),
1789 (
1790 r"
1791CREATE FLOW `task_2`
1792SINK TO schema_1.table_1
1793EXPIRE AFTER '2 days 1h 2 min'
1794AS
1795SELECT max(c1), min(c2) FROM schema_2.table_2;",
1796 CreateFlowWoutQuery {
1797 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1798 '`', "task_2",
1799 ))]),
1800 sink_table_name: ObjectName(vec![
1801 ObjectNamePart::Identifier(Ident::new("schema_1")),
1802 ObjectNamePart::Identifier(Ident::new("table_1")),
1803 ]),
1804 or_replace: false,
1805 if_not_exists: false,
1806 expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1807 eval_interval: None,
1808 comment: None,
1809 },
1810 ),
1811 ];
1812
1813 for (sql, expected) in testcases {
1814 let create_task = parse_create_flow(sql);
1815
1816 let expected = CreateFlow {
1817 flow_name: expected.flow_name,
1818 sink_table_name: expected.sink_table_name,
1819 or_replace: expected.or_replace,
1820 if_not_exists: expected.if_not_exists,
1821 expire_after: expected.expire_after,
1822 eval_interval: expected.eval_interval,
1823 comment: expected.comment,
1824 query: create_task.query.clone(),
1826 };
1827
1828 assert_eq!(create_task, expected, "input sql is:\n{sql}");
1829 let show_create = create_task.to_string();
1830 let recreated = parse_create_flow(&show_create);
1831 assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1832 }
1833 }
1834
1835 #[test]
1836 fn test_parse_create_flow_with_tql_cte_query() {
1837 let sql = r#"
1838CREATE FLOW calc_reqs_cte
1839SINK TO cnt_reqs_cte
1840EVAL INTERVAL '1m'
1841AS
1842WITH tql(the_timestamp, the_value) AS (
1843 TQL EVAL (now() - '1m'::interval, now(), '5s') metric
1844)
1845SELECT * FROM tql;
1846"#;
1847
1848 let stmts =
1849 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1850 .unwrap();
1851 assert_eq!(1, stmts.len());
1852 let Statement::CreateFlow(create_flow) = &stmts[0] else {
1853 panic!("unexpected stmt: {:?}", stmts[0]);
1854 };
1855
1856 let query = create_flow.query.to_string();
1857 assert!(query.to_uppercase().contains("WITH"));
1858 assert!(query.to_uppercase().contains("TQL EVAL"));
1859 }
1860
1861 #[test]
1862 fn test_parse_create_flow_with_sql_cte_is_unsupported() {
1863 let sql = r#"
1864CREATE FLOW f
1865SINK TO s
1866AS
1867WITH cte AS (SELECT 1) SELECT * FROM cte;
1868"#;
1869
1870 let err =
1871 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1872 .unwrap_err();
1873 let msg = err.to_string();
1874 assert!(msg.to_uppercase().contains("WITH"), "err: {msg}");
1875 }
1876
1877 #[test]
1878 fn test_parse_create_flow_with_tql_cte_requires_now_expr() {
1879 let sql = r#"
1880CREATE FLOW f
1881SINK TO s
1882EVAL INTERVAL '1m'
1883AS
1884WITH tql(ts, val) AS (
1885 TQL EVAL (0, 15, '5s') metric
1886)
1887SELECT * FROM tql;
1888"#;
1889
1890 let err =
1891 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1892 .unwrap_err();
1893
1894 let msg = format!("{err:?}");
1895 assert!(
1896 msg.contains("Expected expression containing `now()`"),
1897 "unexpected err: {msg}"
1898 );
1899 }
1900
1901 #[test]
1902 fn test_parse_create_flow_with_tql_cte_non_select_star_is_unsupported() {
1903 let sql = r#"
1904CREATE FLOW f
1905SINK TO s
1906AS
1907WITH tql(ts, val) AS (
1908 TQL EVAL (now() - '1m'::interval, now(), '5s') metric
1909)
1910SELECT ts FROM tql;
1911"#;
1912
1913 let err =
1914 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1915 .unwrap_err();
1916 assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
1917 }
1918
1919 #[test]
1920 fn test_parse_create_flow_with_tql_cte_filter_is_unsupported() {
1921 let sql = r#"
1922CREATE FLOW f
1923SINK TO s
1924AS
1925WITH tql(ts, val) AS (
1926 TQL EVAL (now() - '1m'::interval, now(), '5s') metric
1927)
1928SELECT * FROM tql WHERE ts > 0;
1929"#;
1930
1931 let err =
1932 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1933 .unwrap_err();
1934 assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
1935 }
1936
1937 #[test]
1938 fn test_parse_create_flow_with_mixed_sql_tql_cte_is_unsupported() {
1939 let sql = r#"
1940CREATE FLOW f
1941SINK TO s
1942AS
1943WITH s1 AS (SELECT 1),
1944 tql(ts, val) AS (TQL EVAL (now() - '1m'::interval, now(), '5s') metric)
1945SELECT * FROM tql;
1946"#;
1947
1948 let err =
1949 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1950 .unwrap_err();
1951 assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
1952 }
1953
1954 #[test]
1955 fn test_create_flow_no_month() {
1956 let sql = r"
1957CREATE FLOW `task_2`
1958SINK TO schema_1.table_1
1959EXPIRE AFTER '1 month 2 days 1h 2 min'
1960AS
1961SELECT max(c1), min(c2) FROM schema_2.table_2;";
1962 let stmts =
1963 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1964
1965 assert!(
1966 stmts.is_err()
1967 && stmts
1968 .unwrap_err()
1969 .to_string()
1970 .contains("Interval with months is not allowed")
1971 );
1972 }
1973
1974 #[test]
1975 fn test_validate_create() {
1976 let sql = r"
1977CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
1978PARTITION ON COLUMNS(c, a) (
1979 a < 10,
1980 a > 10 AND a < 20,
1981 a > 20 AND c < 100,
1982 a > 20 AND c >= 100
1983)
1984ENGINE=mito";
1985 let result =
1986 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1987 let _ = result.unwrap();
1988
1989 let sql = r"
1990CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1991PARTITION ON COLUMNS(x) ()
1992ENGINE=mito";
1993 let result =
1994 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1995 assert!(
1996 result
1997 .unwrap_err()
1998 .to_string()
1999 .contains("Partition column \"x\" not defined")
2000 );
2001 }
2002
2003 #[test]
2004 fn test_parse_create_table_with_partitions() {
2005 let sql = r"
2006CREATE TABLE monitor (
2007 host_id INT,
2008 idc STRING,
2009 ts TIMESTAMP,
2010 cpu DOUBLE DEFAULT 0,
2011 memory DOUBLE,
2012 TIME INDEX (ts),
2013 PRIMARY KEY (host),
2014)
2015PARTITION ON COLUMNS(idc, host_id) (
2016 idc <= 'hz' AND host_id < 1000,
2017 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2018 idc > 'sh' AND host_id < 3000,
2019 idc > 'sh' AND host_id >= 3000,
2020)
2021ENGINE=mito";
2022 let result =
2023 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2024 .unwrap();
2025 assert_eq!(result.len(), 1);
2026 match &result[0] {
2027 Statement::CreateTable(c) => {
2028 assert!(c.partitions.is_some());
2029
2030 let partitions = c.partitions.as_ref().unwrap();
2031 let column_list = partitions
2032 .column_list
2033 .iter()
2034 .map(|x| &x.value)
2035 .collect::<Vec<&String>>();
2036 assert_eq!(column_list, vec!["idc", "host_id"]);
2037
2038 let exprs = &partitions.exprs;
2039
2040 assert_eq!(
2041 exprs[0],
2042 Expr::BinaryOp {
2043 left: Box::new(Expr::BinaryOp {
2044 left: Box::new(Expr::Identifier("idc".into())),
2045 op: BinaryOperator::LtEq,
2046 right: Box::new(Expr::Value(
2047 Value::SingleQuotedString("hz".to_string()).into()
2048 ))
2049 }),
2050 op: BinaryOperator::And,
2051 right: Box::new(Expr::BinaryOp {
2052 left: Box::new(Expr::Identifier("host_id".into())),
2053 op: BinaryOperator::Lt,
2054 right: Box::new(Expr::Value(
2055 Value::Number("1000".to_string(), false).into()
2056 ))
2057 })
2058 }
2059 );
2060 assert_eq!(
2061 exprs[1],
2062 Expr::BinaryOp {
2063 left: Box::new(Expr::BinaryOp {
2064 left: Box::new(Expr::BinaryOp {
2065 left: Box::new(Expr::Identifier("idc".into())),
2066 op: BinaryOperator::Gt,
2067 right: Box::new(Expr::Value(
2068 Value::SingleQuotedString("hz".to_string()).into()
2069 ))
2070 }),
2071 op: BinaryOperator::And,
2072 right: Box::new(Expr::BinaryOp {
2073 left: Box::new(Expr::Identifier("idc".into())),
2074 op: BinaryOperator::LtEq,
2075 right: Box::new(Expr::Value(
2076 Value::SingleQuotedString("sh".to_string()).into()
2077 ))
2078 })
2079 }),
2080 op: BinaryOperator::And,
2081 right: Box::new(Expr::BinaryOp {
2082 left: Box::new(Expr::Identifier("host_id".into())),
2083 op: BinaryOperator::Lt,
2084 right: Box::new(Expr::Value(
2085 Value::Number("2000".to_string(), false).into()
2086 ))
2087 })
2088 }
2089 );
2090 assert_eq!(
2091 exprs[2],
2092 Expr::BinaryOp {
2093 left: Box::new(Expr::BinaryOp {
2094 left: Box::new(Expr::Identifier("idc".into())),
2095 op: BinaryOperator::Gt,
2096 right: Box::new(Expr::Value(
2097 Value::SingleQuotedString("sh".to_string()).into()
2098 ))
2099 }),
2100 op: BinaryOperator::And,
2101 right: Box::new(Expr::BinaryOp {
2102 left: Box::new(Expr::Identifier("host_id".into())),
2103 op: BinaryOperator::Lt,
2104 right: Box::new(Expr::Value(
2105 Value::Number("3000".to_string(), false).into()
2106 ))
2107 })
2108 }
2109 );
2110 assert_eq!(
2111 exprs[3],
2112 Expr::BinaryOp {
2113 left: Box::new(Expr::BinaryOp {
2114 left: Box::new(Expr::Identifier("idc".into())),
2115 op: BinaryOperator::Gt,
2116 right: Box::new(Expr::Value(
2117 Value::SingleQuotedString("sh".to_string()).into()
2118 ))
2119 }),
2120 op: BinaryOperator::And,
2121 right: Box::new(Expr::BinaryOp {
2122 left: Box::new(Expr::Identifier("host_id".into())),
2123 op: BinaryOperator::GtEq,
2124 right: Box::new(Expr::Value(
2125 Value::Number("3000".to_string(), false).into()
2126 ))
2127 })
2128 }
2129 );
2130 }
2131 _ => unreachable!(),
2132 }
2133 }
2134
2135 #[test]
2136 fn test_parse_create_table_with_quoted_partitions() {
2137 let sql = r"
2138CREATE TABLE monitor (
2139 `host_id` INT,
2140 idc STRING,
2141 ts TIMESTAMP,
2142 cpu DOUBLE DEFAULT 0,
2143 memory DOUBLE,
2144 TIME INDEX (ts),
2145 PRIMARY KEY (host),
2146)
2147PARTITION ON COLUMNS(IdC, host_id) (
2148 idc <= 'hz' AND host_id < 1000,
2149 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2150 idc > 'sh' AND host_id < 3000,
2151 idc > 'sh' AND host_id >= 3000,
2152)";
2153 let result =
2154 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2155 .unwrap();
2156 assert_eq!(result.len(), 1);
2157 }
2158
2159 #[test]
2160 fn test_parse_create_table_with_timestamp_index() {
2161 let sql1 = r"
2162CREATE TABLE monitor (
2163 host_id INT,
2164 idc STRING,
2165 ts TIMESTAMP TIME INDEX,
2166 cpu DOUBLE DEFAULT 0,
2167 memory DOUBLE,
2168 PRIMARY KEY (host),
2169)
2170ENGINE=mito";
2171 let result1 = ParserContext::create_with_dialect(
2172 sql1,
2173 &GreptimeDbDialect {},
2174 ParseOptions::default(),
2175 )
2176 .unwrap();
2177
2178 if let Statement::CreateTable(c) = &result1[0] {
2179 assert_eq!(c.constraints.len(), 2);
2180 let tc = c.constraints[0].clone();
2181 match tc {
2182 TableConstraint::TimeIndex { column } => {
2183 assert_eq!(&column.value, "ts");
2184 }
2185 _ => panic!("should be time index constraint"),
2186 };
2187 } else {
2188 panic!("should be create_table statement");
2189 }
2190
2191 let sql2 = r"
2194CREATE TABLE monitor (
2195 host_id INT,
2196 idc STRING,
2197 ts TIMESTAMP NOT NULL,
2198 cpu DOUBLE DEFAULT 0,
2199 memory DOUBLE,
2200 TIME INDEX (ts),
2201 PRIMARY KEY (host),
2202)
2203ENGINE=mito";
2204 let result2 = ParserContext::create_with_dialect(
2205 sql2,
2206 &GreptimeDbDialect {},
2207 ParseOptions::default(),
2208 )
2209 .unwrap();
2210
2211 assert_eq!(result1, result2);
2212
2213 let sql3 = r"
2215CREATE TABLE monitor (
2216 host_id INT,
2217 idc STRING,
2218 ts TIMESTAMP,
2219 cpu DOUBLE DEFAULT 0,
2220 memory DOUBLE,
2221 TIME INDEX (ts),
2222 PRIMARY KEY (host),
2223)
2224ENGINE=mito";
2225
2226 let result3 = ParserContext::create_with_dialect(
2227 sql3,
2228 &GreptimeDbDialect {},
2229 ParseOptions::default(),
2230 )
2231 .unwrap();
2232
2233 assert_ne!(result1, result3);
2234
2235 let sql1 = r"
2237CREATE TABLE monitor (
2238 host_id INT,
2239 idc STRING,
2240 b bigint TIME INDEX,
2241 cpu DOUBLE DEFAULT 0,
2242 memory DOUBLE,
2243 PRIMARY KEY (host),
2244)
2245ENGINE=mito";
2246 let result1 = ParserContext::create_with_dialect(
2247 sql1,
2248 &GreptimeDbDialect {},
2249 ParseOptions::default(),
2250 );
2251
2252 assert!(
2253 result1
2254 .unwrap_err()
2255 .to_string()
2256 .contains("time index column data type should be timestamp")
2257 );
2258 }
2259
2260 #[test]
2261 fn test_parse_create_table_with_timestamp_index_not_null() {
2262 let sql = r"
2263CREATE TABLE monitor (
2264 host_id INT,
2265 idc STRING,
2266 ts TIMESTAMP TIME INDEX,
2267 cpu DOUBLE DEFAULT 0,
2268 memory DOUBLE,
2269 TIME INDEX (ts),
2270 PRIMARY KEY (host),
2271)
2272ENGINE=mito";
2273 let result =
2274 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2275 .unwrap();
2276
2277 assert_eq!(result.len(), 1);
2278 if let Statement::CreateTable(c) = &result[0] {
2279 let ts = c.columns[2].clone();
2280 assert_eq!(ts.name().to_string(), "ts");
2281 assert_eq!(ts.options()[0].option, NotNull);
2282 } else {
2283 panic!("should be create table statement");
2284 }
2285
2286 let sql1 = r"
2287CREATE TABLE monitor (
2288 host_id INT,
2289 idc STRING,
2290 ts TIMESTAMP NOT NULL TIME INDEX,
2291 cpu DOUBLE DEFAULT 0,
2292 memory DOUBLE,
2293 TIME INDEX (ts),
2294 PRIMARY KEY (host),
2295)
2296ENGINE=mito";
2297
2298 let result1 = ParserContext::create_with_dialect(
2299 sql1,
2300 &GreptimeDbDialect {},
2301 ParseOptions::default(),
2302 )
2303 .unwrap();
2304 assert_eq!(result, result1);
2305
2306 let sql2 = r"
2307CREATE TABLE monitor (
2308 host_id INT,
2309 idc STRING,
2310 ts TIMESTAMP TIME INDEX NOT NULL,
2311 cpu DOUBLE DEFAULT 0,
2312 memory DOUBLE,
2313 TIME INDEX (ts),
2314 PRIMARY KEY (host),
2315)
2316ENGINE=mito";
2317
2318 let result2 = ParserContext::create_with_dialect(
2319 sql2,
2320 &GreptimeDbDialect {},
2321 ParseOptions::default(),
2322 )
2323 .unwrap();
2324 assert_eq!(result, result2);
2325
2326 let sql3 = r"
2327CREATE TABLE monitor (
2328 host_id INT,
2329 idc STRING,
2330 ts TIMESTAMP TIME INDEX NULL NOT,
2331 cpu DOUBLE DEFAULT 0,
2332 memory DOUBLE,
2333 TIME INDEX (ts),
2334 PRIMARY KEY (host),
2335)
2336ENGINE=mito";
2337
2338 let result3 = ParserContext::create_with_dialect(
2339 sql3,
2340 &GreptimeDbDialect {},
2341 ParseOptions::default(),
2342 );
2343 assert!(result3.is_err());
2344
2345 let sql4 = r"
2346CREATE TABLE monitor (
2347 host_id INT,
2348 idc STRING,
2349 ts TIMESTAMP TIME INDEX NOT NULL NULL,
2350 cpu DOUBLE DEFAULT 0,
2351 memory DOUBLE,
2352 TIME INDEX (ts),
2353 PRIMARY KEY (host),
2354)
2355ENGINE=mito";
2356
2357 let result4 = ParserContext::create_with_dialect(
2358 sql4,
2359 &GreptimeDbDialect {},
2360 ParseOptions::default(),
2361 );
2362 assert!(result4.is_err());
2363
2364 let sql = r"
2365CREATE TABLE monitor (
2366 host_id INT,
2367 idc STRING,
2368 ts TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2369 cpu DOUBLE DEFAULT 0,
2370 memory DOUBLE,
2371 TIME INDEX (ts),
2372 PRIMARY KEY (host),
2373)
2374ENGINE=mito";
2375
2376 let result =
2377 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2378 .unwrap();
2379
2380 if let Statement::CreateTable(c) = &result[0] {
2381 let tc = c.constraints[0].clone();
2382 match tc {
2383 TableConstraint::TimeIndex { column } => {
2384 assert_eq!(&column.value, "ts");
2385 }
2386 _ => panic!("should be time index constraint"),
2387 }
2388 let ts = c.columns[2].clone();
2389 assert_eq!(ts.name().to_string(), "ts");
2390 assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2391 assert_eq!(ts.options()[1].option, NotNull);
2392 } else {
2393 unreachable!("should be create table statement");
2394 }
2395 }
2396
/// Writing `PARTITION COLUMNS(...)` without the `ON` keyword must be rejected
/// with a parser error that names the expected and the found token.
#[test]
fn test_parse_partitions_with_error_syntax() {
    let statement = r"
CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
PARTITION COLUMNS(c, a) (
 a < 10,
 a > 10 AND a < 20,
 a > 20 AND c < 100,
 a > 20 AND c >= 100
)
ENGINE=mito";
    let message = ParserContext::create_with_dialect(
        statement,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap_err()
    .output_msg();

    assert!(message.contains("sql parser error: Expected: ON, found: COLUMNS"));
}
2417
/// A `PARTITION ON COLUMNS(...)` clause followed by an empty rule list `()`
/// is expected to parse without error.
#[test]
fn test_parse_partitions_without_rule() {
    let statement = r"
CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
PARTITION ON COLUMNS(c, a) ()
ENGINE=mito";
    // Only success matters here; the parsed statements are not inspected.
    let _statements = ParserContext::create_with_dialect(
        statement,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();
}
2427
/// A partition rule that mentions a column (`b`) missing from the
/// `PARTITION ON COLUMNS(c, a)` list must fail with an "is not referenced"
/// error.
#[test]
fn test_parse_partitions_unreferenced_column() {
    let statement = r"
CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
PARTITION ON COLUMNS(c, a) (
 b = 'foo'
)
ENGINE=mito";
    let error = ParserContext::create_with_dialect(
        statement,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap_err();

    assert_eq!(
        error.output_msg(),
        "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
    );
}
2443
/// A partition rule consisting of a bare identifier (`b`) is not a binary
/// expression and must be rejected.
///
/// The error message embeds the `Debug` rendering of the offending
/// expression, including its source span. The span columns depend on the
/// exact indentation of the SQL literal and on a third-party `Debug` format,
/// so the check pins only the stable parts of the message: the leading text
/// up to the identifier, and the trailing reason.
#[test]
fn test_parse_partitions_not_binary_expr() {
    let sql = r"
CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
PARTITION ON COLUMNS(c, a) (
 b
)
ENGINE=mito";
    let message = ParserContext::create_with_dialect(
        sql,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap_err()
    .output_msg();

    // Everything before the `span: ...` field of the rendered identifier.
    let expected_prefix = r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, "#;
    assert!(
        message.starts_with(expected_prefix),
        "unexpected error message: {message}"
    );
    assert!(
        message.ends_with(" is not a binary expr"),
        "unexpected error message: {message}"
    );
}
2459
2460 fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2461 assert_eq!(column.name.to_string(), name);
2462 assert_eq!(column.data_type.to_string(), data_type);
2463 }
2464
/// Parses a complete `create table` statement and checks the table name,
/// engine, column definitions, the time-index and primary-key constraints,
/// and the single `ttl` option from the `with(...)` clause.
#[test]
pub fn test_parse_create_table() {
    let statement = r"create table demo(
 host string,
 ts timestamp,
 cpu float32 default 0,
 memory float64,
 TIME INDEX (ts),
 PRIMARY KEY(ts, host),
 ) engine=mito
 with(ttl='10s');
 ";
    let parsed = ParserContext::create_with_dialect(
        statement,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();
    assert_eq!(1, parsed.len());

    let Statement::CreateTable(table) = &parsed[0] else {
        unreachable!()
    };

    assert!(!table.if_not_exists);
    assert_eq!("demo", table.name.to_string());
    assert_eq!("mito", table.engine);

    // Expected (name, rendered data type) per column, in declaration order.
    let expected_columns = [
        ("host", "STRING"),
        ("ts", "TIMESTAMP"),
        ("cpu", "FLOAT"),
        ("memory", "DOUBLE"),
    ];
    assert_eq!(expected_columns.len(), table.columns.len());
    for (column, (name, data_type)) in table.columns.iter().zip(expected_columns) {
        assert_column_def(&column.column_def, name, data_type);
    }

    let time_index = TableConstraint::TimeIndex {
        column: Ident::new("ts"),
    };
    assert_eq!(&table.constraints[0], &time_index);

    let primary_key = TableConstraint::PrimaryKey {
        columns: vec![Ident::new("ts"), Ident::new("host")],
    };
    assert_eq!(&table.constraints[1], &primary_key);

    assert_eq!(1, table.options.len());
    assert_eq!(HashMap::from([("ttl", "10s")]), table.options.to_str_map());
}
2516
/// A `TIME INDEX` constraint listing more than one column must be reported
/// as an `InvalidTimeIndex` error.
#[test]
fn test_invalid_index_keys() {
    let statement = r"create table demo(
 host string,
 ts int64,
 cpu float64 default 0,
 memory float64,
 TIME INDEX (ts, host),
 PRIMARY KEY(ts, host)) engine=mito;
 ";
    let outcome = ParserContext::create_with_dialect(
        statement,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );

    assert!(outcome.is_err());
    assert_matches!(outcome, Err(crate::error::Error::InvalidTimeIndex { .. }));
}
2532
/// Statements that declare the time index more than once must fail with
/// `InvalidTimeIndex`. Two variants are covered: two columns each marked
/// `time index` (plus a table-level constraint), and a column-level
/// `time index` combined with a table-level `TIME INDEX` on another column.
#[test]
fn test_duplicated_time_index() {
    let column_level_twice = r"create table demo(
 host string,
 ts timestamp time index,
 t timestamp time index,
 cpu float64 default 0,
 memory float64,
 TIME INDEX (ts, host),
 PRIMARY KEY(ts, host)) engine=mito;
 ";
    let column_and_table_level = r"create table demo(
 host string,
 ts timestamp time index,
 cpu float64 default 0,
 t timestamp,
 memory float64,
 TIME INDEX (t),
 PRIMARY KEY(ts, host)) engine=mito;
 ";

    for statement in [column_level_twice, column_and_table_level] {
        let outcome = ParserContext::create_with_dialect(
            statement,
            &GreptimeDbDialect {},
            ParseOptions::default(),
        );
        assert!(outcome.is_err());
        assert_matches!(outcome, Err(crate::error::Error::InvalidTimeIndex { .. }));
    }
}
2563
/// An unquoted keyword (`user`) is refused as a column name, while the same
/// word in double quotes is accepted.
#[test]
fn test_invalid_column_name() {
    let unquoted = "create table foo(user string, i timestamp time index)";
    let message = ParserContext::create_with_dialect(
        unquoted,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap_err()
    .output_msg();
    assert!(message.contains("Cannot use keyword 'user' as column name"));

    let quoted = r#"
 create table foo("user" string, i timestamp time index)
 "#;
    let _statements = ParserContext::create_with_dialect(
        quoted,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();
}
2580
/// Regression check named after issue 3479: a long floating-point `DEFAULT`
/// literal must come back unchanged when the parsed column is rendered as a
/// string.
#[test]
fn test_incorrect_default_value_issue_3479() {
    let statement = r#"CREATE TABLE `ExcePTuRi`(
non TIMESTAMP(6) TIME INDEX,
`iUSTO` DOUBLE DEFAULT 0.047318541668048164
)"#;
    let parsed = ParserContext::create_with_dialect(
        statement,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();
    assert_eq!(1, parsed.len());

    let Statement::CreateTable(table) = &parsed[0] else {
        unreachable!()
    };
    let rendered = table.columns[1].to_string();
    assert_eq!("`iUSTO` DOUBLE DEFAULT 0.047318541668048164", rendered);
}
2601
/// `CREATE VIEW` round-trips through the parser: the statement renders back
/// to the input text and the `or_replace` / `if_not_exists` flags mirror the
/// presence of `OR REPLACE` / `IF NOT EXISTS` in the input.
#[test]
fn test_parse_create_view() {
    // (input SQL, expected `or_replace`, expected `if_not_exists`)
    let cases = [
        ("CREATE VIEW test AS SELECT * FROM NUMBERS", false, false),
        (
            "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS",
            true,
            true,
        ),
    ];

    for (sql, or_replace, if_not_exists) in cases {
        let parsed = ParserContext::create_with_dialect(
            sql,
            &GreptimeDbDialect {},
            ParseOptions::default(),
        )
        .unwrap();

        let Statement::CreateView(view) = &parsed[0] else {
            unreachable!()
        };
        assert_eq!(view.to_string(), sql);
        assert_eq!(view.or_replace, or_replace);
        assert_eq!(view.if_not_exists, if_not_exists);
        assert_eq!("test", view.name.to_string());
    }
}
2634
/// A view whose body is a `DELETE` is still expected to parse successfully
/// here and to produce exactly one statement, despite the "invalid" in the
/// test name.
#[test]
fn test_parse_create_view_invalid_query() {
    let statement = "CREATE VIEW test AS DELETE from demo";
    let outcome = ParserContext::create_with_dialect(
        statement,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );

    assert!(matches!(outcome, Ok(ref statements) if statements.len() == 1));
}
2642
/// Checks the options recorded for `FULLTEXT INDEX` column clauses:
/// an option-less clause yields an empty option map, and `WITH (...)` clauses
/// yield exactly the given `analyzer` / `case_sensitive` values, on `TEXT`,
/// `STRING`, `TINYTEXT` and `CHAR(20)` columns.
///
/// Each inspected column is looked up by name and must exist, so the
/// assertions can never be skipped silently.
#[test]
fn test_parse_create_table_fulltext_options() {
    let sql1 = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg TEXT FULLTEXT INDEX,
)";
    let result1 = ParserContext::create_with_dialect(
        sql1,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();

    let Statement::CreateTable(c) = &result1[0] else {
        panic!("should be create_table statement");
    };
    let msg = c
        .columns
        .iter()
        .find(|col| col.name().value == "msg")
        .expect("column `msg` should be present");
    assert!(
        msg.extensions
            .fulltext_index_options
            .as_ref()
            .unwrap()
            .is_empty()
    );

    let sql2 = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
)";
    let result2 = ParserContext::create_with_dialect(
        sql2,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();

    let Statement::CreateTable(c) = &result2[0] else {
        panic!("should be create_table statement");
    };
    let msg = c
        .columns
        .iter()
        .find(|col| col.name().value == "msg")
        .expect("column `msg` should be present");
    let options = msg.extensions.fulltext_index_options.as_ref().unwrap();
    assert_eq!(options.len(), 2);
    assert_eq!(options.get("analyzer").unwrap(), "English");
    assert_eq!(options.get("case_sensitive").unwrap(), "false");

    let sql3 = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
 msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
)";
    let result3 = ParserContext::create_with_dialect(
        sql3,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();

    let Statement::CreateTable(c) = &result3[0] else {
        panic!("should be create_table statement");
    };
    // (column name, expected analyzer, expected case_sensitive)
    let expected = [("msg1", "English", "false"), ("msg2", "Chinese", "true")];
    for (name, analyzer, case_sensitive) in expected {
        let column = c
            .columns
            .iter()
            .find(|col| col.name().value == name)
            .expect("fulltext column should be present");
        let options = column.extensions.fulltext_index_options.as_ref().unwrap();
        assert_eq!(options.len(), 2);
        assert_eq!(options.get("analyzer").unwrap(), analyzer);
        assert_eq!(options.get("case_sensitive").unwrap(), case_sensitive);
    }
}
2729
/// `FULLTEXT INDEX` on an `INT` column must be rejected with a message
/// stating that only string types are supported.
#[test]
fn test_parse_create_table_fulltext_options_invalid_type() {
    let statement = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg INT FULLTEXT INDEX,
)";
    let outcome = ParserContext::create_with_dialect(
        statement,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("FULLTEXT index only supports string type"));
}
2747
/// Repeating the `FULLTEXT INDEX` clause on one column must produce a
/// "duplicated FULLTEXT INDEX option" error.
#[test]
fn test_parse_create_table_fulltext_options_duplicate() {
    let statement = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
)";
    let outcome = ParserContext::create_with_dialect(
        statement,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("duplicated FULLTEXT INDEX option"));
}
2765
/// An unknown key inside `FULLTEXT INDEX WITH (...)` must produce an
/// "invalid FULLTEXT INDEX option" error.
#[test]
fn test_parse_create_table_fulltext_options_invalid_option() {
    let statement = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
)";
    let outcome = ParserContext::create_with_dialect(
        statement,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("invalid FULLTEXT INDEX option"));
}
2783
/// Checks the options recorded for `SKIPPING INDEX` column clauses: a
/// `WITH (...)` clause yields a non-empty option map, and a bare clause
/// yields an empty one.
///
/// The `msg` column is looked up by name and must exist, so the assertions
/// can never be skipped silently.
#[test]
fn test_parse_create_table_skip_options() {
    let sql = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
)";
    let result = ParserContext::create_with_dialect(
        sql,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();

    let Statement::CreateTable(c) = &result[0] else {
        panic!("should be create_table statement");
    };
    let msg = c
        .columns
        .iter()
        .find(|col| col.name().value == "msg")
        .expect("column `msg` should be present");
    assert!(
        !msg.extensions
            .skipping_index_options
            .as_ref()
            .unwrap()
            .is_empty()
    );

    let sql = r"
 CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg INT SKIPPING INDEX,
 )";
    let result = ParserContext::create_with_dialect(
        sql,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();

    let Statement::CreateTable(c) = &result[0] else {
        panic!("should be create_table statement");
    };
    let msg = c
        .columns
        .iter()
        .find(|col| col.name().value == "msg")
        .expect("column `msg` should be present");
    assert!(
        msg.extensions
            .skipping_index_options
            .as_ref()
            .unwrap()
            .is_empty()
    );
}
2836
/// Covers the optional column list of `CREATE VIEW`.
///
/// Well-formed lists (empty, one name, two names) must parse and render back
/// to the expected text; an empty list `()` is dropped when rendering.
/// Malformed lists (unbalanced parentheses, trailing comma, a numeric literal
/// or the keyword `select` as a column name) must be rejected.
#[test]
fn test_parse_create_view_with_columns() {
    // (input SQL, text the parsed statement is expected to render as)
    let accepted = [
        (
            "CREATE VIEW test () AS SELECT * FROM NUMBERS",
            "CREATE VIEW test AS SELECT * FROM NUMBERS",
        ),
        (
            "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS",
            "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS",
        ),
        (
            "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS",
            "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS",
        ),
    ];
    for (sql, rendered) in accepted {
        let parsed = ParserContext::create_with_dialect(
            sql,
            &GreptimeDbDialect {},
            ParseOptions::default(),
        )
        .unwrap();

        let Statement::CreateView(view) = &parsed[0] else {
            unreachable!()
        };
        assert_eq!(view.to_string(), rendered);
        assert!(!view.or_replace);
        assert!(!view.if_not_exists);
        assert_eq!("test", view.name.to_string());

        assert_eq!(rendered, parsed[0].to_string());
    }

    let rejected = [
        "CREATE VIEW test (n1 AS select * from demo",
        "CREATE VIEW test (n1, AS select * from demo",
        "CREATE VIEW test n1,n2) AS select * from demo",
        "CREATE VIEW test (1) AS select * from demo",
        "CREATE VIEW test (n1, select) AS select * from demo",
    ];
    for sql in rejected {
        let outcome = ParserContext::create_with_dialect(
            sql,
            &GreptimeDbDialect {},
            ParseOptions::default(),
        );
        assert!(outcome.is_err());
    }
}
2917
/// With an empty token stream and a custom `VECTOR` type carrying the
/// argument `128`, `parse_column_extensions` must succeed and record `128`
/// under `VECTOR_OPT_DIM` in `vector_options`.
#[test]
fn test_parse_column_extensions_vector() {
    let dialect = GenericDialect {};
    let tokens = Tokenizer::new(&dialect, "").tokenize().unwrap();
    let mut parser = Parser::new(&dialect).with_tokens(tokens);

    let column_name = Ident::new("vec_col");
    let column_type = DataType::Custom(
        vec![Ident::new("VECTOR")].into(),
        vec!["128".to_string()],
    );
    let mut extensions = ColumnExtensions::default();

    let outcome = ParserContext::parse_column_extensions(
        &mut parser,
        &column_name,
        &column_type,
        &mut extensions,
    );
    assert!(outcome.is_ok());

    assert!(extensions.vector_options.is_some());
    let recorded = extensions.vector_options.unwrap();
    assert_eq!(recorded.get(VECTOR_OPT_DIM), Some("128"));
}
2938
/// A custom `VECTOR` type without any argument must make
/// `parse_column_extensions` return an error.
#[test]
fn test_parse_column_extensions_vector_invalid() {
    let dialect = GenericDialect {};
    let tokens = Tokenizer::new(&dialect, "").tokenize().unwrap();
    let mut parser = Parser::new(&dialect).with_tokens(tokens);

    let column_name = Ident::new("vec_col");
    let column_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
    let mut extensions = ColumnExtensions::default();

    let outcome = ParserContext::parse_column_extensions(
        &mut parser,
        &column_name,
        &column_type,
        &mut extensions,
    );
    assert!(outcome.is_err());
}
2955
/// Drives `ParserContext::parse_column_extensions` directly with token
/// streams for `SKIPPING INDEX`, `FULLTEXT INDEX` and `INVERTED INDEX`
/// clauses, covering accepted forms, a non-string column for fulltext,
/// options on an inverted index, and two index clauses on one column.
#[test]
fn test_parse_column_extensions_indices() {
    // Tokenizes `sql` with the generic dialect, runs `parse_column_extensions`
    // for a column of the given name and type starting from default
    // extensions, and hands back the outcome together with the extensions.
    let parse = |sql: &str, column: &str, data_type: DataType| {
        let dialect = GenericDialect {};
        let tokens = Tokenizer::new(&dialect, sql).tokenize().unwrap();
        let mut parser = Parser::new(&dialect).with_tokens(tokens);
        let mut extensions = ColumnExtensions::default();
        let outcome = ParserContext::parse_column_extensions(
            &mut parser,
            &Ident::new(column),
            &data_type,
            &mut extensions,
        );
        (outcome, extensions)
    };

    // Bare skipping index.
    let (outcome, extensions) = parse("SKIPPING INDEX", "col", DataType::String(None));
    assert!(outcome.is_ok());
    assert!(extensions.skipping_index_options.is_some());

    // Fulltext index with both options on a string column.
    let (outcome, extensions) = parse(
        "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')",
        "text_col",
        DataType::String(None),
    );
    assert!(outcome.unwrap());
    assert!(extensions.fulltext_index_options.is_some());
    let fulltext = extensions.fulltext_index_options.unwrap();
    assert_eq!(fulltext.get("analyzer"), Some("English"));
    assert_eq!(fulltext.get("case_sensitive"), Some("true"));

    // Fulltext index on an integer column is refused.
    let (outcome, _) = parse(
        "FULLTEXT INDEX WITH (analyzer = 'English')",
        "num_col",
        DataType::Int(None),
    );
    assert!(outcome.is_err());
    assert!(
        outcome
            .unwrap_err()
            .to_string()
            .contains("FULLTEXT index only supports string type")
    );

    // An unrecognised analyzer value is still expected to be accepted here.
    let (outcome, _) = parse(
        "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')",
        "text_col",
        DataType::String(None),
    );
    assert!(outcome.unwrap());

    // Bare inverted index.
    let (outcome, extensions) = parse("INVERTED INDEX", "col", DataType::String(None));
    assert!(outcome.is_ok());
    assert!(extensions.inverted_index_options.is_some());

    // Inverted index does not take options.
    let (outcome, _) = parse(
        "INVERTED INDEX WITH (analyzer = 'English')",
        "col",
        DataType::String(None),
    );
    assert!(outcome.is_err());
    assert!(
        outcome
            .unwrap_err()
            .to_string()
            .contains("INVERTED index doesn't support options")
    );

    // Two different index clauses on the same column.
    let (outcome, extensions) = parse(
        "SKIPPING INDEX FULLTEXT INDEX",
        "col",
        DataType::String(None),
    );
    assert!(outcome.unwrap());
    assert!(extensions.skipping_index_options.is_some());
    assert!(extensions.fulltext_index_options.is_some());
}
3111
/// Casting the string `'10s'` to `INTERVAL` must render back with the
/// interval written out as `'10 seconds'`.
#[test]
fn test_parse_interval_cast() {
    let query = "select '10s'::INTERVAL";
    let parsed = ParserContext::create_with_dialect(
        query,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();

    let rendered = parsed[0].to_string();
    assert_eq!("SELECT '10 seconds'::INTERVAL", &rendered);
}
3120
/// Checks the options recorded for `VECTOR INDEX` column clauses: a bare
/// clause yields an empty option map, and a `WITH (...)` clause yields
/// exactly the four given options.
///
/// The `vec` column is looked up by name and must exist, so the assertions
/// can never be skipped silently.
#[test]
fn test_parse_create_table_vector_index_options() {
    let sql = r"
CREATE TABLE vectors (
 ts TIMESTAMP TIME INDEX,
 vec VECTOR(128) VECTOR INDEX,
)";
    let result = ParserContext::create_with_dialect(
        sql,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();

    let Statement::CreateTable(c) = &result[0] else {
        panic!("should be create_table statement");
    };
    let vec_col = c
        .columns
        .iter()
        .find(|col| col.name().value == "vec")
        .expect("column `vec` should be present");
    assert!(
        vec_col
            .extensions
            .vector_index_options
            .as_ref()
            .unwrap()
            .is_empty()
    );

    let sql = r"
CREATE TABLE vectors (
 ts TIMESTAMP TIME INDEX,
 vec VECTOR(128) VECTOR INDEX WITH (metric='cosine', connectivity='32', expansion_add='256', expansion_search='128')
)";
    let result = ParserContext::create_with_dialect(
        sql,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();

    let Statement::CreateTable(c) = &result[0] else {
        panic!("should be create_table statement");
    };
    let vec_col = c
        .columns
        .iter()
        .find(|col| col.name().value == "vec")
        .expect("column `vec` should be present");
    let options = vec_col.extensions.vector_index_options.as_ref().unwrap();
    assert_eq!(options.len(), 4);
    assert_eq!(options.get("metric").unwrap(), "cosine");
    assert_eq!(options.get("connectivity").unwrap(), "32");
    assert_eq!(options.get("expansion_add").unwrap(), "256");
    assert_eq!(options.get("expansion_search").unwrap(), "128");
}
3174
/// `VECTOR INDEX` on an `INT` column must be rejected with a message stating
/// that only vector-typed columns are supported.
#[test]
fn test_parse_create_table_vector_index_invalid_type() {
    let statement = r"
CREATE TABLE vectors (
 ts TIMESTAMP TIME INDEX,
 col INT VECTOR INDEX,
)";
    let outcome = ParserContext::create_with_dialect(
        statement,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("VECTOR INDEX only supports Vector type columns"));
}
3193
/// Writing `VECTOR INDEX` twice on one column must produce a
/// "duplicated VECTOR INDEX option" error.
#[test]
fn test_parse_create_table_vector_index_duplicate() {
    let statement = r"
CREATE TABLE vectors (
 ts TIMESTAMP TIME INDEX,
 vec VECTOR(128) VECTOR INDEX VECTOR INDEX,
)";
    let outcome = ParserContext::create_with_dialect(
        statement,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("duplicated VECTOR INDEX option"));
}
3212
/// An unknown key inside `VECTOR INDEX WITH (...)` must produce an
/// "invalid VECTOR INDEX option" error.
#[test]
fn test_parse_create_table_vector_index_invalid_option() {
    let statement = r"
CREATE TABLE vectors (
 ts TIMESTAMP TIME INDEX,
 vec VECTOR(128) VECTOR INDEX WITH (metric='l2sq', invalid_option='foo')
)";
    let outcome = ParserContext::create_with_dialect(
        statement,
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("invalid VECTOR INDEX option"));
}
3231
/// Drives `ParserContext::parse_column_extensions` directly with
/// `VECTOR INDEX` token streams: once on a `VECTOR(128)` column whose
/// extensions already carry the dimension, and once on an `INT` column,
/// which must be refused.
#[test]
fn test_parse_column_extensions_vector_index() {
    // Tokenizes `sql` with the generic dialect and runs
    // `parse_column_extensions` for the named column, starting from the
    // supplied extensions; returns the outcome and the resulting extensions.
    let parse = |sql: &str, column: &str, data_type: DataType, mut extensions: ColumnExtensions| {
        let dialect = GenericDialect {};
        let tokens = Tokenizer::new(&dialect, sql).tokenize().unwrap();
        let mut parser = Parser::new(&dialect).with_tokens(tokens);
        let outcome = ParserContext::parse_column_extensions(
            &mut parser,
            &Ident::new(column),
            &data_type,
            &mut extensions,
        );
        (outcome, extensions)
    };

    // Vector column: the dimension is pre-seeded in `vector_options`.
    let seeded = ColumnExtensions {
        vector_options: Some(OptionMap::from([(
            VECTOR_OPT_DIM.to_string(),
            "128".to_string(),
        )])),
        ..Default::default()
    };
    let vector_type = DataType::Custom(
        vec![Ident::new("VECTOR")].into(),
        vec!["128".to_string()],
    );
    let (outcome, extensions) = parse(
        "VECTOR INDEX WITH (metric = 'l2sq')",
        "vec_col",
        vector_type,
        seeded,
    );
    assert!(outcome.is_ok());
    assert!(extensions.vector_index_options.is_some());
    let index_options = extensions.vector_index_options.unwrap();
    assert_eq!(index_options.get("metric"), Some("l2sq"));

    // Integer column: a vector index is not allowed.
    let (outcome, _) = parse(
        "VECTOR INDEX",
        "num_col",
        DataType::Int(None),
        ColumnExtensions::default(),
    );
    assert!(outcome.is_err());
    assert!(
        outcome
            .unwrap_err()
            .to_string()
            .contains("VECTOR INDEX only supports Vector type columns")
    );
}
3290}