1mod json;
16#[cfg(feature = "enterprise")]
17pub mod trigger;
18
19use std::collections::HashMap;
20
21use arrow_buffer::IntervalMonthDayNano;
22use common_catalog::consts::default_engine;
23use datafusion_common::ScalarValue;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
25use datatypes::data_type::ConcreteDataType;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28use sqlparser::ast::{
29 ColumnOption, ColumnOptionDef, DataType, Expr, KeyOrIndexDisplay, NullsDistinctOption,
30 PrimaryKeyConstraint, UniqueConstraint,
31};
32use sqlparser::dialect::keywords::Keyword;
33use sqlparser::keywords::ALL_KEYWORDS;
34use sqlparser::parser::IsOptional::Mandatory;
35use sqlparser::parser::{Parser, ParserError};
36use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
37use table::requests::validate_database_option;
38
39use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
40use crate::error::{
41 self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu,
42 InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result, SyntaxSnafu,
43 UnexpectedSnafu, UnsupportedSnafu,
44};
45use crate::parser::{FLOW, ParserContext};
46use crate::parsers::tql_parser;
47use crate::parsers::utils::{
48 self, parse_with_options, validate_column_fulltext_create_option,
49 validate_column_skipping_index_create_option, validate_column_vector_index_create_option,
50};
51use crate::statements::create::{
52 Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
53 CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
54};
55use crate::statements::statement::Statement;
56use crate::statements::transform::type_alias::get_data_type_by_alias_name;
57use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
58use crate::util::{OptionValue, location_to_index, parse_option_string};
59
/// Word that introduces the `ENGINE = <name>` clause (matched in `parse_table_engine`).
pub const ENGINE: &str = "ENGINE";
/// `MAXVALUE` word. NOTE(review): not referenced in this part of the file; presumably used
/// by partition-related code elsewhere — confirm.
pub const MAXVALUE: &str = "MAXVALUE";
/// Word expected after the flow name in `CREATE FLOW` (followed by `TO`).
pub const SINK: &str = "SINK";
/// First word of the optional `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const EXPIRE: &str = "EXPIRE";
/// Second word of the optional `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const AFTER: &str = "AFTER";
/// Word that starts an `INVERTED INDEX` column extension.
pub const INVERTED: &str = "INVERTED";
/// Word that starts a `SKIPPING INDEX` column extension.
pub const SKIPPING: &str = "SKIPPING";
/// Word that starts a `VECTOR INDEX` column extension.
pub const VECTOR: &str = "VECTOR";

