1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17
18use common_catalog::consts::FILE_ENGINE;
19use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
20use itertools::Itertools;
21use serde::Serialize;
22use snafu::ResultExt;
23use sqlparser::ast::{ColumnOptionDef, DataType, Expr, Query};
24use sqlparser_derive::{Visit, VisitMut};
25
26use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue};
27use crate::error::{Result, SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu};
28use crate::statements::statement::Statement;
29use crate::statements::OptionMap;
30
// Separator placed between list entries that are rendered one per line.
const LINE_SEP: &str = ",\n";
// Separator placed between list entries that are rendered on a single line.
const COMMA_SEP: &str = ", ";
// Number of spaces `format_indent!` puts in front of an entry.
const INDENT: usize = 2;
/// Key looked up in a column's vector options when rendering `VECTOR(<dim>)`.
pub const VECTOR_OPT_DIM: &str = "dim";
35
/// Formats `$arg` prefixed by `INDENT` spaces.
///
/// - `format_indent!(fmt, arg)`: `fmt` must contain two placeholders; the
///   first receives the indentation and the second receives `arg`.
/// - `format_indent!(arg)`: shorthand for `format_indent!("{}{}", arg)`.
macro_rules! format_indent {
    ($fmt: expr, $arg: expr) => {
        // `{: >1$}` right-aligns the empty string in a field whose width is
        // taken from argument 1 (`INDENT`), i.e. it yields `INDENT` spaces.
        format!($fmt, format_args!("{: >1$}", "", INDENT), $arg)
    };
    ($arg: expr) => {
        format_indent!("{}{}", $arg)
    };
}
44
/// Renders every element of `$list` (anything with `.iter()` whose items
/// implement `Display`) on its own indented line, joined by `LINE_SEP`.
macro_rules! format_list_indent {
    ($list: expr) => {
        $list.iter().map(|e| format_indent!(e)).join(LINE_SEP)
    };
}
50
/// Renders every element of `$list` (anything with `.iter()` whose items
/// implement `Display`) on a single line, joined by `COMMA_SEP`.
macro_rules! format_list_comma {
    ($list: expr) => {
        $list.iter().map(|e| format!("{}", e)).join(COMMA_SEP)
    };
}
56
57fn format_table_constraint(constraints: &[TableConstraint]) -> String {
58 constraints.iter().map(|c| format_indent!(c)).join(LINE_SEP)
59}
60
/// A table-level constraint that can appear in the column list of a
/// `CREATE TABLE` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub enum TableConstraint {
    /// Primary key over the listed columns; displayed as `PRIMARY KEY (a, b)`.
    PrimaryKey { columns: Vec<Ident> },
    /// Time index on a single column; displayed as `TIME INDEX (col)`.
    TimeIndex { column: Ident },
}
69
70impl Display for TableConstraint {
71 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72 match self {
73 TableConstraint::PrimaryKey { columns } => {
74 write!(f, "PRIMARY KEY ({})", format_list_comma!(columns))
75 }
76 TableConstraint::TimeIndex { column } => {
77 write!(f, "TIME INDEX ({})", column)
78 }
79 }
80 }
81}
82
/// AST node for a `CREATE TABLE` statement.
///
/// Its `Display` implementation renders the statement back to SQL text.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateTable {
    /// Whether `IF NOT EXISTS` is rendered after `TABLE`.
    pub if_not_exists: bool,
    /// Numeric table id; it is not part of the rendered SQL.
    pub table_id: u32,
    /// Name of the table being created.
    pub name: ObjectName,
    /// Column definitions, rendered one per indented line.
    pub columns: Vec<Column>,
    /// Engine name, rendered as `ENGINE=<engine>`. When it equals
    /// `FILE_ENGINE` the statement is rendered as `CREATE EXTERNAL TABLE`.
    pub engine: String,
    /// Table-level constraints, rendered after the columns.
    pub constraints: Vec<TableConstraint>,
    /// Table options, rendered inside `WITH(...)` when non-empty.
    pub options: OptionMap,
    /// Optional partitioning clause, rendered between the column list and
    /// the `ENGINE` line.
    pub partitions: Option<Partitions>,
}
97
/// A single column of a `CREATE TABLE` / `CREATE EXTERNAL TABLE` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct Column {
    /// The underlying column definition (name, data type, column options).
    pub column_def: ColumnDef,
    /// Additional per-column settings (vector and index options).
    pub extensions: ColumnExtensions,
}
106
/// Per-column settings that go beyond the plain column definition.
///
/// For each index field, `Display for Column` renders a bare `<KIND> INDEX`
/// when the map is present but empty, and `<KIND> INDEX WITH(...)` otherwise.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Default, Serialize)]
pub struct ColumnExtensions {
    /// Vector type options; the `VECTOR_OPT_DIM` key is read from here when
    /// the column is displayed.
    pub vector_options: Option<OptionMap>,

