sql/statements/
create.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::{Display, Formatter};
17use std::sync::Arc;
18
19use common_catalog::consts::FILE_ENGINE;
20use datatypes::data_type::ConcreteDataType;
21use datatypes::json::JsonStructureSettings;
22use datatypes::schema::{
23    FulltextOptions, SkippingIndexOptions, VectorDistanceMetric, VectorIndexEngineType,
24    VectorIndexOptions,
25};
26use datatypes::types::StructType;
27use itertools::Itertools;
28use serde::Serialize;
29use snafu::{OptionExt, ResultExt};
30use sqlparser::ast::{ColumnOptionDef, DataType, Expr, Query};
31use sqlparser_derive::{Visit, VisitMut};
32
33use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue};
34use crate::error::{
35    InvalidFlowQuerySnafu, InvalidJsonStructureSettingSnafu, InvalidSqlSnafu, Result,
36    SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu,
37};
38use crate::statements::statement::Statement;
39use crate::statements::tql::Tql;
40use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
41use crate::util::OptionValue;
42
43const LINE_SEP: &str = ",\n";
44const COMMA_SEP: &str = ", ";
45const INDENT: usize = 2;
46pub const VECTOR_OPT_DIM: &str = "dim";
47
48pub const JSON_OPT_UNSTRUCTURED_KEYS: &str = "unstructured_keys";
49pub const JSON_OPT_FORMAT: &str = "format";
50pub(crate) const JSON_OPT_FIELDS: &str = "fields";
51pub const JSON_FORMAT_FULL_STRUCTURED: &str = "structured";
52pub const JSON_FORMAT_RAW: &str = "raw";
53pub const JSON_FORMAT_PARTIAL: &str = "partial";
54
55macro_rules! format_indent {
56    ($fmt: expr, $arg: expr) => {
57        format!($fmt, format_args!("{: >1$}", "", INDENT), $arg)
58    };
59    ($arg: expr) => {
60        format_indent!("{}{}", $arg)
61    };
62}
63
64macro_rules! format_list_indent {
65    ($list: expr) => {
66        $list.iter().map(|e| format_indent!(e)).join(LINE_SEP)
67    };
68}
69
70macro_rules! format_list_comma {
71    ($list: expr) => {
72        $list.iter().map(|e| format!("{}", e)).join(COMMA_SEP)
73    };
74}
75
76#[cfg(feature = "enterprise")]
77pub mod trigger;
78
79fn format_table_constraint(constraints: &[TableConstraint]) -> String {
80    constraints.iter().map(|c| format_indent!(c)).join(LINE_SEP)
81}
82
83/// Table constraint for create table statement.
84#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
85pub enum TableConstraint {
86    /// Primary key constraint.
87    PrimaryKey { columns: Vec<Ident> },
88    /// Time index constraint.
89    TimeIndex { column: Ident },
90}
91
92impl Display for TableConstraint {
93    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
94        match self {
95            TableConstraint::PrimaryKey { columns } => {
96                write!(f, "PRIMARY KEY ({})", format_list_comma!(columns))
97            }
98            TableConstraint::TimeIndex { column } => {
99                write!(f, "TIME INDEX ({})", column)
100            }
101        }
102    }
103}
104
105#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
106pub struct CreateTable {
107    /// Create if not exists
108    pub if_not_exists: bool,
109    pub table_id: u32,
110    /// Table name
111    pub name: ObjectName,
112    pub columns: Vec<Column>,
113    pub engine: String,
114    pub constraints: Vec<TableConstraint>,
115    /// Table options in `WITH`. All keys are lowercase.
116    pub options: OptionMap,
117    pub partitions: Option<Partitions>,
118}
119
120/// Column definition in `CREATE TABLE` statement.
121#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
122pub struct Column {
123    /// `ColumnDef` from `sqlparser::ast`
124    pub column_def: ColumnDef,
125    /// Column extensions for greptimedb dialect.
126    pub extensions: ColumnExtensions,
127}
128
129/// Column extensions for greptimedb dialect.
130#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Default, Serialize)]
131pub struct ColumnExtensions {
132    /// Vector type options.
133    pub vector_options: Option<OptionMap>,
134
135    /// Fulltext index options.
136    pub fulltext_index_options: Option<OptionMap>,
137    /// Skipping index options.
138    pub skipping_index_options: Option<OptionMap>,
139    /// Inverted index options.
140    ///
141    /// Inverted index doesn't have options at present. There won't be any options in that map.
142    pub inverted_index_options: Option<OptionMap>,
143    /// Vector index options for HNSW-based vector similarity search.
144    pub vector_index_options: Option<OptionMap>,
145    pub json_datatype_options: Option<OptionMap>,
146}
147
148impl Column {
149    pub fn name(&self) -> &Ident {
150        &self.column_def.name
151    }
152
153    pub fn data_type(&self) -> &DataType {
154        &self.column_def.data_type
155    }
156
157    pub fn mut_data_type(&mut self) -> &mut DataType {
158        &mut self.column_def.data_type
159    }
160
161    pub fn options(&self) -> &[ColumnOptionDef] {
162        &self.column_def.options
163    }
164
165    pub fn mut_options(&mut self) -> &mut Vec<ColumnOptionDef> {
166        &mut self.column_def.options
167    }
168}
169
170impl Display for Column {
171    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
172        if let Some(vector_options) = &self.extensions.vector_options
173            && let Some(dim) = vector_options.get(VECTOR_OPT_DIM)
174        {
175            write!(f, "{} VECTOR({})", self.column_def.name, dim)?;
176            return Ok(());
177        }
178
179        write!(f, "{} {}", self.column_def.name, self.column_def.data_type)?;
180        if let Some(options) = &self.extensions.json_datatype_options {
181            write!(
182                f,
183                "({})",
184                options
185                    .entries()
186                    .map(|(k, v)| format!("{k} = {v}"))
187                    .join(COMMA_SEP)
188            )?;
189        }
190        for option in &self.column_def.options {
191            write!(f, " {option}")?;
192        }
193
194        if let Some(fulltext_options) = &self.extensions.fulltext_index_options {
195            if !fulltext_options.is_empty() {
196                let options = fulltext_options.kv_pairs();
197                write!(f, " FULLTEXT INDEX WITH({})", format_list_comma!(options))?;
198            } else {
199                write!(f, " FULLTEXT INDEX")?;
200            }
201        }
202
203        if let Some(skipping_index_options) = &self.extensions.skipping_index_options {
204            if !skipping_index_options.is_empty() {
205                let options = skipping_index_options.kv_pairs();
206                write!(f, " SKIPPING INDEX WITH({})", format_list_comma!(options))?;
207            } else {
208                write!(f, " SKIPPING INDEX")?;
209            }
210        }
211
212        if let Some(inverted_index_options) = &self.extensions.inverted_index_options {
213            if !inverted_index_options.is_empty() {
214                let options = inverted_index_options.kv_pairs();
215                write!(f, " INVERTED INDEX WITH({})", format_list_comma!(options))?;
216            } else {
217                write!(f, " INVERTED INDEX")?;
218            }
219        }
220
221        if let Some(vector_index_options) = &self.extensions.vector_index_options {
222            if !vector_index_options.is_empty() {
223                let options = vector_index_options.kv_pairs();
224                write!(f, " VECTOR INDEX WITH({})", format_list_comma!(options))?;
225            } else {
226                write!(f, " VECTOR INDEX")?;
227            }
228        }
229        Ok(())
230    }
231}
232
233impl ColumnExtensions {
234    pub fn build_fulltext_options(&self) -> Result<Option<FulltextOptions>> {
235        let Some(options) = self.fulltext_index_options.as_ref() else {
236            return Ok(None);
237        };
238
239        let options: HashMap<String, String> = options.clone().into_map();
240        Ok(Some(options.try_into().context(SetFulltextOptionSnafu)?))
241    }
242
243    pub fn build_skipping_index_options(&self) -> Result<Option<SkippingIndexOptions>> {
244        let Some(options) = self.skipping_index_options.as_ref() else {
245            return Ok(None);
246        };
247
248        let options: HashMap<String, String> = options.clone().into_map();
249        Ok(Some(
250            options.try_into().context(SetSkippingIndexOptionSnafu)?,
251        ))
252    }
253
254    pub fn build_vector_index_options(&self) -> Result<Option<VectorIndexOptions>> {
255        let Some(options) = self.vector_index_options.as_ref() else {
256            return Ok(None);
257        };
258
259        let options_map: HashMap<String, String> = options.clone().into_map();
260        let mut result = VectorIndexOptions::default();
261
262        if let Some(s) = options_map.get("engine") {
263            result.engine = s.parse::<VectorIndexEngineType>().map_err(|e| {
264                InvalidSqlSnafu {
265                    msg: format!("invalid VECTOR INDEX engine: {e}"),
266                }
267                .build()
268            })?;
269        }
270
271        if let Some(s) = options_map.get("metric") {
272            result.metric = s.parse::<VectorDistanceMetric>().map_err(|e| {
273                InvalidSqlSnafu {
274                    msg: format!("invalid VECTOR INDEX metric: {e}"),
275                }
276                .build()
277            })?;
278        }
279
280        if let Some(s) = options_map.get("connectivity") {
281            let value = s.parse::<u32>().map_err(|_| {
282                InvalidSqlSnafu {
283                    msg: format!(
284                        "invalid VECTOR INDEX connectivity: {s}, expected positive integer"
285                    ),
286                }
287                .build()
288            })?;
289            if !(2..=2048).contains(&value) {
290                return InvalidSqlSnafu {
291                    msg: "VECTOR INDEX connectivity must be in the range [2, 2048].".to_string(),
292                }
293                .fail();
294            }
295            result.connectivity = value;
296        }
297
298        if let Some(s) = options_map.get("expansion_add") {
299            let value = s.parse::<u32>().map_err(|_| {
300                InvalidSqlSnafu {
301                    msg: format!(
302                        "invalid VECTOR INDEX expansion_add: {s}, expected positive integer"
303                    ),
304                }
305                .build()
306            })?;
307            if value == 0 {
308                return InvalidSqlSnafu {
309                    msg: "VECTOR INDEX expansion_add must be greater than 0".to_string(),
310                }
311                .fail();
312            }
313            result.expansion_add = value;
314        }
315
316        if let Some(s) = options_map.get("expansion_search") {
317            let value = s.parse::<u32>().map_err(|_| {
318                InvalidSqlSnafu {
319                    msg: format!(
320                        "invalid VECTOR INDEX expansion_search: {s}, expected positive integer"
321                    ),
322                }
323                .build()
324            })?;
325            if value == 0 {
326                return InvalidSqlSnafu {
327                    msg: "VECTOR INDEX expansion_search must be greater than 0".to_string(),
328                }
329                .fail();
330            }
331            result.expansion_search = value;
332        }
333
334        Ok(Some(result))
335    }
336
337    pub fn build_json_structure_settings(&self) -> Result<Option<JsonStructureSettings>> {
338        let Some(options) = self.json_datatype_options.as_ref() else {
339            return Ok(None);
340        };
341
342        let unstructured_keys = options
343            .value(JSON_OPT_UNSTRUCTURED_KEYS)
344            .and_then(|v| {
345                v.as_list().map(|x| {
346                    x.into_iter()
347                        .map(|x| x.to_string())
348                        .collect::<HashSet<String>>()
349                })
350            })
351            .unwrap_or_default();
352
353        let fields = if let Some(value) = options.value(JSON_OPT_FIELDS) {
354            let fields = value
355                .as_struct_fields()
356                .context(InvalidJsonStructureSettingSnafu {
357                    reason: format!(r#"expect "{JSON_OPT_FIELDS}" a struct, actual: "{value}""#,),
358                })?;
359            let fields = fields
360                .iter()
361                .map(|field| {
362                    let name = field.field_name.as_ref().map(|x| x.value.clone()).context(
363                        InvalidJsonStructureSettingSnafu {
364                            reason: format!(r#"missing field name in "{field}""#),
365                        },
366                    )?;
367                    let datatype = sql_data_type_to_concrete_data_type(
368                        &field.field_type,
369                        &Default::default(),
370                    )?;
371                    Ok(datatypes::types::StructField::new(name, datatype, true))
372                })
373                .collect::<Result<_>>()?;
374            Some(StructType::new(Arc::new(fields)))
375        } else {
376            None
377        };
378
379        options
380            .get(JSON_OPT_FORMAT)
381            .map(|format| match format {
382                JSON_FORMAT_FULL_STRUCTURED => Ok(JsonStructureSettings::Structured(fields)),
383                JSON_FORMAT_PARTIAL => {
384                    let fields = fields.map(|fields| {
385                        let mut fields = Arc::unwrap_or_clone(fields.fields());
386                        fields.push(datatypes::types::StructField::new(
387                            JsonStructureSettings::RAW_FIELD.to_string(),
388                            ConcreteDataType::string_datatype(),
389                            true,
390                        ));
391                        StructType::new(Arc::new(fields))
392                    });
393                    Ok(JsonStructureSettings::PartialUnstructuredByKey {
394                        fields,
395                        unstructured_keys,
396                    })
397                }
398                JSON_FORMAT_RAW => Ok(JsonStructureSettings::UnstructuredRaw),
399                _ => InvalidSqlSnafu {
400                    msg: format!("unknown JSON datatype 'format': {format}"),
401                }
402                .fail(),
403            })
404            .transpose()
405    }
406
407    pub fn set_json_structure_settings(&mut self, settings: JsonStructureSettings) {
408        let mut map = OptionMap::default();
409
410        let format = match settings {
411            JsonStructureSettings::Structured(_) => JSON_FORMAT_FULL_STRUCTURED,
412            JsonStructureSettings::PartialUnstructuredByKey { .. } => JSON_FORMAT_PARTIAL,
413            JsonStructureSettings::UnstructuredRaw => JSON_FORMAT_RAW,
414        };
415        map.insert(JSON_OPT_FORMAT.to_string(), format.to_string());
416
417        if let JsonStructureSettings::PartialUnstructuredByKey {
418            fields: _,
419            unstructured_keys,
420        } = settings
421        {
422            let value = OptionValue::from(
423                unstructured_keys
424                    .iter()
425                    .map(|x| x.as_str())
426                    .sorted()
427                    .collect::<Vec<_>>(),
428            );
429            map.insert_options(JSON_OPT_UNSTRUCTURED_KEYS, value);
430        }
431
432        self.json_datatype_options = Some(map);
433    }
434}
435
436/// Partition on columns or values.
437///
438/// - `column_list` is the list of columns in `PARTITION ON COLUMNS` clause.
439/// - `exprs` is the list of expressions in `PARTITION ON VALUES` clause, like
440///   `host <= 'host1'`, `host > 'host1' and host <= 'host2'` or `host > 'host2'`.
441///   Each expression stands for a partition.
442#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
443pub struct Partitions {
444    pub column_list: Vec<Ident>,
445    pub exprs: Vec<Expr>,
446}
447
448impl Partitions {
449    /// set quotes to all [Ident]s from column list
450    pub fn set_quote(&mut self, quote_style: char) {
451        self.column_list
452            .iter_mut()
453            .for_each(|c| c.quote_style = Some(quote_style));
454    }
455}
456
457#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)]
458pub struct PartitionEntry {
459    pub name: Ident,
460    pub value_list: Vec<SqlValue>,
461}
462
463impl Display for PartitionEntry {
464    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
465        write!(
466            f,
467            "PARTITION {} VALUES LESS THAN ({})",
468            self.name,
469            format_list_comma!(self.value_list),
470        )
471    }
472}
473
474impl Display for Partitions {
475    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
476        if !self.column_list.is_empty() {
477            write!(
478                f,
479                "PARTITION ON COLUMNS ({}) (\n{}\n)",
480                format_list_comma!(self.column_list),
481                format_list_indent!(self.exprs),
482            )?;
483        }
484        Ok(())
485    }
486}
487
488impl Display for CreateTable {
489    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
490        write!(f, "CREATE ")?;
491        if self.engine == FILE_ENGINE {
492            write!(f, "EXTERNAL ")?;
493        }
494        write!(f, "TABLE ")?;
495        if self.if_not_exists {
496            write!(f, "IF NOT EXISTS ")?;
497        }
498        writeln!(f, "{} (", &self.name)?;
499        writeln!(f, "{},", format_list_indent!(self.columns))?;
500        writeln!(f, "{}", format_table_constraint(&self.constraints))?;
501        writeln!(f, ")")?;
502        if let Some(partitions) = &self.partitions {
503            writeln!(f, "{partitions}")?;
504        }
505        writeln!(f, "ENGINE={}", &self.engine)?;
506        if !self.options.is_empty() {
507            let options = self.options.kv_pairs();
508            write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
509        }
510        Ok(())
511    }
512}
513
514#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
515pub struct CreateDatabase {
516    pub name: ObjectName,
517    /// Create if not exists
518    pub if_not_exists: bool,
519    pub options: OptionMap,
520}
521
522impl CreateDatabase {
523    /// Creates a statement for `CREATE DATABASE`
524    pub fn new(name: ObjectName, if_not_exists: bool, options: OptionMap) -> Self {
525        Self {
526            name,
527            if_not_exists,
528            options,
529        }
530    }
531}
532
533impl Display for CreateDatabase {
534    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
535        write!(f, "CREATE DATABASE ")?;
536        if self.if_not_exists {
537            write!(f, "IF NOT EXISTS ")?;
538        }
539        write!(f, "{}", &self.name)?;
540        if !self.options.is_empty() {
541            let options = self.options.kv_pairs();
542            write!(f, "\nWITH(\n{}\n)", format_list_indent!(options))?;
543        }
544        Ok(())
545    }
546}
547
548#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
549pub struct CreateExternalTable {
550    /// Table name
551    pub name: ObjectName,
552    pub columns: Vec<Column>,
553    pub constraints: Vec<TableConstraint>,
554    /// Table options in `WITH`. All keys are lowercase.
555    pub options: OptionMap,
556    pub if_not_exists: bool,
557    pub engine: String,
558}
559
560impl Display for CreateExternalTable {
561    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
562        write!(f, "CREATE EXTERNAL TABLE ")?;
563        if self.if_not_exists {
564            write!(f, "IF NOT EXISTS ")?;
565        }
566        writeln!(f, "{} (", &self.name)?;
567        writeln!(f, "{},", format_list_indent!(self.columns))?;
568        writeln!(f, "{}", format_table_constraint(&self.constraints))?;
569        writeln!(f, ")")?;
570        writeln!(f, "ENGINE={}", &self.engine)?;
571        if !self.options.is_empty() {
572            let options = self.options.kv_pairs();
573            write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
574        }
575        Ok(())
576    }
577}
578
579#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
580pub struct CreateTableLike {
581    /// Table name
582    pub table_name: ObjectName,
583    /// The table that is designated to be imitated by `Like`
584    pub source_name: ObjectName,
585}
586
587impl Display for CreateTableLike {
588    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
589        let table_name = &self.table_name;
590        let source_name = &self.source_name;
591        write!(f, r#"CREATE TABLE {table_name} LIKE {source_name}"#)
592    }
593}
594
595#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
596pub struct CreateFlow {
597    /// Flow name
598    pub flow_name: ObjectName,
599    /// Output (sink) table name
600    pub sink_table_name: ObjectName,
601    /// Whether to replace existing task
602    pub or_replace: bool,
603    /// Create if not exist
604    pub if_not_exists: bool,
605    /// `EXPIRE AFTER`
606    /// Duration in second as `i64`
607    pub expire_after: Option<i64>,
608    /// Duration for flow evaluation interval
609    /// Duration in seconds as `i64`
610    /// If not set, flow will be evaluated based on time window size and other args.
611    pub eval_interval: Option<i64>,
612    /// Comment string
613    pub comment: Option<String>,
614    /// SQL statement
615    pub query: Box<SqlOrTql>,
616}
617
618/// Either a sql query or a tql query
619#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
620pub enum SqlOrTql {
621    Sql(Query, String),
622    Tql(Tql, String),
623}
624
625impl std::fmt::Display for SqlOrTql {
626    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
627        match self {
628            Self::Sql(_, s) => write!(f, "{}", s),
629            Self::Tql(_, s) => write!(f, "{}", s),
630        }
631    }
632}
633
634impl SqlOrTql {
635    pub fn try_from_statement(
636        value: Statement,
637        original_query: &str,
638    ) -> std::result::Result<Self, crate::error::Error> {
639        match value {
640            Statement::Query(query) => {
641                Ok(Self::Sql((*query).try_into()?, original_query.to_string()))
642            }
643            Statement::Tql(tql) => Ok(Self::Tql(tql, original_query.to_string())),
644            _ => InvalidFlowQuerySnafu {
645                reason: format!("Expect either sql query or promql query, found {:?}", value),
646            }
647            .fail(),
648        }
649    }
650}
651
652impl Display for CreateFlow {
653    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
654        write!(f, "CREATE ")?;
655        if self.or_replace {
656            write!(f, "OR REPLACE ")?;
657        }
658        write!(f, "FLOW ")?;
659        if self.if_not_exists {
660            write!(f, "IF NOT EXISTS ")?;
661        }
662        writeln!(f, "{}", &self.flow_name)?;
663        writeln!(f, "SINK TO {}", &self.sink_table_name)?;
664        if let Some(expire_after) = &self.expire_after {
665            writeln!(f, "EXPIRE AFTER '{} s'", expire_after)?;
666        }
667        if let Some(eval_interval) = &self.eval_interval {
668            writeln!(f, "EVAL INTERVAL '{} s'", eval_interval)?;
669        }
670        if let Some(comment) = &self.comment {
671            writeln!(f, "COMMENT '{}'", comment)?;
672        }
673        write!(f, "AS {}", &self.query)
674    }
675}
676
677/// Create SQL view statement.
678#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
679pub struct CreateView {
680    /// View name
681    pub name: ObjectName,
682    /// An optional list of names to be used for columns of the view
683    pub columns: Vec<Ident>,
684    /// The clause after `As` that defines the VIEW.
685    /// Can only be either [Statement::Query] or [Statement::Tql].
686    pub query: Box<Statement>,
687    /// Whether to replace existing VIEW
688    pub or_replace: bool,
689    /// Create VIEW only when it doesn't exists
690    pub if_not_exists: bool,
691}
692
693impl Display for CreateView {
694    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
695        write!(f, "CREATE ")?;
696        if self.or_replace {
697            write!(f, "OR REPLACE ")?;
698        }
699        write!(f, "VIEW ")?;
700        if self.if_not_exists {
701            write!(f, "IF NOT EXISTS ")?;
702        }
703        write!(f, "{} ", &self.name)?;
704        if !self.columns.is_empty() {
705            write!(f, "({}) ", format_list_comma!(self.columns))?;
706        }
707        write!(f, "AS {}", &self.query)
708    }
709}
710
711#[cfg(test)]
712mod tests {
713    use std::assert_matches::assert_matches;
714
715    use crate::dialect::GreptimeDbDialect;
716    use crate::error::Error;
717    use crate::parser::{ParseOptions, ParserContext};
718    use crate::statements::statement::Statement;
719
720    #[test]
721    fn test_display_create_table() {
722        let sql = r"create table if not exists demo(
723                             host string,
724                             ts timestamp,
725                             cpu double default 0,
726                             memory double,
727                             TIME INDEX (ts),
728                             PRIMARY KEY(host)
729                       )
730                       PARTITION ON COLUMNS (host) (
731                            host = 'a',
732                            host > 'a',
733                       )
734                       engine=mito
735                       with(ttl='7d', storage='File');
736         ";
737        let result =
738            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
739                .unwrap();
740        assert_eq!(1, result.len());
741
742        match &result[0] {
743            Statement::CreateTable(c) => {
744                let new_sql = format!("\n{}", c);
745                assert_eq!(
746                    r#"
747CREATE TABLE IF NOT EXISTS demo (
748  host STRING,
749  ts TIMESTAMP,
750  cpu DOUBLE DEFAULT 0,
751  memory DOUBLE,
752  TIME INDEX (ts),
753  PRIMARY KEY (host)
754)
755PARTITION ON COLUMNS (host) (
756  host = 'a',
757  host > 'a'
758)
759ENGINE=mito
760WITH(
761  storage = 'File',
762  ttl = '7d'
763)"#,
764                    &new_sql
765                );
766
767                let new_result = ParserContext::create_with_dialect(
768                    &new_sql,
769                    &GreptimeDbDialect {},
770                    ParseOptions::default(),
771                )
772                .unwrap();
773                assert_eq!(result, new_result);
774            }
775            _ => unreachable!(),
776        }
777    }
778
779    #[test]
780    fn test_display_empty_partition_column() {
781        let sql = r"create table if not exists demo(
782            host string,
783            ts timestamp,
784            cpu double default 0,
785            memory double,
786            TIME INDEX (ts),
787            PRIMARY KEY(ts, host)
788            );
789        ";
790        let result =
791            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
792                .unwrap();
793        assert_eq!(1, result.len());
794
795        match &result[0] {
796            Statement::CreateTable(c) => {
797                let new_sql = format!("\n{}", c);
798                assert_eq!(
799                    r#"
800CREATE TABLE IF NOT EXISTS demo (
801  host STRING,
802  ts TIMESTAMP,
803  cpu DOUBLE DEFAULT 0,
804  memory DOUBLE,
805  TIME INDEX (ts),
806  PRIMARY KEY (ts, host)
807)
808ENGINE=mito
809"#,
810                    &new_sql
811                );
812
813                let new_result = ParserContext::create_with_dialect(
814                    &new_sql,
815                    &GreptimeDbDialect {},
816                    ParseOptions::default(),
817                )
818                .unwrap();
819                assert_eq!(result, new_result);
820            }
821            _ => unreachable!(),
822        }
823    }
824
825    #[test]
826    fn test_validate_table_options() {
827        let sql = r"create table if not exists demo(
828            host string,
829            ts timestamp,
830            cpu double default 0,
831            memory double,
832            TIME INDEX (ts),
833            PRIMARY KEY(host)
834      )
835      PARTITION ON COLUMNS (host) ()
836      engine=mito
837      with(ttl='7d', 'compaction.type'='world');
838";
839        let result =
840            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
841                .unwrap();
842        match &result[0] {
843            Statement::CreateTable(c) => {
844                assert_eq!(2, c.options.len());
845            }
846            _ => unreachable!(),
847        }
848
849        let sql = r"create table if not exists demo(
850            host string,
851            ts timestamp,
852            cpu double default 0,
853            memory double,
854            TIME INDEX (ts),
855            PRIMARY KEY(host)
856      )
857      PARTITION ON COLUMNS (host) ()
858      engine=mito
859      with(ttl='7d', hello='world');
860";
861        let result =
862            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
863        assert_matches!(result, Err(Error::InvalidTableOption { .. }))
864    }
865
866    #[test]
867    fn test_display_create_database() {
868        let sql = r"create database test;";
869        let stmts =
870            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
871                .unwrap();
872        assert_eq!(1, stmts.len());
873        assert_matches!(&stmts[0], Statement::CreateDatabase { .. });
874
875        match &stmts[0] {
876            Statement::CreateDatabase(set) => {
877                let new_sql = format!("\n{}", set);
878                assert_eq!(
879                    r#"
880CREATE DATABASE test"#,
881                    &new_sql
882                );
883            }
884            _ => {
885                unreachable!();
886            }
887        }
888
889        let sql = r"create database if not exists test;";
890        let stmts =
891            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
892                .unwrap();
893        assert_eq!(1, stmts.len());
894        assert_matches!(&stmts[0], Statement::CreateDatabase { .. });
895
896        match &stmts[0] {
897            Statement::CreateDatabase(set) => {
898                let new_sql = format!("\n{}", set);
899                assert_eq!(
900                    r#"
901CREATE DATABASE IF NOT EXISTS test"#,
902                    &new_sql
903                );
904            }
905            _ => {
906                unreachable!();
907            }
908        }
909
910        let sql = r#"CREATE DATABASE IF NOT EXISTS test WITH (ttl='1h');"#;
911        let stmts =
912            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
913                .unwrap();
914        assert_eq!(1, stmts.len());
915        assert_matches!(&stmts[0], Statement::CreateDatabase { .. });
916
917        match &stmts[0] {
918            Statement::CreateDatabase(set) => {
919                let new_sql = format!("\n{}", set);
920                assert_eq!(
921                    r#"
922CREATE DATABASE IF NOT EXISTS test
923WITH(
924  ttl = '1h'
925)"#,
926                    &new_sql
927                );
928            }
929            _ => {
930                unreachable!();
931            }
932        }
933    }
934
935    #[test]
936    fn test_display_create_table_like() {
937        let sql = r"create table t2 like t1;";
938        let stmts =
939            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
940                .unwrap();
941        assert_eq!(1, stmts.len());
942        assert_matches!(&stmts[0], Statement::CreateTableLike { .. });
943
944        match &stmts[0] {
945            Statement::CreateTableLike(create) => {
946                let new_sql = format!("\n{}", create);
947                assert_eq!(
948                    r#"
949CREATE TABLE t2 LIKE t1"#,
950                    &new_sql
951                );
952            }
953            _ => {
954                unreachable!();
955            }
956        }
957    }
958
959    #[test]
960    fn test_display_create_external_table() {
961        let sql = r#"CREATE EXTERNAL TABLE city (
962            host string,
963            ts timestamp,
964            cpu float64 default 0,
965            memory float64,
966            TIME INDEX (ts),
967            PRIMARY KEY(host)
968) WITH (location='/var/data/city.csv', format='csv');"#;
969        let stmts =
970            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
971                .unwrap();
972        assert_eq!(1, stmts.len());
973        assert_matches!(&stmts[0], Statement::CreateExternalTable { .. });
974
975        match &stmts[0] {
976            Statement::CreateExternalTable(create) => {
977                let new_sql = format!("\n{}", create);
978                assert_eq!(
979                    r#"
980CREATE EXTERNAL TABLE city (
981  host STRING,
982  ts TIMESTAMP,
983  cpu DOUBLE DEFAULT 0,
984  memory DOUBLE,
985  TIME INDEX (ts),
986  PRIMARY KEY (host)
987)
988ENGINE=file
989WITH(
990  format = 'csv',
991  location = '/var/data/city.csv'
992)"#,
993                    &new_sql
994                );
995            }
996            _ => {
997                unreachable!();
998            }
999        }
1000    }
1001
1002    #[test]
1003    fn test_display_create_flow() {
1004        let sql = r"CREATE FLOW filter_numbers
1005            SINK TO out_num_cnt
1006            AS SELECT number FROM numbers_input where number > 10;";
1007        let result =
1008            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1009                .unwrap();
1010        assert_eq!(1, result.len());
1011
1012        match &result[0] {
1013            Statement::CreateFlow(c) => {
1014                let new_sql = format!("\n{}", c);
1015                assert_eq!(
1016                    r#"
1017CREATE FLOW filter_numbers
1018SINK TO out_num_cnt
1019AS SELECT number FROM numbers_input where number > 10"#,
1020                    &new_sql
1021                );
1022
1023                let new_result = ParserContext::create_with_dialect(
1024                    &new_sql,
1025                    &GreptimeDbDialect {},
1026                    ParseOptions::default(),
1027                )
1028                .unwrap();
1029                assert_eq!(result, new_result);
1030            }
1031            _ => unreachable!(),
1032        }
1033    }
1034
1035    #[test]
1036    fn test_vector_index_options_validation() {
1037        use super::{ColumnExtensions, OptionMap};
1038
1039        // Test zero connectivity should fail
1040        let extensions = ColumnExtensions {
1041            fulltext_index_options: None,
1042            vector_options: None,
1043            skipping_index_options: None,
1044            inverted_index_options: None,
1045            json_datatype_options: None,
1046            vector_index_options: Some(OptionMap::from([(
1047                "connectivity".to_string(),
1048                "0".to_string(),
1049            )])),
1050        };
1051        let result = extensions.build_vector_index_options();
1052        assert!(result.is_err());
1053        assert!(
1054            result
1055                .unwrap_err()
1056                .to_string()
1057                .contains("connectivity must be in the range [2, 2048]")
1058        );
1059
1060        // Test zero expansion_add should fail
1061        let extensions = ColumnExtensions {
1062            fulltext_index_options: None,
1063            vector_options: None,
1064            skipping_index_options: None,
1065            inverted_index_options: None,
1066            json_datatype_options: None,
1067            vector_index_options: Some(OptionMap::from([(
1068                "expansion_add".to_string(),
1069                "0".to_string(),
1070            )])),
1071        };
1072        let result = extensions.build_vector_index_options();
1073        assert!(result.is_err());
1074        assert!(
1075            result
1076                .unwrap_err()
1077                .to_string()
1078                .contains("expansion_add must be greater than 0")
1079        );
1080
1081        // Test zero expansion_search should fail
1082        let extensions = ColumnExtensions {
1083            fulltext_index_options: None,
1084            vector_options: None,
1085            skipping_index_options: None,
1086            inverted_index_options: None,
1087            json_datatype_options: None,
1088            vector_index_options: Some(OptionMap::from([(
1089                "expansion_search".to_string(),
1090                "0".to_string(),
1091            )])),
1092        };
1093        let result = extensions.build_vector_index_options();
1094        assert!(result.is_err());
1095        assert!(
1096            result
1097                .unwrap_err()
1098                .to_string()
1099                .contains("expansion_search must be greater than 0")
1100        );
1101
1102        // Test valid values should succeed
1103        let extensions = ColumnExtensions {
1104            fulltext_index_options: None,
1105            vector_options: None,
1106            skipping_index_options: None,
1107            inverted_index_options: None,
1108            json_datatype_options: None,
1109            vector_index_options: Some(OptionMap::from([
1110                ("connectivity".to_string(), "32".to_string()),
1111                ("expansion_add".to_string(), "200".to_string()),
1112                ("expansion_search".to_string(), "100".to_string()),
1113            ])),
1114        };
1115        let result = extensions.build_vector_index_options();
1116        assert!(result.is_ok());
1117        let options = result.unwrap().unwrap();
1118        assert_eq!(options.connectivity, 32);
1119        assert_eq!(options.expansion_add, 200);
1120        assert_eq!(options.expansion_search, 100);
1121    }
1122}