1use std::collections::{HashMap, HashSet};
16use std::fmt::{Display, Formatter};
17use std::sync::Arc;
18
19use common_catalog::consts::FILE_ENGINE;
20use datatypes::data_type::ConcreteDataType;
21use datatypes::json::JsonStructureSettings;
22use datatypes::schema::{
23 FulltextOptions, SkippingIndexOptions, VectorDistanceMetric, VectorIndexEngineType,
24 VectorIndexOptions,
25};
26use datatypes::types::StructType;
27use itertools::Itertools;
28use serde::Serialize;
29use snafu::{OptionExt, ResultExt};
30use sqlparser::ast::{ColumnOptionDef, DataType, Expr};
31use sqlparser_derive::{Visit, VisitMut};
32
33use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue};
34use crate::error::{
35 InvalidFlowQuerySnafu, InvalidJsonStructureSettingSnafu, InvalidSqlSnafu, Result,
36 SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu,
37};
38use crate::statements::query::Query as GtQuery;
39use crate::statements::statement::Statement;
40use crate::statements::tql::Tql;
41use crate::statements::{OptionMap, sql_data_type_to_concrete_data_type};
42use crate::util::OptionValue;
43
/// Separator placed between the items of a list rendered one item per line.
const LINE_SEP: &str = ",\n";
/// Separator placed between the items of a list rendered on a single line.
const COMMA_SEP: &str = ", ";
/// Number of spaces `format_indent!` puts in front of the value it renders.
const INDENT: usize = 2;
/// Key looked up in a column's `vector_options`; its value is rendered as `VECTOR(<dim>)`.
pub const VECTOR_OPT_DIM: &str = "dim";

/// JSON datatype option key whose list value is collected into the set of unstructured keys.
pub const JSON_OPT_UNSTRUCTURED_KEYS: &str = "unstructured_keys";
/// JSON datatype option key selecting one of the `JSON_FORMAT_*` values.
pub const JSON_OPT_FORMAT: &str = "format";
/// JSON datatype option key whose value is read as a list of struct fields.
pub(crate) const JSON_OPT_FIELDS: &str = "fields";
/// `format` value mapped to `JsonStructureSettings::Structured`.
pub const JSON_FORMAT_FULL_STRUCTURED: &str = "structured";
/// `format` value mapped to `JsonStructureSettings::UnstructuredRaw`.
pub const JSON_FORMAT_RAW: &str = "raw";
/// `format` value mapped to `JsonStructureSettings::PartialUnstructuredByKey`.
pub const JSON_FORMAT_PARTIAL: &str = "partial";
55
/// Formats a value behind `INDENT` spaces of leading padding.
///
/// * `format_indent!(value)` renders the padding immediately followed by `value`.
/// * `format_indent!(template, value)` renders through `template`, whose first
///   placeholder receives the padding and whose second receives `value`.
macro_rules! format_indent {
    ($value: expr) => {
        format_indent!("{}{}", $value)
    };
    ($template: expr, $value: expr) => {
        format!(
            $template,
            // An empty string padded to `INDENT` columns yields exactly `INDENT` spaces.
            format_args!("{:width$}", "", width = INDENT),
            $value
        )
    };
}
64
/// Renders every item of `$items` through `format_indent!` and joins the
/// results with `LINE_SEP`, producing one indented item per line.
macro_rules! format_list_indent {
    ($items: expr) => {
        $items
            .iter()
            .map(|item| format_indent!("{}{}", item))
            .join(LINE_SEP)
    };
}
70
/// Renders every item of `$items` with its `Display` implementation and joins
/// the results with `COMMA_SEP` on a single line.
macro_rules! format_list_comma {
    ($items: expr) => {
        $items.iter().map(|item| item.to_string()).join(COMMA_SEP)
    };
}
76
// The `trigger` submodule is only compiled when the `enterprise` feature is enabled.
#[cfg(feature = "enterprise")]
pub mod trigger;
79
80fn format_table_constraint(constraints: &[TableConstraint]) -> String {
81 constraints.iter().map(|c| format_indent!(c)).join(LINE_SEP)
82}
83
/// A table-level constraint, rendered after the columns inside the parentheses
/// of a `CREATE TABLE` / `CREATE EXTERNAL TABLE` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub enum TableConstraint {
    /// Rendered as `PRIMARY KEY (<columns>)`, with the columns separated by `, `.
    PrimaryKey { columns: Vec<Ident> },
    /// Rendered as `TIME INDEX (<column>)`.
    TimeIndex { column: Ident },
}
92
93impl Display for TableConstraint {
94 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
95 match self {
96 TableConstraint::PrimaryKey { columns } => {
97 write!(f, "PRIMARY KEY ({})", format_list_comma!(columns))
98 }
99 TableConstraint::TimeIndex { column } => {
100 write!(f, "TIME INDEX ({})", column)
101 }
102 }
103 }
104}
105
/// AST node of a `CREATE TABLE` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateTable {
    /// When `true`, `IF NOT EXISTS` is rendered after `TABLE`.
    pub if_not_exists: bool,
    /// Numeric table id; it is not part of the rendered SQL.
    pub table_id: u32,
    /// Name of the table to create.
    pub name: ObjectName,
    /// Column definitions, rendered one per line.
    pub columns: Vec<Column>,
    /// Engine name, rendered as `ENGINE=<engine>`. When it equals `FILE_ENGINE`
    /// the statement is rendered as `CREATE EXTERNAL TABLE`.
    pub engine: String,
    /// Table constraints, rendered after the columns.
    pub constraints: Vec<TableConstraint>,
    /// Table options, rendered inside `WITH(...)` when non-empty.
    pub options: OptionMap,
    /// Optional partition clause, rendered before the `ENGINE=` line.
    pub partitions: Option<Partitions>,
}
120
/// A column of a `CREATE TABLE` statement: the parsed column definition plus
/// the extra per-column options kept in [`ColumnExtensions`].
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct Column {
    /// Column name, data type and column options.
    pub column_def: ColumnDef,
    /// Additional per-column options (vector type, index and JSON datatype options).
    pub extensions: ColumnExtensions,
}
129
/// Per-column options that are kept next to the column definition.
///
/// Every field is optional; `None` means the corresponding clause is absent.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Default, Serialize)]
pub struct ColumnExtensions {
    /// Vector type options; the `VECTOR_OPT_DIM` entry is rendered as `VECTOR(<dim>)`.
    pub vector_options: Option<OptionMap>,