    /// Fulltext index options (`FULLTEXT INDEX`).
    pub fulltext_index_options: Option<OptionMap>,
    /// Skipping index options (`SKIPPING INDEX`).
    pub skipping_index_options: Option<OptionMap>,
    /// Inverted index options (`INVERTED INDEX`).
    pub inverted_index_options: Option<OptionMap>,
}
122
123impl Column {
124 pub fn name(&self) -> &Ident {
125 &self.column_def.name
126 }
127
128 pub fn data_type(&self) -> &DataType {
129 &self.column_def.data_type
130 }
131
132 pub fn mut_data_type(&mut self) -> &mut DataType {
133 &mut self.column_def.data_type
134 }
135
136 pub fn options(&self) -> &[ColumnOptionDef] {
137 &self.column_def.options
138 }
139
140 pub fn mut_options(&mut self) -> &mut Vec<ColumnOptionDef> {
141 &mut self.column_def.options
142 }
143}
144
145impl Display for Column {
146 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
147 if let Some(vector_options) = &self.extensions.vector_options {
148 if let Some(dim) = vector_options.get(VECTOR_OPT_DIM) {
149 write!(f, "{} VECTOR({})", self.column_def.name, dim)?;
150 return Ok(());
151 }
152 }
153
154 write!(f, "{}", self.column_def)?;
155
156 if let Some(fulltext_options) = &self.extensions.fulltext_index_options {
157 if !fulltext_options.is_empty() {
158 let options = fulltext_options.kv_pairs();
159 write!(f, " FULLTEXT INDEX WITH({})", format_list_comma!(options))?;
160 } else {
161 write!(f, " FULLTEXT INDEX")?;
162 }
163 }
164
165 if let Some(skipping_index_options) = &self.extensions.skipping_index_options {
166 if !skipping_index_options.is_empty() {
167 let options = skipping_index_options.kv_pairs();
168 write!(f, " SKIPPING INDEX WITH({})", format_list_comma!(options))?;
169 } else {
170 write!(f, " SKIPPING INDEX")?;
171 }
172 }
173
174 if let Some(inverted_index_options) = &self.extensions.inverted_index_options {
175 if !inverted_index_options.is_empty() {
176 let options = inverted_index_options.kv_pairs();
177 write!(f, " INVERTED INDEX WITH({})", format_list_comma!(options))?;
178 } else {
179 write!(f, " INVERTED INDEX")?;
180 }
181 }
182 Ok(())
183 }
184}
185
186impl ColumnExtensions {
187 pub fn build_fulltext_options(&self) -> Result<Option<FulltextOptions>> {
188 let Some(options) = self.fulltext_index_options.as_ref() else {
189 return Ok(None);
190 };
191
192 let options: HashMap<String, String> = options.clone().into_map();
193 Ok(Some(options.try_into().context(SetFulltextOptionSnafu)?))
194 }
195
196 pub fn build_skipping_index_options(&self) -> Result<Option<SkippingIndexOptions>> {
197 let Some(options) = self.skipping_index_options.as_ref() else {
198 return Ok(None);
199 };
200
201 let options: HashMap<String, String> = options.clone().into_map();
202 Ok(Some(
203 options.try_into().context(SetSkippingIndexOptionSnafu)?,
204 ))
205 }
206}
207
/// Partitioning clause of a `CREATE TABLE` statement
/// (`PARTITION ON COLUMNS (...) (...)`).
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct Partitions {
    /// Columns the table is partitioned on. `Display` renders nothing at all
    /// when this list is empty.
    pub column_list: Vec<Ident>,
    /// Expressions listed inside the clause's second pair of parentheses,
    /// rendered one per indented line.
    pub exprs: Vec<Expr>,
}
213
214impl Partitions {
215 pub fn set_quote(&mut self, quote_style: char) {
217 self.column_list
218 .iter_mut()
219 .for_each(|c| c.quote_style = Some(quote_style));
220 }
221}
222
/// A named partition bounded by a list of values; displayed as
/// `PARTITION <name> VALUES LESS THAN (<values>)`.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)]
pub struct PartitionEntry {
    /// Name of the partition.
    pub name: Ident,
    /// Values rendered, comma-separated, inside `VALUES LESS THAN (...)`.
    pub value_list: Vec<SqlValue>,
}
228
229impl Display for PartitionEntry {
230 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
231 write!(
232 f,
233 "PARTITION {} VALUES LESS THAN ({})",
234 self.name,
235 format_list_comma!(self.value_list),
236 )
237 }
238}
239
240impl Display for Partitions {
241 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
242 if !self.column_list.is_empty() {
243 write!(
244 f,
245 "PARTITION ON COLUMNS ({}) (\n{}\n)",
246 format_list_comma!(self.column_list),
247 format_list_indent!(self.exprs),
248 )?;
249 }
250 Ok(())
251 }
252}
253
254impl Display for CreateTable {
255 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
256 write!(f, "CREATE ")?;
257 if self.engine == FILE_ENGINE {
258 write!(f, "EXTERNAL ")?;
259 }
260 write!(f, "TABLE ")?;
261 if self.if_not_exists {
262 write!(f, "IF NOT EXISTS ")?;
263 }
264 writeln!(f, "{} (", &self.name)?;
265 writeln!(f, "{},", format_list_indent!(self.columns))?;
266 writeln!(f, "{}", format_table_constraint(&self.constraints))?;
267 writeln!(f, ")")?;
268 if let Some(partitions) = &self.partitions {
269 writeln!(f, "{partitions}")?;
270 }
271 writeln!(f, "ENGINE={}", &self.engine)?;
272 if !self.options.is_empty() {
273 let options = self.options.kv_pairs();
274 write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
275 }
276 Ok(())
277 }
278}
279
/// AST node for a `CREATE DATABASE` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateDatabase {
    /// Name of the database being created.
    pub name: ObjectName,
    /// Whether `IF NOT EXISTS` is rendered before the name.
    pub if_not_exists: bool,
    /// Database options, rendered inside `WITH(...)` when non-empty.
    pub options: OptionMap,
}
287
288impl CreateDatabase {
289 pub fn new(name: ObjectName, if_not_exists: bool, options: OptionMap) -> Self {
291 Self {
292 name,
293 if_not_exists,
294 options,
295 }
296 }
297}
298
299impl Display for CreateDatabase {
300 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
301 write!(f, "CREATE DATABASE ")?;
302 if self.if_not_exists {
303 write!(f, "IF NOT EXISTS ")?;
304 }
305 write!(f, "{}", &self.name)?;
306 if !self.options.is_empty() {
307 let options = self.options.kv_pairs();
308 write!(f, "\nWITH(\n{}\n)", format_list_indent!(options))?;
309 }
310 Ok(())
311 }
312}
313
/// AST node for a `CREATE EXTERNAL TABLE` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateExternalTable {
    /// Name of the table being created.
    pub name: ObjectName,
    /// Column definitions, rendered one per indented line.
    pub columns: Vec<Column>,
    /// Table-level constraints, rendered after the columns.
    pub constraints: Vec<TableConstraint>,
    /// Table options, rendered inside `WITH(...)` when non-empty.
    pub options: OptionMap,
    /// Whether `IF NOT EXISTS` is rendered after `TABLE`.
    pub if_not_exists: bool,
    /// Engine name, rendered as `ENGINE=<engine>`.
    pub engine: String,
}
325
326impl Display for CreateExternalTable {
327 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
328 write!(f, "CREATE EXTERNAL TABLE ")?;
329 if self.if_not_exists {
330 write!(f, "IF NOT EXISTS ")?;
331 }
332 writeln!(f, "{} (", &self.name)?;
333 writeln!(f, "{},", format_list_indent!(self.columns))?;
334 writeln!(f, "{}", format_table_constraint(&self.constraints))?;
335 writeln!(f, ")")?;
336 writeln!(f, "ENGINE={}", &self.engine)?;
337 if !self.options.is_empty() {
338 let options = self.options.kv_pairs();
339 write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
340 }
341 Ok(())
342 }
343}
344
/// AST node for `CREATE TABLE <table_name> LIKE <source_name>`.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateTableLike {
    /// Name of the table being created.
    pub table_name: ObjectName,
    /// Name of the table that follows `LIKE`.
    pub source_name: ObjectName,
}
352
353impl Display for CreateTableLike {
354 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
355 let table_name = &self.table_name;
356 let source_name = &self.source_name;
357 write!(f, r#"CREATE TABLE {table_name} LIKE {source_name}"#)
358 }
359}
360
/// AST node for a `CREATE FLOW` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateFlow {
    /// Name of the flow being created.
    pub flow_name: ObjectName,
    /// Name of the table rendered after `SINK TO`.
    pub sink_table_name: ObjectName,
    /// Whether `OR REPLACE` is rendered after `CREATE`.
    pub or_replace: bool,
    /// Whether `IF NOT EXISTS` is rendered after `FLOW`.
    pub if_not_exists: bool,
    /// Value rendered after `EXPIRE AFTER`, if any.
    /// NOTE(review): the unit is not visible here (presumably seconds) — confirm.
    pub expire_after: Option<i64>,
    /// Text rendered inside `COMMENT '...'`, if any.
    pub comment: Option<String>,
    /// The query rendered after `AS`.
    pub query: Box<Query>,
}
379
380impl Display for CreateFlow {
381 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
382 write!(f, "CREATE ")?;
383 if self.or_replace {
384 write!(f, "OR REPLACE ")?;
385 }
386 write!(f, "FLOW ")?;
387 if self.if_not_exists {
388 write!(f, "IF NOT EXISTS ")?;
389 }
390 writeln!(f, "{}", &self.flow_name)?;
391 writeln!(f, "SINK TO {}", &self.sink_table_name)?;
392 if let Some(expire_after) = &self.expire_after {
393 writeln!(f, "EXPIRE AFTER {} ", expire_after)?;
394 }
395 if let Some(comment) = &self.comment {
396 writeln!(f, "COMMENT '{}'", comment)?;
397 }
398 write!(f, "AS {}", &self.query)
399 }
400}
401
/// AST node for a `CREATE VIEW` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateView {
    /// Name of the view being created.
    pub name: ObjectName,
    /// Column names rendered in parentheses after the view name; the
    /// parenthesised list is omitted when this is empty.
    pub columns: Vec<Ident>,
    /// The statement rendered after `AS`.
    pub query: Box<Statement>,
    /// Whether `OR REPLACE` is rendered after `CREATE`.
    pub or_replace: bool,
    /// Whether `IF NOT EXISTS` is rendered after `VIEW`.
    pub if_not_exists: bool,
}
417
418impl Display for CreateView {
419 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
420 write!(f, "CREATE ")?;
421 if self.or_replace {
422 write!(f, "OR REPLACE ")?;
423 }
424 write!(f, "VIEW ")?;
425 if self.if_not_exists {
426 write!(f, "IF NOT EXISTS ")?;
427 }
428 write!(f, "{} ", &self.name)?;
429 if !self.columns.is_empty() {
430 write!(f, "({}) ", format_list_comma!(self.columns))?;
431 }
432 write!(f, "AS {}", &self.query)
433 }
434}
435
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use crate::dialect::GreptimeDbDialect;
    use crate::error::Error;
    use crate::parser::{ParseOptions, ParserContext};
    use crate::statements::statement::Statement;

    /// Parses `sql` with the GreptimeDB dialect and default parse options,
    /// panicking if parsing fails.
    fn parse(sql: &str) -> Vec<Statement> {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap()
    }

    // Displaying a parsed CREATE TABLE yields canonical SQL that parses back
    // to the same statement.
    #[test]
    fn test_display_create_table() {
        let sql = r"create table if not exists demo(
                             host string,
                             ts timestamp,
                             cpu double default 0,
                             memory double,
                             TIME INDEX (ts),
                             PRIMARY KEY(host)
                       )
                       PARTITION ON COLUMNS (host) (
                            host = 'a',
                            host > 'a',
                       )
                       engine=mito
                       with(ttl='7d', storage='File');
         ";
        let result = parse(sql);
        assert_eq!(1, result.len());

        let Statement::CreateTable(create) = &result[0] else {
            unreachable!()
        };
        let new_sql = format!("\n{}", create);
        assert_eq!(
            r#"
CREATE TABLE IF NOT EXISTS demo (
  host STRING,
  ts TIMESTAMP,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (host)
)
PARTITION ON COLUMNS (host) (
  host = 'a',
  host > 'a'
)
ENGINE=mito
WITH(
  storage = 'File',
  ttl = '7d'
)"#,
            &new_sql
        );

        let new_result = parse(&new_sql);
        assert_eq!(result, new_result);
    }

    // Without a PARTITION clause nothing is rendered between `)` and ENGINE.
    #[test]
    fn test_display_empty_partition_column() {
        let sql = r"create table if not exists demo(
            host string,
            ts timestamp,
            cpu double default 0,
            memory double,
            TIME INDEX (ts),
            PRIMARY KEY(ts, host)
            );
        ";
        let result = parse(sql);
        assert_eq!(1, result.len());

        let Statement::CreateTable(create) = &result[0] else {
            unreachable!()
        };
        let new_sql = format!("\n{}", create);
        assert_eq!(
            r#"
CREATE TABLE IF NOT EXISTS demo (
  host STRING,
  ts TIMESTAMP,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (ts, host)
)
ENGINE=mito
"#,
            &new_sql
        );

        let new_result = parse(&new_sql);
        assert_eq!(result, new_result);
    }

    // Known table options are accepted; an unknown option key is rejected.
    #[test]
    fn test_validate_table_options() {
        let sql = r"create table if not exists demo(
            host string,
            ts timestamp,
            cpu double default 0,
            memory double,
            TIME INDEX (ts),
            PRIMARY KEY(host)
      )
      PARTITION ON COLUMNS (host) ()
      engine=mito
      with(ttl='7d', 'compaction.type'='world');
