1use std::collections::{HashMap, HashSet};
16use std::fmt::{Display, Formatter};
17use std::sync::Arc;
18
19use common_catalog::consts::FILE_ENGINE;
20use datatypes::data_type::ConcreteDataType;
21use datatypes::json::JsonStructureSettings;
22use datatypes::schema::{
23 FulltextOptions, SkippingIndexOptions, VectorDistanceMetric, VectorIndexEngineType,
24 VectorIndexOptions,
25};
26use datatypes::types::StructType;
27use itertools::Itertools;
28use serde::Serialize;
29use snafu::{OptionExt, ResultExt};
30use sqlparser::ast::{ColumnOptionDef, DataType, Expr, Query};
31use sqlparser_derive::{Visit, VisitMut};
32
33use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue};
34use crate::error::{
35 InvalidFlowQuerySnafu, InvalidJsonStructureSettingSnafu, InvalidSqlSnafu, Result,
36 SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu,
37};
38use crate::statements::statement::Statement;
39use crate::statements::tql::Tql;
40use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
41use crate::util::OptionValue;
42
/// Separator placed between items that are rendered one per line.
const LINE_SEP: &str = ",\n";
/// Separator placed between items that are rendered inline on a single line.
const COMMA_SEP: &str = ", ";
/// Number of spaces `format_indent!` puts in front of an item.
const INDENT: usize = 2;
/// Key of the dimension entry looked up in a column's vector options.
pub const VECTOR_OPT_DIM: &str = "dim";

/// JSON datatype option key: list of keys collected as "unstructured keys".
pub const JSON_OPT_UNSTRUCTURED_KEYS: &str = "unstructured_keys";
/// JSON datatype option key: selects one of the `JSON_FORMAT_*` values.
pub const JSON_OPT_FORMAT: &str = "format";
/// JSON datatype option key: struct-typed description of the JSON fields.
pub(crate) const JSON_OPT_FIELDS: &str = "fields";
/// `format` value mapped to `JsonStructureSettings::Structured`.
pub const JSON_FORMAT_FULL_STRUCTURED: &str = "structured";
/// `format` value mapped to `JsonStructureSettings::UnstructuredRaw`.
pub const JSON_FORMAT_RAW: &str = "raw";
/// `format` value mapped to `JsonStructureSettings::PartialUnstructuredByKey`.
pub const JSON_FORMAT_PARTIAL: &str = "partial";
54
/// Formats `$arg` prefixed with `INDENT` spaces.
///
/// The two-argument form takes a format string with two placeholders: the
/// first receives the indentation, the second receives `$arg`. The
/// one-argument form uses `"{}{}"`.
macro_rules! format_indent {
    ($fmt: expr, $arg: expr) => {
        // An empty string padded with spaces to a width of `INDENT` is the indentation.
        format!($fmt, format_args!("{: >1$}", "", INDENT), $arg)
    };
    ($arg: expr) => {
        format_indent!("{}{}", $arg)
    };
}
63
/// Renders every element of `$list` with `format_indent!` and joins the
/// results with `LINE_SEP`, i.e. one indented element per line.
macro_rules! format_list_indent {
    ($list: expr) => {
        $list.iter().map(|e| format_indent!(e)).join(LINE_SEP)
    };
}
69
/// Renders every element of `$list` with its `Display` impl and joins the
/// results with `COMMA_SEP` on a single line.
macro_rules! format_list_comma {
    ($list: expr) => {
        $list.iter().map(|e| format!("{}", e)).join(COMMA_SEP)
    };
}
75
76#[cfg(feature = "enterprise")]
77pub mod trigger;
78
79fn format_table_constraint(constraints: &[TableConstraint]) -> String {
80 constraints.iter().map(|c| format_indent!(c)).join(LINE_SEP)
81}
82
/// Table-level constraints carried by `CreateTable` and `CreateExternalTable`.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub enum TableConstraint {
    /// Rendered as `PRIMARY KEY (<columns>)`.
    PrimaryKey { columns: Vec<Ident> },
    /// Rendered as `TIME INDEX (<column>)`.
    TimeIndex { column: Ident },
}
91
92impl Display for TableConstraint {
93 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
94 match self {
95 TableConstraint::PrimaryKey { columns } => {
96 write!(f, "PRIMARY KEY ({})", format_list_comma!(columns))
97 }
98 TableConstraint::TimeIndex { column } => {
99 write!(f, "TIME INDEX ({})", column)
100 }
101 }
102 }
103}
104
/// A `CREATE TABLE` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateTable {
    /// Whether `IF NOT EXISTS` is rendered.
    pub if_not_exists: bool,
    /// Numeric table id; it is not part of the rendered SQL.
    /// NOTE(review): how this id is assigned is not visible in this file.
    pub table_id: u32,
    /// Name of the table being created.
    pub name: ObjectName,
    /// Column definitions, in declaration order.
    pub columns: Vec<Column>,
    /// Engine name, rendered as `ENGINE=<engine>`. When it equals
    /// `FILE_ENGINE` the statement is rendered as `CREATE EXTERNAL TABLE`.
    pub engine: String,
    /// Table constraints rendered after the columns.
    pub constraints: Vec<TableConstraint>,
    /// Table options, rendered in a `WITH(...)` clause when non-empty.
    pub options: OptionMap,
    /// Optional partitioning clause rendered after the column list.
    pub partitions: Option<Partitions>,
}
119
/// A column of a `CREATE TABLE` statement: the plain column definition plus
/// the extra per-column options kept in `ColumnExtensions`.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct Column {
    /// Name, data type and column options of the column.
    pub column_def: ColumnDef,
    /// Additional options (vector type, indexes, JSON datatype settings).
    pub extensions: ColumnExtensions,
}
128
/// Per-column options that are not part of the plain column definition.
///
/// For the index fields, `None` means the index is not declared, while
/// `Some` with an empty map is rendered as the bare `<KIND> INDEX` clause.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Default, Serialize)]
pub struct ColumnExtensions {
    /// Options of a vector-typed column; the `VECTOR_OPT_DIM` entry is used
    /// when rendering the column type as `VECTOR(<dim>)`.
    pub vector_options: Option<OptionMap>,

