sql/statements/
create.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17
18use common_catalog::consts::FILE_ENGINE;
19use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
20use itertools::Itertools;
21use serde::Serialize;
22use snafu::ResultExt;
23use sqlparser::ast::{ColumnOptionDef, DataType, Expr, Query};
24use sqlparser_derive::{Visit, VisitMut};
25
26use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue};
27use crate::error::{
28    InvalidFlowQuerySnafu, Result, SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu,
29};
30use crate::statements::OptionMap;
31use crate::statements::statement::Statement;
32use crate::statements::tql::Tql;
33
/// Separator placed between list entries that are rendered one per line.
const LINE_SEP: &str = ",\n";
/// Separator placed between list entries that are rendered inline.
const COMMA_SEP: &str = ", ";
/// Width, in spaces, of the padding that `format_indent!` puts in front of an entry.
const INDENT: usize = 2;
/// Option key read from a column's vector options when rendering `VECTOR(<dim>)`.
pub const VECTOR_OPT_DIM: &str = "dim";
38
/// Formats a value behind `INDENT` spaces of padding.
///
/// - `format_indent!(arg)` yields the padding immediately followed by `arg`.
/// - `format_indent!(fmt, arg)` passes the padding and `arg`, in that order, to the
///   format string `fmt`, which therefore needs two placeholders.
macro_rules! format_indent {
    ($arg: expr) => {
        format_indent!("{}{}", $arg)
    };
    ($fmt: expr, $arg: expr) => {
        // An empty string padded to `INDENT` columns is exactly `INDENT` spaces.
        format!($fmt, format_args!("{:width$}", "", width = INDENT), $arg)
    };
}
47
/// Renders every element of `$list` with `format_indent!` and joins the results
/// with `LINE_SEP`, producing one indented entry per line.
macro_rules! format_list_indent {
    ($list: expr) => {
        $list
            .iter()
            .map(|item| format_indent!(item))
            .join(LINE_SEP)
    };
}
53
/// Renders every element of `$list` through its `Display` impl and joins the
/// results inline with `COMMA_SEP`.
macro_rules! format_list_comma {
    ($list: expr) => {
        $list
            .iter()
            .map(|item| item.to_string())
            .join(COMMA_SEP)
    };
}
59
60#[cfg(feature = "enterprise")]
61pub mod trigger;
62
63fn format_table_constraint(constraints: &[TableConstraint]) -> String {
64    constraints.iter().map(|c| format_indent!(c)).join(LINE_SEP)
65}
66
67/// Table constraint for create table statement.
68#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
69pub enum TableConstraint {
70    /// Primary key constraint.
71    PrimaryKey { columns: Vec<Ident> },
72    /// Time index constraint.
73    TimeIndex { column: Ident },
74}
75
76impl Display for TableConstraint {
77    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
78        match self {
79            TableConstraint::PrimaryKey { columns } => {
80                write!(f, "PRIMARY KEY ({})", format_list_comma!(columns))
81            }
82            TableConstraint::TimeIndex { column } => {
83                write!(f, "TIME INDEX ({})", column)
84            }
85        }
86    }
87}
88
89#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
90pub struct CreateTable {
91    /// Create if not exists
92    pub if_not_exists: bool,
93    pub table_id: u32,
94    /// Table name
95    pub name: ObjectName,
96    pub columns: Vec<Column>,
97    pub engine: String,
98    pub constraints: Vec<TableConstraint>,
99    /// Table options in `WITH`. All keys are lowercase.
100    pub options: OptionMap,
101    pub partitions: Option<Partitions>,
102}
103
104/// Column definition in `CREATE TABLE` statement.
105#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
106pub struct Column {
107    /// `ColumnDef` from `sqlparser::ast`
108    pub column_def: ColumnDef,
109    /// Column extensions for greptimedb dialect.
110    pub extensions: ColumnExtensions,
111}
112
113/// Column extensions for greptimedb dialect.
114#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Default, Serialize)]
115pub struct ColumnExtensions {
116    /// Vector type options.
117    pub vector_options: Option<OptionMap>,
118
119    /// Fulltext index options.
120    pub fulltext_index_options: Option<OptionMap>,
121    /// Skipping index options.
122    pub skipping_index_options: Option<OptionMap>,
123    /// Inverted index options.
124    ///
125    /// Inverted index doesn't have options at present. There won't be any options in that map.
126    pub inverted_index_options: Option<OptionMap>,
127}
128
129impl Column {
130    pub fn name(&self) -> &Ident {
131        &self.column_def.name
132    }
133
134    pub fn data_type(&self) -> &DataType {
135        &self.column_def.data_type
136    }
137
138    pub fn mut_data_type(&mut self) -> &mut DataType {
139        &mut self.column_def.data_type
140    }
141
142    pub fn options(&self) -> &[ColumnOptionDef] {
143        &self.column_def.options
144    }
145
146    pub fn mut_options(&mut self) -> &mut Vec<ColumnOptionDef> {
147        &mut self.column_def.options
148    }
149}
150
151impl Display for Column {
152    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153        if let Some(vector_options) = &self.extensions.vector_options
154            && let Some(dim) = vector_options.get(VECTOR_OPT_DIM)
155        {
156            write!(f, "{} VECTOR({})", self.column_def.name, dim)?;
157            return Ok(());
158        }
159
160        write!(f, "{}", self.column_def)?;
161
162        if let Some(fulltext_options) = &self.extensions.fulltext_index_options {
163            if !fulltext_options.is_empty() {
164                let options = fulltext_options.kv_pairs();
165                write!(f, " FULLTEXT INDEX WITH({})", format_list_comma!(options))?;
166            } else {
167                write!(f, " FULLTEXT INDEX")?;
168            }
169        }
170
171        if let Some(skipping_index_options) = &self.extensions.skipping_index_options {
172            if !skipping_index_options.is_empty() {
173                let options = skipping_index_options.kv_pairs();
174                write!(f, " SKIPPING INDEX WITH({})", format_list_comma!(options))?;
175            } else {
176                write!(f, " SKIPPING INDEX")?;
177            }
178        }
179
180        if let Some(inverted_index_options) = &self.extensions.inverted_index_options {
181            if !inverted_index_options.is_empty() {
182                let options = inverted_index_options.kv_pairs();
183                write!(f, " INVERTED INDEX WITH({})", format_list_comma!(options))?;
184            } else {
185                write!(f, " INVERTED INDEX")?;
186            }
187        }
188        Ok(())
189    }
190}
191
192impl ColumnExtensions {
193    pub fn build_fulltext_options(&self) -> Result<Option<FulltextOptions>> {
194        let Some(options) = self.fulltext_index_options.as_ref() else {
195            return Ok(None);
196        };
197
198        let options: HashMap<String, String> = options.clone().into_map();
199        Ok(Some(options.try_into().context(SetFulltextOptionSnafu)?))
200    }
201
202    pub fn build_skipping_index_options(&self) -> Result<Option<SkippingIndexOptions>> {
203        let Some(options) = self.skipping_index_options.as_ref() else {
204            return Ok(None);
205        };
206
207        let options: HashMap<String, String> = options.clone().into_map();
208        Ok(Some(
209            options.try_into().context(SetSkippingIndexOptionSnafu)?,
210        ))
211    }
212}
213
214/// Partition on columns or values.
215///
216/// - `column_list` is the list of columns in `PARTITION ON COLUMNS` clause.
217/// - `exprs` is the list of expressions in `PARTITION ON VALUES` clause, like
218///   `host <= 'host1'`, `host > 'host1' and host <= 'host2'` or `host > 'host2'`.
219///   Each expression stands for a partition.
220#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
221pub struct Partitions {
222    pub column_list: Vec<Ident>,
223    pub exprs: Vec<Expr>,
224}
225
226impl Partitions {
227    /// set quotes to all [Ident]s from column list
228    pub fn set_quote(&mut self, quote_style: char) {
229        self.column_list
230            .iter_mut()
231            .for_each(|c| c.quote_style = Some(quote_style));
232    }
233}
234
235#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)]
236pub struct PartitionEntry {
237    pub name: Ident,
238    pub value_list: Vec<SqlValue>,
239}
240
241impl Display for PartitionEntry {
242    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
243        write!(
244            f,
245            "PARTITION {} VALUES LESS THAN ({})",
246            self.name,
247            format_list_comma!(self.value_list),
248        )
249    }
250}
251
252impl Display for Partitions {
253    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
254        if !self.column_list.is_empty() {
255            write!(
256                f,
257                "PARTITION ON COLUMNS ({}) (\n{}\n)",
258                format_list_comma!(self.column_list),
259                format_list_indent!(self.exprs),
260            )?;
261        }
262        Ok(())
263    }
264}
265
266impl Display for CreateTable {
267    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
268        write!(f, "CREATE ")?;
269        if self.engine == FILE_ENGINE {
270            write!(f, "EXTERNAL ")?;
271        }
272        write!(f, "TABLE ")?;
273        if self.if_not_exists {
274            write!(f, "IF NOT EXISTS ")?;
275        }
276        writeln!(f, "{} (", &self.name)?;
277        writeln!(f, "{},", format_list_indent!(self.columns))?;
278        writeln!(f, "{}", format_table_constraint(&self.constraints))?;
279        writeln!(f, ")")?;
280        if let Some(partitions) = &self.partitions {
281            writeln!(f, "{partitions}")?;
282        }
283        writeln!(f, "ENGINE={}", &self.engine)?;
284        if !self.options.is_empty() {
285            let options = self.options.kv_pairs();
286            write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
287        }
288        Ok(())
289    }
290}
291
292#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
293pub struct CreateDatabase {
294    pub name: ObjectName,
295    /// Create if not exists
296    pub if_not_exists: bool,
297    pub options: OptionMap,
298}
299
300impl CreateDatabase {
301    /// Creates a statement for `CREATE DATABASE`
302    pub fn new(name: ObjectName, if_not_exists: bool, options: OptionMap) -> Self {
303        Self {
304            name,
305            if_not_exists,
306            options,
307        }
308    }
309}
310
311impl Display for CreateDatabase {
312    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
313        write!(f, "CREATE DATABASE ")?;
314        if self.if_not_exists {
315            write!(f, "IF NOT EXISTS ")?;
316        }
317        write!(f, "{}", &self.name)?;
318        if !self.options.is_empty() {
319            let options = self.options.kv_pairs();
320            write!(f, "\nWITH(\n{}\n)", format_list_indent!(options))?;
321        }
322        Ok(())
323    }
324}
325
326#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
327pub struct CreateExternalTable {
328    /// Table name
329    pub name: ObjectName,
330    pub columns: Vec<Column>,
331    pub constraints: Vec<TableConstraint>,
332    /// Table options in `WITH`. All keys are lowercase.
333    pub options: OptionMap,
334    pub if_not_exists: bool,
335    pub engine: String,
336}
337
338impl Display for CreateExternalTable {
339    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
340        write!(f, "CREATE EXTERNAL TABLE ")?;
341        if self.if_not_exists {
342            write!(f, "IF NOT EXISTS ")?;
343        }
344        writeln!(f, "{} (", &self.name)?;
345        writeln!(f, "{},", format_list_indent!(self.columns))?;
346        writeln!(f, "{}", format_table_constraint(&self.constraints))?;
347        writeln!(f, ")")?;
348        writeln!(f, "ENGINE={}", &self.engine)?;
349        if !self.options.is_empty() {
350            let options = self.options.kv_pairs();
351            write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
352        }
353        Ok(())
354    }
355}
356
357#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
358pub struct CreateTableLike {
359    /// Table name
360    pub table_name: ObjectName,
361    /// The table that is designated to be imitated by `Like`
362    pub source_name: ObjectName,
363}
364
365impl Display for CreateTableLike {
366    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
367        let table_name = &self.table_name;
368        let source_name = &self.source_name;
369        write!(f, r#"CREATE TABLE {table_name} LIKE {source_name}"#)
370    }
371}
372
373#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
374pub struct CreateFlow {
375    /// Flow name
376    pub flow_name: ObjectName,
377    /// Output (sink) table name
378    pub sink_table_name: ObjectName,
379    /// Whether to replace existing task
380    pub or_replace: bool,
381    /// Create if not exist
382    pub if_not_exists: bool,
383    /// `EXPIRE AFTER`
384    /// Duration in second as `i64`
385    pub expire_after: Option<i64>,
386    /// Duration for flow evaluation interval
387    /// Duration in seconds as `i64`
388    /// If not set, flow will be evaluated based on time window size and other args.
389    pub eval_interval: Option<i64>,
390    /// Comment string
391    pub comment: Option<String>,
392    /// SQL statement
393    pub query: Box<SqlOrTql>,
394}
395
396/// Either a sql query or a tql query
397#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
398pub enum SqlOrTql {
399    Sql(Query, String),
400    Tql(Tql, String),
401}
402
403impl std::fmt::Display for SqlOrTql {
404    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
405        match self {
406            Self::Sql(_, s) => write!(f, "{}", s),
407            Self::Tql(_, s) => write!(f, "{}", s),
408        }
409    }
410}
411
412impl SqlOrTql {
413    pub fn try_from_statement(
414        value: Statement,
415        original_query: &str,
416    ) -> std::result::Result<Self, crate::error::Error> {
417        match value {
418            Statement::Query(query) => {
419                Ok(Self::Sql((*query).try_into()?, original_query.to_string()))
420            }
421            Statement::Tql(tql) => Ok(Self::Tql(tql, original_query.to_string())),
422            _ => InvalidFlowQuerySnafu {
423                reason: format!("Expect either sql query or promql query, found {:?}", value),
424            }
425            .fail(),
426        }
427    }
428}
429
430impl Display for CreateFlow {
431    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
432        write!(f, "CREATE ")?;
433        if self.or_replace {
434            write!(f, "OR REPLACE ")?;
435        }
436        write!(f, "FLOW ")?;
437        if self.if_not_exists {
438            write!(f, "IF NOT EXISTS ")?;
439        }
440        writeln!(f, "{}", &self.flow_name)?;
441        writeln!(f, "SINK TO {}", &self.sink_table_name)?;
442        if let Some(expire_after) = &self.expire_after {
443            writeln!(f, "EXPIRE AFTER '{} s'", expire_after)?;
444        }
445        if let Some(eval_interval) = &self.eval_interval {
446            writeln!(f, "EVAL INTERVAL '{} s'", eval_interval)?;
447        }
448        if let Some(comment) = &self.comment {
449            writeln!(f, "COMMENT '{}'", comment)?;
450        }
451        write!(f, "AS {}", &self.query)
452    }
453}
454
455/// Create SQL view statement.
456#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
457pub struct CreateView {
458    /// View name
459    pub name: ObjectName,
460    /// An optional list of names to be used for columns of the view
461    pub columns: Vec<Ident>,
462    /// The clause after `As` that defines the VIEW.
463    /// Can only be either [Statement::Query] or [Statement::Tql].
464    pub query: Box<Statement>,
465    /// Whether to replace existing VIEW
466    pub or_replace: bool,
467    /// Create VIEW only when it doesn't exists
468    pub if_not_exists: bool,
469}
470
471impl Display for CreateView {
472    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
473        write!(f, "CREATE ")?;
474        if self.or_replace {
475            write!(f, "OR REPLACE ")?;
476        }
477        write!(f, "VIEW ")?;
478        if self.if_not_exists {
479            write!(f, "IF NOT EXISTS ")?;
480        }
481        write!(f, "{} ", &self.name)?;
482        if !self.columns.is_empty() {
483            write!(f, "({}) ", format_list_comma!(self.columns))?;
484        }
485        write!(f, "AS {}", &self.query)
486    }
487}
488
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use crate::dialect::GreptimeDbDialect;
    use crate::error::Error;
    use crate::parser::{ParseOptions, ParserContext};
    use crate::statements::statement::Statement;

    /// Parses `sql` with the GreptimeDB dialect and default parse options.
    fn try_parse(sql: &str) -> Result<Vec<Statement>, Error> {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
    }

    /// Like [`try_parse`], but panics when `sql` is rejected.
    fn parse(sql: &str) -> Vec<Statement> {
        try_parse(sql).unwrap()
    }

    #[test]
    fn test_display_create_table() {
        let sql = r"create table if not exists demo(
                             host string,
                             ts timestamp,
                             cpu double default 0,
                             memory double,
                             TIME INDEX (ts),
                             PRIMARY KEY(host)
                       )
                       PARTITION ON COLUMNS (host) (
                            host = 'a',
                            host > 'a',
                       )
                       engine=mito
                       with(ttl='7d', storage='File');
         ";
        let result = parse(sql);
        assert_eq!(1, result.len());

        let Statement::CreateTable(c) = &result[0] else {
            unreachable!()
        };
        let new_sql = format!("\n{}", c);
        assert_eq!(
            r#"
CREATE TABLE IF NOT EXISTS demo (
  host STRING,
  ts TIMESTAMP,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (host)
)
PARTITION ON COLUMNS (host) (
  host = 'a',
  host > 'a'
)
ENGINE=mito
WITH(
  storage = 'File',
  ttl = '7d'
)"#,
            &new_sql
        );

        // The rendered SQL must parse back into the very same statement.
        let new_result = parse(&new_sql);
        assert_eq!(result, new_result);
    }

    #[test]
    fn test_display_empty_partition_column() {
        let sql = r"create table if not exists demo(
            host string,
            ts timestamp,
            cpu double default 0,
            memory double,
            TIME INDEX (ts),
            PRIMARY KEY(ts, host)
            );
        ";
        let result = parse(sql);
        assert_eq!(1, result.len());

        let Statement::CreateTable(c) = &result[0] else {
            unreachable!()
        };
        let new_sql = format!("\n{}", c);
        assert_eq!(
            r#"
CREATE TABLE IF NOT EXISTS demo (
  host STRING,
  ts TIMESTAMP,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (ts, host)
)
ENGINE=mito
"#,
            &new_sql
        );

        // The rendered SQL must parse back into the very same statement.
        let new_result = parse(&new_sql);
        assert_eq!(result, new_result);
    }

    #[test]
    fn test_validate_table_options() {
        let sql = r"create table if not exists demo(
            host string,
            ts timestamp,
            cpu double default 0,
            memory double,
            TIME INDEX (ts),
            PRIMARY KEY(host)
      )
      PARTITION ON COLUMNS (host) ()
      engine=mito
      with(ttl='7d', 'compaction.type'='world');
