1mod json;
16#[cfg(feature = "enterprise")]
17pub mod trigger;
18
19use std::collections::HashMap;
20
21use arrow_buffer::IntervalMonthDayNano;
22use common_catalog::consts::default_engine;
23use datafusion_common::ScalarValue;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
25use datatypes::data_type::ConcreteDataType;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Expr};
29use sqlparser::dialect::keywords::Keyword;
30use sqlparser::keywords::ALL_KEYWORDS;
31use sqlparser::parser::IsOptional::Mandatory;
32use sqlparser::parser::{Parser, ParserError};
33use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
34use table::requests::{validate_database_option, validate_table_option};
35
36use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
37use crate::error::{
38 self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu,
39 InvalidSqlSnafu, InvalidTableOptionSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
40 SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
41};
42use crate::parser::{FLOW, ParserContext};
43use crate::parsers::tql_parser;
44use crate::parsers::utils::{
45 self, validate_column_fulltext_create_option, validate_column_skipping_index_create_option,
46 validate_column_vector_index_create_option,
47};
48use crate::statements::create::{
49 Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
50 CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
51};
52use crate::statements::statement::Statement;
53use crate::statements::transform::type_alias::get_data_type_by_alias_name;
54use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
55use crate::util::{OptionValue, location_to_index, parse_option_string};
56
/// Word that introduces the `ENGINE = <name>` clause of a `CREATE TABLE` statement.
pub const ENGINE: &str = "ENGINE";
/// The `MAXVALUE` word. Not referenced in this part of the file; presumably consumed by
/// partition parsing elsewhere — TODO confirm.
pub const MAXVALUE: &str = "MAXVALUE";
/// Word that precedes `TO <sink table>` in a `CREATE FLOW` statement.
pub const SINK: &str = "SINK";
/// First word of the `EXPIRE AFTER <interval>` clause of a `CREATE FLOW` statement.
pub const EXPIRE: &str = "EXPIRE";
/// Second word of the `EXPIRE AFTER <interval>` clause of a `CREATE FLOW` statement.
pub const AFTER: &str = "AFTER";
/// First word of the `INVERTED INDEX` column option.
pub const INVERTED: &str = "INVERTED";
/// First word of the `SKIPPING INDEX` column option.
pub const SKIPPING: &str = "SKIPPING";
/// First word of the `VECTOR INDEX` column option.
pub const VECTOR: &str = "VECTOR";

