1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17
18use common_catalog::consts::FILE_ENGINE;
19use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
20use itertools::Itertools;
21use serde::Serialize;
22use snafu::ResultExt;
23use sqlparser::ast::{ColumnOptionDef, DataType, Expr, Query};
24use sqlparser_derive::{Visit, VisitMut};
25
26use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue};
27use crate::error::{
28 InvalidFlowQuerySnafu, Result, SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu,
29};
30use crate::statements::OptionMap;
31use crate::statements::statement::Statement;
32use crate::statements::tql::Tql;
33
/// Separator used between entries that are rendered one per line.
const LINE_SEP: &str = ",\n";
/// Separator used between entries that are rendered on a single line.
const COMMA_SEP: &str = ", ";
/// Number of spaces `format_indent!` puts in front of an entry.
const INDENT: usize = 2;
/// Key looked up in a column's vector options when rendering `VECTOR(<dim>)`.
pub const VECTOR_OPT_DIM: &str = "dim";
38
/// Renders `$arg` preceded by `INDENT` spaces.
///
/// The two-argument form accepts a format string with two placeholders: the
/// first receives the indentation, the second receives `$arg`. The
/// one-argument form is shorthand for the format string `"{}{}"`.
macro_rules! format_indent {
    ($fmt: expr, $arg: expr) => {
        format!($fmt, " ".repeat(INDENT), $arg)
    };
    ($arg: expr) => {
        format_indent!("{}{}", $arg)
    };
}
47
/// Renders every entry of `$list` as its own indented line and joins the
/// lines with `LINE_SEP`.
macro_rules! format_list_indent {
    ($list: expr) => {
        $list
            .iter()
            .map(|entry| format_indent!("{}{}", entry))
            .join(LINE_SEP)
    };
}
53
/// Renders every entry of `$list` with its `Display` form and joins the
/// results with `COMMA_SEP` on a single line.
macro_rules! format_list_comma {
    ($list: expr) => {
        $list
            .iter()
            .map(|entry| entry.to_string())
            .join(COMMA_SEP)
    };
}
59
60#[cfg(feature = "enterprise")]
61pub mod trigger;
62
63fn format_table_constraint(constraints: &[TableConstraint]) -> String {
64 constraints.iter().map(|c| format_indent!(c)).join(LINE_SEP)
65}
66
/// Table-level constraint that appears in the element list of a `CREATE TABLE`
/// statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub enum TableConstraint {
    /// Primary key over the listed columns; displayed as `PRIMARY KEY (a, b)`.
    PrimaryKey { columns: Vec<Ident> },
    /// Time index on a single column; displayed as `TIME INDEX (ts)`.
    TimeIndex { column: Ident },
}
75
76impl Display for TableConstraint {
77 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
78 match self {
79 TableConstraint::PrimaryKey { columns } => {
80 write!(f, "PRIMARY KEY ({})", format_list_comma!(columns))
81 }
82 TableConstraint::TimeIndex { column } => {
83 write!(f, "TIME INDEX ({})", column)
84 }
85 }
86 }
87}
88
/// AST node for a `CREATE TABLE` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateTable {
    /// Whether `IF NOT EXISTS` is part of the statement.
    pub if_not_exists: bool,
    /// Numeric table id.
    /// NOTE(review): not rendered by the `Display` impl; confirm how callers populate it.
    pub table_id: u32,
    /// Name of the table to create.
    pub name: ObjectName,
    /// Column definitions, in declaration order.
    pub columns: Vec<Column>,
    /// Engine name, rendered as `ENGINE=<engine>`; when it equals `FILE_ENGINE`
    /// the statement is displayed as `CREATE EXTERNAL TABLE`.
    pub engine: String,
    /// Table-level constraints (primary key, time index).
    pub constraints: Vec<TableConstraint>,
    /// Table options, rendered inside `WITH(...)` when non-empty.
    pub options: OptionMap,
    /// Optional partition clause.
    pub partitions: Option<Partitions>,
}
103
/// A column of a `CREATE TABLE` statement: the plain SQL column definition plus
/// the extra per-column options kept in [`ColumnExtensions`].
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct Column {
    /// The column definition (name, data type, column options).
    pub column_def: ColumnDef,
    /// Additional per-column options (vector and index options).
    pub extensions: ColumnExtensions,
}
112
/// Per-column options that are not part of the plain column definition.
///
/// For the three index fields, `Some` with an empty map is displayed as the
/// bare `<KIND> INDEX` clause and `Some` with entries as `<KIND> INDEX WITH(...)`.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Default, Serialize)]
pub struct ColumnExtensions {
    /// Vector type options; the `VECTOR_OPT_DIM` entry is displayed as `VECTOR(<dim>)`.
    pub vector_options: Option<OptionMap>,

