1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17
18use common_catalog::consts::FILE_ENGINE;
19use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
20use itertools::Itertools;
21use serde::Serialize;
22use snafu::ResultExt;
23use sqlparser::ast::{ColumnOptionDef, DataType, Expr, Query};
24use sqlparser_derive::{Visit, VisitMut};
25
26use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue};
27use crate::error::{
28 InvalidFlowQuerySnafu, Result, SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu,
29};
30use crate::statements::statement::Statement;
31use crate::statements::tql::Tql;
32use crate::statements::OptionMap;
33
34const LINE_SEP: &str = ",\n";
35const COMMA_SEP: &str = ", ";
36const INDENT: usize = 2;
37pub const VECTOR_OPT_DIM: &str = "dim";
38
39macro_rules! format_indent {
40 ($fmt: expr, $arg: expr) => {
41 format!($fmt, format_args!("{: >1$}", "", INDENT), $arg)
42 };
43 ($arg: expr) => {
44 format_indent!("{}{}", $arg)
45 };
46}
47
48macro_rules! format_list_indent {
49 ($list: expr) => {
50 $list.iter().map(|e| format_indent!(e)).join(LINE_SEP)
51 };
52}
53
54macro_rules! format_list_comma {
55 ($list: expr) => {
56 $list.iter().map(|e| format!("{}", e)).join(COMMA_SEP)
57 };
58}
59
60#[cfg(feature = "enterprise")]
61pub mod trigger;
62
63fn format_table_constraint(constraints: &[TableConstraint]) -> String {
64 constraints.iter().map(|c| format_indent!(c)).join(LINE_SEP)
65}
66
67#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
69pub enum TableConstraint {
70 PrimaryKey { columns: Vec<Ident> },
72 TimeIndex { column: Ident },
74}
75
76impl Display for TableConstraint {
77 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
78 match self {
79 TableConstraint::PrimaryKey { columns } => {
80 write!(f, "PRIMARY KEY ({})", format_list_comma!(columns))
81 }
82 TableConstraint::TimeIndex { column } => {
83 write!(f, "TIME INDEX ({})", column)
84 }
85 }
86 }
87}
88
89#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
90pub struct CreateTable {
91 pub if_not_exists: bool,
93 pub table_id: u32,
94 pub name: ObjectName,
96 pub columns: Vec<Column>,
97 pub engine: String,
98 pub constraints: Vec<TableConstraint>,
99 pub options: OptionMap,
101 pub partitions: Option<Partitions>,
102}
103
104#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
106pub struct Column {
107 pub column_def: ColumnDef,
109 pub extensions: ColumnExtensions,
111}
112
113#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Default, Serialize)]
115pub struct ColumnExtensions {
116 pub vector_options: Option<OptionMap>,
118
119 pub fulltext_index_options: Option<OptionMap>,
121 pub skipping_index_options: Option<OptionMap>,
123 pub inverted_index_options: Option<OptionMap>,
127}
128
129impl Column {
130 pub fn name(&self) -> &Ident {
131 &self.column_def.name
132 }
133
134 pub fn data_type(&self) -> &DataType {
135 &self.column_def.data_type
136 }
137
138 pub fn mut_data_type(&mut self) -> &mut DataType {
139 &mut self.column_def.data_type
140 }
141
142 pub fn options(&self) -> &[ColumnOptionDef] {
143 &self.column_def.options
144 }
145
146 pub fn mut_options(&mut self) -> &mut Vec<ColumnOptionDef> {
147 &mut self.column_def.options
148 }
149}
150
151impl Display for Column {
152 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153 if let Some(vector_options) = &self.extensions.vector_options {
154 if let Some(dim) = vector_options.get(VECTOR_OPT_DIM) {
155 write!(f, "{} VECTOR({})", self.column_def.name, dim)?;
156 return Ok(());
157 }
158 }
159
160 write!(f, "{}", self.column_def)?;
161
162 if let Some(fulltext_options) = &self.extensions.fulltext_index_options {
163 if !fulltext_options.is_empty() {
164 let options = fulltext_options.kv_pairs();
165 write!(f, " FULLTEXT INDEX WITH({})", format_list_comma!(options))?;
166 } else {
167 write!(f, " FULLTEXT INDEX")?;
168 }
169 }
170
171 if let Some(skipping_index_options) = &self.extensions.skipping_index_options {
172 if !skipping_index_options.is_empty() {
173 let options = skipping_index_options.kv_pairs();
174 write!(f, " SKIPPING INDEX WITH({})", format_list_comma!(options))?;
175 } else {
176 write!(f, " SKIPPING INDEX")?;
177 }
178 }
179
180 if let Some(inverted_index_options) = &self.extensions.inverted_index_options {
181 if !inverted_index_options.is_empty() {
182 let options = inverted_index_options.kv_pairs();
183 write!(f, " INVERTED INDEX WITH({})", format_list_comma!(options))?;
184 } else {
185 write!(f, " INVERTED INDEX")?;
186 }
187 }
188 Ok(())
189 }
190}
191
192impl ColumnExtensions {
193 pub fn build_fulltext_options(&self) -> Result<Option<FulltextOptions>> {
194 let Some(options) = self.fulltext_index_options.as_ref() else {
195 return Ok(None);
196 };
197
198 let options: HashMap<String, String> = options.clone().into_map();
199 Ok(Some(options.try_into().context(SetFulltextOptionSnafu)?))
200 }
201
202 pub fn build_skipping_index_options(&self) -> Result<Option<SkippingIndexOptions>> {
203 let Some(options) = self.skipping_index_options.as_ref() else {
204 return Ok(None);
205 };
206
207 let options: HashMap<String, String> = options.clone().into_map();
208 Ok(Some(
209 options.try_into().context(SetSkippingIndexOptionSnafu)?,
210 ))
211 }
212}
213
214#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
221pub struct Partitions {
222 pub column_list: Vec<Ident>,
223 pub exprs: Vec<Expr>,
224}
225
226impl Partitions {
227 pub fn set_quote(&mut self, quote_style: char) {
229 self.column_list
230 .iter_mut()
231 .for_each(|c| c.quote_style = Some(quote_style));
232 }
233}
234
235#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)]
236pub struct PartitionEntry {
237 pub name: Ident,
238 pub value_list: Vec<SqlValue>,
239}
240
241impl Display for PartitionEntry {
242 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
243 write!(
244 f,
245 "PARTITION {} VALUES LESS THAN ({})",
246 self.name,
247 format_list_comma!(self.value_list),
248 )
249 }
250}
251
252impl Display for Partitions {
253 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
254 if !self.column_list.is_empty() {
255 write!(
256 f,
257 "PARTITION ON COLUMNS ({}) (\n{}\n)",
258 format_list_comma!(self.column_list),
259 format_list_indent!(self.exprs),
260 )?;
261 }
262 Ok(())
263 }
264}
265
266impl Display for CreateTable {
267 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
268 write!(f, "CREATE ")?;
269 if self.engine == FILE_ENGINE {
270 write!(f, "EXTERNAL ")?;
271 }
272 write!(f, "TABLE ")?;
273 if self.if_not_exists {
274 write!(f, "IF NOT EXISTS ")?;
275 }
276 writeln!(f, "{} (", &self.name)?;
277 writeln!(f, "{},", format_list_indent!(self.columns))?;
278 writeln!(f, "{}", format_table_constraint(&self.constraints))?;
279 writeln!(f, ")")?;
280 if let Some(partitions) = &self.partitions {
281 writeln!(f, "{partitions}")?;
282 }
283 writeln!(f, "ENGINE={}", &self.engine)?;
284 if !self.options.is_empty() {
285 let options = self.options.kv_pairs();
286 write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
287 }
288 Ok(())
289 }
290}
291
292#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
293pub struct CreateDatabase {
294 pub name: ObjectName,
295 pub if_not_exists: bool,
297 pub options: OptionMap,
298}
299
300impl CreateDatabase {
301 pub fn new(name: ObjectName, if_not_exists: bool, options: OptionMap) -> Self {
303 Self {
304 name,
305 if_not_exists,
306 options,
307 }
308 }
309}
310
311impl Display for CreateDatabase {
312 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
313 write!(f, "CREATE DATABASE ")?;
314 if self.if_not_exists {
315 write!(f, "IF NOT EXISTS ")?;
316 }
317 write!(f, "{}", &self.name)?;
318 if !self.options.is_empty() {
319 let options = self.options.kv_pairs();
320 write!(f, "\nWITH(\n{}\n)", format_list_indent!(options))?;
321 }
322 Ok(())
323 }
324}
325
326#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
327pub struct CreateExternalTable {
328 pub name: ObjectName,
330 pub columns: Vec<Column>,
331 pub constraints: Vec<TableConstraint>,
332 pub options: OptionMap,
334 pub if_not_exists: bool,
335 pub engine: String,
336}
337
338impl Display for CreateExternalTable {
339 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
340 write!(f, "CREATE EXTERNAL TABLE ")?;
341 if self.if_not_exists {
342 write!(f, "IF NOT EXISTS ")?;
343 }
344 writeln!(f, "{} (", &self.name)?;
345 writeln!(f, "{},", format_list_indent!(self.columns))?;
346 writeln!(f, "{}", format_table_constraint(&self.constraints))?;
347 writeln!(f, ")")?;
348 writeln!(f, "ENGINE={}", &self.engine)?;
349 if !self.options.is_empty() {
350 let options = self.options.kv_pairs();
351 write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
352 }
353 Ok(())
354 }
355}
356
357#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
358pub struct CreateTableLike {
359 pub table_name: ObjectName,
361 pub source_name: ObjectName,
363}
364
365impl Display for CreateTableLike {
366 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
367 let table_name = &self.table_name;
368 let source_name = &self.source_name;
369 write!(f, r#"CREATE TABLE {table_name} LIKE {source_name}"#)
370 }
371}
372
373#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
374pub struct CreateFlow {
375 pub flow_name: ObjectName,
377 pub sink_table_name: ObjectName,
379 pub or_replace: bool,
381 pub if_not_exists: bool,
383 pub expire_after: Option<i64>,
386 pub comment: Option<String>,
388 pub query: Box<SqlOrTql>,
390}
391
392#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
394pub enum SqlOrTql {
395 Sql(Query, String),
396 Tql(Tql, String),
397}
398
399impl std::fmt::Display for SqlOrTql {
400 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
401 match self {
402 Self::Sql(_, s) => write!(f, "{}", s),
403 Self::Tql(_, s) => write!(f, "{}", s),
404 }
405 }
406}
407
408impl SqlOrTql {
409 pub fn try_from_statement(
410 value: Statement,
411 original_query: &str,
412 ) -> std::result::Result<Self, crate::error::Error> {
413 match value {
414 Statement::Query(query) => {
415 Ok(Self::Sql((*query).try_into()?, original_query.to_string()))
416 }
417 Statement::Tql(tql) => Ok(Self::Tql(tql, original_query.to_string())),
418 _ => InvalidFlowQuerySnafu {
419 reason: format!("Expect either sql query or promql query, found {:?}", value),
420 }
421 .fail(),
422 }
423 }
424}
425
426impl Display for CreateFlow {
427 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
428 write!(f, "CREATE ")?;
429 if self.or_replace {
430 write!(f, "OR REPLACE ")?;
431 }
432 write!(f, "FLOW ")?;
433 if self.if_not_exists {
434 write!(f, "IF NOT EXISTS ")?;
435 }
436 writeln!(f, "{}", &self.flow_name)?;
437 writeln!(f, "SINK TO {}", &self.sink_table_name)?;
438 if let Some(expire_after) = &self.expire_after {
439 writeln!(f, "EXPIRE AFTER {} ", expire_after)?;
440 }
441 if let Some(comment) = &self.comment {
442 writeln!(f, "COMMENT '{}'", comment)?;
443 }
444 write!(f, "AS {}", &self.query)
445 }
446}
447
448#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
450pub struct CreateView {
451 pub name: ObjectName,
453 pub columns: Vec<Ident>,
455 pub query: Box<Statement>,
458 pub or_replace: bool,
460 pub if_not_exists: bool,
462}
463
464impl Display for CreateView {
465 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
466 write!(f, "CREATE ")?;
467 if self.or_replace {
468 write!(f, "OR REPLACE ")?;
469 }
470 write!(f, "VIEW ")?;
471 if self.if_not_exists {
472 write!(f, "IF NOT EXISTS ")?;
473 }
474 write!(f, "{} ", &self.name)?;
475 if !self.columns.is_empty() {
476 write!(f, "({}) ", format_list_comma!(self.columns))?;
477 }
478 write!(f, "AS {}", &self.query)
479 }
480}
481
482#[cfg(test)]
483mod tests {
484 use std::assert_matches::assert_matches;
485
486 use crate::dialect::GreptimeDbDialect;
487 use crate::error::Error;
488 use crate::parser::{ParseOptions, ParserContext};
489 use crate::statements::statement::Statement;
490
491 #[test]
492 fn test_display_create_table() {
493 let sql = r"create table if not exists demo(
494 host string,
495 ts timestamp,
496 cpu double default 0,
497 memory double,
498 TIME INDEX (ts),
499 PRIMARY KEY(host)
500 )
501 PARTITION ON COLUMNS (host) (
502 host = 'a',
503 host > 'a',
504 )
505 engine=mito
506 with(ttl='7d', storage='File');
507 ";
508 let result =
509 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
510 .unwrap();
511 assert_eq!(1, result.len());
512
513 match &result[0] {
514 Statement::CreateTable(c) => {
515 let new_sql = format!("\n{}", c);
516 assert_eq!(
517 r#"
518CREATE TABLE IF NOT EXISTS demo (
519 host STRING,
520 ts TIMESTAMP,
521 cpu DOUBLE DEFAULT 0,
522 memory DOUBLE,
523 TIME INDEX (ts),
524 PRIMARY KEY (host)
525)
526PARTITION ON COLUMNS (host) (
527 host = 'a',
528 host > 'a'
529)
530ENGINE=mito
531WITH(
532 storage = 'File',
533 ttl = '7d'
534)"#,
535 &new_sql
536 );
537
538 let new_result = ParserContext::create_with_dialect(
539 &new_sql,
540 &GreptimeDbDialect {},
541 ParseOptions::default(),
542 )
543 .unwrap();
544 assert_eq!(result, new_result);
545 }
546 _ => unreachable!(),
547 }
548 }
549
550 #[test]
551 fn test_display_empty_partition_column() {
552 let sql = r"create table if not exists demo(
553 host string,
554 ts timestamp,
555 cpu double default 0,
556 memory double,
557 TIME INDEX (ts),
558 PRIMARY KEY(ts, host)
559 );
560 ";
561 let result =
562 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
563 .unwrap();
564 assert_eq!(1, result.len());
565
566 match &result[0] {
567 Statement::CreateTable(c) => {
568 let new_sql = format!("\n{}", c);
569 assert_eq!(
570 r#"
571CREATE TABLE IF NOT EXISTS demo (
572 host STRING,
573 ts TIMESTAMP,
574 cpu DOUBLE DEFAULT 0,
575 memory DOUBLE,
576 TIME INDEX (ts),
577 PRIMARY KEY (ts, host)
578)
579ENGINE=mito
580"#,
581 &new_sql
582 );
583
584 let new_result = ParserContext::create_with_dialect(
585 &new_sql,
586 &GreptimeDbDialect {},
587 ParseOptions::default(),
588 )
589 .unwrap();
590 assert_eq!(result, new_result);
591 }
592 _ => unreachable!(),
593 }
594 }
595
596 #[test]
597 fn test_validate_table_options() {
598 let sql = r"create table if not exists demo(
599 host string,
600 ts timestamp,
601 cpu double default 0,
602 memory double,
603 TIME INDEX (ts),
604 PRIMARY KEY(host)
605 )
606 PARTITION ON COLUMNS (host) ()
607 engine=mito
608 with(ttl='7d', 'compaction.type'='world');
609";
610 let result =
611 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
612 .unwrap();
613 match &result[0] {
614 Statement::CreateTable(c) => {
615 assert_eq!(2, c.options.len());
616 }
617 _ => unreachable!(),
618 }
619
620 let sql = r"create table if not exists demo(
621 host string,
622 ts timestamp,
623 cpu double default 0,
624 memory double,
625 TIME INDEX (ts),
626 PRIMARY KEY(host)
627 )
628 PARTITION ON COLUMNS (host) ()
629 engine=mito
630 with(ttl='7d', hello='world');
631";
632 let result =
633 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
634 assert_matches!(result, Err(Error::InvalidTableOption { .. }))
635 }
636
637 #[test]
638 fn test_display_create_database() {
639 let sql = r"create database test;";
640 let stmts =
641 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
642 .unwrap();
643 assert_eq!(1, stmts.len());
644 assert_matches!(&stmts[0], Statement::CreateDatabase { .. });
645
646 match &stmts[0] {
647 Statement::CreateDatabase(set) => {
648 let new_sql = format!("\n{}", set);
649 assert_eq!(
650 r#"
651CREATE DATABASE test"#,
652 &new_sql
653 );
654 }
655 _ => {
656 unreachable!();
657 }
658 }
659
660 let sql = r"create database if not exists test;";
661 let stmts =
662 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
663 .unwrap();
664 assert_eq!(1, stmts.len());
665 assert_matches!(&stmts[0], Statement::CreateDatabase { .. });
666
667 match &stmts[0] {
668 Statement::CreateDatabase(set) => {
669 let new_sql = format!("\n{}", set);
670 assert_eq!(
671 r#"
672CREATE DATABASE IF NOT EXISTS test"#,
673 &new_sql
674 );
675 }
676 _ => {
677 unreachable!();
678 }
679 }
680
681 let sql = r#"CREATE DATABASE IF NOT EXISTS test WITH (ttl='1h');"#;
682 let stmts =
683 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
684 .unwrap();
685 assert_eq!(1, stmts.len());
686 assert_matches!(&stmts[0], Statement::CreateDatabase { .. });
687
688 match &stmts[0] {
689 Statement::CreateDatabase(set) => {
690 let new_sql = format!("\n{}", set);
691 assert_eq!(
692 r#"
693CREATE DATABASE IF NOT EXISTS test
694WITH(
695 ttl = '1h'
696)"#,
697 &new_sql
698 );
699 }
700 _ => {
701 unreachable!();
702 }
703 }
704 }
705
706 #[test]
707 fn test_display_create_table_like() {
708 let sql = r"create table t2 like t1;";
709 let stmts =
710 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
711 .unwrap();
712 assert_eq!(1, stmts.len());
713 assert_matches!(&stmts[0], Statement::CreateTableLike { .. });
714
715 match &stmts[0] {
716 Statement::CreateTableLike(create) => {
717 let new_sql = format!("\n{}", create);
718 assert_eq!(
719 r#"
720CREATE TABLE t2 LIKE t1"#,
721 &new_sql
722 );
723 }
724 _ => {
725 unreachable!();
726 }
727 }
728 }
729
730 #[test]
731 fn test_display_create_external_table() {
732 let sql = r#"CREATE EXTERNAL TABLE city (
733 host string,
734 ts timestamp,
735 cpu float64 default 0,
736 memory float64,
737 TIME INDEX (ts),
738 PRIMARY KEY(host)
739) WITH (location='/var/data/city.csv', format='csv');"#;
740 let stmts =
741 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
742 .unwrap();
743 assert_eq!(1, stmts.len());
744 assert_matches!(&stmts[0], Statement::CreateExternalTable { .. });
745
746 match &stmts[0] {
747 Statement::CreateExternalTable(create) => {
748 let new_sql = format!("\n{}", create);
749 assert_eq!(
750 r#"
751CREATE EXTERNAL TABLE city (
752 host STRING,
753 ts TIMESTAMP,
754 cpu DOUBLE DEFAULT 0,
755 memory DOUBLE,
756 TIME INDEX (ts),
757 PRIMARY KEY (host)
758)
759ENGINE=file
760WITH(
761 format = 'csv',
762 location = '/var/data/city.csv'
763)"#,
764 &new_sql
765 );
766 }
767 _ => {
768 unreachable!();
769 }
770 }
771 }
772
773 #[test]
774 fn test_display_create_flow() {
775 let sql = r"CREATE FLOW filter_numbers
776 SINK TO out_num_cnt
777 AS SELECT number FROM numbers_input where number > 10;";
778 let result =
779 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
780 .unwrap();
781 assert_eq!(1, result.len());
782
783 match &result[0] {
784 Statement::CreateFlow(c) => {
785 let new_sql = format!("\n{}", c);
786 assert_eq!(
787 r#"
788CREATE FLOW filter_numbers
789SINK TO out_num_cnt
790AS SELECT number FROM numbers_input where number > 10"#,
791 &new_sql
792 );
793
794 let new_result = ParserContext::create_with_dialect(
795 &new_sql,
796 &GreptimeDbDialect {},
797 ParseOptions::default(),
798 )
799 .unwrap();
800 assert_eq!(result, new_result);
801 }
802 _ => unreachable!(),
803 }
804 }
805}