    /// Options of the `FULLTEXT INDEX` clause; an empty map renders the bare clause.
    pub fulltext_index_options: Option<OptionMap>,
    /// Options of the `SKIPPING INDEX` clause; an empty map renders the bare clause.
    pub skipping_index_options: Option<OptionMap>,
    /// Options of the `INVERTED INDEX` clause; an empty map renders the bare clause.
    pub inverted_index_options: Option<OptionMap>,
    /// Options of the `VECTOR INDEX` clause; read by `build_vector_index_options`.
    pub vector_index_options: Option<OptionMap>,
    /// Options of a JSON datatype; read by `build_json_structure_settings`.
    pub json_datatype_options: Option<OptionMap>,
}
148
149impl Column {
150 pub fn name(&self) -> &Ident {
151 &self.column_def.name
152 }
153
154 pub fn data_type(&self) -> &DataType {
155 &self.column_def.data_type
156 }
157
158 pub fn mut_data_type(&mut self) -> &mut DataType {
159 &mut self.column_def.data_type
160 }
161
162 pub fn options(&self) -> &[ColumnOptionDef] {
163 &self.column_def.options
164 }
165
166 pub fn mut_options(&mut self) -> &mut Vec<ColumnOptionDef> {
167 &mut self.column_def.options
168 }
169}
170
171impl Display for Column {
172 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
173 if let Some(vector_options) = &self.extensions.vector_options
174 && let Some(dim) = vector_options.get(VECTOR_OPT_DIM)
175 {
176 write!(f, "{} VECTOR({})", self.column_def.name, dim)?;
177 return Ok(());
178 }
179
180 write!(f, "{} {}", self.column_def.name, self.column_def.data_type)?;
181 if let Some(options) = &self.extensions.json_datatype_options {
182 write!(
183 f,
184 "({})",
185 options
186 .entries()
187 .map(|(k, v)| format!("{k} = {v}"))
188 .join(COMMA_SEP)
189 )?;
190 }
191 for option in &self.column_def.options {
192 write!(f, " {option}")?;
193 }
194
195 if let Some(fulltext_options) = &self.extensions.fulltext_index_options {
196 if !fulltext_options.is_empty() {
197 let options = fulltext_options.kv_pairs();
198 write!(f, " FULLTEXT INDEX WITH({})", format_list_comma!(options))?;
199 } else {
200 write!(f, " FULLTEXT INDEX")?;
201 }
202 }
203
204 if let Some(skipping_index_options) = &self.extensions.skipping_index_options {
205 if !skipping_index_options.is_empty() {
206 let options = skipping_index_options.kv_pairs();
207 write!(f, " SKIPPING INDEX WITH({})", format_list_comma!(options))?;
208 } else {
209 write!(f, " SKIPPING INDEX")?;
210 }
211 }
212
213 if let Some(inverted_index_options) = &self.extensions.inverted_index_options {
214 if !inverted_index_options.is_empty() {
215 let options = inverted_index_options.kv_pairs();
216 write!(f, " INVERTED INDEX WITH({})", format_list_comma!(options))?;
217 } else {
218 write!(f, " INVERTED INDEX")?;
219 }
220 }
221
222 if let Some(vector_index_options) = &self.extensions.vector_index_options {
223 if !vector_index_options.is_empty() {
224 let options = vector_index_options.kv_pairs();
225 write!(f, " VECTOR INDEX WITH({})", format_list_comma!(options))?;
226 } else {
227 write!(f, " VECTOR INDEX")?;
228 }
229 }
230 Ok(())
231 }
232}
233
234impl ColumnExtensions {
235 pub fn build_fulltext_options(&self) -> Result<Option<FulltextOptions>> {
236 let Some(options) = self.fulltext_index_options.as_ref() else {
237 return Ok(None);
238 };
239
240 let options: HashMap<String, String> = options.clone().into_map();
241 Ok(Some(options.try_into().context(SetFulltextOptionSnafu)?))
242 }
243
244 pub fn build_skipping_index_options(&self) -> Result<Option<SkippingIndexOptions>> {
245 let Some(options) = self.skipping_index_options.as_ref() else {
246 return Ok(None);
247 };
248
249 let options: HashMap<String, String> = options.clone().into_map();
250 Ok(Some(
251 options.try_into().context(SetSkippingIndexOptionSnafu)?,
252 ))
253 }
254
255 pub fn build_vector_index_options(&self) -> Result<Option<VectorIndexOptions>> {
256 let Some(options) = self.vector_index_options.as_ref() else {
257 return Ok(None);
258 };
259
260 let options_map: HashMap<String, String> = options.clone().into_map();
261 let mut result = VectorIndexOptions::default();
262
263 if let Some(s) = options_map.get("engine") {
264 result.engine = s.parse::<VectorIndexEngineType>().map_err(|e| {
265 InvalidSqlSnafu {
266 msg: format!("invalid VECTOR INDEX engine: {e}"),
267 }
268 .build()
269 })?;
270 }
271
272 if let Some(s) = options_map.get("metric") {
273 result.metric = s.parse::<VectorDistanceMetric>().map_err(|e| {
274 InvalidSqlSnafu {
275 msg: format!("invalid VECTOR INDEX metric: {e}"),
276 }
277 .build()
278 })?;
279 }
280
281 if let Some(s) = options_map.get("connectivity") {
282 let value = s.parse::<u32>().map_err(|_| {
283 InvalidSqlSnafu {
284 msg: format!(
285 "invalid VECTOR INDEX connectivity: {s}, expected positive integer"
286 ),
287 }
288 .build()
289 })?;
290 if !(2..=2048).contains(&value) {
291 return InvalidSqlSnafu {
292 msg: "VECTOR INDEX connectivity must be in the range [2, 2048].".to_string(),
293 }
294 .fail();
295 }
296 result.connectivity = value;
297 }
298
299 if let Some(s) = options_map.get("expansion_add") {
300 let value = s.parse::<u32>().map_err(|_| {
301 InvalidSqlSnafu {
302 msg: format!(
303 "invalid VECTOR INDEX expansion_add: {s}, expected positive integer"
304 ),
305 }
306 .build()
307 })?;
308 if value == 0 {
309 return InvalidSqlSnafu {
310 msg: "VECTOR INDEX expansion_add must be greater than 0".to_string(),
311 }
312 .fail();
313 }
314 result.expansion_add = value;
315 }
316
317 if let Some(s) = options_map.get("expansion_search") {
318 let value = s.parse::<u32>().map_err(|_| {
319 InvalidSqlSnafu {
320 msg: format!(
321 "invalid VECTOR INDEX expansion_search: {s}, expected positive integer"
322 ),
323 }
324 .build()
325 })?;
326 if value == 0 {
327 return InvalidSqlSnafu {
328 msg: "VECTOR INDEX expansion_search must be greater than 0".to_string(),
329 }
330 .fail();
331 }
332 result.expansion_search = value;
333 }
334
335 Ok(Some(result))
336 }
337
338 pub fn build_json_structure_settings(&self) -> Result<Option<JsonStructureSettings>> {
339 let Some(options) = self.json_datatype_options.as_ref() else {
340 return Ok(None);
341 };
342
343 let unstructured_keys = options
344 .value(JSON_OPT_UNSTRUCTURED_KEYS)
345 .and_then(|v| {
346 v.as_list().map(|x| {
347 x.into_iter()
348 .map(|x| x.to_string())
349 .collect::<HashSet<String>>()
350 })
351 })
352 .unwrap_or_default();
353
354 let fields = if let Some(value) = options.value(JSON_OPT_FIELDS) {
355 let fields = value
356 .as_struct_fields()
357 .context(InvalidJsonStructureSettingSnafu {
358 reason: format!(r#"expect "{JSON_OPT_FIELDS}" a struct, actual: "{value}""#,),
359 })?;
360 let fields = fields
361 .iter()
362 .map(|field| {
363 let name = field.field_name.as_ref().map(|x| x.value.clone()).context(
364 InvalidJsonStructureSettingSnafu {
365 reason: format!(r#"missing field name in "{field}""#),
366 },
367 )?;
368 let datatype = sql_data_type_to_concrete_data_type(
369 &field.field_type,
370 &Default::default(),
371 )?;
372 Ok(datatypes::types::StructField::new(name, datatype, true))
373 })
374 .collect::<Result<_>>()?;
375 Some(StructType::new(Arc::new(fields)))
376 } else {
377 None
378 };
379
380 options
381 .get(JSON_OPT_FORMAT)
382 .map(|format| match format {
383 JSON_FORMAT_FULL_STRUCTURED => Ok(JsonStructureSettings::Structured(fields)),
384 JSON_FORMAT_PARTIAL => {
385 let fields = fields.map(|fields| {
386 let mut fields = Arc::unwrap_or_clone(fields.fields());
387 fields.push(datatypes::types::StructField::new(
388 JsonStructureSettings::RAW_FIELD.to_string(),
389 ConcreteDataType::string_datatype(),
390 true,
391 ));
392 StructType::new(Arc::new(fields))
393 });
394 Ok(JsonStructureSettings::PartialUnstructuredByKey {
395 fields,
396 unstructured_keys,
397 })
398 }
399 JSON_FORMAT_RAW => Ok(JsonStructureSettings::UnstructuredRaw),
400 _ => InvalidSqlSnafu {
401 msg: format!("unknown JSON datatype 'format': {format}"),
402 }
403 .fail(),
404 })
405 .transpose()
406 }
407
408 pub fn set_json_structure_settings(&mut self, settings: JsonStructureSettings) {
409 let mut map = OptionMap::default();
410
411 let format = match settings {
412 JsonStructureSettings::Structured(_) => JSON_FORMAT_FULL_STRUCTURED,
413 JsonStructureSettings::PartialUnstructuredByKey { .. } => JSON_FORMAT_PARTIAL,
414 JsonStructureSettings::UnstructuredRaw => JSON_FORMAT_RAW,
415 };
416 map.insert(JSON_OPT_FORMAT.to_string(), format.to_string());
417
418 if let JsonStructureSettings::PartialUnstructuredByKey {
419 fields: _,
420 unstructured_keys,
421 } = settings
422 {
423 let value = OptionValue::from(
424 unstructured_keys
425 .iter()
426 .map(|x| x.as_str())
427 .sorted()
428 .collect::<Vec<_>>(),
429 );
430 map.insert_options(JSON_OPT_UNSTRUCTURED_KEYS, value);
431 }
432
433 self.json_datatype_options = Some(map);
434 }
435}
436
/// Partition clause of a `CREATE TABLE` statement, rendered as
/// `PARTITION ON COLUMNS (<column_list>) (<exprs>)`.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct Partitions {
    /// Partition columns; when empty, nothing is rendered for the clause.
    pub column_list: Vec<Ident>,
    /// Partition expressions, rendered one per line.
    pub exprs: Vec<Expr>,
}
448
449impl Partitions {
450 pub fn set_quote(&mut self, quote_style: char) {
452 self.column_list
453 .iter_mut()
454 .for_each(|c| c.quote_style = Some(quote_style));
455 }
456}
457
/// A named partition, rendered as `PARTITION <name> VALUES LESS THAN (<value_list>)`.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)]
pub struct PartitionEntry {
    /// Name of the partition.
    pub name: Ident,
    /// Values rendered inside `VALUES LESS THAN (...)`, separated by `, `.
    pub value_list: Vec<SqlValue>,
}
463
464impl Display for PartitionEntry {
465 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
466 write!(
467 f,
468 "PARTITION {} VALUES LESS THAN ({})",
469 self.name,
470 format_list_comma!(self.value_list),
471 )
472 }
473}
474
475impl Display for Partitions {
476 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
477 if !self.column_list.is_empty() {
478 write!(
479 f,
480 "PARTITION ON COLUMNS ({}) (\n{}\n)",
481 format_list_comma!(self.column_list),
482 format_list_indent!(self.exprs),
483 )?;
484 }
485 Ok(())
486 }
487}
488
489impl Display for CreateTable {
490 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
491 write!(f, "CREATE ")?;
492 if self.engine == FILE_ENGINE {
493 write!(f, "EXTERNAL ")?;
494 }
495 write!(f, "TABLE ")?;
496 if self.if_not_exists {
497 write!(f, "IF NOT EXISTS ")?;
498 }
499 writeln!(f, "{} (", &self.name)?;
500 writeln!(f, "{},", format_list_indent!(self.columns))?;
501 writeln!(f, "{}", format_table_constraint(&self.constraints))?;
502 writeln!(f, ")")?;
503 if let Some(partitions) = &self.partitions {
504 writeln!(f, "{partitions}")?;
505 }
506 writeln!(f, "ENGINE={}", &self.engine)?;
507 if !self.options.is_empty() {
508 let options = self.options.kv_pairs();
509 write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
510 }
511 Ok(())
512 }
513}
514
/// AST node of a `CREATE DATABASE` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateDatabase {
    /// Name of the database to create.
    pub name: ObjectName,
    /// When `true`, `IF NOT EXISTS` is rendered before the name.
    pub if_not_exists: bool,
    /// Database options, rendered inside `WITH(...)` when non-empty.
    pub options: OptionMap,
}
522
523impl CreateDatabase {
524 pub fn new(name: ObjectName, if_not_exists: bool, options: OptionMap) -> Self {
526 Self {
527 name,
528 if_not_exists,
529 options,
530 }
531 }
532}
533
534impl Display for CreateDatabase {
535 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
536 write!(f, "CREATE DATABASE ")?;
537 if self.if_not_exists {
538 write!(f, "IF NOT EXISTS ")?;
539 }
540 write!(f, "{}", &self.name)?;
541 if !self.options.is_empty() {
542 let options = self.options.kv_pairs();
543 write!(f, "\nWITH(\n{}\n)", format_list_indent!(options))?;
544 }
545 Ok(())
546 }
547}
548
/// AST node of a `CREATE EXTERNAL TABLE` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateExternalTable {
    /// Name of the table to create.
    pub name: ObjectName,
    /// Column definitions, rendered one per line.
    pub columns: Vec<Column>,
    /// Table constraints, rendered after the columns.
    pub constraints: Vec<TableConstraint>,
    /// Table options, rendered inside `WITH(...)` when non-empty.
    pub options: OptionMap,
    /// When `true`, `IF NOT EXISTS` is rendered before the name.
    pub if_not_exists: bool,
    /// Engine name, rendered as `ENGINE=<engine>`.
    pub engine: String,
}
560
561impl Display for CreateExternalTable {
562 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
563 write!(f, "CREATE EXTERNAL TABLE ")?;
564 if self.if_not_exists {
565 write!(f, "IF NOT EXISTS ")?;
566 }
567 writeln!(f, "{} (", &self.name)?;
568 writeln!(f, "{},", format_list_indent!(self.columns))?;
569 writeln!(f, "{}", format_table_constraint(&self.constraints))?;
570 writeln!(f, ")")?;
571 writeln!(f, "ENGINE={}", &self.engine)?;
572 if !self.options.is_empty() {
573 let options = self.options.kv_pairs();
574 write!(f, "WITH(\n{}\n)", format_list_indent!(options))?;
575 }
576 Ok(())
577 }
578}
579
/// AST node of a `CREATE TABLE <table_name> LIKE <source_name>` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateTableLike {
    /// Name of the table to create.
    pub table_name: ObjectName,
    /// Name of the table rendered after `LIKE`.
    pub source_name: ObjectName,
}
587
588impl Display for CreateTableLike {
589 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
590 let table_name = &self.table_name;
591 let source_name = &self.source_name;
592 write!(f, r#"CREATE TABLE {table_name} LIKE {source_name}"#)
593 }
594}
595
/// AST node of a `CREATE FLOW` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateFlow {
    /// Name of the flow to create.
    pub flow_name: ObjectName,
    /// Name of the table rendered after `SINK TO`.
    pub sink_table_name: ObjectName,
    /// When `true`, the statement is rendered as `CREATE OR REPLACE FLOW`.
    pub or_replace: bool,
    /// When `true`, `IF NOT EXISTS` is rendered after `FLOW`.
    pub if_not_exists: bool,
    /// Optional expiry, rendered as `EXPIRE AFTER '<n> s'`.
    pub expire_after: Option<i64>,
    /// Optional evaluation interval, rendered as `EVAL INTERVAL '<n> s'`.
    pub eval_interval: Option<i64>,
    /// Optional comment, rendered as `COMMENT '<comment>'`.
    pub comment: Option<String>,
    /// The flow's query, rendered after `AS`.
    pub query: Box<SqlOrTql>,
}
618
/// The query of a flow: either a SQL query or a TQL statement.
///
/// Each variant also carries a `String`; `Display` writes that string rather
/// than re-rendering the parsed statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub enum SqlOrTql {
    /// A parsed SQL query together with its text.
    Sql(GtQuery, String),
    /// A parsed TQL statement together with its text.
    Tql(Tql, String),
}
625
626impl std::fmt::Display for SqlOrTql {
627 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
628 match self {
629 Self::Sql(_, s) => write!(f, "{}", s),
630 Self::Tql(_, s) => write!(f, "{}", s),
631 }
632 }
633}
634
635impl SqlOrTql {
636 pub fn try_from_statement(
637 value: Statement,
638 original_query: &str,
639 ) -> std::result::Result<Self, crate::error::Error> {
640 match value {
641 Statement::Query(query) => Ok(Self::Sql(*query, original_query.to_string())),
642 Statement::Tql(tql) => Ok(Self::Tql(tql, original_query.to_string())),
643 _ => InvalidFlowQuerySnafu {
644 reason: format!("Expect either sql query or promql query, found {:?}", value),
645 }
646 .fail(),
647 }
648 }
649}
650
651impl Display for CreateFlow {
652 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
653 write!(f, "CREATE ")?;
654 if self.or_replace {
655 write!(f, "OR REPLACE ")?;
656 }
657 write!(f, "FLOW ")?;
658 if self.if_not_exists {
659 write!(f, "IF NOT EXISTS ")?;
660 }
661 writeln!(f, "{}", &self.flow_name)?;
662 writeln!(f, "SINK TO {}", &self.sink_table_name)?;
663 if let Some(expire_after) = &self.expire_after {
664 writeln!(f, "EXPIRE AFTER '{} s'", expire_after)?;
665 }
666 if let Some(eval_interval) = &self.eval_interval {
667 writeln!(f, "EVAL INTERVAL '{} s'", eval_interval)?;
668 }
669 if let Some(comment) = &self.comment {
670 writeln!(f, "COMMENT '{}'", comment)?;
671 }
672 write!(f, "AS {}", &self.query)
673 }
674}
675
/// AST node of a `CREATE VIEW` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub struct CreateView {
    /// Name of the view to create.
    pub name: ObjectName,
    /// Optional view column names; rendered as `(<columns>)` when non-empty.
    pub columns: Vec<Ident>,
    /// The statement rendered after `AS`.
    pub query: Box<Statement>,
    /// When `true`, the statement is rendered as `CREATE OR REPLACE VIEW`.
    pub or_replace: bool,
    /// When `true`, `IF NOT EXISTS` is rendered after `VIEW`.
    pub if_not_exists: bool,
}
691
692impl Display for CreateView {
693 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
694 write!(f, "CREATE ")?;
695 if self.or_replace {
696 write!(f, "OR REPLACE ")?;
697 }
698 write!(f, "VIEW ")?;
699 if self.if_not_exists {
700 write!(f, "IF NOT EXISTS ")?;
701 }
702 write!(f, "{} ", &self.name)?;
703 if !self.columns.is_empty() {
704 write!(f, "({}) ", format_list_comma!(self.columns))?;
705 }
706 write!(f, "AS {}", &self.query)
707 }
708}
709
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use crate::dialect::GreptimeDbDialect;
    use crate::error::Error;
    use crate::parser::{ParseOptions, ParserContext};
    use crate::statements::statement::Statement;

    /// Parses `sql` with the GreptimeDB dialect and default parse options.
    fn parse(sql: &str) -> Result<Vec<Statement>, Error> {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
    }

    /// `CREATE TABLE` with partitions and options round-trips through `Display`.
    #[test]
    fn test_display_create_table() {
        let sql = r"create table if not exists demo(
                             host string,
                             ts timestamp,
                             cpu double default 0,
                             memory double,
                             TIME INDEX (ts),
                             PRIMARY KEY(host)
                       )
                       PARTITION ON COLUMNS (host) (
                            host = 'a',
                            host > 'a',
                       )
                       engine=mito
                       with(ttl='7d', storage='File');
         ";
        let stmts = parse(sql).unwrap();
        assert_eq!(1, stmts.len());

        let Statement::CreateTable(create) = &stmts[0] else {
            unreachable!()
        };
        let rendered = format!("\n{}", create);
        assert_eq!(
            r#"
CREATE TABLE IF NOT EXISTS demo (
  host STRING,
  ts TIMESTAMP,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (host)
)
PARTITION ON COLUMNS (host) (
  host = 'a',
  host > 'a'
)
ENGINE=mito
WITH(
  storage = 'File',
  ttl = '7d'
)"#,
            &rendered
        );

        // The rendered SQL must parse back into the same statements.
        let reparsed = parse(&rendered).unwrap();
        assert_eq!(stmts, reparsed);
    }

    /// `CREATE TABLE` without a partition clause round-trips through `Display`.
    #[test]
    fn test_display_empty_partition_column() {
        let sql = r"create table if not exists demo(
            host string,
            ts timestamp,
            cpu double default 0,
            memory double,
            TIME INDEX (ts),
            PRIMARY KEY(ts, host)
            );
        ";
        let stmts = parse(sql).unwrap();
        assert_eq!(1, stmts.len());

        let Statement::CreateTable(create) = &stmts[0] else {
            unreachable!()
        };
        let rendered = format!("\n{}", create);
        assert_eq!(
            r#"
CREATE TABLE IF NOT EXISTS demo (
  host STRING,
  ts TIMESTAMP,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (ts, host)
)
ENGINE=mito
"#,
            &rendered
        );

        // The rendered SQL must parse back into the same statements.
        let reparsed = parse(&rendered).unwrap();
        assert_eq!(stmts, reparsed);
    }

    /// Known table options are accepted; an unknown one is rejected by the parser.
    #[test]
    fn test_validate_table_options() {
        let sql = r"create table if not exists demo(
            host string,
            ts timestamp,
            cpu double default 0,
            memory double,
            TIME INDEX (ts),
            PRIMARY KEY(host)
      )
      PARTITION ON COLUMNS (host) ()
      engine=mito
      with(ttl='7d', 'compaction.type'='world');
