1mod json;
16#[cfg(feature = "enterprise")]
17pub mod trigger;
18
19use std::collections::HashMap;
20
21use arrow_buffer::IntervalMonthDayNano;
22use common_catalog::consts::default_engine;
23use datafusion_common::ScalarValue;
24use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
25use datatypes::data_type::ConcreteDataType;
26use itertools::Itertools;
27use snafu::{OptionExt, ResultExt, ensure};
28use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Expr};
29use sqlparser::dialect::keywords::Keyword;
30use sqlparser::keywords::ALL_KEYWORDS;
31use sqlparser::parser::IsOptional::Mandatory;
32use sqlparser::parser::{Parser, ParserError};
33use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
34use table::requests::validate_database_option;
35
36use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
37use crate::error::{
38 self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu,
39 InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result, SyntaxSnafu,
40 UnexpectedSnafu, UnsupportedSnafu,
41};
42use crate::parser::{FLOW, ParserContext};
43use crate::parsers::tql_parser;
44use crate::parsers::utils::{
45 self, parse_with_options, validate_column_fulltext_create_option,
46 validate_column_skipping_index_create_option, validate_column_vector_index_create_option,
47};
48use crate::statements::create::{
49 Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
50 CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
51};
52use crate::statements::statement::Statement;
53use crate::statements::transform::type_alias::get_data_type_by_alias_name;
54use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
55use crate::util::{OptionValue, location_to_index, parse_option_string};
56
/// Word that introduces the `ENGINE = <name>` clause of a `CREATE TABLE` statement.
pub const ENGINE: &str = "ENGINE";
/// The `MAXVALUE` word (not referenced in the visible part of this file).
pub const MAXVALUE: &str = "MAXVALUE";
/// Word expected right after the flow name in `CREATE FLOW ... SINK TO <table>`.
pub const SINK: &str = "SINK";
/// First word of the optional `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const EXPIRE: &str = "EXPIRE";
/// Second word of the optional `EXPIRE AFTER <interval>` clause of `CREATE FLOW`.
pub const AFTER: &str = "AFTER";
/// Word that starts an `INVERTED INDEX` column extension.
pub const INVERTED: &str = "INVERTED";
/// Word that starts a `SKIPPING INDEX` column extension.
pub const SKIPPING: &str = "SKIPPING";
/// Word that starts a `VECTOR INDEX` column extension.
pub const VECTOR: &str = "VECTOR";

/// The textual (`to_string`) form of a parsed interval expression.
pub type RawIntervalExpr = String;
67
68impl<'a> ParserContext<'a> {
70 pub(crate) fn parse_create(&mut self) -> Result<Statement> {
71 match self.parser.peek_token().token {
72 Token::Word(w) => match w.keyword {
73 Keyword::TABLE => self.parse_create_table(),
74
75 Keyword::SCHEMA | Keyword::DATABASE => self.parse_create_database(),
76
77 Keyword::EXTERNAL => self.parse_create_external_table(),
78
79 Keyword::OR => {
80 let _ = self.parser.next_token();
81 self.parser
82 .expect_keyword(Keyword::REPLACE)
83 .context(SyntaxSnafu)?;
84 match self.parser.next_token().token {
85 Token::Word(w) => match w.keyword {
86 Keyword::VIEW => self.parse_create_view(true),
87 Keyword::NoKeyword => {
88 let uppercase = w.value.to_uppercase();
89 match uppercase.as_str() {
90 FLOW => self.parse_create_flow(true),
91 _ => self.unsupported(w.to_string()),
92 }
93 }
94 _ => self.unsupported(w.to_string()),
95 },
96 _ => self.unsupported(w.to_string()),
97 }
98 }
99
100 Keyword::VIEW => {
101 let _ = self.parser.next_token();
102 self.parse_create_view(false)
103 }
104
105 #[cfg(feature = "enterprise")]
106 Keyword::TRIGGER => {
107 let _ = self.parser.next_token();
108 self.parse_create_trigger()
109 }
110
111 Keyword::NoKeyword => {
112 let _ = self.parser.next_token();
113 let uppercase = w.value.to_uppercase();
114 match uppercase.as_str() {
115 FLOW => self.parse_create_flow(false),
116 _ => self.unsupported(w.to_string()),
117 }
118 }
119 _ => self.unsupported(w.to_string()),
120 },
121 unexpected => self.unsupported(unexpected.to_string()),
122 }
123 }
124
125 fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
127 let if_not_exists = self.parse_if_not_exist()?;
128 let view_name = self.intern_parse_table_name()?;
129
130 let columns = self.parse_view_columns()?;
131
132 self.parser
133 .expect_keyword(Keyword::AS)
134 .context(SyntaxSnafu)?;
135
136 let query = self.parse_query()?;
137
138 Ok(Statement::CreateView(CreateView {
139 name: view_name,
140 columns,
141 or_replace,
142 query: Box::new(query),
143 if_not_exists,
144 }))
145 }
146
147 fn parse_view_columns(&mut self) -> Result<Vec<Ident>> {
148 let mut columns = vec![];
149 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
150 return Ok(columns);
151 }
152
153 loop {
154 let name = self.parse_column_name().context(SyntaxSnafu)?;
155
156 columns.push(name);
157
158 let comma = self.parser.consume_token(&Token::Comma);
159 if self.parser.consume_token(&Token::RParen) {
160 break;
162 } else if !comma {
163 return self.expected("',' or ')' after column name", self.parser.peek_token());
164 }
165 }
166
167 Ok(columns)
168 }
169
170 fn parse_create_external_table(&mut self) -> Result<Statement> {
171 let _ = self.parser.next_token();
172 self.parser
173 .expect_keyword(Keyword::TABLE)
174 .context(SyntaxSnafu)?;
175 let if_not_exists = self.parse_if_not_exist()?;
176 let table_name = self.intern_parse_table_name()?;
177 let (columns, constraints) = self.parse_columns()?;
178 if !columns.is_empty() {
179 validate_time_index(&columns, &constraints)?;
180 }
181
182 let engine = self.parse_table_engine(common_catalog::consts::FILE_ENGINE)?;
183 let options = self.parse_create_table_options()?;
184 Ok(Statement::CreateExternalTable(CreateExternalTable {
185 name: table_name,
186 columns,
187 constraints,
188 options,
189 if_not_exists,
190 engine,
191 }))
192 }
193
194 fn parse_create_database(&mut self) -> Result<Statement> {
195 let _ = self.parser.next_token();
196 let if_not_exists = self.parse_if_not_exist()?;
197 let database_name = self.parse_object_name().context(error::UnexpectedSnafu {
198 expected: "a database name",
199 actual: self.peek_token_as_string(),
200 })?;
201 let database_name = Self::canonicalize_object_name(database_name)?;
202
203 let options = self
204 .parser
205 .parse_options(Keyword::WITH)
206 .context(SyntaxSnafu)?
207 .into_iter()
208 .map(parse_option_string)
209 .collect::<Result<HashMap<String, OptionValue>>>()?;
210
211 for key in options.keys() {
212 ensure!(
213 validate_database_option(key),
214 InvalidDatabaseOptionSnafu { key: key.clone() }
215 );
216 }
217 if let Some(append_mode) = options.get("append_mode").and_then(|x| x.as_string())
218 && append_mode == "true"
219 && options.contains_key("merge_mode")
220 {
221 return InvalidDatabaseOptionSnafu {
222 key: "merge_mode".to_string(),
223 }
224 .fail();
225 }
226
227 Ok(Statement::CreateDatabase(CreateDatabase {
228 name: database_name,
229 if_not_exists,
230 options: OptionMap::new(options),
231 }))
232 }
233
234 fn parse_create_table(&mut self) -> Result<Statement> {
235 let _ = self.parser.next_token();
236
237 let if_not_exists = self.parse_if_not_exist()?;
238
239 let table_name = self.intern_parse_table_name()?;
240
241 if self.parser.parse_keyword(Keyword::LIKE) {
242 let source_name = self.intern_parse_table_name()?;
243
244 return Ok(Statement::CreateTableLike(CreateTableLike {
245 table_name,
246 source_name,
247 }));
248 }
249
250 let (columns, constraints) = self.parse_columns()?;
251 validate_time_index(&columns, &constraints)?;
252
253 let partitions = self.parse_partitions()?;
254 if let Some(partitions) = &partitions {
255 validate_partitions(&columns, partitions)?;
256 }
257
258 let engine = self.parse_table_engine(default_engine())?;
259 let options = self.parse_create_table_options()?;
260 let create_table = CreateTable {
261 if_not_exists,
262 name: table_name,
263 columns,
264 engine,
265 constraints,
266 options,
267 table_id: 0, partitions,
269 };
270
271 Ok(Statement::CreateTable(create_table))
272 }
273
274 fn parse_create_flow(&mut self, or_replace: bool) -> Result<Statement> {
276 let if_not_exists = self.parse_if_not_exist()?;
277
278 let flow_name = self.intern_parse_table_name()?;
279
280 if let Token::Word(word) = self.parser.peek_token().token
282 && word.value.eq_ignore_ascii_case(SINK)
283 {
284 self.parser.next_token();
285 } else {
286 Err(ParserError::ParserError(
287 "Expect `SINK` keyword".to_string(),
288 ))
289 .context(SyntaxSnafu)?
290 }
291 self.parser
292 .expect_keyword(Keyword::TO)
293 .context(SyntaxSnafu)?;
294
295 let output_table_name = self.intern_parse_table_name()?;
296
297 let expire_after = if let Token::Word(w1) = &self.parser.peek_token().token
298 && w1.value.eq_ignore_ascii_case(EXPIRE)
299 {
300 self.parser.next_token();
301 if let Token::Word(w2) = &self.parser.peek_token().token
302 && w2.value.eq_ignore_ascii_case(AFTER)
303 {
304 self.parser.next_token();
305 Some(self.parse_interval_no_month("EXPIRE AFTER")?)
306 } else {
307 None
308 }
309 } else {
310 None
311 };
312
313 let eval_interval = if self
314 .parser
315 .consume_tokens(&[Token::make_keyword("EVAL"), Token::make_keyword("INTERVAL")])
316 {
317 Some(self.parse_interval_no_month("EVAL INTERVAL")?)
318 } else {
319 None
320 };
321
322 let comment = if self.parser.parse_keyword(Keyword::COMMENT) {
323 match self.parser.next_token() {
324 TokenWithSpan {
325 token: Token::SingleQuotedString(value, ..),
326 ..
327 } => Some(value),
328 unexpected => {
329 return self
330 .parser
331 .expected("string", unexpected)
332 .context(SyntaxSnafu);
333 }
334 }
335 } else {
336 None
337 };
338
339 self.parser
340 .expect_keyword(Keyword::AS)
341 .context(SyntaxSnafu)?;
342
343 let query = Box::new(self.parse_sql_or_tql(true)?);
344
345 Ok(Statement::CreateFlow(CreateFlow {
346 flow_name,
347 sink_table_name: output_table_name,
348 or_replace,
349 if_not_exists,
350 expire_after,
351 eval_interval,
352 comment,
353 query,
354 }))
355 }
356
357 fn parse_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
358 let start_loc = self.parser.peek_token().span.start;
359 let start_index = location_to_index(self.sql, &start_loc);
360
361 let query = match self.parser.peek_token().token {
363 Token::Word(w) => match w.keyword {
364 Keyword::SELECT => self.parse_query(),
365 Keyword::NoKeyword
366 if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
367 {
368 self.parse_tql(require_now_expr)
369 }
370
371 _ => self.unsupported(self.peek_token_as_string()),
372 },
373 _ => self.unsupported(self.peek_token_as_string()),
374 }?;
375
376 let end_token = self.parser.peek_token();
377
378 let raw_query = if end_token == Token::EOF {
379 &self.sql[start_index..]
380 } else {
381 let end_loc = end_token.span.end;
382 let end_index = location_to_index(self.sql, &end_loc);
383 &self.sql[start_index..end_index.min(self.sql.len())]
384 };
385 let raw_query = raw_query.trim_end_matches(";");
386 let query = SqlOrTql::try_from_statement(query, raw_query)?;
387 Ok(query)
388 }
389
390 fn parse_interval_no_month(&mut self, context: &str) -> Result<i64> {
392 let interval = self.parse_interval_month_day_nano()?.0;
393 if interval.months != 0 {
394 return InvalidIntervalSnafu {
395 reason: format!("Interval with months is not allowed in {context}"),
396 }
397 .fail();
398 }
399 Ok(
400 interval.nanoseconds / 1_000_000_000
401 + interval.days as i64 * 60 * 60 * 24
402 + interval.months as i64 * 60 * 60 * 24 * 3044 / 1000, )
406 }
407
408 fn parse_interval_month_day_nano(&mut self) -> Result<(IntervalMonthDayNano, RawIntervalExpr)> {
410 let interval_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
411 let raw_interval_expr = interval_expr.to_string();
412 let interval = utils::parser_expr_to_scalar_value_literal(interval_expr.clone(), false)?
413 .cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
414 .ok()
415 .with_context(|| InvalidIntervalSnafu {
416 reason: format!("cannot cast {} to interval type", interval_expr),
417 })?;
418 if let ScalarValue::IntervalMonthDayNano(Some(interval)) = interval {
419 Ok((interval, raw_interval_expr))
420 } else {
421 unreachable!()
422 }
423 }
424
425 fn parse_if_not_exist(&mut self) -> Result<bool> {
426 match self.parser.peek_token().token {
427 Token::Word(w) if Keyword::IF != w.keyword => return Ok(false),
428 _ => {}
429 }
430
431 if self.parser.parse_keywords(&[Keyword::IF, Keyword::NOT]) {
432 return self
433 .parser
434 .expect_keyword(Keyword::EXISTS)
435 .map(|_| true)
436 .context(UnexpectedSnafu {
437 expected: "EXISTS",
438 actual: self.peek_token_as_string(),
439 });
440 }
441
442 if self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]) {
443 return UnsupportedSnafu { keyword: "EXISTS" }.fail();
444 }
445
446 Ok(false)
447 }
448
    /// Parses the table options of a `CREATE TABLE` / `CREATE EXTERNAL TABLE` statement
    /// by delegating to `parse_with_options` on the underlying parser.
    fn parse_create_table_options(&mut self) -> Result<OptionMap> {
        parse_with_options(&mut self.parser)
    }
