Skip to main content

sql/statements/
create.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::{Display, Formatter};
17use std::sync::Arc;
18
19use common_catalog::consts::FILE_ENGINE;
20use datatypes::data_type::ConcreteDataType;
21use datatypes::json::JsonStructureSettings;
22use datatypes::schema::{
23    FulltextOptions, SkippingIndexOptions, VectorDistanceMetric, VectorIndexEngineType,
24    VectorIndexOptions,
25};
26use datatypes::types::StructType;
27use itertools::Itertools;
28use serde::Serialize;
29use snafu::{OptionExt, ResultExt};
30use sqlparser::ast::{ColumnOptionDef, DataType, Expr};
31use sqlparser_derive::{Visit, VisitMut};
32
33use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue};
34use crate::error::{
35    InvalidFlowQuerySnafu, InvalidJsonStructureSettingSnafu, InvalidSqlSnafu, Result,
36    SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu,
37};
38use crate::statements::query::Query as GtQuery;
39use crate::statements::statement::Statement;
40use crate::statements::tql::Tql;
41use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
42use crate::util::OptionValue;
43
/// Separator placed between entries that are rendered one per line.
const LINE_SEP: &str = ",\n";
/// Separator placed between entries that are rendered on a single line.
const COMMA_SEP: &str = ", ";
/// Number of spaces `format_indent!` puts in front of an entry.
const INDENT: usize = 2;
/// Key of the dimension entry in a column's vector type options.
pub const VECTOR_OPT_DIM: &str = "dim";

/// JSON datatype option key holding the list of unstructured keys.
pub const JSON_OPT_UNSTRUCTURED_KEYS: &str = "unstructured_keys";
/// JSON datatype option key selecting the format (one of the `JSON_FORMAT_*` values).
pub const JSON_OPT_FORMAT: &str = "format";
/// JSON datatype option key holding the declared struct fields.
pub(crate) const JSON_OPT_FIELDS: &str = "fields";
/// `format` value mapped to `JsonStructureSettings::Structured`; also the default.
pub const JSON_FORMAT_FULL_STRUCTURED: &str = "structured";
/// `format` value mapped to `JsonStructureSettings::UnstructuredRaw`.
pub const JSON_FORMAT_RAW: &str = "raw";
/// `format` value mapped to `JsonStructureSettings::PartialUnstructuredByKey`.
pub const JSON_FORMAT_PARTIAL: &str = "partial";
55
// Formats `$arg` preceded by `INDENT` spaces.
//
// The two-argument form takes a format string with two placeholders: the
// first receives the indentation, the second receives `$arg`. The
// one-argument form simply concatenates indentation and `$arg`.
macro_rules! format_indent {
    ($fmt: expr, $arg: expr) => {
        // `{: >1$}` right-aligns an empty string in a field whose width is the
        // second argument (`INDENT`), i.e. it produces `INDENT` spaces.
        format!($fmt, format_args!("{: >1$}", "", INDENT), $arg)
    };
    ($arg: expr) => {
        format_indent!("{}{}", $arg)
    };
}
64
// Renders every element of `$list` with `format_indent!` and joins the results
// with `LINE_SEP`, giving one indented entry per line.
macro_rules! format_list_indent {
    ($list: expr) => {
        $list.iter().map(|e| format_indent!(e)).join(LINE_SEP)
    };
}
70
// Renders every element of `$list` through its `Display` impl and joins the
// results with `COMMA_SEP` on a single line.
macro_rules! format_list_comma {
    ($list: expr) => {
        $list.iter().map(|e| format!("{}", e)).join(COMMA_SEP)
    };
}
76
77#[cfg(feature = "enterprise")]
78pub mod trigger;
79
80fn format_table_constraint(constraints: &[TableConstraint]) -> String {
81    constraints.iter().map(|c| format_indent!(c)).join(LINE_SEP)
82}
83
/// Table constraint for create table statement.
///
/// Displayed as `PRIMARY KEY (...)` or `TIME INDEX (...)`.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub enum TableConstraint {
    /// Primary key constraint over the listed columns.
    PrimaryKey { columns: Vec<Ident> },
    /// Time index constraint on a single column.
    TimeIndex { column: Ident },
}
92
93impl Display for TableConstraint {
94    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
95        match self {
96            TableConstraint::PrimaryKey { columns } => {
97                write!(f, "PRIMARY KEY ({})", format_list_comma!(columns))
98            }
99            TableConstraint::TimeIndex { column } => {
100                write!(f, "TIME INDEX ({})", column)
101            }
102        }
103    }
104}
105
/// `CREATE TABLE` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateTable {
    /// Create if not exists
    pub if_not_exists: bool,
    /// Numeric table id. It is not part of the text rendered by `Display`.
    pub table_id: u32,
    /// Table name
    pub name: ObjectName,
    /// Column definitions, in declaration order.
    pub columns: Vec<Column>,
    /// Engine name, rendered as `ENGINE=<engine>`. When it equals `FILE_ENGINE`
    /// the statement is displayed as `CREATE EXTERNAL TABLE`.
    pub engine: String,
    /// Table-level constraints (primary key, time index).
    pub constraints: Vec<TableConstraint>,
    /// Table options in `WITH`. All keys are lowercase.
    pub options: OptionMap,
    /// Optional `PARTITION ON COLUMNS` clause.
    pub partitions: Option<Partitions>,
}
120
/// Column definition in `CREATE TABLE` statement.
///
/// Pairs the generic `sqlparser` column definition with the extra,
/// dialect-specific settings stored in [`ColumnExtensions`].
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct Column {
    /// `ColumnDef` from `sqlparser::ast`
    pub column_def: ColumnDef,
    /// Column extensions for greptimedb dialect.
    pub extensions: ColumnExtensions,
}
129
/// Column extensions for greptimedb dialect.
///
/// Every field is optional; `None` means the corresponding clause was not
/// declared for the column.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Default, Serialize)]
pub struct ColumnExtensions {
    /// Vector type options.
    pub vector_options: Option<OptionMap>,