    /// Options of the `FULLTEXT INDEX` clause.
    pub fulltext_index_options: Option<OptionMap>,
    /// Options of the `SKIPPING INDEX` clause.
    pub skipping_index_options: Option<OptionMap>,
    /// Options of the `INVERTED INDEX` clause.
    pub inverted_index_options: Option<OptionMap>,
}
128
129impl Column {
130 pub fn name(&self) -> &Ident {
131 &self.column_def.name
132 }
133
134 pub fn data_type(&self) -> &DataType {
135 &self.column_def.data_type
136 }
137
138 pub fn mut_data_type(&mut self) -> &mut DataType {
139 &mut self.column_def.data_type
140 }
141
142 pub fn options(&self) -> &[ColumnOptionDef] {
143 &self.column_def.options
144 }
145
146 pub fn mut_options(&mut self) -> &mut Vec<ColumnOptionDef> {
147 &mut self.column_def.options
148 }
149}
150
151impl Display for Column {
152 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153 if let Some(vector_options) = &self.extensions.vector_options
154 && let Some(dim) = vector_options.get(VECTOR_OPT_DIM)
155 {
156 write!(f, "{} VECTOR({})", self.column_def.name, dim)?;
157 return Ok(());
158 }
159
160 write!(f, "{}", self.column_def)?;
161
162 if let Some(fulltext_options) = &self.extensions.fulltext_index_options {
163 if !fulltext_options.is_empty() {
164 let options = fulltext_options.kv_pairs();
165 write!(f, " FULLTEXT INDEX WITH({})", format_list_comma!(options))?;
166 } else {
167 write!(f, " FULLTEXT INDEX")?;
168 }
169 }
170
171 if let Some(skipping_index_options) = &self.extensions.skipping_index_options {
172 if !skipping_index_options.is_empty() {
173 let options = skipping_index_options.kv_pairs();
174 write!(f, " SKIPPING INDEX WITH({})", format_list_comma!(options))?;
175 } else {
176 write!(f, " SKIPPING INDEX")?;
177 }
178 }
179
180 if let Some(inverted_index_options) = &self.extensions.inverted_index_options {
181 if !inverted_index_options.is_empty() {
182 let options = inverted_index_options.kv_pairs();
183 write!(f, " INVERTED INDEX WITH({})", format_list_comma!(options))?;
184 } else {
185 write!(f, " INVERTED INDEX")?;
186 }
187 }
188 Ok(())
189 }
190}
191
192impl ColumnExtensions {
193 pub fn build_fulltext_options(&self) -> Result<Option<FulltextOptions>> {
194 let Some(options) = self.fulltext_index_options.as_ref() else {
195 return Ok(None);
196 };
197
198 let options: HashMap<String, String> = options.clone().into_map();
199 Ok(Some(options.try_into().context(SetFulltextOptionSnafu)?))
200 }
201
202 pub fn build_skipping_index_options(&self) -> Result<Option<SkippingIndexOptions>> {
203 let Some(options) = self.skipping_index_options.as_ref() else {
204 return Ok(None);
205 };
206
207 let options: HashMap<String, String> = options.clone().into_map();
208 Ok(Some(
209 options.try_into().context(SetSkippingIndexOptionSnafu)?,
210 ))
211 }
212}
213
/// Partition clause of a `CREATE TABLE` statement, displayed as
/// `PARTITION ON COLUMNS (<column_list>) (<exprs>)`.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct Partitions {
    /// Columns the table is partitioned on.
    pub column_list: Vec<Ident>,
    /// Partition expressions, rendered one per line.
    pub exprs: Vec<Expr>,
}
225
226impl Partitions {
227 pub fn set_quote(&mut self, quote_style: char) {
229 self.column_list
230 .iter_mut()
231 .for_each(|c| c.quote_style = Some(quote_style));
232 }
233}
234
/// A named partition bound, displayed as
/// `PARTITION <name> VALUES LESS THAN (<value_list>)`.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)]
pub struct PartitionEntry {
    /// Partition name.
    pub name: Ident,
    /// Values rendered, comma-separated, inside `VALUES LESS THAN (...)`.
    pub value_list: Vec<SqlValue>,
}
240
241impl Display for PartitionEntry {
242 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
243 write!(
244 f,
245 "PARTITION {} VALUES LESS THAN ({})",
246 self.name,
247 format_list_comma!(self.value_list),
248 )
249 }
250}
251
252impl Display for Partitions {
253 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
254 if !self.column_list.is_empty() {
255 write!(
256 f,
257 "PARTITION ON COLUMNS ({}) (\n{}\n)",
258 format_list_comma!(self.column_list),
259 format_list_indent!(self.exprs),
260 )?;
261 }
262 Ok(())
263 }
264}
265
266impl Display for CreateTable {
267 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
268 write!(f, "CREATE ")?;
269 if self.engine == FILE_ENGINE {
270 write!(f, "EXTERNAL ")?;
271 }
272 write!(f, "TABLE ")?;
273 if self.if_not_exists {
274 write!(f, "IF NOT EXISTS ")?;
275 }
276 writeln!(f, "{} (", &self.name)?;
277 writeln!(f, "{},", format_list_indent!(self.columns))?;
278 writeln!(f, "{}", format_table_constraint(&self.constraints))?;
279 writeln!(f, ")")?;
280 if let Some(partitions) = &self.partitions {
281 writeln!(f, "{partitions}")?;
282 }
283 writeln!(f, "ENGINE={}", &self.engine)?;
284 if !self.options.is_empty() {
285 let options = self.options.kv_pairs();
286 write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
287 }
288 Ok(())
289 }
290}
291
/// AST node for a `CREATE DATABASE` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateDatabase {
    /// Name of the database to create.
    pub name: ObjectName,
    /// Whether `IF NOT EXISTS` is part of the statement.
    pub if_not_exists: bool,
    /// Database options, rendered inside `WITH(...)` when non-empty.
    pub options: OptionMap,
}
299
300impl CreateDatabase {
301 pub fn new(name: ObjectName, if_not_exists: bool, options: OptionMap) -> Self {
303 Self {
304 name,
305 if_not_exists,
306 options,
307 }
308 }
309}
310
311impl Display for CreateDatabase {
312 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
313 write!(f, "CREATE DATABASE ")?;
314 if self.if_not_exists {
315 write!(f, "IF NOT EXISTS ")?;
316 }
317 write!(f, "{}", &self.name)?;
318 if !self.options.is_empty() {
319 let options = self.options.kv_pairs();
320 write!(f, "\nWITH(\n{}\n)", format_list_indent!(options))?;
321 }
322 Ok(())
323 }
324}
325
/// AST node for a `CREATE EXTERNAL TABLE` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateExternalTable {
    /// Name of the table to create.
    pub name: ObjectName,
    /// Column definitions, in declaration order.
    pub columns: Vec<Column>,
    /// Table-level constraints (primary key, time index).
    pub constraints: Vec<TableConstraint>,
    /// Table options, rendered inside `WITH(...)` when non-empty.
    pub options: OptionMap,
    /// Whether `IF NOT EXISTS` is part of the statement.
    pub if_not_exists: bool,
    /// Engine name, rendered as `ENGINE=<engine>`.
    pub engine: String,
}
337
338impl Display for CreateExternalTable {
339 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
340 write!(f, "CREATE EXTERNAL TABLE ")?;
341 if self.if_not_exists {
342 write!(f, "IF NOT EXISTS ")?;
343 }
344 writeln!(f, "{} (", &self.name)?;
345 writeln!(f, "{},", format_list_indent!(self.columns))?;
346 writeln!(f, "{}", format_table_constraint(&self.constraints))?;
347 writeln!(f, ")")?;
348 writeln!(f, "ENGINE={}", &self.engine)?;
349 if !self.options.is_empty() {
350 let options = self.options.kv_pairs();
351 write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
352 }
353 Ok(())
354 }
355}
356
/// AST node for `CREATE TABLE <table_name> LIKE <source_name>`.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateTableLike {
    /// Name of the table to create.
    pub table_name: ObjectName,
    /// Name of the table that follows `LIKE`.
    pub source_name: ObjectName,
}
364
365impl Display for CreateTableLike {
366 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
367 let table_name = &self.table_name;
368 let source_name = &self.source_name;
369 write!(f, r#"CREATE TABLE {table_name} LIKE {source_name}"#)
370 }
371}
372
/// AST node for a `CREATE FLOW` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateFlow {
    /// Name of the flow to create.
    pub flow_name: ObjectName,
    /// Table named in the `SINK TO` clause.
    pub sink_table_name: ObjectName,
    /// Whether `OR REPLACE` is part of the statement.
    pub or_replace: bool,
    /// Whether `IF NOT EXISTS` is part of the statement.
    pub if_not_exists: bool,
    /// Value of the `EXPIRE AFTER` clause; displayed as `'<n> s'`.
    pub expire_after: Option<i64>,
    /// Value of the `EVAL INTERVAL` clause; displayed as `'<n> s'`.
    pub eval_interval: Option<i64>,
    /// Text of the `COMMENT` clause.
    pub comment: Option<String>,
    /// The query that follows `AS`.
    pub query: Box<SqlOrTql>,
}
395
/// The query of a flow: either a SQL query or a TQL statement.
///
/// In both variants the `String` is the query text that `Display` prints
/// verbatim; `try_from_statement` fills it with the original query string.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub enum SqlOrTql {
    /// A parsed SQL query and its query text.
    Sql(Query, String),
    /// A parsed TQL statement and its query text.
    Tql(Tql, String),
}
402
403impl std::fmt::Display for SqlOrTql {
404 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
405 match self {
406 Self::Sql(_, s) => write!(f, "{}", s),
407 Self::Tql(_, s) => write!(f, "{}", s),
408 }
409 }
410}
411
412impl SqlOrTql {
413 pub fn try_from_statement(
414 value: Statement,
415 original_query: &str,
416 ) -> std::result::Result<Self, crate::error::Error> {
417 match value {
418 Statement::Query(query) => {
419 Ok(Self::Sql((*query).try_into()?, original_query.to_string()))
420 }
421 Statement::Tql(tql) => Ok(Self::Tql(tql, original_query.to_string())),
422 _ => InvalidFlowQuerySnafu {
423 reason: format!("Expect either sql query or promql query, found {:?}", value),
424 }
425 .fail(),
426 }
427 }
428}
429
430impl Display for CreateFlow {
431 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
432 write!(f, "CREATE ")?;
433 if self.or_replace {
434 write!(f, "OR REPLACE ")?;
435 }
436 write!(f, "FLOW ")?;
437 if self.if_not_exists {
438 write!(f, "IF NOT EXISTS ")?;
439 }
440 writeln!(f, "{}", &self.flow_name)?;
441 writeln!(f, "SINK TO {}", &self.sink_table_name)?;
442 if let Some(expire_after) = &self.expire_after {
443 writeln!(f, "EXPIRE AFTER '{} s'", expire_after)?;
444 }
445 if let Some(eval_interval) = &self.eval_interval {
446 writeln!(f, "EVAL INTERVAL '{} s'", eval_interval)?;
447 }
448 if let Some(comment) = &self.comment {
449 writeln!(f, "COMMENT '{}'", comment)?;
450 }
451 write!(f, "AS {}", &self.query)
452 }
453}
454
/// AST node for a `CREATE VIEW` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateView {
    /// Name of the view to create.
    pub name: ObjectName,
    /// Optional column list, rendered in parentheses after the view name when non-empty.
    pub columns: Vec<Ident>,
    /// The statement that follows `AS`.
    pub query: Box<Statement>,
    /// Whether `OR REPLACE` is part of the statement.
    pub or_replace: bool,
    /// Whether `IF NOT EXISTS` is part of the statement.
    pub if_not_exists: bool,
}
470
471impl Display for CreateView {
472 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
473 write!(f, "CREATE ")?;
474 if self.or_replace {
475 write!(f, "OR REPLACE ")?;
476 }
477 write!(f, "VIEW ")?;
478 if self.if_not_exists {
479 write!(f, "IF NOT EXISTS ")?;
480 }
481 write!(f, "{} ", &self.name)?;
482 if !self.columns.is_empty() {
483 write!(f, "({}) ", format_list_comma!(self.columns))?;
484 }
485 write!(f, "AS {}", &self.query)
486 }
487}
488
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use crate::dialect::GreptimeDbDialect;
    use crate::error::Error;
    use crate::parser::{ParseOptions, ParserContext};
    use crate::statements::statement::Statement;

    /// Parses `sql` with the GreptimeDB dialect and default parse options,
    /// returning the parser's outcome untouched.
    fn try_parse(sql: &str) -> Result<Vec<Statement>, Error> {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
    }

    /// Parses `sql` and panics when parsing fails.
    fn parse(sql: &str) -> Vec<Statement> {
        try_parse(sql).unwrap()
    }

    /// A full `CREATE TABLE` (partitions, engine, options) is displayed in the
    /// canonical layout and the displayed SQL parses back to the same AST.
    #[test]
    fn test_display_create_table() {
        let sql = r"create table if not exists demo(
                             host string,
                             ts timestamp,
                             cpu double default 0,
                             memory double,
                             TIME INDEX (ts),
                             PRIMARY KEY(host)
                       )
                       PARTITION ON COLUMNS (host) (
                            host = 'a',
                            host > 'a',
                       )
                       engine=mito
                       with(ttl='7d', storage='File');
         ";
        let result = parse(sql);
        assert_eq!(1, result.len());

        let Statement::CreateTable(c) = &result[0] else {
            unreachable!()
        };
        let new_sql = format!("\n{}", c);
        assert_eq!(
            r#"
CREATE TABLE IF NOT EXISTS demo (
  host STRING,
  ts TIMESTAMP,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (host)
)
PARTITION ON COLUMNS (host) (
  host = 'a',
  host > 'a'
)
ENGINE=mito
WITH(
  storage = 'File',
  ttl = '7d'
)"#,
            &new_sql
        );

        let new_result = parse(&new_sql);
        assert_eq!(result, new_result);
    }

    /// Without a partition clause and options, the display ends right after
    /// the `ENGINE=` line, and the displayed SQL still round-trips.
    #[test]
    fn test_display_empty_partition_column() {
        let sql = r"create table if not exists demo(
            host string,
            ts timestamp,
            cpu double default 0,
            memory double,
            TIME INDEX (ts),
            PRIMARY KEY(ts, host)
            );
        ";
        let result = parse(sql);
        assert_eq!(1, result.len());

        let Statement::CreateTable(c) = &result[0] else {
            unreachable!()
        };
        let new_sql = format!("\n{}", c);
        assert_eq!(
            r#"
CREATE TABLE IF NOT EXISTS demo (
  host STRING,
  ts TIMESTAMP,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (ts, host)
)
ENGINE=mito
"#,
            &new_sql
        );

        let new_result = parse(&new_sql);
        assert_eq!(result, new_result);
    }

    /// Known table options are accepted; an unknown option is rejected with
    /// `InvalidTableOption`.
    #[test]
    fn test_validate_table_options() {
        let sql = r"create table if not exists demo(
            host string,
            ts timestamp,
            cpu double default 0,
            memory double,
            TIME INDEX (ts),
            PRIMARY KEY(host)
      )
      PARTITION ON COLUMNS (host) ()
      engine=mito
      with(ttl='7d', 'compaction.type'='world');