452
453 fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
455 if !self.parser.parse_keyword(Keyword::PARTITION) {
456 return Ok(None);
457 }
458 self.parser
459 .expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
460 .context(error::UnexpectedSnafu {
461 expected: "ON, COLUMNS",
462 actual: self.peek_token_as_string(),
463 })?;
464
465 let raw_column_list = self
466 .parser
467 .parse_parenthesized_column_list(Mandatory, false)
468 .context(error::SyntaxSnafu)?;
469 let column_list = raw_column_list
470 .into_iter()
471 .map(Self::canonicalize_identifier)
472 .collect();
473
474 let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
475
476 Ok(Some(Partitions { column_list, exprs }))
477 }
478
    /// Parses one partition rule entry as a plain SQL expression, mapping parser
    /// failures to a syntax error.
    fn parse_partition_entry(&mut self) -> Result<Expr> {
        self.parser.parse_expr().context(error::SyntaxSnafu)
    }
482
483 fn parse_comma_separated<T, F>(&mut self, mut f: F) -> Result<Vec<T>>
485 where
486 F: FnMut(&mut ParserContext<'a>) -> Result<T>,
487 {
488 self.parser
489 .expect_token(&Token::LParen)
490 .context(error::UnexpectedSnafu {
491 expected: "(",
492 actual: self.peek_token_as_string(),
493 })?;
494
495 let mut values = vec![];
496 while self.parser.peek_token() != Token::RParen {
497 values.push(f(self)?);
498 if !self.parser.consume_token(&Token::Comma) {
499 break;
500 }
501 }
502
503 self.parser
504 .expect_token(&Token::RParen)
505 .context(error::UnexpectedSnafu {
506 expected: ")",
507 actual: self.peek_token_as_string(),
508 })?;
509
510 Ok(values)
511 }
512
513 fn parse_columns(&mut self) -> Result<(Vec<Column>, Vec<TableConstraint>)> {
515 let mut columns = vec![];
516 let mut constraints = vec![];
517 if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) {
518 return Ok((columns, constraints));
519 }
520
521 loop {
522 if let Some(constraint) = self.parse_optional_table_constraint()? {
523 constraints.push(constraint);
524 } else if let Token::Word(_) = self.parser.peek_token().token {
525 self.parse_column(&mut columns, &mut constraints)?;
526 } else {
527 return self.expected(
528 "column name or constraint definition",
529 self.parser.peek_token(),
530 );
531 }
532 let comma = self.parser.consume_token(&Token::Comma);
533 if self.parser.consume_token(&Token::RParen) {
534 break;
536 } else if !comma {
537 return self.expected(
538 "',' or ')' after column definition",
539 self.parser.peek_token(),
540 );
541 }
542 }
543
544 Ok((columns, constraints))
545 }
546
547 fn parse_column(
548 &mut self,
549 columns: &mut Vec<Column>,
550 constraints: &mut Vec<TableConstraint>,
551 ) -> Result<()> {
552 let mut column = self.parse_column_def()?;
553
554 let mut time_index_opt_idx = None;
555 for (index, opt) in column.options().iter().enumerate() {
556 if let ColumnOption::DialectSpecific(tokens) = &opt.option
557 && matches!(
558 &tokens[..],
559 [
560 Token::Word(Word {
561 keyword: Keyword::TIME,
562 ..
563 }),
564 Token::Word(Word {
565 keyword: Keyword::INDEX,
566 ..
567 })
568 ]
569 )
570 {
571 ensure!(
572 time_index_opt_idx.is_none(),
573 InvalidColumnOptionSnafu {
574 name: column.name().to_string(),
575 msg: "duplicated time index",
576 }
577 );
578 time_index_opt_idx = Some(index);
579
580 let constraint = TableConstraint::TimeIndex {
581 column: Ident::new(column.name().value.clone()),
582 };
583 constraints.push(constraint);
584 }
585 }
586
587 if let Some(index) = time_index_opt_idx {
588 ensure!(
589 !column.options().contains(&ColumnOptionDef {
590 option: ColumnOption::Null,
591 name: None,
592 }),
593 InvalidColumnOptionSnafu {
594 name: column.name().to_string(),
595 msg: "time index column can't be null",
596 }
597 );
598
599 let data_type = get_unalias_type(column.data_type());
601 ensure!(
602 matches!(data_type, DataType::Timestamp(_, _)),
603 InvalidColumnOptionSnafu {
604 name: column.name().to_string(),
605 msg: "time index column data type should be timestamp",
606 }
607 );
608
609 let not_null_opt = ColumnOptionDef {
610 option: ColumnOption::NotNull,
611 name: None,
612 };
613
614 if !column.options().contains(¬_null_opt) {
615 column.mut_options().push(not_null_opt);
616 }
617
618 let _ = column.mut_options().remove(index);
619 }
620
621 columns.push(column);
622
623 Ok(())
624 }
625
626 fn parse_column_name(&mut self) -> std::result::Result<Ident, ParserError> {
628 let name = self.parser.parse_identifier()?;
629 if name.quote_style.is_none() &&
630 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()
632 {
633 return Err(ParserError::ParserError(format!(
634 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
635 &name.value
636 )));
637 }
638
639 Ok(name)
640 }
641
642 pub fn parse_column_def(&mut self) -> Result<Column> {
643 let name = self.parse_column_name().context(SyntaxSnafu)?;
644 let parser = &mut self.parser;
645
646 ensure!(
647 !(name.quote_style.is_none() &&
648 ALL_KEYWORDS.binary_search(&name.value.to_uppercase().as_str()).is_ok()),
650 InvalidSqlSnafu {
651 msg: format!(
652 "Cannot use keyword '{}' as column name. Hint: add quotes to the name.",
653 &name.value
654 ),
655 }
656 );
657
658 let mut extensions = ColumnExtensions::default();
659
660 let data_type = parser.parse_data_type().context(SyntaxSnafu)?;
661 if matches!(data_type, DataType::JSON) {
664 extensions.json_datatype_options = json::parse_json_datatype_options(parser)?;
665 }
666
667 let mut options = vec![];
668 loop {
669 if parser.parse_keyword(Keyword::CONSTRAINT) {
670 let name = Some(parser.parse_identifier().context(SyntaxSnafu)?);
671 if let Some(option) = Self::parse_optional_column_option(parser)? {
672 options.push(ColumnOptionDef { name, option });
673 } else {
674 return parser
675 .expected(
676 "constraint details after CONSTRAINT <name>",
677 parser.peek_token(),
678 )
679 .context(SyntaxSnafu);
680 }
681 } else if let Some(option) = Self::parse_optional_column_option(parser)? {
682 options.push(ColumnOptionDef { name: None, option });
683 } else if !Self::parse_column_extensions(parser, &name, &data_type, &mut extensions)? {
684 break;
685 };
686 }
687
688 Ok(Column {
689 column_def: ColumnDef {
690 name: Self::canonicalize_identifier(name),
691 data_type,
692 options,
693 },
694 extensions,
695 })
696 }
697
698 fn parse_optional_column_option(parser: &mut Parser<'_>) -> Result<Option<ColumnOption>> {
699 if parser.parse_keywords(&[Keyword::CHARACTER, Keyword::SET]) {
700 Ok(Some(ColumnOption::CharacterSet(
701 parser.parse_object_name(false).context(SyntaxSnafu)?,
702 )))
703 } else if parser.parse_keywords(&[Keyword::NOT, Keyword::NULL]) {
704 Ok(Some(ColumnOption::NotNull))
705 } else if parser.parse_keywords(&[Keyword::COMMENT]) {
706 match parser.next_token() {
707 TokenWithSpan {
708 token: Token::SingleQuotedString(value, ..),
709 ..
710 } => Ok(Some(ColumnOption::Comment(value))),
711 unexpected => parser.expected("string", unexpected).context(SyntaxSnafu),
712 }
713 } else if parser.parse_keyword(Keyword::NULL) {
714 Ok(Some(ColumnOption::Null))
715 } else if parser.parse_keyword(Keyword::DEFAULT) {
716 Ok(Some(ColumnOption::Default(
717 parser.parse_expr().context(SyntaxSnafu)?,
718 )))
719 } else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
720 Ok(Some(ColumnOption::Unique {
721 is_primary: true,
722 characteristics: None,
723 }))
724 } else if parser.parse_keyword(Keyword::UNIQUE) {
725 Ok(Some(ColumnOption::Unique {
726 is_primary: false,
727 characteristics: None,
728 }))
729 } else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
730 Ok(Some(ColumnOption::DialectSpecific(vec![
732 Token::Word(Word {
733 value: "TIME".to_string(),
734 quote_style: None,
735 keyword: Keyword::TIME,
736 }),
737 Token::Word(Word {
738 value: "INDEX".to_string(),
739 quote_style: None,
740 keyword: Keyword::INDEX,
741 }),
742 ])))
743 } else {
744 Ok(None)
745 }
746 }
747
    /// Parses the column extensions at the current position and records them in
    /// `column_extensions`.
    ///
    /// One call tries, in this order: `SKIPPING INDEX [WITH (...)]`,
    /// `FULLTEXT INDEX [WITH (...)]`, `INVERTED INDEX` and `VECTOR INDEX [WITH (...)]`.
    /// In addition, whenever `column_type` is the custom type `VECTOR(<dim>)`, its
    /// dimension is stored in `column_extensions.vector_options` on every call.
    ///
    /// Returns `Ok(true)` when at least one index extension was parsed by this call and
    /// `Ok(false)` otherwise, so a caller can loop until nothing more is consumed.
    ///
    /// # Errors
    /// Fails when `INDEX` is missing after an index word, when the same index kind is
    /// declared twice, when an option key is not accepted by the matching validator,
    /// when `FULLTEXT` / `VECTOR` indexes are used on an unsupported column type, or
    /// when `INVERTED INDEX` is followed by `WITH`.
    fn parse_column_extensions(
        parser: &mut Parser<'_>,
        column_name: &Ident,
        column_type: &DataType,
        column_extensions: &mut ColumnExtensions,
    ) -> Result<bool> {
        // `VECTOR(<dim>)` is a custom type with exactly one argument token.
        if let DataType::Custom(name, tokens) = column_type
            && name.0.len() == 1
            && &name.0[0].to_string_unquoted().to_uppercase() == "VECTOR"
        {
            ensure!(
                tokens.len() == 1,
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "VECTOR type should have dimension",
                }
            );

            // NOTE(review): `parse::<u32>()` also accepts `0`, although the message asks
            // for a positive integer; confirm whether a zero dimension is rejected
            // elsewhere.
            let dimension =
                tokens[0]
                    .parse::<u32>()
                    .ok()
                    .with_context(|| InvalidColumnOptionSnafu {
                        name: column_name.to_string(),
                        msg: "dimension should be a positive integer",
                    })?;

            let options = OptionMap::from([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]);
            column_extensions.vector_options = Some(options);
        }

        // Set as soon as any of the index clauses below is consumed.
        let mut is_index_declared = false;

        // `SKIPPING` is matched by its text (case-insensitively), not by keyword.
        if let Token::Word(word) = parser.peek_token().token
            && word.value.eq_ignore_ascii_case(SKIPPING)
        {
            parser.next_token();
            ensure!(
                parser.parse_keyword(Keyword::INDEX),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "expect INDEX after SKIPPING keyword",
                }
            );
            ensure!(
                column_extensions.skipping_index_options.is_none(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "duplicated SKIPPING index option",
                }
            );

            let options = parser
                .parse_options(Keyword::WITH)
                .context(error::SyntaxSnafu)?
                .into_iter()
                .map(parse_option_string)
                .collect::<Result<Vec<_>>>()?;

            for (key, _) in options.iter() {
                ensure!(
                    validate_column_skipping_index_create_option(key),
                    InvalidColumnOptionSnafu {
                        name: column_name.to_string(),
                        msg: format!("invalid SKIPPING INDEX option: {key}"),
                    }
                );
            }

            let options = OptionMap::new(options);
            column_extensions.skipping_index_options = Some(options);
            is_index_declared |= true;
        }

        if parser.parse_keyword(Keyword::FULLTEXT) {
            ensure!(
                parser.parse_keyword(Keyword::INDEX),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "expect INDEX after FULLTEXT keyword",
                }
            );

            ensure!(
                column_extensions.fulltext_index_options.is_none(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "duplicated FULLTEXT INDEX option",
                }
            );

            // Resolve a possible type alias before checking that the column is a string.
            let column_type = get_unalias_type(column_type);
            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
            ensure!(
                data_type == ConcreteDataType::string_datatype(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "FULLTEXT index only supports string type",
                }
            );

            let options = parser
                .parse_options(Keyword::WITH)
                .context(error::SyntaxSnafu)?
                .into_iter()
                .map(parse_option_string)
                .collect::<Result<Vec<_>>>()?;

            for (key, _) in options.iter() {
                ensure!(
                    validate_column_fulltext_create_option(key),
                    InvalidColumnOptionSnafu {
                        name: column_name.to_string(),
                        msg: format!("invalid FULLTEXT INDEX option: {key}"),
                    }
                );
            }

            let options = OptionMap::new(options);
            column_extensions.fulltext_index_options = Some(options);
            is_index_declared |= true;
        }

        // `INVERTED` is matched by its text (case-insensitively), not by keyword.
        if let Token::Word(word) = parser.peek_token().token
            && word.value.eq_ignore_ascii_case(INVERTED)
        {
            parser.next_token();
            ensure!(
                parser.parse_keyword(Keyword::INDEX),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "expect INDEX after INVERTED keyword",
                }
            );

            ensure!(
                column_extensions.inverted_index_options.is_none(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "duplicated INVERTED index option",
                }
            );

            // The inverted index takes no options, so a following `WITH` is rejected.
            // The comparison is against the exact upper-case, unquoted `WITH` word.
            let with_token = parser.peek_token();
            ensure!(
                with_token.token
                    != Token::Word(Word {
                        value: "WITH".to_string(),
                        keyword: Keyword::WITH,
                        quote_style: None,
                    }),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "INVERTED index doesn't support options",
                }
            );

            column_extensions.inverted_index_options = Some(OptionMap::default());
            is_index_declared |= true;
        }

        // `VECTOR` is matched by its text (case-insensitively), not by keyword.
        if let Token::Word(word) = parser.peek_token().token
            && word.value.eq_ignore_ascii_case(VECTOR)
        {
            parser.next_token();
            ensure!(
                parser.parse_keyword(Keyword::INDEX),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "expect INDEX after VECTOR keyword",
                }
            );

            ensure!(
                column_extensions.vector_index_options.is_none(),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "duplicated VECTOR INDEX option",
                }
            );

            // Resolve a possible type alias before checking that the column is a vector.
            let column_type = get_unalias_type(column_type);
            let data_type = sql_data_type_to_concrete_data_type(&column_type, column_extensions)?;
            ensure!(
                matches!(data_type, ConcreteDataType::Vector(_)),
                InvalidColumnOptionSnafu {
                    name: column_name.to_string(),
                    msg: "VECTOR INDEX only supports Vector type columns",
                }
            );

            let options = parser
                .parse_options(Keyword::WITH)
                .context(error::SyntaxSnafu)?
                .into_iter()
                .map(parse_option_string)
                .collect::<Result<Vec<_>>>()?;

            for (key, _) in options.iter() {
                ensure!(
                    validate_column_vector_index_create_option(key),
                    InvalidColumnOptionSnafu {
                        name: column_name.to_string(),
                        msg: format!("invalid VECTOR INDEX option: {key}"),
                    }
                );
            }

            let options = OptionMap::new(options);
            column_extensions.vector_index_options = Some(options);
            is_index_declared |= true;
        }

        Ok(is_index_declared)
    }