/// Textual rendering of an interval expression, as produced by the parsed expression's
/// `to_string()`.
pub type RawIntervalExpr = String;
67
68impl<'a> ParserContext<'a> {
70 pub(crate) fn parse_create(&mut self) -> Result<Statement> {
71 match self.parser.peek_token().token {
72 Token::Word(w) => match w.keyword {
73 Keyword::TABLE => self.parse_create_table(),
74
75 Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
76
77 Keyword::EXTERNAL => self.parse_create_external_table(),
78
79 Keyword::OR => {
80 let _ = self.parser.next_token();
81 self.parser
82 .expect_keyword(Keyword::REPLACE)
83 .context(SyntaxSnafu)?;
84 match self.parser.next_token().token {
85 Token::Word(w) => match w.keyword {
86 Keyword::VIEW => self.parse_create_view(true),
87 Keyword::NoKeyword => {
88 let uppercase = w.value.to_uppercase();
89 match uppercase.as_str() {
90 FLOW => self.parse_create_flow(true),
91 _ => self.unsupported(w.to_string()),
92 }
93 }
94 _ => self.unsupported(w.to_string()),
95 },
96 _ => self.unsupported(w.to_string()),
97 }
98 }
99
100 Keyword::VIEW => {
101 let _ = self.parser.next_token();
102 self.parse_create_view(false)
103 }
104
105 #[cfg(feature = "enterprise")]
106 Keyword::TRIGGER => {
107 let _ = self.parser.next_token();
108 self.parse_create_trigger()
109 }
110
111 Keyword::NoKeyword => {
112 let _ = self.parser.next_token();
113 let uppercase = w.value.to_uppercase();
114 match uppercase.as_str() {
115 FLOW => self.parse_create_flow(false),
116 _ => self.unsupported(w.to_string()),
117 }
118 }
119 _ => self.unsupported(w.to_string()),
120 },
121 unexpected => self.unsupported(unexpected.to_string()),
122 }
123 }
124
125 fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
127 let if_not_exists = self.parse_if_not_exist()?;
128 let view_name = self.intern_parse_table_name()?;
129
130 let columns = self.parse_view_columns()?;
131
132 self.parser
133 .expect_keyword(Keyword::AS)
134 .context(SyntaxSnafu)?;
135
136 let query = self.parse_query()?;
137
138 Ok(Statement::CreateView(CreateView {
139 name: view_name,
140 columns,
141 or_replace,
142 query: Box::new(query),
143 if_not_exists,
144 }))
145 }
146
147 fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
148 let mut columns = vec![];
149 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
150 return Ok(columns);
151 }
152
153 loop {
154 let name = self.parse_column_name().context(SyntaxSnafu)?;
155
156 columns.push(name);
157
158 let comma = self.parser.consume_token(&Token::Comma);
159 if self.parser.consume_token(&Token::RParen) {
160 break;
162 } else if !comma {
163 return self.expected("',' or ')' after column name", self.parser.peek_token());
164 }
165 }
166
167 Ok(columns)
168 }
169
170 fn parse_create_external_table(&mut self) -> Result<Statement> {
171 let _ = self.parser.next_token();
172 self.parser
173 .expect_keyword(Keyword::TABLE)
174 .context(SyntaxSnafu)?;
175 let if_not_exists = self.parse_if_not_exist()?;
176 let table_name = self.intern_parse_table_name()?;
177 let (columns, constraints) = self.parse_columns()?;
178 if !columns.is_empty() {
179 validate_time_index(&columns, &constraints)?;
180 }
181
182 let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
183 let options = self.parse_create_table_options()?;
184 Ok(Statement::CreateExternalTable(CreateExternalTable {
185 name: table_name,
186 columns,
187 constraints,
188 options,
189 if_not_exists,
190 engine,
191 }))
192 }
193
194 fn parse_create_database(&mut self) -> Result<Statement> {
195 let _ = self.parser.next_token();
196 let if_not_exists = self.parse_if_not_exist()?;
197 let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
198 expected: "a database name",
199 actual: self.peek_token_as_string(),
200 })?;
201 let database_name = Self::canonicalize_object_name(database_name)?;
202
203 let options = self
204 .parser
205 .parse_options(Keyword::WITH)
206 .context(SyntaxSnafu)?
207 .into_iter()
208 .map(parse_option_string)
209 .collect::<Result<HashMap<String, OptionValue>>>()?;
210
211 for key in options.keys() {
212 ensure!(
213 validate_database_option(key),
214 InvalidDatabaseOptionSnafu { key: key.clone() }
215 );
216 }
217 if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string())
218 && append_mode == "true"
219 && options.contains_key("merge_mode")
220 {
221 return InvalidDatabaseOptionSnafu {
222 key: "merge_mode".to_string(),
223 }
224 .fail();
225 }
226
227 Ok(Statement::CreateDatabase(CreateDatabase {
228 name: database_name,
229 if_not_exists,
230 options: OptionMap::new(options),
231 }))
232 }
233
234 fn parse_create_table(&mut self) -> Result<Statement> {
235 let _ = self.parser.next_token();
236
237 let if_not_exists = self.parse_if_not_exist()?;
238
239 let table_name = self.intern_parse_table_name()?;
240
241 if self.parser.parse_keyword(Keyword::LIKE) {
242 let source_name = self.intern_parse_table_name()?;
243
244 return Ok(Statement::CreateTableLike(CreateTableLike {
245 table_name,
246 source_name,
247 }));
248 }
249
250 let (columns, constraints) = self.parse_columns()?;
251 validate_time_index(&columns, &constraints)?;
252
253 let partitions = self.parse_partitions()?;
254 if let Some(partitions) = &partitions {
255 validate_partitions(&columns, partitions)?;
256 }
257
258 let engine = self.parse_table_engine(default_engine())?;
259 let options = self.parse_create_table_options()?;
260 let create_table = CreateTable {
261 if_not_exists,
262 name: table_name,
263 columns,
264 engine,
265 constraints,
266 options,
267 table_id: 0, partitions,
269 };
270
271 Ok(Statement::CreateTable(create_table))
272 }
273
274 fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
276 let if_not_exists = self.parse_if_not_exist()?;
277
278 let flow_name = self.intern_parse_table_name()?;
279
280 if let Token::Word(word) = self.parser.peek_token().token
282 && word.value.eq_ignore_ascii_case(SINK)
283 {
284 self.parser.next_token();
285 } else {
286 Err(ParserError::ParserError(
287 "Expect `SINK` keyword".to_string(),
288 ))
289 .context(SyntaxSnafu)?
290 }
291 self.parser
292 .expect_keyword(Keyword::TO)
293 .context(SyntaxSnafu)?;
294
295 let output_table_name = self.intern_parse_table_name()?;
296
297 let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
298 && w1.value.eq_ignore_ascii_case(EXPIRE)
299 {
300 self.parser.next_token();
301 if let Token::Word(w2) = &self.parser.peek_token().token
302 && w2.value.eq_ignore_ascii_case(AFTER)
303 {
304 self.parser.next_token();
305 Some(self.parse_interval_no_month("EXPIRE AFTER")?)
306 } else {
307 None
308 }
309 } else {
310 None
311 };
312
313 let eval_interval = if self
314 .parser
315 .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
316 {
317 Some(self.parse_interval_no_month("EVAL INTERVAL")?)
318 } else {
319 None
320 };
321
322 let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
323 match self.parser.next_token() {
324 TokenWithSpan {
325 token: Token::SingleQuotedString(value, ..),
326 ..
327 } => Some(value),
328 unexpected => {
329 return self
330 .parser
331 .expected("string", unexpected)
332 .context(SyntaxSnafu);
333 }
334 }
335 } else {
336 None
337 };
338
339 self.parser
340 .expect_keyword(Keyword::AS)
341 .context(SyntaxSnafu)?;
342
343 let query = Box::new(self.parse_sql_or_tql(true)?);
344
345 Ok(Statement::CreateFlow(CreateFlow {
346 flow_name,
347 sink_table_name: output_table_name,
348 or_replace,
349 if_not_exists,
350 expire_after,
351 eval_interval,
352 comment,
353 query,
354 }))
355 }
356
357 fn parse_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
358 let start_loc = self.parser.peek_token().span.start;
359 let start_index = location_to_index(self.sql, &start_loc);
360
361 let query = match self.parser.peek_token().token {
363 Token::Word(w) => match w.keyword {
364 Keyword::SELECT => self.parse_query(),
365 Keyword::NoKeyword
366 if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
367 {
368 self.parse_tql(require_now_expr)
369 }
370
371 _ => self.unsupported(self.peek_token_as_string()),
372 },
373 _ => self.unsupported(self.peek_token_as_string()),
374 }?;
375
376 let end_token = self.parser.peek_token();
377
378 let raw_query = if end_token == Token::EOF {
379 &self.sql[start_index..]
380 } else {
381 let end_loc = end_token.span.end;
382 let end_index = location_to_index(self.sql, &end_loc);
383 &self.sql[start_index..end_index.min(self.sql.len())]
384 };
385 let raw_query = raw_query.trim_end_matches(";");
386 let query = SqlOrTql::try_from_statement(query, raw_query)?;
387 Ok(query)
388 }
389
390 fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
392 let interval = self.parse_interval_month_day_nano()?.0;
393 if interval.months != 0 {
394 return InvalidIntervalSnafu {
395 reason: format!("Interval with months is not allowed in {context}"),
396 }
397 .fail();
398 }
399 Ok(
400 interval.nanoseconds / 1_000_000_000
401 + interval.days as i64 * 60 * 60 * 24
402 + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, )
406 }
407
408 fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
410 let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
411 let raw_interval_expr = interval_expr.to_string();
412 let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
413 .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
414 .ok()
415 .with_context(|| InvalidIntervalSnafu {
416 reason: format!("cannot cast {} to interval type", interval_expr),
417 })?;
418 if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
419 Ok((interval, raw_interval_expr))
420 } else {
421 unreachable!()
422 }
423 }
424
425 fn parse_if_not_exist(&mut self) -> Result<bool> {
426 match self.parser.peek_token().token {
427 Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
428 _ => {}
429 }
430
431 if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
432 return self
433 .parser
434 .expect_keyword(Keyword::EXISTS)
435 .map(|_| true)
436 .context(UnexpectedSnafu {
437 expected: "EXISTS",
438 actual: self.peek_token_as_string(),
439 });
440 }
441
442 if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
443 return UnsupportedSnafu { keyword: "EXISTS" }.fail();
444 }
445
446 Ok(false)
447 }
448
449 fn parse_create_table_options(&mut self) -> Result<OptionMap> {
450 let options = self
451 .parser
452 .parse_options(Keyword::WITH)
453 .context(SyntaxSnafu)?
454 .into_iter()
455 .map(parse_option_string)
456 .collect::<Result<HashMap<String, OptionValue>>>()?;
457 for key in options.keys() {
458 ensure!(validate_table_option(key), InvalidTableOptionSnafu { key });
459 }
460 Ok(OptionMap::new(options))
461 }
462
463 fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
465 if !self.parser.parse_keyword(Keyword::PARTITION) {
466 return Ok(None);
467 }
468 self.parser
469 .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
470 .context(error::UnexpectedSnafu {
471 expected: "ON, COLUMNS",
472 actual: self.peek_token_as_string(),
473 })?;
474
475 let raw_column_list = self
476 .parser
477 .parse_parenthesized_column_list(Mandatory, false)
478 .context(error::SyntaxSnafu)?;
479 let column_list = raw_column_list
480 .into_iter()
481 .map(Self::canonicalize_identifier)
482 .collect();
483
484 let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
485
486 Ok(Some(Partitions { column_list, exprs }))
487 }
488
489 fn parse_partition_entry(&mut self) -> Result<Expr> {
490 self.parser.parse_expr().context(error::SyntaxSnafu)
491 }
492
493 fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
495 where
496 F: FnMut(&mut ParserContext<'a>) -> Result<T>,
497 {
498 self.parser
499 .expect_token(&Token::LParen)
500 .context(error::UnexpectedSnafu {
501 expected: "(",
502 actual: self.peek_token_as_string(),
503 })?;
504
505 let mut values = vec![];
506 while self.parser.peek_token() != Token::RParen {
507 values.push(f(self)?);
508 if !self.parser.consume_token(&Token::Comma) {
509 break;
510 }
511 }
512
513 self.parser
514 .expect_token(&Token::RParen)
515 .context(error::UnexpectedSnafu {
516 expected: ")",
517 actual: self.peek_token_as_string(),
518 })?;
519
520 Ok(values)
521 }
522
523 fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
525 let mut columns = vec![];
526 let mut constraints = vec![];
527 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
528 return Ok((columns, constraints));
529 }
530
531 loop {
532 if let Some(constraint) = self.parse_optional_table_constraint()? {
533 constraints.push(constraint);
534 } else if let Token::Word(_) = self.parser.peek_token().token {
535 self.parse_column(&mut columns, &mut constraints)?;
536 } else {
537 return self.expected(
538 "column name or constraint definition",
539 self.parser.peek_token(),
540 );
541 }
542 let comma = self.parser.consume_token(&Token::Comma);
543 if self.parser.consume_token(&Token::RParen) {
544 break;
546 } else if !comma {
547 return self.expected(
548 "',' or ')' after column definition",
549 self.parser.peek_token(),
550 );
551 }
552 }
553
554 Ok((columns, constraints))
555 }
556
557 fn parse_column(
558 &mut self,
559 columns: &mut Vec<Column>,
560 constraints: &mut Vec<TableConstraint>,
561 ) -> Result<()> {
562 let mut column = self.parse_column_def()?;
563
564 let mut time_index_opt_idx = None;
565 for (index, opt) in column.options().iter().enumerate() {
566 if let ColumnOption::DialectSpecific(tokens) = &opt.option
567 && matches!(
568 &tokens[..],
569 [
570 Token::Word(Word {
571 keyword: Keyword::TIME,
572 ..
573 }),
574 Token::Word(Word {
575 keyword: Keyword::INDEX,
576 ..
577 })
578 ]
579 )
580 {
581 ensure!(
582 time_index_opt_idx.is_none(),
583 InvalidColumnOptionSnafu {
584 name: column.name().to_string(),
585 msg: "duplicated time index",
586 }
587 );
588 time_index_opt_idx = Some(index);
589
590 let constraint = TableConstraint::TimeIndex {
591 column: Ident::new(column.name().value.clone()),
592 };
593 constraints.push(constraint);
594 }
595 }
596
597 if let Some(index) = time_index_opt_idx {
598 ensure!(
599 !column.options().contains(&ColumnOptionDef {
600 option: ColumnOption::Null,
601 name: None,
602 }),
603 InvalidColumnOptionSnafu {
604 name: column.name().to_string(),
605 msg: "time index column can't be null",
606 }
607 );
608
609 let data_type = get_unalias_type(column.data_type());
611 ensure!(
612 matches!(data_type, DataType::Timestamp(_, _)),
613 InvalidColumnOptionSnafu {
614 name: column.name().to_string(),
615 msg: "time index column data type should be timestamp",
616 }
617 );
618
619 let not_null_opt = ColumnOptionDef {
620 option: ColumnOption::NotNull,
621 name: None,
622 };
623
624 if !column.options().contains(¬_null_opt) {
625 column.mut_options().push(not_null_opt);
626 }
627
628 let _ = column.mut_options().remove(index);
629 }
630
631 columns.push(column);
632
633 Ok(())
634 }
635
636 fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
638 let name = self.parser.parse_identifier()?;
639 if name.quote_style.is_none() &&
640 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
642 {
643 return Err(ParserError::ParserError(format!(
644 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
645 &name.value
646 )));
647 }
648
649 Ok(name)
650 }
651
652 pub fn parse_column_def(&mut self) -> Result<Column> {
653 let name = self.parse_column_name().context(SyntaxSnafu)?;
654 let parser = &mut self.parser;
655
656 ensure!(
657 !(name.quote_style.is_none() &&
658 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
660 InvalidSqlSnafu {
661 msg: format!(
662 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
663 &name.value
664 ),
665 }
666 );
667
668 let mut extensions = ColumnExtensions::default();
669
670 let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
671 if matches!(data_type, DataType::JSON) {
674 extensions.json_datatype_options = json::parse_json_datatype_options(parser)?;
675 }
676
677 let mut options = vec![];
678 loop {
679 if parser.parse_keyword(Keyword::CONSTRAINT) {
680 let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
681 if let Some(option) = Self::parse_optional_column_option(parser)? {
682 options.push(ColumnOptionDef { name, option });
683 } else {
684 return parser
685 .expected(
686 "constraint details after CONSTRAINT <name>",
687 parser.peek_token(),
688 )
689 .context(SyntaxSnafu);
690 }
691 } else if let Some(option) = Self::parse_optional_column_option(parser)? {
692 options.push(ColumnOptionDef { name: None, option });
693 } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
694 break;
695 };
696 }
697
698 Ok(Column {
699 column_def: ColumnDef {
700 name: Self::canonicalize_identifier(name),
701 data_type,
702 options,
703 },
704 extensions,
705 })
706 }
707
708 fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
709 if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
710 Ok(Some(ColumnOption::CharacterSet(
711 parser.parse_object_name(false).context(SyntaxSnafu)?,
712 )))
713 } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
714 Ok(Some(ColumnOption::NotNull))
715 } else if parser.parse_keywords(&[Keyword::COMMENT]) {
716 match parser.next_token() {
717 TokenWithSpan {
718 token: Token::SingleQuotedString(value, ..),
719 ..
720 } => Ok(Some(ColumnOption::Comment(value))),
721 unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
722 }
723 } else if parser.parse_keyword(Keyword::NULL) {
724 Ok(Some(ColumnOption::Null))
725 } else if parser.parse_keyword(Keyword::DEFAULT) {
726 Ok(Some(ColumnOption::Default(
727 parser.parse_expr().context(SyntaxSnafu)?,
728 )))
729 } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
730 Ok(Some(ColumnOption::Unique {
731 is_primary: true,
732 characteristics: None,
733 }))
734 } else if parser.parse_keyword(Keyword::UNIQUE) {
735 Ok(Some(ColumnOption::Unique {
736 is_primary: false,
737 characteristics: None,
738 }))
739 } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
740 Ok(Some(ColumnOption::DialectSpecific(vec![
742 Token::Word(Word {
743 value: "TIME".to_string(),
744 quote_style: None,
745 keyword: Keyword::TIME,
746 }),
747 Token::Word(Word {
748 value: "INDEX".to_string(),
749 quote_style: None,
750 keyword: Keyword::INDEX,
751 }),
752 ])))
753 } else {
754 Ok(None)
755 }
756 }
757
    /// Parses the column extensions that may follow a column's data type and records
    /// them in `column_extensions`.
    ///
    /// Handled in this order within a single call: the dimension of a `VECTOR(<dim>)`
    /// type, `SKIPPING INDEX [WITH (..)]`, `FULLTEXT INDEX [WITH (..)]`,
    /// `INVERTED INDEX` and `VECTOR INDEX [WITH (..)]`.
    ///
    /// Returns `Ok(true)` when at least one index clause was consumed by this call and
    /// `Ok(false)` otherwise.
    ///
    /// # Errors
    ///
    /// Fails with `InvalidColumnOption` when `INDEX` is missing after an index keyword,
    /// when an index kind is declared twice, when an option key is rejected by the
    /// corresponding validator, when the column type does not fit the index kind, or
    /// when `INVERTED INDEX` is followed by `WITH`.
    fn parse_column_extensions(
        parser: &mut Parser<'_>,
        column_name: &Ident,
        column_type: &DataType,
        column_extensions: &mut ColumnExtensions,
    ) -> Result<bool> {
        // A custom type named `VECTOR` must carry exactly one argument: its dimension.
        if let DataType::Custom(name, tokens) = column_type
            && name.0.len() == 1
            && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
        {
            ensure!(
                tokens.len() == 1,
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "VECTOR type should have dimension",
                }
            );

            // NOTE(review): `parse::<u32>` also accepts `0`, although the message below
            // says "positive" — confirm whether a zero dimension should be rejected.
            let dimension =
                tokens[0]
                    .parse::<u32>()
                    .ok()
                    .with_context(|| InvalidColumnOptionSnafu {
                        name: column_name.to_string(),
                        msg: "dimension should be a positive integer",
                    })?;

            let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
            column_extensions.vector_options = Some(options);
        }

        // Set as soon as any index clause below has been consumed.
        let mut is_index_declared = false;

        // SKIPPING INDEX [WITH (..)] — `SKIPPING` is matched by text, case-insensitively.
        if let Token::Word(word) = parser.peek_token().token
            && word.value.eq_ignore_ascii_case(SKIPPING)
        {
            parser.next_token();
            ensure!(
                parser.parse_keyword(Keyword::INDEX),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "expect INDEX after SKIPPING keyword",
                }
            );
            ensure!(
                column_extensions.skipping_index_options.is_none(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "duplicated SKIPPING index option",
                }
            );

            let options = parser
                .parse_options(Keyword::WITH)
                .context(error::SyntaxSnafu)?
                .into_iter()
                .map(parse_option_string)
                .collect::<Result<Vec<_>>>()?;

            for (key, _) in options.iter() {
                ensure!(
                    validate_column_skipping_index_create_option(key),
                    InvalidColumnOptionSnafu {
                        name: column_name.to_string(),
                        msg: format!("invalid SKIPPING INDEX option: {key}"),
                    }
                );
            }

            let options = OptionMap::new(options);
            column_extensions.skipping_index_options = Some(options);
            is_index_declared |= true;
        }

        // FULLTEXT INDEX [WITH (..)] — only allowed on string columns.
        if parser.parse_keyword(Keyword::FULLTEXT) {
            ensure!(
                parser.parse_keyword(Keyword::INDEX),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "expect INDEX after FULLTEXT keyword",
                }
            );

            ensure!(
                column_extensions.fulltext_index_options.is_none(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "duplicated FULLTEXT INDEX option",
                }
            );

            // Resolve a type alias first so the check runs against the real type.
            let column_type = get_unalias_type(column_type);
            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
            ensure!(
                data_type == ConcreteDataType::string_datatype(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "FULLTEXT index only supports string type",
                }
            );

            let options = parser
                .parse_options(Keyword::WITH)
                .context(error::SyntaxSnafu)?
                .into_iter()
                .map(parse_option_string)
                .collect::<Result<Vec<_>>>()?;

            for (key, _) in options.iter() {
                ensure!(
                    validate_column_fulltext_create_option(key),
                    InvalidColumnOptionSnafu {
                        name: column_name.to_string(),
                        msg: format!("invalid FULLTEXT INDEX option: {key}"),
                    }
                );
            }

            let options = OptionMap::new(options);
            column_extensions.fulltext_index_options = Some(options);
            is_index_declared |= true;
        }

        // INVERTED INDEX — takes no options; `INVERTED` is matched by text.
        if let Token::Word(word) = parser.peek_token().token
            && word.value.eq_ignore_ascii_case(INVERTED)
        {
            parser.next_token();
            ensure!(
                parser.parse_keyword(Keyword::INDEX),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "expect INDEX after INVERTED keyword",
                }
            );

            ensure!(
                column_extensions.inverted_index_options.is_none(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "duplicated INVERTED index option",
                }
            );

            // Reject a following `WITH`; only the exact unquoted token with the text
            // "WITH" compares equal here.
            let with_token = parser.peek_token();
            ensure!(
                with_token.token
                    != Token::Word(Word {
                        value: "WITH".to_string(),
                        keyword: Keyword::WITH,
                        quote_style: None,
                    }),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "INVERTED index doesn't support options",
                }
            );

            column_extensions.inverted_index_options = Some(OptionMap::default());
            is_index_declared |= true;
        }

        // VECTOR INDEX [WITH (..)] — only allowed on vector columns.
        if let Token::Word(word) = parser.peek_token().token
            && word.value.eq_ignore_ascii_case(VECTOR)
        {
            parser.next_token();
            ensure!(
                parser.parse_keyword(Keyword::INDEX),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "expect INDEX after VECTOR keyword",
                }
            );

            ensure!(
                column_extensions.vector_index_options.is_none(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "duplicated VECTOR INDEX option",
                }
            );

            // Resolve a type alias first so the check runs against the real type.
            let column_type = get_unalias_type(column_type);
            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
            ensure!(
                matches!(data_type, ConcreteDataType::Vector(_)),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "VECTOR INDEX only supports Vector type columns",
                }
            );

            let options = parser
                .parse_options(Keyword::WITH)
                .context(error::SyntaxSnafu)?
                .into_iter()
                .map(parse_option_string)
                .collect::<Result<Vec<_>>>()?;

            for (key, _) in options.iter() {
                ensure!(
                    validate_column_vector_index_create_option(key),
                    InvalidColumnOptionSnafu {
                        name: column_name.to_string(),
                        msg: format!("invalid VECTOR INDEX option: {key}"),
                    }
                );
            }

            let options = OptionMap::new(options);
            column_extensions.vector_index_options = Some(options);
            is_index_declared |= true;
        }

        Ok(is_index_declared)
    }