/// Textual form of an interval expression, as produced by `Expr::to_string()` in
/// `parse_interval_month_day_nano`.
pub type RawIntervalExpr = String;
70
71impl<'a> ParserContext<'a> {
73 pub(crate) fn parse_create(&mut self) -> Result<Statement> {
74 match self.parser.peek_token().token {
75 Token::Word(w) => match w.keyword {
76 Keyword::TABLE => self.parse_create_table(),
77
78 Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
79
80 Keyword::EXTERNAL => self.parse_create_external_table(),
81
82 Keyword::OR => {
83 let _ = self.parser.next_token();
84 self.parser
85 .expect_keyword(Keyword::REPLACE)
86 .context(SyntaxSnafu)?;
87 match self.parser.next_token().token {
88 Token::Word(w) => match w.keyword {
89 Keyword::VIEW => self.parse_create_view(true),
90 Keyword::NoKeyword => {
91 let uppercase = w.value.to_uppercase();
92 match uppercase.as_str() {
93 FLOW => self.parse_create_flow(true),
94 _ => self.unsupported(w.to_string()),
95 }
96 }
97 _ => self.unsupported(w.to_string()),
98 },
99 _ => self.unsupported(w.to_string()),
100 }
101 }
102
103 Keyword::VIEW => {
104 let _ = self.parser.next_token();
105 self.parse_create_view(false)
106 }
107
108 #[cfg(feature = "enterprise")]
109 Keyword::TRIGGER => {
110 let _ = self.parser.next_token();
111 self.parse_create_trigger()
112 }
113
114 Keyword::NoKeyword => {
115 let _ = self.parser.next_token();
116 let uppercase = w.value.to_uppercase();
117 match uppercase.as_str() {
118 FLOW => self.parse_create_flow(false),
119 _ => self.unsupported(w.to_string()),
120 }
121 }
122 _ => self.unsupported(w.to_string()),
123 },
124 unexpected => self.unsupported(unexpected.to_string()),
125 }
126 }
127
128 fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
130 let if_not_exists = self.parse_if_not_exist()?;
131 let view_name = self.intern_parse_table_name()?;
132
133 let columns = self.parse_view_columns()?;
134
135 self.parser
136 .expect_keyword(Keyword::AS)
137 .context(SyntaxSnafu)?;
138
139 let query = self.parse_query()?;
140
141 Ok(Statement::CreateView(CreateView {
142 name: view_name,
143 columns,
144 or_replace,
145 query: Box::new(query),
146 if_not_exists,
147 }))
148 }
149
150 fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
151 let mut columns = vec![];
152 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
153 return Ok(columns);
154 }
155
156 loop {
157 let name = self.parse_column_name().context(SyntaxSnafu)?;
158
159 columns.push(name);
160
161 let comma = self.parser.consume_token(&Token::Comma);
162 if self.parser.consume_token(&Token::RParen) {
163 break;
165 } else if !comma {
166 return self.expected("',' or ')' after column name", self.parser.peek_token());
167 }
168 }
169
170 Ok(columns)
171 }
172
173 fn parse_create_external_table(&mut self) -> Result<Statement> {
174 let _ = self.parser.next_token();
175 self.parser
176 .expect_keyword(Keyword::TABLE)
177 .context(SyntaxSnafu)?;
178 let if_not_exists = self.parse_if_not_exist()?;
179 let table_name = self.intern_parse_table_name()?;
180 let (columns, constraints) = self.parse_columns()?;
181 if !columns.is_empty() {
182 validate_time_index(&columns, &constraints)?;
183 }
184
185 let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
186 let options = self.parse_create_table_options()?;
187 Ok(Statement::CreateExternalTable(CreateExternalTable {
188 name: table_name,
189 columns,
190 constraints,
191 options,
192 if_not_exists,
193 engine,
194 }))
195 }
196
197 fn parse_create_database(&mut self) -> Result<Statement> {
198 let _ = self.parser.next_token();
199 let if_not_exists = self.parse_if_not_exist()?;
200 let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
201 expected: "a database name",
202 actual: self.peek_token_as_string(),
203 })?;
204 let database_name = Self::canonicalize_object_name(database_name)?;
205
206 let options = self
207 .parser
208 .parse_options(Keyword::WITH)
209 .context(SyntaxSnafu)?
210 .into_iter()
211 .map(parse_option_string)
212 .collect::<Result<HashMap<String, OptionValue>>>()?;
213
214 for key in options.keys() {
215 ensure!(
216 validate_database_option(key),
217 InvalidDatabaseOptionSnafu { key: key.clone() }
218 );
219 }
220 if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string())
221 && append_mode == "true"
222 && options.contains_key("merge_mode")
223 {
224 return InvalidDatabaseOptionSnafu {
225 key: "merge_mode".to_string(),
226 }
227 .fail();
228 }
229
230 Ok(Statement::CreateDatabase(CreateDatabase {
231 name: database_name,
232 if_not_exists,
233 options: OptionMap::new(options),
234 }))
235 }
236
237 fn parse_create_table(&mut self) -> Result<Statement> {
238 let _ = self.parser.next_token();
239
240 let if_not_exists = self.parse_if_not_exist()?;
241
242 let table_name = self.intern_parse_table_name()?;
243
244 if self.parser.parse_keyword(Keyword::LIKE) {
245 let source_name = self.intern_parse_table_name()?;
246
247 return Ok(Statement::CreateTableLike(CreateTableLike {
248 table_name,
249 source_name,
250 }));
251 }
252
253 let (columns, constraints) = self.parse_columns()?;
254 validate_time_index(&columns, &constraints)?;
255
256 let partitions = self.parse_partitions()?;
257 if let Some(partitions) = &partitions {
258 validate_partitions(&columns, partitions)?;
259 }
260
261 let engine = self.parse_table_engine(default_engine())?;
262 let options = self.parse_create_table_options()?;
263 let create_table = CreateTable {
264 if_not_exists,
265 name: table_name,
266 columns,
267 engine,
268 constraints,
269 options,
270 table_id: 0, partitions,
272 };
273
274 Ok(Statement::CreateTable(create_table))
275 }
276
277 fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
279 let if_not_exists = self.parse_if_not_exist()?;
280
281 let flow_name = self.intern_parse_table_name()?;
282
283 if let Token::Word(word) = self.parser.peek_token().token
285 && word.value.eq_ignore_ascii_case(SINK)
286 {
287 self.parser.next_token();
288 } else {
289 Err(ParserError::ParserError(
290 "Expect `SINK` keyword".to_string(),
291 ))
292 .context(SyntaxSnafu)?
293 }
294 self.parser
295 .expect_keyword(Keyword::TO)
296 .context(SyntaxSnafu)?;
297
298 let output_table_name = self.intern_parse_table_name()?;
299
300 let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
301 && w1.value.eq_ignore_ascii_case(EXPIRE)
302 {
303 self.parser.next_token();
304 if let Token::Word(w2) = &self.parser.peek_token().token
305 && w2.value.eq_ignore_ascii_case(AFTER)
306 {
307 self.parser.next_token();
308 Some(self.parse_interval_no_month("EXPIRE AFTER")?)
309 } else {
310 None
311 }
312 } else {
313 None
314 };
315
316 let eval_interval = if self
317 .parser
318 .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
319 {
320 Some(self.parse_interval_no_month("EVAL INTERVAL")?)
321 } else {
322 None
323 };
324
325 let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
326 match self.parser.next_token() {
327 TokenWithSpan {
328 token: Token::SingleQuotedString(value, ..),
329 ..
330 } => Some(value),
331 unexpected => {
332 return self
333 .parser
334 .expected("string", unexpected)
335 .context(SyntaxSnafu);
336 }
337 }
338 } else {
339 None
340 };
341
342 self.parser
343 .expect_keyword(Keyword::AS)
344 .context(SyntaxSnafu)?;
345
346 let query = Box::new(self.parse_sql_or_tql(true)?);
347
348 Ok(Statement::CreateFlow(CreateFlow {
349 flow_name,
350 sink_table_name: output_table_name,
351 or_replace,
352 if_not_exists,
353 expire_after,
354 eval_interval,
355 comment,
356 query,
357 }))
358 }
359
360 fn parse_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
361 let start_loc = self.parser.peek_token().span.start;
362 let start_index = location_to_index(self.sql, &start_loc);
363
364 let query = match self.parser.peek_token().token {
366 Token::Word(w) => match w.keyword {
367 Keyword::SELECT => self.parse_query(),
368 Keyword::NoKeyword
369 if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
370 {
371 self.parse_tql(require_now_expr)
372 }
373
374 _ => self.unsupported(self.peek_token_as_string()),
375 },
376 _ => self.unsupported(self.peek_token_as_string()),
377 }?;
378
379 let end_token = self.parser.peek_token();
380
381 let raw_query = if end_token == Token::EOF {
382 &self.sql[start_index..]
383 } else {
384 let end_loc = end_token.span.end;
385 let end_index = location_to_index(self.sql, &end_loc);
386 &self.sql[start_index..end_index.min(self.sql.len())]
387 };
388 let raw_query = raw_query.trim_end_matches(";");
389 let query = SqlOrTql::try_from_statement(query, raw_query)?;
390 Ok(query)
391 }
392
393 fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
395 let interval = self.parse_interval_month_day_nano()?.0;
396 if interval.months != 0 {
397 return InvalidIntervalSnafu {
398 reason: format!("Interval with months is not allowed in {context}"),
399 }
400 .fail();
401 }
402 Ok(
403 interval.nanoseconds / 1_000_000_000
404 + interval.days as i64 * 60 * 60 * 24
405 + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, )
409 }
410
411 fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
413 let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
414 let raw_interval_expr = interval_expr.to_string();
415 let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
416 .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
417 .ok()
418 .with_context(|| InvalidIntervalSnafu {
419 reason: format!("cannot cast {} to interval type", interval_expr),
420 })?;
421 if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
422 Ok((interval, raw_interval_expr))
423 } else {
424 unreachable!()
425 }
426 }
427
428 fn parse_if_not_exist(&mut self) -> Result<bool> {
429 match self.parser.peek_token().token {
430 Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
431 _ => {}
432 }
433
434 if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
435 return self
436 .parser
437 .expect_keyword(Keyword::EXISTS)
438 .map(|_| true)
439 .context(UnexpectedSnafu {
440 expected: "EXISTS",
441 actual: self.peek_token_as_string(),
442 });
443 }
444
445 if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
446 return UnsupportedSnafu { keyword: "EXISTS" }.fail();
447 }
448
449 Ok(false)
450 }
451
452 fn parse_create_table_options(&mut self) -> Result<OptionMap> {
453 parse_with_options(&mut self.parser)
454 }
455
456 fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
458 if !self.parser.parse_keyword(Keyword::PARTITION) {
459 return Ok(None);
460 }
461 self.parser
462 .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
463 .context(error::UnexpectedSnafu {
464 expected: "ON, COLUMNS",
465 actual: self.peek_token_as_string(),
466 })?;
467
468 let raw_column_list = self
469 .parser
470 .parse_parenthesized_column_list(Mandatory, false)
471 .context(error::SyntaxSnafu)?;
472 let column_list = raw_column_list
473 .into_iter()
474 .map(Self::canonicalize_identifier)
475 .collect();
476
477 let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
478
479 Ok(Some(Partitions { column_list, exprs }))
480 }
481
482 fn parse_partition_entry(&mut self) -> Result<Expr> {
483 self.parser.parse_expr().context(error::SyntaxSnafu)
484 }
485
486 fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
488 where
489 F: FnMut(&mut ParserContext<'a>) -> Result<T>,
490 {
491 self.parser
492 .expect_token(&Token::LParen)
493 .context(error::UnexpectedSnafu {
494 expected: "(",
495 actual: self.peek_token_as_string(),
496 })?;
497
498 let mut values = vec![];
499 while self.parser.peek_token() != Token::RParen {
500 values.push(f(self)?);
501 if !self.parser.consume_token(&Token::Comma) {
502 break;
503 }
504 }
505
506 self.parser
507 .expect_token(&Token::RParen)
508 .context(error::UnexpectedSnafu {
509 expected: ")",
510 actual: self.peek_token_as_string(),
511 })?;
512
513 Ok(values)
514 }
515
516 fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
518 let mut columns = vec![];
519 let mut constraints = vec![];
520 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
521 return Ok((columns, constraints));
522 }
523
524 loop {
525 if let Some(constraint) = self.parse_optional_table_constraint()? {
526 constraints.push(constraint);
527 } else if let Token::Word(_) = self.parser.peek_token().token {
528 self.parse_column(&mut columns, &mut constraints)?;
529 } else {
530 return self.expected(
531 "column name or constraint definition",
532 self.parser.peek_token(),
533 );
534 }
535 let comma = self.parser.consume_token(&Token::Comma);
536 if self.parser.consume_token(&Token::RParen) {
537 break;
539 } else if !comma {
540 return self.expected(
541 "',' or ')' after column definition",
542 self.parser.peek_token(),
543 );
544 }
545 }
546
547 Ok((columns, constraints))
548 }
549
550 fn parse_column(
551 &mut self,
552 columns: &mut Vec<Column>,
553 constraints: &mut Vec<TableConstraint>,
554 ) -> Result<()> {
555 let mut column = self.parse_column_def()?;
556
557 let mut time_index_opt_idx = None;
558 for (index, opt) in column.options().iter().enumerate() {
559 if let ColumnOption::DialectSpecific(tokens) = &opt.option
560 && matches!(
561 &tokens[..],
562 [
563 Token::Word(Word {
564 keyword: Keyword::TIME,
565 ..
566 }),
567 Token::Word(Word {
568 keyword: Keyword::INDEX,
569 ..
570 })
571 ]
572 )
573 {
574 ensure!(
575 time_index_opt_idx.is_none(),
576 InvalidColumnOptionSnafu {
577 name: column.name().to_string(),
578 msg: "duplicated time index",
579 }
580 );
581 time_index_opt_idx = Some(index);
582
583 let constraint = TableConstraint::TimeIndex {
584 column: Ident::new(column.name().value.clone()),
585 };
586 constraints.push(constraint);
587 }
588 }
589
590 if let Some(index) = time_index_opt_idx {
591 ensure!(
592 !column.options().contains(&ColumnOptionDef {
593 option: ColumnOption::Null,
594 name: None,
595 }),
596 InvalidColumnOptionSnafu {
597 name: column.name().to_string(),
598 msg: "time index column can't be null",
599 }
600 );
601
602 let data_type = get_unalias_type(column.data_type());
604 ensure!(
605 matches!(data_type, DataType::Timestamp(_, _)),
606 InvalidColumnOptionSnafu {
607 name: column.name().to_string(),
608 msg: "time index column data type should be timestamp",
609 }
610 );
611
612 let not_null_opt = ColumnOptionDef {
613 option: ColumnOption::NotNull,
614 name: None,
615 };
616
617 if !column.options().contains(¬_null_opt) {
618 column.mut_options().push(not_null_opt);
619 }
620
621 let _ = column.mut_options().remove(index);
622 }
623
624 columns.push(column);
625
626 Ok(())
627 }
628
629 fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
631 let name = self.parser.parse_identifier()?;
632 if name.quote_style.is_none() &&
633 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
635 {
636 return Err(ParserError::ParserError(format!(
637 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
638 &name.value
639 )));
640 }
641
642 Ok(name)
643 }
644
645 pub fn parse_column_def(&mut self) -> Result<Column> {
646 let name = self.parse_column_name().context(SyntaxSnafu)?;
647 let parser = &mut self.parser;
648
649 ensure!(
650 !(name.quote_style.is_none() &&
651 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
653 InvalidSqlSnafu {
654 msg: format!(
655 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
656 &name.value
657 ),
658 }
659 );
660
661 let mut extensions = ColumnExtensions::default();
662
663 let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
664 if matches!(data_type, DataType::JSON) {
667 extensions.json_datatype_options = json::parse_json_datatype_options(parser)?;
668 }
669
670 let mut options = vec![];
671 loop {
672 if parser.parse_keyword(Keyword::CONSTRAINT) {
673 let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
674 if let Some(option) = Self::parse_optional_column_option(parser)? {
675 options.push(ColumnOptionDef { name, option });
676 } else {
677 return parser
678 .expected(
679 "constraint details after CONSTRAINT <name>",
680 parser.peek_token(),
681 )
682 .context(SyntaxSnafu);
683 }
684 } else if let Some(option) = Self::parse_optional_column_option(parser)? {
685 options.push(ColumnOptionDef { name: None, option });
686 } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
687 break;
688 };
689 }
690
691 Ok(Column {
692 column_def: ColumnDef {
693 name: Self::canonicalize_identifier(name),
694 data_type,
695 options,
696 },
697 extensions,
698 })
699 }
700
701 fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
702 if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
703 Ok(Some(ColumnOption::CharacterSet(
704 parser.parse_object_name(false).context(SyntaxSnafu)?,
705 )))
706 } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
707 Ok(Some(ColumnOption::NotNull))
708 } else if parser.parse_keywords(&[Keyword::COMMENT]) {
709 match parser.next_token() {
710 TokenWithSpan {
711 token: Token::SingleQuotedString(value, ..),
712 ..
713 } => Ok(Some(ColumnOption::Comment(value))),
714 unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
715 }
716 } else if parser.parse_keyword(Keyword::NULL) {
717 Ok(Some(ColumnOption::Null))
718 } else if parser.parse_keyword(Keyword::DEFAULT) {
719 Ok(Some(ColumnOption::Default(
720 parser.parse_expr().context(SyntaxSnafu)?,
721 )))
722 } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
723 Ok(Some(ColumnOption::PrimaryKey(PrimaryKeyConstraint {
724 name: None,
725 index_name: None,
726 index_type: None,
727 columns: vec![],
728 index_options: vec![],
729 characteristics: None,
730 })))
731 } else if parser.parse_keyword(Keyword::UNIQUE) {
732 Ok(Some(ColumnOption::Unique(UniqueConstraint {
733 name: None,
734 index_name: None,
735 index_type_display: KeyOrIndexDisplay::None,
736 index_type: None,
737 columns: vec![],
738 index_options: vec![],
739 characteristics: None,
740 nulls_distinct: NullsDistinctOption::None,
741 })))
742 } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
743 Ok(Some(ColumnOption::DialectSpecific(vec![
745 Token::Word(Word {
746 value: "TIME".to_string(),
747 quote_style: None,
748 keyword: Keyword::TIME,
749 }),
750 Token::Word(Word {
751 value: "INDEX".to_string(),
752 quote_style: None,
753 keyword: Keyword::INDEX,
754 }),
755 ])))
756 } else {
757 Ok(None)
758 }
759 }
760
    /// Parses the non-standard column clauses and records them in `column_extensions`.
    ///
    /// On every call, a column whose type is the custom type `VECTOR(<dim>)` gets its
    /// dimension stored in `vector_options`. After that, up to one each of
    /// `SKIPPING INDEX`, `FULLTEXT INDEX`, `INVERTED INDEX` and `VECTOR INDEX` is parsed,
    /// in that order.
    ///
    /// Returns `Ok(true)` when at least one index clause was parsed by this call and
    /// `Ok(false)` otherwise; `parse_column_def` keeps calling until it gets `false`.
    ///
    /// # Errors
    ///
    /// Returns `InvalidColumnOption` when `INDEX` does not follow the index kind, when an
    /// index kind is declared twice, when an option key fails its validator, when
    /// `FULLTEXT INDEX` is used on a non-string column, when `VECTOR INDEX` is used on a
    /// non-vector column, when `INVERTED INDEX` is followed by `WITH`, or when the
    /// `VECTOR` type has no valid dimension.
    fn parse_column_extensions(
        parser: &mut Parser<'_>,
        column_name: &Ident,
        column_type: &DataType,
        column_extensions: &mut ColumnExtensions,
    ) -> Result<bool> {
        // `VECTOR(<dim>)` reaches us as a custom data type with a single modifier token.
        if let DataType::Custom(name, tokens) = column_type
            && name.0.len() == 1
            && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
        {
            ensure!(
                tokens.len() == 1,
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "VECTOR type should have dimension",
                }
            );

            // NOTE(review): `parse::<u32>` presumably also accepts `0`, although the
            // message says "positive" — confirm whether zero should be rejected.
            let dimension =
                tokens[0]
                    .parse::<u32>()
                    .ok()
                    .with_context(|| InvalidColumnOptionSnafu {
                        name: column_name.to_string(),
                        msg: "dimension should be a positive integer",
                    })?;

            let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
            column_extensions.vector_options = Some(options);
        }

        // Set as soon as any index clause below has been consumed.
        let mut is_index_declared = false;

        // SKIPPING INDEX [WITH (...)]; `SKIPPING` is matched by its text.
        if let Token::Word(word) = parser.peek_token().token
            && word.value.eq_ignore_ascii_case(SKIPPING)
        {
            parser.next_token();
            ensure!(
                parser.parse_keyword(Keyword::INDEX),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "expect INDEX after SKIPPING keyword",
                }
            );
            ensure!(
                column_extensions.skipping_index_options.is_none(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "duplicated SKIPPING index option",
                }
            );

            let options = parser
                .parse_options(Keyword::WITH)
                .context(error::SyntaxSnafu)?
                .into_iter()
                .map(parse_option_string)
                .collect::<Result<Vec<_>>>()?;

            for (key, _) in options.iter() {
                ensure!(
                    validate_column_skipping_index_create_option(key),
                    InvalidColumnOptionSnafu {
                        name: column_name.to_string(),
                        msg: format!("invalid SKIPPING INDEX option: {key}"),
                    }
                );
            }

            let options = OptionMap::new(options);
            column_extensions.skipping_index_options = Some(options);
            is_index_declared |= true;
        }

        // FULLTEXT INDEX [WITH (...)]; only allowed on string columns.
        if parser.parse_keyword(Keyword::FULLTEXT) {
            ensure!(
                parser.parse_keyword(Keyword::INDEX),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "expect INDEX after FULLTEXT keyword",
                }
            );

            ensure!(
                column_extensions.fulltext_index_options.is_none(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "duplicated FULLTEXT INDEX option",
                }
            );

            // Resolve type aliases before converting to the concrete data type.
            let column_type = get_unalias_type(column_type);
            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
            ensure!(
                data_type == ConcreteDataType::string_datatype(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "FULLTEXT index only supports string type",
                }
            );

            let options = parser
                .parse_options(Keyword::WITH)
                .context(error::SyntaxSnafu)?
                .into_iter()
                .map(parse_option_string)
                .collect::<Result<Vec<_>>>()?;

            for (key, _) in options.iter() {
                ensure!(
                    validate_column_fulltext_create_option(key),
                    InvalidColumnOptionSnafu {
                        name: column_name.to_string(),
                        msg: format!("invalid FULLTEXT INDEX option: {key}"),
                    }
                );
            }

            let options = OptionMap::new(options);
            column_extensions.fulltext_index_options = Some(options);
            is_index_declared |= true;
        }

        // INVERTED INDEX; takes no options. `INVERTED` is matched by its text.
        if let Token::Word(word) = parser.peek_token().token
            && word.value.eq_ignore_ascii_case(INVERTED)
        {
            parser.next_token();
            ensure!(
                parser.parse_keyword(Keyword::INDEX),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "expect INDEX after INVERTED keyword",
                }
            );

            ensure!(
                column_extensions.inverted_index_options.is_none(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "duplicated INVERTED index option",
                }
            );

            // Reject a following `WITH` explicitly; the comparison is against the
            // unquoted word with value "WITH" exactly as constructed below.
            let with_token = parser.peek_token();
            ensure!(
                with_token.token
                    != Token::Word(Word {
                        value: "WITH".to_string(),
                        keyword: Keyword::WITH,
                        quote_style: None,
                    }),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "INVERTED index doesn't support options",
                }
            );

            column_extensions.inverted_index_options = Some(OptionMap::default());
            is_index_declared |= true;
        }

        // VECTOR INDEX [WITH (...)]; only allowed on vector columns.
        if let Token::Word(word) = parser.peek_token().token
            && word.value.eq_ignore_ascii_case(VECTOR)
        {
            parser.next_token();
            ensure!(
                parser.parse_keyword(Keyword::INDEX),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "expect INDEX after VECTOR keyword",
                }
            );

            ensure!(
                column_extensions.vector_index_options.is_none(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "duplicated VECTOR INDEX option",
                }
            );

            // Resolve type aliases before converting to the concrete data type.
            let column_type = get_unalias_type(column_type);
            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
            ensure!(
                matches!(data_type, ConcreteDataType::Vector(_)),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "VECTOR INDEX only supports Vector type columns",
                }
            );

            let options = parser
                .parse_options(Keyword::WITH)
                .context(error::SyntaxSnafu)?
                .into_iter()
                .map(parse_option_string)
                .collect::<Result<Vec<_>>>()?;

            for (key, _) in options.iter() {
                ensure!(
                    validate_column_vector_index_create_option(key),
                    InvalidColumnOptionSnafu {
                        name: column_name.to_string(),
                        msg: format!("invalid VECTOR INDEX option: {key}"),
                    }
                );
            }

            let options = OptionMap::new(options);
            column_extensions.vector_index_options = Some(options);
            is_index_declared |= true;
        }

        Ok(is_index_declared)
    }
