1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
24use api::v1::{
25 AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
26 ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
27 DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
28 ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetDefaults, SetFulltext,
29 SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30 SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31 UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions, set_index,
32 unset_index,
33};
34use common_error::ext::BoxedError;
35use common_grpc_expr::util::ColumnExpr;
36use common_time::Timezone;
37use datafusion::sql::planner::object_name_to_table_reference;
38use datatypes::schema::{
39 COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
40 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
41 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
42 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
43 ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
44 SkippingIndexType,
45};
46use file_engine::FileOptions;
47use query::sql::{
48 check_file_to_table_schema_compatibility, file_column_schemas_to_table,
49 infer_file_table_schema, prepare_file_table_files,
50};
51use session::context::QueryContextRef;
52use session::table_name::table_idents_to_full_name;
53use snafu::{OptionExt, ResultExt, ensure};
54use sql::ast::{
55 ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
56};
57use sql::dialect::GreptimeDbDialect;
58use sql::parser::ParserContext;
59use sql::statements::alter::{
60 AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
61};
62use sql::statements::create::{
63 Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
64 CreateView, TableConstraint,
65};
66use sql::statements::{
67 OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
68 sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
69};
70use sql::util::extract_tables_from_query;
71use store_api::mito_engine_options::{COMPACTION_OVERRIDE, COMPACTION_TYPE};
72use table::requests::{FILE_TABLE_META_KEY, TableOptions};
73use table::table_reference::TableReference;
74#[cfg(feature = "enterprise")]
75pub use trigger::to_create_trigger_task_expr;
76
77use crate::error::{
78 BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
79 ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
80 IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
81 InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
82 PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
83};
84
85pub fn create_table_expr_by_column_schemas(
86 table_name: &TableReference<'_>,
87 column_schemas: &[api::v1::ColumnSchema],
88 engine: &str,
89 desc: Option<&str>,
90) -> Result<CreateTableExpr> {
91 let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
92 let expr = common_grpc_expr::util::build_create_table_expr(
93 None,
94 table_name,
95 column_exprs,
96 engine,
97 desc.unwrap_or("Created on insertion"),
98 )
99 .context(BuildCreateExprOnInsertionSnafu)?;
100
101 validate_create_expr(&expr)?;
102 Ok(expr)
103}
104
105pub fn extract_add_columns_expr(
106 schema: &Schema,
107 column_exprs: Vec<ColumnExpr>,
108) -> Result<Option<AddColumns>> {
109 let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
110 .context(FindNewColumnsOnInsertionSnafu)?;
111 if let Some(add_columns) = &add_columns {
112 validate_add_columns_expr(add_columns)?;
113 }
114 Ok(add_columns)
115}
116
117pub(crate) async fn create_external_expr(
141 create: CreateExternalTable,
142 query_ctx: &QueryContextRef,
143) -> Result<CreateTableExpr> {
144 let (catalog_name, schema_name, table_name) =
145 table_idents_to_full_name(&create.name, query_ctx)
146 .map_err(BoxedError::new)
147 .context(ExternalSnafu)?;
148
149 let mut table_options = create.options.into_map();
150
151 let (object_store, files) = prepare_file_table_files(&table_options)
152 .await
153 .context(PrepareFileTableSnafu)?;
154
155 let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
156 .await
157 .context(InferFileTableSchemaSnafu)?
158 .column_schemas()
159 .to_vec();
160
161 let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
162 let time_index = find_time_index(&create.constraints)?;
164 let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
165 let column_schemas =
166 columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
167 (time_index, primary_keys, column_schemas)
168 } else {
169 let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
171 let primary_keys = vec![];
172 (time_index, primary_keys, column_schemas)
173 };
174
175 check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
176 .context(SchemaIncompatibleSnafu)?;
177
178 let meta = FileOptions {
179 files,
180 file_column_schemas,
181 };
182 table_options.insert(
183 FILE_TABLE_META_KEY.to_string(),
184 serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
185 );
186
187 let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
188 let expr = CreateTableExpr {
189 catalog_name,
190 schema_name,
191 table_name,
192 desc: String::default(),
193 column_defs,
194 time_index,
195 primary_keys,
196 create_if_not_exists: create.if_not_exists,
197 table_options,
198 table_id: None,
199 engine: create.engine.clone(),
200 };
201
202 Ok(expr)
203}
204
205pub fn create_to_expr(
207 create: &CreateTable,
208 query_ctx: &QueryContextRef,
209) -> Result<CreateTableExpr> {
210 let (catalog_name, schema_name, table_name) =
211 table_idents_to_full_name(&create.name, query_ctx)
212 .map_err(BoxedError::new)
213 .context(ExternalSnafu)?;
214
215 let time_index = find_time_index(&create.constraints)?;
216 let table_options = HashMap::from(
217 &TableOptions::try_from_iter(create.options.to_str_map())
218 .context(UnrecognizedTableOptionSnafu)?,
219 );
220
221 let mut table_options = table_options;
222 if table_options.contains_key(COMPACTION_TYPE) {
223 table_options.insert(COMPACTION_OVERRIDE.to_string(), "true".to_string());
224 }
225
226 let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
227
228 let expr = CreateTableExpr {
229 catalog_name,
230 schema_name,
231 table_name,
232 desc: String::default(),
233 column_defs: columns_to_expr(
234 &create.columns,
235 &time_index,
236 &primary_keys,
237 Some(&query_ctx.timezone()),
238 )?,
239 time_index,
240 primary_keys,
241 create_if_not_exists: create.if_not_exists,
242 table_options,
243 table_id: None,
244 engine: create.engine.clone(),
245 };
246
247 validate_create_expr(&expr)?;
248 Ok(expr)
249}
250
251pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
259 let quote_style = quote_style.unwrap_or('`');
260
261 let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
263 sql::ast::Ident::with_quote(quote_style, &expr.table_name),
264 )]);
265
266 let mut columns = Vec::with_capacity(expr.column_defs.len());
268 for column_def in &expr.column_defs {
269 let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
270 column: &column_def.name,
271 })?;
272
273 let mut options = Vec::new();
274
275 if column_def.is_nullable {
277 options.push(ColumnOptionDef {
278 name: None,
279 option: ColumnOption::Null,
280 });
281 } else {
282 options.push(ColumnOptionDef {
283 name: None,
284 option: ColumnOption::NotNull,
285 });
286 }
287
288 if let Some(default_constraint) = column_schema.default_constraint() {
290 let expr = match default_constraint {
291 ColumnDefaultConstraint::Value(v) => {
292 Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
293 }
294 ColumnDefaultConstraint::Function(func_expr) => {
295 ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
296 .context(ParseSqlSnafu)?
297 }
298 };
299 options.push(ColumnOptionDef {
300 name: None,
301 option: ColumnOption::Default(expr),
302 });
303 }
304
305 if !column_def.comment.is_empty() {
307 options.push(ColumnOptionDef {
308 name: None,
309 option: ColumnOption::Comment(column_def.comment.clone()),
310 });
311 }
312
313 let mut extensions = ColumnExtensions::default();
318
319 if let Ok(Some(opt)) = column_schema.fulltext_options()
321 && opt.enable
322 {
323 let mut map = HashMap::from([
324 (
325 COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
326 opt.analyzer.to_string(),
327 ),
328 (
329 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
330 opt.case_sensitive.to_string(),
331 ),
332 (
333 COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
334 opt.backend.to_string(),
335 ),
336 ]);
337 if opt.backend == FulltextBackend::Bloom {
338 map.insert(
339 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
340 opt.granularity.to_string(),
341 );
342 map.insert(
343 COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
344 opt.false_positive_rate().to_string(),
345 );
346 }
347 extensions.fulltext_index_options = Some(map.into());
348 }
349
350 if let Ok(Some(opt)) = column_schema.skipping_index_options() {
352 let map = HashMap::from([
353 (
354 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
355 opt.granularity.to_string(),
356 ),
357 (
358 COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
359 opt.false_positive_rate().to_string(),
360 ),
361 (
362 COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
363 opt.index_type.to_string(),
364 ),
365 ]);
366 extensions.skipping_index_options = Some(map.into());
367 }
368
369 if column_schema.is_inverted_indexed() {
371 extensions.inverted_index_options = Some(HashMap::new().into());
372 }
373
374 let sql_column = SqlColumn {
375 column_def: ColumnDef {
376 name: Ident::with_quote(quote_style, &column_def.name),
377 data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
378 .context(ParseSqlSnafu)?,
379 options,
380 },
381 extensions,
382 };
383
384 columns.push(sql_column);
385 }
386
387 let mut constraints = Vec::new();
389
390 constraints.push(TableConstraint::TimeIndex {
392 column: Ident::with_quote(quote_style, &expr.time_index),
393 });
394
395 if !expr.primary_keys.is_empty() {
397 let primary_key_columns: Vec<Ident> = expr
398 .primary_keys
399 .iter()
400 .map(|pk| Ident::with_quote(quote_style, pk))
401 .collect();
402
403 constraints.push(TableConstraint::PrimaryKey {
404 columns: primary_key_columns,
405 });
406 }
407
408 let mut options = OptionMap::default();
410 for (key, value) in &expr.table_options {
411 options.insert(key.clone(), value.clone());
412 }
413
414 Ok(CreateTable {
415 if_not_exists: expr.create_if_not_exists,
416 table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
417 name: table_name,
418 columns,
419 engine: expr.engine.clone(),
420 constraints,
421 options,
422 partitions: None,
423 })
424}
425
426pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
428 let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
430 for (idx, column) in create.column_defs.iter().enumerate() {
431 if let Some(indices) = column_to_indices.get(&column.name) {
432 return InvalidSqlSnafu {
433 err_msg: format!(
434 "column name `{}` is duplicated at index {} and {}",
435 column.name, indices, idx
436 ),
437 }
438 .fail();
439 }
440 column_to_indices.insert(&column.name, idx);
441 }
442
443 let _ = column_to_indices
445 .get(&create.time_index)
446 .with_context(|| InvalidSqlSnafu {
447 err_msg: format!(
448 "column name `{}` is not found in column list",
449 create.time_index
450 ),
451 })?;
452
453 for pk in &create.primary_keys {
455 let _ = column_to_indices
456 .get(&pk)
457 .with_context(|| InvalidSqlSnafu {
458 err_msg: format!("column name `{}` is not found in column list", pk),
459 })?;
460 }
461
462 let mut pk_set = HashSet::new();
464 for pk in &create.primary_keys {
465 if !pk_set.insert(pk) {
466 return InvalidSqlSnafu {
467 err_msg: format!("column name `{}` is duplicated in primary keys", pk),
468 }
469 .fail();
470 }
471 }
472
473 if pk_set.contains(&create.time_index) {
475 return InvalidSqlSnafu {
476 err_msg: format!(
477 "column name `{}` is both primary key and time index",
478 create.time_index
479 ),
480 }
481 .fail();
482 }
483
484 for column in &create.column_defs {
485 if is_interval_type(&column.data_type()) {
487 return InvalidSqlSnafu {
488 err_msg: format!(
489 "column name `{}` is interval type, which is not supported",
490 column.name
491 ),
492 }
493 .fail();
494 }
495 if is_date_time_type(&column.data_type()) {
497 return InvalidSqlSnafu {
498 err_msg: format!(
499 "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
500 column.name
501 ),
502 }
503 .fail();
504 }
505 }
506 Ok(())
507}
508
509fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
510 for add_column in &add_columns.add_columns {
511 let Some(column_def) = &add_column.column_def else {
512 continue;
513 };
514 if is_date_time_type(&column_def.data_type()) {
515 return InvalidSqlSnafu {
516 err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
517 }
518 .fail();
519 }
520 if is_interval_type(&column_def.data_type()) {
521 return InvalidSqlSnafu {
522 err_msg: format!(
523 "column name `{}` is interval type, which is not supported",
524 column_def.name
525 ),
526 }
527 .fail();
528 }
529 }
530 Ok(())
531}
532
533fn is_date_time_type(data_type: &ColumnDataType) -> bool {
534 matches!(data_type, ColumnDataType::Datetime)
535}
536
537fn is_interval_type(data_type: &ColumnDataType) -> bool {
538 matches!(
539 data_type,
540 ColumnDataType::IntervalYearMonth
541 | ColumnDataType::IntervalDayTime
542 | ColumnDataType::IntervalMonthDayNano
543 )
544}
545
546fn find_primary_keys(
547 columns: &[SqlColumn],
548 constraints: &[TableConstraint],
549) -> Result<Vec<String>> {
550 let columns_pk = columns
551 .iter()
552 .filter_map(|x| {
553 if x.options()
554 .iter()
555 .any(|o| matches!(o.option, ColumnOption::PrimaryKey(_)))
556 {
557 Some(x.name().value.clone())
558 } else {
559 None
560 }
561 })
562 .collect::<Vec<String>>();
563
564 ensure!(
565 columns_pk.len() <= 1,
566 IllegalPrimaryKeysDefSnafu {
567 msg: "not allowed to inline multiple primary keys in columns options"
568 }
569 );
570
571 let constraints_pk = constraints
572 .iter()
573 .filter_map(|constraint| match constraint {
574 TableConstraint::PrimaryKey { columns, .. } => {
575 Some(columns.iter().map(|ident| ident.value.clone()))
576 }
577 _ => None,
578 })
579 .flatten()
580 .collect::<Vec<String>>();
581
582 ensure!(
583 columns_pk.is_empty() || constraints_pk.is_empty(),
584 IllegalPrimaryKeysDefSnafu {
585 msg: "found definitions of primary keys in multiple places"
586 }
587 );
588
589 let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
590 primary_keys.extend(columns_pk);
591 primary_keys.extend(constraints_pk);
592 Ok(primary_keys)
593}
594
595pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
596 let time_index = constraints
597 .iter()
598 .filter_map(|constraint| match constraint {
599 TableConstraint::TimeIndex { column, .. } => Some(&column.value),
600 _ => None,
601 })
602 .collect::<Vec<&String>>();
603 ensure!(
604 time_index.len() == 1,
605 InvalidSqlSnafu {
606 err_msg: "must have one and only one TimeIndex columns",
607 }
608 );
609 Ok(time_index[0].clone())
610}
611
612fn columns_to_expr(
613 column_defs: &[SqlColumn],
614 time_index: &str,
615 primary_keys: &[String],
616 timezone: Option<&Timezone>,
617) -> Result<Vec<api::v1::ColumnDef>> {
618 let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
619 column_schemas_to_defs(column_schemas, primary_keys)
620}
621
622fn columns_to_column_schemas(
623 columns: &[SqlColumn],
624 time_index: &str,
625 timezone: Option<&Timezone>,
626) -> Result<Vec<ColumnSchema>> {
627 columns
628 .iter()
629 .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
630 .collect::<Result<Vec<ColumnSchema>>>()
631}
632
633pub fn column_schemas_to_defs(
635 column_schemas: Vec<ColumnSchema>,
636 primary_keys: &[String],
637) -> Result<Vec<api::v1::ColumnDef>> {
638 let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
639 .iter()
640 .map(|c| {
641 ColumnDataTypeWrapper::try_from(c.data_type.clone())
642 .map(|w| w.to_parts())
643 .context(ColumnDataTypeSnafu)
644 })
645 .collect::<Result<Vec<_>>>()?;
646
647 column_schemas
648 .iter()
649 .zip(column_datatypes)
650 .map(|(schema, datatype)| {
651 let semantic_type = if schema.is_time_index() {
652 SemanticType::Timestamp
653 } else if primary_keys.contains(&schema.name) {
654 SemanticType::Tag
655 } else {
656 SemanticType::Field
657 } as i32;
658 let comment = schema
659 .metadata()
660 .get(COMMENT_KEY)
661 .cloned()
662 .unwrap_or_default();
663
664 Ok(api::v1::ColumnDef {
665 name: schema.name.clone(),
666 data_type: datatype.0 as i32,
667 is_nullable: schema.is_nullable(),
668 default_constraint: match schema.default_constraint() {
669 None => vec![],
670 Some(v) => {
671 v.clone()
672 .try_into()
673 .context(ConvertColumnDefaultConstraintSnafu {
674 column_name: &schema.name,
675 })?
676 }
677 },
678 semantic_type,
679 comment,
680 datatype_extension: datatype.1,
681 options: options_from_column_schema(schema),
682 })
683 })
684 .collect()
685}
686
687#[derive(Debug, Clone, PartialEq, Eq)]
688pub struct RepartitionRequest {
689 pub catalog_name: String,
690 pub schema_name: String,
691 pub table_name: String,
692 pub from_exprs: Vec<Expr>,
693 pub into_exprs: Vec<Expr>,
694 pub options: OptionMap,
695}
696
697pub(crate) fn to_repartition_request(
698 alter_table: AlterTable,
699 query_ctx: &QueryContextRef,
700) -> Result<RepartitionRequest> {
701 let AlterTable {
702 table_name,
703 alter_operation,
704 options,
705 } = alter_table;
706
707 let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&table_name, query_ctx)
708 .map_err(BoxedError::new)
709 .context(ExternalSnafu)?;
710
711 let AlterTableOperation::Repartition { operation } = alter_operation else {
712 return InvalidSqlSnafu {
713 err_msg: "expected REPARTITION operation",
714 }
715 .fail();
716 };
717
718 Ok(RepartitionRequest {
719 catalog_name,
720 schema_name,
721 table_name,
722 from_exprs: operation.from_exprs,
723 into_exprs: operation.into_exprs,
724 options,
725 })
726}
727
728pub(crate) fn to_alter_table_expr(
730 alter_table: AlterTable,
731 query_ctx: &QueryContextRef,
732) -> Result<AlterTableExpr> {
733 let (catalog_name, schema_name, table_name) =
734 table_idents_to_full_name(alter_table.table_name(), query_ctx)
735 .map_err(BoxedError::new)
736 .context(ExternalSnafu)?;
737
738 let kind = match alter_table.alter_operation {
739 AlterTableOperation::AddConstraint(_) => {
740 return NotSupportedSnafu {
741 feat: "ADD CONSTRAINT",
742 }
743 .fail();
744 }
745 AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
746 add_columns: add_columns
747 .into_iter()
748 .map(|add_column| {
749 let column_def = sql_column_def_to_grpc_column_def(
750 &add_column.column_def,
751 Some(&query_ctx.timezone()),
752 )
753 .map_err(BoxedError::new)
754 .context(ExternalSnafu)?;
755 if is_interval_type(&column_def.data_type()) {
756 return NotSupportedSnafu {
757 feat: "Add column with interval type",
758 }
759 .fail();
760 }
761 Ok(AddColumn {
762 column_def: Some(column_def),
763 location: add_column.location.as_ref().map(From::from),
764 add_if_not_exists: add_column.add_if_not_exists,
765 })
766 })
767 .collect::<Result<Vec<AddColumn>>>()?,
768 }),
769 AlterTableOperation::ModifyColumnType {
770 column_name,
771 target_type,
772 } => {
773 let target_type =
774 sql_data_type_to_concrete_data_type(&target_type, &Default::default())
775 .context(ParseSqlSnafu)?;
776 let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
777 .map(|w| w.to_parts())
778 .context(ColumnDataTypeSnafu)?;
779 if is_interval_type(&target_type) {
780 return NotSupportedSnafu {
781 feat: "Modify column type to interval type",
782 }
783 .fail();
784 }
785 AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
786 modify_column_types: vec![ModifyColumnType {
787 column_name: column_name.value,
788 target_type: target_type as i32,
789 target_type_extension,
790 }],
791 })
792 }
793 AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
794 drop_columns: vec![DropColumn {
795 name: name.value.clone(),
796 }],
797 }),
798 AlterTableOperation::RenameTable { new_table_name } => {
799 AlterTableKind::RenameTable(RenameTable {
800 new_table_name: new_table_name.clone(),
801 })
802 }
803 AlterTableOperation::SetTableOptions { options } => {
804 AlterTableKind::SetTableOptions(SetTableOptions {
805 table_options: options.into_iter().map(Into::into).collect(),
806 })
807 }
808 AlterTableOperation::UnsetTableOptions { keys } => {
809 AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
810 }
811 AlterTableOperation::Repartition { .. } => {
812 return NotSupportedSnafu {
813 feat: "ALTER TABLE ... REPARTITION",
814 }
815 .fail();
816 }
817 AlterTableOperation::SetIndex { options } => {
818 let option = match options {
819 sql::statements::alter::SetIndexOperation::Fulltext {
820 column_name,
821 options,
822 } => SetIndex {
823 options: Some(set_index::Options::Fulltext(SetFulltext {
824 column_name: column_name.value,
825 enable: options.enable,
826 analyzer: match options.analyzer {
827 FulltextAnalyzer::English => Analyzer::English.into(),
828 FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
829 },
830 case_sensitive: options.case_sensitive,
831 backend: match options.backend {
832 FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
833 FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
834 },
835 granularity: options.granularity as u64,
836 false_positive_rate: options.false_positive_rate(),
837 })),
838 },
839 sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
840 options: Some(set_index::Options::Inverted(SetInverted {
841 column_name: column_name.value,
842 })),
843 },
844 sql::statements::alter::SetIndexOperation::Skipping {
845 column_name,
846 options,
847 } => SetIndex {
848 options: Some(set_index::Options::Skipping(SetSkipping {
849 column_name: column_name.value,
850 enable: true,
851 granularity: options.granularity as u64,
852 false_positive_rate: options.false_positive_rate(),
853 skipping_index_type: match options.index_type {
854 SkippingIndexType::BloomFilter => {
855 PbSkippingIndexType::BloomFilter.into()
856 }
857 },
858 })),
859 },
860 };
861 AlterTableKind::SetIndexes(SetIndexes {
862 set_indexes: vec![option],
863 })
864 }
865 AlterTableOperation::UnsetIndex { options } => {
866 let option = match options {
867 sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
868 UnsetIndex {
869 options: Some(unset_index::Options::Fulltext(UnsetFulltext {
870 column_name: column_name.value,
871 })),
872 }
873 }
874 sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
875 UnsetIndex {
876 options: Some(unset_index::Options::Inverted(UnsetInverted {
877 column_name: column_name.value,
878 })),
879 }
880 }
881 sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
882 UnsetIndex {
883 options: Some(unset_index::Options::Skipping(UnsetSkipping {
884 column_name: column_name.value,
885 })),
886 }
887 }
888 };
889
890 AlterTableKind::UnsetIndexes(UnsetIndexes {
891 unset_indexes: vec![option],
892 })
893 }
894 AlterTableOperation::DropDefaults { columns } => {
895 AlterTableKind::DropDefaults(DropDefaults {
896 drop_defaults: columns
897 .into_iter()
898 .map(|col| {
899 let column_name = col.0.to_string();
900 Ok(api::v1::DropDefault { column_name })
901 })
902 .collect::<Result<Vec<_>>>()?,
903 })
904 }
905 AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
906 set_defaults: defaults
907 .into_iter()
908 .map(|col| {
909 let column_name = col.column_name.to_string();
910 let default_constraint = serde_json::to_string(&col.default_constraint)
911 .context(EncodeJsonSnafu)?
912 .into_bytes();
913 Ok(api::v1::SetDefault {
914 column_name,
915 default_constraint,
916 })
917 })
918 .collect::<Result<Vec<_>>>()?,
919 }),
920 };
921
922 Ok(AlterTableExpr {
923 catalog_name,
924 schema_name,
925 table_name,
926 kind: Some(kind),
927 })
928}
929
930pub fn to_alter_database_expr(
932 alter_database: AlterDatabase,
933 query_ctx: &QueryContextRef,
934) -> Result<AlterDatabaseExpr> {
935 let catalog = query_ctx.current_catalog();
936 let schema = alter_database.database_name;
937
938 let kind = match alter_database.alter_operation {
939 AlterDatabaseOperation::SetDatabaseOption { options } => {
940 let options = options.into_iter().map(Into::into).collect();
941 AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
942 set_database_options: options,
943 })
944 }
945 AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
946 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
947 }
948 };
949
950 Ok(AlterDatabaseExpr {
951 catalog_name: catalog.to_string(),
952 schema_name: schema.to_string(),
953 kind: Some(kind),
954 })
955}
956
957pub fn to_create_view_expr(
959 stmt: CreateView,
960 logical_plan: Vec<u8>,
961 table_names: Vec<TableName>,
962 columns: Vec<String>,
963 plan_columns: Vec<String>,
964 definition: String,
965 query_ctx: QueryContextRef,
966) -> Result<CreateViewExpr> {
967 let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
968 .map_err(BoxedError::new)
969 .context(ExternalSnafu)?;
970
971 let expr = CreateViewExpr {
972 catalog_name,
973 schema_name,
974 view_name,
975 logical_plan,
976 create_if_not_exists: stmt.if_not_exists,
977 or_replace: stmt.or_replace,
978 table_names,
979 columns,
980 plan_columns,
981 definition,
982 };
983
984 Ok(expr)
985}
986
987pub fn to_create_flow_task_expr(
988 create_flow: CreateFlow,
989 query_ctx: &QueryContextRef,
990) -> Result<CreateFlowExpr> {
991 let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone(), true)
993 .with_context(|_| ConvertIdentifierSnafu {
994 ident: create_flow.sink_table_name.to_string(),
995 })?;
996 let catalog = sink_table_ref
997 .catalog()
998 .unwrap_or(query_ctx.current_catalog())
999 .to_string();
1000 let schema = sink_table_ref
1001 .schema()
1002 .map(|s| s.to_owned())
1003 .unwrap_or(query_ctx.current_schema());
1004
1005 let sink_table_name = TableName {
1006 catalog_name: catalog,
1007 schema_name: schema,
1008 table_name: sink_table_ref.table().to_string(),
1009 };
1010
1011 let source_table_names = extract_tables_from_query(&create_flow.query)
1012 .map(|name| {
1013 let reference =
1014 object_name_to_table_reference(name.clone(), true).with_context(|_| {
1015 ConvertIdentifierSnafu {
1016 ident: name.to_string(),
1017 }
1018 })?;
1019 let catalog = reference
1020 .catalog()
1021 .unwrap_or(query_ctx.current_catalog())
1022 .to_string();
1023 let schema = reference
1024 .schema()
1025 .map(|s| s.to_string())
1026 .unwrap_or(query_ctx.current_schema());
1027
1028 let table_name = TableName {
1029 catalog_name: catalog,
1030 schema_name: schema,
1031 table_name: reference.table().to_string(),
1032 };
1033 Ok(table_name)
1034 })
1035 .collect::<Result<Vec<_>>>()?;
1036
1037 let eval_interval = create_flow.eval_interval;
1038
1039 Ok(CreateFlowExpr {
1040 catalog_name: query_ctx.current_catalog().to_string(),
1041 flow_name: sanitize_flow_name(create_flow.flow_name)?,
1042 source_table_names,
1043 sink_table_name: Some(sink_table_name),
1044 or_replace: create_flow.or_replace,
1045 create_if_not_exists: create_flow.if_not_exists,
1046 expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
1047 eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
1048 comment: create_flow.comment.unwrap_or_default(),
1049 sql: create_flow.query.to_string(),
1050 flow_options: stringify_flow_options(create_flow.flow_options)?,
1051 })
1052}
1053
1054fn stringify_flow_options(flow_options: OptionMap) -> Result<HashMap<String, String>> {
1055 let options_len = flow_options.len();
1056 let flow_options = flow_options.into_map();
1057 ensure!(
1058 flow_options.len() == options_len,
1059 InvalidSqlSnafu {
1060 err_msg: "flow options only support scalar string-compatible values".to_string(),
1061 }
1062 );
1063 Ok(flow_options)
1064}
1065
1066fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
1068 ensure!(
1069 flow_name.0.len() == 1,
1070 InvalidFlowNameSnafu {
1071 name: flow_name.to_string(),
1072 }
1073 );
1074 Ok(flow_name.0.swap_remove(0).to_string_unquoted())
1076}
1077
1078#[cfg(test)]
1079mod tests {
1080 use std::collections::HashMap;
1081
1082 use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
1083 use datatypes::value::Value;
1084 use session::context::{QueryContext, QueryContextBuilder};
1085 use sql::dialect::GreptimeDbDialect;
1086 use sql::parser::{ParseOptions, ParserContext};
1087 use sql::statements::statement::Statement;
1088 use store_api::storage::ColumnDefaultConstraint;
1089
1090 use super::*;
1091
1092 #[test]
1093 fn test_create_flow_tql_expr() {
1094 let sql = r#"
1095CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1096TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
1097 let stmt =
1098 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1099
1100 assert!(
1101 stmt.is_err(),
1102 "Expected error for invalid TQL EVAL parameters: {:#?}",
1103 stmt
1104 );
1105
1106 let sql = r#"
1107CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1108TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#;
1109 let stmt =
1110 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1111 .unwrap()
1112 .pop()
1113 .unwrap();
1114
1115 let Statement::CreateFlow(create_flow) = stmt else {
1116 unreachable!()
1117 };
1118 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1119
1120 let to_dot_sep =
1121 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1122 assert_eq!("calc_reqs", expr.flow_name);
1123 assert_eq!("greptime", expr.catalog_name);
1124 assert_eq!(
1125 "greptime.public.cnt_reqs",
1126 expr.sink_table_name.map(to_dot_sep).unwrap()
1127 );
1128 assert_eq!(1, expr.source_table_names.len());
1129 assert_eq!(
1130 "greptime.public.http_requests",
1131 to_dot_sep(expr.source_table_names[0].clone())
1132 );
1133 assert_eq!(
1134 r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#,
1135 expr.sql
1136 );
1137
1138 let sql = r#"
1139CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1140TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__schema__="greptime_private"});"#;
1141 let stmt =
1142 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1143 .unwrap()
1144 .pop()
1145 .unwrap();
1146 let Statement::CreateFlow(create_flow) = stmt else {
1147 unreachable!()
1148 };
1149 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1150 assert_eq!(1, expr.source_table_names.len());
1151 assert_eq!(
1152 "greptime.greptime_private.http_requests",
1153 to_dot_sep(expr.source_table_names[0].clone())
1154 );
1155
1156 let sql = r#"
1157CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1158TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__database__="greptime_private"});"#;
1159 let stmt =
1160 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1161 .unwrap()
1162 .pop()
1163 .unwrap();
1164 let Statement::CreateFlow(create_flow) = stmt else {
1165 unreachable!()
1166 };
1167 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1168 assert_eq!(1, expr.source_table_names.len());
1169 assert_eq!(
1170 "greptime.greptime_private.http_requests",
1171 to_dot_sep(expr.source_table_names[0].clone())
1172 );
1173 }
1174
1175 #[test]
1176 fn test_create_flow_tql_cte_source_tables() {
1177 let sql = r#"
1178CREATE FLOW calc_cte
1179SINK TO metric_cte_sink
1180EVAL INTERVAL '1m'
1181AS
1182WITH tql(ts, the_value) AS (
1183 TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte
1184)
1185SELECT * FROM tql;
1186"#;
1187
1188 let stmt =
1189 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1190 .unwrap()
1191 .pop()
1192 .unwrap();
1193
1194 let Statement::CreateFlow(create_flow) = stmt else {
1195 unreachable!()
1196 };
1197 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1198
1199 let to_dot_sep =
1200 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1201 assert_eq!(1, expr.source_table_names.len());
1202 assert_eq!(
1203 "greptime.public.metric_cte",
1204 to_dot_sep(expr.source_table_names[0].clone())
1205 );
1206 }
1207
1208 #[test]
1209 fn test_create_flow_tql_cte_source_tables_quoted_cte_name() {
1210 let sql = r#"
1211CREATE FLOW calc_cte
1212SINK TO metric_cte_sink
1213EVAL INTERVAL '1m'
1214AS
1215WITH "TQL"(ts, the_value) AS (
1216 TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte
1217)
1218SELECT * FROM "TQL";
1219"#;
1220
1221 let stmt =
1222 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1223 .unwrap()
1224 .pop()
1225 .unwrap();
1226
1227 let Statement::CreateFlow(create_flow) = stmt else {
1228 unreachable!()
1229 };
1230 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1231
1232 let to_dot_sep =
1233 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1234 assert_eq!(1, expr.source_table_names.len());
1235 assert_eq!(
1236 "greptime.public.metric_cte",
1237 to_dot_sep(expr.source_table_names[0].clone())
1238 );
1239 }
1240
1241 #[test]
1242 fn test_create_flow_tql_cte_source_tables_same_name() {
1243 let sql = r#"
1244CREATE FLOW calc_cte
1245SINK TO metric_cte_sink
1246EVAL INTERVAL '1m'
1247AS
1248WITH tql(ts, the_value) AS (
1249 TQL EVAL (now() - '1m'::interval, now(), '5s') tql
1250)
1251SELECT * FROM tql;
1252"#;
1253
1254 let stmt =
1255 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1256 .unwrap()
1257 .pop()
1258 .unwrap();
1259
1260 let Statement::CreateFlow(create_flow) = stmt else {
1261 unreachable!()
1262 };
1263 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1264
1265 let to_dot_sep =
1266 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1267 assert_eq!(1, expr.source_table_names.len());
1268 assert_eq!(
1269 "greptime.public.tql",
1270 to_dot_sep(expr.source_table_names[0].clone())
1271 );
1272 }
1273
1274 #[test]
1275 fn test_create_flow_expr() {
1276 let sql = r"
1277CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
1278SELECT
1279 DISTINCT number as dis
1280FROM
1281 distinct_basic;";
1282 let stmt =
1283 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1284 .unwrap()
1285 .pop()
1286 .unwrap();
1287
1288 let Statement::CreateFlow(create_flow) = stmt else {
1289 unreachable!()
1290 };
1291 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1292
1293 let to_dot_sep =
1294 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1295 assert_eq!("test_distinct_basic", expr.flow_name);
1296 assert_eq!("greptime", expr.catalog_name);
1297 assert_eq!(
1298 "greptime.public.out_distinct_basic",
1299 expr.sink_table_name.map(to_dot_sep).unwrap()
1300 );
1301 assert_eq!(1, expr.source_table_names.len());
1302 assert_eq!(
1303 "greptime.public.distinct_basic",
1304 to_dot_sep(expr.source_table_names[0].clone())
1305 );
1306 assert_eq!(
1307 r"SELECT
1308 DISTINCT number as dis
1309FROM
1310 distinct_basic",
1311 expr.sql
1312 );
1313
1314 let sql = r"
1315CREATE FLOW `task_2`
1316SINK TO schema_1.table_1
1317AS
1318SELECT max(c1), min(c2) FROM schema_2.table_2;";
1319 let stmt =
1320 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1321 .unwrap()
1322 .pop()
1323 .unwrap();
1324
1325 let Statement::CreateFlow(create_flow) = stmt else {
1326 unreachable!()
1327 };
1328 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1329
1330 let to_dot_sep =
1331 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1332 assert_eq!("task_2", expr.flow_name);
1333 assert_eq!("greptime", expr.catalog_name);
1334 assert_eq!(
1335 "greptime.schema_1.table_1",
1336 expr.sink_table_name.map(to_dot_sep).unwrap()
1337 );
1338 assert_eq!(1, expr.source_table_names.len());
1339 assert_eq!(
1340 "greptime.schema_2.table_2",
1341 to_dot_sep(expr.source_table_names[0].clone())
1342 );
1343 assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
1344 assert!(expr.flow_options.is_empty());
1345
1346 let sql = r"
1347CREATE FLOW task_3
1348SINK TO schema_1.table_1
1349WITH (defer_on_missing_source = 'true', foo = 'bar')
1350AS
1351SELECT max(c1), min(c2) FROM schema_2.table_2;";
1352 let stmt =
1353 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1354 .unwrap()
1355 .pop()
1356 .unwrap();
1357
1358 let Statement::CreateFlow(create_flow) = stmt else {
1359 unreachable!()
1360 };
1361 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1362 assert_eq!(
1363 expr.flow_options,
1364 HashMap::from([
1365 ("defer_on_missing_source".to_string(), "true".to_string()),
1366 ("foo".to_string(), "bar".to_string()),
1367 ])
1368 );
1369
1370 let sql = r"
1371CREATE FLOW task_4
1372SINK TO schema_1.table_1
1373WITH (defer_on_missing_source = true)
1374AS
1375SELECT max(c1), min(c2) FROM schema_2.table_2;";
1376 let stmt =
1377 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1378 .unwrap()
1379 .pop()
1380 .unwrap();
1381
1382 let Statement::CreateFlow(create_flow) = stmt else {
1383 unreachable!()
1384 };
1385 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1386 assert_eq!(
1387 expr.flow_options,
1388 HashMap::from([("defer_on_missing_source".to_string(), "true".to_string(),)])
1389 );
1390
1391 let sql = r"
1392CREATE FLOW task_5
1393SINK TO schema_1.table_1
1394WITH (defer_on_missing_source = [true])
1395AS
1396SELECT max(c1), min(c2) FROM schema_2.table_2;";
1397 let stmt =
1398 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1399 .unwrap()
1400 .pop()
1401 .unwrap();
1402
1403 let Statement::CreateFlow(create_flow) = stmt else {
1404 unreachable!()
1405 };
1406 let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
1407 assert!(res.is_err());
1408 assert!(
1409 res.unwrap_err()
1410 .to_string()
1411 .contains("flow options only support scalar string-compatible values")
1412 );
1413
1414 let sql = r"
1415CREATE FLOW abc.`task_2`
1416SINK TO schema_1.table_1
1417AS
1418SELECT max(c1), min(c2) FROM schema_2.table_2;";
1419 let stmt =
1420 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1421 .unwrap()
1422 .pop()
1423 .unwrap();
1424
1425 let Statement::CreateFlow(create_flow) = stmt else {
1426 unreachable!()
1427 };
1428 let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
1429
1430 assert!(res.is_err());
1431 assert!(
1432 res.unwrap_err()
1433 .to_string()
1434 .contains("Invalid flow name: abc.`task_2`")
1435 );
1436 }
1437
1438 #[test]
1439 fn test_create_to_expr() {
1440 let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
1441 let stmt =
1442 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1443 .unwrap()
1444 .pop()
1445 .unwrap();
1446
1447 let Statement::CreateTable(create_table) = stmt else {
1448 unreachable!()
1449 };
1450 let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1451 assert_eq!("3days", expr.table_options.get("ttl").unwrap());
1452 assert_eq!(
1453 "1.0MiB",
1454 expr.table_options.get("write_buffer_size").unwrap()
1455 );
1456 }
1457
1458 #[test]
1459 fn test_invalid_create_to_expr() {
1460 let cases = [
1461 "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
1463 "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
1465 "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));",
1467 ];
1468
1469 for sql in cases {
1470 let stmt = ParserContext::create_with_dialect(
1471 sql,
1472 &GreptimeDbDialect {},
1473 ParseOptions::default(),
1474 )
1475 .unwrap()
1476 .pop()
1477 .unwrap();
1478 let Statement::CreateTable(create_table) = stmt else {
1479 unreachable!()
1480 };
1481 create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
1482 }
1483 }
1484
1485 #[test]
1486 fn test_create_to_expr_with_default_timestamp_value() {
1487 let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
1488 let stmt =
1489 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1490 .unwrap()
1491 .pop()
1492 .unwrap();
1493
1494 let Statement::CreateTable(create_table) = stmt else {
1495 unreachable!()
1496 };
1497
1498 let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1500 let ts_column = &expr.column_defs[1];
1501 let constraint = assert_ts_column(ts_column);
1502 assert!(
1503 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1504 if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1505 );
1506
1507 let ctx = QueryContextBuilder::default()
1509 .timezone(Timezone::from_tz_string("+08:00").unwrap())
1510 .build()
1511 .into();
1512 let expr = create_to_expr(&create_table, &ctx).unwrap();
1513 let ts_column = &expr.column_defs[1];
1514 let constraint = assert_ts_column(ts_column);
1515 assert!(
1516 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1517 if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1518 );
1519 }
1520
1521 fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
1522 assert_eq!("ts", ts_column.name);
1523 assert_eq!(
1524 ColumnDataType::TimestampMillisecond as i32,
1525 ts_column.data_type
1526 );
1527 assert!(!ts_column.default_constraint.is_empty());
1528
1529 ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
1530 }
1531
1532 #[test]
1533 fn test_to_alter_expr() {
1534 let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
1535 let stmt =
1536 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1537 .unwrap()
1538 .pop()
1539 .unwrap();
1540
1541 let Statement::AlterDatabase(alter_database) = stmt else {
1542 unreachable!()
1543 };
1544
1545 let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1546 let kind = expr.kind.unwrap();
1547
1548 let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
1549 set_database_options,
1550 }) = kind
1551 else {
1552 unreachable!()
1553 };
1554
1555 assert_eq!(2, set_database_options.len());
1556 assert_eq!("key1", set_database_options[0].key);
1557 assert_eq!("value1", set_database_options[0].value);
1558 assert_eq!("key2", set_database_options[1].key);
1559 assert_eq!("value2", set_database_options[1].value);
1560
1561 let sql = "ALTER DATABASE greptime UNSET key1, key2;";
1562 let stmt =
1563 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1564 .unwrap()
1565 .pop()
1566 .unwrap();
1567
1568 let Statement::AlterDatabase(alter_database) = stmt else {
1569 unreachable!()
1570 };
1571
1572 let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1573 let kind = expr.kind.unwrap();
1574
1575 let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
1576 unreachable!()
1577 };
1578
1579 assert_eq!(2, keys.len());
1580 assert!(keys.contains(&"key1".to_string()));
1581 assert!(keys.contains(&"key2".to_string()));
1582
1583 let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
1584 let stmt =
1585 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1586 .unwrap()
1587 .pop()
1588 .unwrap();
1589
1590 let Statement::AlterTable(alter_table) = stmt else {
1591 unreachable!()
1592 };
1593
1594 let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1596 let kind = expr.kind.unwrap();
1597
1598 let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1599 unreachable!()
1600 };
1601
1602 assert_eq!(1, add_columns.len());
1603 let ts_column = add_columns[0].column_def.clone().unwrap();
1604 let constraint = assert_ts_column(&ts_column);
1605 assert!(
1606 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1607 if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1608 );
1609
1610 let ctx = QueryContextBuilder::default()
1613 .timezone(Timezone::from_tz_string("+08:00").unwrap())
1614 .build()
1615 .into();
1616 let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
1617 let kind = expr.kind.unwrap();
1618
1619 let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1620 unreachable!()
1621 };
1622
1623 assert_eq!(1, add_columns.len());
1624 let ts_column = add_columns[0].column_def.clone().unwrap();
1625 let constraint = assert_ts_column(&ts_column);
1626 assert!(
1627 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1628 if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1629 );
1630 }
1631
1632 #[test]
1633 fn test_to_alter_modify_column_type_expr() {
1634 let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
1635 let stmt =
1636 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1637 .unwrap()
1638 .pop()
1639 .unwrap();
1640
1641 let Statement::AlterTable(alter_table) = stmt else {
1642 unreachable!()
1643 };
1644
1645 let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1647 let kind = expr.kind.unwrap();
1648
1649 let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
1650 modify_column_types,
1651 }) = kind
1652 else {
1653 unreachable!()
1654 };
1655
1656 assert_eq!(1, modify_column_types.len());
1657 let modify_column_type = &modify_column_types[0];
1658
1659 assert_eq!("mem_usage", modify_column_type.column_name);
1660 assert_eq!(
1661 ColumnDataType::String as i32,
1662 modify_column_type.target_type
1663 );
1664 assert!(modify_column_type.target_type_extension.is_none());
1665 }
1666
1667 #[test]
1668 fn test_to_repartition_request() {
1669 let sql = r#"
1670ALTER TABLE metrics REPARTITION (
1671 device_id < 100
1672) INTO (
1673 device_id < 100 AND area < 'South',
1674 device_id < 100 AND area >= 'South'
1675);"#;
1676 let stmt =
1677 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1678 .unwrap()
1679 .pop()
1680 .unwrap();
1681
1682 let Statement::AlterTable(alter_table) = stmt else {
1683 unreachable!()
1684 };
1685
1686 let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
1687 assert_eq!("greptime", request.catalog_name);
1688 assert_eq!("public", request.schema_name);
1689 assert_eq!("metrics", request.table_name);
1690 assert_eq!(
1691 request
1692 .from_exprs
1693 .into_iter()
1694 .map(|x| x.to_string())
1695 .collect::<Vec<_>>(),
1696 vec!["device_id < 100".to_string()]
1697 );
1698 assert_eq!(
1699 request
1700 .into_exprs
1701 .into_iter()
1702 .map(|x| x.to_string())
1703 .collect::<Vec<_>>(),
1704 vec![
1705 "device_id < 100 AND area < 'South'".to_string(),
1706 "device_id < 100 AND area >= 'South'".to_string()
1707 ]
1708 );
1709 }
1710
1711 fn new_test_table_names() -> Vec<TableName> {
1712 vec![
1713 TableName {
1714 catalog_name: "greptime".to_string(),
1715 schema_name: "public".to_string(),
1716 table_name: "a_table".to_string(),
1717 },
1718 TableName {
1719 catalog_name: "greptime".to_string(),
1720 schema_name: "public".to_string(),
1721 table_name: "b_table".to_string(),
1722 },
1723 ]
1724 }
1725
1726 #[test]
1727 fn test_to_create_view_expr() {
1728 let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1729 let stmt =
1730 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1731 .unwrap()
1732 .pop()
1733 .unwrap();
1734
1735 let Statement::CreateView(stmt) = stmt else {
1736 unreachable!()
1737 };
1738
1739 let logical_plan = vec![1, 2, 3];
1740 let table_names = new_test_table_names();
1741 let columns = vec!["a".to_string()];
1742 let plan_columns = vec!["number".to_string()];
1743
1744 let expr = to_create_view_expr(
1745 stmt,
1746 logical_plan.clone(),
1747 table_names.clone(),
1748 columns.clone(),
1749 plan_columns.clone(),
1750 sql.to_string(),
1751 QueryContext::arc(),
1752 )
1753 .unwrap();
1754
1755 assert_eq!("greptime", expr.catalog_name);
1756 assert_eq!("public", expr.schema_name);
1757 assert_eq!("test", expr.view_name);
1758 assert!(!expr.create_if_not_exists);
1759 assert!(!expr.or_replace);
1760 assert_eq!(logical_plan, expr.logical_plan);
1761 assert_eq!(table_names, expr.table_names);
1762 assert_eq!(sql, expr.definition);
1763 assert_eq!(columns, expr.columns);
1764 assert_eq!(plan_columns, expr.plan_columns);
1765 }
1766
1767 #[test]
1768 fn test_to_create_view_expr_complex() {
1769 let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
1770 let stmt =
1771 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1772 .unwrap()
1773 .pop()
1774 .unwrap();
1775
1776 let Statement::CreateView(stmt) = stmt else {
1777 unreachable!()
1778 };
1779
1780 let logical_plan = vec![1, 2, 3];
1781 let table_names = new_test_table_names();
1782 let columns = vec!["a".to_string()];
1783 let plan_columns = vec!["number".to_string()];
1784
1785 let expr = to_create_view_expr(
1786 stmt,
1787 logical_plan.clone(),
1788 table_names.clone(),
1789 columns.clone(),
1790 plan_columns.clone(),
1791 sql.to_string(),
1792 QueryContext::arc(),
1793 )
1794 .unwrap();
1795
1796 assert_eq!("greptime", expr.catalog_name);
1797 assert_eq!("test", expr.schema_name);
1798 assert_eq!("test_view", expr.view_name);
1799 assert!(expr.create_if_not_exists);
1800 assert!(expr.or_replace);
1801 assert_eq!(logical_plan, expr.logical_plan);
1802 assert_eq!(table_names, expr.table_names);
1803 assert_eq!(sql, expr.definition);
1804 assert_eq!(columns, expr.columns);
1805 assert_eq!(plan_columns, expr.plan_columns);
1806 }
1807
    /// Round-trip check: a `CREATE TABLE` statement is converted to an expression with
    /// `create_to_expr`, turned back into a statement with `expr_to_create` (using a
    /// backtick as the quote character), and the alternate (`{:#}`) rendering of that
    /// statement must equal the original SQL text exactly.
    #[test]
    fn test_expr_to_create() {
        // The statement covers skipping, inverted and fulltext index options, a time
        // index, a primary key and a table option.
        let sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
  `timestamp` TIMESTAMP(9) NOT NULL,
  `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
  `username` STRING NULL,
  `http_method` STRING NULL INVERTED INDEX,
  `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
  `protocol` STRING NULL,
  `status_code` INT NULL INVERTED INDEX,
  `response_size` BIGINT NULL,
  `message` STRING NULL,
  TIME INDEX (`timestamp`),
  PRIMARY KEY (`username`, `status_code`)
)
ENGINE=mito
WITH(
  append_mode = 'true'
)"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateTable(original_create) = stmt else {
            unreachable!()
        };

        // Statement -> expression.
        let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();

        // Expression -> statement, quoting identifiers with backticks.
        let create_table = expr_to_create(&expr, Some('`')).unwrap();
        let new_sql = format!("{:#}", create_table);
        assert_eq!(sql, new_sql);
    }
1844}