1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
24use api::v1::{
25 AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
26 ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
27 DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
28 ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetDefaults, SetFulltext,
29 SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30 SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31 UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions, set_index,
32 unset_index,
33};
34use common_error::ext::BoxedError;
35use common_grpc_expr::util::ColumnExpr;
36use common_time::Timezone;
37use datafusion::sql::planner::object_name_to_table_reference;
38use datatypes::schema::{
39 COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
40 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
41 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
42 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
43 ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
44 SkippingIndexType,
45};
46use file_engine::FileOptions;
47use query::sql::{
48 check_file_to_table_schema_compatibility, file_column_schemas_to_table,
49 infer_file_table_schema, prepare_file_table_files,
50};
51use session::context::QueryContextRef;
52use session::table_name::table_idents_to_full_name;
53use snafu::{OptionExt, ResultExt, ensure};
54use sql::ast::{
55 ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
56};
57use sql::dialect::GreptimeDbDialect;
58use sql::parser::ParserContext;
59use sql::statements::alter::{
60 AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
61};
62use sql::statements::create::{
63 Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
64 CreateView, TableConstraint,
65};
66use sql::statements::{
67 OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
68 sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
69};
70use sql::util::extract_tables_from_query;
71use store_api::mito_engine_options::{COMPACTION_OVERRIDE, COMPACTION_TYPE};
72use table::requests::{FILE_TABLE_META_KEY, TableOptions};
73use table::table_reference::TableReference;
74#[cfg(feature = "enterprise")]
75pub use trigger::to_create_trigger_task_expr;
76
77use crate::error::{
78 BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
79 ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
80 IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
81 InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
82 PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
83};
84
85pub fn create_table_expr_by_column_schemas(
86 table_name: &TableReference<'_>,
87 column_schemas: &[api::v1::ColumnSchema],
88 engine: &str,
89 desc: Option<&str>,
90) -> Result<CreateTableExpr> {
91 let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
92 let expr = common_grpc_expr::util::build_create_table_expr(
93 None,
94 table_name,
95 column_exprs,
96 engine,
97 desc.unwrap_or("Created on insertion"),
98 )
99 .context(BuildCreateExprOnInsertionSnafu)?;
100
101 validate_create_expr(&expr)?;
102 Ok(expr)
103}
104
105pub fn extract_add_columns_expr(
106 schema: &Schema,
107 column_exprs: Vec<ColumnExpr>,
108) -> Result<Option<AddColumns>> {
109 let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
110 .context(FindNewColumnsOnInsertionSnafu)?;
111 if let Some(add_columns) = &add_columns {
112 validate_add_columns_expr(add_columns)?;
113 }
114 Ok(add_columns)
115}
116
117pub(crate) async fn create_external_expr(
141 create: CreateExternalTable,
142 query_ctx: &QueryContextRef,
143) -> Result<CreateTableExpr> {
144 let (catalog_name, schema_name, table_name) =
145 table_idents_to_full_name(&create.name, query_ctx)
146 .map_err(BoxedError::new)
147 .context(ExternalSnafu)?;
148
149 let mut table_options = create.options.into_map();
150
151 let (object_store, files) = prepare_file_table_files(&table_options)
152 .await
153 .context(PrepareFileTableSnafu)?;
154
155 let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
156 .await
157 .context(InferFileTableSchemaSnafu)?
158 .column_schemas()
159 .to_vec();
160
161 let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
162 let time_index = find_time_index(&create.constraints)?;
164 let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
165 let column_schemas =
166 columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
167 (time_index, primary_keys, column_schemas)
168 } else {
169 let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
171 let primary_keys = vec![];
172 (time_index, primary_keys, column_schemas)
173 };
174
175 check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
176 .context(SchemaIncompatibleSnafu)?;
177
178 let meta = FileOptions {
179 files,
180 file_column_schemas,
181 };
182 table_options.insert(
183 FILE_TABLE_META_KEY.to_string(),
184 serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
185 );
186
187 let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
188 let expr = CreateTableExpr {
189 catalog_name,
190 schema_name,
191 table_name,
192 desc: String::default(),
193 column_defs,
194 time_index,
195 primary_keys,
196 create_if_not_exists: create.if_not_exists,
197 table_options,
198 table_id: None,
199 engine: create.engine.clone(),
200 };
201
202 Ok(expr)
203}
204
205pub fn create_to_expr(
207 create: &CreateTable,
208 query_ctx: &QueryContextRef,
209) -> Result<CreateTableExpr> {
210 let (catalog_name, schema_name, table_name) =
211 table_idents_to_full_name(&create.name, query_ctx)
212 .map_err(BoxedError::new)
213 .context(ExternalSnafu)?;
214
215 let time_index = find_time_index(&create.constraints)?;
216 let table_options = HashMap::from(
217 &TableOptions::try_from_iter(create.options.to_str_map())
218 .context(UnrecognizedTableOptionSnafu)?,
219 );
220
221 let mut table_options = table_options;
222 if table_options.contains_key(COMPACTION_TYPE) {
223 table_options.insert(COMPACTION_OVERRIDE.to_string(), "true".to_string());
224 }
225
226 let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
227
228 let expr = CreateTableExpr {
229 catalog_name,
230 schema_name,
231 table_name,
232 desc: String::default(),
233 column_defs: columns_to_expr(
234 &create.columns,
235 &time_index,
236 &primary_keys,
237 Some(&query_ctx.timezone()),
238 )?,
239 time_index,
240 primary_keys,
241 create_if_not_exists: create.if_not_exists,
242 table_options,
243 table_id: None,
244 engine: create.engine.clone(),
245 };
246
247 validate_create_expr(&expr)?;
248 Ok(expr)
249}
250
251pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
259 let quote_style = quote_style.unwrap_or('`');
260
261 let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
263 sql::ast::Ident::with_quote(quote_style, &expr.table_name),
264 )]);
265
266 let mut columns = Vec::with_capacity(expr.column_defs.len());
268 for column_def in &expr.column_defs {
269 let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
270 column: &column_def.name,
271 })?;
272
273 let mut options = Vec::new();
274
275 if column_def.is_nullable {
277 options.push(ColumnOptionDef {
278 name: None,
279 option: ColumnOption::Null,
280 });
281 } else {
282 options.push(ColumnOptionDef {
283 name: None,
284 option: ColumnOption::NotNull,
285 });
286 }
287
288 if let Some(default_constraint) = column_schema.default_constraint() {
290 let expr = match default_constraint {
291 ColumnDefaultConstraint::Value(v) => {
292 Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
293 }
294 ColumnDefaultConstraint::Function(func_expr) => {
295 ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
296 .context(ParseSqlSnafu)?
297 }
298 };
299 options.push(ColumnOptionDef {
300 name: None,
301 option: ColumnOption::Default(expr),
302 });
303 }
304
305 if !column_def.comment.is_empty() {
307 options.push(ColumnOptionDef {
308 name: None,
309 option: ColumnOption::Comment(column_def.comment.clone()),
310 });
311 }
312
313 let mut extensions = ColumnExtensions::default();
318
319 if let Ok(Some(opt)) = column_schema.fulltext_options()
321 && opt.enable
322 {
323 let mut map = HashMap::from([
324 (
325 COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
326 opt.analyzer.to_string(),
327 ),
328 (
329 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
330 opt.case_sensitive.to_string(),
331 ),
332 (
333 COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
334 opt.backend.to_string(),
335 ),
336 ]);
337 if opt.backend == FulltextBackend::Bloom {
338 map.insert(
339 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
340 opt.granularity.to_string(),
341 );
342 map.insert(
343 COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
344 opt.false_positive_rate().to_string(),
345 );
346 }
347 extensions.fulltext_index_options = Some(map.into());
348 }
349
350 if let Ok(Some(opt)) = column_schema.skipping_index_options() {
352 let map = HashMap::from([
353 (
354 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
355 opt.granularity.to_string(),
356 ),
357 (
358 COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
359 opt.false_positive_rate().to_string(),
360 ),
361 (
362 COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
363 opt.index_type.to_string(),
364 ),
365 ]);
366 extensions.skipping_index_options = Some(map.into());
367 }
368
369 if column_schema.is_inverted_indexed() {
371 extensions.inverted_index_options = Some(HashMap::new().into());
372 }
373
374 let sql_column = SqlColumn {
375 column_def: ColumnDef {
376 name: Ident::with_quote(quote_style, &column_def.name),
377 data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
378 .context(ParseSqlSnafu)?,
379 options,
380 },
381 extensions,
382 };
383
384 columns.push(sql_column);
385 }
386
387 let mut constraints = Vec::new();
389
390 constraints.push(TableConstraint::TimeIndex {
392 column: Ident::with_quote(quote_style, &expr.time_index),
393 });
394
395 if !expr.primary_keys.is_empty() {
397 let primary_key_columns: Vec<Ident> = expr
398 .primary_keys
399 .iter()
400 .map(|pk| Ident::with_quote(quote_style, pk))
401 .collect();
402
403 constraints.push(TableConstraint::PrimaryKey {
404 columns: primary_key_columns,
405 });
406 }
407
408 let mut options = OptionMap::default();
410 for (key, value) in &expr.table_options {
411 options.insert(key.clone(), value.clone());
412 }
413
414 Ok(CreateTable {
415 if_not_exists: expr.create_if_not_exists,
416 table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
417 name: table_name,
418 columns,
419 engine: expr.engine.clone(),
420 constraints,
421 options,
422 partitions: None,
423 })
424}
425
426pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
428 let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
430 for (idx, column) in create.column_defs.iter().enumerate() {
431 if let Some(indices) = column_to_indices.get(&column.name) {
432 return InvalidSqlSnafu {
433 err_msg: format!(
434 "column name `{}` is duplicated at index {} and {}",
435 column.name, indices, idx
436 ),
437 }
438 .fail();
439 }
440 column_to_indices.insert(&column.name, idx);
441 }
442
443 let _ = column_to_indices
445 .get(&create.time_index)
446 .with_context(|| InvalidSqlSnafu {
447 err_msg: format!(
448 "column name `{}` is not found in column list",
449 create.time_index
450 ),
451 })?;
452
453 for pk in &create.primary_keys {
455 let _ = column_to_indices
456 .get(&pk)
457 .with_context(|| InvalidSqlSnafu {
458 err_msg: format!("column name `{}` is not found in column list", pk),
459 })?;
460 }
461
462 let mut pk_set = HashSet::new();
464 for pk in &create.primary_keys {
465 if !pk_set.insert(pk) {
466 return InvalidSqlSnafu {
467 err_msg: format!("column name `{}` is duplicated in primary keys", pk),
468 }
469 .fail();
470 }
471 }
472
473 if pk_set.contains(&create.time_index) {
475 return InvalidSqlSnafu {
476 err_msg: format!(
477 "column name `{}` is both primary key and time index",
478 create.time_index
479 ),
480 }
481 .fail();
482 }
483
484 for column in &create.column_defs {
485 if is_interval_type(&column.data_type()) {
487 return InvalidSqlSnafu {
488 err_msg: format!(
489 "column name `{}` is interval type, which is not supported",
490 column.name
491 ),
492 }
493 .fail();
494 }
495 if is_date_time_type(&column.data_type()) {
497 return InvalidSqlSnafu {
498 err_msg: format!(
499 "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
500 column.name
501 ),
502 }
503 .fail();
504 }
505 }
506 Ok(())
507}
508
509fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
510 for add_column in &add_columns.add_columns {
511 let Some(column_def) = &add_column.column_def else {
512 continue;
513 };
514 if is_date_time_type(&column_def.data_type()) {
515 return InvalidSqlSnafu {
516 err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
517 }
518 .fail();
519 }
520 if is_interval_type(&column_def.data_type()) {
521 return InvalidSqlSnafu {
522 err_msg: format!(
523 "column name `{}` is interval type, which is not supported",
524 column_def.name
525 ),
526 }
527 .fail();
528 }
529 }
530 Ok(())
531}
532
533fn is_date_time_type(data_type: &ColumnDataType) -> bool {
534 matches!(data_type, ColumnDataType::Datetime)
535}
536
537fn is_interval_type(data_type: &ColumnDataType) -> bool {
538 matches!(
539 data_type,
540 ColumnDataType::IntervalYearMonth
541 | ColumnDataType::IntervalDayTime
542 | ColumnDataType::IntervalMonthDayNano
543 )
544}
545
546fn find_primary_keys(
547 columns: &[SqlColumn],
548 constraints: &[TableConstraint],
549) -> Result<Vec<String>> {
550 let columns_pk = columns
551 .iter()
552 .filter_map(|x| {
553 if x.options()
554 .iter()
555 .any(|o| matches!(o.option, ColumnOption::PrimaryKey(_)))
556 {
557 Some(x.name().value.clone())
558 } else {
559 None
560 }
561 })
562 .collect::<Vec<String>>();
563
564 ensure!(
565 columns_pk.len() <= 1,
566 IllegalPrimaryKeysDefSnafu {
567 msg: "not allowed to inline multiple primary keys in columns options"
568 }
569 );
570
571 let constraints_pk = constraints
572 .iter()
573 .filter_map(|constraint| match constraint {
574 TableConstraint::PrimaryKey { columns, .. } => {
575 Some(columns.iter().map(|ident| ident.value.clone()))
576 }
577 _ => None,
578 })
579 .flatten()
580 .collect::<Vec<String>>();
581
582 ensure!(
583 columns_pk.is_empty() || constraints_pk.is_empty(),
584 IllegalPrimaryKeysDefSnafu {
585 msg: "found definitions of primary keys in multiple places"
586 }
587 );
588
589 let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
590 primary_keys.extend(columns_pk);
591 primary_keys.extend(constraints_pk);
592 Ok(primary_keys)
593}
594
595pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
596 let time_index = constraints
597 .iter()
598 .filter_map(|constraint| match constraint {
599 TableConstraint::TimeIndex { column, .. } => Some(&column.value),
600 _ => None,
601 })
602 .collect::<Vec<&String>>();
603 ensure!(
604 time_index.len() == 1,
605 InvalidSqlSnafu {
606 err_msg: "must have one and only one TimeIndex columns",
607 }
608 );
609 Ok(time_index[0].clone())
610}
611
612fn columns_to_expr(
613 column_defs: &[SqlColumn],
614 time_index: &str,
615 primary_keys: &[String],
616 timezone: Option<&Timezone>,
617) -> Result<Vec<api::v1::ColumnDef>> {
618 let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
619 column_schemas_to_defs(column_schemas, primary_keys)
620}
621
622fn columns_to_column_schemas(
623 columns: &[SqlColumn],
624 time_index: &str,
625 timezone: Option<&Timezone>,
626) -> Result<Vec<ColumnSchema>> {
627 columns
628 .iter()
629 .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
630 .collect::<Result<Vec<ColumnSchema>>>()
631}
632
633pub fn column_schemas_to_defs(
635 column_schemas: Vec<ColumnSchema>,
636 primary_keys: &[String],
637) -> Result<Vec<api::v1::ColumnDef>> {
638 let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
639 .iter()
640 .map(|c| {
641 ColumnDataTypeWrapper::try_from(c.data_type.clone())
642 .map(|w| w.to_parts())
643 .context(ColumnDataTypeSnafu)
644 })
645 .collect::<Result<Vec<_>>>()?;
646
647 column_schemas
648 .iter()
649 .zip(column_datatypes)
650 .map(|(schema, datatype)| {
651 let semantic_type = if schema.is_time_index() {
652 SemanticType::Timestamp
653 } else if primary_keys.contains(&schema.name) {
654 SemanticType::Tag
655 } else {
656 SemanticType::Field
657 } as i32;
658 let comment = schema
659 .metadata()
660 .get(COMMENT_KEY)
661 .cloned()
662 .unwrap_or_default();
663
664 Ok(api::v1::ColumnDef {
665 name: schema.name.clone(),
666 data_type: datatype.0 as i32,
667 is_nullable: schema.is_nullable(),
668 default_constraint: match schema.default_constraint() {
669 None => vec![],
670 Some(v) => {
671 v.clone()
672 .try_into()
673 .context(ConvertColumnDefaultConstraintSnafu {
674 column_name: &schema.name,
675 })?
676 }
677 },
678 semantic_type,
679 comment,
680 datatype_extension: datatype.1,
681 options: options_from_column_schema(schema),
682 })
683 })
684 .collect()
685}
686
/// A resolved `ALTER TABLE ... REPARTITION` request.
///
/// Built by `to_repartition_request` from an `AlterTable` statement whose operation is
/// `AlterTableOperation::Repartition`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RepartitionRequest {
    /// Catalog of the target table, resolved from the statement's table name.
    pub catalog_name: String,
    /// Schema of the target table, resolved from the statement's table name.
    pub schema_name: String,
    /// Name of the target table.
    pub table_name: String,
    /// The `from_exprs` of the repartition operation.
    /// NOTE(review): presumably the existing partition expressions being replaced — confirm.
    pub from_exprs: Vec<Expr>,
    /// The `into_exprs` of the repartition operation.
    /// NOTE(review): presumably the new partition expressions — confirm.
    pub into_exprs: Vec<Expr>,
    /// Options attached to the `ALTER TABLE` statement, passed through unchanged.
    pub options: OptionMap,
}
696
697pub(crate) fn to_repartition_request(
698 alter_table: AlterTable,
699 query_ctx: &QueryContextRef,
700) -> Result<RepartitionRequest> {
701 let AlterTable {
702 table_name,
703 alter_operation,
704 options,
705 } = alter_table;
706
707 let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&table_name, query_ctx)
708 .map_err(BoxedError::new)
709 .context(ExternalSnafu)?;
710
711 let AlterTableOperation::Repartition { operation } = alter_operation else {
712 return InvalidSqlSnafu {
713 err_msg: "expected REPARTITION operation",
714 }
715 .fail();
716 };
717
718 Ok(RepartitionRequest {
719 catalog_name,
720 schema_name,
721 table_name,
722 from_exprs: operation.from_exprs,
723 into_exprs: operation.into_exprs,
724 options,
725 })
726}
727
728pub(crate) fn to_alter_table_expr(
730 alter_table: AlterTable,
731 query_ctx: &QueryContextRef,
732) -> Result<AlterTableExpr> {
733 let (catalog_name, schema_name, table_name) =
734 table_idents_to_full_name(alter_table.table_name(), query_ctx)
735 .map_err(BoxedError::new)
736 .context(ExternalSnafu)?;
737
738 let kind = match alter_table.alter_operation {
739 AlterTableOperation::AddConstraint(_) => {
740 return NotSupportedSnafu {
741 feat: "ADD CONSTRAINT",
742 }
743 .fail();
744 }
745 AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
746 add_columns: add_columns
747 .into_iter()
748 .map(|add_column| {
749 let column_def = sql_column_def_to_grpc_column_def(
750 &add_column.column_def,
751 Some(&query_ctx.timezone()),
752 )
753 .map_err(BoxedError::new)
754 .context(ExternalSnafu)?;
755 if is_interval_type(&column_def.data_type()) {
756 return NotSupportedSnafu {
757 feat: "Add column with interval type",
758 }
759 .fail();
760 }
761 Ok(AddColumn {
762 column_def: Some(column_def),
763 location: add_column.location.as_ref().map(From::from),
764 add_if_not_exists: add_column.add_if_not_exists,
765 })
766 })
767 .collect::<Result<Vec<AddColumn>>>()?,
768 }),
769 AlterTableOperation::ModifyColumnType {
770 column_name,
771 target_type,
772 } => {
773 let target_type =
774 sql_data_type_to_concrete_data_type(&target_type, &Default::default())
775 .context(ParseSqlSnafu)?;
776 let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
777 .map(|w| w.to_parts())
778 .context(ColumnDataTypeSnafu)?;
779 if is_interval_type(&target_type) {
780 return NotSupportedSnafu {
781 feat: "Modify column type to interval type",
782 }
783 .fail();
784 }
785 AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
786 modify_column_types: vec![ModifyColumnType {
787 column_name: column_name.value,
788 target_type: target_type as i32,
789 target_type_extension,
790 }],
791 })
792 }
793 AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
794 drop_columns: vec![DropColumn {
795 name: name.value.clone(),
796 }],
797 }),
798 AlterTableOperation::RenameTable { new_table_name } => {
799 AlterTableKind::RenameTable(RenameTable {
800 new_table_name: new_table_name.clone(),
801 })
802 }
803 AlterTableOperation::SetTableOptions { options } => {
804 AlterTableKind::SetTableOptions(SetTableOptions {
805 table_options: options.into_iter().map(Into::into).collect(),
806 })
807 }
808 AlterTableOperation::UnsetTableOptions { keys } => {
809 AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
810 }
811 AlterTableOperation::Repartition { .. } => {
812 return NotSupportedSnafu {
813 feat: "ALTER TABLE ... REPARTITION",
814 }
815 .fail();
816 }
817 AlterTableOperation::SetIndex { options } => {
818 let option = match options {
819 sql::statements::alter::SetIndexOperation::Fulltext {
820 column_name,
821 options,
822 } => SetIndex {
823 options: Some(set_index::Options::Fulltext(SetFulltext {
824 column_name: column_name.value,
825 enable: options.enable,
826 analyzer: match options.analyzer {
827 FulltextAnalyzer::English => Analyzer::English.into(),
828 FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
829 },
830 case_sensitive: options.case_sensitive,
831 backend: match options.backend {
832 FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
833 FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
834 },
835 granularity: options.granularity as u64,
836 false_positive_rate: options.false_positive_rate(),
837 })),
838 },
839 sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
840 options: Some(set_index::Options::Inverted(SetInverted {
841 column_name: column_name.value,
842 })),
843 },
844 sql::statements::alter::SetIndexOperation::Skipping {
845 column_name,
846 options,
847 } => SetIndex {
848 options: Some(set_index::Options::Skipping(SetSkipping {
849 column_name: column_name.value,
850 enable: true,
851 granularity: options.granularity as u64,
852 false_positive_rate: options.false_positive_rate(),
853 skipping_index_type: match options.index_type {
854 SkippingIndexType::BloomFilter => {
855 PbSkippingIndexType::BloomFilter.into()
856 }
857 },
858 })),
859 },
860 };
861 AlterTableKind::SetIndexes(SetIndexes {
862 set_indexes: vec![option],
863 })
864 }
865 AlterTableOperation::UnsetIndex { options } => {
866 let option = match options {
867 sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
868 UnsetIndex {
869 options: Some(unset_index::Options::Fulltext(UnsetFulltext {
870 column_name: column_name.value,
871 })),
872 }
873 }
874 sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
875 UnsetIndex {
876 options: Some(unset_index::Options::Inverted(UnsetInverted {
877 column_name: column_name.value,
878 })),
879 }
880 }
881 sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
882 UnsetIndex {
883 options: Some(unset_index::Options::Skipping(UnsetSkipping {
884 column_name: column_name.value,
885 })),
886 }
887 }
888 };
889
890 AlterTableKind::UnsetIndexes(UnsetIndexes {
891 unset_indexes: vec![option],
892 })
893 }
894 AlterTableOperation::DropDefaults { columns } => {
895 AlterTableKind::DropDefaults(DropDefaults {
896 drop_defaults: columns
897 .into_iter()
898 .map(|col| {
899 let column_name = col.0.to_string();
900 Ok(api::v1::DropDefault { column_name })
901 })
902 .collect::<Result<Vec<_>>>()?,
903 })
904 }
905 AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
906 set_defaults: defaults
907 .into_iter()
908 .map(|col| {
909 let column_name = col.column_name.to_string();
910 let default_constraint = serde_json::to_string(&col.default_constraint)
911 .context(EncodeJsonSnafu)?
912 .into_bytes();
913 Ok(api::v1::SetDefault {
914 column_name,
915 default_constraint,
916 })
917 })
918 .collect::<Result<Vec<_>>>()?,
919 }),
920 };
921
922 Ok(AlterTableExpr {
923 catalog_name,
924 schema_name,
925 table_name,
926 kind: Some(kind),
927 })
928}
929
930pub fn to_alter_database_expr(
932 alter_database: AlterDatabase,
933 query_ctx: &QueryContextRef,
934) -> Result<AlterDatabaseExpr> {
935 let catalog = query_ctx.current_catalog();
936 let schema = alter_database.database_name;
937
938 let kind = match alter_database.alter_operation {
939 AlterDatabaseOperation::SetDatabaseOption { options } => {
940 let options = options.into_iter().map(Into::into).collect();
941 AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
942 set_database_options: options,
943 })
944 }
945 AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
946 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
947 }
948 };
949
950 Ok(AlterDatabaseExpr {
951 catalog_name: catalog.to_string(),
952 schema_name: schema.to_string(),
953 kind: Some(kind),
954 })
955}
956
957pub fn to_create_view_expr(
959 stmt: CreateView,
960 logical_plan: Vec<u8>,
961 table_names: Vec<TableName>,
962 columns: Vec<String>,
963 plan_columns: Vec<String>,
964 definition: String,
965 query_ctx: QueryContextRef,
966) -> Result<CreateViewExpr> {
967 let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
968 .map_err(BoxedError::new)
969 .context(ExternalSnafu)?;
970
971 let expr = CreateViewExpr {
972 catalog_name,
973 schema_name,
974 view_name,
975 logical_plan,
976 create_if_not_exists: stmt.if_not_exists,
977 or_replace: stmt.or_replace,
978 table_names,
979 columns,
980 plan_columns,
981 definition,
982 };
983
984 Ok(expr)
985}
986
987pub fn to_create_flow_task_expr(
988 create_flow: CreateFlow,
989 query_ctx: &QueryContextRef,
990) -> Result<CreateFlowExpr> {
991 let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone(), true)
993 .with_context(|_| ConvertIdentifierSnafu {
994 ident: create_flow.sink_table_name.to_string(),
995 })?;
996 let catalog = sink_table_ref
997 .catalog()
998 .unwrap_or(query_ctx.current_catalog())
999 .to_string();
1000 let schema = sink_table_ref
1001 .schema()
1002 .map(|s| s.to_owned())
1003 .unwrap_or(query_ctx.current_schema());
1004
1005 let sink_table_name = TableName {
1006 catalog_name: catalog,
1007 schema_name: schema,
1008 table_name: sink_table_ref.table().to_string(),
1009 };
1010
1011 let source_table_names = extract_tables_from_query(&create_flow.query)
1012 .map(|name| {
1013 let reference =
1014 object_name_to_table_reference(name.clone(), true).with_context(|_| {
1015 ConvertIdentifierSnafu {
1016 ident: name.to_string(),
1017 }
1018 })?;
1019 let catalog = reference
1020 .catalog()
1021 .unwrap_or(query_ctx.current_catalog())
1022 .to_string();
1023 let schema = reference
1024 .schema()
1025 .map(|s| s.to_string())
1026 .unwrap_or(query_ctx.current_schema());
1027
1028 let table_name = TableName {
1029 catalog_name: catalog,
1030 schema_name: schema,
1031 table_name: reference.table().to_string(),
1032 };
1033 Ok(table_name)
1034 })
1035 .collect::<Result<Vec<_>>>()?;
1036
1037 let eval_interval = create_flow.eval_interval;
1038
1039 Ok(CreateFlowExpr {
1040 catalog_name: query_ctx.current_catalog().to_string(),
1041 flow_name: sanitize_flow_name(create_flow.flow_name)?,
1042 source_table_names,
1043 sink_table_name: Some(sink_table_name),
1044 or_replace: create_flow.or_replace,
1045 create_if_not_exists: create_flow.if_not_exists,
1046 expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
1047 eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
1048 comment: create_flow.comment.unwrap_or_default(),
1049 sql: create_flow.query.to_string(),
1050 flow_options: Default::default(),
1051 })
1052}
1053
1054fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
1056 ensure!(
1057 flow_name.0.len() == 1,
1058 InvalidFlowNameSnafu {
1059 name: flow_name.to_string(),
1060 }
1061 );
1062 Ok(flow_name.0.swap_remove(0).to_string_unquoted())
1064}
1065
1066#[cfg(test)]
1067mod tests {
1068 use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
1069 use datatypes::value::Value;
1070 use session::context::{QueryContext, QueryContextBuilder};
1071 use sql::dialect::GreptimeDbDialect;
1072 use sql::parser::{ParseOptions, ParserContext};
1073 use sql::statements::statement::Statement;
1074 use store_api::storage::ColumnDefaultConstraint;
1075
1076 use super::*;
1077
1078 #[test]
1079 fn test_create_flow_tql_expr() {
1080 let sql = r#"
1081CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1082TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
1083 let stmt =
1084 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1085
1086 assert!(
1087 stmt.is_err(),
1088 "Expected error for invalid TQL EVAL parameters: {:#?}",
1089 stmt
1090 );
1091
1092 let sql = r#"
1093CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1094TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#;
1095 let stmt =
1096 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1097 .unwrap()
1098 .pop()
1099 .unwrap();
1100
1101 let Statement::CreateFlow(create_flow) = stmt else {
1102 unreachable!()
1103 };
1104 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1105
1106 let to_dot_sep =
1107 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1108 assert_eq!("calc_reqs", expr.flow_name);
1109 assert_eq!("greptime", expr.catalog_name);
1110 assert_eq!(
1111 "greptime.public.cnt_reqs",
1112 expr.sink_table_name.map(to_dot_sep).unwrap()
1113 );
1114 assert_eq!(1, expr.source_table_names.len());
1115 assert_eq!(
1116 "greptime.public.http_requests",
1117 to_dot_sep(expr.source_table_names[0].clone())
1118 );
1119 assert_eq!(
1120 r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#,
1121 expr.sql
1122 );
1123
1124 let sql = r#"
1125CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1126TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__schema__="greptime_private"});"#;
1127 let stmt =
1128 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1129 .unwrap()
1130 .pop()
1131 .unwrap();
1132 let Statement::CreateFlow(create_flow) = stmt else {
1133 unreachable!()
1134 };
1135 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1136 assert_eq!(1, expr.source_table_names.len());
1137 assert_eq!(
1138 "greptime.greptime_private.http_requests",
1139 to_dot_sep(expr.source_table_names[0].clone())
1140 );
1141
1142 let sql = r#"
1143CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1144TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__database__="greptime_private"});"#;
1145 let stmt =
1146 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1147 .unwrap()
1148 .pop()
1149 .unwrap();
1150 let Statement::CreateFlow(create_flow) = stmt else {
1151 unreachable!()
1152 };
1153 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1154 assert_eq!(1, expr.source_table_names.len());
1155 assert_eq!(
1156 "greptime.greptime_private.http_requests",
1157 to_dot_sep(expr.source_table_names[0].clone())
1158 );
1159 }
1160
1161 #[test]
1162 fn test_create_flow_tql_cte_source_tables() {
1163 let sql = r#"
1164CREATE FLOW calc_cte
1165SINK TO metric_cte_sink
1166EVAL INTERVAL '1m'
1167AS
1168WITH tql(ts, the_value) AS (
1169 TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte
1170)
1171SELECT * FROM tql;
1172"#;
1173
1174 let stmt =
1175 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1176 .unwrap()
1177 .pop()
1178 .unwrap();
1179
1180 let Statement::CreateFlow(create_flow) = stmt else {
1181 unreachable!()
1182 };
1183 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1184
1185 let to_dot_sep =
1186 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1187 assert_eq!(1, expr.source_table_names.len());
1188 assert_eq!(
1189 "greptime.public.metric_cte",
1190 to_dot_sep(expr.source_table_names[0].clone())
1191 );
1192 }
1193
1194 #[test]
1195 fn test_create_flow_tql_cte_source_tables_quoted_cte_name() {
1196 let sql = r#"
1197CREATE FLOW calc_cte
1198SINK TO metric_cte_sink
1199EVAL INTERVAL '1m'
1200AS
1201WITH "TQL"(ts, the_value) AS (
1202 TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte
1203)
1204SELECT * FROM "TQL";
1205"#;
1206
1207 let stmt =
1208 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1209 .unwrap()
1210 .pop()
1211 .unwrap();
1212
1213 let Statement::CreateFlow(create_flow) = stmt else {
1214 unreachable!()
1215 };
1216 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1217
1218 let to_dot_sep =
1219 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1220 assert_eq!(1, expr.source_table_names.len());
1221 assert_eq!(
1222 "greptime.public.metric_cte",
1223 to_dot_sep(expr.source_table_names[0].clone())
1224 );
1225 }
1226
1227 #[test]
1228 fn test_create_flow_tql_cte_source_tables_same_name() {
1229 let sql = r#"
1230CREATE FLOW calc_cte
1231SINK TO metric_cte_sink
1232EVAL INTERVAL '1m'
1233AS
1234WITH tql(ts, the_value) AS (
1235 TQL EVAL (now() - '1m'::interval, now(), '5s') tql
1236)
1237SELECT * FROM tql;
1238"#;
1239
1240 let stmt =
1241 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1242 .unwrap()
1243 .pop()
1244 .unwrap();
1245
1246 let Statement::CreateFlow(create_flow) = stmt else {
1247 unreachable!()
1248 };
1249 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1250
1251 let to_dot_sep =
1252 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1253 assert_eq!(1, expr.source_table_names.len());
1254 assert_eq!(
1255 "greptime.public.tql",
1256 to_dot_sep(expr.source_table_names[0].clone())
1257 );
1258 }
1259
1260 #[test]
1261 fn test_create_flow_expr() {
1262 let sql = r"
1263CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
1264SELECT
1265 DISTINCT number as dis
1266FROM
1267 distinct_basic;";
1268 let stmt =
1269 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1270 .unwrap()
1271 .pop()
1272 .unwrap();
1273
1274 let Statement::CreateFlow(create_flow) = stmt else {
1275 unreachable!()
1276 };
1277 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1278
1279 let to_dot_sep =
1280 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1281 assert_eq!("test_distinct_basic", expr.flow_name);
1282 assert_eq!("greptime", expr.catalog_name);
1283 assert_eq!(
1284 "greptime.public.out_distinct_basic",
1285 expr.sink_table_name.map(to_dot_sep).unwrap()
1286 );
1287 assert_eq!(1, expr.source_table_names.len());
1288 assert_eq!(
1289 "greptime.public.distinct_basic",
1290 to_dot_sep(expr.source_table_names[0].clone())
1291 );
1292 assert_eq!(
1293 r"SELECT
1294 DISTINCT number as dis
1295FROM
1296 distinct_basic",
1297 expr.sql
1298 );
1299
1300 let sql = r"
1301CREATE FLOW `task_2`
1302SINK TO schema_1.table_1
1303AS
1304SELECT max(c1), min(c2) FROM schema_2.table_2;";
1305 let stmt =
1306 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1307 .unwrap()
1308 .pop()
1309 .unwrap();
1310
1311 let Statement::CreateFlow(create_flow) = stmt else {
1312 unreachable!()
1313 };
1314 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1315
1316 let to_dot_sep =
1317 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1318 assert_eq!("task_2", expr.flow_name);
1319 assert_eq!("greptime", expr.catalog_name);
1320 assert_eq!(
1321 "greptime.schema_1.table_1",
1322 expr.sink_table_name.map(to_dot_sep).unwrap()
1323 );
1324 assert_eq!(1, expr.source_table_names.len());
1325 assert_eq!(
1326 "greptime.schema_2.table_2",
1327 to_dot_sep(expr.source_table_names[0].clone())
1328 );
1329 assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
1330
1331 let sql = r"
1332CREATE FLOW abc.`task_2`
1333SINK TO schema_1.table_1
1334AS
1335SELECT max(c1), min(c2) FROM schema_2.table_2;";
1336 let stmt =
1337 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1338 .unwrap()
1339 .pop()
1340 .unwrap();
1341
1342 let Statement::CreateFlow(create_flow) = stmt else {
1343 unreachable!()
1344 };
1345 let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
1346
1347 assert!(res.is_err());
1348 assert!(
1349 res.unwrap_err()
1350 .to_string()
1351 .contains("Invalid flow name: abc.`task_2`")
1352 );
1353 }
1354
1355 #[test]
1356 fn test_create_to_expr() {
1357 let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
1358 let stmt =
1359 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1360 .unwrap()
1361 .pop()
1362 .unwrap();
1363
1364 let Statement::CreateTable(create_table) = stmt else {
1365 unreachable!()
1366 };
1367 let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1368 assert_eq!("3days", expr.table_options.get("ttl").unwrap());
1369 assert_eq!(
1370 "1.0MiB",
1371 expr.table_options.get("write_buffer_size").unwrap()
1372 );
1373 }
1374
1375 #[test]
1376 fn test_invalid_create_to_expr() {
1377 let cases = [
1378 "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
1380 "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
1382 "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));",
1384 ];
1385
1386 for sql in cases {
1387 let stmt = ParserContext::create_with_dialect(
1388 sql,
1389 &GreptimeDbDialect {},
1390 ParseOptions::default(),
1391 )
1392 .unwrap()
1393 .pop()
1394 .unwrap();
1395 let Statement::CreateTable(create_table) = stmt else {
1396 unreachable!()
1397 };
1398 create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
1399 }
1400 }
1401
1402 #[test]
1403 fn test_create_to_expr_with_default_timestamp_value() {
1404 let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
1405 let stmt =
1406 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1407 .unwrap()
1408 .pop()
1409 .unwrap();
1410
1411 let Statement::CreateTable(create_table) = stmt else {
1412 unreachable!()
1413 };
1414
1415 let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1417 let ts_column = &expr.column_defs[1];
1418 let constraint = assert_ts_column(ts_column);
1419 assert!(
1420 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1421 if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1422 );
1423
1424 let ctx = QueryContextBuilder::default()
1426 .timezone(Timezone::from_tz_string("+08:00").unwrap())
1427 .build()
1428 .into();
1429 let expr = create_to_expr(&create_table, &ctx).unwrap();
1430 let ts_column = &expr.column_defs[1];
1431 let constraint = assert_ts_column(ts_column);
1432 assert!(
1433 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1434 if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1435 );
1436 }
1437
1438 fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
1439 assert_eq!("ts", ts_column.name);
1440 assert_eq!(
1441 ColumnDataType::TimestampMillisecond as i32,
1442 ts_column.data_type
1443 );
1444 assert!(!ts_column.default_constraint.is_empty());
1445
1446 ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
1447 }
1448
1449 #[test]
1450 fn test_to_alter_expr() {
1451 let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
1452 let stmt =
1453 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1454 .unwrap()
1455 .pop()
1456 .unwrap();
1457
1458 let Statement::AlterDatabase(alter_database) = stmt else {
1459 unreachable!()
1460 };
1461
1462 let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1463 let kind = expr.kind.unwrap();
1464
1465 let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
1466 set_database_options,
1467 }) = kind
1468 else {
1469 unreachable!()
1470 };
1471
1472 assert_eq!(2, set_database_options.len());
1473 assert_eq!("key1", set_database_options[0].key);
1474 assert_eq!("value1", set_database_options[0].value);
1475 assert_eq!("key2", set_database_options[1].key);
1476 assert_eq!("value2", set_database_options[1].value);
1477
1478 let sql = "ALTER DATABASE greptime UNSET key1, key2;";
1479 let stmt =
1480 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1481 .unwrap()
1482 .pop()
1483 .unwrap();
1484
1485 let Statement::AlterDatabase(alter_database) = stmt else {
1486 unreachable!()
1487 };
1488
1489 let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1490 let kind = expr.kind.unwrap();
1491
1492 let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
1493 unreachable!()
1494 };
1495
1496 assert_eq!(2, keys.len());
1497 assert!(keys.contains(&"key1".to_string()));
1498 assert!(keys.contains(&"key2".to_string()));
1499
1500 let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
1501 let stmt =
1502 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1503 .unwrap()
1504 .pop()
1505 .unwrap();
1506
1507 let Statement::AlterTable(alter_table) = stmt else {
1508 unreachable!()
1509 };
1510
1511 let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1513 let kind = expr.kind.unwrap();
1514
1515 let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1516 unreachable!()
1517 };
1518
1519 assert_eq!(1, add_columns.len());
1520 let ts_column = add_columns[0].column_def.clone().unwrap();
1521 let constraint = assert_ts_column(&ts_column);
1522 assert!(
1523 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1524 if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1525 );
1526
1527 let ctx = QueryContextBuilder::default()
1530 .timezone(Timezone::from_tz_string("+08:00").unwrap())
1531 .build()
1532 .into();
1533 let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
1534 let kind = expr.kind.unwrap();
1535
1536 let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1537 unreachable!()
1538 };
1539
1540 assert_eq!(1, add_columns.len());
1541 let ts_column = add_columns[0].column_def.clone().unwrap();
1542 let constraint = assert_ts_column(&ts_column);
1543 assert!(
1544 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1545 if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1546 );
1547 }
1548
1549 #[test]
1550 fn test_to_alter_modify_column_type_expr() {
1551 let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
1552 let stmt =
1553 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1554 .unwrap()
1555 .pop()
1556 .unwrap();
1557
1558 let Statement::AlterTable(alter_table) = stmt else {
1559 unreachable!()
1560 };
1561
1562 let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1564 let kind = expr.kind.unwrap();
1565
1566 let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
1567 modify_column_types,
1568 }) = kind
1569 else {
1570 unreachable!()
1571 };
1572
1573 assert_eq!(1, modify_column_types.len());
1574 let modify_column_type = &modify_column_types[0];
1575
1576 assert_eq!("mem_usage", modify_column_type.column_name);
1577 assert_eq!(
1578 ColumnDataType::String as i32,
1579 modify_column_type.target_type
1580 );
1581 assert!(modify_column_type.target_type_extension.is_none());
1582 }
1583
1584 #[test]
1585 fn test_to_repartition_request() {
1586 let sql = r#"
1587ALTER TABLE metrics REPARTITION (
1588 device_id < 100
1589) INTO (
1590 device_id < 100 AND area < 'South',
1591 device_id < 100 AND area >= 'South'
1592);"#;
1593 let stmt =
1594 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1595 .unwrap()
1596 .pop()
1597 .unwrap();
1598
1599 let Statement::AlterTable(alter_table) = stmt else {
1600 unreachable!()
1601 };
1602
1603 let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
1604 assert_eq!("greptime", request.catalog_name);
1605 assert_eq!("public", request.schema_name);
1606 assert_eq!("metrics", request.table_name);
1607 assert_eq!(
1608 request
1609 .from_exprs
1610 .into_iter()
1611 .map(|x| x.to_string())
1612 .collect::<Vec<_>>(),
1613 vec!["device_id < 100".to_string()]
1614 );
1615 assert_eq!(
1616 request
1617 .into_exprs
1618 .into_iter()
1619 .map(|x| x.to_string())
1620 .collect::<Vec<_>>(),
1621 vec![
1622 "device_id < 100 AND area < 'South'".to_string(),
1623 "device_id < 100 AND area >= 'South'".to_string()
1624 ]
1625 );
1626 }
1627
1628 fn new_test_table_names() -> Vec<TableName> {
1629 vec![
1630 TableName {
1631 catalog_name: "greptime".to_string(),
1632 schema_name: "public".to_string(),
1633 table_name: "a_table".to_string(),
1634 },
1635 TableName {
1636 catalog_name: "greptime".to_string(),
1637 schema_name: "public".to_string(),
1638 table_name: "b_table".to_string(),
1639 },
1640 ]
1641 }
1642
1643 #[test]
1644 fn test_to_create_view_expr() {
1645 let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1646 let stmt =
1647 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1648 .unwrap()
1649 .pop()
1650 .unwrap();
1651
1652 let Statement::CreateView(stmt) = stmt else {
1653 unreachable!()
1654 };
1655
1656 let logical_plan = vec![1, 2, 3];
1657 let table_names = new_test_table_names();
1658 let columns = vec!["a".to_string()];
1659 let plan_columns = vec!["number".to_string()];
1660
1661 let expr = to_create_view_expr(
1662 stmt,
1663 logical_plan.clone(),
1664 table_names.clone(),
1665 columns.clone(),
1666 plan_columns.clone(),
1667 sql.to_string(),
1668 QueryContext::arc(),
1669 )
1670 .unwrap();
1671
1672 assert_eq!("greptime", expr.catalog_name);
1673 assert_eq!("public", expr.schema_name);
1674 assert_eq!("test", expr.view_name);
1675 assert!(!expr.create_if_not_exists);
1676 assert!(!expr.or_replace);
1677 assert_eq!(logical_plan, expr.logical_plan);
1678 assert_eq!(table_names, expr.table_names);
1679 assert_eq!(sql, expr.definition);
1680 assert_eq!(columns, expr.columns);
1681 assert_eq!(plan_columns, expr.plan_columns);
1682 }
1683
1684 #[test]
1685 fn test_to_create_view_expr_complex() {
1686 let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
1687 let stmt =
1688 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1689 .unwrap()
1690 .pop()
1691 .unwrap();
1692
1693 let Statement::CreateView(stmt) = stmt else {
1694 unreachable!()
1695 };
1696
1697 let logical_plan = vec![1, 2, 3];
1698 let table_names = new_test_table_names();
1699 let columns = vec!["a".to_string()];
1700 let plan_columns = vec!["number".to_string()];
1701
1702 let expr = to_create_view_expr(
1703 stmt,
1704 logical_plan.clone(),
1705 table_names.clone(),
1706 columns.clone(),
1707 plan_columns.clone(),
1708 sql.to_string(),
1709 QueryContext::arc(),
1710 )
1711 .unwrap();
1712
1713 assert_eq!("greptime", expr.catalog_name);
1714 assert_eq!("test", expr.schema_name);
1715 assert_eq!("test_view", expr.view_name);
1716 assert!(expr.create_if_not_exists);
1717 assert!(expr.or_replace);
1718 assert_eq!(logical_plan, expr.logical_plan);
1719 assert_eq!(table_names, expr.table_names);
1720 assert_eq!(sql, expr.definition);
1721 assert_eq!(columns, expr.columns);
1722 assert_eq!(plan_columns, expr.plan_columns);
1723 }
1724
1725 #[test]
1726 fn test_expr_to_create() {
1727 let sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
1728 `timestamp` TIMESTAMP(9) NOT NULL,
1729 `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
1730 `username` STRING NULL,
1731 `http_method` STRING NULL INVERTED INDEX,
1732 `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
1733 `protocol` STRING NULL,
1734 `status_code` INT NULL INVERTED INDEX,
1735 `response_size` BIGINT NULL,
1736 `message` STRING NULL,
1737 TIME INDEX (`timestamp`),
1738 PRIMARY KEY (`username`, `status_code`)
1739)
1740ENGINE=mito
1741WITH(
1742 append_mode = 'true'
1743)"#;
1744 let stmt =
1745 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1746 .unwrap()
1747 .pop()
1748 .unwrap();
1749
1750 let Statement::CreateTable(original_create) = stmt else {
1751 unreachable!()
1752 };
1753
1754 let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();
1756
1757 let create_table = expr_to_create(&expr, Some('`')).unwrap();
1758 let new_sql = format!("{:#}", create_table);
1759 assert_eq!(sql, new_sql);
1760 }
1761}