993
994 fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
995 match self.parser.next_token() {
996 TokenWithSpan {
997 token: Token::Word(w),
998 ..
999 } if w.keyword == Keyword::PRIMARY => {
1000 self.parser
1001 .expect_keyword(Keyword::KEY)
1002 .context(error::UnexpectedSnafu {
1003 expected: "KEY",
1004 actual: self.peek_token_as_string(),
1005 })?;
1006 let raw_columns = self
1007 .parser
1008 .parse_parenthesized_column_list(Mandatory, false)
1009 .context(error::SyntaxSnafu)?;
1010 let columns = raw_columns
1011 .into_iter()
1012 .map(Self::canonicalize_identifier)
1013 .collect();
1014 Ok(Some(TableConstraint::PrimaryKey { columns }))
1015 }
1016 TokenWithSpan {
1017 token: Token::Word(w),
1018 ..
1019 } if w.keyword == Keyword::TIME => {
1020 self.parser
1021 .expect_keyword(Keyword::INDEX)
1022 .context(error::UnexpectedSnafu {
1023 expected: "INDEX",
1024 actual: self.peek_token_as_string(),
1025 })?;
1026
1027 let raw_columns = self
1028 .parser
1029 .parse_parenthesized_column_list(Mandatory, false)
1030 .context(error::SyntaxSnafu)?;
1031 let mut columns = raw_columns
1032 .into_iter()
1033 .map(Self::canonicalize_identifier)
1034 .collect::<Vec<_>>();
1035
1036 ensure!(
1037 columns.len() == 1,
1038 InvalidTimeIndexSnafu {
1039 msg: "it should contain only one column in time index",
1040 }
1041 );
1042
1043 Ok(Some(TableConstraint::TimeIndex {
1044 column: columns.pop().unwrap(),
1045 }))
1046 }
1047 _ => {
1048 self.parser.prev_token();
1049 Ok(None)
1050 }
1051 }
1052 }
1053
1054 fn parse_table_engine(&mut self, default: &str) -> Result<String> {
1056 if !self.consume_token(ENGINE) {
1057 return Ok(default.to_string());
1058 }
1059
1060 self.parser
1061 .expect_token(&Token::Eq)
1062 .context(error::UnexpectedSnafu {
1063 expected: "=",
1064 actual: self.peek_token_as_string(),
1065 })?;
1066
1067 let token = self.parser.next_token();
1068 if let Token::Word(w) = token.token {
1069 Ok(w.value)
1070 } else {
1071 self.expected("'Engine' is missing", token)
1072 }
1073 }
1074}
1075
1076fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1077 let time_index_constraints: Vec<_> = constraints
1078 .iter()
1079 .filter_map(|c| match c {
1080 TableConstraint::TimeIndex { column } => Some(column),
1081 _ => None,
1082 })
1083 .unique()
1084 .collect();
1085
1086 ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1087 ensure!(
1088 time_index_constraints.len() == 1,
1089 InvalidTimeIndexSnafu {
1090 msg: format!(
1091 "expected only one time index constraint but actual {}",
1092 time_index_constraints.len()
1093 ),
1094 }
1095 );
1096
1097 let time_index_column_ident = &time_index_constraints[0];
1100 let time_index_column = columns
1101 .iter()
1102 .find(|c| c.name().value == *time_index_column_ident.value)
1103 .with_context(|| InvalidTimeIndexSnafu {
1104 msg: format!(
1105 "time index column {} not found in columns",
1106 time_index_column_ident
1107 ),
1108 })?;
1109
1110 let time_index_data_type = get_unalias_type(time_index_column.data_type());
1111 ensure!(
1112 matches!(time_index_data_type, DataType::Timestamp(_, _)),
1113 InvalidColumnOptionSnafu {
1114 name: time_index_column.name().to_string(),
1115 msg: "time index column data type should be timestamp",
1116 }
1117 );
1118
1119 Ok(())
1120}
1121
1122fn get_unalias_type(data_type: &DataType) -> DataType {
1123 match data_type {
1124 DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1125 if let Some(real_type) =
1126 get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1127 {
1128 real_type
1129 } else {
1130 data_type.clone()
1131 }
1132 }
1133 _ => data_type.clone(),
1134 }
1135}
1136
1137fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1138 let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1139
1140 ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1141
1142 Ok(())
1143}
1144
1145fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1147 for expr in exprs {
1148 if let Expr::BinaryOp { left, op: _, right } = expr {
1150 ensure_one_expr(left, columns)?;
1151 ensure_one_expr(right, columns)?;
1152 } else {
1153 return error::InvalidSqlSnafu {
1154 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1155 }
1156 .fail();
1157 }
1158 }
1159 Ok(())
1160}
1161
1162fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1166 match expr {
1167 Expr::BinaryOp { left, op: _, right } => {
1168 ensure_one_expr(left, columns)?;
1169 ensure_one_expr(right, columns)?;
1170 Ok(())
1171 }
1172 Expr::Identifier(ident) => {
1173 let column_name = &ident.value;
1174 ensure!(
1175 columns.iter().any(|c| &c.name().value == column_name),
1176 error::InvalidSqlSnafu {
1177 msg: format!(
1178 "Column {:?} in rule expr is not referenced in PARTITION ON",
1179 column_name
1180 ),
1181 }
1182 );
1183 Ok(())
1184 }
1185 Expr::Value(_) => Ok(()),
1186 Expr::UnaryOp { expr, .. } => {
1187 ensure_one_expr(expr, columns)?;
1188 Ok(())
1189 }
1190 _ => error::InvalidSqlSnafu {
1191 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1192 }
1193 .fail(),
1194 }
1195}
1196
1197fn ensure_partition_columns_defined<'a>(
1199 columns: &'a [Column],
1200 partitions: &'a Partitions,
1201) -> Result<Vec<&'a Column>> {
1202 partitions
1203 .column_list
1204 .iter()
1205 .map(|x| {
1206 let x = ParserContext::canonicalize_identifier(x.clone());
1207 columns
1210 .iter()
1211 .find(|c| *c.name().value == x.value)
1212 .context(error::InvalidSqlSnafu {
1213 msg: format!("Partition column {:?} not defined", x.value),
1214 })
1215 })
1216 .collect::<Result<Vec<&Column>>>()
1217}
1218
1219#[cfg(test)]
1220mod tests {
1221 use std::assert_matches::assert_matches;
1222 use std::collections::HashMap;
1223
1224 use common_catalog::consts::FILE_ENGINE;
1225 use common_error::ext::ErrorExt;
1226 use sqlparser::ast::ColumnOption::NotNull;
1227 use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1228 use sqlparser::dialect::GenericDialect;
1229 use sqlparser::tokenizer::Tokenizer;
1230
1231 use super::*;
1232 use crate::dialect::GreptimeDbDialect;
1233 use crate::parser::ParseOptions;
1234
1235 #[test]
1236 fn test_parse_create_table_like() {
1237 let sql = "CREATE TABLE t1 LIKE t2";
1238 let stmts =
1239 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1240 .unwrap();
1241
1242 assert_eq!(1, stmts.len());
1243 match &stmts[0] {
1244 Statement::CreateTableLike(c) => {
1245 assert_eq!(c.table_name.to_string(), "t1");
1246 assert_eq!(c.source_name.to_string(), "t2");
1247 }
1248 _ => unreachable!(),
1249 }
1250 }
1251
1252 #[test]
1253 fn test_validate_external_table_options() {
1254 let sql = "CREATE EXTERNAL TABLE city (
1255 host string,
1256 ts timestamp,
1257 cpu float64 default 0,
1258 memory float64,
1259 TIME INDEX (ts),
1260 PRIMARY KEY(ts, host)
1261 ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1262
1263 let result =
1264 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1265 assert!(matches!(
1266 result,
1267 Err(error::Error::InvalidTableOption { .. })
1268 ));
1269 }
1270
1271 #[test]
1272 fn test_parse_create_external_table() {
1273 struct Test<'a> {
1274 sql: &'a str,
1275 expected_table_name: &'a str,
1276 expected_options: HashMap<&'a str, &'a str>,
1277 expected_engine: &'a str,
1278 expected_if_not_exist: bool,
1279 }
1280
1281 let tests = [
1282 Test {
1283 sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1284 expected_table_name: "city",
1285 expected_options: HashMap::from([
1286 ("location", "/var/data/city.csv"),
1287 ("format", "csv"),
1288 ]),
1289 expected_engine: FILE_ENGINE,
1290 expected_if_not_exist: false,
1291 },
1292 Test {
1293 sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1294 expected_table_name: "city",
1295 expected_options: HashMap::from([
1296 ("location", "/var/data/city.csv"),
1297 ("format", "csv"),
1298 ]),
1299 expected_engine: "foo",
1300 expected_if_not_exist: true,
1301 },
1302 Test {
1303 sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1304 expected_table_name: "city",
1305 expected_options: HashMap::from([
1306 ("location", "/var/data/city.csv"),
1307 ("format", "csv"),
1308 ("compaction.type", "bar"),
1309 ]),
1310 expected_engine: "foo",
1311 expected_if_not_exist: true,
1312 },
1313 ];
1314
1315 for test in tests {
1316 let stmts = ParserContext::create_with_dialect(
1317 test.sql,
1318 &GreptimeDbDialect {},
1319 ParseOptions::default(),
1320 )
1321 .unwrap();
1322 assert_eq!(1, stmts.len());
1323 match &stmts[0] {
1324 Statement::CreateExternalTable(c) => {
1325 assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1326 assert_eq!(c.options.to_str_map(), test.expected_options);
1327 assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1328 assert_eq!(c.engine, test.expected_engine);
1329 }
1330 _ => unreachable!(),
1331 }
1332 }
1333 }
1334
1335 #[test]
1336 fn test_parse_create_external_table_with_schema() {
1337 let sql = "CREATE EXTERNAL TABLE city (
1338 host string,
1339 ts timestamp,
1340 cpu float32 default 0,
1341 memory float64,
1342 TIME INDEX (ts),
1343 PRIMARY KEY(ts, host),
1344 ) with(location='/var/data/city.csv',format='csv');";
1345
1346 let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]);
1347
1348 let stmts =
1349 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1350 .unwrap();
1351 assert_eq!(1, stmts.len());
1352 match &stmts[0] {
1353 Statement::CreateExternalTable(c) => {
1354 assert_eq!(c.name.to_string(), "city");
1355 assert_eq!(c.options.to_str_map(), options);
1356
1357 let columns = &c.columns;
1358 assert_column_def(&columns[0].column_def, "host", "STRING");
1359 assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1360 assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1361 assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1362
1363 let constraints = &c.constraints;
1364 assert_eq!(
1365 &constraints[0],
1366 &TableConstraint::TimeIndex {
1367 column: Ident::new("ts"),
1368 }
1369 );
1370 assert_eq!(
1371 &constraints[1],
1372 &TableConstraint::PrimaryKey {
1373 columns: vec![Ident::new("ts"), Ident::new("host")]
1374 }
1375 );
1376 }
1377 _ => unreachable!(),
1378 }
1379 }
1380
1381 #[test]
1382 fn test_parse_create_database() {
1383 let sql = "create database";
1384 let result =
1385 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1386 assert!(
1387 result
1388 .unwrap_err()
1389 .to_string()
1390 .contains("Unexpected token while parsing SQL statement")
1391 );
1392
1393 let sql = "create database prometheus";
1394 let stmts =
1395 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1396 .unwrap();
1397
1398 assert_eq!(1, stmts.len());
1399 match &stmts[0] {
1400 Statement::CreateDatabase(c) => {
1401 assert_eq!(c.name.to_string(), "prometheus");
1402 assert!(!c.if_not_exists);
1403 }
1404 _ => unreachable!(),
1405 }
1406
1407 let sql = "create database if not exists prometheus";
1408 let stmts =
1409 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1410 .unwrap();
1411
1412 assert_eq!(1, stmts.len());
1413 match &stmts[0] {
1414 Statement::CreateDatabase(c) => {
1415 assert_eq!(c.name.to_string(), "prometheus");
1416 assert!(c.if_not_exists);
1417 }
1418 _ => unreachable!(),
1419 }
1420
1421 let sql = "CREATE DATABASE `fOo`";
1422 let result =
1423 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1424 let stmts = result.unwrap();
1425 match &stmts.last().unwrap() {
1426 Statement::CreateDatabase(c) => {
1427 assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1428 assert!(!c.if_not_exists);
1429 }
1430 _ => unreachable!(),
1431 }
1432
1433 let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1434 let result =
1435 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1436 let stmts = result.unwrap();
1437 match &stmts[0] {
1438 Statement::CreateDatabase(c) => {
1439 assert_eq!(c.name.to_string(), "prometheus");
1440 assert!(!c.if_not_exists);
1441 assert_eq!(c.options.get("ttl").unwrap(), "1h");
1442 }
1443 _ => unreachable!(),
1444 }
1445 }
1446
1447 #[test]
1448 fn test_parse_create_flow_more_testcases() {
1449 use pretty_assertions::assert_eq;
1450 fn parse_create_flow(sql: &str) -> CreateFlow {
1451 let stmts = ParserContext::create_with_dialect(
1452 sql,
1453 &GreptimeDbDialect {},
1454 ParseOptions::default(),
1455 )
1456 .unwrap();
1457 assert_eq!(1, stmts.len());
1458 match &stmts[0] {
1459 Statement::CreateFlow(c) => c.clone(),
1460 _ => unreachable!(),
1461 }
1462 }
1463 struct CreateFlowWoutQuery {
1464 pub flow_name: ObjectName,
1466 pub sink_table_name: ObjectName,
1468 pub or_replace: bool,
1470 pub if_not_exists: bool,
1472 pub expire_after: Option<i64>,
1475 pub comment: Option<String>,
1477 }
1478 let testcases = vec![
1479 (
1480 r"
1481CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1482SINK TO schema_1.table_1
1483EXPIRE AFTER INTERVAL '5 minutes'
1484COMMENT 'test comment'
1485AS
1486SELECT max(c1), min(c2) FROM schema_2.table_2;",
1487 CreateFlowWoutQuery {
1488 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1489 sink_table_name: ObjectName::from(vec![
1490 Ident::new("schema_1"),
1491 Ident::new("table_1"),
1492 ]),
1493 or_replace: true,
1494 if_not_exists: true,
1495 expire_after: Some(300),
1496 comment: Some("test comment".to_string()),
1497 },
1498 ),
1499 (
1500 r"
1501CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1502SINK TO schema_1.table_1
1503EXPIRE AFTER INTERVAL '300 s'
1504COMMENT 'test comment'
1505AS
1506SELECT max(c1), min(c2) FROM schema_2.table_2;",
1507 CreateFlowWoutQuery {
1508 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1509 sink_table_name: ObjectName::from(vec![
1510 Ident::new("schema_1"),
1511 Ident::new("table_1"),
1512 ]),
1513 or_replace: true,
1514 if_not_exists: true,
1515 expire_after: Some(300),
1516 comment: Some("test comment".to_string()),
1517 },
1518 ),
1519 (
1520 r"
1521CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1522SINK TO schema_1.table_1
1523EXPIRE AFTER '5 minutes'
1524COMMENT 'test comment'
1525AS
1526SELECT max(c1), min(c2) FROM schema_2.table_2;",
1527 CreateFlowWoutQuery {
1528 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1529 sink_table_name: ObjectName::from(vec![
1530 Ident::new("schema_1"),
1531 Ident::new("table_1"),
1532 ]),
1533 or_replace: true,
1534 if_not_exists: true,
1535 expire_after: Some(300),
1536 comment: Some("test comment".to_string()),
1537 },
1538 ),
1539 (
1540 r"
1541CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1542SINK TO schema_1.table_1
1543EXPIRE AFTER '300 s'
1544COMMENT 'test comment'
1545AS
1546SELECT max(c1), min(c2) FROM schema_2.table_2;",
1547 CreateFlowWoutQuery {
1548 flow_name: ObjectName::from(vec![Ident::new("task_1")]),
1549 sink_table_name: ObjectName::from(vec![
1550 Ident::new("schema_1"),
1551 Ident::new("table_1"),
1552 ]),
1553 or_replace: true,
1554 if_not_exists: true,
1555 expire_after: Some(300),
1556 comment: Some("test comment".to_string()),
1557 },
1558 ),
1559 (
1560 r"
1561CREATE FLOW `task_2`
1562SINK TO schema_1.table_1
1563EXPIRE AFTER '2 days 1h 2 min'
1564AS
1565SELECT max(c1), min(c2) FROM schema_2.table_2;",
1566 CreateFlowWoutQuery {
1567 flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
1568 sink_table_name: ObjectName::from(vec![
1569 Ident::new("schema_1"),
1570 Ident::new("table_1"),
1571 ]),
1572 or_replace: false,
1573 if_not_exists: false,
1574 expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1575 comment: None,
1576 },
1577 ),
1578 (
1579 r"
1580create flow `task_3`
1581sink to schema_1.table_1
1582expire after '10 minutes'
1583as
1584select max(c1), min(c2) from schema_2.table_2;",
1585 CreateFlowWoutQuery {
1586 flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
1587 sink_table_name: ObjectName::from(vec![
1588 Ident::new("schema_1"),
1589 Ident::new("table_1"),
1590 ]),
1591 or_replace: false,
1592 if_not_exists: false,
1593 expire_after: Some(600), comment: None,
1595 },
1596 ),
1597 (
1598 r"
1599create or replace flow if not exists task_4
1600sink to schema_1.table_1
1601expire after interval '2 hours'
1602comment 'lowercase test'
1603as
1604select max(c1), min(c2) from schema_2.table_2;",
1605 CreateFlowWoutQuery {
1606 flow_name: ObjectName::from(vec![Ident::new("task_4")]),
1607 sink_table_name: ObjectName::from(vec![
1608 Ident::new("schema_1"),
1609 Ident::new("table_1"),
1610 ]),
1611 or_replace: true,
1612 if_not_exists: true,
1613 expire_after: Some(7200), comment: Some("lowercase test".to_string()),
1615 },
1616 ),
1617 ];
1618
1619 for (sql, expected) in testcases {
1620 let create_task = parse_create_flow(sql);
1621
1622 let expected = CreateFlow {
1623 flow_name: expected.flow_name,
1624 sink_table_name: expected.sink_table_name,
1625 or_replace: expected.or_replace,
1626 if_not_exists: expected.if_not_exists,
1627 expire_after: expected.expire_after,
1628 eval_interval: None,
1629 comment: expected.comment,
1630 query: create_task.query.clone(),
1632 };
1633
1634 assert_eq!(create_task, expected, "input sql is:\n{sql}");
1635 let show_create = create_task.to_string();
1636 let recreated = parse_create_flow(&show_create);
1637 assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1638 }
1639 }
1640
1641 #[test]
1642 fn test_parse_create_flow() {
1643 use pretty_assertions::assert_eq;
1644 fn parse_create_flow(sql: &str) -> CreateFlow {
1645 let stmts = ParserContext::create_with_dialect(
1646 sql,
1647 &GreptimeDbDialect {},
1648 ParseOptions::default(),
1649 )
1650 .unwrap();
1651 assert_eq!(1, stmts.len());
1652 match &stmts[0] {
1653 Statement::CreateFlow(c) => c.clone(),
1654 _ => panic!("{:?}", stmts[0]),
1655 }
1656 }
1657 struct CreateFlowWoutQuery {
1658 pub flow_name: ObjectName,
1660 pub sink_table_name: ObjectName,
1662 pub or_replace: bool,
1664 pub if_not_exists: bool,
1666 pub expire_after: Option<i64>,
1669 pub eval_interval: Option<i64>,
1673 pub comment: Option<String>,
1675 }
1676
1677 let testcases = vec![
1679 (
1680 r"
1681CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1682SINK TO schema_1.table_1
1683EXPIRE AFTER INTERVAL '5 minutes'
1684COMMENT 'test comment'
1685AS
1686SELECT max(c1), min(c2) FROM schema_2.table_2;",
1687 CreateFlowWoutQuery {
1688 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1689 sink_table_name: ObjectName(vec![
1690 ObjectNamePart::Identifier(Ident::new("schema_1")),
1691 ObjectNamePart::Identifier(Ident::new("table_1")),
1692 ]),
1693 or_replace: true,
1694 if_not_exists: true,
1695 expire_after: Some(300),
1696 eval_interval: None,
1697 comment: Some("test comment".to_string()),
1698 },
1699 ),
1700 (
1701 r"
1702CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1703SINK TO schema_1.table_1
1704EXPIRE AFTER INTERVAL '300 s'
1705COMMENT 'test comment'
1706AS
1707SELECT max(c1), min(c2) FROM schema_2.table_2;",
1708 CreateFlowWoutQuery {
1709 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1710 sink_table_name: ObjectName(vec![
1711 ObjectNamePart::Identifier(Ident::new("schema_1")),
1712 ObjectNamePart::Identifier(Ident::new("table_1")),
1713 ]),
1714 or_replace: true,
1715 if_not_exists: true,
1716 expire_after: Some(300),
1717 eval_interval: None,
1718 comment: Some("test comment".to_string()),
1719 },
1720 ),
1721 (
1722 r"
1723CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1724SINK TO schema_1.table_1
1725EXPIRE AFTER '5 minutes'
1726EVAL INTERVAL '10 seconds'
1727COMMENT 'test comment'
1728AS
1729SELECT max(c1), min(c2) FROM schema_2.table_2;",
1730 CreateFlowWoutQuery {
1731 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1732 sink_table_name: ObjectName(vec![
1733 ObjectNamePart::Identifier(Ident::new("schema_1")),
1734 ObjectNamePart::Identifier(Ident::new("table_1")),
1735 ]),
1736 or_replace: true,
1737 if_not_exists: true,
1738 expire_after: Some(300),
1739 eval_interval: Some(10),
1740 comment: Some("test comment".to_string()),
1741 },
1742 ),
1743 (
1744 r"
1745CREATE OR REPLACE FLOW IF NOT EXISTS task_1
1746SINK TO schema_1.table_1
1747EXPIRE AFTER '5 minutes'
1748EVAL INTERVAL INTERVAL '10 seconds'
1749COMMENT 'test comment'
1750AS
1751SELECT max(c1), min(c2) FROM schema_2.table_2;",
1752 CreateFlowWoutQuery {
1753 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
1754 sink_table_name: ObjectName(vec![
1755 ObjectNamePart::Identifier(Ident::new("schema_1")),
1756 ObjectNamePart::Identifier(Ident::new("table_1")),
1757 ]),
1758 or_replace: true,
1759 if_not_exists: true,
1760 expire_after: Some(300),
1761 eval_interval: Some(10),
1762 comment: Some("test comment".to_string()),
1763 },
1764 ),
1765 (
1766 r"
1767CREATE FLOW `task_2`
1768SINK TO schema_1.table_1
1769EXPIRE AFTER '2 days 1h 2 min'
1770AS
1771SELECT max(c1), min(c2) FROM schema_2.table_2;",
1772 CreateFlowWoutQuery {
1773 flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
1774 '`', "task_2",
1775 ))]),
1776 sink_table_name: ObjectName(vec![
1777 ObjectNamePart::Identifier(Ident::new("schema_1")),
1778 ObjectNamePart::Identifier(Ident::new("table_1")),
1779 ]),
1780 or_replace: false,
1781 if_not_exists: false,
1782 expire_after: Some(2 * 86400 + 3600 + 2 * 60),
1783 eval_interval: None,
1784 comment: None,
1785 },
1786 ),
1787 ];
1788
1789 for (sql, expected) in testcases {
1790 let create_task = parse_create_flow(sql);
1791
1792 let expected = CreateFlow {
1793 flow_name: expected.flow_name,
1794 sink_table_name: expected.sink_table_name,
1795 or_replace: expected.or_replace,
1796 if_not_exists: expected.if_not_exists,
1797 expire_after: expected.expire_after,
1798 eval_interval: expected.eval_interval,
1799 comment: expected.comment,
1800 query: create_task.query.clone(),
1802 };
1803
1804 assert_eq!(create_task, expected, "input sql is:\n{sql}");
1805 let show_create = create_task.to_string();
1806 let recreated = parse_create_flow(&show_create);
1807 assert_eq!(recreated, expected, "input sql is:\n{show_create}");
1808 }
1809 }
1810
1811 #[test]
1812 fn test_create_flow_no_month() {
1813 let sql = r"
1814CREATE FLOW `task_2`
1815SINK TO schema_1.table_1
1816EXPIRE AFTER '1 month 2 days 1h 2 min'
1817AS
1818SELECT max(c1), min(c2) FROM schema_2.table_2;";
1819 let stmts =
1820 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1821
1822 assert!(
1823 stmts.is_err()
1824 && stmts
1825 .unwrap_err()
1826 .to_string()
1827 .contains("Interval with months is not allowed")
1828 );
1829 }
1830
1831 #[test]
1832 fn test_validate_create() {
1833 let sql = r"
1834CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
1835PARTITION ON COLUMNS(c, a) (
1836 a < 10,
1837 a > 10 AND a < 20,
1838 a > 20 AND c < 100,
1839 a > 20 AND c >= 100
1840)
1841ENGINE=mito";
1842 let result =
1843 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1844 let _ = result.unwrap();
1845
1846 let sql = r"
1847CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1848PARTITION ON COLUMNS(x) ()
1849ENGINE=mito";
1850 let result =
1851 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1852 assert!(
1853 result
1854 .unwrap_err()
1855 .to_string()
1856 .contains("Partition column \"x\" not defined")
1857 );
1858 }
1859
1860 #[test]
1861 fn test_parse_create_table_with_partitions() {
1862 let sql = r"
1863CREATE TABLE monitor (
1864 host_id INT,
1865 idc STRING,
1866 ts TIMESTAMP,
1867 cpu DOUBLE DEFAULT 0,
1868 memory DOUBLE,
1869 TIME INDEX (ts),
1870 PRIMARY KEY (host),
1871)
1872PARTITION ON COLUMNS(idc, host_id) (
1873 idc <= 'hz' AND host_id < 1000,
1874 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1875 idc > 'sh' AND host_id < 3000,
1876 idc > 'sh' AND host_id >= 3000,
1877)
1878ENGINE=mito";
1879 let result =
1880 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1881 .unwrap();
1882 assert_eq!(result.len(), 1);
1883 match &result[0] {
1884 Statement::CreateTable(c) => {
1885 assert!(c.partitions.is_some());
1886
1887 let partitions = c.partitions.as_ref().unwrap();
1888 let column_list = partitions
1889 .column_list
1890 .iter()
1891 .map(|x| &x.value)
1892 .collect::<Vec<&String>>();
1893 assert_eq!(column_list, vec!["idc", "host_id"]);
1894
1895 let exprs = &partitions.exprs;
1896
1897 assert_eq!(
1898 exprs[0],
1899 Expr::BinaryOp {
1900 left: Box::new(Expr::BinaryOp {
1901 left: Box::new(Expr::Identifier("idc".into())),
1902 op: BinaryOperator::LtEq,
1903 right: Box::new(Expr::Value(
1904 Value::SingleQuotedString("hz".to_string()).into()
1905 ))
1906 }),
1907 op: BinaryOperator::And,
1908 right: Box::new(Expr::BinaryOp {
1909 left: Box::new(Expr::Identifier("host_id".into())),
1910 op: BinaryOperator::Lt,
1911 right: Box::new(Expr::Value(
1912 Value::Number("1000".to_string(), false).into()
1913 ))
1914 })
1915 }
1916 );
1917 assert_eq!(
1918 exprs[1],
1919 Expr::BinaryOp {
1920 left: Box::new(Expr::BinaryOp {
1921 left: Box::new(Expr::BinaryOp {
1922 left: Box::new(Expr::Identifier("idc".into())),
1923 op: BinaryOperator::Gt,
1924 right: Box::new(Expr::Value(
1925 Value::SingleQuotedString("hz".to_string()).into()
1926 ))
1927 }),
1928 op: BinaryOperator::And,
1929 right: Box::new(Expr::BinaryOp {
1930 left: Box::new(Expr::Identifier("idc".into())),
1931 op: BinaryOperator::LtEq,
1932 right: Box::new(Expr::Value(
1933 Value::SingleQuotedString("sh".to_string()).into()
1934 ))
1935 })
1936 }),
1937 op: BinaryOperator::And,
1938 right: Box::new(Expr::BinaryOp {
1939 left: Box::new(Expr::Identifier("host_id".into())),
1940 op: BinaryOperator::Lt,
1941 right: Box::new(Expr::Value(
1942 Value::Number("2000".to_string(), false).into()
1943 ))
1944 })
1945 }
1946 );
1947 assert_eq!(
1948 exprs[2],
1949 Expr::BinaryOp {
1950 left: Box::new(Expr::BinaryOp {
1951 left: Box::new(Expr::Identifier("idc".into())),
1952 op: BinaryOperator::Gt,
1953 right: Box::new(Expr::Value(
1954 Value::SingleQuotedString("sh".to_string()).into()
1955 ))
1956 }),
1957 op: BinaryOperator::And,
1958 right: Box::new(Expr::BinaryOp {
1959 left: Box::new(Expr::Identifier("host_id".into())),
1960 op: BinaryOperator::Lt,
1961 right: Box::new(Expr::Value(
1962 Value::Number("3000".to_string(), false).into()
1963 ))
1964 })
1965 }
1966 );
1967 assert_eq!(
1968 exprs[3],
1969 Expr::BinaryOp {
1970 left: Box::new(Expr::BinaryOp {
1971 left: Box::new(Expr::Identifier("idc".into())),
1972 op: BinaryOperator::Gt,
1973 right: Box::new(Expr::Value(
1974 Value::SingleQuotedString("sh".to_string()).into()
1975 ))
1976 }),
1977 op: BinaryOperator::And,
1978 right: Box::new(Expr::BinaryOp {
1979 left: Box::new(Expr::Identifier("host_id".into())),
1980 op: BinaryOperator::GtEq,
1981 right: Box::new(Expr::Value(
1982 Value::Number("3000".to_string(), false).into()
1983 ))
1984 })
1985 }
1986 );
1987 }
1988 _ => unreachable!(),
1989 }
1990 }
1991
1992 #[test]
1993 fn test_parse_create_table_with_quoted_partitions() {
1994 let sql = r"
1995CREATE TABLE monitor (
1996 `host_id` INT,
1997 idc STRING,
1998 ts TIMESTAMP,
1999 cpu DOUBLE DEFAULT 0,
2000 memory DOUBLE,
2001 TIME INDEX (ts),
2002 PRIMARY KEY (host),
2003)
2004PARTITION ON COLUMNS(IdC, host_id) (
2005 idc <= 'hz' AND host_id < 1000,
2006 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
2007 idc > 'sh' AND host_id < 3000,
2008 idc > 'sh' AND host_id >= 3000,
2009)";
2010 let result =
2011 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2012 .unwrap();
2013 assert_eq!(result.len(), 1);
2014 }
2015
2016 #[test]
2017 fn test_parse_create_table_with_timestamp_index() {
2018 let sql1 = r"
2019CREATE TABLE monitor (
2020 host_id INT,
2021 idc STRING,
2022 ts TIMESTAMP TIME INDEX,
2023 cpu DOUBLE DEFAULT 0,
2024 memory DOUBLE,
2025 PRIMARY KEY (host),
2026)
2027ENGINE=mito";
2028 let result1 = ParserContext::create_with_dialect(
2029 sql1,
2030 &GreptimeDbDialect {},
2031 ParseOptions::default(),
2032 )
2033 .unwrap();
2034
2035 if let Statement::CreateTable(c) = &result1[0] {
2036 assert_eq!(c.constraints.len(), 2);
2037 let tc = c.constraints[0].clone();
2038 match tc {
2039 TableConstraint::TimeIndex { column } => {
2040 assert_eq!(&column.value, "ts");
2041 }
2042 _ => panic!("should be time index constraint"),
2043 };
2044 } else {
2045 panic!("should be create_table statement");
2046 }
2047
2048 let sql2 = r"
2051CREATE TABLE monitor (
2052 host_id INT,
2053 idc STRING,
2054 ts TIMESTAMP NOT NULL,
2055 cpu DOUBLE DEFAULT 0,
2056 memory DOUBLE,
2057 TIME INDEX (ts),
2058 PRIMARY KEY (host),
2059)
2060ENGINE=mito";
2061 let result2 = ParserContext::create_with_dialect(
2062 sql2,
2063 &GreptimeDbDialect {},
2064 ParseOptions::default(),
2065 )
2066 .unwrap();
2067
2068 assert_eq!(result1, result2);
2069
2070 let sql3 = r"
2072CREATE TABLE monitor (
2073 host_id INT,
2074 idc STRING,
2075 ts TIMESTAMP,
2076 cpu DOUBLE DEFAULT 0,
2077 memory DOUBLE,
2078 TIME INDEX (ts),
2079 PRIMARY KEY (host),
2080)
2081ENGINE=mito";
2082
2083 let result3 = ParserContext::create_with_dialect(
2084 sql3,
2085 &GreptimeDbDialect {},
2086 ParseOptions::default(),
2087 )
2088 .unwrap();
2089
2090 assert_ne!(result1, result3);
2091
2092 let sql1 = r"
2094CREATE TABLE monitor (
2095 host_id INT,
2096 idc STRING,
2097 b bigint TIME INDEX,
2098 cpu DOUBLE DEFAULT 0,
2099 memory DOUBLE,
2100 PRIMARY KEY (host),
2101)
2102ENGINE=mito";
2103 let result1 = ParserContext::create_with_dialect(
2104 sql1,
2105 &GreptimeDbDialect {},
2106 ParseOptions::default(),
2107 );
2108
2109 assert!(
2110 result1
2111 .unwrap_err()
2112 .to_string()
2113 .contains("time index column data type should be timestamp")
2114 );
2115 }
2116
2117 #[test]
2118 fn test_parse_create_table_with_timestamp_index_not_null() {
2119 let sql = r"
2120CREATE TABLE monitor (
2121 host_id INT,
2122 idc STRING,
2123 ts TIMESTAMP TIME INDEX,
2124 cpu DOUBLE DEFAULT 0,
2125 memory DOUBLE,
2126 TIME INDEX (ts),
2127 PRIMARY KEY (host),
2128)
2129ENGINE=mito";
2130 let result =
2131 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2132 .unwrap();
2133
2134 assert_eq!(result.len(), 1);
2135 if let Statement::CreateTable(c) = &result[0] {
2136 let ts = c.columns[2].clone();
2137 assert_eq!(ts.name().to_string(), "ts");
2138 assert_eq!(ts.options()[0].option, NotNull);
2139 } else {
2140 panic!("should be create table statement");
2141 }
2142
2143 let sql1 = r"
2144CREATE TABLE monitor (
2145 host_id INT,
2146 idc STRING,
2147 ts TIMESTAMP NOT NULL TIME INDEX,
2148 cpu DOUBLE DEFAULT 0,
2149 memory DOUBLE,
2150 TIME INDEX (ts),
2151 PRIMARY KEY (host),
2152)
2153ENGINE=mito";
2154
2155 let result1 = ParserContext::create_with_dialect(
2156 sql1,
2157 &GreptimeDbDialect {},
2158 ParseOptions::default(),
2159 )
2160 .unwrap();
2161 assert_eq!(result, result1);
2162
2163 let sql2 = r"
2164CREATE TABLE monitor (
2165 host_id INT,
2166 idc STRING,
2167 ts TIMESTAMP TIME INDEX NOT NULL,
2168 cpu DOUBLE DEFAULT 0,
2169 memory DOUBLE,
2170 TIME INDEX (ts),
2171 PRIMARY KEY (host),
2172)
2173ENGINE=mito";
2174
2175 let result2 = ParserContext::create_with_dialect(
2176 sql2,
2177 &GreptimeDbDialect {},
2178 ParseOptions::default(),
2179 )
2180 .unwrap();
2181 assert_eq!(result, result2);
2182
2183 let sql3 = r"
2184CREATE TABLE monitor (
2185 host_id INT,
2186 idc STRING,
2187 ts TIMESTAMP TIME INDEX NULL NOT,
2188 cpu DOUBLE DEFAULT 0,
2189 memory DOUBLE,
2190 TIME INDEX (ts),
2191 PRIMARY KEY (host),
2192)
2193ENGINE=mito";
2194
2195 let result3 = ParserContext::create_with_dialect(
2196 sql3,
2197 &GreptimeDbDialect {},
2198 ParseOptions::default(),
2199 );
2200 assert!(result3.is_err());
2201
2202 let sql4 = r"
2203CREATE TABLE monitor (
2204 host_id INT,
2205 idc STRING,
2206 ts TIMESTAMP TIME INDEX NOT NULL NULL,
2207 cpu DOUBLE DEFAULT 0,
2208 memory DOUBLE,
2209 TIME INDEX (ts),
2210 PRIMARY KEY (host),
2211)
2212ENGINE=mito";
2213
2214 let result4 = ParserContext::create_with_dialect(
2215 sql4,
2216 &GreptimeDbDialect {},
2217 ParseOptions::default(),
2218 );
2219 assert!(result4.is_err());
2220
2221 let sql = r"
2222CREATE TABLE monitor (
2223 host_id INT,
2224 idc STRING,
2225 ts TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2226 cpu DOUBLE DEFAULT 0,
2227 memory DOUBLE,
2228 TIME INDEX (ts),
2229 PRIMARY KEY (host),
2230)
2231ENGINE=mito";
2232
2233 let result =
2234 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2235 .unwrap();
2236
2237 if let Statement::CreateTable(c) = &result[0] {
2238 let tc = c.constraints[0].clone();
2239 match tc {
2240 TableConstraint::TimeIndex { column } => {
2241 assert_eq!(&column.value, "ts");
2242 }
2243 _ => panic!("should be time index constraint"),
2244 }
2245 let ts = c.columns[2].clone();
2246 assert_eq!(ts.name().to_string(), "ts");
2247 assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2248 assert_eq!(ts.options()[1].option, NotNull);
2249 } else {
2250 unreachable!("should be create table statement");
2251 }
2252 }
2253
2254 #[test]
2255 fn test_parse_partitions_with_error_syntax() {
2256 let sql = r"
2257CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2258PARTITION COLUMNS(c, a) (
2259 a < 10,
2260 a > 10 AND a < 20,
2261 a > 20 AND c < 100,
2262 a > 20 AND c >= 100
2263)
2264ENGINE=mito";
2265 let result =
2266 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2267 assert!(
2268 result
2269 .unwrap_err()
2270 .output_msg()
2271 .contains("sql parser error: Expected: ON, found: COLUMNS")
2272 );
2273 }
2274
2275 #[test]
2276 fn test_parse_partitions_without_rule() {
2277 let sql = r"
2278CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2279PARTITION ON COLUMNS(c, a) ()
2280ENGINE=mito";
2281 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2282 .unwrap();
2283 }
2284
2285 #[test]
2286 fn test_parse_partitions_unreferenced_column() {
2287 let sql = r"
2288CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2289PARTITION ON COLUMNS(c, a) (
2290 b = 'foo'
2291)
2292ENGINE=mito";
2293 let result =
2294 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2295 assert_eq!(
2296 result.unwrap_err().output_msg(),
2297 "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2298 );
2299 }
2300
2301 #[test]
2302 fn test_parse_partitions_not_binary_expr() {
2303 let sql = r"
2304CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2305PARTITION ON COLUMNS(c, a) (
2306 b
2307)
2308ENGINE=mito";
2309 let result =
2310 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2311 assert_eq!(
2312 result.unwrap_err().output_msg(),
2313 r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2314 );
2315 }
2316
2317 fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2318 assert_eq!(column.name.to_string(), name);
2319 assert_eq!(column.data_type.to_string(), data_type);
2320 }
2321
2322 #[test]
2323 pub fn test_parse_create_table() {
2324 let sql = r"create table demo(
2325 host string,
2326 ts timestamp,
2327 cpu float32 default 0,
2328 memory float64,
2329 TIME INDEX (ts),
2330 PRIMARY KEY(ts, host),
2331 ) engine=mito
2332 with(ttl='10s');
2333 ";
2334 let result =
2335 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2336 .unwrap();
2337 assert_eq!(1, result.len());
2338 match &result[0] {
2339 Statement::CreateTable(c) => {
2340 assert!(!c.if_not_exists);
2341 assert_eq!("demo", c.name.to_string());
2342 assert_eq!("mito", c.engine);
2343 assert_eq!(4, c.columns.len());
2344 let columns = &c.columns;
2345 assert_column_def(&columns[0].column_def, "host", "STRING");
2346 assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2347 assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2348 assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2349
2350 let constraints = &c.constraints;
2351 assert_eq!(
2352 &constraints[0],
2353 &TableConstraint::TimeIndex {
2354 column: Ident::new("ts"),
2355 }
2356 );
2357 assert_eq!(
2358 &constraints[1],
2359 &TableConstraint::PrimaryKey {
2360 columns: vec![Ident::new("ts"), Ident::new("host")]
2361 }
2362 );
2363 assert_eq!(1, c.options.len());
2365 assert_eq!(
2366 [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2367 c.options.to_str_map()
2368 );
2369 }
2370 _ => unreachable!(),
2371 }
2372 }
2373
2374 #[test]
2375 fn test_invalid_index_keys() {
2376 let sql = r"create table demo(
2377 host string,
2378 ts int64,
2379 cpu float64 default 0,
2380 memory float64,
2381 TIME INDEX (ts, host),
2382 PRIMARY KEY(ts, host)) engine=mito;
2383 ";
2384 let result =
2385 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2386 assert!(result.is_err());
2387 assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2388 }
2389
2390 #[test]
2391 fn test_duplicated_time_index() {
2392 let sql = r"create table demo(
2393 host string,
2394 ts timestamp time index,
2395 t timestamp time index,
2396 cpu float64 default 0,
2397 memory float64,
2398 TIME INDEX (ts, host),
2399 PRIMARY KEY(ts, host)) engine=mito;
2400 ";
2401 let result =
2402 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2403 assert!(result.is_err());
2404 assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2405
2406 let sql = r"create table demo(
2407 host string,
2408 ts timestamp time index,
2409 cpu float64 default 0,
2410 t timestamp,
2411 memory float64,
2412 TIME INDEX (t),
2413 PRIMARY KEY(ts, host)) engine=mito;
2414 ";
2415 let result =
2416 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2417 assert!(result.is_err());
2418 assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. }));
2419 }
2420
2421 #[test]
2422 fn test_invalid_column_name() {
2423 let sql = "create table foo(user string, i timestamp time index)";
2424 let result =
2425 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2426 let err = result.unwrap_err().output_msg();
2427 assert!(err.contains("Cannot use keyword 'user' as column name"));
2428
2429 let sql = r#"
2431 create table foo("user" string, i timestamp time index)
2432 "#;
2433 let result =
2434 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2435 let _ = result.unwrap();
2436 }
2437
2438 #[test]
2439 fn test_incorrect_default_value_issue_3479() {
2440 let sql = r#"CREATE TABLE `ExcePTuRi`(
2441non TIMESTAMP(6) TIME INDEX,
2442`iUSTO` DOUBLE DEFAULT 0.047318541668048164
2443)"#;
2444 let result =
2445 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2446 .unwrap();
2447 assert_eq!(1, result.len());
2448 match &result[0] {
2449 Statement::CreateTable(c) => {
2450 assert_eq!(
2451 "`iUSTO` DOUBLE DEFAULT 0.047318541668048164",
2452 c.columns[1].to_string()
2453 );
2454 }
2455 _ => unreachable!(),
2456 }
2457 }
2458
2459 #[test]
2460 fn test_parse_create_view() {
2461 let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
2462
2463 let result =
2464 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2465 .unwrap();
2466 match &result[0] {
2467 Statement::CreateView(c) => {
2468 assert_eq!(c.to_string(), sql);
2469 assert!(!c.or_replace);
2470 assert!(!c.if_not_exists);
2471 assert_eq!("test", c.name.to_string());
2472 }
2473 _ => unreachable!(),
2474 }
2475
2476 let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
2477
2478 let result =
2479 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2480 .unwrap();
2481 match &result[0] {
2482 Statement::CreateView(c) => {
2483 assert_eq!(c.to_string(), sql);
2484 assert!(c.or_replace);
2485 assert!(c.if_not_exists);
2486 assert_eq!("test", c.name.to_string());
2487 }
2488 _ => unreachable!(),
2489 }
2490 }
2491
2492 #[test]
2493 fn test_parse_create_view_invalid_query() {
2494 let sql = "CREATE VIEW test AS DELETE from demo";
2495 let result =
2496 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2497 assert!(result.is_ok_and(|x| x.len() == 1));
2498 }
2499
2500 #[test]
2501 fn test_parse_create_table_fulltext_options() {
2502 let sql1 = r"
2503CREATE TABLE log (
2504 ts TIMESTAMP TIME INDEX,
2505 msg TEXT FULLTEXT INDEX,
2506)";
2507 let result1 = ParserContext::create_with_dialect(
2508 sql1,
2509 &GreptimeDbDialect {},
2510 ParseOptions::default(),
2511 )
2512 .unwrap();
2513
2514 if let Statement::CreateTable(c) = &result1[0] {
2515 c.columns.iter().for_each(|col| {
2516 if col.name().value == "msg" {
2517 assert!(
2518 col.extensions
2519 .fulltext_index_options
2520 .as_ref()
2521 .unwrap()
2522 .is_empty()
2523 );
2524 }
2525 });
2526 } else {
2527 panic!("should be create_table statement");
2528 }
2529
2530 let sql2 = r"
2531CREATE TABLE log (
2532 ts TIMESTAMP TIME INDEX,
2533 msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
2534)";
2535 let result2 = ParserContext::create_with_dialect(
2536 sql2,
2537 &GreptimeDbDialect {},
2538 ParseOptions::default(),
2539 )
2540 .unwrap();
2541
2542 if let Statement::CreateTable(c) = &result2[0] {
2543 c.columns.iter().for_each(|col| {
2544 if col.name().value == "msg" {
2545 let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2546 assert_eq!(options.len(), 2);
2547 assert_eq!(options.get("analyzer").unwrap(), "English");
2548 assert_eq!(options.get("case_sensitive").unwrap(), "false");
2549 }
2550 });
2551 } else {
2552 panic!("should be create_table statement");
2553 }
2554
2555 let sql3 = r"
2556CREATE TABLE log (
2557 ts TIMESTAMP TIME INDEX,
2558 msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
2559 msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
2560)";
2561 let result3 = ParserContext::create_with_dialect(
2562 sql3,
2563 &GreptimeDbDialect {},
2564 ParseOptions::default(),
2565 )
2566 .unwrap();
2567
2568 if let Statement::CreateTable(c) = &result3[0] {
2569 c.columns.iter().for_each(|col| {
2570 if col.name().value == "msg1" {
2571 let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2572 assert_eq!(options.len(), 2);
2573 assert_eq!(options.get("analyzer").unwrap(), "English");
2574 assert_eq!(options.get("case_sensitive").unwrap(), "false");
2575 } else if col.name().value == "msg2" {
2576 let options = col.extensions.fulltext_index_options.as_ref().unwrap();
2577 assert_eq!(options.len(), 2);
2578 assert_eq!(options.get("analyzer").unwrap(), "Chinese");
2579 assert_eq!(options.get("case_sensitive").unwrap(), "true");
2580 }
2581 });
2582 } else {
2583 panic!("should be create_table statement");
2584 }
2585 }
2586
2587 #[test]
2588 fn test_parse_create_table_fulltext_options_invalid_type() {
2589 let sql = r"
2590CREATE TABLE log (
2591 ts TIMESTAMP TIME INDEX,
2592 msg INT FULLTEXT INDEX,
2593)";
2594 let result =
2595 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2596 assert!(result.is_err());
2597 assert!(
2598 result
2599 .unwrap_err()
2600 .to_string()
2601 .contains("FULLTEXT index only supports string type")
2602 );
2603 }
2604
2605 #[test]
2606 fn test_parse_create_table_fulltext_options_duplicate() {
2607 let sql = r"
2608CREATE TABLE log (
2609 ts TIMESTAMP TIME INDEX,
2610 msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
2611)";
2612 let result =
2613 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2614 assert!(result.is_err());
2615 assert!(
2616 result
2617 .unwrap_err()
2618 .to_string()
2619 .contains("duplicated FULLTEXT INDEX option")
2620 );
2621 }
2622
2623 #[test]
2624 fn test_parse_create_table_fulltext_options_invalid_option() {
2625 let sql = r"
2626CREATE TABLE log (
2627 ts TIMESTAMP TIME INDEX,
2628 msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
2629)";
2630 let result =
2631 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2632 assert!(result.is_err());
2633 assert!(
2634 result
2635 .unwrap_err()
2636 .to_string()
2637 .contains("invalid FULLTEXT INDEX option")
2638 );
2639 }
2640
2641 #[test]
2642 fn test_parse_create_table_skip_options() {
2643 let sql = r"
2644CREATE TABLE log (
2645 ts TIMESTAMP TIME INDEX,
2646 msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
2647)";
2648 let result =
2649 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2650 .unwrap();
2651
2652 if let Statement::CreateTable(c) = &result[0] {
2653 c.columns.iter().for_each(|col| {
2654 if col.name().value == "msg" {
2655 assert!(
2656 !col.extensions
2657 .skipping_index_options
2658 .as_ref()
2659 .unwrap()
2660 .is_empty()
2661 );
2662 }
2663 });
2664 } else {
2665 panic!("should be create_table statement");
2666 }
2667
2668 let sql = r"
2669 CREATE TABLE log (
2670 ts TIMESTAMP TIME INDEX,
2671 msg INT SKIPPING INDEX,
2672 )";
2673 let result =
2674 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2675 .unwrap();
2676
2677 if let Statement::CreateTable(c) = &result[0] {
2678 c.columns.iter().for_each(|col| {
2679 if col.name().value == "msg" {
2680 assert!(
2681 col.extensions
2682 .skipping_index_options
2683 .as_ref()
2684 .unwrap()
2685 .is_empty()
2686 );
2687 }
2688 });
2689 } else {
2690 panic!("should be create_table statement");
2691 }
2692 }
2693
2694 #[test]
2695 fn test_parse_create_view_with_columns() {
2696 let sql = "CREATE VIEW test () AS SELECT * FROM NUMBERS";
2697 let result =
2698 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2699 .unwrap();
2700
2701 match &result[0] {
2702 Statement::CreateView(c) => {
2703 assert_eq!(c.to_string(), "CREATE VIEW test AS SELECT * FROM NUMBERS");
2704 assert!(!c.or_replace);
2705 assert!(!c.if_not_exists);
2706 assert_eq!("test", c.name.to_string());
2707 }
2708 _ => unreachable!(),
2709 }
2710 assert_eq!(
2711 "CREATE VIEW test AS SELECT * FROM NUMBERS",
2712 result[0].to_string()
2713 );
2714
2715 let sql = "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS";
2716 let result =
2717 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2718 .unwrap();
2719
2720 match &result[0] {
2721 Statement::CreateView(c) => {
2722 assert_eq!(c.to_string(), sql);
2723 assert!(!c.or_replace);
2724 assert!(!c.if_not_exists);
2725 assert_eq!("test", c.name.to_string());
2726 }
2727 _ => unreachable!(),
2728 }
2729 assert_eq!(sql, result[0].to_string());
2730
2731 let sql = "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS";
2732 let result =
2733 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2734 .unwrap();
2735
2736 match &result[0] {
2737 Statement::CreateView(c) => {
2738 assert_eq!(c.to_string(), sql);
2739 assert!(!c.or_replace);
2740 assert!(!c.if_not_exists);
2741 assert_eq!("test", c.name.to_string());
2742 }
2743 _ => unreachable!(),
2744 }
2745 assert_eq!(sql, result[0].to_string());
2746
2747 let sql = "CREATE VIEW test (n1 AS select * from demo";
2749 let result =
2750 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2751 assert!(result.is_err());
2752
2753 let sql = "CREATE VIEW test (n1, AS select * from demo";
2754 let result =
2755 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2756 assert!(result.is_err());
2757
2758 let sql = "CREATE VIEW test n1,n2) AS select * from demo";
2759 let result =
2760 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2761 assert!(result.is_err());
2762
2763 let sql = "CREATE VIEW test (1) AS select * from demo";
2764 let result =
2765 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2766 assert!(result.is_err());
2767
2768 let sql = "CREATE VIEW test (n1, select) AS select * from demo";
2770 let result =
2771 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2772 assert!(result.is_err());
2773 }
2774
2775 #[test]
2776 fn test_parse_column_extensions_vector() {
2777 let sql = "";
2779 let dialect = GenericDialect {};
2780 let mut tokenizer = Tokenizer::new(&dialect, sql);
2781 let tokens = tokenizer.tokenize().unwrap();
2782 let mut parser = Parser::new(&dialect).with_tokens(tokens);
2783 let name = Ident::new("vec_col");
2784 let data_type =
2785 DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
2786 let mut extensions = ColumnExtensions::default();
2787
2788 let result =
2789 ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2790 assert!(result.is_ok());
2791 assert!(extensions.vector_options.is_some());
2792 let vector_options = extensions.vector_options.unwrap();
2793 assert_eq!(vector_options.get(VECTOR_OPT_DIM), Some("128"));
2794 }
2795
2796 #[test]
2797 fn test_parse_column_extensions_vector_invalid() {
2798 let sql = "";
2800 let dialect = GenericDialect {};
2801 let mut tokenizer = Tokenizer::new(&dialect, sql);
2802 let tokens = tokenizer.tokenize().unwrap();
2803 let mut parser = Parser::new(&dialect).with_tokens(tokens);
2804 let name = Ident::new("vec_col");
2805 let data_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
2806 let mut extensions = ColumnExtensions::default();
2807
2808 let result =
2809 ParserContext::parse_column_extensions(&mut parser, &name, &data_type, &mut extensions);
2810 assert!(result.is_err());
2811 }
2812
2813 #[test]
2814 fn test_parse_column_extensions_indices() {
2815 {
2817 let sql = "SKIPPING INDEX";
2818 let dialect = GenericDialect {};
2819 let mut tokenizer = Tokenizer::new(&dialect, sql);
2820 let tokens = tokenizer.tokenize().unwrap();
2821 let mut parser = Parser::new(&dialect).with_tokens(tokens);
2822 let name = Ident::new("col");
2823 let data_type = DataType::String(None);
2824 let mut extensions = ColumnExtensions::default();
2825 let result = ParserContext::parse_column_extensions(
2826 &mut parser,
2827 &name,
2828 &data_type,
2829 &mut extensions,
2830 );
2831 assert!(result.is_ok());
2832 assert!(extensions.skipping_index_options.is_some());
2833 }
2834
2835 {
2837 let sql = "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')";
2838 let dialect = GenericDialect {};
2839 let mut tokenizer = Tokenizer::new(&dialect, sql);
2840 let tokens = tokenizer.tokenize().unwrap();
2841 let mut parser = Parser::new(&dialect).with_tokens(tokens);
2842 let name = Ident::new("text_col");
2843 let data_type = DataType::String(None);
2844 let mut extensions = ColumnExtensions::default();
2845 let result = ParserContext::parse_column_extensions(
2846 &mut parser,
2847 &name,
2848 &data_type,
2849 &mut extensions,
2850 );
2851 assert!(result.unwrap());
2852 assert!(extensions.fulltext_index_options.is_some());
2853 let fulltext_options = extensions.fulltext_index_options.unwrap();
2854 assert_eq!(fulltext_options.get("analyzer"), Some("English"));
2855 assert_eq!(fulltext_options.get("case_sensitive"), Some("true"));
2856 }
2857
2858 {
2860 let sql = "FULLTEXT INDEX WITH (analyzer = 'English')";
2861 let dialect = GenericDialect {};
2862 let mut tokenizer = Tokenizer::new(&dialect, sql);
2863 let tokens = tokenizer.tokenize().unwrap();
2864 let mut parser = Parser::new(&dialect).with_tokens(tokens);
2865 let name = Ident::new("num_col");
2866 let data_type = DataType::Int(None); let mut extensions = ColumnExtensions::default();
2868 let result = ParserContext::parse_column_extensions(
2869 &mut parser,
2870 &name,
2871 &data_type,
2872 &mut extensions,
2873 );
2874 assert!(result.is_err());
2875 assert!(
2876 result
2877 .unwrap_err()
2878 .to_string()
2879 .contains("FULLTEXT index only supports string type")
2880 );
2881 }
2882
2883 {
2885 let sql = "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')";
2886 let dialect = GenericDialect {};
2887 let mut tokenizer = Tokenizer::new(&dialect, sql);
2888 let tokens = tokenizer.tokenize().unwrap();
2889 let mut parser = Parser::new(&dialect).with_tokens(tokens);
2890 let name = Ident::new("text_col");
2891 let data_type = DataType::String(None);
2892 let mut extensions = ColumnExtensions::default();
2893 let result = ParserContext::parse_column_extensions(
2894 &mut parser,
2895 &name,
2896 &data_type,
2897 &mut extensions,
2898 );
2899 assert!(result.unwrap());
2900 }
2901
2902 {
2904 let sql = "INVERTED INDEX";
2905 let dialect = GenericDialect {};
2906 let mut tokenizer = Tokenizer::new(&dialect, sql);
2907 let tokens = tokenizer.tokenize().unwrap();
2908 let mut parser = Parser::new(&dialect).with_tokens(tokens);
2909 let name = Ident::new("col");
2910 let data_type = DataType::String(None);
2911 let mut extensions = ColumnExtensions::default();
2912 let result = ParserContext::parse_column_extensions(
2913 &mut parser,
2914 &name,
2915 &data_type,
2916 &mut extensions,
2917 );
2918 assert!(result.is_ok());
2919 assert!(extensions.inverted_index_options.is_some());
2920 }
2921
2922 {
2924 let sql = "INVERTED INDEX WITH (analyzer = 'English')";
2925 let dialect = GenericDialect {};
2926 let mut tokenizer = Tokenizer::new(&dialect, sql);
2927 let tokens = tokenizer.tokenize().unwrap();
2928 let mut parser = Parser::new(&dialect).with_tokens(tokens);
2929 let name = Ident::new("col");
2930 let data_type = DataType::String(None);
2931 let mut extensions = ColumnExtensions::default();
2932 let result = ParserContext::parse_column_extensions(
2933 &mut parser,
2934 &name,
2935 &data_type,
2936 &mut extensions,
2937 );
2938 assert!(result.is_err());
2939 assert!(
2940 result
2941 .unwrap_err()
2942 .to_string()
2943 .contains("INVERTED index doesn't support options")
2944 );
2945 }
2946
2947 {
2949 let sql = "SKIPPING INDEX FULLTEXT INDEX";
2950 let dialect = GenericDialect {};
2951 let mut tokenizer = Tokenizer::new(&dialect, sql);
2952 let tokens = tokenizer.tokenize().unwrap();
2953 let mut parser = Parser::new(&dialect).with_tokens(tokens);
2954 let name = Ident::new("col");
2955 let data_type = DataType::String(None);
2956 let mut extensions = ColumnExtensions::default();
2957 let result = ParserContext::parse_column_extensions(
2958 &mut parser,
2959 &name,
2960 &data_type,
2961 &mut extensions,
2962 );
2963 assert!(result.unwrap());
2964 assert!(extensions.skipping_index_options.is_some());
2965 assert!(extensions.fulltext_index_options.is_some());
2966 }
2967 }
2968
2969 #[test]
2970 fn test_parse_interval_cast() {
2971 let s = "select '10s'::INTERVAL";
2972 let stmts =
2973 ParserContext::create_with_dialect(s, &GreptimeDbDialect {}, ParseOptions::default())
2974 .unwrap();
2975 assert_eq!("SELECT '10 seconds'::INTERVAL", &stmts[0].to_string());
2976 }
2977
2978 #[test]
2979 fn test_parse_create_table_vector_index_options() {
2980 let sql = r"
2982CREATE TABLE vectors (
2983 ts TIMESTAMP TIME INDEX,
2984 vec VECTOR(128) VECTOR INDEX,
2985)";
2986 let result =
2987 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2988 .unwrap();
2989
2990 if let Statement::CreateTable(c) = &result[0] {
2991 c.columns.iter().for_each(|col| {
2992 if col.name().value == "vec" {
2993 assert!(
2994 col.extensions
2995 .vector_index_options
2996 .as_ref()
2997 .unwrap()
2998 .is_empty()
2999 );
3000 }
3001 });
3002 } else {
3003 panic!("should be create_table statement");
3004 }
3005
3006 let sql = r"
3008CREATE TABLE vectors (
3009 ts TIMESTAMP TIME INDEX,
3010 vec VECTOR(128) VECTOR INDEX WITH (metric='cosine', connectivity='32', expansion_add='256', expansion_search='128')
3011)";
3012 let result =
3013 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
3014 .unwrap();
3015
3016 if let Statement::CreateTable(c) = &result[0] {
3017 c.columns.iter().for_each(|col| {
3018 if col.name().value == "vec" {
3019 let options = col.extensions.vector_index_options.as_ref().unwrap();
3020 assert_eq!(options.len(), 4);
3021 assert_eq!(options.get("metric").unwrap(), "cosine");
3022 assert_eq!(options.get("connectivity").unwrap(), "32");
3023 assert_eq!(options.get("expansion_add").unwrap(), "256");
3024 assert_eq!(options.get("expansion_search").unwrap(), "128");
3025 }
3026 });
3027 } else {
3028 panic!("should be create_table statement");
3029 }
3030 }
3031
3032 #[test]
3033 fn test_parse_create_table_vector_index_invalid_type() {
3034 let sql = r"
3036CREATE TABLE vectors (
3037 ts TIMESTAMP TIME INDEX,
3038 col INT VECTOR INDEX,
3039)";
3040 let result =
3041 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3042 assert!(result.is_err());
3043 assert!(
3044 result
3045 .unwrap_err()
3046 .to_string()
3047 .contains("VECTOR INDEX only supports Vector type columns")
3048 );
3049 }
3050
3051 #[test]
3052 fn test_parse_create_table_vector_index_duplicate() {
3053 let sql = r"
3055CREATE TABLE vectors (
3056 ts TIMESTAMP TIME INDEX,
3057 vec VECTOR(128) VECTOR INDEX VECTOR INDEX,
3058)";
3059 let result =
3060 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3061 assert!(result.is_err());
3062 assert!(
3063 result
3064 .unwrap_err()
3065 .to_string()
3066 .contains("duplicated VECTOR INDEX option")
3067 );
3068 }
3069
3070 #[test]
3071 fn test_parse_create_table_vector_index_invalid_option() {
3072 let sql = r"
3074CREATE TABLE vectors (
3075 ts TIMESTAMP TIME INDEX,
3076 vec VECTOR(128) VECTOR INDEX WITH (metric='l2sq', invalid_option='foo')
3077)";
3078 let result =
3079 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
3080 assert!(result.is_err());
3081 assert!(
3082 result
3083 .unwrap_err()
3084 .to_string()
3085 .contains("invalid VECTOR INDEX option")
3086 );
3087 }
3088
3089 #[test]
3090 fn test_parse_column_extensions_vector_index() {
3091 {
3093 let sql = "VECTOR INDEX WITH (metric = 'l2sq')";
3094 let dialect = GenericDialect {};
3095 let mut tokenizer = Tokenizer::new(&dialect, sql);
3096 let tokens = tokenizer.tokenize().unwrap();
3097 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3098 let name = Ident::new("vec_col");
3099 let data_type =
3100 DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
3101 let mut extensions = ColumnExtensions {
3103 vector_options: Some(OptionMap::from([(
3104 VECTOR_OPT_DIM.to_string(),
3105 "128".to_string(),
3106 )])),
3107 ..Default::default()
3108 };
3109
3110 let result = ParserContext::parse_column_extensions(
3111 &mut parser,
3112 &name,
3113 &data_type,
3114 &mut extensions,
3115 );
3116 assert!(result.is_ok());
3117 assert!(extensions.vector_index_options.is_some());
3118 let vi_options = extensions.vector_index_options.unwrap();
3119 assert_eq!(vi_options.get("metric"), Some("l2sq"));
3120 }
3121
3122 {
3124 let sql = "VECTOR INDEX";
3125 let dialect = GenericDialect {};
3126 let mut tokenizer = Tokenizer::new(&dialect, sql);
3127 let tokens = tokenizer.tokenize().unwrap();
3128 let mut parser = Parser::new(&dialect).with_tokens(tokens);
3129 let name = Ident::new("num_col");
3130 let data_type = DataType::Int(None); let mut extensions = ColumnExtensions::default();
3132 let result = ParserContext::parse_column_extensions(
3133 &mut parser,
3134 &name,
3135 &data_type,
3136 &mut extensions,
3137 );
3138 assert!(result.is_err());
3139 assert!(
3140 result
3141 .unwrap_err()
3142 .to_string()
3143 .contains("VECTOR INDEX only supports Vector type columns")
3144 );
3145 }
3146 }
3147}