    /// Fulltext index options.
    pub fulltext_index_options: Option<OptionMap>,
    /// Skipping index options.
    pub skipping_index_options: Option<OptionMap>,
    /// Inverted index options.
    ///
    /// Inverted index doesn't have options at present. There won't be any options in that map.
    pub inverted_index_options: Option<OptionMap>,
    /// Vector index options for HNSW-based vector similarity search.
    pub vector_index_options: Option<OptionMap>,
    /// JSON datatype options (`format`, `fields`, `unstructured_keys`).
    pub json_datatype_options: Option<OptionMap>,
}
148
149impl Column {
150    pub fn name(&self) -> &Ident {
151        &self.column_def.name
152    }
153
154    pub fn data_type(&self) -> &DataType {
155        &self.column_def.data_type
156    }
157
158    pub fn mut_data_type(&mut self) -> &mut DataType {
159        &mut self.column_def.data_type
160    }
161
162    pub fn options(&self) -> &[ColumnOptionDef] {
163        &self.column_def.options
164    }
165
166    pub fn mut_options(&mut self) -> &mut Vec<ColumnOptionDef> {
167        &mut self.column_def.options
168    }
169}
170
171impl Display for Column {
172    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
173        if let Some(vector_options) = &self.extensions.vector_options
174            && let Some(dim) = vector_options.get(VECTOR_OPT_DIM)
175        {
176            write!(f, "{} VECTOR({})", self.column_def.name, dim)?;
177            return Ok(());
178        }
179
180        write!(f, "{} {}", self.column_def.name, self.column_def.data_type)?;
181        if let Some(options) = &self.extensions.json_datatype_options {
182            write!(
183                f,
184                "({})",
185                options
186                    .entries()
187                    .map(|(k, v)| format!("{k} = {v}"))
188                    .join(COMMA_SEP)
189            )?;
190        }
191        for option in &self.column_def.options {
192            write!(f, " {option}")?;
193        }
194
195        if let Some(fulltext_options) = &self.extensions.fulltext_index_options {
196            if !fulltext_options.is_empty() {
197                let options = fulltext_options.kv_pairs();
198                write!(f, " FULLTEXT INDEX WITH({})", format_list_comma!(options))?;
199            } else {
200                write!(f, " FULLTEXT INDEX")?;
201            }
202        }
203
204        if let Some(skipping_index_options) = &self.extensions.skipping_index_options {
205            if !skipping_index_options.is_empty() {
206                let options = skipping_index_options.kv_pairs();
207                write!(f, " SKIPPING INDEX WITH({})", format_list_comma!(options))?;
208            } else {
209                write!(f, " SKIPPING INDEX")?;
210            }
211        }
212
213        if let Some(inverted_index_options) = &self.extensions.inverted_index_options {
214            if !inverted_index_options.is_empty() {
215                let options = inverted_index_options.kv_pairs();
216                write!(f, " INVERTED INDEX WITH({})", format_list_comma!(options))?;
217            } else {
218                write!(f, " INVERTED INDEX")?;
219            }
220        }
221
222        if let Some(vector_index_options) = &self.extensions.vector_index_options {
223            if !vector_index_options.is_empty() {
224                let options = vector_index_options.kv_pairs();
225                write!(f, " VECTOR INDEX WITH({})", format_list_comma!(options))?;
226            } else {
227                write!(f, " VECTOR INDEX")?;
228            }
229        }
230        Ok(())
231    }
232}
233
234impl ColumnExtensions {
235    pub fn build_fulltext_options(&self) -> Result<Option<FulltextOptions>> {
236        let Some(options) = self.fulltext_index_options.as_ref() else {
237            return Ok(None);
238        };
239
240        let options: HashMap<String, String> = options.clone().into_map();
241        Ok(Some(options.try_into().context(SetFulltextOptionSnafu)?))
242    }
243
244    pub fn build_skipping_index_options(&self) -> Result<Option<SkippingIndexOptions>> {
245        let Some(options) = self.skipping_index_options.as_ref() else {
246            return Ok(None);
247        };
248
249        let options: HashMap<String, String> = options.clone().into_map();
250        Ok(Some(
251            options.try_into().context(SetSkippingIndexOptionSnafu)?,
252        ))
253    }
254
255    pub fn build_vector_index_options(&self) -> Result<Option<VectorIndexOptions>> {
256        let Some(options) = self.vector_index_options.as_ref() else {
257            return Ok(None);
258        };
259
260        let options_map: HashMap<String, String> = options.clone().into_map();
261        let mut result = VectorIndexOptions::default();
262
263        if let Some(s) = options_map.get("engine") {
264            result.engine = s.parse::<VectorIndexEngineType>().map_err(|e| {
265                InvalidSqlSnafu {
266                    msg: format!("invalid VECTOR INDEX engine: {e}"),
267                }
268                .build()
269            })?;
270        }
271
272        if let Some(s) = options_map.get("metric") {
273            result.metric = s.parse::<VectorDistanceMetric>().map_err(|e| {
274                InvalidSqlSnafu {
275                    msg: format!("invalid VECTOR INDEX metric: {e}"),
276                }
277                .build()
278            })?;
279        }
280
281        if let Some(s) = options_map.get("connectivity") {
282            let value = s.parse::<u32>().map_err(|_| {
283                InvalidSqlSnafu {
284                    msg: format!(
285                        "invalid VECTOR INDEX connectivity: {s}, expected positive integer"
286                    ),
287                }
288                .build()
289            })?;
290            if !(2..=2048).contains(&value) {
291                return InvalidSqlSnafu {
292                    msg: "VECTOR INDEX connectivity must be in the range [2, 2048].".to_string(),
293                }
294                .fail();
295            }
296            result.connectivity = value;
297        }
298
299        if let Some(s) = options_map.get("expansion_add") {
300            let value = s.parse::<u32>().map_err(|_| {
301                InvalidSqlSnafu {
302                    msg: format!(
303                        "invalid VECTOR INDEX expansion_add: {s}, expected positive integer"
304                    ),
305                }
306                .build()
307            })?;
308            if value == 0 {
309                return InvalidSqlSnafu {
310                    msg: "VECTOR INDEX expansion_add must be greater than 0".to_string(),
311                }
312                .fail();
313            }
314            result.expansion_add = value;
315        }
316
317        if let Some(s) = options_map.get("expansion_search") {
318            let value = s.parse::<u32>().map_err(|_| {
319                InvalidSqlSnafu {
320                    msg: format!(
321                        "invalid VECTOR INDEX expansion_search: {s}, expected positive integer"
322                    ),
323                }
324                .build()
325            })?;
326            if value == 0 {
327                return InvalidSqlSnafu {
328                    msg: "VECTOR INDEX expansion_search must be greater than 0".to_string(),
329                }
330                .fail();
331            }
332            result.expansion_search = value;
333        }
334
335        Ok(Some(result))
336    }
337
338    pub fn build_json_structure_settings(&self) -> Result<Option<JsonStructureSettings>> {
339        let Some(options) = self.json_datatype_options.as_ref() else {
340            return Ok(None);
341        };
342
343        let unstructured_keys = options
344            .value(JSON_OPT_UNSTRUCTURED_KEYS)
345            .and_then(|v| {
346                v.as_list().map(|x| {
347                    x.into_iter()
348                        .map(|x| x.to_string())
349                        .collect::<HashSet<String>>()
350                })
351            })
352            .unwrap_or_default();
353
354        let fields = if let Some(value) = options.value(JSON_OPT_FIELDS) {
355            let fields = value
356                .as_struct_fields()
357                .context(InvalidJsonStructureSettingSnafu {
358                    reason: format!(r#"expect "{JSON_OPT_FIELDS}" a struct, actual: "{value}""#,),
359                })?;
360            let fields = fields
361                .iter()
362                .map(|field| {
363                    let name = field.field_name.as_ref().map(|x| x.value.clone()).context(
364                        InvalidJsonStructureSettingSnafu {
365                            reason: format!(r#"missing field name in "{field}""#),
366                        },
367                    )?;
368                    let datatype = sql_data_type_to_concrete_data_type(
369                        &field.field_type,
370                        &Default::default(),
371                    )?;
372                    Ok(datatypes::types::StructField::new(name, datatype, true))
373                })
374                .collect::<Result<_>>()?;
375            Some(StructType::new(Arc::new(fields)))
376        } else {
377            None
378        };
379
380        let format = options
381            .get(JSON_OPT_FORMAT)
382            .unwrap_or(JSON_FORMAT_FULL_STRUCTURED);
383        let settings = match format {
384            JSON_FORMAT_FULL_STRUCTURED => JsonStructureSettings::Structured(fields),
385            JSON_FORMAT_PARTIAL => {
386                let fields = fields.map(|fields| {
387                    let mut fields = Arc::unwrap_or_clone(fields.fields());
388                    fields.push(datatypes::types::StructField::new(
389                        JsonStructureSettings::RAW_FIELD.to_string(),
390                        ConcreteDataType::string_datatype(),
391                        true,
392                    ));
393                    StructType::new(Arc::new(fields))
394                });
395                JsonStructureSettings::PartialUnstructuredByKey {
396                    fields,
397                    unstructured_keys,
398                }
399            }
400            JSON_FORMAT_RAW => JsonStructureSettings::UnstructuredRaw,
401            _ => {
402                return InvalidSqlSnafu {
403                    msg: format!("unknown JSON datatype 'format': {format}"),
404                }
405                .fail();
406            }
407        };
408        Ok(Some(settings))
409    }
410
411    pub fn set_json_structure_settings(&mut self, settings: JsonStructureSettings) {
412        let mut map = OptionMap::default();
413
414        let format = match settings {
415            JsonStructureSettings::Structured(_) => JSON_FORMAT_FULL_STRUCTURED,
416            JsonStructureSettings::PartialUnstructuredByKey { .. } => JSON_FORMAT_PARTIAL,
417            JsonStructureSettings::UnstructuredRaw => JSON_FORMAT_RAW,
418        };
419        map.insert(JSON_OPT_FORMAT.to_string(), format.to_string());
420
421        if let JsonStructureSettings::PartialUnstructuredByKey {
422            fields: _,
423            unstructured_keys,
424        } = settings
425        {
426            let value = OptionValue::from(
427                unstructured_keys
428                    .iter()
429                    .map(|x| x.as_str())
430                    .sorted()
431                    .collect::<Vec<_>>(),
432            );
433            map.insert_options(JSON_OPT_UNSTRUCTURED_KEYS, value);
434        }
435
436        self.json_datatype_options = Some(map);
437    }
438}
439
/// Partition on columns or values.
///
/// - `column_list` is the list of columns in `PARTITION ON COLUMNS` clause.
/// - `exprs` is the list of expressions in `PARTITION ON VALUES` clause, like
///   `host <= 'host1'`, `host > 'host1' and host <= 'host2'` or `host > 'host2'`.
///   Each expression stands for a partition.
///
/// `Display` renders nothing when `column_list` is empty.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct Partitions {
    /// Columns the table is partitioned on.
    pub column_list: Vec<Ident>,
    /// One expression per partition.
    pub exprs: Vec<Expr>,
}
451
452impl Partitions {
453    /// set quotes to all [Ident]s from column list
454    pub fn set_quote(&mut self, quote_style: char) {
455        self.column_list
456            .iter_mut()
457            .for_each(|c| c.quote_style = Some(quote_style));
458    }
459}
460
/// A named partition bounded by a list of values.
///
/// Displayed as `PARTITION <name> VALUES LESS THAN (<values>)`.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)]
pub struct PartitionEntry {
    /// Partition name.
    pub name: Ident,
    /// Values rendered inside `VALUES LESS THAN (...)`.
    pub value_list: Vec<SqlValue>,
}
466
467impl Display for PartitionEntry {
468    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
469        write!(
470            f,
471            "PARTITION {} VALUES LESS THAN ({})",
472            self.name,
473            format_list_comma!(self.value_list),
474        )
475    }
476}
477
478impl Display for Partitions {
479    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
480        if !self.column_list.is_empty() {
481            write!(
482                f,
483                "PARTITION ON COLUMNS ({}) (\n{}\n)",
484                format_list_comma!(self.column_list),
485                format_list_indent!(self.exprs),
486            )?;
487        }
488        Ok(())
489    }
490}
491
492impl Display for CreateTable {
493    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
494        write!(f, "CREATE ")?;
495        if self.engine == FILE_ENGINE {
496            write!(f, "EXTERNAL ")?;
497        }
498        write!(f, "TABLE ")?;
499        if self.if_not_exists {
500            write!(f, "IF NOT EXISTS ")?;
501        }
502        writeln!(f, "{} (", &self.name)?;
503        writeln!(f, "{},", format_list_indent!(self.columns))?;
504        writeln!(f, "{}", format_table_constraint(&self.constraints))?;
505        writeln!(f, ")")?;
506        if let Some(partitions) = &self.partitions {
507            writeln!(f, "{partitions}")?;
508        }
509        writeln!(f, "ENGINE={}", &self.engine)?;
510        if !self.options.is_empty() {
511            let options = self.options.kv_pairs();
512            write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
513        }
514        Ok(())
515    }
516}
517
/// `CREATE DATABASE` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateDatabase {
    /// Database name
    pub name: ObjectName,
    /// Create if not exists
    pub if_not_exists: bool,
    /// Database options in `WITH`.
    pub options: OptionMap,
}
525
526impl CreateDatabase {
527    /// Creates a statement for `CREATE DATABASE`
528    pub fn new(name: ObjectName, if_not_exists: bool, options: OptionMap) -> Self {
529        Self {
530            name,
531            if_not_exists,
532            options,
533        }
534    }
535}
536
537impl Display for CreateDatabase {
538    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
539        write!(f, "CREATE DATABASE ")?;
540        if self.if_not_exists {
541            write!(f, "IF NOT EXISTS ")?;
542        }
543        write!(f, "{}", &self.name)?;
544        if !self.options.is_empty() {
545            let options = self.options.kv_pairs();
546            write!(f, "\nWITH(\n{}\n)", format_list_indent!(options))?;
547        }
548        Ok(())
549    }
550}
551
/// `CREATE EXTERNAL TABLE` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateExternalTable {
    /// Table name
    pub name: ObjectName,
    /// Column definitions, in declaration order.
    pub columns: Vec<Column>,
    /// Table-level constraints (primary key, time index).
    pub constraints: Vec<TableConstraint>,
    /// Table options in `WITH`. All keys are lowercase.
    pub options: OptionMap,
    /// Create if not exists
    pub if_not_exists: bool,
    /// Engine name, rendered as `ENGINE=<engine>`.
    pub engine: String,
}
563
564impl Display for CreateExternalTable {
565    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
566        write!(f, "CREATE EXTERNAL TABLE ")?;
567        if self.if_not_exists {
568            write!(f, "IF NOT EXISTS ")?;
569        }
570        writeln!(f, "{} (", &self.name)?;
571        writeln!(f, "{},", format_list_indent!(self.columns))?;
572        writeln!(f, "{}", format_table_constraint(&self.constraints))?;
573        writeln!(f, ")")?;
574        writeln!(f, "ENGINE={}", &self.engine)?;
575        if !self.options.is_empty() {
576            let options = self.options.kv_pairs();
577            write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
578        }
579        Ok(())
580    }
581}
582
/// `CREATE TABLE <table_name> LIKE <source_name>` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateTableLike {
    /// Table name
    pub table_name: ObjectName,
    /// The table that is designated to be imitated by `Like`
    pub source_name: ObjectName,
}
590
591impl Display for CreateTableLike {
592    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
593        let table_name = &self.table_name;
594        let source_name = &self.source_name;
595        write!(f, r#"CREATE TABLE {table_name} LIKE {source_name}"#)
596    }
597}
598
/// `CREATE FLOW` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateFlow {
    /// Flow name
    pub flow_name: ObjectName,
    /// Output (sink) table name
    pub sink_table_name: ObjectName,
    /// Whether to replace an existing flow
    pub or_replace: bool,
    /// Create only if the flow does not exist
    pub if_not_exists: bool,
    /// `EXPIRE AFTER`
    /// Duration in seconds as `i64`
    pub expire_after: Option<i64>,
    /// Duration for flow evaluation interval
    /// Duration in seconds as `i64`
    /// If not set, flow will be evaluated based on time window size and other args.
    pub eval_interval: Option<i64>,
    /// Comment string
    pub comment: Option<String>,
    /// Flow creation options from `WITH (...)`
    pub flow_options: OptionMap,
    /// The query after `AS`: either a SQL query or a TQL query
    pub query: Box<SqlOrTql>,
}
623
/// Either a sql query or a tql query
///
/// The `String` in each variant is the query text the statement was built
/// from; `Display` writes that text verbatim.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub enum SqlOrTql {
    /// A SQL query together with its original text.
    Sql(GtQuery, String),
    /// A TQL query together with its original text.
    Tql(Tql, String),
}
630
631impl std::fmt::Display for SqlOrTql {
632    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
633        match self {
634            Self::Sql(_, s) => write!(f, "{}", s),
635            Self::Tql(_, s) => write!(f, "{}", s),
636        }
637    }
638}
639
640impl SqlOrTql {
641    pub fn try_from_statement(
642        value: Statement,
643        original_query: &str,
644    ) -> std::result::Result<Self, crate::error::Error> {
645        match value {
646            Statement::Query(query) => Ok(Self::Sql(*query, original_query.to_string())),
647            Statement::Tql(tql) => Ok(Self::Tql(tql, original_query.to_string())),
648            _ => InvalidFlowQuerySnafu {
649                reason: format!("Expect either sql query or promql query, found {:?}", value),
650            }
651            .fail(),
652        }
653    }
654}
655
656impl Display for CreateFlow {
657    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
658        write!(f, "CREATE ")?;
659        if self.or_replace {
660            write!(f, "OR REPLACE ")?;
661        }
662        write!(f, "FLOW ")?;
663        if self.if_not_exists {
664            write!(f, "IF NOT EXISTS ")?;
665        }
666        writeln!(f, "{}", &self.flow_name)?;
667        writeln!(f, "SINK TO {}", &self.sink_table_name)?;
668        if let Some(expire_after) = &self.expire_after {
669            writeln!(f, "EXPIRE AFTER '{} s'", expire_after)?;
670        }
671        if let Some(eval_interval) = &self.eval_interval {
672            writeln!(f, "EVAL INTERVAL '{} s'", eval_interval)?;
673        }
674        if let Some(comment) = &self.comment {
675            writeln!(f, "COMMENT '{}'", comment)?;
676        }
677        if !self.flow_options.is_empty() {
678            let options = self.flow_options.kv_pairs();
679            writeln!(f, "WITH ({})", format_list_comma!(options))?;
680        }
681        write!(f, "AS {}", &self.query)
682    }
683}
684
/// Create SQL view statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateView {
    /// View name
    pub name: ObjectName,
    /// An optional list of names to be used for columns of the view
    pub columns: Vec<Ident>,
    /// The clause after `AS` that defines the VIEW.
    /// Can only be either [Statement::Query] or [Statement::Tql].
    pub query: Box<Statement>,
    /// Whether to replace existing VIEW
    pub or_replace: bool,
    /// Create VIEW only when it doesn't exist
    pub if_not_exists: bool,
}
700
701impl Display for CreateView {
702    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
703        write!(f, "CREATE ")?;
704        if self.or_replace {
705            write!(f, "OR REPLACE ")?;
706        }
707        write!(f, "VIEW ")?;
708        if self.if_not_exists {
709            write!(f, "IF NOT EXISTS ")?;
710        }
711        write!(f, "{} ", &self.name)?;
712        if !self.columns.is_empty() {
713            write!(f, "({}) ", format_list_comma!(self.columns))?;
714        }
715        write!(f, "AS {}", &self.query)
716    }
717}
718
719#[cfg(test)]
720mod tests {
721    use std::assert_matches;
722
723    use crate::dialect::GreptimeDbDialect;
724    use crate::error::Error;
725    use crate::parser::{ParseOptions, ParserContext};
726    use crate::statements::statement::Statement;
727
728    #[test]
729    fn test_display_create_table() {
730        let sql = r"create table if not exists demo(
731                             host string,
732                             ts timestamp,
733                             cpu double default 0,
734                             memory double,
735                             TIME INDEX (ts),
736                             PRIMARY KEY(host)
737                       )
738                       PARTITION ON COLUMNS (host) (
739                            host = 'a',
740                            host > 'a',
741                       )
742                       engine=mito
743                       with(ttl='7d', storage='File');
744         ";
745        let result =
746            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
747                .unwrap();
748        assert_eq!(1, result.len());
749
750        match &result[0] {
751            Statement::CreateTable(c) => {
752                let new_sql = format!("\n{}", c);
753                assert_eq!(
754                    r#"
755CREATE TABLE IF NOT EXISTS demo (
756  host STRING,
757  ts TIMESTAMP,
758  cpu DOUBLE DEFAULT 0,
759  memory DOUBLE,
760  TIME INDEX (ts),
761  PRIMARY KEY (host)
762)
763PARTITION ON COLUMNS (host) (
764  host = 'a',
765  host > 'a'
766)
767ENGINE=mito
768WITH(
769  storage = 'File',
770  ttl = '7d'
771)"#,
772                    &new_sql
773                );
774
775                let new_result = ParserContext::create_with_dialect(
776                    &new_sql,
777                    &GreptimeDbDialect {},
778                    ParseOptions::default(),
779                )
780                .unwrap();
781                assert_eq!(result, new_result);
782            }
783            _ => unreachable!(),
784        }
785    }
786
787    #[test]
788    fn test_display_empty_partition_column() {
789        let sql = r"create table if not exists demo(
790            host string,
791            ts timestamp,
792            cpu double default 0,
793            memory double,
794            TIME INDEX (ts),
795            PRIMARY KEY(ts, host)
796            );
797        ";
798        let result =
799            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
800                .unwrap();
801        assert_eq!(1, result.len());
802
803        match &result[0] {
804            Statement::CreateTable(c) => {
805                let new_sql = format!("\n{}", c);
806                assert_eq!(
807                    r#"
808CREATE TABLE IF NOT EXISTS demo (
809  host STRING,
810  ts TIMESTAMP,
811  cpu DOUBLE DEFAULT 0,
812  memory DOUBLE,
813  TIME INDEX (ts),
814  PRIMARY KEY (ts, host)
815)
816ENGINE=mito
817"#,
818                    &new_sql
819                );
820
821                let new_result = ParserContext::create_with_dialect(
822                    &new_sql,
823                    &GreptimeDbDialect {},
824                    ParseOptions::default(),
825                )
826                .unwrap();
827                assert_eq!(result, new_result);
828            }
829            _ => unreachable!(),
830        }
831    }
832
833    #[test]
834    fn test_validate_table_options() {
835        let sql = r"create table if not exists demo(
836            host string,
837            ts timestamp,
838            cpu double default 0,
839            memory double,
840            TIME INDEX (ts),
841            PRIMARY KEY(host)
842      )
843      PARTITION ON COLUMNS (host) ()
844      engine=mito
845      with(ttl='7d', 'compaction.type'='world');
846";
847        let result =
848            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
849                .unwrap();
850        match &result[0] {
851            Statement::CreateTable(c) => {
852                assert_eq!(2, c.options.len());
853            }
854            _ => unreachable!(),
855        }
856
857        let sql = r"create table if not exists demo(
858            host string,
859            ts timestamp,
860            cpu double default 0,
861            memory double,
862            TIME INDEX (ts),
863            PRIMARY KEY(host)
864      )
865      PARTITION ON COLUMNS (host) ()
866      engine=mito
867      with(ttl='7d', hello='world');
868";
869        let result =
870            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
871        assert_matches!(result, Err(Error::InvalidTableOption { .. }))
872    }
873
874    #[test]
875    fn test_display_create_database() {
876        let sql = r"create database test;";
877        let stmts =
878            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
879                .unwrap();
880        assert_eq!(1, stmts.len());
881        assert_matches!(&stmts[0], Statement::CreateDatabase { .. });
882
883        match &stmts[0] {
884            Statement::CreateDatabase(set) => {
885                let new_sql = format!("\n{}", set);
886                assert_eq!(
887                    r#"
888CREATE DATABASE test"#,
889                    &new_sql
890                );
891            }
892            _ => {
893                unreachable!();
894            }
895        }
896
897        let sql = r"create database if not exists test;";
898        let stmts =
899            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
900                .unwrap();
901        assert_eq!(1, stmts.len());
902        assert_matches!(&stmts[0], Statement::CreateDatabase { .. });
903
904        match &stmts[0] {
905            Statement::CreateDatabase(set) => {
906                let new_sql = format!("\n{}", set);
907                assert_eq!(
908                    r#"
909CREATE DATABASE IF NOT EXISTS test"#,
910                    &new_sql
911                );
912            }
913            _ => {
914                unreachable!();
915            }
916        }
917
918        let sql = r#"CREATE DATABASE IF NOT EXISTS test WITH (ttl='1h');"#;
919        let stmts =
920            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
921                .unwrap();
922        assert_eq!(1, stmts.len());
923        assert_matches!(&stmts[0], Statement::CreateDatabase { .. });
924
925        match &stmts[0] {
926            Statement::CreateDatabase(set) => {
927                let new_sql = format!("\n{}", set);
928                assert_eq!(
929                    r#"
930CREATE DATABASE IF NOT EXISTS test
931WITH(
932  ttl = '1h'
933)"#,
934                    &new_sql
935                );
936            }
937            _ => {
938                unreachable!();
939            }
940        }
941    }
942
943    #[test]
944    fn test_display_create_table_like() {
945        let sql = r"create table t2 like t1;";
946        let stmts =
947            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
948                .unwrap();
949        assert_eq!(1, stmts.len());
950        assert_matches!(&stmts[0], Statement::CreateTableLike { .. });
951
952        match &stmts[0] {
953            Statement::CreateTableLike(create) => {
954                let new_sql = format!("\n{}", create);
955                assert_eq!(
956                    r#"
957CREATE TABLE t2 LIKE t1"#,
958                    &new_sql
959                );
960            }
961            _ => {
962                unreachable!();
963            }
964        }
965    }
966
967    #[test]
968    fn test_display_create_external_table() {
969        let sql = r#"CREATE EXTERNAL TABLE city (
970            host string,
971            ts timestamp,
972            cpu float64 default 0,
973            memory float64,
974            TIME INDEX (ts),
975            PRIMARY KEY(host)
976) WITH (location='/var/data/city.csv', format='csv');"#;
977        let stmts =
978            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
979                .unwrap();
980        assert_eq!(1, stmts.len());
981        assert_matches!(&stmts[0], Statement::CreateExternalTable { .. });
982
983        match &stmts[0] {
984            Statement::CreateExternalTable(create) => {
985                let new_sql = format!("\n{}", create);
986                assert_eq!(
987                    r#"
988CREATE EXTERNAL TABLE city (
989  host STRING,
990  ts TIMESTAMP,
991  cpu DOUBLE DEFAULT 0,
992  memory DOUBLE,
993  TIME INDEX (ts),
994  PRIMARY KEY (host)
995)
996ENGINE=file
997WITH(
998  format = 'csv',
999  location = '/var/data/city.csv'
1000)"#,
1001                    &new_sql
1002                );
1003            }
1004            _ => {
1005                unreachable!();
1006            }
1007        }
1008    }
1009
1010    #[test]
1011    fn test_display_create_flow() {
1012        let sql = r"CREATE FLOW filter_numbers
1013            SINK TO out_num_cnt
1014            AS SELECT number FROM numbers_input where number > 10;";
1015        let result =
1016            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1017                .unwrap();
1018        assert_eq!(1, result.len());
1019
1020        match &result[0] {
1021            Statement::CreateFlow(c) => {
1022                let new_sql = format!("\n{}", c);
1023                assert_eq!(
1024                    r#"
1025CREATE FLOW filter_numbers
1026SINK TO out_num_cnt
1027AS SELECT number FROM numbers_input where number > 10"#,
1028                    &new_sql
1029                );
1030
1031                let new_result = ParserContext::create_with_dialect(
1032                    &new_sql,
1033                    &GreptimeDbDialect {},
1034                    ParseOptions::default(),
1035                )
1036                .unwrap();
1037                assert_eq!(result, new_result);
1038            }
1039            _ => unreachable!(),
1040        }
1041    }
1042
1043    #[test]
1044    fn test_vector_index_options_validation() {
1045        use super::{ColumnExtensions, OptionMap};
1046
1047        // Test zero connectivity should fail
1048        let extensions = ColumnExtensions {
1049            fulltext_index_options: None,
1050            vector_options: None,
1051            skipping_index_options: None,
1052            inverted_index_options: None,
1053            json_datatype_options: None,
1054            vector_index_options: Some(OptionMap::from([(
1055                "connectivity".to_string(),
1056                "0".to_string(),
1057            )])),
1058        };
1059        let result = extensions.build_vector_index_options();
1060        assert!(result.is_err());
1061        assert!(
1062            result
1063                .unwrap_err()
1064                .to_string()
1065                .contains("connectivity must be in the range [2, 2048]")
1066        );
1067
1068        // Test zero expansion_add should fail
1069        let extensions = ColumnExtensions {
1070            fulltext_index_options: None,
1071            vector_options: None,
1072            skipping_index_options: None,
1073            inverted_index_options: None,
1074            json_datatype_options: None,
1075            vector_index_options: Some(OptionMap::from([(
1076                "expansion_add".to_string(),
1077                "0".to_string(),
1078            )])),
1079        };
1080        let result = extensions.build_vector_index_options();
1081        assert!(result.is_err());
1082        assert!(
1083            result
1084                .unwrap_err()
1085                .to_string()
1086                .contains("expansion_add must be greater than 0")
1087        );
1088
1089        // Test zero expansion_search should fail
1090        let extensions = ColumnExtensions {
1091            fulltext_index_options: None,
1092            vector_options: None,
1093            skipping_index_options: None,
1094            inverted_index_options: None,
1095            json_datatype_options: None,
1096            vector_index_options: Some(OptionMap::from([(
1097                "expansion_search".to_string(),
1098                "0".to_string(),
1099            )])),
1100        };
1101        let result = extensions.build_vector_index_options();
1102        assert!(result.is_err());
1103        assert!(
1104            result
1105                .unwrap_err()
1106                .to_string()
1107                .contains("expansion_search must be greater than 0")
1108        );
1109
1110        // Test valid values should succeed
1111        let extensions = ColumnExtensions {
1112            fulltext_index_options: None,
1113            vector_options: None,
1114            skipping_index_options: None,
1115            inverted_index_options: None,
1116            json_datatype_options: None,
1117            vector_index_options: Some(OptionMap::from([
1118                ("connectivity".to_string(), "32".to_string()),
1119                ("expansion_add".to_string(), "200".to_string()),
1120                ("expansion_search".to_string(), "100".to_string()),
1121            ])),
1122        };
1123        let result = extensions.build_vector_index_options();
1124        assert!(result.is_ok());
1125        let options = result.unwrap().unwrap();
1126        assert_eq!(options.connectivity, 32);
1127        assert_eq!(options.expansion_add, 200);
1128        assert_eq!(options.expansion_search, 100);
1129    }
1130}