";
        let stmts = parse(sql).unwrap();
        let Statement::CreateTable(create) = &stmts[0] else {
            unreachable!()
        };
        assert_eq!(2, create.options.len());

        let sql = r"create table if not exists demo(
            host string,
            ts timestamp,
            cpu double default 0,
            memory double,
            TIME INDEX (ts),
            PRIMARY KEY(host)
      )
      PARTITION ON COLUMNS (host) ()
      engine=mito
      with(ttl='7d', hello='world');
";
        let result = parse(sql);
        assert_matches!(result, Err(Error::InvalidTableOption { .. }))
    }

    /// `CREATE DATABASE` is rendered with and without `IF NOT EXISTS` / options.
    #[test]
    fn test_display_create_database() {
        let stmts = parse(r"create database test;").unwrap();
        assert_eq!(1, stmts.len());
        assert_matches!(&stmts[0], Statement::CreateDatabase { .. });

        let Statement::CreateDatabase(create) = &stmts[0] else {
            unreachable!();
        };
        let rendered = format!("\n{}", create);
        assert_eq!(
            r#"
CREATE DATABASE test"#,
            &rendered
        );

        let stmts = parse(r"create database if not exists test;").unwrap();
        assert_eq!(1, stmts.len());
        assert_matches!(&stmts[0], Statement::CreateDatabase { .. });

        let Statement::CreateDatabase(create) = &stmts[0] else {
            unreachable!();
        };
        let rendered = format!("\n{}", create);
        assert_eq!(
            r#"
CREATE DATABASE IF NOT EXISTS test"#,
            &rendered
        );

        let stmts = parse(r#"CREATE DATABASE IF NOT EXISTS test WITH (ttl='1h');"#).unwrap();
        assert_eq!(1, stmts.len());
        assert_matches!(&stmts[0], Statement::CreateDatabase { .. });

        let Statement::CreateDatabase(create) = &stmts[0] else {
            unreachable!();
        };
        let rendered = format!("\n{}", create);
        assert_eq!(
            r#"
CREATE DATABASE IF NOT EXISTS test
WITH(
  ttl = '1h'
)"#,
            &rendered
        );
    }

    /// `CREATE TABLE ... LIKE ...` is rendered on a single line.
    #[test]
    fn test_display_create_table_like() {
        let stmts = parse(r"create table t2 like t1;").unwrap();
        assert_eq!(1, stmts.len());
        assert_matches!(&stmts[0], Statement::CreateTableLike { .. });

        let Statement::CreateTableLike(create) = &stmts[0] else {
            unreachable!();
        };
        let rendered = format!("\n{}", create);
        assert_eq!(
            r#"
CREATE TABLE t2 LIKE t1"#,
            &rendered
        );
    }

    /// `CREATE EXTERNAL TABLE` is rendered with its engine and sorted options.
    #[test]
    fn test_display_create_external_table() {
        let sql = r#"CREATE EXTERNAL TABLE city (
            host string,
            ts timestamp,
            cpu float64 default 0,
            memory float64,
            TIME INDEX (ts),
            PRIMARY KEY(host)
) WITH (location='/var/data/city.csv', format='csv');"#;
        let stmts = parse(sql).unwrap();
        assert_eq!(1, stmts.len());
        assert_matches!(&stmts[0], Statement::CreateExternalTable { .. });

        let Statement::CreateExternalTable(create) = &stmts[0] else {
            unreachable!();
        };
        let rendered = format!("\n{}", create);
        assert_eq!(
            r#"
CREATE EXTERNAL TABLE city (
  host STRING,
  ts TIMESTAMP,
  cpu DOUBLE DEFAULT 0,
  memory DOUBLE,
  TIME INDEX (ts),
  PRIMARY KEY (host)
)
ENGINE=file
WITH(
  format = 'csv',
  location = '/var/data/city.csv'
)"#,
            &rendered
        );
    }

    /// `CREATE FLOW` round-trips through `Display`.
    #[test]
    fn test_display_create_flow() {
        let sql = r"CREATE FLOW filter_numbers
            SINK TO out_num_cnt
            AS SELECT number FROM numbers_input where number > 10;";
        let stmts = parse(sql).unwrap();
        assert_eq!(1, stmts.len());

        let Statement::CreateFlow(create) = &stmts[0] else {
            unreachable!()
        };
        let rendered = format!("\n{}", create);
        assert_eq!(
            r#"
CREATE FLOW filter_numbers
SINK TO out_num_cnt
AS SELECT number FROM numbers_input where number > 10"#,
            &rendered
        );

        // The rendered SQL must parse back into the same statements.
        let reparsed = parse(&rendered).unwrap();
        assert_eq!(stmts, reparsed);
    }

    /// Out-of-range vector index options are rejected; valid ones are applied.
    #[test]
    fn test_vector_index_options_validation() {
        use super::{ColumnExtensions, OptionMap};

        // Builds extensions carrying only the given vector index options.
        fn with_vector_index_options(options: OptionMap) -> ColumnExtensions {
            ColumnExtensions {
                vector_index_options: Some(options),
                ..Default::default()
            }
        }

        // A value of "0" must be rejected for each of these keys.
        let rejected = [
            ("connectivity", "connectivity must be in the range [2, 2048]"),
            ("expansion_add", "expansion_add must be greater than 0"),
            (
                "expansion_search",
                "expansion_search must be greater than 0",
            ),
        ];
        for (key, expected_message) in rejected {
            let extensions = with_vector_index_options(OptionMap::from([(
                key.to_string(),
                "0".to_string(),
            )]));
            let result = extensions.build_vector_index_options();
            assert!(result.is_err());
            assert!(result.unwrap_err().to_string().contains(expected_message));
        }

        let extensions = with_vector_index_options(OptionMap::from([
            ("connectivity".to_string(), "32".to_string()),
            ("expansion_add".to_string(), "200".to_string()),
            ("expansion_search".to_string(), "100".to_string()),
        ]));
        let result = extensions.build_vector_index_options();
        assert!(result.is_ok());
        let options = result.unwrap().unwrap();
        assert_eq!(options.connectivity, 32);
        assert_eq!(options.expansion_add, 200);
        assert_eq!(options.expansion_search, 100);
    }
}