980
981 fn parse_optional_table_constraint(&mut self) -> Result<Option<TableConstraint>> {
982 match self.parser.next_token() {
983 TokenWithSpan {
984 token: Token::Word(w),
985 ..
986 } if w.keyword == Keyword::PRIMARY => {
987 self.parser
988 .expect_keyword(Keyword::KEY)
989 .context(error::UnexpectedSnafu {
990 expected: "KEY",
991 actual: self.peek_token_as_string(),
992 })?;
993 let raw_columns = self
994 .parser
995 .parse_parenthesized_column_list(Mandatory, false)
996 .context(error::SyntaxSnafu)?;
997 let columns = raw_columns
998 .into_iter()
999 .map(Self::canonicalize_identifier)
1000 .collect();
1001 Ok(Some(TableConstraint::PrimaryKey { columns }))
1002 }
1003 TokenWithSpan {
1004 token: Token::Word(w),
1005 ..
1006 } if w.keyword == Keyword::TIME => {
1007 self.parser
1008 .expect_keyword(Keyword::INDEX)
1009 .context(error::UnexpectedSnafu {
1010 expected: "INDEX",
1011 actual: self.peek_token_as_string(),
1012 })?;
1013
1014 let raw_columns = self
1015 .parser
1016 .parse_parenthesized_column_list(Mandatory, false)
1017 .context(error::SyntaxSnafu)?;
1018 let mut columns = raw_columns
1019 .into_iter()
1020 .map(Self::canonicalize_identifier)
1021 .collect::<Vec<_>>();
1022
1023 ensure!(
1024 columns.len() == 1,
1025 InvalidTimeIndexSnafu {
1026 msg: "it should contain only one column in time index",
1027 }
1028 );
1029
1030 Ok(Some(TableConstraint::TimeIndex {
1031 column: columns.pop().unwrap(),
1032 }))
1033 }
1034 _ => {
1035 self.parser.prev_token();
1036 Ok(None)
1037 }
1038 }
1039 }
1040
1041 fn parse_table_engine(&mut self, default: &str) -> Result<String> {
1043 if !self.consume_token(ENGINE) {
1044 return Ok(default.to_string());
1045 }
1046
1047 self.parser
1048 .expect_token(&Token::Eq)
1049 .context(error::UnexpectedSnafu {
1050 expected: "=",
1051 actual: self.peek_token_as_string(),
1052 })?;
1053
1054 let token = self.parser.next_token();
1055 if let Token::Word(w) = token.token {
1056 Ok(w.value)
1057 } else {
1058 self.expected("'Engine' is missing", token)
1059 }
1060 }
1061}
1062
1063fn validate_time_index(columns: &[Column], constraints: &[TableConstraint]) -> Result<()> {
1064 let time_index_constraints: Vec<_> = constraints
1065 .iter()
1066 .filter_map(|c| match c {
1067 TableConstraint::TimeIndex { column } => Some(column),
1068 _ => None,
1069 })
1070 .unique()
1071 .collect();
1072
1073 ensure!(!time_index_constraints.is_empty(), MissingTimeIndexSnafu);
1074 ensure!(
1075 time_index_constraints.len() == 1,
1076 InvalidTimeIndexSnafu {
1077 msg: format!(
1078 "expected only one time index constraint but actual {}",
1079 time_index_constraints.len()
1080 ),
1081 }
1082 );
1083
1084 let time_index_column_ident = &time_index_constraints[0];
1087 let time_index_column = columns
1088 .iter()
1089 .find(|c| c.name().value == *time_index_column_ident.value)
1090 .with_context(|| InvalidTimeIndexSnafu {
1091 msg: format!(
1092 "time index column {} not found in columns",
1093 time_index_column_ident
1094 ),
1095 })?;
1096
1097 let time_index_data_type = get_unalias_type(time_index_column.data_type());
1098 ensure!(
1099 matches!(time_index_data_type, DataType::Timestamp(_, _)),
1100 InvalidColumnOptionSnafu {
1101 name: time_index_column.name().to_string(),
1102 msg: "time index column data type should be timestamp",
1103 }
1104 );
1105
1106 Ok(())
1107}
1108
1109fn get_unalias_type(data_type: &DataType) -> DataType {
1110 match data_type {
1111 DataType::Custom(name, tokens) if name.0.len() == 1 && tokens.is_empty() => {
1112 if let Some(real_type) =
1113 get_data_type_by_alias_name(name.0[0].to_string_unquoted().as_str())
1114 {
1115 real_type
1116 } else {
1117 data_type.clone()
1118 }
1119 }
1120 _ => data_type.clone(),
1121 }
1122}
1123
1124fn validate_partitions(columns: &[Column], partitions: &Partitions) -> Result<()> {
1125 let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
1126
1127 ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
1128
1129 Ok(())
1130}
1131
1132fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&Column]) -> Result<()> {
1134 for expr in exprs {
1135 if let Expr::BinaryOp { left, op: _, right } = expr {
1137 ensure_one_expr(left, columns)?;
1138 ensure_one_expr(right, columns)?;
1139 } else {
1140 return error::InvalidSqlSnafu {
1141 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1142 }
1143 .fail();
1144 }
1145 }
1146 Ok(())
1147}
1148
1149fn ensure_one_expr(expr: &Expr, columns: &[&Column]) -> Result<()> {
1153 match expr {
1154 Expr::BinaryOp { left, op: _, right } => {
1155 ensure_one_expr(left, columns)?;
1156 ensure_one_expr(right, columns)?;
1157 Ok(())
1158 }
1159 Expr::Identifier(ident) => {
1160 let column_name = &ident.value;
1161 ensure!(
1162 columns.iter().any(|c| &c.name().value == column_name),
1163 error::InvalidSqlSnafu {
1164 msg: format!(
1165 "Column {:?} in rule expr is not referenced in PARTITION ON",
1166 column_name
1167 ),
1168 }
1169 );
1170 Ok(())
1171 }
1172 Expr::Value(_) => Ok(()),
1173 Expr::UnaryOp { expr, .. } => {
1174 ensure_one_expr(expr, columns)?;
1175 Ok(())
1176 }
1177 _ => error::InvalidSqlSnafu {
1178 msg: format!("Partition rule expr {:?} is not a binary expr", expr),
1179 }
1180 .fail(),
1181 }
1182}
1183
1184fn ensure_partition_columns_defined<'a>(
1186 columns: &'a [Column],
1187 partitions: &'a Partitions,
1188) -> Result<Vec<&'a Column>> {
1189 partitions
1190 .column_list
1191 .iter()
1192 .map(|x| {
1193 let x = ParserContext::canonicalize_identifier(x.clone());
1194 columns
1197 .iter()
1198 .find(|c| *c.name().value == x.value)
1199 .context(error::InvalidSqlSnafu {
1200 msg: format!("Partition column {:?} not defined", x.value),
1201 })
1202 })
1203 .collect::<Result<Vec<&Column>>>()
1204}
1205
1206#[cfg(test)]
1207mod tests {
1208 use std::assert_matches::assert_matches;
1209 use std::collections::HashMap;
1210
1211 use common_catalog::consts::FILE_ENGINE;
1212 use common_error::ext::ErrorExt;
1213 use sqlparser::ast::ColumnOption::NotNull;
1214 use sqlparser::ast::{BinaryOperator, Expr, ObjectName, ObjectNamePart, Value};
1215 use sqlparser::dialect::GenericDialect;
1216 use sqlparser::tokenizer::Tokenizer;
1217
1218 use super::*;
1219 use crate::dialect::GreptimeDbDialect;
1220 use crate::parser::ParseOptions;
1221
1222 #[test]
1223 fn test_parse_create_table_like() {
1224 let sql = "CREATE TABLE t1 LIKE t2";
1225 let stmts =
1226 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1227 .unwrap();
1228
1229 assert_eq!(1, stmts.len());
1230 match &stmts[0] {
1231 Statement::CreateTableLike(c) => {
1232 assert_eq!(c.table_name.to_string(), "t1");
1233 assert_eq!(c.source_name.to_string(), "t2");
1234 }
1235 _ => unreachable!(),
1236 }
1237 }
1238
1239 #[test]
1240 fn test_validate_external_table_options() {
1241 let sql = "CREATE EXTERNAL TABLE city (
1242 host string,
1243 ts timestamp,
1244 cpu float64 default 0,
1245 memory float64,
1246 TIME INDEX (ts),
1247 PRIMARY KEY(ts, host)
1248 ) with(location='/var/data/city.csv',format='csv',foo='bar');";
1249
1250 let result =
1251 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1252 assert!(matches!(
1253 result,
1254 Err(error::Error::InvalidTableOption { .. })
1255 ));
1256 }
1257
1258 #[test]
1259 fn test_parse_create_external_table() {
1260 struct Test<'a> {
1261 sql: &'a str,
1262 expected_table_name: &'a str,
1263 expected_options: HashMap<&'a str, &'a str>,
1264 expected_engine: &'a str,
1265 expected_if_not_exist: bool,
1266 }
1267
1268 let tests = [
1269 Test {
1270 sql: "CREATE EXTERNAL TABLE city with(location='/var/data/city.csv',format='csv');",
1271 expected_table_name: "city",
1272 expected_options: HashMap::from([
1273 ("location", "/var/data/city.csv"),
1274 ("format", "csv"),
1275 ]),
1276 expected_engine: FILE_ENGINE,
1277 expected_if_not_exist: false,
1278 },
1279 Test {
1280 sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv');",
1281 expected_table_name: "city",
1282 expected_options: HashMap::from([
1283 ("location", "/var/data/city.csv"),
1284 ("format", "csv"),
1285 ]),
1286 expected_engine: "foo",
1287 expected_if_not_exist: true,
1288 },
1289 Test {
1290 sql: "CREATE EXTERNAL TABLE IF NOT EXISTS city ENGINE=foo with(location='/var/data/city.csv',format='csv','compaction.type'='bar');",
1291 expected_table_name: "city",
1292 expected_options: HashMap::from([
1293 ("location", "/var/data/city.csv"),
1294 ("format", "csv"),
1295 ("compaction.type", "bar"),
1296 ]),
1297 expected_engine: "foo",
1298 expected_if_not_exist: true,
1299 },
1300 ];
1301
1302 for test in tests {
1303 let stmts = ParserContext::create_with_dialect(
1304 test.sql,
1305 &GreptimeDbDialect {},
1306 ParseOptions::default(),
1307 )
1308 .unwrap();
1309 assert_eq!(1, stmts.len());
1310 match &stmts[0] {
1311 Statement::CreateExternalTable(c) => {
1312 assert_eq!(c.name.to_string(), test.expected_table_name.to_string());
1313 assert_eq!(c.options.to_str_map(), test.expected_options);
1314 assert_eq!(c.if_not_exists, test.expected_if_not_exist);
1315 assert_eq!(c.engine, test.expected_engine);
1316 }
1317 _ => unreachable!(),
1318 }
1319 }
1320 }
1321
1322 #[test]
1323 fn test_parse_create_external_table_with_schema() {
1324 let sql = "CREATE EXTERNAL TABLE city (
1325 host string,
1326 ts timestamp,
1327 cpu float32 default 0,
1328 memory float64,
1329 TIME INDEX (ts),
1330 PRIMARY KEY(ts, host),
1331 ) with(location='/var/data/city.csv',format='csv');";
1332
1333 let options = HashMap::from([("location", "/var/data/city.csv"), ("format", "csv")]);
1334
1335 let stmts =
1336 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1337 .unwrap();
1338 assert_eq!(1, stmts.len());
1339 match &stmts[0] {
1340 Statement::CreateExternalTable(c) => {
1341 assert_eq!(c.name.to_string(), "city");
1342 assert_eq!(c.options.to_str_map(), options);
1343
1344 let columns = &c.columns;
1345 assert_column_def(&columns[0].column_def, "host", "STRING");
1346 assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
1347 assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
1348 assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
1349
1350 let constraints = &c.constraints;
1351 assert_eq!(
1352 &constraints[0],
1353 &TableConstraint::TimeIndex {
1354 column: Ident::new("ts"),
1355 }
1356 );
1357 assert_eq!(
1358 &constraints[1],
1359 &TableConstraint::PrimaryKey {
1360 columns: vec![Ident::new("ts"), Ident::new("host")]
1361 }
1362 );
1363 }
1364 _ => unreachable!(),
1365 }
1366 }
1367
1368 #[test]
1369 fn test_parse_create_database() {
1370 let sql = "create database";
1371 let result =
1372 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1373 assert!(
1374 result
1375 .unwrap_err()
1376 .to_string()
1377 .contains("Unexpected token while parsing SQL statement")
1378 );
1379
1380 let sql = "create database prometheus";
1381 let stmts =
1382 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1383 .unwrap();
1384
1385 assert_eq!(1, stmts.len());
1386 match &stmts[0] {
1387 Statement::CreateDatabase(c) => {
1388 assert_eq!(c.name.to_string(), "prometheus");
1389 assert!(!c.if_not_exists);
1390 }
1391 _ => unreachable!(),
1392 }
1393
1394 let sql = "create database if not exists prometheus";
1395 let stmts =
1396 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1397 .unwrap();
1398
1399 assert_eq!(1, stmts.len());
1400 match &stmts[0] {
1401 Statement::CreateDatabase(c) => {
1402 assert_eq!(c.name.to_string(), "prometheus");
1403 assert!(c.if_not_exists);
1404 }
1405 _ => unreachable!(),
1406 }
1407
1408 let sql = "CREATE DATABASE `fOo`";
1409 let result =
1410 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1411 let stmts = result.unwrap();
1412 match &stmts.last().unwrap() {
1413 Statement::CreateDatabase(c) => {
1414 assert_eq!(c.name, vec![Ident::with_quote('`', "fOo")].into());
1415 assert!(!c.if_not_exists);
1416 }
1417 _ => unreachable!(),
1418 }
1419
1420 let sql = "CREATE DATABASE prometheus with (ttl='1h');";
1421 let result =
1422 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1423 let stmts = result.unwrap();
1424 match &stmts[0] {
1425 Statement::CreateDatabase(c) => {
1426 assert_eq!(c.name.to_string(), "prometheus");
1427 assert!(!c.if_not_exists);
1428 assert_eq!(c.options.get("ttl").unwrap(), "1h");
1429 }
1430 _ => unreachable!(),
1431 }
1432 }
1433
    /// Table-driven test for `CREATE FLOW` parsing, covering upper- and lower-case
    /// keywords and several spellings of the `EXPIRE AFTER` interval.
    ///
    /// Each case is parsed and compared with the expected `CreateFlow`, then
    /// rendered back to SQL with `to_string()` and parsed again to check that the
    /// rendered statement yields the same result.
    #[test]
    fn test_parse_create_flow_more_testcases() {
        use pretty_assertions::assert_eq;
        /// Parses `sql`, which must contain exactly one `CREATE FLOW` statement.
        fn parse_create_flow(sql: &str) -> CreateFlow {
            let stmts = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            assert_eq!(1, stmts.len());
            match &stmts[0] {
                Statement::CreateFlow(c) => c.clone(),
                _ => unreachable!(),
            }
        }
        /// Expected `CreateFlow` fields for one case, minus the query.
        struct CreateFlowWoutQuery {
            /// Expected flow name.
            pub flow_name: ObjectName,
            /// Expected name of the table named after `SINK TO`.
            pub sink_table_name: ObjectName,
            /// Whether `OR REPLACE` was written.
            pub or_replace: bool,
            /// Whether `IF NOT EXISTS` was written.
            pub if_not_exists: bool,
            /// Expected `EXPIRE AFTER` value; the cases below express it in seconds
            /// (e.g. `'5 minutes'` is expected as `300`).
            pub expire_after: Option<i64>,
            /// Expected `COMMENT` text, if any.
            pub comment: Option<String>,
        }
        let testcases = vec![
            (
                r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER INTERVAL '5 minutes'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
                    sink_table_name: ObjectName::from(vec![
                        Ident::new("schema_1"),
                        Ident::new("table_1"),
                    ]),
                    or_replace: true,
                    if_not_exists: true,
                    expire_after: Some(300),
                    comment: Some("test comment".to_string()),
                },
            ),
            (
                r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER INTERVAL '300 s'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
                    sink_table_name: ObjectName::from(vec![
                        Ident::new("schema_1"),
                        Ident::new("table_1"),
                    ]),
                    or_replace: true,
                    if_not_exists: true,
                    expire_after: Some(300),
                    comment: Some("test comment".to_string()),
                },
            ),
            (
                r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER '5 minutes'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
                    sink_table_name: ObjectName::from(vec![
                        Ident::new("schema_1"),
                        Ident::new("table_1"),
                    ]),
                    or_replace: true,
                    if_not_exists: true,
                    expire_after: Some(300),
                    comment: Some("test comment".to_string()),
                },
            ),
            (
                r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER '300 s'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName::from(vec![Ident::new("task_1")]),
                    sink_table_name: ObjectName::from(vec![
                        Ident::new("schema_1"),
                        Ident::new("table_1"),
                    ]),
                    or_replace: true,
                    if_not_exists: true,
                    expire_after: Some(300),
                    comment: Some("test comment".to_string()),
                },
            ),
            (
                r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1
EXPIRE AFTER '2 days 1h 2 min'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_2")]),
                    sink_table_name: ObjectName::from(vec![
                        Ident::new("schema_1"),
                        Ident::new("table_1"),
                    ]),
                    or_replace: false,
                    if_not_exists: false,
                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
                    comment: None,
                },
            ),
            (
                r"
create flow `task_3`
sink to schema_1.table_1
expire after '10 minutes'
as
select max(c1), min(c2) from schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName::from(vec![Ident::with_quote('`', "task_3")]),
                    sink_table_name: ObjectName::from(vec![
                        Ident::new("schema_1"),
                        Ident::new("table_1"),
                    ]),
                    or_replace: false,
                    if_not_exists: false,
                    expire_after: Some(600), comment: None,
                },
            ),
            (
                r"
create or replace flow if not exists task_4
sink to schema_1.table_1
expire after interval '2 hours'
comment 'lowercase test'
as
select max(c1), min(c2) from schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName::from(vec![Ident::new("task_4")]),
                    sink_table_name: ObjectName::from(vec![
                        Ident::new("schema_1"),
                        Ident::new("table_1"),
                    ]),
                    or_replace: true,
                    if_not_exists: true,
                    expire_after: Some(7200), comment: Some("lowercase test".to_string()),
                },
            ),
        ];

        for (sql, expected) in testcases {
            let create_task = parse_create_flow(sql);

            // The query is copied from the parse result, so this comparison only
            // exercises the remaining fields.
            let expected = CreateFlow {
                flow_name: expected.flow_name,
                sink_table_name: expected.sink_table_name,
                or_replace: expected.or_replace,
                if_not_exists: expected.if_not_exists,
                expire_after: expected.expire_after,
                eval_interval: None,
                comment: expected.comment,
                query: create_task.query.clone(),
            };

            assert_eq!(create_task, expected, "input sql is:\n{sql}");
            // Round trip: the rendered statement must parse back to the same value.
            let show_create = create_task.to_string();
            let recreated = parse_create_flow(&show_create);
            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
        }
    }