";
        let result = parse(sql);
        let Statement::CreateTable(create) = &result[0] else {
            unreachable!()
        };
        assert_eq!(2, create.options.len());

        let sql = r"create table if not exists demo(
            host string,
            ts timestamp,
            cpu double default 0,
            memory double,
            TIME INDEX (ts),
            PRIMARY KEY(host)
      )
      PARTITION ON COLUMNS (host) ()
      engine=mito
      with(ttl='7d', hello='world');
";
        let result =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
        assert_matches!(result, Err(Error::InvalidTableOption { .. }))
    }

    // CREATE DATABASE is rendered with and without IF NOT EXISTS / WITH.
    #[test]
    fn test_display_create_database() {
        let sql = r"create database test;";
        let stmts = parse(sql);
        assert_eq!(1, stmts.len());
        assert_matches!(&stmts[0], Statement::CreateDatabase { .. });

        let Statement::CreateDatabase(set) = &stmts[0] else {
            unreachable!();
        };
        let new_sql = format!("\n{}", set);
        assert_eq!(
            r#"
CREATE DATABASE test"#,
            &new_sql
        );

        let sql = r"create database if not exists test;";
        let stmts = parse(sql);
        assert_eq!(1, stmts.len());
        assert_matches!(&stmts[0], Statement::CreateDatabase { .. });

        let Statement::CreateDatabase(set) = &stmts[0] else {
            unreachable!();
        };
        let new_sql = format!("\n{}", set);
        assert_eq!(
            r#"
CREATE DATABASE IF NOT EXISTS test"#,
            &new_sql
        );

        let sql = r#"CREATE DATABASE IF NOT EXISTS test WITH (ttl='1h');"#;
        let stmts = parse(sql);
        assert_eq!(1, stmts.len());
        assert_matches!(&stmts[0], Statement::CreateDatabase { .. });

        let Statement::CreateDatabase(set) = &stmts[0] else {
            unreachable!();
        };
        let new_sql = format!("\n{}", set);
        assert_eq!(
            r#"
CREATE DATABASE IF NOT EXISTS test
WITH(
  ttl = '1h'
)"#,
            &new_sql
        );
    }

    // CREATE TABLE ... LIKE ... is rendered on a single line.
    #[test]
    fn test_display_create_table_like() {
        let sql = r"create table t2 like t1;";
        let stmts = parse(sql);
        assert_eq!(1, stmts.len());
        assert_matches!(&stmts[0], Statement::CreateTableLike { .. });

        let Statement::CreateTableLike(create) = &stmts[0] else {
            unreachable!();
        };
        let new_sql = format!("\n{}", create);
        assert_eq!(
            r#"
CREATE TABLE t2 LIKE t1"#,
            &new_sql
        );
    }

    // CREATE EXTERNAL TABLE is rendered with ENGINE=file and its WITH options.
    #[test]
    fn test_display_create_external_table() {
        let sql = r#"CREATE EXTERNAL TABLE city (
            host string,
            ts timestamp,
            cpu float64 default 0,
            memory float64,
            TIME INDEX (ts),
            PRIMARY KEY(host)
) WITH (location='/var/data/city.csv', format='csv');"#;
        let stmts = parse(sql);
        assert_eq!(1, stmts.len());
        assert_matches!(&stmts[0], Statement::CreateExternalTable { .. });

        let Statement::CreateExternalTable(create) = &stmts[0] else {
            unreachable!();
        };
        let new_sql = format!("\n{}", create);
        assert_eq!(
            r#"
CREATE EXTERNAL TABLE city (
  host STRING,
  ts TIMESTAMP,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (host)
)
ENGINE=file
WITH(
  format = 'csv',
  location = '/var/data/city.csv'
)"#,
            &new_sql
        );
    }

    // Displaying a parsed CREATE FLOW yields SQL that parses back to the same
    // statement.
    #[test]
    fn test_display_create_flow() {
        let sql = r"CREATE FLOW filter_numbers
            SINK TO out_num_cnt
            AS SELECT number FROM numbers_input where number > 10;";
        let result = parse(sql);
        assert_eq!(1, result.len());

        let Statement::CreateFlow(flow) = &result[0] else {
            unreachable!()
        };
        let new_sql = format!("\n{}", flow);
        assert_eq!(
            r#"
CREATE FLOW filter_numbers
SINK TO out_num_cnt
AS SELECT number FROM numbers_input WHERE number > 10"#,
            &new_sql
        );

        let new_result = parse(&new_sql);
        assert_eq!(result, new_result);
    }
}