990
991 fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
992 match self.parser.next_token() {
993 TokenWithSpan {
994 token: Token::Word(w),
995 ..
996 } if w.keyword == Keyword::PRIMARY => {
997 self.parser
998 .expect_keyword(Keyword::KEY)
999 .context(error::UnexpectedSnafu {
1000 expected: "KEY",
1001 actual: self.peek_token_as_string(),
1002 })?;
1003 let raw_columns = self
1004 .parser
1005 .parse_parenthesized_column_list(Mandatory, false)
1006 .context(error::SyntaxSnafu)?;
1007 let columns = raw_columns
1008 .into_iter()
1009 .map(Self::canonicalize_identifier)
1010 .collect();
1011 Ok(Some(TableConstraint::PrimaryKey { columns }))
1012 }
1013 TokenWithSpan {
1014 token: Token::Word(w),
1015 ..
1016 } if w.keyword == Keyword::TIME => {
1017 self.parser
1018 .expect_keyword(Keyword::INDEX)
1019 .context(error::UnexpectedSnafu {
1020 expected: "INDEX",
1021 actual: self.peek_token_as_string(),
1022 })?;
1023
1024 let raw_columns = self
1025 .parser
1026 .parse_parenthesized_column_list(Mandatory, false)
1027 .context(error::SyntaxSnafu)?;
1028 let mut columns = raw_columns
1029 .into_iter()
1030 .map(Self::canonicalize_identifier)
1031 .collect::<Vec<_>>();
1032
1033 ensure!(
1034 columns.len() == 1,
1035 InvalidTimeIndexSnafu {
1036 msg: "it should contain only one column in time index",
1037 }
1038 );
1039
1040 Ok(Some(TableConstraint::TimeIndex {
1041 column: columns.pop().unwrap(),
1042 }))
1043 }
1044 _ => {
1045 self.parser.prev_token();
1046 Ok(None)
1047 }
1048 }
1049 }
1050
1051 fn parse_table_engine(&mut self, default: &str) -> Result<String> {
1053 if !self.consume_token(ENGINE) {
1054 return Ok(default.to_string());
1055 }
1056
1057 self.parser
1058 .expect_token(&Token::Eq)
1059 .context(error::UnexpectedSnafu {
1060 expected: "=",
1061 actual: self.peek_token_as_string(),
1062 })?;
1063
1064 let token = self.parser.next_token();
1065 if let Token::Word(w) = token.token {
1066 Ok(w.value)
1067 } else {
1068 self.expected("'Engine' is missing", token)
1069 }
1070 }
1071}
1072
1073fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1074 let time_index_constraints: Vec<_> = constraints
1075 .iter()
1076 .filter_map(|c| match c {
1077 TableConstraint::TimeIndex { column } => Some(column),
1078 _ => None,
1079 })
1080 .unique()
1081 .collect();
1082
1083 ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1084 ensure!(
1085 time_index_constraints.len() == 1,
1086 InvalidTimeIndexSnafu {
1087 msg: format!(
1088 "expected only one time index constraint but actual {}",
1089 time_index_constraints.len()
1090 ),
1091 }
1092 );
1093
1094 let time_index_column_ident = &time_index_constraints[0];
1097 let time_index_column = columns
1098 .iter()
1099 .find(|c| c.name().value == *time_index_column_ident.value)
1100 .with_context(|| InvalidTimeIndexSnafu {
1101 msg: format!(
1102 "time index column {} not found in columns",
1103 time_index_column_ident
1104 ),
1105 })?;
1106
1107 let time_index_data_type = get_unalias_type(time_index_column.data_type());
1108 ensure!(
1109 matches!(time_index_data_type, DataType::Timestamp(_, _)),
1110 InvalidColumnOptionSnafu {
1111 name: time_index_column.name().to_string(),
1112 msg: "time index column data type should be timestamp",
1113 }
1114 );
1115
1116 Ok(())
1117}
1118
1119fn get_unalias_type(data_type: &DataType) -> DataType {
1120 match data_type {
1121 DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1122 if let Some(real_type) =
1123 get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1124 {
1125 real_type
1126 } else {
1127 data_type.clone()
1128 }
1129 }
1130 _ => data_type.clone(),
1131 }
1132}
1133
1134fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1135 let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1136
1137 ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1138
1139 Ok(())
1140}
1141
1142fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1144 for expr in exprs {
1145 if let Expr::BinaryOp { left, op: _, right } = expr {
1147 ensure_one_expr(left, columns)?;
1148 ensure_one_expr(right, columns)?;
1149 } else {
1150 return error::InvalidSqlSnafu {
1151 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1152 }
1153 .fail();
1154 }
1155 }
1156 Ok(())
1157}
1158
1159fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1163 match expr {
1164 Expr::BinaryOp { left, op: _, right } => {
1165 ensure_one_expr(left, columns)?;
1166 ensure_one_expr(right, columns)?;
1167 Ok(())
1168 }
1169 Expr::Identifier(ident) => {
1170 let column_name = &ident.value;
1171 ensure!(
1172 columns.iter().any(|c| &c.name().value == column_name),
1173 error::InvalidSqlSnafu {
1174 msg: format!(
1175 "Column {:?} in rule expr is not referenced in PARTITION ON",
1176 column_name
1177 ),
1178 }
1179 );
1180 Ok(())
1181 }
1182 Expr::Value(_) => Ok(()),
1183 Expr::UnaryOp { expr, .. } => {
1184 ensure_one_expr(expr, columns)?;
1185 Ok(())
1186 }
1187 _ => error::InvalidSqlSnafu {
1188 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1189 }
1190 .fail(),
1191 }
1192}
1193
1194fn ensure_partition_columns_defined<'a>(
1196 columns: &'a [Column],
1197 partitions: &'a Partitions,
1198) -> Result<Vec<&'a Column>> {
1199 partitions
1200 .column_list
1201 .iter()
1202 .map(|x| {
1203 let x = ParserContext::canonicalize_identifier(x.clone());
1204 columns
1207 .iter()
1208 .find(|c| *c.name().value == x.value)
1209 .context(error::InvalidSqlSnafu {
1210 msg: format!("Partition column {:?} not defined", x.value),
1211 })
1212 })
1213 .collect::<Result<Vec<&Column>>>()
1214}
1215
1216#[cfg(test)]
1217mod tests {
1218 use std::assert_matches::assert_matches;
1219 use std::collections::HashMap;
1220
1221 use common_catalog::consts::FILE_ENGINE;
1222 use common_error::ext::ErrorExt;
1223 use sqlparser::ast::ColumnOption::NotNull;
1224 use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1225 use sqlparser::dialect::GenericDialect;
1226 use sqlparser::tokenizer::Tokenizer;
1227
1228 use super::*;
1229 use crate::dialect::GreptimeDbDialect;
1230 use crate::parser::ParseOptions;
1231
1232 #[test]
1233 fn test_parse_create_table_like() {
1234 let sql = "CREATE TABLE t1 LIKE t2";
1235 let stmts =
1236 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1237 .unwrap();
1238
1239 assert_eq!(1, stmts.len());
1240 match &stmts[0] {
1241 Statement::CreateTableLike(c) => {
1242 assert_eq!(c.table_name.to_string(), "t1");
1243 assert_eq!(c.source_name.to_string(), "t2");
1244 }
1245 _ => unreachable!(),
1246 }
1247 }
1248
1249 #[test]
1250 fn test_validate_external_table_options() {
1251 let sql = "CREATE EXTERNAL TABLE city (
1252 host string,
1253 ts timestamp,
1254 cpu float64 default 0,
1255 memory float64,
1256 TIME INDEX (ts),
1257 PRIMARY KEY(ts, host)
1258 ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1259
1260 let result =
1261 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1262 assert!(matches!(
1263 result,
1264 Err(error::Error::InvalidTableOption { .. })
1265 ));
1266 }
1267
1268 #[test]
1269 fn test_parse_create_external_table() {
1270 struct Test<'a> {
1271 sql: &'a str,
1272 expected_table_name: &'a str,
1273 expected_options: HashMap<&'a str, &'a str>,
1274 expected_engine: &'a str,
1275 expected_if_not_exist: bool,
1276 }
1277
1278 let tests = [
1279 Test {
1280 sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1281 expected_table_name: "city",
1282 expected_options: HashMap::from([
1283 ("location", "/var/data/city.csv"),
1284 ("format", "csv"),
1285 ]),
1286 expected_engine: FILE_ENGINE,
1287 expected_if_not_exist: false,
1288 },
1289 Test {
1290 sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1291 expected_table_name: "city",
1292 expected_options: HashMap::from([
1293 ("location", "/var/data/city.csv"),
1294 ("format", "csv"),
1295 ]),
1296 expected_engine: "foo",
1297 expected_if_not_exist: true,
1298 },
1299 Test {
1300 sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1301 expected_table_name: "city",
1302 expected_options: HashMap::from([
1303 ("location", "/var/data/city.csv"),
1304 ("format", "csv"),
1305 ("compaction.type", "bar"),
1306 ]),
1307 expected_engine: "foo",
1308 expected_if_not_exist: true,
1309 },
1310 ];
1311
1312 for test in tests {
1313 let stmts = ParserContext::create_with_dialect(
1314 test.sql,
1315 &GreptimeDbDialect {},
1316 ParseOptions::default(),
1317 )
1318 .unwrap();
1319 assert_eq!(1, stmts.len());
1320 match &stmts[0] {
1321 Statement::CreateExternalTable(c) => {
1322 assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1323 assert_eq!(c.options.to_str_map(), test.expected_options);
1324 assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1325 assert_eq!(c.engine, test.expected_engine);
1326 }
1327 _ => unreachable!(),
1328 }
1329 }
1330 }
1331
1332 #[test]
1333 fn test_parse_create_external_table_with_schema() {
1334 let sql = "CREATE EXTERNAL TABLE city (
1335 host string,
1336 ts timestamp,
1337 cpu float32 default 0,
1338 memory float64,
1339 TIME INDEX (ts),
1340 PRIMARY KEY(ts, host),
1341 ) with(location='/var/data/city.csv',format='csv');";
1342
1343 let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]);
1344
1345 let stmts =
1346 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1347 .unwrap();
1348 assert_eq!(1, stmts.len());
1349 match &stmts[0] {
1350 Statement::CreateExternalTable(c) => {
1351 assert_eq!(c.name.to_string(), "city");
1352 assert_eq!(c.options.to_str_map(), options);
1353
1354 let columns = &c.columns;
1355 assert_column_def(&columns[0].column_def, "host", "STRING");
1356 assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1357 assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1358 assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1359
1360 let constraints = &c.constraints;
1361 assert_eq!(
1362 &constraints[0],
1363 &TableConstraint::TimeIndex {
1364 column: Ident::new("ts"),
1365 }
1366 );
1367 assert_eq!(
1368 &constraints[1],
1369 &TableConstraint::PrimaryKey {
1370 columns: vec![Ident::new("ts"), Ident::new("host")]
1371 }
1372 );
1373 }
1374 _ => unreachable!(),
1375 }
1376 }
1377
1378 #[test]
1379 fn test_parse_create_database() {
1380 let sql = "create database";
1381 let result =
1382 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1383 assert!(
1384 result
1385 .unwrap_err()
1386 .to_string()
1387 .contains("Unexpected token while parsing SQL statement")
1388 );
1389
1390 let sql = "create database prometheus";
1391 let stmts =
1392 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1393 .unwrap();
1394
1395 assert_eq!(1, stmts.len());
1396 match &stmts[0] {
1397 Statement::CreateDatabase(c) => {
1398 assert_eq!(c.name.to_string(), "prometheus");
1399 assert!(!c.if_not_exists);
1400 }
1401 _ => unreachable!(),
1402 }
1403
1404 let sql = "create database if not exists prometheus";
1405 let stmts =
1406 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1407 .unwrap();
1408
1409 assert_eq!(1, stmts.len());
1410 match &stmts[0] {
1411 Statement::CreateDatabase(c) => {
1412 assert_eq!(c.name.to_string(), "prometheus");
1413 assert!(c.if_not_exists);
1414 }
1415 _ => unreachable!(),
1416 }
1417
1418 let sql = "CREATE DATABASE `fOo`";
1419 let result =
1420 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1421 let stmts = result.unwrap();
1422 match &stmts.last().unwrap() {
1423 Statement::CreateDatabase(c) => {
1424 assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1425 assert!(!c.if_not_exists);
1426 }
1427 _ => unreachable!(),
1428 }
1429
1430 let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1431 let result =
1432 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1433 let stmts = result.unwrap();
1434 match &stmts[0] {
1435 Statement::CreateDatabase(c) => {
1436 assert_eq!(c.name.to_string(), "prometheus");
1437 assert!(!c.if_not_exists);
1438 assert_eq!(c.options.get("ttl").unwrap(), "1h");
1439 }
1440 _ => unreachable!(),
1441 }
1442 }
1443
1444 #[test]
1445 fn test_parse_create_flow_more_testcases() {
1446 use pretty_assertions::assert_eq;
1447 fn parse_create_flow(sql: &str) -> CreateFlow {
1448 let stmts = ParserContext::create_with_dialect(
1449 sql,
1450 &GreptimeDbDialect {},
1451 ParseOptions::default(),
1452 )
1453 .unwrap();
1454 assert_eq!(1, stmts.len());
1455 match &stmts[0] {
1456 Statement::CreateFlow(c) => c.clone(),
1457 _ => unreachable!(),
1458 }
1459 }
1460 struct CreateFlowWoutQuery {
1461 pub flow_name: ObjectName,
1463 pub sink_table_name: ObjectName,
1465 pub or_replace: bool,
1467 pub if_not_exists: bool,
1469 pub expire_after: Option<i64>,
1472 pub comment: Option<String>,
1474 }
1475 let testcases = vec![
1476 (
1477 r"
1478CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1479SINK TO schema_1.table_1
1480EXPIRE AFTER INTERVAL '5 minutes'
1481COMMENT 'test comment'
1482AS
1483SELECT max(c1), min(c2) FROM schema_2.table_2;",
1484 CreateFlowWoutQuery {
1485 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1486 sink_table_name: ObjectName::from(vec![
1487 Ident::new("schema_1"),
1488 Ident::new("table_1"),
1489 ]),
1490 or_replace: true,
1491 if_not_exists: true,
1492 expire_after: Some(300),
1493 comment: Some("test comment".to_string()),
1494 },
1495 ),
1496 (
1497 r"
1498CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1499SINK TO schema_1.table_1
1500EXPIRE AFTER INTERVAL '300 s'
1501COMMENT 'test comment'
1502AS
1503SELECT max(c1), min(c2) FROM schema_2.table_2;",
1504 CreateFlowWoutQuery {
1505 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1506 sink_table_name: ObjectName::from(vec![
1507 Ident::new("schema_1"),
1508 Ident::new("table_1"),
1509 ]),
1510 or_replace: true,
1511 if_not_exists: true,
1512 expire_after: Some(300),
1513 comment: Some("test comment".to_string()),
1514 },
1515 ),
1516 (
1517 r"
1518CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1519SINK TO schema_1.table_1
1520EXPIRE AFTER '5 minutes'
1521COMMENT 'test comment'
1522AS
1523SELECT max(c1), min(c2) FROM schema_2.table_2;",
1524 CreateFlowWoutQuery {
1525 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1526 sink_table_name: ObjectName::from(vec![
1527 Ident::new("schema_1"),
1528 Ident::new("table_1"),
1529 ]),
1530 or_replace: true,
1531 if_not_exists: true,
1532 expire_after: Some(300),
1533 comment: Some("test comment".to_string()),
1534 },
1535 ),
1536 (
1537 r"
1538CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1539SINK TO schema_1.table_1
1540EXPIRE AFTER '300 s'
1541COMMENT 'test comment'
1542AS
1543SELECT max(c1), min(c2) FROM schema_2.table_2;",
1544 CreateFlowWoutQuery {
1545 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1546 sink_table_name: ObjectName::from(vec![
1547 Ident::new("schema_1"),
1548 Ident::new("table_1"),
1549 ]),
1550 or_replace: true,
1551 if_not_exists: true,
1552 expire_after: Some(300),
1553 comment: Some("test comment".to_string()),
1554 },
1555 ),
1556 (
1557 r"
1558CREATE FLOW `task_2`
1559SINK TO schema_1.table_1
1560EXPIRE AFTER '2 days 1h 2 min'
1561AS
1562SELECT max(c1), min(c2) FROM schema_2.table_2;",
1563 CreateFlowWoutQuery {
1564 flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1565 sink_table_name: ObjectName::from(vec![
1566 Ident::new("schema_1"),
1567 Ident::new("table_1"),
1568 ]),
1569 or_replace: false,
1570 if_not_exists: false,
1571 expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1572 comment: None,
1573 },
1574 ),
1575 (
1576 r"
1577create flow `task_3`
1578sink to schema_1.table_1
1579expire after '10 minutes'
1580as
1581select max(c1), min(c2) from schema_2.table_2;",
1582 CreateFlowWoutQuery {
1583 flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
1584 sink_table_name: ObjectName::from(vec![
1585 Ident::new("schema_1"),
1586 Ident::new("table_1"),
1587 ]),
1588 or_replace: false,
1589 if_not_exists: false,
1590 expire_after: Some(600), comment: None,
1592 },
1593 ),
1594 (
1595 r"
1596create or replace flow if not exists task_4
1597sink to schema_1.table_1
1598expire after interval '2 hours'
1599comment 'lowercase test'
1600as
1601select max(c1), min(c2) from schema_2.table_2;",
1602 CreateFlowWoutQuery {
1603 flow_name: ObjectName::from(vec![Ident::new("task_4")]),
1604 sink_table_name: ObjectName::from(vec![
1605 Ident::new("schema_1"),
1606 Ident::new("table_1"),
1607 ]),
1608 or_replace: true,
1609 if_not_exists: true,
1610 expire_after: Some(7200), comment: Some("lowercase test".to_string()),
1612 },
1613 ),
1614 ];
1615
1616 for (sql, expected) in testcases {
1617 let create_task = parse_create_flow(sql);
1618
1619 let expected = CreateFlow {
1620 flow_name: expected.flow_name,
1621 sink_table_name: expected.sink_table_name,
1622 or_replace: expected.or_replace,
1623 if_not_exists: expected.if_not_exists,
1624 expire_after: expected.expire_after,
1625 eval_interval: None,
1626 comment: expected.comment,
1627 query: create_task.query.clone(),
1629 };
1630
1631 assert_eq!(create_task, expected, "input sql is:\n{sql}");
1632 let show_create = create_task.to_string();
1633 let recreated = parse_create_flow(&show_create);
1634 assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1635 }
1636 }
1637
1638 #[test]
1639 fn test_parse_create_flow() {
1640 use pretty_assertions::assert_eq;
1641 fn parse_create_flow(sql: &str) -> CreateFlow {
1642 let stmts = ParserContext::create_with_dialect(
1643 sql,
1644 &GreptimeDbDialect {},
1645 ParseOptions::default(),
1646 )
1647 .unwrap();
1648 assert_eq!(1, stmts.len());
1649 match &stmts[0] {
1650 Statement::CreateFlow(c) => c.clone(),
1651 _ => panic!("{:?}", stmts[0]),
1652 }
1653 }
1654 struct CreateFlowWoutQuery {
1655 pub flow_name: ObjectName,
1657 pub sink_table_name: ObjectName,
1659 pub or_replace: bool,
1661 pub if_not_exists: bool,
1663 pub expire_after: Option<i64>,
1666 pub eval_interval: Option<i64>,
1670 pub comment: Option<String>,
1672 }
1673
1674 let testcases = vec![
1676 (
1677 r"
1678CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1679SINK TO schema_1.table_1
1680EXPIRE AFTER INTERVAL '5 minutes'
1681COMMENT 'test comment'
1682AS
1683SELECT max(c1), min(c2) FROM schema_2.table_2;",
1684 CreateFlowWoutQuery {
1685 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1686 sink_table_name: ObjectName(vec![
1687 ObjectNamePart::Identifier(Ident::new("schema_1")),
1688 ObjectNamePart::Identifier(Ident::new("table_1")),
1689 ]),
1690 or_replace: true,
1691 if_not_exists: true,
1692 expire_after: Some(300),
1693 eval_interval: None,
1694 comment: Some("test comment".to_string()),
1695 },
1696 ),
1697 (
1698 r"
1699CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1700SINK TO schema_1.table_1
1701EXPIRE AFTER INTERVAL '300 s'
1702COMMENT 'test comment'
1703AS
1704SELECT max(c1), min(c2) FROM schema_2.table_2;",
1705 CreateFlowWoutQuery {
1706 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1707 sink_table_name: ObjectName(vec![
1708 ObjectNamePart::Identifier(Ident::new("schema_1")),
1709 ObjectNamePart::Identifier(Ident::new("table_1")),
1710 ]),
1711 or_replace: true,
1712 if_not_exists: true,
1713 expire_after: Some(300),
1714 eval_interval: None,
1715 comment: Some("test comment".to_string()),
1716 },
1717 ),
1718 (
1719 r"
1720CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1721SINK TO schema_1.table_1
1722EXPIRE AFTER '5 minutes'
1723EVAL INTERVAL '10 seconds'
1724COMMENT 'test comment'
1725AS
1726SELECT max(c1), min(c2) FROM schema_2.table_2;",
1727 CreateFlowWoutQuery {
1728 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1729 sink_table_name: ObjectName(vec![
1730 ObjectNamePart::Identifier(Ident::new("schema_1")),
1731 ObjectNamePart::Identifier(Ident::new("table_1")),
1732 ]),
1733 or_replace: true,
1734 if_not_exists: true,
1735 expire_after: Some(300),
1736 eval_interval: Some(10),
1737 comment: Some("test comment".to_string()),
1738 },
1739 ),
1740 (
1741 r"
1742CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1743SINK TO schema_1.table_1
1744EXPIRE AFTER '5 minutes'
1745EVAL INTERVAL INTERVAL '10 seconds'
1746COMMENT 'test comment'
1747AS
1748SELECT max(c1), min(c2) FROM schema_2.table_2;",
1749 CreateFlowWoutQuery {
1750 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1751 sink_table_name: ObjectName(vec![
1752 ObjectNamePart::Identifier(Ident::new("schema_1")),
1753 ObjectNamePart::Identifier(Ident::new("table_1")),
1754 ]),
1755 or_replace: true,
1756 if_not_exists: true,
1757 expire_after: Some(300),
1758 eval_interval: Some(10),
1759 comment: Some("test comment".to_string()),
1760 },
1761 ),
1762 (
1763 r"
1764CREATE FLOW `task_2`
1765SINK TO schema_1.table_1
1766EXPIRE AFTER '2 days 1h 2 min'
1767AS
1768SELECT max(c1), min(c2) FROM schema_2.table_2;",
1769 CreateFlowWoutQuery {
1770 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1771 '`', "task_2",
1772 ))]),
1773 sink_table_name: ObjectName(vec![
1774 ObjectNamePart::Identifier(Ident::new("schema_1")),
1775 ObjectNamePart::Identifier(Ident::new("table_1")),
1776 ]),
1777 or_replace: false,
1778 if_not_exists: false,
1779 expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1780 eval_interval: None,
1781 comment: None,
1782 },
1783 ),
1784 ];
1785
1786 for (sql, expected) in testcases {
1787 let create_task = parse_create_flow(sql);
1788
1789 let expected = CreateFlow {
1790 flow_name: expected.flow_name,
1791 sink_table_name: expected.sink_table_name,
1792 or_replace: expected.or_replace,
1793 if_not_exists: expected.if_not_exists,
1794 expire_after: expected.expire_after,
1795 eval_interval: expected.eval_interval,
1796 comment: expected.comment,
1797 query: create_task.query.clone(),
1799 };
1800
1801 assert_eq!(create_task, expected, "input sql is:\n{sql}");
1802 let show_create = create_task.to_string();
1803 let recreated = parse_create_flow(&show_create);
1804 assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1805 }
1806 }
1807
1808 #[test]
1809 fn test_create_flow_no_month() {
1810 let sql = r"
1811CREATE FLOW `task_2`
1812SINK TO schema_1.table_1
1813EXPIRE AFTER '1 month 2 days 1h 2 min'
1814AS
1815SELECT max(c1), min(c2) FROM schema_2.table_2;";
1816 let stmts =
1817 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1818
1819 assert!(
1820 stmts.is_err()
1821 && stmts
1822 .unwrap_err()
1823 .to_string()
1824 .contains("Interval with months is not allowed")
1825 );
1826 }
1827
1828 #[test]
1829 fn test_validate_create() {
1830 let sql = r"
1831CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
1832PARTITION ON COLUMNS(c, a) (
1833 a < 10,
1834 a > 10 AND a < 20,
1835 a > 20 AND c < 100,
1836 a > 20 AND c >= 100
1837)
1838ENGINE=mito";
1839 let result =
1840 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1841 let _ = result.unwrap();
1842
1843 let sql = r"
1844CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1845PARTITION ON COLUMNS(x) ()
1846ENGINE=mito";
1847 let result =
1848 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1849 assert!(
1850 result
1851 .unwrap_err()
1852 .to_string()
1853 .contains("Partition column \"x\" not defined")
1854 );
1855 }
1856
1857 #[test]
1858 fn test_parse_create_table_with_partitions() {
1859 let sql = r"
1860CREATE TABLE monitor (
1861 host_id INT,
1862 idc STRING,
1863 ts TIMESTAMP,
1864 cpu DOUBLE DEFAULT 0,
1865 memory DOUBLE,
1866 TIME INDEX (ts),
1867 PRIMARY KEY (host),
1868)
1869PARTITION ON COLUMNS(idc, host_id) (
1870 idc <= 'hz' AND host_id < 1000,
1871 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1872 idc > 'sh' AND host_id < 3000,
1873 idc > 'sh' AND host_id >= 3000,
1874)
1875ENGINE=mito";
1876 let result =
1877 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1878 .unwrap();
1879 assert_eq!(result.len(), 1);
1880 match &result[0] {
1881 Statement::CreateTable(c) => {
1882 assert!(c.partitions.is_some());
1883
1884 let partitions = c.partitions.as_ref().unwrap();
1885 let column_list = partitions
1886 .column_list
1887 .iter()
1888 .map(|x| &x.value)
1889 .collect::<Vec<&String>>();
1890 assert_eq!(column_list, vec!["idc", "host_id"]);
1891
1892 let exprs = &partitions.exprs;
1893
1894 assert_eq!(
1895 exprs[0],
1896 Expr::BinaryOp {
1897 left: Box::new(Expr::BinaryOp {
1898 left: Box::new(Expr::Identifier("idc".into())),
1899 op: BinaryOperator::LtEq,
1900 right: Box::new(Expr::Value(
1901 Value::SingleQuotedString("hz".to_string()).into()
1902 ))
1903 }),
1904 op: BinaryOperator::And,
1905 right: Box::new(Expr::BinaryOp {
1906 left: Box::new(Expr::Identifier("host_id".into())),
1907 op: BinaryOperator::Lt,
1908 right: Box::new(Expr::Value(
1909 Value::Number("1000".to_string(), false).into()
1910 ))
1911 })
1912 }
1913 );
1914 assert_eq!(
1915 exprs[1],
1916 Expr::BinaryOp {
1917 left: Box::new(Expr::BinaryOp {
1918 left: Box::new(Expr::BinaryOp {
1919 left: Box::new(Expr::Identifier("idc".into())),
1920 op: BinaryOperator::Gt,
1921 right: Box::new(Expr::Value(
1922 Value::SingleQuotedString("hz".to_string()).into()
1923 ))
1924 }),
1925 op: BinaryOperator::And,
1926 right: Box::new(Expr::BinaryOp {
1927 left: Box::new(Expr::Identifier("idc".into())),
1928 op: BinaryOperator::LtEq,
1929 right: Box::new(Expr::Value(
1930 Value::SingleQuotedString("sh".to_string()).into()
1931 ))
1932 })
1933 }),
1934 op: BinaryOperator::And,
1935 right: Box::new(Expr::BinaryOp {
1936 left: Box::new(Expr::Identifier("host_id".into())),
1937 op: BinaryOperator::Lt,
1938 right: Box::new(Expr::Value(
1939 Value::Number("2000".to_string(), false).into()
1940 ))
1941 })
1942 }
1943 );
1944 assert_eq!(
1945 exprs[2],
1946 Expr::BinaryOp {
1947 left: Box::new(Expr::BinaryOp {
1948 left: Box::new(Expr::Identifier("idc".into())),
1949 op: BinaryOperator::Gt,
1950 right: Box::new(Expr::Value(
1951 Value::SingleQuotedString("sh".to_string()).into()
1952 ))
1953 }),
1954 op: BinaryOperator::And,
1955 right: Box::new(Expr::BinaryOp {
1956 left: Box::new(Expr::Identifier("host_id".into())),
1957 op: BinaryOperator::Lt,
1958 right: Box::new(Expr::Value(
1959 Value::Number("3000".to_string(), false).into()
1960 ))
1961 })
1962 }
1963 );
1964 assert_eq!(
1965 exprs[3],
1966 Expr::BinaryOp {
1967 left: Box::new(Expr::BinaryOp {
1968 left: Box::new(Expr::Identifier("idc".into())),
1969 op: BinaryOperator::Gt,
1970 right: Box::new(Expr::Value(
1971 Value::SingleQuotedString("sh".to_string()).into()
1972 ))
1973 }),
1974 op: BinaryOperator::And,
1975 right: Box::new(Expr::BinaryOp {
1976 left: Box::new(Expr::Identifier("host_id".into())),
1977 op: BinaryOperator::GtEq,
1978 right: Box::new(Expr::Value(
1979 Value::Number("3000".to_string(), false).into()
1980 ))
1981 })
1982 }
1983 );
1984 }
1985 _ => unreachable!(),
1986 }
1987 }
1988
1989 #[test]
1990 fn test_parse_create_table_with_quoted_partitions() {
1991 let sql = r"
1992CREATE TABLE monitor (
1993 `host_id` INT,
1994 idc STRING,
1995 ts TIMESTAMP,
1996 cpu DOUBLE DEFAULT 0,
1997 memory DOUBLE,
1998 TIME INDEX (ts),
1999 PRIMARY KEY (host),
2000)
2001PARTITION ON COLUMNS(IdC, host_id) (
2002 idc <= 'hz' AND host_id < 1000,
2003 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2004 idc > 'sh' AND host_id < 3000,
2005 idc > 'sh' AND host_id >= 3000,
2006)";
2007 let result =
2008 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2009 .unwrap();
2010 assert_eq!(result.len(), 1);
2011 }
2012
2013 #[test]
2014 fn test_parse_create_table_with_timestamp_index() {
2015 let sql1 = r"
2016CREATE TABLE monitor (
2017 host_id INT,
2018 idc STRING,
2019 ts TIMESTAMP TIME INDEX,
2020 cpu DOUBLE DEFAULT 0,
2021 memory DOUBLE,
2022 PRIMARY KEY (host),
2023)
2024ENGINE=mito";
2025 let result1 = ParserContext::create_with_dialect(
2026 sql1,
2027 &GreptimeDbDialect {},
2028 ParseOptions::default(),
2029 )
2030 .unwrap();
2031
2032 if let Statement::CreateTable(c) = &result1[0] {
2033 assert_eq!(c.constraints.len(), 2);
2034 let tc = c.constraints[0].clone();
2035 match tc {
2036 TableConstraint::TimeIndex { column } => {
2037 assert_eq!(&column.value, "ts");
2038 }
2039 _ => panic!("should be time index constraint"),
2040 };
2041 } else {
2042 panic!("should be create_table statement");
2043 }
2044
2045 let sql2 = r"
2048CREATE TABLE monitor (
2049 host_id INT,
2050 idc STRING,
2051 ts TIMESTAMP NOT NULL,
2052 cpu DOUBLE DEFAULT 0,
2053 memory DOUBLE,
2054 TIME INDEX (ts),
2055 PRIMARY KEY (host),
2056)
2057ENGINE=mito";
2058 let result2 = ParserContext::create_with_dialect(
2059 sql2,
2060 &GreptimeDbDialect {},
2061 ParseOptions::default(),
2062 )
2063 .unwrap();
2064
2065 assert_eq!(result1, result2);
2066
2067 let sql3 = r"
2069CREATE TABLE monitor (
2070 host_id INT,
2071 idc STRING,
2072 ts TIMESTAMP,
2073 cpu DOUBLE DEFAULT 0,
2074 memory DOUBLE,
2075 TIME INDEX (ts),
2076 PRIMARY KEY (host),
2077)
2078ENGINE=mito";
2079
2080 let result3 = ParserContext::create_with_dialect(
2081 sql3,
2082 &GreptimeDbDialect {},
2083 ParseOptions::default(),
2084 )
2085 .unwrap();
2086
2087 assert_ne!(result1, result3);
2088
2089 let sql1 = r"
2091CREATE TABLE monitor (
2092 host_id INT,
2093 idc STRING,
2094 b bigint TIME INDEX,
2095 cpu DOUBLE DEFAULT 0,
2096 memory DOUBLE,
2097 PRIMARY KEY (host),
2098)
2099ENGINE=mito";
2100 let result1 = ParserContext::create_with_dialect(
2101 sql1,
2102 &GreptimeDbDialect {},
2103 ParseOptions::default(),
2104 );
2105
2106 assert!(
2107 result1
2108 .unwrap_err()
2109 .to_string()
2110 .contains("time index column data type should be timestamp")
2111 );
2112 }
2113
2114 #[test]
2115 fn test_parse_create_table_with_timestamp_index_not_null() {
2116 let sql = r"
2117CREATE TABLE monitor (
2118 host_id INT,
2119 idc STRING,
2120 ts TIMESTAMP TIME INDEX,
2121 cpu DOUBLE DEFAULT 0,
2122 memory DOUBLE,
2123 TIME INDEX (ts),
2124 PRIMARY KEY (host),
2125)
2126ENGINE=mito";
2127 let result =
2128 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2129 .unwrap();
2130
2131 assert_eq!(result.len(), 1);
2132 if let Statement::CreateTable(c) = &result[0] {
2133 let ts = c.columns[2].clone();
2134 assert_eq!(ts.name().to_string(), "ts");
2135 assert_eq!(ts.options()[0].option, NotNull);
2136 } else {
2137 panic!("should be create table statement");
2138 }
2139
2140 let sql1 = r"
2141CREATE TABLE monitor (
2142 host_id INT,
2143 idc STRING,
2144 ts TIMESTAMP NOT NULL TIME INDEX,
2145 cpu DOUBLE DEFAULT 0,
2146 memory DOUBLE,
2147 TIME INDEX (ts),
2148 PRIMARY KEY (host),
2149)
2150ENGINE=mito";
2151
2152 let result1 = ParserContext::create_with_dialect(
2153 sql1,
2154 &GreptimeDbDialect {},
2155 ParseOptions::default(),
2156 )
2157 .unwrap();
2158 assert_eq!(result, result1);
2159
2160 let sql2 = r"
2161CREATE TABLE monitor (
2162 host_id INT,
2163 idc STRING,
2164 ts TIMESTAMP TIME INDEX NOT NULL,
2165 cpu DOUBLE DEFAULT 0,
2166 memory DOUBLE,
2167 TIME INDEX (ts),
2168 PRIMARY KEY (host),
2169)
2170ENGINE=mito";
2171
2172 let result2 = ParserContext::create_with_dialect(
2173 sql2,
2174 &GreptimeDbDialect {},
2175 ParseOptions::default(),
2176 )
2177 .unwrap();
2178 assert_eq!(result, result2);
2179
2180 let sql3 = r"
2181CREATE TABLE monitor (
2182 host_id INT,
2183 idc STRING,
2184 ts TIMESTAMP TIME INDEX NULL NOT,
2185 cpu DOUBLE DEFAULT 0,
2186 memory DOUBLE,
2187 TIME INDEX (ts),
2188 PRIMARY KEY (host),
2189)
2190ENGINE=mito";
2191
2192 let result3 = ParserContext::create_with_dialect(
2193 sql3,
2194 &GreptimeDbDialect {},
2195 ParseOptions::default(),
2196 );
2197 assert!(result3.is_err());
2198
2199 let sql4 = r"
2200CREATE TABLE monitor (
2201 host_id INT,
2202 idc STRING,
2203 ts TIMESTAMP TIME INDEX NOT NULL NULL,
2204 cpu DOUBLE DEFAULT 0,
2205 memory DOUBLE,
2206 TIME INDEX (ts),
2207 PRIMARY KEY (host),
2208)
2209ENGINE=mito";
2210
2211 let result4 = ParserContext::create_with_dialect(
2212 sql4,
2213 &GreptimeDbDialect {},
2214 ParseOptions::default(),
2215 );
2216 assert!(result4.is_err());
2217
2218 let sql = r"
2219CREATE TABLE monitor (
2220 host_id INT,
2221 idc STRING,
2222 ts TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2223 cpu DOUBLE DEFAULT 0,
2224 memory DOUBLE,
2225 TIME INDEX (ts),
2226 PRIMARY KEY (host),
2227)
2228ENGINE=mito";
2229
2230 let result =
2231 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2232 .unwrap();
2233
2234 if let Statement::CreateTable(c) = &result[0] {
2235 let tc = c.constraints[0].clone();
2236 match tc {
2237 TableConstraint::TimeIndex { column } => {
2238 assert_eq!(&column.value, "ts");
2239 }
2240 _ => panic!("should be time index constraint"),
2241 }
2242 let ts = c.columns[2].clone();
2243 assert_eq!(ts.name().to_string(), "ts");
2244 assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2245 assert_eq!(ts.options()[1].option, NotNull);
2246 } else {
2247 unreachable!("should be create table statement");
2248 }
2249 }
2250
2251 #[test]
2252 fn test_parse_partitions_with_error_syntax() {
2253 let sql = r"
2254CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2255PARTITION COLUMNS(c, a) (
2256 a < 10,
2257 a > 10 AND a < 20,
2258 a > 20 AND c < 100,
2259 a > 20 AND c >= 100
2260)
2261ENGINE=mito";
2262 let result =
2263 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2264 assert!(
2265 result
2266 .unwrap_err()
2267 .output_msg()
2268 .contains("sql parser error: Expected: ON, found: COLUMNS")
2269 );
2270 }
2271
2272 #[test]
2273 fn test_parse_partitions_without_rule() {
2274 let sql = r"
2275CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2276PARTITION ON COLUMNS(c, a) ()
2277ENGINE=mito";
2278 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2279 .unwrap();
2280 }
2281
2282 #[test]
2283 fn test_parse_partitions_unreferenced_column() {
2284 let sql = r"
2285CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2286PARTITION ON COLUMNS(c, a) (
2287 b = 'foo'
2288)
2289ENGINE=mito";
2290 let result =
2291 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2292 assert_eq!(
2293 result.unwrap_err().output_msg(),
2294 "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2295 );
2296 }
2297
2298 #[test]
2299 fn test_parse_partitions_not_binary_expr() {
2300 let sql = r"
2301CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2302PARTITION ON COLUMNS(c, a) (
2303 b
2304)
2305ENGINE=mito";
2306 let result =
2307 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2308 assert_eq!(
2309 result.unwrap_err().output_msg(),
2310 r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2311 );
2312 }
2313
2314 fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2315 assert_eq!(column.name.to_string(), name);
2316 assert_eq!(column.data_type.to_string(), data_type);
2317 }
2318
/// Parses a complete `CREATE TABLE` statement and verifies the resulting AST:
/// table name, engine, column definitions, table constraints and `WITH` options.
#[test]
pub fn test_parse_create_table() {
    let sql = r"create table demo(
 host string,
 ts timestamp,
 cpu float32 default 0,
 memory float64,
 TIME INDEX (ts),
 PRIMARY KEY(ts, host),
 ) engine=mito
 with(ttl='10s');
 ";
    let statements =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap();
    assert_eq!(1, statements.len());

    let Statement::CreateTable(create) = &statements[0] else {
        unreachable!()
    };
    assert!(!create.if_not_exists);
    assert_eq!("demo", create.name.to_string());
    assert_eq!("mito", create.engine);

    // Expected (name, rendered type) pairs, in declaration order.
    let expected_columns = [
        ("host", "STRING"),
        ("ts", "TIMESTAMP"),
        ("cpu", "FLOAT"),
        ("memory", "DOUBLE"),
    ];
    assert_eq!(4, create.columns.len());
    for (column, (name, data_type)) in create.columns.iter().zip(expected_columns) {
        assert_column_def(&column.column_def, name, data_type);
    }

    let time_index = TableConstraint::TimeIndex {
        column: Ident::new("ts"),
    };
    let primary_key = TableConstraint::PrimaryKey {
        columns: vec![Ident::new("ts"), Ident::new("host")],
    };
    assert_eq!(&create.constraints[0], &time_index);
    assert_eq!(&create.constraints[1], &primary_key);

    assert_eq!(1, create.options.len());
    let expected_options: HashMap<_, _> = [("ttl", "10s")].into_iter().collect();
    assert_eq!(expected_options, create.options.to_str_map());
}
2370
/// A `TIME INDEX` constraint naming more than one column must be rejected
/// with an `InvalidTimeIndex` error.
#[test]
fn test_invalid_index_keys() {
    let sql = r"create table demo(
 host string,
 ts int64,
 cpu float64 default 0,
 memory float64,
 TIME INDEX (ts, host),
 PRIMARY KEY(ts, host)) engine=mito;
 ";
    let outcome =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());

    assert!(outcome.is_err());
    assert_matches!(outcome, Err(crate::error::Error::InvalidTimeIndex { .. }));
}
2386
/// Declaring the time index more than once (inline on columns and/or via a
/// table-level `TIME INDEX` constraint) must fail with `InvalidTimeIndex`.
#[test]
fn test_duplicated_time_index() {
    let invalid_sqls = [
        // Two inline time indexes plus a multi-column table constraint.
        r"create table demo(
 host string,
 ts timestamp time index,
 t timestamp time index,
 cpu float64 default 0,
 memory float64,
 TIME INDEX (ts, host),
 PRIMARY KEY(ts, host)) engine=mito;
 ",
        // One inline time index plus a table constraint on another column.
        r"create table demo(
 host string,
 ts timestamp time index,
 cpu float64 default 0,
 t timestamp,
 memory float64,
 TIME INDEX (t),
 PRIMARY KEY(ts, host)) engine=mito;
 ",
    ];

    for sql in invalid_sqls {
        let outcome =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
        assert!(outcome.is_err());
        assert_matches!(outcome, Err(crate::error::Error::InvalidTimeIndex { .. }));
    }
}
2417
/// An unquoted keyword (`user`) is rejected as a column name, while the same
/// name in double quotes is accepted.
#[test]
fn test_invalid_column_name() {
    let parse = |sql: &str| {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
    };

    let unquoted = "create table foo(user string, i timestamp time index)";
    let message = parse(unquoted).unwrap_err().output_msg();
    assert!(message.contains("Cannot use keyword 'user' as column name"));

    let quoted = r#"
 create table foo("user" string, i timestamp time index)
 "#;
    let _ = parse(quoted).unwrap();
}
2434
/// Regression test for issue 3479: a long floating-point `DEFAULT` literal
/// must be rendered back exactly as written.
#[test]
fn test_incorrect_default_value_issue_3479() {
    let sql = r#"CREATE TABLE `ExcePTuRi`(
non TIMESTAMP(6) TIME INDEX,
`iUSTO` DOUBLE DEFAULT 0.047318541668048164
)"#;
    let statements =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap();
    assert_eq!(1, statements.len());

    let Statement::CreateTable(create) = &statements[0] else {
        unreachable!()
    };
    let rendered_column = create.columns[1].to_string();
    assert_eq!(
        "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
        rendered_column
    );
}
2455
/// `CREATE VIEW` round-trips through the parser, and the `OR REPLACE` /
/// `IF NOT EXISTS` modifiers are reflected in the parsed statement.
#[test]
fn test_parse_create_view() {
    // (sql, expected `or_replace`, expected `if_not_exists`)
    let cases = [
        ("CREATE VIEW test AS SELECT * FROM NUMBERS", false, false),
        (
            "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS",
            true,
            true,
        ),
    ];

    for (sql, or_replace, if_not_exists) in cases {
        let statements =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap();
        let Statement::CreateView(view) = &statements[0] else {
            unreachable!()
        };
        assert_eq!(view.to_string(), sql);
        assert_eq!(view.or_replace, or_replace);
        assert_eq!(view.if_not_exists, if_not_exists);
        assert_eq!("test", view.name.to_string());
    }
}
2488
/// A `CREATE VIEW` whose body is a `DELETE` is expected to parse into exactly
/// one statement.
///
/// NOTE(review): despite the "invalid query" name, this asserts a successful
/// parse; presumably the query kind is validated at a later stage — confirm.
#[test]
fn test_parse_create_view_invalid_query() {
    let outcome = ParserContext::create_with_dialect(
        "CREATE VIEW test AS DELETE from demo",
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );
    assert!(matches!(outcome, Ok(statements) if statements.len() == 1));
}
2496
/// Checks that column-level `FULLTEXT INDEX` clauses are parsed into
/// `fulltext_index_options`, both without options and with a `WITH (...)`
/// list, for several string-like column types.
///
/// Each inspected column is looked up by name and must exist, so the option
/// assertions cannot be skipped silently when a column is missing.
#[test]
fn test_parse_create_table_fulltext_options() {
    let parse = |sql: &str| {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap()
    };

    // Bare `FULLTEXT INDEX`: options are present but empty.
    let sql1 = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg TEXT FULLTEXT INDEX,
)";
    let result1 = parse(sql1);
    let Statement::CreateTable(c) = &result1[0] else {
        panic!("should be create_table statement");
    };
    let msg = c
        .columns
        .iter()
        .find(|col| col.name().value == "msg")
        .expect("column `msg` should be parsed");
    assert!(
        msg.extensions
            .fulltext_index_options
            .as_ref()
            .unwrap()
            .is_empty()
    );

    // `FULLTEXT INDEX WITH (...)` on a STRING column.
    let sql2 = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
)";
    let result2 = parse(sql2);
    let Statement::CreateTable(c) = &result2[0] else {
        panic!("should be create_table statement");
    };
    let msg = c
        .columns
        .iter()
        .find(|col| col.name().value == "msg")
        .expect("column `msg` should be parsed");
    let options = msg.extensions.fulltext_index_options.as_ref().unwrap();
    assert_eq!(options.len(), 2);
    assert_eq!(options.get("analyzer").unwrap(), "English");
    assert_eq!(options.get("case_sensitive").unwrap(), "false");

    // Two columns of other string-like types, each with its own options.
    let sql3 = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
 msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
)";
    let result3 = parse(sql3);
    let Statement::CreateTable(c) = &result3[0] else {
        panic!("should be create_table statement");
    };
    let expectations = [("msg1", "English", "false"), ("msg2", "Chinese", "true")];
    for (column_name, analyzer, case_sensitive) in expectations {
        let column = c
            .columns
            .iter()
            .find(|col| col.name().value == column_name)
            .expect("fulltext column should be parsed");
        let options = column.extensions.fulltext_index_options.as_ref().unwrap();
        assert_eq!(options.len(), 2);
        assert_eq!(options.get("analyzer").unwrap(), analyzer);
        assert_eq!(options.get("case_sensitive").unwrap(), case_sensitive);
    }
}
2583
/// `FULLTEXT INDEX` on a non-string (`INT`) column must be rejected.
#[test]
fn test_parse_create_table_fulltext_options_invalid_type() {
    let sql = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg INT FULLTEXT INDEX,
)";
    let outcome =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("FULLTEXT index only supports string type"));
}
2601
/// Repeating the `FULLTEXT INDEX` clause on one column must be reported as a
/// duplicated option.
#[test]
fn test_parse_create_table_fulltext_options_duplicate() {
    let sql = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
)";
    let outcome =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("duplicated FULLTEXT INDEX option"));
}
2619
/// An unknown key inside `FULLTEXT INDEX WITH (...)` must be rejected.
#[test]
fn test_parse_create_table_fulltext_options_invalid_option() {
    let sql = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
)";
    let outcome =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("invalid FULLTEXT INDEX option"));
}
2637
/// Checks that column-level `SKIPPING INDEX` clauses are parsed into
/// `skipping_index_options`: non-empty when a `WITH (...)` list is given,
/// empty for the bare clause.
///
/// The `msg` column is looked up by name and must exist, so the assertions
/// cannot be skipped silently when the column is missing.
#[test]
fn test_parse_create_table_skip_options() {
    // With explicit options.
    let sql = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
)";
    let result =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap();

    let Statement::CreateTable(c) = &result[0] else {
        panic!("should be create_table statement");
    };
    let msg = c
        .columns
        .iter()
        .find(|col| col.name().value == "msg")
        .expect("column `msg` should be parsed");
    assert!(
        !msg.extensions
            .skipping_index_options
            .as_ref()
            .unwrap()
            .is_empty()
    );

    // Bare clause: options are present but empty.
    let sql = r"
 CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg INT SKIPPING INDEX,
 )";
    let result =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap();

    let Statement::CreateTable(c) = &result[0] else {
        panic!("should be create_table statement");
    };
    let msg = c
        .columns
        .iter()
        .find(|col| col.name().value == "msg")
        .expect("column `msg` should be parsed");
    assert!(
        msg.extensions
            .skipping_index_options
            .as_ref()
            .unwrap()
            .is_empty()
    );
}
2690
/// `CREATE VIEW` with an explicit column list: valid lists round-trip (an
/// empty list is dropped when rendering), malformed lists fail to parse.
#[test]
fn test_parse_create_view_with_columns() {
    let parse = |sql: &str| {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
    };

    // (input sql, expected rendering)
    let valid_cases = [
        (
            "CREATE VIEW test () AS SELECT * FROM NUMBERS",
            "CREATE VIEW test AS SELECT * FROM NUMBERS",
        ),
        (
            "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS",
            "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS",
        ),
        (
            "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS",
            "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS",
        ),
    ];
    for (sql, rendered) in valid_cases {
        let statements = parse(sql).unwrap();

        let Statement::CreateView(view) = &statements[0] else {
            unreachable!()
        };
        assert_eq!(view.to_string(), rendered);
        assert!(!view.or_replace);
        assert!(!view.if_not_exists);
        assert_eq!("test", view.name.to_string());

        assert_eq!(rendered, statements[0].to_string());
    }

    let invalid_sqls = [
        "CREATE VIEW test (n1 AS select * from demo",
        "CREATE VIEW test (n1, AS select * from demo",
        "CREATE VIEW test n1,n2) AS select * from demo",
        "CREATE VIEW test (1) AS select * from demo",
        "CREATE VIEW test (n1, select) AS select * from demo",
    ];
    for sql in invalid_sqls {
        assert!(parse(sql).is_err());
    }
}
2771
/// For a `VECTOR(128)` column type and an empty token stream,
/// `parse_column_extensions` records the dimension in `vector_options`.
#[test]
fn test_parse_column_extensions_vector() {
    let dialect = GenericDialect {};
    let tokens = Tokenizer::new(&dialect, "").tokenize().unwrap();
    let mut parser = Parser::new(&dialect).with_tokens(tokens);

    let column_name = Ident::new("vec_col");
    let vector_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
    let mut extensions = ColumnExtensions::default();

    let outcome = ParserContext::parse_column_extensions(
        &mut parser,
        &column_name,
        &vector_type,
        &mut extensions,
    );
    assert!(outcome.is_ok());
    assert!(extensions.vector_options.is_some());

    let dim_options = extensions.vector_options.unwrap();
    assert_eq!(dim_options.get(VECTOR_OPT_DIM), Some("128"));
}
2792
/// A `VECTOR` column type without any type arguments makes
/// `parse_column_extensions` fail.
#[test]
fn test_parse_column_extensions_vector_invalid() {
    let dialect = GenericDialect {};
    let tokens = Tokenizer::new(&dialect, "").tokenize().unwrap();
    let mut parser = Parser::new(&dialect).with_tokens(tokens);

    let column_name = Ident::new("vec_col");
    let vector_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
    let mut extensions = ColumnExtensions::default();

    let outcome = ParserContext::parse_column_extensions(
        &mut parser,
        &column_name,
        &vector_type,
        &mut extensions,
    );
    assert!(outcome.is_err());
}
2809
/// Exercises `parse_column_extensions` directly for the `SKIPPING`,
/// `FULLTEXT` and `INVERTED` index clauses, covering both accepted and
/// rejected inputs.
#[test]
fn test_parse_column_extensions_indices() {
    // Tokenizes `sql` with the generic dialect, runs `parse_column_extensions`
    // against fresh default extensions, and returns the outcome together with
    // the extensions that were filled in.
    let parse = |sql: &str, column: &str, data_type: DataType| {
        let dialect = GenericDialect {};
        let tokens = Tokenizer::new(&dialect, sql).tokenize().unwrap();
        let mut parser = Parser::new(&dialect).with_tokens(tokens);
        let mut extensions = ColumnExtensions::default();
        let outcome = ParserContext::parse_column_extensions(
            &mut parser,
            &Ident::new(column),
            &data_type,
            &mut extensions,
        );
        (outcome, extensions)
    };

    // Bare skipping index.
    let (outcome, extensions) = parse("SKIPPING INDEX", "col", DataType::String(None));
    assert!(outcome.is_ok());
    assert!(extensions.skipping_index_options.is_some());

    // Fulltext index with options on a string column.
    let (outcome, extensions) = parse(
        "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')",
        "text_col",
        DataType::String(None),
    );
    assert!(outcome.unwrap());
    assert!(extensions.fulltext_index_options.is_some());
    let fulltext_options = extensions.fulltext_index_options.unwrap();
    assert_eq!(fulltext_options.get("analyzer"), Some("English"));
    assert_eq!(fulltext_options.get("case_sensitive"), Some("true"));

    // Fulltext index on an integer column is rejected.
    let (outcome, _) = parse(
        "FULLTEXT INDEX WITH (analyzer = 'English')",
        "num_col",
        DataType::Int(None),
    );
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("FULLTEXT index only supports string type"));

    // An unrecognised analyzer value is still accepted by this function.
    let (outcome, _) = parse(
        "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')",
        "text_col",
        DataType::String(None),
    );
    assert!(outcome.unwrap());

    // Bare inverted index.
    let (outcome, extensions) = parse("INVERTED INDEX", "col", DataType::String(None));
    assert!(outcome.is_ok());
    assert!(extensions.inverted_index_options.is_some());

    // Inverted index does not take options.
    let (outcome, _) = parse(
        "INVERTED INDEX WITH (analyzer = 'English')",
        "col",
        DataType::String(None),
    );
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("INVERTED index doesn't support options"));

    // Several index clauses on the same column.
    let (outcome, extensions) = parse(
        "SKIPPING INDEX FULLTEXT INDEX",
        "col",
        DataType::String(None),
    );
    assert!(outcome.unwrap());
    assert!(extensions.skipping_index_options.is_some());
    assert!(extensions.fulltext_index_options.is_some());
}
2965
/// A string literal cast to `INTERVAL` is rendered with the interval written
/// out in full (`'10s'` becomes `'10 seconds'`).
#[test]
fn test_parse_interval_cast() {
    let statements = ParserContext::create_with_dialect(
        "select '10s'::INTERVAL",
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();

    let rendered = statements[0].to_string();
    assert_eq!("SELECT '10 seconds'::INTERVAL", &rendered);
}
2974
/// Checks that column-level `VECTOR INDEX` clauses are parsed into
/// `vector_index_options`: empty for the bare clause, and carrying every
/// key/value pair when a `WITH (...)` list is given.
///
/// The `vec` column is looked up by name and must exist, so the assertions
/// cannot be skipped silently when the column is missing.
#[test]
fn test_parse_create_table_vector_index_options() {
    // Bare clause: options are present but empty.
    let sql = r"
CREATE TABLE vectors (
 ts TIMESTAMP TIME INDEX,
 vec VECTOR(128) VECTOR INDEX,
)";
    let result =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap();

    let Statement::CreateTable(c) = &result[0] else {
        panic!("should be create_table statement");
    };
    let vec_column = c
        .columns
        .iter()
        .find(|col| col.name().value == "vec")
        .expect("column `vec` should be parsed");
    assert!(
        vec_column
            .extensions
            .vector_index_options
            .as_ref()
            .unwrap()
            .is_empty()
    );

    // With explicit options.
    let sql = r"
CREATE TABLE vectors (
 ts TIMESTAMP TIME INDEX,
 vec VECTOR(128) VECTOR INDEX WITH (metric='cosine', connectivity='32', expansion_add='256', expansion_search='128')
)";
    let result =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap();

    let Statement::CreateTable(c) = &result[0] else {
        panic!("should be create_table statement");
    };
    let vec_column = c
        .columns
        .iter()
        .find(|col| col.name().value == "vec")
        .expect("column `vec` should be parsed");
    let options = vec_column
        .extensions
        .vector_index_options
        .as_ref()
        .unwrap();
    assert_eq!(options.len(), 4);
    assert_eq!(options.get("metric").unwrap(), "cosine");
    assert_eq!(options.get("connectivity").unwrap(), "32");
    assert_eq!(options.get("expansion_add").unwrap(), "256");
    assert_eq!(options.get("expansion_search").unwrap(), "128");
}
3028
/// `VECTOR INDEX` on a non-vector (`INT`) column must be rejected.
#[test]
fn test_parse_create_table_vector_index_invalid_type() {
    let sql = r"
CREATE TABLE vectors (
 ts TIMESTAMP TIME INDEX,
 col INT VECTOR INDEX,
)";
    let outcome =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("VECTOR INDEX only supports Vector type columns"));
}
3047
/// Repeating the `VECTOR INDEX` clause on one column must be reported as a
/// duplicated option.
#[test]
fn test_parse_create_table_vector_index_duplicate() {
    let sql = r"
CREATE TABLE vectors (
 ts TIMESTAMP TIME INDEX,
 vec VECTOR(128) VECTOR INDEX VECTOR INDEX,
)";
    let outcome =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("duplicated VECTOR INDEX option"));
}
3066
/// An unknown key inside `VECTOR INDEX WITH (...)` must be rejected.
#[test]
fn test_parse_create_table_vector_index_invalid_option() {
    let sql = r"
CREATE TABLE vectors (
 ts TIMESTAMP TIME INDEX,
 vec VECTOR(128) VECTOR INDEX WITH (metric='l2sq', invalid_option='foo')
)";
    let outcome =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("invalid VECTOR INDEX option"));
}
3085
/// Exercises `parse_column_extensions` directly for the `VECTOR INDEX`
/// clause: accepted on a vector column, rejected on an integer column.
#[test]
fn test_parse_column_extensions_vector_index() {
    // Tokenizes `sql` with the generic dialect, runs `parse_column_extensions`
    // starting from the supplied extensions, and returns the outcome together
    // with the updated extensions.
    let parse =
        |sql: &str, column: &str, data_type: DataType, mut extensions: ColumnExtensions| {
            let dialect = GenericDialect {};
            let tokens = Tokenizer::new(&dialect, sql).tokenize().unwrap();
            let mut parser = Parser::new(&dialect).with_tokens(tokens);
            let outcome = ParserContext::parse_column_extensions(
                &mut parser,
                &Ident::new(column),
                &data_type,
                &mut extensions,
            );
            (outcome, extensions)
        };

    // Vector column whose dimension option is already populated.
    let seeded = ColumnExtensions {
        vector_options: Some(OptionMap::from([(
            VECTOR_OPT_DIM.to_string(),
            "128".to_string(),
        )])),
        ..Default::default()
    };
    let (outcome, extensions) = parse(
        "VECTOR INDEX WITH (metric = 'l2sq')",
        "vec_col",
        DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]),
        seeded,
    );
    assert!(outcome.is_ok());
    assert!(extensions.vector_index_options.is_some());
    let vi_options = extensions.vector_index_options.unwrap();
    assert_eq!(vi_options.get("metric"), Some("l2sq"));

    // Integer column: the clause is rejected.
    let (outcome, _) = parse(
        "VECTOR INDEX",
        "num_col",
        DataType::Int(None),
        ColumnExtensions::default(),
    );
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("VECTOR INDEX only supports Vector type columns"));
}
3144}