1627
    /// Table-driven test for `CREATE FLOW` parsing, including the optional
    /// `EVAL INTERVAL` clause.
    ///
    /// Each case is parsed and compared with the expected `CreateFlow`, then
    /// rendered back to SQL with `to_string()` and parsed again to check that the
    /// rendered statement yields the same result.
    #[test]
    fn test_parse_create_flow() {
        use pretty_assertions::assert_eq;
        /// Parses `sql`, which must contain exactly one `CREATE FLOW` statement;
        /// panics with the debug form of the statement otherwise.
        fn parse_create_flow(sql: &str) -> CreateFlow {
            let stmts = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            assert_eq!(1, stmts.len());
            match &stmts[0] {
                Statement::CreateFlow(c) => c.clone(),
                _ => panic!("{:?}", stmts[0]),
            }
        }
        /// Expected `CreateFlow` fields for one case, minus the query.
        struct CreateFlowWoutQuery {
            /// Expected flow name.
            pub flow_name: ObjectName,
            /// Expected name of the table named after `SINK TO`.
            pub sink_table_name: ObjectName,
            /// Whether `OR REPLACE` was written.
            pub or_replace: bool,
            /// Whether `IF NOT EXISTS` was written.
            pub if_not_exists: bool,
            /// Expected `EXPIRE AFTER` value; the cases below express it in seconds
            /// (e.g. `'5 minutes'` is expected as `300`).
            pub expire_after: Option<i64>,
            /// Expected `EVAL INTERVAL` value; the cases below express it in seconds
            /// (`'10 seconds'` is expected as `10`).
            pub eval_interval: Option<i64>,
            /// Expected `COMMENT` text, if any.
            pub comment: Option<String>,
        }

        let testcases = vec![
            (
                r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER INTERVAL '5 minutes'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
                    sink_table_name: ObjectName(vec![
                        ObjectNamePart::Identifier(Ident::new("schema_1")),
                        ObjectNamePart::Identifier(Ident::new("table_1")),
                    ]),
                    or_replace: true,
                    if_not_exists: true,
                    expire_after: Some(300),
                    eval_interval: None,
                    comment: Some("test comment".to_string()),
                },
            ),
            (
                r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER INTERVAL '300 s'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
                    sink_table_name: ObjectName(vec![
                        ObjectNamePart::Identifier(Ident::new("schema_1")),
                        ObjectNamePart::Identifier(Ident::new("table_1")),
                    ]),
                    or_replace: true,
                    if_not_exists: true,
                    expire_after: Some(300),
                    eval_interval: None,
                    comment: Some("test comment".to_string()),
                },
            ),
            (
                r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER '5 minutes'
EVAL INTERVAL '10 seconds'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
                    sink_table_name: ObjectName(vec![
                        ObjectNamePart::Identifier(Ident::new("schema_1")),
                        ObjectNamePart::Identifier(Ident::new("table_1")),
                    ]),
                    or_replace: true,
                    if_not_exists: true,
                    expire_after: Some(300),
                    eval_interval: Some(10),
                    comment: Some("test comment".to_string()),
                },
            ),
            (
                // `EVAL INTERVAL` followed by an explicit `INTERVAL` literal.
                r"
CREATE OR REPLACE FLOW IF NOT EXISTS task_1
SINK TO schema_1.table_1
EXPIRE AFTER '5 minutes'
EVAL INTERVAL INTERVAL '10 seconds'
COMMENT 'test comment'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_1"))]),
                    sink_table_name: ObjectName(vec![
                        ObjectNamePart::Identifier(Ident::new("schema_1")),
                        ObjectNamePart::Identifier(Ident::new("table_1")),
                    ]),
                    or_replace: true,
                    if_not_exists: true,
                    expire_after: Some(300),
                    eval_interval: Some(10),
                    comment: Some("test comment".to_string()),
                },
            ),
            (
                r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1
EXPIRE AFTER '2 days 1h 2 min'
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
                CreateFlowWoutQuery {
                    flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::with_quote(
                        '`', "task_2",
                    ))]),
                    sink_table_name: ObjectName(vec![
                        ObjectNamePart::Identifier(Ident::new("schema_1")),
                        ObjectNamePart::Identifier(Ident::new("table_1")),
                    ]),
                    or_replace: false,
                    if_not_exists: false,
                    expire_after: Some(2 * 86400 + 3600 + 2 * 60),
                    eval_interval: None,
                    comment: None,
                },
            ),
        ];

        for (sql, expected) in testcases {
            let create_task = parse_create_flow(sql);

            // The query is copied from the parse result, so this comparison only
            // exercises the remaining fields.
            let expected = CreateFlow {
                flow_name: expected.flow_name,
                sink_table_name: expected.sink_table_name,
                or_replace: expected.or_replace,
                if_not_exists: expected.if_not_exists,
                expire_after: expected.expire_after,
                eval_interval: expected.eval_interval,
                comment: expected.comment,
                query: create_task.query.clone(),
            };

            assert_eq!(create_task, expected, "input sql is:\n{sql}");
            // Round trip: the rendered statement must parse back to the same value.
            let show_create = create_task.to_string();
            let recreated = parse_create_flow(&show_create);
            assert_eq!(recreated, expected, "input sql is:\n{show_create}");
        }
    }