    /// Raw options of the `FULLTEXT INDEX` clause.
    pub fulltext_index_options: Option<OptionMap>,
    /// Raw options of the `SKIPPING INDEX` clause.
    pub skipping_index_options: Option<OptionMap>,
    /// Raw options of the `INVERTED INDEX` clause.
    pub inverted_index_options: Option<OptionMap>,
    /// Raw options of the `VECTOR INDEX` clause.
    pub vector_index_options: Option<OptionMap>,
    /// Options attached to a JSON data type (see the `JSON_OPT_*` keys).
    pub json_datatype_options: Option<OptionMap>,
}
147
impl Column {
    /// Returns the column name.
    pub fn name(&self) -> &Ident {
        &self.column_def.name
    }

    /// Returns the SQL data type of the column.
    pub fn data_type(&self) -> &DataType {
        &self.column_def.data_type
    }

    /// Returns a mutable reference to the SQL data type of the column.
    pub fn mut_data_type(&mut self) -> &mut DataType {
        &mut self.column_def.data_type
    }

    /// Returns the column options of the underlying column definition.
    pub fn options(&self) -> &[ColumnOptionDef] {
        &self.column_def.options
    }

    /// Returns the column options as a mutable vector so that callers can
    /// add or remove options.
    pub fn mut_options(&mut self) -> &mut Vec<ColumnOptionDef> {
        &mut self.column_def.options
    }
}
169
170impl Display for Column {
171 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
172 if let Some(vector_options) = &self.extensions.vector_options
173 && let Some(dim) = vector_options.get(VECTOR_OPT_DIM)
174 {
175 write!(f, "{} VECTOR({})", self.column_def.name, dim)?;
176 return Ok(());
177 }
178
179 write!(f, "{} {}", self.column_def.name, self.column_def.data_type)?;
180 if let Some(options) = &self.extensions.json_datatype_options {
181 write!(
182 f,
183 "({})",
184 options
185 .entries()
186 .map(|(k, v)| format!("{k} = {v}"))
187 .join(COMMA_SEP)
188 )?;
189 }
190 for option in &self.column_def.options {
191 write!(f, " {option}")?;
192 }
193
194 if let Some(fulltext_options) = &self.extensions.fulltext_index_options {
195 if !fulltext_options.is_empty() {
196 let options = fulltext_options.kv_pairs();
197 write!(f, " FULLTEXT INDEX WITH({})", format_list_comma!(options))?;
198 } else {
199 write!(f, " FULLTEXT INDEX")?;
200 }
201 }
202
203 if let Some(skipping_index_options) = &self.extensions.skipping_index_options {
204 if !skipping_index_options.is_empty() {
205 let options = skipping_index_options.kv_pairs();
206 write!(f, " SKIPPING INDEX WITH({})", format_list_comma!(options))?;
207 } else {
208 write!(f, " SKIPPING INDEX")?;
209 }
210 }
211
212 if let Some(inverted_index_options) = &self.extensions.inverted_index_options {
213 if !inverted_index_options.is_empty() {
214 let options = inverted_index_options.kv_pairs();
215 write!(f, " INVERTED INDEX WITH({})", format_list_comma!(options))?;
216 } else {
217 write!(f, " INVERTED INDEX")?;
218 }
219 }
220
221 if let Some(vector_index_options) = &self.extensions.vector_index_options {
222 if !vector_index_options.is_empty() {
223 let options = vector_index_options.kv_pairs();
224 write!(f, " VECTOR INDEX WITH({})", format_list_comma!(options))?;
225 } else {
226 write!(f, " VECTOR INDEX")?;
227 }
228 }
229 Ok(())
230 }
231}
232
233impl ColumnExtensions {
234 pub fn build_fulltext_options(&self) -> Result<Option<FulltextOptions>> {
235 let Some(options) = self.fulltext_index_options.as_ref() else {
236 return Ok(None);
237 };
238
239 let options: HashMap<String, String> = options.clone().into_map();
240 Ok(Some(options.try_into().context(SetFulltextOptionSnafu)?))
241 }
242
243 pub fn build_skipping_index_options(&self) -> Result<Option<SkippingIndexOptions>> {
244 let Some(options) = self.skipping_index_options.as_ref() else {
245 return Ok(None);
246 };
247
248 let options: HashMap<String, String> = options.clone().into_map();
249 Ok(Some(
250 options.try_into().context(SetSkippingIndexOptionSnafu)?,
251 ))
252 }
253
254 pub fn build_vector_index_options(&self) -> Result<Option<VectorIndexOptions>> {
255 let Some(options) = self.vector_index_options.as_ref() else {
256 return Ok(None);
257 };
258
259 let options_map: HashMap<String, String> = options.clone().into_map();
260 let mut result = VectorIndexOptions::default();
261
262 if let Some(s) = options_map.get("engine") {
263 result.engine = s.parse::<VectorIndexEngineType>().map_err(|e| {
264 InvalidSqlSnafu {
265 msg: format!("invalid VECTOR INDEX engine: {e}"),
266 }
267 .build()
268 })?;
269 }
270
271 if let Some(s) = options_map.get("metric") {
272 result.metric = s.parse::<VectorDistanceMetric>().map_err(|e| {
273 InvalidSqlSnafu {
274 msg: format!("invalid VECTOR INDEX metric: {e}"),
275 }
276 .build()
277 })?;
278 }
279
280 if let Some(s) = options_map.get("connectivity") {
281 let value = s.parse::<u32>().map_err(|_| {
282 InvalidSqlSnafu {
283 msg: format!(
284 "invalid VECTOR INDEX connectivity: {s}, expected positive integer"
285 ),
286 }
287 .build()
288 })?;
289 if !(2..=2048).contains(&value) {
290 return InvalidSqlSnafu {
291 msg: "VECTOR INDEX connectivity must be in the range [2, 2048].".to_string(),
292 }
293 .fail();
294 }
295 result.connectivity = value;
296 }
297
298 if let Some(s) = options_map.get("expansion_add") {
299 let value = s.parse::<u32>().map_err(|_| {
300 InvalidSqlSnafu {
301 msg: format!(
302 "invalid VECTOR INDEX expansion_add: {s}, expected positive integer"
303 ),
304 }
305 .build()
306 })?;
307 if value == 0 {
308 return InvalidSqlSnafu {
309 msg: "VECTOR INDEX expansion_add must be greater than 0".to_string(),
310 }
311 .fail();
312 }
313 result.expansion_add = value;
314 }
315
316 if let Some(s) = options_map.get("expansion_search") {
317 let value = s.parse::<u32>().map_err(|_| {
318 InvalidSqlSnafu {
319 msg: format!(
320 "invalid VECTOR INDEX expansion_search: {s}, expected positive integer"
321 ),
322 }
323 .build()
324 })?;
325 if value == 0 {
326 return InvalidSqlSnafu {
327 msg: "VECTOR INDEX expansion_search must be greater than 0".to_string(),
328 }
329 .fail();
330 }
331 result.expansion_search = value;
332 }
333
334 Ok(Some(result))
335 }
336
337 pub fn build_json_structure_settings(&self) -> Result<Option<JsonStructureSettings>> {
338 let Some(options) = self.json_datatype_options.as_ref() else {
339 return Ok(None);
340 };
341
342 let unstructured_keys = options
343 .value(JSON_OPT_UNSTRUCTURED_KEYS)
344 .and_then(|v| {
345 v.as_list().map(|x| {
346 x.into_iter()
347 .map(|x| x.to_string())
348 .collect::<HashSet<String>>()
349 })
350 })
351 .unwrap_or_default();
352
353 let fields = if let Some(value) = options.value(JSON_OPT_FIELDS) {
354 let fields = value
355 .as_struct_fields()
356 .context(InvalidJsonStructureSettingSnafu {
357 reason: format!(r#"expect "{JSON_OPT_FIELDS}" a struct, actual: "{value}""#,),
358 })?;
359 let fields = fields
360 .iter()
361 .map(|field| {
362 let name = field.field_name.as_ref().map(|x| x.value.clone()).context(
363 InvalidJsonStructureSettingSnafu {
364 reason: format!(r#"missing field name in "{field}""#),
365 },
366 )?;
367 let datatype = sql_data_type_to_concrete_data_type(
368 &field.field_type,
369 &Default::default(),
370 )?;
371 Ok(datatypes::types::StructField::new(name, datatype, true))
372 })
373 .collect::<Result<_>>()?;
374 Some(StructType::new(Arc::new(fields)))
375 } else {
376 None
377 };
378
379 options
380 .get(JSON_OPT_FORMAT)
381 .map(|format| match format {
382 JSON_FORMAT_FULL_STRUCTURED => Ok(JsonStructureSettings::Structured(fields)),
383 JSON_FORMAT_PARTIAL => {
384 let fields = fields.map(|fields| {
385 let mut fields = Arc::unwrap_or_clone(fields.fields());
386 fields.push(datatypes::types::StructField::new(
387 JsonStructureSettings::RAW_FIELD.to_string(),
388 ConcreteDataType::string_datatype(),
389 true,
390 ));
391 StructType::new(Arc::new(fields))
392 });
393 Ok(JsonStructureSettings::PartialUnstructuredByKey {
394 fields,
395 unstructured_keys,
396 })
397 }
398 JSON_FORMAT_RAW => Ok(JsonStructureSettings::UnstructuredRaw),
399 _ => InvalidSqlSnafu {
400 msg: format!("unknown JSON datatype 'format': {format}"),
401 }
402 .fail(),
403 })
404 .transpose()
405 }
406
407 pub fn set_json_structure_settings(&mut self, settings: JsonStructureSettings) {
408 let mut map = OptionMap::default();
409
410 let format = match settings {
411 JsonStructureSettings::Structured(_) => JSON_FORMAT_FULL_STRUCTURED,
412 JsonStructureSettings::PartialUnstructuredByKey { .. } => JSON_FORMAT_PARTIAL,
413 JsonStructureSettings::UnstructuredRaw => JSON_FORMAT_RAW,
414 };
415 map.insert(JSON_OPT_FORMAT.to_string(), format.to_string());
416
417 if let JsonStructureSettings::PartialUnstructuredByKey {
418 fields: _,
419 unstructured_keys,
420 } = settings
421 {
422 let value = OptionValue::from(
423 unstructured_keys
424 .iter()
425 .map(|x| x.as_str())
426 .sorted()
427 .collect::<Vec<_>>(),
428 );
429 map.insert_options(JSON_OPT_UNSTRUCTURED_KEYS, value);
430 }
431
432 self.json_datatype_options = Some(map);
433 }
434}
435
/// The partitioning clause of a `CREATE TABLE` statement, rendered as
/// `PARTITION ON COLUMNS (<column_list>) (<exprs>)`.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct Partitions {
    /// Columns the table is partitioned on.
    pub column_list: Vec<Ident>,
    /// Partition expressions, rendered one per line.
    pub exprs: Vec<Expr>,
}
447
448impl Partitions {
449 pub fn set_quote(&mut self, quote_style: char) {
451 self.column_list
452 .iter_mut()
453 .for_each(|c| c.quote_style = Some(quote_style));
454 }
455}
456
/// A named partition bounded by a list of values, rendered as
/// `PARTITION <name> VALUES LESS THAN (<value_list>)`.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)]
pub struct PartitionEntry {
    /// Name of the partition.
    pub name: Ident,
    /// Values rendered inside the `VALUES LESS THAN (...)` list.
    pub value_list: Vec<SqlValue>,
}
462
463impl Display for PartitionEntry {
464 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
465 write!(
466 f,
467 "PARTITION {} VALUES LESS THAN ({})",
468 self.name,
469 format_list_comma!(self.value_list),
470 )
471 }
472}
473
474impl Display for Partitions {
475 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
476 if !self.column_list.is_empty() {
477 write!(
478 f,
479 "PARTITION ON COLUMNS ({}) (\n{}\n)",
480 format_list_comma!(self.column_list),
481 format_list_indent!(self.exprs),
482 )?;
483 }
484 Ok(())
485 }
486}
487
488impl Display for CreateTable {
489 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
490 write!(f, "CREATE ")?;
491 if self.engine == FILE_ENGINE {
492 write!(f, "EXTERNAL ")?;
493 }
494 write!(f, "TABLE ")?;
495 if self.if_not_exists {
496 write!(f, "IF NOT EXISTS ")?;
497 }
498 writeln!(f, "{} (", &self.name)?;
499 writeln!(f, "{},", format_list_indent!(self.columns))?;
500 writeln!(f, "{}", format_table_constraint(&self.constraints))?;
501 writeln!(f, ")")?;
502 if let Some(partitions) = &self.partitions {
503 writeln!(f, "{partitions}")?;
504 }
505 writeln!(f, "ENGINE={}", &self.engine)?;
506 if !self.options.is_empty() {
507 let options = self.options.kv_pairs();
508 write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
509 }
510 Ok(())
511 }
512}
513
/// A `CREATE DATABASE` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateDatabase {
    /// Name of the database being created.
    pub name: ObjectName,
    /// Whether `IF NOT EXISTS` is rendered.
    pub if_not_exists: bool,
    /// Database options, rendered in a `WITH(...)` clause when non-empty.
    pub options: OptionMap,
}
521
impl CreateDatabase {
    /// Creates a `CreateDatabase` statement from its name, the
    /// `IF NOT EXISTS` flag and its options.
    pub fn new(name: ObjectName, if_not_exists: bool, options: OptionMap) -> Self {
        Self {
            name,
            if_not_exists,
            options,
        }
    }
}
532
533impl Display for CreateDatabase {
534 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
535 write!(f, "CREATE DATABASE ")?;
536 if self.if_not_exists {
537 write!(f, "IF NOT EXISTS ")?;
538 }
539 write!(f, "{}", &self.name)?;
540 if !self.options.is_empty() {
541 let options = self.options.kv_pairs();
542 write!(f, "\nWITH(\n{}\n)", format_list_indent!(options))?;
543 }
544 Ok(())
545 }
546}
547
/// A `CREATE EXTERNAL TABLE` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateExternalTable {
    /// Name of the table being created.
    pub name: ObjectName,
    /// Column definitions, in declaration order.
    pub columns: Vec<Column>,
    /// Table constraints rendered after the columns.
    pub constraints: Vec<TableConstraint>,
    /// Table options, rendered in a `WITH(...)` clause when non-empty.
    pub options: OptionMap,
    /// Whether `IF NOT EXISTS` is rendered.
    pub if_not_exists: bool,
    /// Engine name, rendered as `ENGINE=<engine>`.
    pub engine: String,
}
559
560impl Display for CreateExternalTable {
561 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
562 write!(f, "CREATE EXTERNAL TABLE ")?;
563 if self.if_not_exists {
564 write!(f, "IF NOT EXISTS ")?;
565 }
566 writeln!(f, "{} (", &self.name)?;
567 writeln!(f, "{},", format_list_indent!(self.columns))?;
568 writeln!(f, "{}", format_table_constraint(&self.constraints))?;
569 writeln!(f, ")")?;
570 writeln!(f, "ENGINE={}", &self.engine)?;
571 if !self.options.is_empty() {
572 let options = self.options.kv_pairs();
573 write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
574 }
575 Ok(())
576 }
577}
578
/// A `CREATE TABLE <table_name> LIKE <source_name>` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateTableLike {
    /// Name of the table being created.
    pub table_name: ObjectName,
    /// Name of the table referenced by the `LIKE` clause.
    pub source_name: ObjectName,
}
586
587impl Display for CreateTableLike {
588 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
589 let table_name = &self.table_name;
590 let source_name = &self.source_name;
591 write!(f, r#"CREATE TABLE {table_name} LIKE {source_name}"#)
592 }
593}
594
/// A `CREATE FLOW` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateFlow {
    /// Name of the flow being created.
    pub flow_name: ObjectName,
    /// Table named in the `SINK TO` clause.
    pub sink_table_name: ObjectName,
    /// Whether `OR REPLACE` is rendered.
    pub or_replace: bool,
    /// Whether `IF NOT EXISTS` is rendered.
    pub if_not_exists: bool,
    /// Value of the `EXPIRE AFTER` clause; rendered as `'<n> s'`.
    pub expire_after: Option<i64>,
    /// Value of the `EVAL INTERVAL` clause; rendered as `'<n> s'`.
    pub eval_interval: Option<i64>,
    /// Text of the `COMMENT` clause.
    pub comment: Option<String>,
    /// The query following `AS`.
    pub query: Box<SqlOrTql>,
}
617
/// The query of a flow: either a SQL query or a TQL statement.
///
/// In both variants the `String` is the query text that `Display` prints;
/// `try_from_statement` fills it with the original query text.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub enum SqlOrTql {
    /// A parsed SQL query together with its text.
    Sql(Query, String),
    /// A parsed TQL statement together with its text.
    Tql(Tql, String),
}
624
625impl std::fmt::Display for SqlOrTql {
626 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
627 match self {
628 Self::Sql(_, s) => write!(f, "{}", s),
629 Self::Tql(_, s) => write!(f, "{}", s),
630 }
631 }
632}
633
634impl SqlOrTql {
635 pub fn try_from_statement(
636 value: Statement,
637 original_query: &str,
638 ) -> std::result::Result<Self, crate::error::Error> {
639 match value {
640 Statement::Query(query) => {
641 Ok(Self::Sql((*query).try_into()?, original_query.to_string()))
642 }
643 Statement::Tql(tql) => Ok(Self::Tql(tql, original_query.to_string())),
644 _ => InvalidFlowQuerySnafu {
645 reason: format!("Expect either sql query or promql query, found {:?}", value),
646 }
647 .fail(),
648 }
649 }
650}
651
652impl Display for CreateFlow {
653 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
654 write!(f, "CREATE ")?;
655 if self.or_replace {
656 write!(f, "OR REPLACE ")?;
657 }
658 write!(f, "FLOW ")?;
659 if self.if_not_exists {
660 write!(f, "IF NOT EXISTS ")?;
661 }
662 writeln!(f, "{}", &self.flow_name)?;
663 writeln!(f, "SINK TO {}", &self.sink_table_name)?;
664 if let Some(expire_after) = &self.expire_after {
665 writeln!(f, "EXPIRE AFTER '{} s'", expire_after)?;
666 }
667 if let Some(eval_interval) = &self.eval_interval {
668 writeln!(f, "EVAL INTERVAL '{} s'", eval_interval)?;
669 }
670 if let Some(comment) = &self.comment {
671 writeln!(f, "COMMENT '{}'", comment)?;
672 }
673 write!(f, "AS {}", &self.query)
674 }
675}
676
/// A `CREATE VIEW` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateView {
    /// Name of the view being created.
    pub name: ObjectName,
    /// Optional column list of the view; omitted from the SQL when empty.
    pub columns: Vec<Ident>,
    /// The statement following `AS`.
    pub query: Box<Statement>,
    /// Whether `OR REPLACE` is rendered.
    pub or_replace: bool,
    /// Whether `IF NOT EXISTS` is rendered.
    pub if_not_exists: bool,
}
692
693impl Display for CreateView {
694 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
695 write!(f, "CREATE ")?;
696 if self.or_replace {
697 write!(f, "OR REPLACE ")?;
698 }
699 write!(f, "VIEW ")?;
700 if self.if_not_exists {
701 write!(f, "IF NOT EXISTS ")?;
702 }
703 write!(f, "{} ", &self.name)?;
704 if !self.columns.is_empty() {
705 write!(f, "({}) ", format_list_comma!(self.columns))?;
706 }
707 write!(f, "AS {}", &self.query)
708 }
709}
710
711#[cfg(test)]
712mod tests {
713 use std::assert_matches::assert_matches;
714
715 use crate::dialect::GreptimeDbDialect;
716 use crate::error::Error;
717 use crate::parser::{ParseOptions, ParserContext};
718 use crate::statements::statement::Statement;
719
720 #[test]
721 fn test_display_create_table() {
722 let sql = r"create table if not exists demo(
723 host string,
724 ts timestamp,
725 cpu double default 0,
726 memory double,
727 TIME INDEX (ts),
728 PRIMARY KEY(host)
729 )
730 PARTITION ON COLUMNS (host) (
731 host = 'a',
732 host > 'a',
733 )
734 engine=mito
735 with(ttl='7d', storage='File');
736 ";
737 let result =
738 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
739 .unwrap();
740 assert_eq!(1, result.len());
741
742 match &result[0] {
743 Statement::CreateTable(c) => {
744 let new_sql = format!("\n{}", c);
745 assert_eq!(
746 r#"
747CREATE TABLE IF NOT EXISTS demo (
748 host STRING,
749 ts TIMESTAMP,
750 cpu DOUBLE DEFAULT 0,
751 memory DOUBLE,
752 TIME INDEX (ts),
753 PRIMARY KEY (host)
754)
755PARTITION ON COLUMNS (host) (
756 host = 'a',
757 host > 'a'
758)
759ENGINE=mito
760WITH(
761 storage = 'File',
762 ttl = '7d'
763)"#,
764 &new_sql
765 );
766
767 let new_result = ParserContext::create_with_dialect(
768 &new_sql,
769 &GreptimeDbDialect {},
770 ParseOptions::default(),
771 )
772 .unwrap();
773 assert_eq!(result, new_result);
774 }
775 _ => unreachable!(),
776 }
777 }
778
779 #[test]
780 fn test_display_empty_partition_column() {
781 let sql = r"create table if not exists demo(
782 host string,
783 ts timestamp,
784 cpu double default 0,
785 memory double,
786 TIME INDEX (ts),
787 PRIMARY KEY(ts, host)
788 );
789 ";
790 let result =
791 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
792 .unwrap();
793 assert_eq!(1, result.len());
794
795 match &result[0] {
796 Statement::CreateTable(c) => {
797 let new_sql = format!("\n{}", c);
798 assert_eq!(
799 r#"
800CREATE TABLE IF NOT EXISTS demo (
801 host STRING,
802 ts TIMESTAMP,
803 cpu DOUBLE DEFAULT 0,
804 memory DOUBLE,
805 TIME INDEX (ts),
806 PRIMARY KEY (ts, host)
807)
808ENGINE=mito
809"#,
810 &new_sql
811 );
812
813 let new_result = ParserContext::create_with_dialect(
814 &new_sql,
815 &GreptimeDbDialect {},
816 ParseOptions::default(),
817 )
818 .unwrap();
819 assert_eq!(result, new_result);
820 }
821 _ => unreachable!(),
822 }
823 }
824
825 #[test]
826 fn test_validate_table_options() {
827 let sql = r"create table if not exists demo(
828 host string,
829 ts timestamp,
830 cpu double default 0,
831 memory double,
832 TIME INDEX (ts),
833 PRIMARY KEY(host)
834 )
835 PARTITION ON COLUMNS (host) ()
836 engine=mito
837 with(ttl='7d', 'compaction.type'='world');
838";
839 let result =
840 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
841 .unwrap();
842 match &result[0] {
843 Statement::CreateTable(c) => {
844 assert_eq!(2, c.options.len());
845 }
846 _ => unreachable!(),
847 }
848
849 let sql = r"create table if not exists demo(
850 host string,
851 ts timestamp,
852 cpu double default 0,
853 memory double,
854 TIME INDEX (ts),
855 PRIMARY KEY(host)
856 )
857 PARTITION ON COLUMNS (host) ()
858 engine=mito
859 with(ttl='7d', hello='world');
860";
861 let result =
862 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
863 assert_matches!(result, Err(Error::InvalidTableOption { .. }))
864 }
865
866 #[test]
867 fn test_display_create_database() {
868 let sql = r"create database test;";
869 let stmts =
870 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
871 .unwrap();
872 assert_eq!(1, stmts.len());
873 assert_matches!(&stmts[0], Statement::CreateDatabase { .. });
874
875 match &stmts[0] {
876 Statement::CreateDatabase(set) => {
877 let new_sql = format!("\n{}", set);
878 assert_eq!(
879 r#"
880CREATE DATABASE test"#,
881 &new_sql
882 );
883 }
884 _ => {
885 unreachable!();
886 }
887 }
888
889 let sql = r"create database if not exists test;";
890 let stmts =
891 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
892 .unwrap();
893 assert_eq!(1, stmts.len());
894 assert_matches!(&stmts[0], Statement::CreateDatabase { .. });
895
896 match &stmts[0] {
897 Statement::CreateDatabase(set) => {
898 let new_sql = format!("\n{}", set);
899 assert_eq!(
900 r#"
901CREATE DATABASE IF NOT EXISTS test"#,
902 &new_sql
903 );
904 }
905 _ => {
906 unreachable!();
907 }
908 }
909
910 let sql = r#"CREATE DATABASE IF NOT EXISTS test WITH (ttl='1h');"#;
911 let stmts =
912 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
913 .unwrap();
914 assert_eq!(1, stmts.len());
915 assert_matches!(&stmts[0], Statement::CreateDatabase { .. });
916
917 match &stmts[0] {
918 Statement::CreateDatabase(set) => {
919 let new_sql = format!("\n{}", set);
920 assert_eq!(
921 r#"
922CREATE DATABASE IF NOT EXISTS test
923WITH(
924 ttl = '1h'
925)"#,
926 &new_sql
927 );
928 }
929 _ => {
930 unreachable!();
931 }
932 }
933 }
934
935 #[test]
936 fn test_display_create_table_like() {
937 let sql = r"create table t2 like t1;";
938 let stmts =
939 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
940 .unwrap();
941 assert_eq!(1, stmts.len());
942 assert_matches!(&stmts[0], Statement::CreateTableLike { .. });
943
944 match &stmts[0] {
945 Statement::CreateTableLike(create) => {
946 let new_sql = format!("\n{}", create);
947 assert_eq!(
948 r#"
949CREATE TABLE t2 LIKE t1"#,
950 &new_sql
951 );
952 }
953 _ => {
954 unreachable!();
955 }
956 }
957 }
958
959 #[test]
960 fn test_display_create_external_table() {
961 let sql = r#"CREATE EXTERNAL TABLE city (
962 host string,
963 ts timestamp,
964 cpu float64 default 0,
965 memory float64,
966 TIME INDEX (ts),
967 PRIMARY KEY(host)
968) WITH (location='/var/data/city.csv', format='csv');"#;
969 let stmts =
970 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
971 .unwrap();
972 assert_eq!(1, stmts.len());
973 assert_matches!(&stmts[0], Statement::CreateExternalTable { .. });
974
975 match &stmts[0] {
976 Statement::CreateExternalTable(create) => {
977 let new_sql = format!("\n{}", create);
978 assert_eq!(
979 r#"
980CREATE EXTERNAL TABLE city (
981 host STRING,
982 ts TIMESTAMP,
983 cpu DOUBLE DEFAULT 0,
984 memory DOUBLE,
985 TIME INDEX (ts),
986 PRIMARY KEY (host)
987)
988ENGINE=file
989WITH(
990 format = 'csv',
991 location = '/var/data/city.csv'
992)"#,
993 &new_sql
994 );
995 }
996 _ => {
997 unreachable!();
998 }
999 }
1000 }
1001
1002 #[test]
1003 fn test_display_create_flow() {
1004 let sql = r"CREATE FLOW filter_numbers
1005 SINK TO out_num_cnt
1006 AS SELECT number FROM numbers_input where number > 10;";
1007 let result =
1008 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1009 .unwrap();
1010 assert_eq!(1, result.len());
1011
1012 match &result[0] {
1013 Statement::CreateFlow(c) => {
1014 let new_sql = format!("\n{}", c);
1015 assert_eq!(
1016 r#"
1017CREATE FLOW filter_numbers
1018SINK TO out_num_cnt
1019AS SELECT number FROM numbers_input where number > 10"#,
1020 &new_sql
1021 );
1022
1023 let new_result = ParserContext::create_with_dialect(
1024 &new_sql,
1025 &GreptimeDbDialect {},
1026 ParseOptions::default(),
1027 )
1028 .unwrap();
1029 assert_eq!(result, new_result);
1030 }
1031 _ => unreachable!(),
1032 }
1033 }
1034
1035 #[test]
1036 fn test_vector_index_options_validation() {
1037 use super::{ColumnExtensions, OptionMap};
1038
1039 let extensions = ColumnExtensions {
1041 fulltext_index_options: None,
1042 vector_options: None,
1043 skipping_index_options: None,
1044 inverted_index_options: None,
1045 json_datatype_options: None,
1046 vector_index_options: Some(OptionMap::from([(
1047 "connectivity".to_string(),
1048 "0".to_string(),
1049 )])),
1050 };
1051 let result = extensions.build_vector_index_options();
1052 assert!(result.is_err());
1053 assert!(
1054 result
1055 .unwrap_err()
1056 .to_string()
1057 .contains("connectivity must be in the range [2, 2048]")
1058 );
1059
1060 let extensions = ColumnExtensions {
1062 fulltext_index_options: None,
1063 vector_options: None,
1064 skipping_index_options: None,
1065 inverted_index_options: None,
1066 json_datatype_options: None,
1067 vector_index_options: Some(OptionMap::from([(
1068 "expansion_add".to_string(),
1069 "0".to_string(),
1070 )])),
1071 };
1072 let result = extensions.build_vector_index_options();
1073 assert!(result.is_err());
1074 assert!(
1075 result
1076 .unwrap_err()
1077 .to_string()
1078 .contains("expansion_add must be greater than 0")
1079 );
1080
1081 let extensions = ColumnExtensions {
1083 fulltext_index_options: None,
1084 vector_options: None,
1085 skipping_index_options: None,
1086 inverted_index_options: None,
1087 json_datatype_options: None,
1088 vector_index_options: Some(OptionMap::from([(
1089 "expansion_search".to_string(),
1090 "0".to_string(),
1091 )])),
1092 };
1093 let result = extensions.build_vector_index_options();
1094 assert!(result.is_err());
1095 assert!(
1096 result
1097 .unwrap_err()
1098 .to_string()
1099 .contains("expansion_search must be greater than 0")
1100 );
1101
1102 let extensions = ColumnExtensions {
1104 fulltext_index_options: None,
1105 vector_options: None,
1106 skipping_index_options: None,
1107 inverted_index_options: None,
1108 json_datatype_options: None,
1109 vector_index_options: Some(OptionMap::from([
1110 ("connectivity".to_string(), "32".to_string()),
1111 ("expansion_add".to_string(), "200".to_string()),
1112 ("expansion_search".to_string(), "100".to_string()),
1113 ])),
1114 };
1115 let result = extensions.build_vector_index_options();
1116 assert!(result.is_ok());
1117 let options = result.unwrap().unwrap();
1118 assert_eq!(options.connectivity, 32);
1119 assert_eq!(options.expansion_add, 200);
1120 assert_eq!(options.expansion_search, 100);
1121 }
1122}