sql/statements/
create.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17
18use common_catalog::consts::FILE_ENGINE;
19use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
20use itertools::Itertools;
21use serde::Serialize;
22use snafu::ResultExt;
23use sqlparser::ast::{ColumnOptionDef, DataType, Expr, Query};
24use sqlparser_derive::{Visit, VisitMut};
25
26use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue};
27use crate::error::{
28    InvalidFlowQuerySnafu, Result, SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu,
29};
30use crate::statements::statement::Statement;
31use crate::statements::tql::Tql;
32use crate::statements::OptionMap;
33
34const LINE_SEP: &str = ",\n";
35const COMMA_SEP: &str = ", ";
36const INDENT: usize = 2;
37pub const VECTOR_OPT_DIM: &str = "dim";
38
39macro_rules! format_indent {
40    ($fmt: expr, $arg: expr) => {
41        format!($fmt, format_args!("{: >1$}", "", INDENT), $arg)
42    };
43    ($arg: expr) => {
44        format_indent!("{}{}", $arg)
45    };
46}
47
48macro_rules! format_list_indent {
49    ($list: expr) => {
50        $list.iter().map(|e| format_indent!(e)).join(LINE_SEP)
51    };
52}
53
54macro_rules! format_list_comma {
55    ($list: expr) => {
56        $list.iter().map(|e| format!("{}", e)).join(COMMA_SEP)
57    };
58}
59
60#[cfg(feature = "enterprise")]
61pub mod trigger;
62
63fn format_table_constraint(constraints: &[TableConstraint]) -> String {
64    constraints.iter().map(|c| format_indent!(c)).join(LINE_SEP)
65}
66
67/// Table constraint for create table statement.
68#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
69pub enum TableConstraint {
70    /// Primary key constraint.
71    PrimaryKey { columns: Vec<Ident> },
72    /// Time index constraint.
73    TimeIndex { column: Ident },
74}
75
76impl Display for TableConstraint {
77    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
78        match self {
79            TableConstraint::PrimaryKey { columns } => {
80                write!(f, "PRIMARY KEY ({})", format_list_comma!(columns))
81            }
82            TableConstraint::TimeIndex { column } => {
83                write!(f, "TIME INDEX ({})", column)
84            }
85        }
86    }
87}
88
89#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
90pub struct CreateTable {
91    /// Create if not exists
92    pub if_not_exists: bool,
93    pub table_id: u32,
94    /// Table name
95    pub name: ObjectName,
96    pub columns: Vec<Column>,
97    pub engine: String,
98    pub constraints: Vec<TableConstraint>,
99    /// Table options in `WITH`. All keys are lowercase.
100    pub options: OptionMap,
101    pub partitions: Option<Partitions>,
102}
103
104/// Column definition in `CREATE TABLE` statement.
105#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
106pub struct Column {
107    /// `ColumnDef` from `sqlparser::ast`
108    pub column_def: ColumnDef,
109    /// Column extensions for greptimedb dialect.
110    pub extensions: ColumnExtensions,
111}
112
113/// Column extensions for greptimedb dialect.
114#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Default, Serialize)]
115pub struct ColumnExtensions {
116    /// Vector type options.
117    pub vector_options: Option<OptionMap>,
118
119    /// Fulltext index options.
120    pub fulltext_index_options: Option<OptionMap>,
121    /// Skipping index options.
122    pub skipping_index_options: Option<OptionMap>,
123    /// Inverted index options.
124    ///
125    /// Inverted index doesn't have options at present. There won't be any options in that map.
126    pub inverted_index_options: Option<OptionMap>,
127}
128
129impl Column {
130    pub fn name(&self) -> &Ident {
131        &self.column_def.name
132    }
133
134    pub fn data_type(&self) -> &DataType {
135        &self.column_def.data_type
136    }
137
138    pub fn mut_data_type(&mut self) -> &mut DataType {
139        &mut self.column_def.data_type
140    }
141
142    pub fn options(&self) -> &[ColumnOptionDef] {
143        &self.column_def.options
144    }
145
146    pub fn mut_options(&mut self) -> &mut Vec<ColumnOptionDef> {
147        &mut self.column_def.options
148    }
149}
150
151impl Display for Column {
152    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153        if let Some(vector_options) = &self.extensions.vector_options {
154            if let Some(dim) = vector_options.get(VECTOR_OPT_DIM) {
155                write!(f, "{} VECTOR({})", self.column_def.name, dim)?;
156                return Ok(());
157            }
158        }
159
160        write!(f, "{}", self.column_def)?;
161
162        if let Some(fulltext_options) = &self.extensions.fulltext_index_options {
163            if !fulltext_options.is_empty() {
164                let options = fulltext_options.kv_pairs();
165                write!(f, " FULLTEXT INDEX WITH({})", format_list_comma!(options))?;
166            } else {
167                write!(f, " FULLTEXT INDEX")?;
168            }
169        }
170
171        if let Some(skipping_index_options) = &self.extensions.skipping_index_options {
172            if !skipping_index_options.is_empty() {
173                let options = skipping_index_options.kv_pairs();
174                write!(f, " SKIPPING INDEX WITH({})", format_list_comma!(options))?;
175            } else {
176                write!(f, " SKIPPING INDEX")?;
177            }
178        }
179
180        if let Some(inverted_index_options) = &self.extensions.inverted_index_options {
181            if !inverted_index_options.is_empty() {
182                let options = inverted_index_options.kv_pairs();
183                write!(f, " INVERTED INDEX WITH({})", format_list_comma!(options))?;
184            } else {
185                write!(f, " INVERTED INDEX")?;
186            }
187        }
188        Ok(())
189    }
190}
191
192impl ColumnExtensions {
193    pub fn build_fulltext_options(&self) -> Result<Option<FulltextOptions>> {
194        let Some(options) = self.fulltext_index_options.as_ref() else {
195            return Ok(None);
196        };
197
198        let options: HashMap<String, String> = options.clone().into_map();
199        Ok(Some(options.try_into().context(SetFulltextOptionSnafu)?))
200    }
201
202    pub fn build_skipping_index_options(&self) -> Result<Option<SkippingIndexOptions>> {
203        let Some(options) = self.skipping_index_options.as_ref() else {
204            return Ok(None);
205        };
206
207        let options: HashMap<String, String> = options.clone().into_map();
208        Ok(Some(
209            options.try_into().context(SetSkippingIndexOptionSnafu)?,
210        ))
211    }
212}
213
214/// Partition on columns or values.
215///
216/// - `column_list` is the list of columns in `PARTITION ON COLUMNS` clause.
217/// - `exprs` is the list of expressions in `PARTITION ON VALUES` clause, like
218///   `host <= 'host1'`, `host > 'host1' and host <= 'host2'` or `host > 'host2'`.
219///   Each expression stands for a partition.
220#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
221pub struct Partitions {
222    pub column_list: Vec<Ident>,
223    pub exprs: Vec<Expr>,
224}
225
226impl Partitions {
227    /// set quotes to all [Ident]s from column list
228    pub fn set_quote(&mut self, quote_style: char) {
229        self.column_list
230            .iter_mut()
231            .for_each(|c| c.quote_style = Some(quote_style));
232    }
233}
234
235#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)]
236pub struct PartitionEntry {
237    pub name: Ident,
238    pub value_list: Vec<SqlValue>,
239}
240
241impl Display for PartitionEntry {
242    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
243        write!(
244            f,
245            "PARTITION {} VALUES LESS THAN ({})",
246            self.name,
247            format_list_comma!(self.value_list),
248        )
249    }
250}
251
252impl Display for Partitions {
253    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
254        if !self.column_list.is_empty() {
255            write!(
256                f,
257                "PARTITION ON COLUMNS ({}) (\n{}\n)",
258                format_list_comma!(self.column_list),
259                format_list_indent!(self.exprs),
260            )?;
261        }
262        Ok(())
263    }
264}
265
266impl Display for CreateTable {
267    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
268        write!(f, "CREATE ")?;
269        if self.engine == FILE_ENGINE {
270            write!(f, "EXTERNAL ")?;
271        }
272        write!(f, "TABLE ")?;
273        if self.if_not_exists {
274            write!(f, "IF NOT EXISTS ")?;
275        }
276        writeln!(f, "{} (", &self.name)?;
277        writeln!(f, "{},", format_list_indent!(self.columns))?;
278        writeln!(f, "{}", format_table_constraint(&self.constraints))?;
279        writeln!(f, ")")?;
280        if let Some(partitions) = &self.partitions {
281            writeln!(f, "{partitions}")?;
282        }
283        writeln!(f, "ENGINE={}", &self.engine)?;
284        if !self.options.is_empty() {
285            let options = self.options.kv_pairs();
286            write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
287        }
288        Ok(())
289    }
290}
291
292#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
293pub struct CreateDatabase {
294    pub name: ObjectName,
295    /// Create if not exists
296    pub if_not_exists: bool,
297    pub options: OptionMap,
298}
299
300impl CreateDatabase {
301    /// Creates a statement for `CREATE DATABASE`
302    pub fn new(name: ObjectName, if_not_exists: bool, options: OptionMap) -> Self {
303        Self {
304            name,
305            if_not_exists,
306            options,
307        }
308    }
309}
310
311impl Display for CreateDatabase {
312    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
313        write!(f, "CREATE DATABASE ")?;
314        if self.if_not_exists {
315            write!(f, "IF NOT EXISTS ")?;
316        }
317        write!(f, "{}", &self.name)?;
318        if !self.options.is_empty() {
319            let options = self.options.kv_pairs();
320            write!(f, "\nWITH(\n{}\n)", format_list_indent!(options))?;
321        }
322        Ok(())
323    }
324}
325
326#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
327pub struct CreateExternalTable {
328    /// Table name
329    pub name: ObjectName,
330    pub columns: Vec<Column>,
331    pub constraints: Vec<TableConstraint>,
332    /// Table options in `WITH`. All keys are lowercase.
333    pub options: OptionMap,
334    pub if_not_exists: bool,
335    pub engine: String,
336}
337
338impl Display for CreateExternalTable {
339    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
340        write!(f, "CREATE EXTERNAL TABLE ")?;
341        if self.if_not_exists {
342            write!(f, "IF NOT EXISTS ")?;
343        }
344        writeln!(f, "{} (", &self.name)?;
345        writeln!(f, "{},", format_list_indent!(self.columns))?;
346        writeln!(f, "{}", format_table_constraint(&self.constraints))?;
347        writeln!(f, ")")?;
348        writeln!(f, "ENGINE={}", &self.engine)?;
349        if !self.options.is_empty() {
350            let options = self.options.kv_pairs();
351            write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
352        }
353        Ok(())
354    }
355}
356
357#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
358pub struct CreateTableLike {
359    /// Table name
360    pub table_name: ObjectName,
361    /// The table that is designated to be imitated by `Like`
362    pub source_name: ObjectName,
363}
364
365impl Display for CreateTableLike {
366    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
367        let table_name = &self.table_name;
368        let source_name = &self.source_name;
369        write!(f, r#"CREATE TABLE {table_name} LIKE {source_name}"#)
370    }
371}
372
373#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
374pub struct CreateFlow {
375    /// Flow name
376    pub flow_name: ObjectName,
377    /// Output (sink) table name
378    pub sink_table_name: ObjectName,
379    /// Whether to replace existing task
380    pub or_replace: bool,
381    /// Create if not exist
382    pub if_not_exists: bool,
383    /// `EXPIRE AFTER`
384    /// Duration in second as `i64`
385    pub expire_after: Option<i64>,
386    /// Comment string
387    pub comment: Option<String>,
388    /// SQL statement
389    pub query: Box<SqlOrTql>,
390}
391
392/// Either a sql query or a tql query
393#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
394pub enum SqlOrTql {
395    Sql(Query, String),
396    Tql(Tql, String),
397}
398
399impl std::fmt::Display for SqlOrTql {
400    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
401        match self {
402            Self::Sql(_, s) => write!(f, "{}", s),
403            Self::Tql(_, s) => write!(f, "{}", s),
404        }
405    }
406}
407
408impl SqlOrTql {
409    pub fn try_from_statement(
410        value: Statement,
411        original_query: &str,
412    ) -> std::result::Result<Self, crate::error::Error> {
413        match value {
414            Statement::Query(query) => {
415                Ok(Self::Sql((*query).try_into()?, original_query.to_string()))
416            }
417            Statement::Tql(tql) => Ok(Self::Tql(tql, original_query.to_string())),
418            _ => InvalidFlowQuerySnafu {
419                reason: format!("Expect either sql query or promql query, found {:?}", value),
420            }
421            .fail(),
422        }
423    }
424}
425
426impl Display for CreateFlow {
427    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
428        write!(f, "CREATE ")?;
429        if self.or_replace {
430            write!(f, "OR REPLACE ")?;
431        }
432        write!(f, "FLOW ")?;
433        if self.if_not_exists {
434            write!(f, "IF NOT EXISTS ")?;
435        }
436        writeln!(f, "{}", &self.flow_name)?;
437        writeln!(f, "SINK TO {}", &self.sink_table_name)?;
438        if let Some(expire_after) = &self.expire_after {
439            writeln!(f, "EXPIRE AFTER {} ", expire_after)?;
440        }
441        if let Some(comment) = &self.comment {
442            writeln!(f, "COMMENT '{}'", comment)?;
443        }
444        write!(f, "AS {}", &self.query)
445    }
446}
447
448/// Create SQL view statement.
449#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
450pub struct CreateView {
451    /// View name
452    pub name: ObjectName,
453    /// An optional list of names to be used for columns of the view
454    pub columns: Vec<Ident>,
455    /// The clause after `As` that defines the VIEW.
456    /// Can only be either [Statement::Query] or [Statement::Tql].
457    pub query: Box<Statement>,
458    /// Whether to replace existing VIEW
459    pub or_replace: bool,
460    /// Create VIEW only when it doesn't exists
461    pub if_not_exists: bool,
462}
463
464impl Display for CreateView {
465    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
466        write!(f, "CREATE ")?;
467        if self.or_replace {
468            write!(f, "OR REPLACE ")?;
469        }
470        write!(f, "VIEW ")?;
471        if self.if_not_exists {
472            write!(f, "IF NOT EXISTS ")?;
473        }
474        write!(f, "{} ", &self.name)?;
475        if !self.columns.is_empty() {
476            write!(f, "({}) ", format_list_comma!(self.columns))?;
477        }
478        write!(f, "AS {}", &self.query)
479    }
480}
481
482#[cfg(test)]
483mod tests {
484    use std::assert_matches::assert_matches;
485
486    use crate::dialect::GreptimeDbDialect;
487    use crate::error::Error;
488    use crate::parser::{ParseOptions, ParserContext};
489    use crate::statements::statement::Statement;
490
491    #[test]
492    fn test_display_create_table() {
493        let sql = r"create table if not exists demo(
494                             host string,
495                             ts timestamp,
496                             cpu double default 0,
497                             memory double,
498                             TIME INDEX (ts),
499                             PRIMARY KEY(host)
500                       )
501                       PARTITION ON COLUMNS (host) (
502                            host = 'a',
503                            host > 'a',
504                       )
505                       engine=mito
506                       with(ttl='7d', storage='File');
507         ";
508        let result =
509            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
510                .unwrap();
511        assert_eq!(1, result.len());
512
513        match &result[0] {
514            Statement::CreateTable(c) => {
515                let new_sql = format!("\n{}", c);
516                assert_eq!(
517                    r#"
518CREATE TABLE IF NOT EXISTS demo (
519  host STRING,
520  ts TIMESTAMP,
521  cpu DOUBLE DEFAULT 0,
522  memory DOUBLE,
523  TIME INDEX (ts),
524  PRIMARY KEY (host)
525)
526PARTITION ON COLUMNS (host) (
527  host = 'a',
528  host > 'a'
529)
530ENGINE=mito
531WITH(
532  storage = 'File',
533  ttl = '7d'
534)"#,
535                    &new_sql
536                );
537
538                let new_result = ParserContext::create_with_dialect(
539                    &new_sql,
540                    &GreptimeDbDialect {},
541                    ParseOptions::default(),
542                )
543                .unwrap();
544                assert_eq!(result, new_result);
545            }
546            _ => unreachable!(),
547        }
548    }
549
550    #[test]
551    fn test_display_empty_partition_column() {
552        let sql = r"create table if not exists demo(
553            host string,
554            ts timestamp,
555            cpu double default 0,
556            memory double,
557            TIME INDEX (ts),
558            PRIMARY KEY(ts, host)
559            );
560        ";
561        let result =
562            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
563                .unwrap();
564        assert_eq!(1, result.len());
565
566        match &result[0] {
567            Statement::CreateTable(c) => {
568                let new_sql = format!("\n{}", c);
569                assert_eq!(
570                    r#"
571CREATE TABLE IF NOT EXISTS demo (
572  host STRING,
573  ts TIMESTAMP,
574  cpu DOUBLE DEFAULT 0,
575  memory DOUBLE,
576  TIME INDEX (ts),
577  PRIMARY KEY (ts, host)
578)
579ENGINE=mito
580"#,
581                    &new_sql
582                );
583
584                let new_result = ParserContext::create_with_dialect(
585                    &new_sql,
586                    &GreptimeDbDialect {},
587                    ParseOptions::default(),
588                )
589                .unwrap();
590                assert_eq!(result, new_result);
591            }
592            _ => unreachable!(),
593        }
594    }
595
596    #[test]
597    fn test_validate_table_options() {
598        let sql = r"create table if not exists demo(
599            host string,
600            ts timestamp,
601            cpu double default 0,
602            memory double,
603            TIME INDEX (ts),
604            PRIMARY KEY(host)
605      )
606      PARTITION ON COLUMNS (host) ()
607      engine=mito
608      with(ttl='7d', 'compaction.type'='world');
609";
610        let result =
611            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
612                .unwrap();
613        match &result[0] {
614            Statement::CreateTable(c) => {
615                assert_eq!(2, c.options.len());
616            }
617            _ => unreachable!(),
618        }
619
620        let sql = r"create table if not exists demo(
621            host string,
622            ts timestamp,
623            cpu double default 0,
624            memory double,
625            TIME INDEX (ts),
626            PRIMARY KEY(host)
627      )
628      PARTITION ON COLUMNS (host) ()
629      engine=mito
630      with(ttl='7d', hello='world');
631";
632        let result =
633            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
634        assert_matches!(result, Err(Error::InvalidTableOption { .. }))
635    }
636
637    #[test]
638    fn test_display_create_database() {
639        let sql = r"create database test;";
640        let stmts =
641            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
642                .unwrap();
643        assert_eq!(1, stmts.len());
644        assert_matches!(&stmts[0], Statement::CreateDatabase { .. });
645
646        match &stmts[0] {
647            Statement::CreateDatabase(set) => {
648                let new_sql = format!("\n{}", set);
649                assert_eq!(
650                    r#"
651CREATE DATABASE test"#,
652                    &new_sql
653                );
654            }
655            _ => {
656                unreachable!();
657            }
658        }
659
660        let sql = r"create database if not exists test;";
661        let stmts =
662            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
663                .unwrap();
664        assert_eq!(1, stmts.len());
665        assert_matches!(&stmts[0], Statement::CreateDatabase { .. });
666
667        match &stmts[0] {
668            Statement::CreateDatabase(set) => {
669                let new_sql = format!("\n{}", set);
670                assert_eq!(
671                    r#"
672CREATE DATABASE IF NOT EXISTS test"#,
673                    &new_sql
674                );
675            }
676            _ => {
677                unreachable!();
678            }
679        }
680
681        let sql = r#"CREATE DATABASE IF NOT EXISTS test WITH (ttl='1h');"#;
682        let stmts =
683            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
684                .unwrap();
685        assert_eq!(1, stmts.len());
686        assert_matches!(&stmts[0], Statement::CreateDatabase { .. });
687
688        match &stmts[0] {
689            Statement::CreateDatabase(set) => {
690                let new_sql = format!("\n{}", set);
691                assert_eq!(
692                    r#"
693CREATE DATABASE IF NOT EXISTS test
694WITH(
695  ttl = '1h'
696)"#,
697                    &new_sql
698                );
699            }
700            _ => {
701                unreachable!();
702            }
703        }
704    }
705
706    #[test]
707    fn test_display_create_table_like() {
708        let sql = r"create table t2 like t1;";
709        let stmts =
710            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
711                .unwrap();
712        assert_eq!(1, stmts.len());
713        assert_matches!(&stmts[0], Statement::CreateTableLike { .. });
714
715        match &stmts[0] {
716            Statement::CreateTableLike(create) => {
717                let new_sql = format!("\n{}", create);
718                assert_eq!(
719                    r#"
720CREATE TABLE t2 LIKE t1"#,
721                    &new_sql
722                );
723            }
724            _ => {
725                unreachable!();
726            }
727        }
728    }
729
730    #[test]
731    fn test_display_create_external_table() {
732        let sql = r#"CREATE EXTERNAL TABLE city (
733            host string,
734            ts timestamp,
735            cpu float64 default 0,
736            memory float64,
737            TIME INDEX (ts),
738            PRIMARY KEY(host)
739) WITH (location='/var/data/city.csv', format='csv');"#;
740        let stmts =
741            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
742                .unwrap();
743        assert_eq!(1, stmts.len());
744        assert_matches!(&stmts[0], Statement::CreateExternalTable { .. });
745
746        match &stmts[0] {
747            Statement::CreateExternalTable(create) => {
748                let new_sql = format!("\n{}", create);
749                assert_eq!(
750                    r#"
751CREATE EXTERNAL TABLE city (
752  host STRING,
753  ts TIMESTAMP,
754  cpu DOUBLE DEFAULT 0,
755  memory DOUBLE,
756  TIME INDEX (ts),
757  PRIMARY KEY (host)
758)
759ENGINE=file
760WITH(
761  format = 'csv',
762  location = '/var/data/city.csv'
763)"#,
764                    &new_sql
765                );
766            }
767            _ => {
768                unreachable!();
769            }
770        }
771    }
772
773    #[test]
774    fn test_display_create_flow() {
775        let sql = r"CREATE FLOW filter_numbers
776            SINK TO out_num_cnt
777            AS SELECT number FROM numbers_input where number > 10;";
778        let result =
779            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
780                .unwrap();
781        assert_eq!(1, result.len());
782
783        match &result[0] {
784            Statement::CreateFlow(c) => {
785                let new_sql = format!("\n{}", c);
786                assert_eq!(
787                    r#"
788CREATE FLOW filter_numbers
789SINK TO out_num_cnt
790AS SELECT number FROM numbers_input where number > 10"#,
791                    &new_sql
792                );
793
794                let new_result = ParserContext::create_with_dialect(
795                    &new_sql,
796                    &GreptimeDbDialect {},
797                    ParseOptions::default(),
798                )
799                .unwrap();
800                assert_eq!(result, new_result);
801            }
802            _ => unreachable!(),
803        }
804    }
805}