";
        let result = parse(sql);
        let Statement::CreateTable(c) = &result[0] else {
            unreachable!()
        };
        assert_eq!(2, c.options.len());

        let sql = r"create table if not exists demo(
            host string,
            ts timestamp,
            cpu double default 0,
            memory double,
            TIME INDEX (ts),
            PRIMARY KEY(host)
      )
      PARTITION ON COLUMNS (host) ()
      engine=mito
      with(ttl='7d', hello='world');
";
        let result = try_parse(sql);
        assert_matches!(result, Err(Error::InvalidTableOption { .. }))
    }

    #[test]
    fn test_display_create_database() {
        // Each case pairs the input SQL with the expected rendering, prefixed by
        // a newline.
        let cases = [
            (
                r"create database test;",
                r#"
CREATE DATABASE test"#,
            ),
            (
                r"create database if not exists test;",
                r#"
CREATE DATABASE IF NOT EXISTS test"#,
            ),
            (
                r#"CREATE DATABASE IF NOT EXISTS test WITH (ttl='1h');"#,
                r#"
CREATE DATABASE IF NOT EXISTS test
WITH(
  ttl = '1h'
)"#,
            ),
        ];

        for (sql, expected) in cases {
            let stmts = parse(sql);
            assert_eq!(1, stmts.len());
            assert_matches!(&stmts[0], Statement::CreateDatabase { .. });

            let Statement::CreateDatabase(set) = &stmts[0] else {
                unreachable!();
            };
            let new_sql = format!("\n{}", set);
            assert_eq!(expected, &new_sql);
        }
    }

    #[test]
    fn test_display_create_table_like() {
        let sql = r"create table t2 like t1;";
        let stmts = parse(sql);
        assert_eq!(1, stmts.len());
        assert_matches!(&stmts[0], Statement::CreateTableLike { .. });

        let Statement::CreateTableLike(create) = &stmts[0] else {
            unreachable!();
        };
        let new_sql = format!("\n{}", create);
        assert_eq!(
            r#"
CREATE TABLE t2 LIKE t1"#,
            &new_sql
        );
    }

    #[test]
    fn test_display_create_external_table() {
        let sql = r#"CREATE EXTERNAL TABLE city (
            host string,
            ts timestamp,
            cpu float64 default 0,
            memory float64,
            TIME INDEX (ts),
            PRIMARY KEY(host)
) WITH (location='/var/data/city.csv', format='csv');"#;
        let stmts = parse(sql);
        assert_eq!(1, stmts.len());
        assert_matches!(&stmts[0], Statement::CreateExternalTable { .. });

        let Statement::CreateExternalTable(create) = &stmts[0] else {
            unreachable!();
        };
        let new_sql = format!("\n{}", create);
        assert_eq!(
            r#"
CREATE EXTERNAL TABLE city (
  host STRING,
  ts TIMESTAMP,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (host)
)
ENGINE=file
WITH(
  format = 'csv',
  location = '/var/data/city.csv'
)"#,
            &new_sql
        );
    }

    #[test]
    fn test_display_create_flow() {
        let sql = r"CREATE FLOW filter_numbers
            SINK TO out_num_cnt
            AS SELECT number FROM numbers_input where number > 10;";
        let result = parse(sql);
        assert_eq!(1, result.len());

        let Statement::CreateFlow(c) = &result[0] else {
            unreachable!()
        };
        let new_sql = format!("\n{}", c);
        assert_eq!(
            r#"
CREATE FLOW filter_numbers
SINK TO out_num_cnt
AS SELECT number FROM numbers_input where number > 10"#,
            &new_sql
        );

        // The rendered SQL must parse back into the very same statement.
        let new_result = parse(&new_sql);
        assert_eq!(result, new_result);
    }
}