1797
1798 #[test]
1799 fn test_create_flow_no_month() {
1800 let sql = r"
1801CREATE FLOW `task_2`
1802SINK TO schema_1.table_1
1803EXPIRE AFTER '1 month 2 days 1h 2 min'
1804AS
1805SELECT max(c1), min(c2) FROM schema_2.table_2;";
1806 let stmts =
1807 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1808
1809 assert!(
1810 stmts.is_err()
1811 && stmts
1812 .unwrap_err()
1813 .to_string()
1814 .contains("Interval with months is not allowed")
1815 );
1816 }
1817
1818 #[test]
1819 fn test_validate_create() {
1820 let sql = r"
1821CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
1822PARTITION ON COLUMNS(c, a) (
1823 a < 10,
1824 a > 10 AND a < 20,
1825 a > 20 AND c < 100,
1826 a > 20 AND c >= 100
1827)
1828ENGINE=mito";
1829 let result =
1830 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1831 let _ = result.unwrap();
1832
1833 let sql = r"
1834CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
1835PARTITION ON COLUMNS(x) ()
1836ENGINE=mito";
1837 let result =
1838 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1839 assert!(
1840 result
1841 .unwrap_err()
1842 .to_string()
1843 .contains("Partition column \"x\" not defined")
1844 );
1845 }
1846
1847 #[test]
1848 fn test_parse_create_table_with_partitions() {
1849 let sql = r"
1850CREATE TABLE monitor (
1851 host_id INT,
1852 idc STRING,
1853 ts TIMESTAMP,
1854 cpu DOUBLE DEFAULT 0,
1855 memory DOUBLE,
1856 TIME INDEX (ts),
1857 PRIMARY KEY (host),
1858)
1859PARTITION ON COLUMNS(idc, host_id) (
1860 idc <= 'hz' AND host_id < 1000,
1861 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1862 idc > 'sh' AND host_id < 3000,
1863 idc > 'sh' AND host_id >= 3000,
1864)
1865ENGINE=mito";
1866 let result =
1867 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1868 .unwrap();
1869 assert_eq!(result.len(), 1);
1870 match &result[0] {
1871 Statement::CreateTable(c) => {
1872 assert!(c.partitions.is_some());
1873
1874 let partitions = c.partitions.as_ref().unwrap();
1875 let column_list = partitions
1876 .column_list
1877 .iter()
1878 .map(|x| &x.value)
1879 .collect::<Vec<&String>>();
1880 assert_eq!(column_list, vec!["idc", "host_id"]);
1881
1882 let exprs = &partitions.exprs;
1883
1884 assert_eq!(
1885 exprs[0],
1886 Expr::BinaryOp {
1887 left: Box::new(Expr::BinaryOp {
1888 left: Box::new(Expr::Identifier("idc".into())),
1889 op: BinaryOperator::LtEq,
1890 right: Box::new(Expr::Value(
1891 Value::SingleQuotedString("hz".to_string()).into()
1892 ))
1893 }),
1894 op: BinaryOperator::And,
1895 right: Box::new(Expr::BinaryOp {
1896 left: Box::new(Expr::Identifier("host_id".into())),
1897 op: BinaryOperator::Lt,
1898 right: Box::new(Expr::Value(
1899 Value::Number("1000".to_string(), false).into()
1900 ))
1901 })
1902 }
1903 );
1904 assert_eq!(
1905 exprs[1],
1906 Expr::BinaryOp {
1907 left: Box::new(Expr::BinaryOp {
1908 left: Box::new(Expr::BinaryOp {
1909 left: Box::new(Expr::Identifier("idc".into())),
1910 op: BinaryOperator::Gt,
1911 right: Box::new(Expr::Value(
1912 Value::SingleQuotedString("hz".to_string()).into()
1913 ))
1914 }),
1915 op: BinaryOperator::And,
1916 right: Box::new(Expr::BinaryOp {
1917 left: Box::new(Expr::Identifier("idc".into())),
1918 op: BinaryOperator::LtEq,
1919 right: Box::new(Expr::Value(
1920 Value::SingleQuotedString("sh".to_string()).into()
1921 ))
1922 })
1923 }),
1924 op: BinaryOperator::And,
1925 right: Box::new(Expr::BinaryOp {
1926 left: Box::new(Expr::Identifier("host_id".into())),
1927 op: BinaryOperator::Lt,
1928 right: Box::new(Expr::Value(
1929 Value::Number("2000".to_string(), false).into()
1930 ))
1931 })
1932 }
1933 );
1934 assert_eq!(
1935 exprs[2],
1936 Expr::BinaryOp {
1937 left: Box::new(Expr::BinaryOp {
1938 left: Box::new(Expr::Identifier("idc".into())),
1939 op: BinaryOperator::Gt,
1940 right: Box::new(Expr::Value(
1941 Value::SingleQuotedString("sh".to_string()).into()
1942 ))
1943 }),
1944 op: BinaryOperator::And,
1945 right: Box::new(Expr::BinaryOp {
1946 left: Box::new(Expr::Identifier("host_id".into())),
1947 op: BinaryOperator::Lt,
1948 right: Box::new(Expr::Value(
1949 Value::Number("3000".to_string(), false).into()
1950 ))
1951 })
1952 }
1953 );
1954 assert_eq!(
1955 exprs[3],
1956 Expr::BinaryOp {
1957 left: Box::new(Expr::BinaryOp {
1958 left: Box::new(Expr::Identifier("idc".into())),
1959 op: BinaryOperator::Gt,
1960 right: Box::new(Expr::Value(
1961 Value::SingleQuotedString("sh".to_string()).into()
1962 ))
1963 }),
1964 op: BinaryOperator::And,
1965 right: Box::new(Expr::BinaryOp {
1966 left: Box::new(Expr::Identifier("host_id".into())),
1967 op: BinaryOperator::GtEq,
1968 right: Box::new(Expr::Value(
1969 Value::Number("3000".to_string(), false).into()
1970 ))
1971 })
1972 }
1973 );
1974 }
1975 _ => unreachable!(),
1976 }
1977 }
1978
1979 #[test]
1980 fn test_parse_create_table_with_quoted_partitions() {
1981 let sql = r"
1982CREATE TABLE monitor (
1983 `host_id` INT,
1984 idc STRING,
1985 ts TIMESTAMP,
1986 cpu DOUBLE DEFAULT 0,
1987 memory DOUBLE,
1988 TIME INDEX (ts),
1989 PRIMARY KEY (host),
1990)
1991PARTITION ON COLUMNS(IdC, host_id) (
1992 idc <= 'hz' AND host_id < 1000,
1993 idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
1994 idc > 'sh' AND host_id < 3000,
1995 idc > 'sh' AND host_id >= 3000,
1996)";
1997 let result =
1998 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1999 .unwrap();
2000 assert_eq!(result.len(), 1);
2001 }
2002
2003 #[test]
2004 fn test_parse_create_table_with_timestamp_index() {
2005 let sql1 = r"
2006CREATE TABLE monitor (
2007 host_id INT,
2008 idc STRING,
2009 ts TIMESTAMP TIME INDEX,
2010 cpu DOUBLE DEFAULT 0,
2011 memory DOUBLE,
2012 PRIMARY KEY (host),
2013)
2014ENGINE=mito";
2015 let result1 = ParserContext::create_with_dialect(
2016 sql1,
2017 &GreptimeDbDialect {},
2018 ParseOptions::default(),
2019 )
2020 .unwrap();
2021
2022 if let Statement::CreateTable(c) = &result1[0] {
2023 assert_eq!(c.constraints.len(), 2);
2024 let tc = c.constraints[0].clone();
2025 match tc {
2026 TableConstraint::TimeIndex { column } => {
2027 assert_eq!(&column.value, "ts");
2028 }
2029 _ => panic!("should be time index constraint"),
2030 };
2031 } else {
2032 panic!("should be create_table statement");
2033 }
2034
2035 let sql2 = r"
2038CREATE TABLE monitor (
2039 host_id INT,
2040 idc STRING,
2041 ts TIMESTAMP NOT NULL,
2042 cpu DOUBLE DEFAULT 0,
2043 memory DOUBLE,
2044 TIME INDEX (ts),
2045 PRIMARY KEY (host),
2046)
2047ENGINE=mito";
2048 let result2 = ParserContext::create_with_dialect(
2049 sql2,
2050 &GreptimeDbDialect {},
2051 ParseOptions::default(),
2052 )
2053 .unwrap();
2054
2055 assert_eq!(result1, result2);
2056
2057 let sql3 = r"
2059CREATE TABLE monitor (
2060 host_id INT,
2061 idc STRING,
2062 ts TIMESTAMP,
2063 cpu DOUBLE DEFAULT 0,
2064 memory DOUBLE,
2065 TIME INDEX (ts),
2066 PRIMARY KEY (host),
2067)
2068ENGINE=mito";
2069
2070 let result3 = ParserContext::create_with_dialect(
2071 sql3,
2072 &GreptimeDbDialect {},
2073 ParseOptions::default(),
2074 )
2075 .unwrap();
2076
2077 assert_ne!(result1, result3);
2078
2079 let sql1 = r"
2081CREATE TABLE monitor (
2082 host_id INT,
2083 idc STRING,
2084 b bigint TIME INDEX,
2085 cpu DOUBLE DEFAULT 0,
2086 memory DOUBLE,
2087 PRIMARY KEY (host),
2088)
2089ENGINE=mito";
2090 let result1 = ParserContext::create_with_dialect(
2091 sql1,
2092 &GreptimeDbDialect {},
2093 ParseOptions::default(),
2094 );
2095
2096 assert!(
2097 result1
2098 .unwrap_err()
2099 .to_string()
2100 .contains("time index column data type should be timestamp")
2101 );
2102 }
2103
2104 #[test]
2105 fn test_parse_create_table_with_timestamp_index_not_null() {
2106 let sql = r"
2107CREATE TABLE monitor (
2108 host_id INT,
2109 idc STRING,
2110 ts TIMESTAMP TIME INDEX,
2111 cpu DOUBLE DEFAULT 0,
2112 memory DOUBLE,
2113 TIME INDEX (ts),
2114 PRIMARY KEY (host),
2115)
2116ENGINE=mito";
2117 let result =
2118 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2119 .unwrap();
2120
2121 assert_eq!(result.len(), 1);
2122 if let Statement::CreateTable(c) = &result[0] {
2123 let ts = c.columns[2].clone();
2124 assert_eq!(ts.name().to_string(), "ts");
2125 assert_eq!(ts.options()[0].option, NotNull);
2126 } else {
2127 panic!("should be create table statement");
2128 }
2129
2130 let sql1 = r"
2131CREATE TABLE monitor (
2132 host_id INT,
2133 idc STRING,
2134 ts TIMESTAMP NOT NULL TIME INDEX,
2135 cpu DOUBLE DEFAULT 0,
2136 memory DOUBLE,
2137 TIME INDEX (ts),
2138 PRIMARY KEY (host),
2139)
2140ENGINE=mito";
2141
2142 let result1 = ParserContext::create_with_dialect(
2143 sql1,
2144 &GreptimeDbDialect {},
2145 ParseOptions::default(),
2146 )
2147 .unwrap();
2148 assert_eq!(result, result1);
2149
2150 let sql2 = r"
2151CREATE TABLE monitor (
2152 host_id INT,
2153 idc STRING,
2154 ts TIMESTAMP TIME INDEX NOT NULL,
2155 cpu DOUBLE DEFAULT 0,
2156 memory DOUBLE,
2157 TIME INDEX (ts),
2158 PRIMARY KEY (host),
2159)
2160ENGINE=mito";
2161
2162 let result2 = ParserContext::create_with_dialect(
2163 sql2,
2164 &GreptimeDbDialect {},
2165 ParseOptions::default(),
2166 )
2167 .unwrap();
2168 assert_eq!(result, result2);
2169
2170 let sql3 = r"
2171CREATE TABLE monitor (
2172 host_id INT,
2173 idc STRING,
2174 ts TIMESTAMP TIME INDEX NULL NOT,
2175 cpu DOUBLE DEFAULT 0,
2176 memory DOUBLE,
2177 TIME INDEX (ts),
2178 PRIMARY KEY (host),
2179)
2180ENGINE=mito";
2181
2182 let result3 = ParserContext::create_with_dialect(
2183 sql3,
2184 &GreptimeDbDialect {},
2185 ParseOptions::default(),
2186 );
2187 assert!(result3.is_err());
2188
2189 let sql4 = r"
2190CREATE TABLE monitor (
2191 host_id INT,
2192 idc STRING,
2193 ts TIMESTAMP TIME INDEX NOT NULL NULL,
2194 cpu DOUBLE DEFAULT 0,
2195 memory DOUBLE,
2196 TIME INDEX (ts),
2197 PRIMARY KEY (host),
2198)
2199ENGINE=mito";
2200
2201 let result4 = ParserContext::create_with_dialect(
2202 sql4,
2203 &GreptimeDbDialect {},
2204 ParseOptions::default(),
2205 );
2206 assert!(result4.is_err());
2207
2208 let sql = r"
2209CREATE TABLE monitor (
2210 host_id INT,
2211 idc STRING,
2212 ts TIMESTAMP TIME INDEX DEFAULT CURRENT_TIMESTAMP,
2213 cpu DOUBLE DEFAULT 0,
2214 memory DOUBLE,
2215 TIME INDEX (ts),
2216 PRIMARY KEY (host),
2217)
2218ENGINE=mito";
2219
2220 let result =
2221 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2222 .unwrap();
2223
2224 if let Statement::CreateTable(c) = &result[0] {
2225 let tc = c.constraints[0].clone();
2226 match tc {
2227 TableConstraint::TimeIndex { column } => {
2228 assert_eq!(&column.value, "ts");
2229 }
2230 _ => panic!("should be time index constraint"),
2231 }
2232 let ts = c.columns[2].clone();
2233 assert_eq!(ts.name().to_string(), "ts");
2234 assert!(matches!(ts.options()[0].option, ColumnOption::Default(..)));
2235 assert_eq!(ts.options()[1].option, NotNull);
2236 } else {
2237 unreachable!("should be create table statement");
2238 }
2239 }
2240
2241 #[test]
2242 fn test_parse_partitions_with_error_syntax() {
2243 let sql = r"
2244CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2245PARTITION COLUMNS(c, a) (
2246 a < 10,
2247 a > 10 AND a < 20,
2248 a > 20 AND c < 100,
2249 a > 20 AND c >= 100
2250)
2251ENGINE=mito";
2252 let result =
2253 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2254 assert!(
2255 result
2256 .unwrap_err()
2257 .output_msg()
2258 .contains("sql parser error: Expected: ON, found: COLUMNS")
2259 );
2260 }
2261
2262 #[test]
2263 fn test_parse_partitions_without_rule() {
2264 let sql = r"
2265CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
2266PARTITION ON COLUMNS(c, a) ()
2267ENGINE=mito";
2268 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2269 .unwrap();
2270 }
2271
2272 #[test]
2273 fn test_parse_partitions_unreferenced_column() {
2274 let sql = r"
2275CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2276PARTITION ON COLUMNS(c, a) (
2277 b = 'foo'
2278)
2279ENGINE=mito";
2280 let result =
2281 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2282 assert_eq!(
2283 result.unwrap_err().output_msg(),
2284 "Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON"
2285 );
2286 }
2287
2288 #[test]
2289 fn test_parse_partitions_not_binary_expr() {
2290 let sql = r"
2291CREATE TABLE rcx ( ts TIMESTAMP TIME INDEX, a INT, b STRING, c INT )
2292PARTITION ON COLUMNS(c, a) (
2293 b
2294)
2295ENGINE=mito";
2296 let result =
2297 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
2298 assert_eq!(
2299 result.unwrap_err().output_msg(),
2300 r#"Invalid SQL, error: Partition rule expr Identifier(Ident { value: "b", quote_style: None, span: Span(Location(4,5)..Location(4,6)) }) is not a binary expr"#
2301 );
2302 }
2303
2304 fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {
2305 assert_eq!(column.name.to_string(), name);
2306 assert_eq!(column.data_type.to_string(), data_type);
2307 }
2308
2309 #[test]
2310 pub fn test_parse_create_table() {
2311 let sql = r"create table demo(
2312 host string,
2313 ts timestamp,
2314 cpu float32 default 0,
2315 memory float64,
2316 TIME INDEX (ts),
2317 PRIMARY KEY(ts, host),
2318 ) engine=mito
2319 with(ttl='10s');
2320 ";
2321 let result =
2322 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2323 .unwrap();
2324 assert_eq!(1, result.len());
2325 match &result[0] {
2326 Statement::CreateTable(c) => {
2327 assert!(!c.if_not_exists);
2328 assert_eq!("demo", c.name.to_string());
2329 assert_eq!("mito", c.engine);
2330 assert_eq!(4, c.columns.len());
2331 let columns = &c.columns;
2332 assert_column_def(&columns[0].column_def, "host", "STRING");
2333 assert_column_def(&columns[1].column_def, "ts", "TIMESTAMP");
2334 assert_column_def(&columns[2].column_def, "cpu", "FLOAT");
2335 assert_column_def(&columns[3].column_def, "memory", "DOUBLE");
2336
2337 let constraints = &c.constraints;
2338 assert_eq!(
2339 &constraints[0],
2340 &TableConstraint::TimeIndex {
2341 column: Ident::new("ts"),
2342 }
2343 );
2344 assert_eq!(
2345 &constraints[1],
2346 &TableConstraint::PrimaryKey {
2347 columns: vec![Ident::new("ts"), Ident::new("host")]
2348 }
2349 );
2350 assert_eq!(1, c.options.len());
2352 assert_eq!(
2353 [("ttl", "10s")].into_iter().collect::<HashMap<_, _>>(),
2354 c.options.to_str_map()
2355 );
2356 }
2357 _ => unreachable!(),
2358 }
2359 }
2360
/// A table-level `TIME INDEX` naming more than one column must fail with
/// `Error::InvalidTimeIndex`.
#[test]
fn test_invalid_index_keys() {
    let sql = r"create table demo(
 host string,
 ts int64,
 cpu float64 default 0,
 memory float64,
 TIME INDEX (ts, host),
 PRIMARY KEY(ts, host)) engine=mito;
 ";
    let outcome =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());

    assert!(outcome.is_err());
    assert!(matches!(
        outcome,
        Err(crate::error::Error::InvalidTimeIndex { .. })
    ));
}
2376
/// Declaring a time index more than once (column-level and/or table-level)
/// must fail with `Error::InvalidTimeIndex`.
#[test]
fn test_duplicated_time_index() {
    // Two column-level time indexes plus a table-level one.
    let column_and_table_level = r"create table demo(
 host string,
 ts timestamp time index,
 t timestamp time index,
 cpu float64 default 0,
 memory float64,
 TIME INDEX (ts, host),
 PRIMARY KEY(ts, host)) engine=mito;
 ";
    // One column-level time index and a table-level one on another column.
    let conflicting_columns = r"create table demo(
 host string,
 ts timestamp time index,
 cpu float64 default 0,
 t timestamp,
 memory float64,
 TIME INDEX (t),
 PRIMARY KEY(ts, host)) engine=mito;
 ";

    for sql in [column_and_table_level, conflicting_columns] {
        let outcome =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
        assert!(outcome.is_err());
        assert!(matches!(
            outcome,
            Err(crate::error::Error::InvalidTimeIndex { .. })
        ));
    }
}
2407
/// A bare keyword (`user`) is rejected as a column name, while the same
/// word in double quotes is accepted.
#[test]
fn test_invalid_column_name() {
    let parse = |sql: &str| {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
    };

    let message = parse("create table foo(user string, i timestamp time index)")
        .unwrap_err()
        .output_msg();
    assert!(message.contains("Cannot use keyword 'user' as column name"));

    let quoted = r#"
 create table foo("user" string, i timestamp time index)
 "#;
    let _ = parse(quoted).unwrap();
}
2424
/// Regression test for issue 3479: a floating point `DEFAULT` literal must
/// be rendered back unchanged when the column is displayed.
#[test]
fn test_incorrect_default_value_issue_3479() {
    let sql = r#"CREATE TABLE `ExcePTuRi`(
non TIMESTAMP(6) TIME INDEX,
`iUSTO` DOUBLE DEFAULT 0.047318541668048164
)"#;
    let statements =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap();
    assert_eq!(1, statements.len());

    let Statement::CreateTable(create) = &statements[0] else {
        unreachable!()
    };
    let rendered = create.columns[1].to_string();
    assert_eq!("`iUSTO` DOUBLE DEFAULT 0.047318541668048164", rendered);
}
2445
/// `CREATE VIEW` round-trips through `Display` and exposes the
/// `OR REPLACE` / `IF NOT EXISTS` flags.
#[test]
fn test_parse_create_view() {
    // (sql, expected or_replace, expected if_not_exists)
    let cases = [
        ("CREATE VIEW test AS SELECT * FROM NUMBERS", false, false),
        (
            "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS",
            true,
            true,
        ),
    ];

    for (sql, or_replace, if_not_exists) in cases {
        let statements =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap();

        let Statement::CreateView(view) = &statements[0] else {
            unreachable!()
        };
        assert_eq!(view.to_string(), sql);
        assert_eq!(view.or_replace, or_replace);
        assert_eq!(view.if_not_exists, if_not_exists);
        assert_eq!("test", view.name.to_string());
    }
}
2478
/// A view whose body is a `DELETE` statement.
///
/// Note that, despite the test name, the assertion expects parsing to
/// succeed and yield exactly one statement.
#[test]
fn test_parse_create_view_invalid_query() {
    let outcome = ParserContext::create_with_dialect(
        "CREATE VIEW test AS DELETE from demo",
        &GreptimeDbDialect {},
        ParseOptions::default(),
    );
    assert!(matches!(outcome, Ok(statements) if statements.len() == 1));
}
2486
/// Verifies that column-level `FULLTEXT INDEX` options are captured in the
/// column extensions, with and without a `WITH (...)` clause.
///
/// Each column under test is looked up explicitly, so the test fails if
/// the column is missing instead of silently skipping its assertions.
#[test]
fn test_parse_create_table_fulltext_options() {
    let parse = |sql: &str| {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap()
    };

    // No WITH clause: the option map exists but is empty.
    let sql1 = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg TEXT FULLTEXT INDEX,
)";
    let result1 = parse(sql1);
    let Statement::CreateTable(create) = &result1[0] else {
        panic!("should be create_table statement");
    };
    let msg = create
        .columns
        .iter()
        .find(|col| col.name().value == "msg")
        .expect("column `msg` should be present");
    assert!(
        msg.extensions
            .fulltext_index_options
            .as_ref()
            .unwrap()
            .is_empty()
    );

    // Explicit options on a STRING column.
    let sql2 = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg STRING FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false')
)";
    let result2 = parse(sql2);
    let Statement::CreateTable(create) = &result2[0] else {
        panic!("should be create_table statement");
    };
    let msg = create
        .columns
        .iter()
        .find(|col| col.name().value == "msg")
        .expect("column `msg` should be present");
    let options = msg.extensions.fulltext_index_options.as_ref().unwrap();
    assert_eq!(options.len(), 2);
    assert_eq!(options.get("analyzer").unwrap(), "English");
    assert_eq!(options.get("case_sensitive").unwrap(), "false");

    // Two columns declared as TINYTEXT and CHAR(20), each with its own options.
    let sql3 = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg1 TINYTEXT FULLTEXT INDEX WITH (analyzer='English', case_sensitive='false'),
 msg2 CHAR(20) FULLTEXT INDEX WITH (analyzer='Chinese', case_sensitive='true')
)";
    let result3 = parse(sql3);
    let Statement::CreateTable(create) = &result3[0] else {
        panic!("should be create_table statement");
    };
    let expectations = [("msg1", "English", "false"), ("msg2", "Chinese", "true")];
    for (column, analyzer, case_sensitive) in expectations {
        let col = create
            .columns
            .iter()
            .find(|col| col.name().value == column)
            .expect("fulltext column should be present");
        let options = col.extensions.fulltext_index_options.as_ref().unwrap();
        assert_eq!(options.len(), 2);
        assert_eq!(options.get("analyzer").unwrap(), analyzer);
        assert_eq!(options.get("case_sensitive").unwrap(), case_sensitive);
    }
}
2573
/// `FULLTEXT INDEX` on an `INT` column is rejected with a message stating
/// that only string types are supported.
#[test]
fn test_parse_create_table_fulltext_options_invalid_type() {
    let sql = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg INT FULLTEXT INDEX,
)";
    let outcome =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("FULLTEXT index only supports string type"));
}
2591
/// Repeating the `FULLTEXT INDEX` clause on one column is rejected as a
/// duplicated option.
#[test]
fn test_parse_create_table_fulltext_options_duplicate() {
    let sql = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg STRING FULLTEXT INDEX WITH (analyzer='English', analyzer='Chinese') FULLTEXT INDEX WITH (case_sensitive='false')
)";
    let outcome =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("duplicated FULLTEXT INDEX option"));
}
2609
/// An unknown key inside `FULLTEXT INDEX WITH (...)` is rejected.
#[test]
fn test_parse_create_table_fulltext_options_invalid_option() {
    let sql = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg STRING FULLTEXT INDEX WITH (analyzer='English', invalid_option='Chinese')
)";
    let outcome =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("invalid FULLTEXT INDEX option"));
}
2627
/// Verifies that `SKIPPING INDEX` options are recorded on the column:
/// non-empty when a `WITH (...)` clause is given, empty otherwise.
///
/// The `msg` column is looked up explicitly, so the test fails if the
/// column is missing instead of silently skipping its assertions.
#[test]
fn test_parse_create_table_skip_options() {
    let parse = |sql: &str| {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap()
    };

    // With explicit options.
    let sql = r"
CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg INT SKIPPING INDEX WITH (granularity='8192', type='bloom'),
)";
    let result = parse(sql);
    let Statement::CreateTable(create) = &result[0] else {
        panic!("should be create_table statement");
    };
    let msg = create
        .columns
        .iter()
        .find(|col| col.name().value == "msg")
        .expect("column `msg` should be present");
    assert!(
        !msg.extensions
            .skipping_index_options
            .as_ref()
            .unwrap()
            .is_empty()
    );

    // Without options: the map is present but empty.
    let sql = r"
 CREATE TABLE log (
 ts TIMESTAMP TIME INDEX,
 msg INT SKIPPING INDEX,
 )";
    let result = parse(sql);
    let Statement::CreateTable(create) = &result[0] else {
        panic!("should be create_table statement");
    };
    let msg = create
        .columns
        .iter()
        .find(|col| col.name().value == "msg")
        .expect("column `msg` should be present");
    assert!(
        msg.extensions
            .skipping_index_options
            .as_ref()
            .unwrap()
            .is_empty()
    );
}
2680
/// `CREATE VIEW` with an explicit column list: accepted forms must render
/// as expected, malformed column lists must be rejected.
#[test]
fn test_parse_create_view_with_columns() {
    let parse = |sql: &str| {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
    };

    // (input sql, expected `Display` output). An empty column list is
    // dropped when the statement is rendered.
    let accepted = [
        (
            "CREATE VIEW test () AS SELECT * FROM NUMBERS",
            "CREATE VIEW test AS SELECT * FROM NUMBERS",
        ),
        (
            "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS",
            "CREATE VIEW test (n1) AS SELECT * FROM NUMBERS",
        ),
        (
            "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS",
            "CREATE VIEW test (n1, n2) AS SELECT * FROM NUMBERS",
        ),
    ];
    for (sql, rendered) in accepted {
        let statements = parse(sql).unwrap();

        let Statement::CreateView(view) = &statements[0] else {
            unreachable!()
        };
        assert_eq!(view.to_string(), rendered);
        assert!(!view.or_replace);
        assert!(!view.if_not_exists);
        assert_eq!("test", view.name.to_string());

        assert_eq!(rendered, statements[0].to_string());
    }

    // Malformed column lists: unbalanced parentheses, a trailing comma,
    // a numeric literal and a keyword used as a column name.
    let rejected = [
        "CREATE VIEW test (n1 AS select * from demo",
        "CREATE VIEW test (n1, AS select * from demo",
        "CREATE VIEW test n1,n2) AS select * from demo",
        "CREATE VIEW test (1) AS select * from demo",
        "CREATE VIEW test (n1, select) AS select * from demo",
    ];
    for sql in rejected {
        assert!(parse(sql).is_err());
    }
}
2761
/// For a `VECTOR(128)` custom type and an empty token stream,
/// `parse_column_extensions` records the dimension under `VECTOR_OPT_DIM`.
#[test]
fn test_parse_column_extensions_vector() {
    let dialect = GenericDialect {};
    let tokens = Tokenizer::new(&dialect, "").tokenize().unwrap();
    let mut parser = Parser::new(&dialect).with_tokens(tokens);

    let column_name = Ident::new("vec_col");
    let vector_type =
        DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
    let mut extensions = ColumnExtensions::default();

    let outcome = ParserContext::parse_column_extensions(
        &mut parser,
        &column_name,
        &vector_type,
        &mut extensions,
    );
    assert!(outcome.is_ok());

    assert!(extensions.vector_options.is_some());
    let recorded = extensions.vector_options.unwrap();
    assert_eq!(recorded.get(VECTOR_OPT_DIM), Some("128"));
}
2782
/// A `VECTOR` custom type without a dimension argument makes
/// `parse_column_extensions` return an error.
#[test]
fn test_parse_column_extensions_vector_invalid() {
    let dialect = GenericDialect {};
    let tokens = Tokenizer::new(&dialect, "").tokenize().unwrap();
    let mut parser = Parser::new(&dialect).with_tokens(tokens);

    let column_name = Ident::new("vec_col");
    let vector_type = DataType::Custom(vec![Ident::new("VECTOR")].into(), vec![]);
    let mut extensions = ColumnExtensions::default();

    let outcome = ParserContext::parse_column_extensions(
        &mut parser,
        &column_name,
        &vector_type,
        &mut extensions,
    );
    assert!(outcome.is_err());
}
2799
/// Exercises `parse_column_extensions` for the SKIPPING, FULLTEXT and
/// INVERTED index clauses, including rejected combinations.
#[test]
fn test_parse_column_extensions_indices() {
    // Tokenizes `sql`, runs `parse_column_extensions` against fresh
    // extensions and returns both the outcome and the extensions.
    let run = |sql: &str, column: &str, data_type: DataType| {
        let dialect = GenericDialect {};
        let tokens = Tokenizer::new(&dialect, sql).tokenize().unwrap();
        let mut parser = Parser::new(&dialect).with_tokens(tokens);
        let name = Ident::new(column);
        let mut extensions = ColumnExtensions::default();
        let outcome = ParserContext::parse_column_extensions(
            &mut parser,
            &name,
            &data_type,
            &mut extensions,
        );
        (outcome, extensions)
    };

    // Bare skipping index.
    let (outcome, extensions) = run("SKIPPING INDEX", "col", DataType::String(None));
    assert!(outcome.is_ok());
    assert!(extensions.skipping_index_options.is_some());

    // Fulltext index with options on a string column.
    let (outcome, extensions) = run(
        "FULLTEXT INDEX WITH (analyzer = 'English', case_sensitive = 'true')",
        "text_col",
        DataType::String(None),
    );
    assert!(outcome.unwrap());
    assert!(extensions.fulltext_index_options.is_some());
    let fulltext_options = extensions.fulltext_index_options.unwrap();
    assert_eq!(fulltext_options.get("analyzer"), Some("English"));
    assert_eq!(fulltext_options.get("case_sensitive"), Some("true"));

    // Fulltext index on a non-string column is rejected.
    let (outcome, _) = run(
        "FULLTEXT INDEX WITH (analyzer = 'English')",
        "num_col",
        DataType::Int(None),
    );
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("FULLTEXT index only supports string type"));

    // An unrecognized analyzer value is still accepted by this call.
    let (outcome, _) = run(
        "FULLTEXT INDEX WITH (analyzer = 'Invalid', case_sensitive = 'true')",
        "text_col",
        DataType::String(None),
    );
    assert!(outcome.unwrap());

    // Bare inverted index.
    let (outcome, extensions) = run("INVERTED INDEX", "col", DataType::String(None));
    assert!(outcome.is_ok());
    assert!(extensions.inverted_index_options.is_some());

    // Inverted index does not take options.
    let (outcome, _) = run(
        "INVERTED INDEX WITH (analyzer = 'English')",
        "col",
        DataType::String(None),
    );
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("INVERTED index doesn't support options"));

    // Several index clauses on the same column.
    let (outcome, extensions) = run(
        "SKIPPING INDEX FULLTEXT INDEX",
        "col",
        DataType::String(None),
    );
    assert!(outcome.unwrap());
    assert!(extensions.skipping_index_options.is_some());
    assert!(extensions.fulltext_index_options.is_some());
}
2955
/// A `'10s'::INTERVAL` cast is rendered with the interval literal expanded
/// to `'10 seconds'`.
#[test]
fn test_parse_interval_cast() {
    let statements = ParserContext::create_with_dialect(
        "select '10s'::INTERVAL",
        &GreptimeDbDialect {},
        ParseOptions::default(),
    )
    .unwrap();

    let rendered = statements[0].to_string();
    assert_eq!("SELECT '10 seconds'::INTERVAL", rendered);
}
2964
/// Verifies that `VECTOR INDEX` options are recorded on the column: empty
/// when no `WITH (...)` clause is given, and all four key/value pairs
/// otherwise.
///
/// The `vec` column is looked up explicitly, so the test fails if the
/// column is missing instead of silently skipping its assertions.
#[test]
fn test_parse_create_table_vector_index_options() {
    let parse = |sql: &str| {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap()
    };

    // Bare vector index: the option map exists but is empty.
    let sql = r"
CREATE TABLE vectors (
 ts TIMESTAMP TIME INDEX,
 vec VECTOR(128) VECTOR INDEX,
)";
    let result = parse(sql);
    let Statement::CreateTable(create) = &result[0] else {
        panic!("should be create_table statement");
    };
    let vec_col = create
        .columns
        .iter()
        .find(|col| col.name().value == "vec")
        .expect("column `vec` should be present");
    assert!(
        vec_col
            .extensions
            .vector_index_options
            .as_ref()
            .unwrap()
            .is_empty()
    );

    // Vector index with explicit options.
    let sql = r"
CREATE TABLE vectors (
 ts TIMESTAMP TIME INDEX,
 vec VECTOR(128) VECTOR INDEX WITH (metric='cosine', connectivity='32', expansion_add='256', expansion_search='128')
)";
    let result = parse(sql);
    let Statement::CreateTable(create) = &result[0] else {
        panic!("should be create_table statement");
    };
    let vec_col = create
        .columns
        .iter()
        .find(|col| col.name().value == "vec")
        .expect("column `vec` should be present");
    let options = vec_col.extensions.vector_index_options.as_ref().unwrap();
    assert_eq!(options.len(), 4);
    assert_eq!(options.get("metric").unwrap(), "cosine");
    assert_eq!(options.get("connectivity").unwrap(), "32");
    assert_eq!(options.get("expansion_add").unwrap(), "256");
    assert_eq!(options.get("expansion_search").unwrap(), "128");
}
3018
/// `VECTOR INDEX` on an `INT` column is rejected with a message stating
/// that only Vector type columns are supported.
#[test]
fn test_parse_create_table_vector_index_invalid_type() {
    let sql = r"
CREATE TABLE vectors (
 ts TIMESTAMP TIME INDEX,
 col INT VECTOR INDEX,
)";
    let outcome =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("VECTOR INDEX only supports Vector type columns"));
}
3037
/// Repeating the `VECTOR INDEX` clause on one column is rejected as a
/// duplicated option.
#[test]
fn test_parse_create_table_vector_index_duplicate() {
    let sql = r"
CREATE TABLE vectors (
 ts TIMESTAMP TIME INDEX,
 vec VECTOR(128) VECTOR INDEX VECTOR INDEX,
)";
    let outcome =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("duplicated VECTOR INDEX option"));
}
3056
/// An unknown key inside `VECTOR INDEX WITH (...)` is rejected.
#[test]
fn test_parse_create_table_vector_index_invalid_option() {
    let sql = r"
CREATE TABLE vectors (
 ts TIMESTAMP TIME INDEX,
 vec VECTOR(128) VECTOR INDEX WITH (metric='l2sq', invalid_option='foo')
)";
    let outcome =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("invalid VECTOR INDEX option"));
}
3075
/// Exercises `parse_column_extensions` for the `VECTOR INDEX` clause on a
/// vector column (accepted) and on an integer column (rejected).
#[test]
fn test_parse_column_extensions_vector_index() {
    let dialect = GenericDialect {};

    // Vector column whose dimension option is already populated.
    {
        let tokens = Tokenizer::new(&dialect, "VECTOR INDEX WITH (metric = 'l2sq')")
            .tokenize()
            .unwrap();
        let mut parser = Parser::new(&dialect).with_tokens(tokens);

        let column_name = Ident::new("vec_col");
        let vector_type =
            DataType::Custom(vec![Ident::new("VECTOR")].into(), vec!["128".to_string()]);
        let dimension = OptionMap::from([(VECTOR_OPT_DIM.to_string(), "128".to_string())]);
        let mut extensions = ColumnExtensions {
            vector_options: Some(dimension),
            ..Default::default()
        };

        let outcome = ParserContext::parse_column_extensions(
            &mut parser,
            &column_name,
            &vector_type,
            &mut extensions,
        );
        assert!(outcome.is_ok());

        assert!(extensions.vector_index_options.is_some());
        let index_options = extensions.vector_index_options.unwrap();
        assert_eq!(index_options.get("metric"), Some("l2sq"));
    }

    // Non-vector column: the clause must be rejected.
    {
        let tokens = Tokenizer::new(&dialect, "VECTOR INDEX").tokenize().unwrap();
        let mut parser = Parser::new(&dialect).with_tokens(tokens);

        let column_name = Ident::new("num_col");
        let int_type = DataType::Int(None);
        let mut extensions = ColumnExtensions::default();

        let outcome = ParserContext::parse_column_extensions(
            &mut parser,
            &column_name,
            &int_type,
            &mut extensions,
        );
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("VECTOR INDEX only supports Vector type columns"));
    }
}
3134}