1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
24use api::v1::{
25 AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
26 ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
27 DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
28 ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetDefaults, SetFulltext,
29 SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30 SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31 UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions, set_index,
32 unset_index,
33};
34use common_error::ext::BoxedError;
35use common_grpc_expr::util::ColumnExpr;
36use common_time::Timezone;
37use datafusion::sql::planner::object_name_to_table_reference;
38use datatypes::schema::{
39 COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
40 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
41 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
42 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
43 ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
44 SkippingIndexType,
45};
46use file_engine::FileOptions;
47use query::sql::{
48 check_file_to_table_schema_compatibility, file_column_schemas_to_table,
49 infer_file_table_schema, prepare_file_table_files,
50};
51use session::context::QueryContextRef;
52use session::table_name::table_idents_to_full_name;
53use snafu::{OptionExt, ResultExt, ensure};
54use sql::ast::{
55 ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
56};
57use sql::dialect::GreptimeDbDialect;
58use sql::parser::ParserContext;
59use sql::statements::alter::{
60 AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
61};
62use sql::statements::create::{
63 Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
64 CreateView, TableConstraint,
65};
66use sql::statements::{
67 OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
68 sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
69};
70use sql::util::extract_tables_from_query;
71use store_api::mito_engine_options::{COMPACTION_OVERRIDE, COMPACTION_TYPE};
72use table::requests::{FILE_TABLE_META_KEY, TableOptions};
73use table::table_reference::TableReference;
74#[cfg(feature = "enterprise")]
75pub use trigger::to_create_trigger_task_expr;
76
77use crate::error::{
78 BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
79 ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
80 IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
81 InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
82 PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
83};
84
85pub fn create_table_expr_by_column_schemas(
86 table_name: &TableReference<'_>,
87 column_schemas: &[api::v1::ColumnSchema],
88 engine: &str,
89 desc: Option<&str>,
90) -> Result<CreateTableExpr> {
91 let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
92 let expr = common_grpc_expr::util::build_create_table_expr(
93 None,
94 table_name,
95 column_exprs,
96 engine,
97 desc.unwrap_or("Created on insertion"),
98 )
99 .context(BuildCreateExprOnInsertionSnafu)?;
100
101 validate_create_expr(&expr)?;
102 Ok(expr)
103}
104
105pub fn extract_add_columns_expr(
106 schema: &Schema,
107 column_exprs: Vec<ColumnExpr>,
108) -> Result<Option<AddColumns>> {
109 let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
110 .context(FindNewColumnsOnInsertionSnafu)?;
111 if let Some(add_columns) = &add_columns {
112 validate_add_columns_expr(add_columns)?;
113 }
114 Ok(add_columns)
115}
116
117pub(crate) async fn create_external_expr(
141 create: CreateExternalTable,
142 query_ctx: &QueryContextRef,
143) -> Result<CreateTableExpr> {
144 let (catalog_name, schema_name, table_name) =
145 table_idents_to_full_name(&create.name, query_ctx)
146 .map_err(BoxedError::new)
147 .context(ExternalSnafu)?;
148
149 let mut table_options = create.options.into_map();
150
151 let (object_store, files) = prepare_file_table_files(&table_options)
152 .await
153 .context(PrepareFileTableSnafu)?;
154
155 let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
156 .await
157 .context(InferFileTableSchemaSnafu)?
158 .column_schemas()
159 .to_vec();
160
161 let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
162 let time_index = find_time_index(&create.constraints)?;
164 let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
165 let column_schemas =
166 columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
167 (time_index, primary_keys, column_schemas)
168 } else {
169 let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
171 let primary_keys = vec![];
172 (time_index, primary_keys, column_schemas)
173 };
174
175 check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
176 .context(SchemaIncompatibleSnafu)?;
177
178 let meta = FileOptions {
179 files,
180 file_column_schemas,
181 };
182 table_options.insert(
183 FILE_TABLE_META_KEY.to_string(),
184 serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
185 );
186
187 let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
188 let expr = CreateTableExpr {
189 catalog_name,
190 schema_name,
191 table_name,
192 desc: String::default(),
193 column_defs,
194 time_index,
195 primary_keys,
196 create_if_not_exists: create.if_not_exists,
197 table_options,
198 table_id: None,
199 engine: create.engine.clone(),
200 };
201
202 Ok(expr)
203}
204
205pub fn create_to_expr(
207 create: &CreateTable,
208 query_ctx: &QueryContextRef,
209) -> Result<CreateTableExpr> {
210 let (catalog_name, schema_name, table_name) =
211 table_idents_to_full_name(&create.name, query_ctx)
212 .map_err(BoxedError::new)
213 .context(ExternalSnafu)?;
214
215 let time_index = find_time_index(&create.constraints)?;
216 let table_options = HashMap::from(
217 &TableOptions::try_from_iter(create.options.to_str_map())
218 .context(UnrecognizedTableOptionSnafu)?,
219 );
220
221 let mut table_options = table_options;
222 if table_options.contains_key(COMPACTION_TYPE) {
223 table_options.insert(COMPACTION_OVERRIDE.to_string(), "true".to_string());
224 }
225
226 let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
227
228 let expr = CreateTableExpr {
229 catalog_name,
230 schema_name,
231 table_name,
232 desc: String::default(),
233 column_defs: columns_to_expr(
234 &create.columns,
235 &time_index,
236 &primary_keys,
237 Some(&query_ctx.timezone()),
238 )?,
239 time_index,
240 primary_keys,
241 create_if_not_exists: create.if_not_exists,
242 table_options,
243 table_id: None,
244 engine: create.engine.clone(),
245 };
246
247 validate_create_expr(&expr)?;
248 Ok(expr)
249}
250
251pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
259 let quote_style = quote_style.unwrap_or('`');
260
261 let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
263 sql::ast::Ident::with_quote(quote_style, &expr.table_name),
264 )]);
265
266 let mut columns = Vec::with_capacity(expr.column_defs.len());
268 for column_def in &expr.column_defs {
269 let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
270 column: &column_def.name,
271 })?;
272
273 let mut options = Vec::new();
274
275 if column_def.is_nullable {
277 options.push(ColumnOptionDef {
278 name: None,
279 option: ColumnOption::Null,
280 });
281 } else {
282 options.push(ColumnOptionDef {
283 name: None,
284 option: ColumnOption::NotNull,
285 });
286 }
287
288 if let Some(default_constraint) = column_schema.default_constraint() {
290 let expr = match default_constraint {
291 ColumnDefaultConstraint::Value(v) => {
292 Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
293 }
294 ColumnDefaultConstraint::Function(func_expr) => {
295 ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
296 .context(ParseSqlSnafu)?
297 }
298 };
299 options.push(ColumnOptionDef {
300 name: None,
301 option: ColumnOption::Default(expr),
302 });
303 }
304
305 if !column_def.comment.is_empty() {
307 options.push(ColumnOptionDef {
308 name: None,
309 option: ColumnOption::Comment(column_def.comment.clone()),
310 });
311 }
312
313 let mut extensions = ColumnExtensions::default();
318
319 if let Ok(Some(opt)) = column_schema.fulltext_options()
321 && opt.enable
322 {
323 let mut map = HashMap::from([
324 (
325 COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
326 opt.analyzer.to_string(),
327 ),
328 (
329 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
330 opt.case_sensitive.to_string(),
331 ),
332 (
333 COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
334 opt.backend.to_string(),
335 ),
336 ]);
337 if opt.backend == FulltextBackend::Bloom {
338 map.insert(
339 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
340 opt.granularity.to_string(),
341 );
342 map.insert(
343 COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
344 opt.false_positive_rate().to_string(),
345 );
346 }
347 extensions.fulltext_index_options = Some(map.into());
348 }
349
350 if let Ok(Some(opt)) = column_schema.skipping_index_options() {
352 let map = HashMap::from([
353 (
354 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
355 opt.granularity.to_string(),
356 ),
357 (
358 COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
359 opt.false_positive_rate().to_string(),
360 ),
361 (
362 COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
363 opt.index_type.to_string(),
364 ),
365 ]);
366 extensions.skipping_index_options = Some(map.into());
367 }
368
369 if column_schema.is_inverted_indexed() {
371 extensions.inverted_index_options = Some(HashMap::new().into());
372 }
373
374 let sql_column = SqlColumn {
375 column_def: ColumnDef {
376 name: Ident::with_quote(quote_style, &column_def.name),
377 data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
378 .context(ParseSqlSnafu)?,
379 options,
380 },
381 extensions,
382 };
383
384 columns.push(sql_column);
385 }
386
387 let mut constraints = Vec::new();
389
390 constraints.push(TableConstraint::TimeIndex {
392 column: Ident::with_quote(quote_style, &expr.time_index),
393 });
394
395 if !expr.primary_keys.is_empty() {
397 let primary_key_columns: Vec<Ident> = expr
398 .primary_keys
399 .iter()
400 .map(|pk| Ident::with_quote(quote_style, pk))
401 .collect();
402
403 constraints.push(TableConstraint::PrimaryKey {
404 columns: primary_key_columns,
405 });
406 }
407
408 let mut options = OptionMap::default();
410 for (key, value) in &expr.table_options {
411 options.insert(key.clone(), value.clone());
412 }
413
414 Ok(CreateTable {
415 if_not_exists: expr.create_if_not_exists,
416 table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
417 name: table_name,
418 columns,
419 engine: expr.engine.clone(),
420 constraints,
421 options,
422 partitions: None,
423 })
424}
425
426pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
428 let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
430 for (idx, column) in create.column_defs.iter().enumerate() {
431 if let Some(indices) = column_to_indices.get(&column.name) {
432 return InvalidSqlSnafu {
433 err_msg: format!(
434 "column name `{}` is duplicated at index {} and {}",
435 column.name, indices, idx
436 ),
437 }
438 .fail();
439 }
440 column_to_indices.insert(&column.name, idx);
441 }
442
443 let _ = column_to_indices
445 .get(&create.time_index)
446 .with_context(|| InvalidSqlSnafu {
447 err_msg: format!(
448 "column name `{}` is not found in column list",
449 create.time_index
450 ),
451 })?;
452
453 for pk in &create.primary_keys {
455 let _ = column_to_indices
456 .get(&pk)
457 .with_context(|| InvalidSqlSnafu {
458 err_msg: format!("column name `{}` is not found in column list", pk),
459 })?;
460 }
461
462 let mut pk_set = HashSet::new();
464 for pk in &create.primary_keys {
465 if !pk_set.insert(pk) {
466 return InvalidSqlSnafu {
467 err_msg: format!("column name `{}` is duplicated in primary keys", pk),
468 }
469 .fail();
470 }
471 }
472
473 if pk_set.contains(&create.time_index) {
475 return InvalidSqlSnafu {
476 err_msg: format!(
477 "column name `{}` is both primary key and time index",
478 create.time_index
479 ),
480 }
481 .fail();
482 }
483
484 for column in &create.column_defs {
485 if is_interval_type(&column.data_type()) {
487 return InvalidSqlSnafu {
488 err_msg: format!(
489 "column name `{}` is interval type, which is not supported",
490 column.name
491 ),
492 }
493 .fail();
494 }
495 if is_date_time_type(&column.data_type()) {
497 return InvalidSqlSnafu {
498 err_msg: format!(
499 "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
500 column.name
501 ),
502 }
503 .fail();
504 }
505 }
506 Ok(())
507}
508
509fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
510 for add_column in &add_columns.add_columns {
511 let Some(column_def) = &add_column.column_def else {
512 continue;
513 };
514 if is_date_time_type(&column_def.data_type()) {
515 return InvalidSqlSnafu {
516 err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
517 }
518 .fail();
519 }
520 if is_interval_type(&column_def.data_type()) {
521 return InvalidSqlSnafu {
522 err_msg: format!(
523 "column name `{}` is interval type, which is not supported",
524 column_def.name
525 ),
526 }
527 .fail();
528 }
529 }
530 Ok(())
531}
532
533fn is_date_time_type(data_type: &ColumnDataType) -> bool {
534 matches!(data_type, ColumnDataType::Datetime)
535}
536
537fn is_interval_type(data_type: &ColumnDataType) -> bool {
538 matches!(
539 data_type,
540 ColumnDataType::IntervalYearMonth
541 | ColumnDataType::IntervalDayTime
542 | ColumnDataType::IntervalMonthDayNano
543 )
544}
545
546fn find_primary_keys(
547 columns: &[SqlColumn],
548 constraints: &[TableConstraint],
549) -> Result<Vec<String>> {
550 let columns_pk = columns
551 .iter()
552 .filter_map(|x| {
553 if x.options()
554 .iter()
555 .any(|o| matches!(o.option, ColumnOption::PrimaryKey(_)))
556 {
557 Some(x.name().value.clone())
558 } else {
559 None
560 }
561 })
562 .collect::<Vec<String>>();
563
564 ensure!(
565 columns_pk.len() <= 1,
566 IllegalPrimaryKeysDefSnafu {
567 msg: "not allowed to inline multiple primary keys in columns options"
568 }
569 );
570
571 let constraints_pk = constraints
572 .iter()
573 .filter_map(|constraint| match constraint {
574 TableConstraint::PrimaryKey { columns, .. } => {
575 Some(columns.iter().map(|ident| ident.value.clone()))
576 }
577 _ => None,
578 })
579 .flatten()
580 .collect::<Vec<String>>();
581
582 ensure!(
583 columns_pk.is_empty() || constraints_pk.is_empty(),
584 IllegalPrimaryKeysDefSnafu {
585 msg: "found definitions of primary keys in multiple places"
586 }
587 );
588
589 let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
590 primary_keys.extend(columns_pk);
591 primary_keys.extend(constraints_pk);
592 Ok(primary_keys)
593}
594
595pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
596 let time_index = constraints
597 .iter()
598 .filter_map(|constraint| match constraint {
599 TableConstraint::TimeIndex { column, .. } => Some(&column.value),
600 _ => None,
601 })
602 .collect::<Vec<&String>>();
603 ensure!(
604 time_index.len() == 1,
605 InvalidSqlSnafu {
606 err_msg: "must have one and only one TimeIndex columns",
607 }
608 );
609 Ok(time_index[0].clone())
610}
611
612fn columns_to_expr(
613 column_defs: &[SqlColumn],
614 time_index: &str,
615 primary_keys: &[String],
616 timezone: Option<&Timezone>,
617) -> Result<Vec<api::v1::ColumnDef>> {
618 let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
619 column_schemas_to_defs(column_schemas, primary_keys)
620}
621
622fn columns_to_column_schemas(
623 columns: &[SqlColumn],
624 time_index: &str,
625 timezone: Option<&Timezone>,
626) -> Result<Vec<ColumnSchema>> {
627 columns
628 .iter()
629 .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
630 .collect::<Result<Vec<ColumnSchema>>>()
631}
632
633pub fn column_schemas_to_defs(
635 column_schemas: Vec<ColumnSchema>,
636 primary_keys: &[String],
637) -> Result<Vec<api::v1::ColumnDef>> {
638 let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
639 .iter()
640 .map(|c| {
641 ColumnDataTypeWrapper::try_from(c.data_type.clone())
642 .map(|w| w.to_parts())
643 .context(ColumnDataTypeSnafu)
644 })
645 .collect::<Result<Vec<_>>>()?;
646
647 column_schemas
648 .iter()
649 .zip(column_datatypes)
650 .map(|(schema, datatype)| {
651 let semantic_type = if schema.is_time_index() {
652 SemanticType::Timestamp
653 } else if primary_keys.contains(&schema.name) {
654 SemanticType::Tag
655 } else {
656 SemanticType::Field
657 } as i32;
658 let comment = schema
659 .metadata()
660 .get(COMMENT_KEY)
661 .cloned()
662 .unwrap_or_default();
663
664 Ok(api::v1::ColumnDef {
665 name: schema.name.clone(),
666 data_type: datatype.0 as i32,
667 is_nullable: schema.is_nullable(),
668 default_constraint: match schema.default_constraint() {
669 None => vec![],
670 Some(v) => {
671 v.clone()
672 .try_into()
673 .context(ConvertColumnDefaultConstraintSnafu {
674 column_name: &schema.name,
675 })?
676 }
677 },
678 semantic_type,
679 comment,
680 datatype_extension: datatype.1,
681 options: options_from_column_schema(schema),
682 })
683 })
684 .collect()
685}
686
687#[derive(Debug, Clone, PartialEq, Eq)]
688pub struct RepartitionRequest {
689 pub catalog_name: String,
690 pub schema_name: String,
691 pub table_name: String,
692 pub source: RepartitionSource,
693 pub into_exprs: Vec<Expr>,
694 pub options: OptionMap,
695}
696
697#[derive(Debug, Clone, PartialEq, Eq)]
698pub enum RepartitionSource {
699 Partitions {
700 from_exprs: Vec<Expr>,
701 target_partition_columns: Option<Vec<String>>,
702 },
703 Unpartitioned {
704 partition_columns: Vec<String>,
705 },
706}
707
708pub(crate) fn to_repartition_request(
709 alter_table: AlterTable,
710 query_ctx: &QueryContextRef,
711) -> Result<RepartitionRequest> {
712 let AlterTable {
713 table_name,
714 alter_operation,
715 options,
716 } = alter_table;
717
718 let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&table_name, query_ctx)
719 .map_err(BoxedError::new)
720 .context(ExternalSnafu)?;
721
722 let (source, into_exprs) = match alter_operation {
723 AlterTableOperation::Repartition { operation } => (
724 RepartitionSource::Partitions {
725 from_exprs: operation.from_exprs,
726 target_partition_columns: operation.partition_columns.map(|columns| {
727 columns
728 .into_iter()
729 .map(|ident| ident.value)
730 .collect::<Vec<_>>()
731 }),
732 },
733 operation.into_exprs,
734 ),
735 AlterTableOperation::Partition { partitions } => (
736 RepartitionSource::Unpartitioned {
737 partition_columns: partitions
738 .column_list
739 .into_iter()
740 .map(|ident| ident.value)
741 .collect(),
742 },
743 partitions.exprs,
744 ),
745 _ => {
746 return InvalidSqlSnafu {
747 err_msg: "expected REPARTITION or PARTITION operation",
748 }
749 .fail();
750 }
751 };
752
753 Ok(RepartitionRequest {
754 catalog_name,
755 schema_name,
756 table_name,
757 source,
758 into_exprs,
759 options,
760 })
761}
762
763pub(crate) fn to_alter_table_expr(
765 alter_table: AlterTable,
766 query_ctx: &QueryContextRef,
767) -> Result<AlterTableExpr> {
768 let (catalog_name, schema_name, table_name) =
769 table_idents_to_full_name(alter_table.table_name(), query_ctx)
770 .map_err(BoxedError::new)
771 .context(ExternalSnafu)?;
772
773 let kind = match alter_table.alter_operation {
774 AlterTableOperation::AddConstraint(_) => {
775 return NotSupportedSnafu {
776 feat: "ADD CONSTRAINT",
777 }
778 .fail();
779 }
780 AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
781 add_columns: add_columns
782 .into_iter()
783 .map(|add_column| {
784 let column_def = sql_column_def_to_grpc_column_def(
785 &add_column.column_def,
786 Some(&query_ctx.timezone()),
787 )
788 .map_err(BoxedError::new)
789 .context(ExternalSnafu)?;
790 if is_interval_type(&column_def.data_type()) {
791 return NotSupportedSnafu {
792 feat: "Add column with interval type",
793 }
794 .fail();
795 }
796 Ok(AddColumn {
797 column_def: Some(column_def),
798 location: add_column.location.as_ref().map(From::from),
799 add_if_not_exists: add_column.add_if_not_exists,
800 })
801 })
802 .collect::<Result<Vec<AddColumn>>>()?,
803 }),
804 AlterTableOperation::ModifyColumnType {
805 column_name,
806 target_type,
807 } => {
808 let target_type =
809 sql_data_type_to_concrete_data_type(&target_type, &Default::default())
810 .context(ParseSqlSnafu)?;
811 let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
812 .map(|w| w.to_parts())
813 .context(ColumnDataTypeSnafu)?;
814 if is_interval_type(&target_type) {
815 return NotSupportedSnafu {
816 feat: "Modify column type to interval type",
817 }
818 .fail();
819 }
820 AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
821 modify_column_types: vec![ModifyColumnType {
822 column_name: column_name.value,
823 target_type: target_type as i32,
824 target_type_extension,
825 }],
826 })
827 }
828 AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
829 drop_columns: vec![DropColumn {
830 name: name.value.clone(),
831 }],
832 }),
833 AlterTableOperation::RenameTable { new_table_name } => {
834 AlterTableKind::RenameTable(RenameTable {
835 new_table_name: new_table_name.clone(),
836 })
837 }
838 AlterTableOperation::SetTableOptions { options } => {
839 AlterTableKind::SetTableOptions(SetTableOptions {
840 table_options: options.into_iter().map(Into::into).collect(),
841 })
842 }
843 AlterTableOperation::UnsetTableOptions { keys } => {
844 AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
845 }
846 AlterTableOperation::Repartition { .. } => {
847 return NotSupportedSnafu {
848 feat: "ALTER TABLE ... REPARTITION",
849 }
850 .fail();
851 }
852 AlterTableOperation::Partition { .. } => {
853 return NotSupportedSnafu {
854 feat: "ALTER TABLE ... PARTITION ON COLUMNS",
855 }
856 .fail();
857 }
858 AlterTableOperation::SetIndex { options } => {
859 let option = match options {
860 sql::statements::alter::SetIndexOperation::Fulltext {
861 column_name,
862 options,
863 } => SetIndex {
864 options: Some(set_index::Options::Fulltext(SetFulltext {
865 column_name: column_name.value,
866 enable: options.enable,
867 analyzer: match options.analyzer {
868 FulltextAnalyzer::English => Analyzer::English.into(),
869 FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
870 },
871 case_sensitive: options.case_sensitive,
872 backend: match options.backend {
873 FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
874 FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
875 },
876 granularity: options.granularity as u64,
877 false_positive_rate: options.false_positive_rate(),
878 })),
879 },
880 sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
881 options: Some(set_index::Options::Inverted(SetInverted {
882 column_name: column_name.value,
883 })),
884 },
885 sql::statements::alter::SetIndexOperation::Skipping {
886 column_name,
887 options,
888 } => SetIndex {
889 options: Some(set_index::Options::Skipping(SetSkipping {
890 column_name: column_name.value,
891 enable: true,
892 granularity: options.granularity as u64,
893 false_positive_rate: options.false_positive_rate(),
894 skipping_index_type: match options.index_type {
895 SkippingIndexType::BloomFilter => {
896 PbSkippingIndexType::BloomFilter.into()
897 }
898 },
899 })),
900 },
901 };
902 AlterTableKind::SetIndexes(SetIndexes {
903 set_indexes: vec![option],
904 })
905 }
906 AlterTableOperation::UnsetIndex { options } => {
907 let option = match options {
908 sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
909 UnsetIndex {
910 options: Some(unset_index::Options::Fulltext(UnsetFulltext {
911 column_name: column_name.value,
912 })),
913 }
914 }
915 sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
916 UnsetIndex {
917 options: Some(unset_index::Options::Inverted(UnsetInverted {
918 column_name: column_name.value,
919 })),
920 }
921 }
922 sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
923 UnsetIndex {
924 options: Some(unset_index::Options::Skipping(UnsetSkipping {
925 column_name: column_name.value,
926 })),
927 }
928 }
929 };
930
931 AlterTableKind::UnsetIndexes(UnsetIndexes {
932 unset_indexes: vec![option],
933 })
934 }
935 AlterTableOperation::DropDefaults { columns } => {
936 AlterTableKind::DropDefaults(DropDefaults {
937 drop_defaults: columns
938 .into_iter()
939 .map(|col| {
940 let column_name = col.0.to_string();
941 Ok(api::v1::DropDefault { column_name })
942 })
943 .collect::<Result<Vec<_>>>()?,
944 })
945 }
946 AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
947 set_defaults: defaults
948 .into_iter()
949 .map(|col| {
950 let column_name = col.column_name.to_string();
951 let default_constraint = serde_json::to_string(&col.default_constraint)
952 .context(EncodeJsonSnafu)?
953 .into_bytes();
954 Ok(api::v1::SetDefault {
955 column_name,
956 default_constraint,
957 })
958 })
959 .collect::<Result<Vec<_>>>()?,
960 }),
961 };
962
963 Ok(AlterTableExpr {
964 catalog_name,
965 schema_name,
966 table_name,
967 kind: Some(kind),
968 })
969}
970
971pub fn to_alter_database_expr(
973 alter_database: AlterDatabase,
974 query_ctx: &QueryContextRef,
975) -> Result<AlterDatabaseExpr> {
976 let catalog = query_ctx.current_catalog();
977 let schema = alter_database.database_name;
978
979 let kind = match alter_database.alter_operation {
980 AlterDatabaseOperation::SetDatabaseOption { options } => {
981 let options = options.into_iter().map(Into::into).collect();
982 AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
983 set_database_options: options,
984 })
985 }
986 AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
987 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
988 }
989 };
990
991 Ok(AlterDatabaseExpr {
992 catalog_name: catalog.to_string(),
993 schema_name: schema.to_string(),
994 kind: Some(kind),
995 })
996}
997
998pub fn to_create_view_expr(
1000 stmt: CreateView,
1001 logical_plan: Vec<u8>,
1002 table_names: Vec<TableName>,
1003 columns: Vec<String>,
1004 plan_columns: Vec<String>,
1005 definition: String,
1006 query_ctx: QueryContextRef,
1007) -> Result<CreateViewExpr> {
1008 let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
1009 .map_err(BoxedError::new)
1010 .context(ExternalSnafu)?;
1011
1012 let expr = CreateViewExpr {
1013 catalog_name,
1014 schema_name,
1015 view_name,
1016 logical_plan,
1017 create_if_not_exists: stmt.if_not_exists,
1018 or_replace: stmt.or_replace,
1019 table_names,
1020 columns,
1021 plan_columns,
1022 definition,
1023 };
1024
1025 Ok(expr)
1026}
1027
1028pub fn to_create_flow_task_expr(
1029 create_flow: CreateFlow,
1030 query_ctx: &QueryContextRef,
1031) -> Result<CreateFlowExpr> {
1032 let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone(), true)
1034 .with_context(|_| ConvertIdentifierSnafu {
1035 ident: create_flow.sink_table_name.to_string(),
1036 })?;
1037 let catalog = sink_table_ref
1038 .catalog()
1039 .unwrap_or(query_ctx.current_catalog())
1040 .to_string();
1041 let schema = sink_table_ref
1042 .schema()
1043 .map(|s| s.to_owned())
1044 .unwrap_or(query_ctx.current_schema());
1045
1046 let sink_table_name = TableName {
1047 catalog_name: catalog,
1048 schema_name: schema,
1049 table_name: sink_table_ref.table().to_string(),
1050 };
1051
1052 let source_table_names = extract_tables_from_query(&create_flow.query)
1053 .map(|name| {
1054 let reference =
1055 object_name_to_table_reference(name.clone(), true).with_context(|_| {
1056 ConvertIdentifierSnafu {
1057 ident: name.to_string(),
1058 }
1059 })?;
1060 let catalog = reference
1061 .catalog()
1062 .unwrap_or(query_ctx.current_catalog())
1063 .to_string();
1064 let schema = reference
1065 .schema()
1066 .map(|s| s.to_string())
1067 .unwrap_or(query_ctx.current_schema());
1068
1069 let table_name = TableName {
1070 catalog_name: catalog,
1071 schema_name: schema,
1072 table_name: reference.table().to_string(),
1073 };
1074 Ok(table_name)
1075 })
1076 .collect::<Result<Vec<_>>>()?;
1077
1078 let eval_interval = create_flow.eval_interval;
1079
1080 Ok(CreateFlowExpr {
1081 catalog_name: query_ctx.current_catalog().to_string(),
1082 flow_name: sanitize_flow_name(create_flow.flow_name)?,
1083 source_table_names,
1084 sink_table_name: Some(sink_table_name),
1085 or_replace: create_flow.or_replace,
1086 create_if_not_exists: create_flow.if_not_exists,
1087 expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
1088 eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
1089 comment: create_flow.comment.unwrap_or_default(),
1090 sql: create_flow.query.to_string(),
1091 flow_options: stringify_flow_options(create_flow.flow_options)?,
1092 })
1093}
1094
1095fn stringify_flow_options(flow_options: OptionMap) -> Result<HashMap<String, String>> {
1096 let options_len = flow_options.len();
1097 let flow_options = flow_options.into_map();
1098 ensure!(
1099 flow_options.len() == options_len,
1100 InvalidSqlSnafu {
1101 err_msg: "flow options only support scalar string-compatible values".to_string(),
1102 }
1103 );
1104 Ok(flow_options)
1105}
1106
1107fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
1109 ensure!(
1110 flow_name.0.len() == 1,
1111 InvalidFlowNameSnafu {
1112 name: flow_name.to_string(),
1113 }
1114 );
1115 Ok(flow_name.0.swap_remove(0).to_string_unquoted())
1117}
1118
1119#[cfg(test)]
1120mod tests {
1121 use std::collections::HashMap;
1122
1123 use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
1124 use datatypes::value::Value;
1125 use session::context::{QueryContext, QueryContextBuilder};
1126 use sql::dialect::GreptimeDbDialect;
1127 use sql::parser::{ParseOptions, ParserContext};
1128 use sql::statements::statement::Statement;
1129 use store_api::storage::ColumnDefaultConstraint;
1130
1131 use super::*;
1132
1133 #[test]
1134 fn test_create_flow_tql_expr() {
1135 let sql = r#"
1136CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1137TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
1138 let stmt =
1139 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1140
1141 assert!(
1142 stmt.is_err(),
1143 "Expected error for invalid TQL EVAL parameters: {:#?}",
1144 stmt
1145 );
1146
1147 let sql = r#"
1148CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1149TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#;
1150 let stmt =
1151 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1152 .unwrap()
1153 .pop()
1154 .unwrap();
1155
1156 let Statement::CreateFlow(create_flow) = stmt else {
1157 unreachable!()
1158 };
1159 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1160
1161 let to_dot_sep =
1162 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1163 assert_eq!("calc_reqs", expr.flow_name);
1164 assert_eq!("greptime", expr.catalog_name);
1165 assert_eq!(
1166 "greptime.public.cnt_reqs",
1167 expr.sink_table_name.map(to_dot_sep).unwrap()
1168 );
1169 assert_eq!(1, expr.source_table_names.len());
1170 assert_eq!(
1171 "greptime.public.http_requests",
1172 to_dot_sep(expr.source_table_names[0].clone())
1173 );
1174 assert_eq!(
1175 r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#,
1176 expr.sql
1177 );
1178
1179 let sql = r#"
1180CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1181TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__schema__="greptime_private"});"#;
1182 let stmt =
1183 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1184 .unwrap()
1185 .pop()
1186 .unwrap();
1187 let Statement::CreateFlow(create_flow) = stmt else {
1188 unreachable!()
1189 };
1190 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1191 assert_eq!(1, expr.source_table_names.len());
1192 assert_eq!(
1193 "greptime.greptime_private.http_requests",
1194 to_dot_sep(expr.source_table_names[0].clone())
1195 );
1196
1197 let sql = r#"
1198CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1199TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__database__="greptime_private"});"#;
1200 let stmt =
1201 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1202 .unwrap()
1203 .pop()
1204 .unwrap();
1205 let Statement::CreateFlow(create_flow) = stmt else {
1206 unreachable!()
1207 };
1208 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1209 assert_eq!(1, expr.source_table_names.len());
1210 assert_eq!(
1211 "greptime.greptime_private.http_requests",
1212 to_dot_sep(expr.source_table_names[0].clone())
1213 );
1214 }
1215
1216 #[test]
1217 fn test_create_flow_tql_cte_source_tables() {
1218 let sql = r#"
1219CREATE FLOW calc_cte
1220SINK TO metric_cte_sink
1221EVAL INTERVAL '1m'
1222AS
1223WITH tql(ts, the_value) AS (
1224 TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte
1225)
1226SELECT * FROM tql;
1227"#;
1228
1229 let stmt =
1230 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1231 .unwrap()
1232 .pop()
1233 .unwrap();
1234
1235 let Statement::CreateFlow(create_flow) = stmt else {
1236 unreachable!()
1237 };
1238 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1239
1240 let to_dot_sep =
1241 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1242 assert_eq!(1, expr.source_table_names.len());
1243 assert_eq!(
1244 "greptime.public.metric_cte",
1245 to_dot_sep(expr.source_table_names[0].clone())
1246 );
1247 }
1248
1249 #[test]
1250 fn test_create_flow_tql_cte_source_tables_quoted_cte_name() {
1251 let sql = r#"
1252CREATE FLOW calc_cte
1253SINK TO metric_cte_sink
1254EVAL INTERVAL '1m'
1255AS
1256WITH "TQL"(ts, the_value) AS (
1257 TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte
1258)
1259SELECT * FROM "TQL";
1260"#;
1261
1262 let stmt =
1263 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1264 .unwrap()
1265 .pop()
1266 .unwrap();
1267
1268 let Statement::CreateFlow(create_flow) = stmt else {
1269 unreachable!()
1270 };
1271 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1272
1273 let to_dot_sep =
1274 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1275 assert_eq!(1, expr.source_table_names.len());
1276 assert_eq!(
1277 "greptime.public.metric_cte",
1278 to_dot_sep(expr.source_table_names[0].clone())
1279 );
1280 }
1281
1282 #[test]
1283 fn test_create_flow_tql_cte_source_tables_same_name() {
1284 let sql = r#"
1285CREATE FLOW calc_cte
1286SINK TO metric_cte_sink
1287EVAL INTERVAL '1m'
1288AS
1289WITH tql(ts, the_value) AS (
1290 TQL EVAL (now() - '1m'::interval, now(), '5s') tql
1291)
1292SELECT * FROM tql;
1293"#;
1294
1295 let stmt =
1296 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1297 .unwrap()
1298 .pop()
1299 .unwrap();
1300
1301 let Statement::CreateFlow(create_flow) = stmt else {
1302 unreachable!()
1303 };
1304 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1305
1306 let to_dot_sep =
1307 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1308 assert_eq!(1, expr.source_table_names.len());
1309 assert_eq!(
1310 "greptime.public.tql",
1311 to_dot_sep(expr.source_table_names[0].clone())
1312 );
1313 }
1314
1315 #[test]
1316 fn test_create_flow_expr() {
1317 let sql = r"
1318CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
1319SELECT
1320 DISTINCT number as dis
1321FROM
1322 distinct_basic;";
1323 let stmt =
1324 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1325 .unwrap()
1326 .pop()
1327 .unwrap();
1328
1329 let Statement::CreateFlow(create_flow) = stmt else {
1330 unreachable!()
1331 };
1332 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1333
1334 let to_dot_sep =
1335 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1336 assert_eq!("test_distinct_basic", expr.flow_name);
1337 assert_eq!("greptime", expr.catalog_name);
1338 assert_eq!(
1339 "greptime.public.out_distinct_basic",
1340 expr.sink_table_name.map(to_dot_sep).unwrap()
1341 );
1342 assert_eq!(1, expr.source_table_names.len());
1343 assert_eq!(
1344 "greptime.public.distinct_basic",
1345 to_dot_sep(expr.source_table_names[0].clone())
1346 );
1347 assert_eq!(
1348 r"SELECT
1349 DISTINCT number as dis
1350FROM
1351 distinct_basic",
1352 expr.sql
1353 );
1354
1355 let sql = r"
1356CREATE FLOW `task_2`
1357SINK TO schema_1.table_1
1358AS
1359SELECT max(c1), min(c2) FROM schema_2.table_2;";
1360 let stmt =
1361 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1362 .unwrap()
1363 .pop()
1364 .unwrap();
1365
1366 let Statement::CreateFlow(create_flow) = stmt else {
1367 unreachable!()
1368 };
1369 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1370
1371 let to_dot_sep =
1372 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1373 assert_eq!("task_2", expr.flow_name);
1374 assert_eq!("greptime", expr.catalog_name);
1375 assert_eq!(
1376 "greptime.schema_1.table_1",
1377 expr.sink_table_name.map(to_dot_sep).unwrap()
1378 );
1379 assert_eq!(1, expr.source_table_names.len());
1380 assert_eq!(
1381 "greptime.schema_2.table_2",
1382 to_dot_sep(expr.source_table_names[0].clone())
1383 );
1384 assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
1385 assert!(expr.flow_options.is_empty());
1386
1387 let sql = r"
1388CREATE FLOW task_3
1389SINK TO schema_1.table_1
1390WITH (defer_on_missing_source = 'true', foo = 'bar')
1391AS
1392SELECT max(c1), min(c2) FROM schema_2.table_2;";
1393 let stmt =
1394 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1395 .unwrap()
1396 .pop()
1397 .unwrap();
1398
1399 let Statement::CreateFlow(create_flow) = stmt else {
1400 unreachable!()
1401 };
1402 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1403 assert_eq!(
1404 expr.flow_options,
1405 HashMap::from([
1406 ("defer_on_missing_source".to_string(), "true".to_string()),
1407 ("foo".to_string(), "bar".to_string()),
1408 ])
1409 );
1410
1411 let sql = r"
1412CREATE FLOW task_4
1413SINK TO schema_1.table_1
1414WITH (defer_on_missing_source = true)
1415AS
1416SELECT max(c1), min(c2) FROM schema_2.table_2;";
1417 let stmt =
1418 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1419 .unwrap()
1420 .pop()
1421 .unwrap();
1422
1423 let Statement::CreateFlow(create_flow) = stmt else {
1424 unreachable!()
1425 };
1426 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1427 assert_eq!(
1428 expr.flow_options,
1429 HashMap::from([("defer_on_missing_source".to_string(), "true".to_string(),)])
1430 );
1431
1432 let sql = r"
1433CREATE FLOW task_5
1434SINK TO schema_1.table_1
1435WITH (defer_on_missing_source = [true])
1436AS
1437SELECT max(c1), min(c2) FROM schema_2.table_2;";
1438 let stmt =
1439 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1440 .unwrap()
1441 .pop()
1442 .unwrap();
1443
1444 let Statement::CreateFlow(create_flow) = stmt else {
1445 unreachable!()
1446 };
1447 let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
1448 assert!(res.is_err());
1449 assert!(
1450 res.unwrap_err()
1451 .to_string()
1452 .contains("flow options only support scalar string-compatible values")
1453 );
1454
1455 let sql = r"
1456CREATE FLOW abc.`task_2`
1457SINK TO schema_1.table_1
1458AS
1459SELECT max(c1), min(c2) FROM schema_2.table_2;";
1460 let stmt =
1461 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1462 .unwrap()
1463 .pop()
1464 .unwrap();
1465
1466 let Statement::CreateFlow(create_flow) = stmt else {
1467 unreachable!()
1468 };
1469 let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
1470
1471 assert!(res.is_err());
1472 assert!(
1473 res.unwrap_err()
1474 .to_string()
1475 .contains("Invalid flow name: abc.`task_2`")
1476 );
1477 }
1478
1479 #[test]
1480 fn test_create_to_expr() {
1481 let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
1482 let stmt =
1483 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1484 .unwrap()
1485 .pop()
1486 .unwrap();
1487
1488 let Statement::CreateTable(create_table) = stmt else {
1489 unreachable!()
1490 };
1491 let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1492 assert_eq!("3days", expr.table_options.get("ttl").unwrap());
1493 assert_eq!(
1494 "1.0MiB",
1495 expr.table_options.get("write_buffer_size").unwrap()
1496 );
1497 }
1498
1499 #[test]
1500 fn test_invalid_create_to_expr() {
1501 let cases = [
1502 "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
1504 "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
1506 "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));",
1508 ];
1509
1510 for sql in cases {
1511 let stmt = ParserContext::create_with_dialect(
1512 sql,
1513 &GreptimeDbDialect {},
1514 ParseOptions::default(),
1515 )
1516 .unwrap()
1517 .pop()
1518 .unwrap();
1519 let Statement::CreateTable(create_table) = stmt else {
1520 unreachable!()
1521 };
1522 create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
1523 }
1524 }
1525
1526 #[test]
1527 fn test_create_to_expr_with_default_timestamp_value() {
1528 let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
1529 let stmt =
1530 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1531 .unwrap()
1532 .pop()
1533 .unwrap();
1534
1535 let Statement::CreateTable(create_table) = stmt else {
1536 unreachable!()
1537 };
1538
1539 let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1541 let ts_column = &expr.column_defs[1];
1542 let constraint = assert_ts_column(ts_column);
1543 assert!(
1544 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1545 if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1546 );
1547
1548 let ctx = QueryContextBuilder::default()
1550 .timezone(Timezone::from_tz_string("+08:00").unwrap())
1551 .build()
1552 .into();
1553 let expr = create_to_expr(&create_table, &ctx).unwrap();
1554 let ts_column = &expr.column_defs[1];
1555 let constraint = assert_ts_column(ts_column);
1556 assert!(
1557 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1558 if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1559 );
1560 }
1561
1562 fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
1563 assert_eq!("ts", ts_column.name);
1564 assert_eq!(
1565 ColumnDataType::TimestampMillisecond as i32,
1566 ts_column.data_type
1567 );
1568 assert!(!ts_column.default_constraint.is_empty());
1569
1570 ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
1571 }
1572
1573 #[test]
1574 fn test_to_alter_expr() {
1575 let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
1576 let stmt =
1577 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1578 .unwrap()
1579 .pop()
1580 .unwrap();
1581
1582 let Statement::AlterDatabase(alter_database) = stmt else {
1583 unreachable!()
1584 };
1585
1586 let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1587 let kind = expr.kind.unwrap();
1588
1589 let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
1590 set_database_options,
1591 }) = kind
1592 else {
1593 unreachable!()
1594 };
1595
1596 assert_eq!(2, set_database_options.len());
1597 assert_eq!("key1", set_database_options[0].key);
1598 assert_eq!("value1", set_database_options[0].value);
1599 assert_eq!("key2", set_database_options[1].key);
1600 assert_eq!("value2", set_database_options[1].value);
1601
1602 let sql = "ALTER DATABASE greptime UNSET key1, key2;";
1603 let stmt =
1604 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1605 .unwrap()
1606 .pop()
1607 .unwrap();
1608
1609 let Statement::AlterDatabase(alter_database) = stmt else {
1610 unreachable!()
1611 };
1612
1613 let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1614 let kind = expr.kind.unwrap();
1615
1616 let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
1617 unreachable!()
1618 };
1619
1620 assert_eq!(2, keys.len());
1621 assert!(keys.contains(&"key1".to_string()));
1622 assert!(keys.contains(&"key2".to_string()));
1623
1624 let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
1625 let stmt =
1626 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1627 .unwrap()
1628 .pop()
1629 .unwrap();
1630
1631 let Statement::AlterTable(alter_table) = stmt else {
1632 unreachable!()
1633 };
1634
1635 let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1637 let kind = expr.kind.unwrap();
1638
1639 let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1640 unreachable!()
1641 };
1642
1643 assert_eq!(1, add_columns.len());
1644 let ts_column = add_columns[0].column_def.clone().unwrap();
1645 let constraint = assert_ts_column(&ts_column);
1646 assert!(
1647 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1648 if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1649 );
1650
1651 let ctx = QueryContextBuilder::default()
1654 .timezone(Timezone::from_tz_string("+08:00").unwrap())
1655 .build()
1656 .into();
1657 let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
1658 let kind = expr.kind.unwrap();
1659
1660 let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1661 unreachable!()
1662 };
1663
1664 assert_eq!(1, add_columns.len());
1665 let ts_column = add_columns[0].column_def.clone().unwrap();
1666 let constraint = assert_ts_column(&ts_column);
1667 assert!(
1668 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1669 if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1670 );
1671 }
1672
1673 #[test]
1674 fn test_to_alter_modify_column_type_expr() {
1675 let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
1676 let stmt =
1677 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1678 .unwrap()
1679 .pop()
1680 .unwrap();
1681
1682 let Statement::AlterTable(alter_table) = stmt else {
1683 unreachable!()
1684 };
1685
1686 let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1688 let kind = expr.kind.unwrap();
1689
1690 let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
1691 modify_column_types,
1692 }) = kind
1693 else {
1694 unreachable!()
1695 };
1696
1697 assert_eq!(1, modify_column_types.len());
1698 let modify_column_type = &modify_column_types[0];
1699
1700 assert_eq!("mem_usage", modify_column_type.column_name);
1701 assert_eq!(
1702 ColumnDataType::String as i32,
1703 modify_column_type.target_type
1704 );
1705 assert!(modify_column_type.target_type_extension.is_none());
1706 }
1707
1708 #[test]
1709 fn test_to_repartition_request() {
1710 let sql = r#"
1711ALTER TABLE metrics REPARTITION (
1712 device_id < 100
1713) INTO (
1714 device_id < 100 AND area < 'South',
1715 device_id < 100 AND area >= 'South'
1716);"#;
1717 let stmt =
1718 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1719 .unwrap()
1720 .pop()
1721 .unwrap();
1722
1723 let Statement::AlterTable(alter_table) = stmt else {
1724 unreachable!()
1725 };
1726
1727 let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
1728 assert_eq!("greptime", request.catalog_name);
1729 assert_eq!("public", request.schema_name);
1730 assert_eq!("metrics", request.table_name);
1731 let RepartitionSource::Partitions {
1732 from_exprs,
1733 target_partition_columns,
1734 } = request.source
1735 else {
1736 unreachable!()
1737 };
1738 assert!(target_partition_columns.is_none());
1739 assert_eq!(
1740 from_exprs
1741 .into_iter()
1742 .map(|x| x.to_string())
1743 .collect::<Vec<_>>(),
1744 vec!["device_id < 100".to_string()]
1745 );
1746 assert_eq!(
1747 request
1748 .into_exprs
1749 .into_iter()
1750 .map(|x| x.to_string())
1751 .collect::<Vec<_>>(),
1752 vec![
1753 "device_id < 100 AND area < 'South'".to_string(),
1754 "device_id < 100 AND area >= 'South'".to_string()
1755 ]
1756 );
1757 }
1758
1759 #[test]
1760 fn test_to_repartition_request_with_target_partition_columns() {
1761 let sql = r#"
1762ALTER TABLE metrics REPARTITION (
1763 device_id < 100
1764) ON COLUMNS (device_id, area) INTO (
1765 device_id < 100 AND area < 'South',
1766 device_id < 100 AND area >= 'South'
1767);"#;
1768 let stmt =
1769 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1770 .unwrap()
1771 .pop()
1772 .unwrap();
1773
1774 let Statement::AlterTable(alter_table) = stmt else {
1775 unreachable!()
1776 };
1777
1778 let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
1779 let RepartitionSource::Partitions {
1780 target_partition_columns,
1781 ..
1782 } = request.source
1783 else {
1784 unreachable!()
1785 };
1786
1787 assert_eq!(
1788 target_partition_columns,
1789 Some(vec!["device_id".to_string(), "area".to_string()])
1790 );
1791 }
1792
1793 #[test]
1794 fn test_to_repartition_request_with_unpartitioned_source() {
1795 let sql = r#"
1796ALTER TABLE metrics PARTITION ON COLUMNS (device_id, area) (
1797 device_id < 100 AND area < 'South',
1798 device_id < 100 AND area >= 'South'
1799);"#;
1800 let stmt =
1801 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1802 .unwrap()
1803 .pop()
1804 .unwrap();
1805
1806 let Statement::AlterTable(alter_table) = stmt else {
1807 unreachable!()
1808 };
1809
1810 let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
1811 assert_eq!("greptime", request.catalog_name);
1812 assert_eq!("public", request.schema_name);
1813 assert_eq!("metrics", request.table_name);
1814 let RepartitionSource::Unpartitioned { partition_columns } = request.source else {
1815 unreachable!()
1816 };
1817 assert_eq!(partition_columns, vec!["device_id", "area"]);
1818 assert_eq!(
1819 request
1820 .into_exprs
1821 .into_iter()
1822 .map(|x| x.to_string())
1823 .collect::<Vec<_>>(),
1824 vec![
1825 "device_id < 100 AND area < 'South'".to_string(),
1826 "device_id < 100 AND area >= 'South'".to_string()
1827 ]
1828 );
1829 }
1830
1831 fn new_test_table_names() -> Vec<TableName> {
1832 vec![
1833 TableName {
1834 catalog_name: "greptime".to_string(),
1835 schema_name: "public".to_string(),
1836 table_name: "a_table".to_string(),
1837 },
1838 TableName {
1839 catalog_name: "greptime".to_string(),
1840 schema_name: "public".to_string(),
1841 table_name: "b_table".to_string(),
1842 },
1843 ]
1844 }
1845
1846 #[test]
1847 fn test_to_create_view_expr() {
1848 let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1849 let stmt =
1850 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1851 .unwrap()
1852 .pop()
1853 .unwrap();
1854
1855 let Statement::CreateView(stmt) = stmt else {
1856 unreachable!()
1857 };
1858
1859 let logical_plan = vec![1, 2, 3];
1860 let table_names = new_test_table_names();
1861 let columns = vec!["a".to_string()];
1862 let plan_columns = vec!["number".to_string()];
1863
1864 let expr = to_create_view_expr(
1865 stmt,
1866 logical_plan.clone(),
1867 table_names.clone(),
1868 columns.clone(),
1869 plan_columns.clone(),
1870 sql.to_string(),
1871 QueryContext::arc(),
1872 )
1873 .unwrap();
1874
1875 assert_eq!("greptime", expr.catalog_name);
1876 assert_eq!("public", expr.schema_name);
1877 assert_eq!("test", expr.view_name);
1878 assert!(!expr.create_if_not_exists);
1879 assert!(!expr.or_replace);
1880 assert_eq!(logical_plan, expr.logical_plan);
1881 assert_eq!(table_names, expr.table_names);
1882 assert_eq!(sql, expr.definition);
1883 assert_eq!(columns, expr.columns);
1884 assert_eq!(plan_columns, expr.plan_columns);
1885 }
1886
1887 #[test]
1888 fn test_to_create_view_expr_complex() {
1889 let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
1890 let stmt =
1891 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1892 .unwrap()
1893 .pop()
1894 .unwrap();
1895
1896 let Statement::CreateView(stmt) = stmt else {
1897 unreachable!()
1898 };
1899
1900 let logical_plan = vec![1, 2, 3];
1901 let table_names = new_test_table_names();
1902 let columns = vec!["a".to_string()];
1903 let plan_columns = vec!["number".to_string()];
1904
1905 let expr = to_create_view_expr(
1906 stmt,
1907 logical_plan.clone(),
1908 table_names.clone(),
1909 columns.clone(),
1910 plan_columns.clone(),
1911 sql.to_string(),
1912 QueryContext::arc(),
1913 )
1914 .unwrap();
1915
1916 assert_eq!("greptime", expr.catalog_name);
1917 assert_eq!("test", expr.schema_name);
1918 assert_eq!("test_view", expr.view_name);
1919 assert!(expr.create_if_not_exists);
1920 assert!(expr.or_replace);
1921 assert_eq!(logical_plan, expr.logical_plan);
1922 assert_eq!(table_names, expr.table_names);
1923 assert_eq!(sql, expr.definition);
1924 assert_eq!(columns, expr.columns);
1925 assert_eq!(plan_columns, expr.plan_columns);
1926 }
1927
1928 #[test]
1929 fn test_expr_to_create() {
1930 let sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
1931 `timestamp` TIMESTAMP(9) NOT NULL,
1932 `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
1933 `username` STRING NULL,
1934 `http_method` STRING NULL INVERTED INDEX,
1935 `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
1936 `protocol` STRING NULL,
1937 `status_code` INT NULL INVERTED INDEX,
1938 `response_size` BIGINT NULL,
1939 `message` STRING NULL,
1940 TIME INDEX (`timestamp`),
1941 PRIMARY KEY (`username`, `status_code`)
1942)
1943ENGINE=mito
1944WITH(
1945 append_mode = 'true'
1946)"#;
1947 let stmt =
1948 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1949 .unwrap()
1950 .pop()
1951 .unwrap();
1952
1953 let Statement::CreateTable(original_create) = stmt else {
1954 unreachable!()
1955 };
1956
1957 let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();
1959
1960 let create_table = expr_to_create(&expr, Some('`')).unwrap();
1961 let new_sql = format!("{:#}", create_table);
1962 assert_eq!(sql, new_sql);
1963 }
1964}