";
        let result = parse(sql);
        let Statement::CreateTable(c) = &result[0] else {
            unreachable!()
        };
        assert_eq!(2, c.options.len());

        let sql = r"create table if not exists demo(
            host string,
            ts timestamp,
            cpu double default 0,
            memory double,
            TIME INDEX (ts),
            PRIMARY KEY(host)
      )
      PARTITION ON COLUMNS (host) ()
      engine=mito
      with(ttl='7d', hello='world');
";
        let result = try_parse(sql);
        assert_matches!(result, Err(Error::InvalidTableOption { .. }))
    }

    /// `CREATE DATABASE` is displayed with its optional `IF NOT EXISTS` and
    /// `WITH(...)` parts.
    #[test]
    fn test_display_create_database() {
        let cases = [
            (r"create database test;", "\nCREATE DATABASE test"),
            (
                r"create database if not exists test;",
                "\nCREATE DATABASE IF NOT EXISTS test",
            ),
            (
                r#"CREATE DATABASE IF NOT EXISTS test WITH (ttl='1h');"#,
                "\nCREATE DATABASE IF NOT EXISTS test\nWITH(\n  ttl = '1h'\n)",
            ),
        ];

        for (sql, expected) in cases {
            let stmts = parse(sql);
            assert_eq!(1, stmts.len());
            assert_matches!(&stmts[0], Statement::CreateDatabase { .. });

            let Statement::CreateDatabase(set) = &stmts[0] else {
                unreachable!();
            };
            let new_sql = format!("\n{}", set);
            assert_eq!(expected, &new_sql);
        }
    }

    /// `CREATE TABLE ... LIKE ...` is displayed on a single line.
    #[test]
    fn test_display_create_table_like() {
        let sql = r"create table t2 like t1;";
        let stmts = parse(sql);
        assert_eq!(1, stmts.len());
        assert_matches!(&stmts[0], Statement::CreateTableLike { .. });

        let Statement::CreateTableLike(create) = &stmts[0] else {
            unreachable!();
        };
        let new_sql = format!("\n{}", create);
        assert_eq!(
            r#"
CREATE TABLE t2 LIKE t1"#,
            &new_sql
        );
    }

    /// `CREATE EXTERNAL TABLE` is displayed with the file engine and its
    /// options.
    #[test]
    fn test_display_create_external_table() {
        let sql = r#"CREATE EXTERNAL TABLE city (
            host string,
            ts timestamp,
            cpu float64 default 0,
            memory float64,
            TIME INDEX (ts),
            PRIMARY KEY(host)
) WITH (location='/var/data/city.csv', format='csv');"#;
        let stmts = parse(sql);
        assert_eq!(1, stmts.len());
        assert_matches!(&stmts[0], Statement::CreateExternalTable { .. });

        let Statement::CreateExternalTable(create) = &stmts[0] else {
            unreachable!();
        };
        let new_sql = format!("\n{}", create);
        assert_eq!(
            r#"
CREATE EXTERNAL TABLE city (
  host STRING,
  ts TIMESTAMP,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (host)
)
ENGINE=file
WITH(
  format = 'csv',
  location = '/var/data/city.csv'
)"#,
            &new_sql
        );
    }

    /// `CREATE FLOW` keeps the query text as written and the displayed SQL
    /// parses back to the same AST.
    #[test]
    fn test_display_create_flow() {
        let sql = r"CREATE FLOW filter_numbers
            SINK TO out_num_cnt
            AS SELECT number FROM numbers_input where number > 10;";
        let result = parse(sql);
        assert_eq!(1, result.len());

        let Statement::CreateFlow(c) = &result[0] else {
            unreachable!()
        };
        let new_sql = format!("\n{}", c);
        assert_eq!(
            r#"
CREATE FLOW filter_numbers
SINK TO out_num_cnt
AS SELECT number FROM numbers_input where number > 10"#,
            &new_sql
        );

        let new_result = parse(&new_sql);
        assert_eq!(result, new_result);
    }
}