1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
24use api::v1::{
25 AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
26 ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
27 DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
28 ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetDefaults, SetFulltext,
29 SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30 SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31 UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions, set_index,
32 unset_index,
33};
34use common_error::ext::BoxedError;
35use common_grpc_expr::util::ColumnExpr;
36use common_time::Timezone;
37use datafusion::sql::planner::object_name_to_table_reference;
38use datatypes::schema::{
39 COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
40 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
41 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
42 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
43 ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
44 SkippingIndexType,
45};
46use file_engine::FileOptions;
47use query::sql::{
48 check_file_to_table_schema_compatibility, file_column_schemas_to_table,
49 infer_file_table_schema, prepare_file_table_files,
50};
51use session::context::QueryContextRef;
52use session::table_name::table_idents_to_full_name;
53use snafu::{OptionExt, ResultExt, ensure};
54use sql::ast::{
55 ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
56};
57use sql::dialect::GreptimeDbDialect;
58use sql::parser::ParserContext;
59use sql::statements::alter::{
60 AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
61};
62use sql::statements::create::{
63 Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
64 CreateView, TableConstraint,
65};
66use sql::statements::{
67 OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
68 sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
69};
70use sql::util::extract_tables_from_query;
71use store_api::mito_engine_options::{COMPACTION_OVERRIDE, COMPACTION_TYPE};
72use table::requests::{FILE_TABLE_META_KEY, TableOptions};
73use table::table_reference::TableReference;
74#[cfg(feature = "enterprise")]
75pub use trigger::to_create_trigger_task_expr;
76
77use crate::error::{
78 BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
79 ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
80 IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
81 InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
82 PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
83};
84
85pub fn create_table_expr_by_column_schemas(
86 table_name: &TableReference<'_>,
87 column_schemas: &[api::v1::ColumnSchema],
88 engine: &str,
89 desc: Option<&str>,
90) -> Result<CreateTableExpr> {
91 let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
92 let expr = common_grpc_expr::util::build_create_table_expr(
93 None,
94 table_name,
95 column_exprs,
96 engine,
97 desc.unwrap_or("Created on insertion"),
98 )
99 .context(BuildCreateExprOnInsertionSnafu)?;
100
101 validate_create_expr(&expr)?;
102 Ok(expr)
103}
104
105pub fn extract_add_columns_expr(
106 schema: &Schema,
107 column_exprs: Vec<ColumnExpr>,
108) -> Result<Option<AddColumns>> {
109 let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
110 .context(FindNewColumnsOnInsertionSnafu)?;
111 if let Some(add_columns) = &add_columns {
112 validate_add_columns_expr(add_columns)?;
113 }
114 Ok(add_columns)
115}
116
/// Converts a parsed `CREATE EXTERNAL TABLE` statement into a
/// [`CreateTableExpr`].
///
/// The backing files are enumerated and their schema inferred up front. When
/// the statement declares columns explicitly, those (plus the declared time
/// index and primary keys) are used and checked for compatibility against the
/// inferred file schema; otherwise the inferred schema is adopted with no
/// primary keys. The file list and file schema are serialized into the table
/// options under `FILE_TABLE_META_KEY`.
pub(crate) async fn create_external_expr(
    create: CreateExternalTable,
    query_ctx: &QueryContextRef,
) -> Result<CreateTableExpr> {
    let (catalog_name, schema_name, table_name) =
        table_idents_to_full_name(&create.name, query_ctx)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    let mut table_options = create.options.into_map();

    // Resolve the object store and enumerate the external table's files.
    let (object_store, files) = prepare_file_table_files(&table_options)
        .await
        .context(PrepareFileTableSnafu)?;

    // Infer the column schemas from the files themselves.
    let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
        .await
        .context(InferFileTableSchemaSnafu)?
        .column_schemas()
        .to_vec();

    let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
        // Explicit columns: derive everything from the statement.
        let time_index = find_time_index(&create.constraints)?;
        let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
        let column_schemas =
            columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
        (time_index, primary_keys, column_schemas)
    } else {
        // No explicit columns: adopt the schema inferred from the files.
        let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
        let primary_keys = vec![];
        (time_index, primary_keys, column_schemas)
    };

    // The table schema (declared or adopted) must be satisfiable from the
    // files' actual schema.
    check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
        .context(SchemaIncompatibleSnafu)?;

    // Persist the file list and file schema as serialized table metadata.
    let meta = FileOptions {
        files,
        file_column_schemas,
    };
    table_options.insert(
        FILE_TABLE_META_KEY.to_string(),
        serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
    );

    let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
    let expr = CreateTableExpr {
        catalog_name,
        schema_name,
        table_name,
        desc: String::default(),
        column_defs,
        time_index,
        primary_keys,
        create_if_not_exists: create.if_not_exists,
        table_options,
        table_id: None,
        engine: create.engine.clone(),
    };

    Ok(expr)
}
204
205pub fn create_to_expr(
207 create: &CreateTable,
208 query_ctx: &QueryContextRef,
209) -> Result<CreateTableExpr> {
210 let (catalog_name, schema_name, table_name) =
211 table_idents_to_full_name(&create.name, query_ctx)
212 .map_err(BoxedError::new)
213 .context(ExternalSnafu)?;
214
215 let time_index = find_time_index(&create.constraints)?;
216 let table_options = HashMap::from(
217 &TableOptions::try_from_iter(create.options.to_str_map())
218 .context(UnrecognizedTableOptionSnafu)?,
219 );
220
221 let mut table_options = table_options;
222 if table_options.contains_key(COMPACTION_TYPE) {
223 table_options.insert(COMPACTION_OVERRIDE.to_string(), "true".to_string());
224 }
225
226 let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
227
228 let expr = CreateTableExpr {
229 catalog_name,
230 schema_name,
231 table_name,
232 desc: String::default(),
233 column_defs: columns_to_expr(
234 &create.columns,
235 &time_index,
236 &primary_keys,
237 Some(&query_ctx.timezone()),
238 )?,
239 time_index,
240 primary_keys,
241 create_if_not_exists: create.if_not_exists,
242 table_options,
243 table_id: None,
244 engine: create.engine.clone(),
245 };
246
247 validate_create_expr(&expr)?;
248 Ok(expr)
249}
250
/// Converts a gRPC [`CreateTableExpr`] back into a SQL [`CreateTable`]
/// statement AST (the inverse direction of [`create_to_expr`]).
///
/// Identifiers are quoted with `quote_style` (backtick by default). Column
/// options (nullability, default, comment) and index extensions (fulltext,
/// skipping, inverted) are reconstructed from each column's schema.
pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
    let quote_style = quote_style.unwrap_or('`');

    let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
        sql::ast::Ident::with_quote(quote_style, &expr.table_name),
    )]);

    let mut columns = Vec::with_capacity(expr.column_defs.len());
    for column_def in &expr.column_defs {
        // Recover the richer schema (defaults, index options) from the def.
        let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
            column: &column_def.name,
        })?;

        let mut options = Vec::new();

        // Nullability is always emitted explicitly.
        if column_def.is_nullable {
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::Null,
            });
        } else {
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::NotNull,
            });
        }

        // Default constraint: either a literal value or a function call,
        // the latter re-parsed back into a SQL expression.
        if let Some(default_constraint) = column_schema.default_constraint() {
            let expr = match default_constraint {
                ColumnDefaultConstraint::Value(v) => {
                    Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
                }
                ColumnDefaultConstraint::Function(func_expr) => {
                    ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
                        .context(ParseSqlSnafu)?
                }
            };
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::Default(expr),
            });
        }

        if !column_def.comment.is_empty() {
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::Comment(column_def.comment.clone()),
            });
        }

        let mut extensions = ColumnExtensions::default();

        // Fulltext index: only emitted when enabled; granularity and false
        // positive rate only apply to the bloom backend.
        if let Ok(Some(opt)) = column_schema.fulltext_options()
            && opt.enable
        {
            let mut map = HashMap::from([
                (
                    COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
                    opt.analyzer.to_string(),
                ),
                (
                    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
                    opt.case_sensitive.to_string(),
                ),
                (
                    COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
                    opt.backend.to_string(),
                ),
            ]);
            if opt.backend == FulltextBackend::Bloom {
                map.insert(
                    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
                    opt.granularity.to_string(),
                );
                map.insert(
                    COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
                    opt.false_positive_rate().to_string(),
                );
            }
            extensions.fulltext_index_options = Some(map.into());
        }

        if let Ok(Some(opt)) = column_schema.skipping_index_options() {
            let map = HashMap::from([
                (
                    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
                    opt.granularity.to_string(),
                ),
                (
                    COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
                    opt.false_positive_rate().to_string(),
                ),
                (
                    COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
                    opt.index_type.to_string(),
                ),
            ]);
            extensions.skipping_index_options = Some(map.into());
        }

        // Inverted index carries no options of its own — presence is enough.
        if column_schema.is_inverted_indexed() {
            extensions.inverted_index_options = Some(HashMap::new().into());
        }

        let sql_column = SqlColumn {
            column_def: ColumnDef {
                name: Ident::with_quote(quote_style, &column_def.name),
                data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
                    .context(ParseSqlSnafu)?,
                options,
            },
            extensions,
        };

        columns.push(sql_column);
    }

    let mut constraints = Vec::new();

    // Time index constraint is always present in the expr.
    constraints.push(TableConstraint::TimeIndex {
        column: Ident::with_quote(quote_style, &expr.time_index),
    });

    if !expr.primary_keys.is_empty() {
        let primary_key_columns: Vec<Ident> = expr
            .primary_keys
            .iter()
            .map(|pk| Ident::with_quote(quote_style, pk))
            .collect();

        constraints.push(TableConstraint::PrimaryKey {
            columns: primary_key_columns,
        });
    }

    let mut options = OptionMap::default();
    for (key, value) in &expr.table_options {
        options.insert(key.clone(), value.clone());
    }

    Ok(CreateTable {
        if_not_exists: expr.create_if_not_exists,
        // No table id in the expr maps to 0 in the AST.
        table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
        name: table_name,
        columns,
        engine: expr.engine.clone(),
        constraints,
        options,
        partitions: None,
    })
}
425
426pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
428 let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
430 for (idx, column) in create.column_defs.iter().enumerate() {
431 if let Some(indices) = column_to_indices.get(&column.name) {
432 return InvalidSqlSnafu {
433 err_msg: format!(
434 "column name `{}` is duplicated at index {} and {}",
435 column.name, indices, idx
436 ),
437 }
438 .fail();
439 }
440 column_to_indices.insert(&column.name, idx);
441 }
442
443 let _ = column_to_indices
445 .get(&create.time_index)
446 .with_context(|| InvalidSqlSnafu {
447 err_msg: format!(
448 "column name `{}` is not found in column list",
449 create.time_index
450 ),
451 })?;
452
453 for pk in &create.primary_keys {
455 let _ = column_to_indices
456 .get(&pk)
457 .with_context(|| InvalidSqlSnafu {
458 err_msg: format!("column name `{}` is not found in column list", pk),
459 })?;
460 }
461
462 let mut pk_set = HashSet::new();
464 for pk in &create.primary_keys {
465 if !pk_set.insert(pk) {
466 return InvalidSqlSnafu {
467 err_msg: format!("column name `{}` is duplicated in primary keys", pk),
468 }
469 .fail();
470 }
471 }
472
473 if pk_set.contains(&create.time_index) {
475 return InvalidSqlSnafu {
476 err_msg: format!(
477 "column name `{}` is both primary key and time index",
478 create.time_index
479 ),
480 }
481 .fail();
482 }
483
484 for column in &create.column_defs {
485 if is_interval_type(&column.data_type()) {
487 return InvalidSqlSnafu {
488 err_msg: format!(
489 "column name `{}` is interval type, which is not supported",
490 column.name
491 ),
492 }
493 .fail();
494 }
495 if is_date_time_type(&column.data_type()) {
497 return InvalidSqlSnafu {
498 err_msg: format!(
499 "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
500 column.name
501 ),
502 }
503 .fail();
504 }
505 }
506 Ok(())
507}
508
509fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
510 for add_column in &add_columns.add_columns {
511 let Some(column_def) = &add_column.column_def else {
512 continue;
513 };
514 if is_date_time_type(&column_def.data_type()) {
515 return InvalidSqlSnafu {
516 err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
517 }
518 .fail();
519 }
520 if is_interval_type(&column_def.data_type()) {
521 return InvalidSqlSnafu {
522 err_msg: format!(
523 "column name `{}` is interval type, which is not supported",
524 column_def.name
525 ),
526 }
527 .fail();
528 }
529 }
530 Ok(())
531}
532
533fn is_date_time_type(data_type: &ColumnDataType) -> bool {
534 matches!(data_type, ColumnDataType::Datetime)
535}
536
537fn is_interval_type(data_type: &ColumnDataType) -> bool {
538 matches!(
539 data_type,
540 ColumnDataType::IntervalYearMonth
541 | ColumnDataType::IntervalDayTime
542 | ColumnDataType::IntervalMonthDayNano
543 )
544}
545
546fn find_primary_keys(
547 columns: &[SqlColumn],
548 constraints: &[TableConstraint],
549) -> Result<Vec<String>> {
550 let columns_pk = columns
551 .iter()
552 .filter_map(|x| {
553 if x.options().iter().any(|o| {
554 matches!(
555 o.option,
556 ColumnOption::Unique {
557 is_primary: true,
558 ..
559 }
560 )
561 }) {
562 Some(x.name().value.clone())
563 } else {
564 None
565 }
566 })
567 .collect::<Vec<String>>();
568
569 ensure!(
570 columns_pk.len() <= 1,
571 IllegalPrimaryKeysDefSnafu {
572 msg: "not allowed to inline multiple primary keys in columns options"
573 }
574 );
575
576 let constraints_pk = constraints
577 .iter()
578 .filter_map(|constraint| match constraint {
579 TableConstraint::PrimaryKey { columns, .. } => {
580 Some(columns.iter().map(|ident| ident.value.clone()))
581 }
582 _ => None,
583 })
584 .flatten()
585 .collect::<Vec<String>>();
586
587 ensure!(
588 columns_pk.is_empty() || constraints_pk.is_empty(),
589 IllegalPrimaryKeysDefSnafu {
590 msg: "found definitions of primary keys in multiple places"
591 }
592 );
593
594 let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
595 primary_keys.extend(columns_pk);
596 primary_keys.extend(constraints_pk);
597 Ok(primary_keys)
598}
599
600pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
601 let time_index = constraints
602 .iter()
603 .filter_map(|constraint| match constraint {
604 TableConstraint::TimeIndex { column, .. } => Some(&column.value),
605 _ => None,
606 })
607 .collect::<Vec<&String>>();
608 ensure!(
609 time_index.len() == 1,
610 InvalidSqlSnafu {
611 err_msg: "must have one and only one TimeIndex columns",
612 }
613 );
614 Ok(time_index[0].clone())
615}
616
617fn columns_to_expr(
618 column_defs: &[SqlColumn],
619 time_index: &str,
620 primary_keys: &[String],
621 timezone: Option<&Timezone>,
622) -> Result<Vec<api::v1::ColumnDef>> {
623 let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
624 column_schemas_to_defs(column_schemas, primary_keys)
625}
626
627fn columns_to_column_schemas(
628 columns: &[SqlColumn],
629 time_index: &str,
630 timezone: Option<&Timezone>,
631) -> Result<Vec<ColumnSchema>> {
632 columns
633 .iter()
634 .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
635 .collect::<Result<Vec<ColumnSchema>>>()
636}
637
638pub fn column_schemas_to_defs(
640 column_schemas: Vec<ColumnSchema>,
641 primary_keys: &[String],
642) -> Result<Vec<api::v1::ColumnDef>> {
643 let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
644 .iter()
645 .map(|c| {
646 ColumnDataTypeWrapper::try_from(c.data_type.clone())
647 .map(|w| w.to_parts())
648 .context(ColumnDataTypeSnafu)
649 })
650 .collect::<Result<Vec<_>>>()?;
651
652 column_schemas
653 .iter()
654 .zip(column_datatypes)
655 .map(|(schema, datatype)| {
656 let semantic_type = if schema.is_time_index() {
657 SemanticType::Timestamp
658 } else if primary_keys.contains(&schema.name) {
659 SemanticType::Tag
660 } else {
661 SemanticType::Field
662 } as i32;
663 let comment = schema
664 .metadata()
665 .get(COMMENT_KEY)
666 .cloned()
667 .unwrap_or_default();
668
669 Ok(api::v1::ColumnDef {
670 name: schema.name.clone(),
671 data_type: datatype.0 as i32,
672 is_nullable: schema.is_nullable(),
673 default_constraint: match schema.default_constraint() {
674 None => vec![],
675 Some(v) => {
676 v.clone()
677 .try_into()
678 .context(ConvertColumnDefaultConstraintSnafu {
679 column_name: &schema.name,
680 })?
681 }
682 },
683 semantic_type,
684 comment,
685 datatype_extension: datatype.1,
686 options: options_from_column_schema(schema),
687 })
688 })
689 .collect()
690}
691
/// A parsed `ALTER TABLE ... REPARTITION` request, produced by
/// [`to_repartition_request`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RepartitionRequest {
    /// Catalog of the table being repartitioned.
    pub catalog_name: String,
    /// Schema of the table being repartitioned.
    pub schema_name: String,
    /// Name of the table being repartitioned.
    pub table_name: String,
    /// Partition expressions on the `FROM` side of the repartition.
    pub from_exprs: Vec<Expr>,
    /// Partition expressions on the `INTO` side of the repartition.
    pub into_exprs: Vec<Expr>,
    /// Options attached to the ALTER statement.
    pub options: OptionMap,
}
701
702pub(crate) fn to_repartition_request(
703 alter_table: AlterTable,
704 query_ctx: &QueryContextRef,
705) -> Result<RepartitionRequest> {
706 let AlterTable {
707 table_name,
708 alter_operation,
709 options,
710 } = alter_table;
711
712 let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&table_name, query_ctx)
713 .map_err(BoxedError::new)
714 .context(ExternalSnafu)?;
715
716 let AlterTableOperation::Repartition { operation } = alter_operation else {
717 return InvalidSqlSnafu {
718 err_msg: "expected REPARTITION operation",
719 }
720 .fail();
721 };
722
723 Ok(RepartitionRequest {
724 catalog_name,
725 schema_name,
726 table_name,
727 from_exprs: operation.from_exprs,
728 into_exprs: operation.into_exprs,
729 options,
730 })
731}
732
/// Converts an `ALTER TABLE` statement into a gRPC [`AlterTableExpr`].
///
/// Each supported [`AlterTableOperation`] variant maps to one
/// [`AlterTableKind`]. `ADD CONSTRAINT` and `REPARTITION` are rejected as
/// not supported, as are interval-typed columns in add/modify operations.
pub(crate) fn to_alter_table_expr(
    alter_table: AlterTable,
    query_ctx: &QueryContextRef,
) -> Result<AlterTableExpr> {
    let (catalog_name, schema_name, table_name) =
        table_idents_to_full_name(alter_table.table_name(), query_ctx)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    let kind = match alter_table.alter_operation {
        AlterTableOperation::AddConstraint(_) => {
            return NotSupportedSnafu {
                feat: "ADD CONSTRAINT",
            }
            .fail();
        }
        // ADD COLUMN(s): convert each SQL column definition into a gRPC
        // column def, rejecting interval-typed columns.
        AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
            add_columns: add_columns
                .into_iter()
                .map(|add_column| {
                    let column_def = sql_column_def_to_grpc_column_def(
                        &add_column.column_def,
                        Some(&query_ctx.timezone()),
                    )
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;
                    if is_interval_type(&column_def.data_type()) {
                        return NotSupportedSnafu {
                            feat: "Add column with interval type",
                        }
                        .fail();
                    }
                    Ok(AddColumn {
                        column_def: Some(column_def),
                        location: add_column.location.as_ref().map(From::from),
                        add_if_not_exists: add_column.add_if_not_exists,
                    })
                })
                .collect::<Result<Vec<AddColumn>>>()?,
        }),
        // MODIFY COLUMN type: resolve the SQL type to the gRPC type pair,
        // rejecting interval targets.
        AlterTableOperation::ModifyColumnType {
            column_name,
            target_type,
        } => {
            let target_type =
                sql_data_type_to_concrete_data_type(&target_type, &Default::default())
                    .context(ParseSqlSnafu)?;
            let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
                .map(|w| w.to_parts())
                .context(ColumnDataTypeSnafu)?;
            if is_interval_type(&target_type) {
                return NotSupportedSnafu {
                    feat: "Modify column type to interval type",
                }
                .fail();
            }
            AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
                modify_column_types: vec![ModifyColumnType {
                    column_name: column_name.value,
                    target_type: target_type as i32,
                    target_type_extension,
                }],
            })
        }
        AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
            drop_columns: vec![DropColumn {
                name: name.value.clone(),
            }],
        }),
        AlterTableOperation::RenameTable { new_table_name } => {
            AlterTableKind::RenameTable(RenameTable {
                new_table_name: new_table_name.clone(),
            })
        }
        AlterTableOperation::SetTableOptions { options } => {
            AlterTableKind::SetTableOptions(SetTableOptions {
                table_options: options.into_iter().map(Into::into).collect(),
            })
        }
        AlterTableOperation::UnsetTableOptions { keys } => {
            AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
        }
        // REPARTITION goes through `to_repartition_request`, not this path.
        AlterTableOperation::Repartition { .. } => {
            return NotSupportedSnafu {
                feat: "ALTER TABLE ... REPARTITION",
            }
            .fail();
        }
        // SET INDEX: translate fulltext / inverted / skipping index options
        // into their protobuf counterparts.
        AlterTableOperation::SetIndex { options } => {
            let option = match options {
                sql::statements::alter::SetIndexOperation::Fulltext {
                    column_name,
                    options,
                } => SetIndex {
                    options: Some(set_index::Options::Fulltext(SetFulltext {
                        column_name: column_name.value,
                        enable: options.enable,
                        analyzer: match options.analyzer {
                            FulltextAnalyzer::English => Analyzer::English.into(),
                            FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
                        },
                        case_sensitive: options.case_sensitive,
                        backend: match options.backend {
                            FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
                            FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
                        },
                        granularity: options.granularity as u64,
                        false_positive_rate: options.false_positive_rate(),
                    })),
                },
                sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
                    options: Some(set_index::Options::Inverted(SetInverted {
                        column_name: column_name.value,
                    })),
                },
                sql::statements::alter::SetIndexOperation::Skipping {
                    column_name,
                    options,
                } => SetIndex {
                    options: Some(set_index::Options::Skipping(SetSkipping {
                        column_name: column_name.value,
                        enable: true,
                        granularity: options.granularity as u64,
                        false_positive_rate: options.false_positive_rate(),
                        skipping_index_type: match options.index_type {
                            SkippingIndexType::BloomFilter => {
                                PbSkippingIndexType::BloomFilter.into()
                            }
                        },
                    })),
                },
            };
            AlterTableKind::SetIndexes(SetIndexes {
                set_indexes: vec![option],
            })
        }
        AlterTableOperation::UnsetIndex { options } => {
            let option = match options {
                sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
                    UnsetIndex {
                        options: Some(unset_index::Options::Fulltext(UnsetFulltext {
                            column_name: column_name.value,
                        })),
                    }
                }
                sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
                    UnsetIndex {
                        options: Some(unset_index::Options::Inverted(UnsetInverted {
                            column_name: column_name.value,
                        })),
                    }
                }
                sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
                    UnsetIndex {
                        options: Some(unset_index::Options::Skipping(UnsetSkipping {
                            column_name: column_name.value,
                        })),
                    }
                }
            };

            AlterTableKind::UnsetIndexes(UnsetIndexes {
                unset_indexes: vec![option],
            })
        }
        AlterTableOperation::DropDefaults { columns } => {
            AlterTableKind::DropDefaults(DropDefaults {
                drop_defaults: columns
                    .into_iter()
                    .map(|col| {
                        let column_name = col.0.to_string();
                        Ok(api::v1::DropDefault { column_name })
                    })
                    .collect::<Result<Vec<_>>>()?,
            })
        }
        // SET DEFAULT: the default constraint travels as JSON-encoded bytes.
        AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
            set_defaults: defaults
                .into_iter()
                .map(|col| {
                    let column_name = col.column_name.to_string();
                    let default_constraint = serde_json::to_string(&col.default_constraint)
                        .context(EncodeJsonSnafu)?
                        .into_bytes();
                    Ok(api::v1::SetDefault {
                        column_name,
                        default_constraint,
                    })
                })
                .collect::<Result<Vec<_>>>()?,
        }),
    };

    Ok(AlterTableExpr {
        catalog_name,
        schema_name,
        table_name,
        kind: Some(kind),
    })
}
934
935pub fn to_alter_database_expr(
937 alter_database: AlterDatabase,
938 query_ctx: &QueryContextRef,
939) -> Result<AlterDatabaseExpr> {
940 let catalog = query_ctx.current_catalog();
941 let schema = alter_database.database_name;
942
943 let kind = match alter_database.alter_operation {
944 AlterDatabaseOperation::SetDatabaseOption { options } => {
945 let options = options.into_iter().map(Into::into).collect();
946 AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
947 set_database_options: options,
948 })
949 }
950 AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
951 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
952 }
953 };
954
955 Ok(AlterDatabaseExpr {
956 catalog_name: catalog.to_string(),
957 schema_name: schema.to_string(),
958 kind: Some(kind),
959 })
960}
961
962pub fn to_create_view_expr(
964 stmt: CreateView,
965 logical_plan: Vec<u8>,
966 table_names: Vec<TableName>,
967 columns: Vec<String>,
968 plan_columns: Vec<String>,
969 definition: String,
970 query_ctx: QueryContextRef,
971) -> Result<CreateViewExpr> {
972 let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
973 .map_err(BoxedError::new)
974 .context(ExternalSnafu)?;
975
976 let expr = CreateViewExpr {
977 catalog_name,
978 schema_name,
979 view_name,
980 logical_plan,
981 create_if_not_exists: stmt.if_not_exists,
982 or_replace: stmt.or_replace,
983 table_names,
984 columns,
985 plan_columns,
986 definition,
987 };
988
989 Ok(expr)
990}
991
992pub fn to_create_flow_task_expr(
993 create_flow: CreateFlow,
994 query_ctx: &QueryContextRef,
995) -> Result<CreateFlowExpr> {
996 let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone(), true)
998 .with_context(|_| ConvertIdentifierSnafu {
999 ident: create_flow.sink_table_name.to_string(),
1000 })?;
1001 let catalog = sink_table_ref
1002 .catalog()
1003 .unwrap_or(query_ctx.current_catalog())
1004 .to_string();
1005 let schema = sink_table_ref
1006 .schema()
1007 .map(|s| s.to_owned())
1008 .unwrap_or(query_ctx.current_schema());
1009
1010 let sink_table_name = TableName {
1011 catalog_name: catalog,
1012 schema_name: schema,
1013 table_name: sink_table_ref.table().to_string(),
1014 };
1015
1016 let source_table_names = extract_tables_from_query(&create_flow.query)
1017 .map(|name| {
1018 let reference =
1019 object_name_to_table_reference(name.clone(), true).with_context(|_| {
1020 ConvertIdentifierSnafu {
1021 ident: name.to_string(),
1022 }
1023 })?;
1024 let catalog = reference
1025 .catalog()
1026 .unwrap_or(query_ctx.current_catalog())
1027 .to_string();
1028 let schema = reference
1029 .schema()
1030 .map(|s| s.to_string())
1031 .unwrap_or(query_ctx.current_schema());
1032
1033 let table_name = TableName {
1034 catalog_name: catalog,
1035 schema_name: schema,
1036 table_name: reference.table().to_string(),
1037 };
1038 Ok(table_name)
1039 })
1040 .collect::<Result<Vec<_>>>()?;
1041
1042 let eval_interval = create_flow.eval_interval;
1043
1044 Ok(CreateFlowExpr {
1045 catalog_name: query_ctx.current_catalog().to_string(),
1046 flow_name: sanitize_flow_name(create_flow.flow_name)?,
1047 source_table_names,
1048 sink_table_name: Some(sink_table_name),
1049 or_replace: create_flow.or_replace,
1050 create_if_not_exists: create_flow.if_not_exists,
1051 expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
1052 eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
1053 comment: create_flow.comment.unwrap_or_default(),
1054 sql: create_flow.query.to_string(),
1055 flow_options: Default::default(),
1056 })
1057}
1058
1059fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
1061 ensure!(
1062 flow_name.0.len() == 1,
1063 InvalidFlowNameSnafu {
1064 name: flow_name.to_string(),
1065 }
1066 );
1067 Ok(flow_name.0.swap_remove(0).to_string_unquoted())
1069}
1070
1071#[cfg(test)]
1072mod tests {
1073 use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
1074 use datatypes::value::Value;
1075 use session::context::{QueryContext, QueryContextBuilder};
1076 use sql::dialect::GreptimeDbDialect;
1077 use sql::parser::{ParseOptions, ParserContext};
1078 use sql::statements::statement::Statement;
1079 use store_api::storage::ColumnDefaultConstraint;
1080
1081 use super::*;
1082
    // Exercises CREATE FLOW with a TQL query body: parse rejection of
    // literal-bound TQL EVAL, then full conversion including `__schema__` and
    // `__database__` matcher handling in the source table resolution.
    #[test]
    fn test_create_flow_tql_expr() {
        // Literal numeric bounds in TQL EVAL must be rejected at parse time.
        let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());

        assert!(
            stmt.is_err(),
            "Expected error for invalid TQL EVAL parameters: {:#?}",
            stmt
        );

        // `now()`-relative bounds parse; verify the converted expr fields.
        let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        // Render a TableName as `catalog.schema.table` for easy comparison.
        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        assert_eq!("calc_reqs", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.public.cnt_reqs",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.public.http_requests",
            to_dot_sep(expr.source_table_names[0].clone())
        );
        assert_eq!(
            r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#,
            expr.sql
        );

        // A `__schema__` matcher overrides the default schema of the source.
        let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__schema__="greptime_private"});"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();
        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.greptime_private.http_requests",
            to_dot_sep(expr.source_table_names[0].clone())
        );

        // `__database__` behaves the same way as `__schema__`.
        let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__database__="greptime_private"});"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();
        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.greptime_private.http_requests",
            to_dot_sep(expr.source_table_names[0].clone())
        );
    }
1165
    #[test]
    fn test_create_flow_expr() {
        // Case 1: unqualified flow/sink/source names resolve to the session's
        // default catalog (`greptime`) and schema (`public`); the stored SQL
        // keeps only the query text (no leading newline, no trailing `;`).
        let sql = r"
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
    DISTINCT number as dis
FROM
    distinct_basic;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        // Renders a TableName as `catalog.schema.table` for compact assertions.
        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        assert_eq!("test_distinct_basic", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.public.out_distinct_basic",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.public.distinct_basic",
            to_dot_sep(expr.source_table_names[0].clone())
        );
        assert_eq!(
            r"SELECT
    DISTINCT number as dis
FROM
    distinct_basic",
            expr.sql
        );

        // Case 2: a back-quoted flow name and schema-qualified sink/source
        // tables keep their explicit schemas while inheriting the default
        // catalog.
        let sql = r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        assert_eq!("task_2", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.schema_1.table_1",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.schema_2.table_2",
            to_dot_sep(expr.source_table_names[0].clone())
        );
        assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);

        // Case 3: a catalog-qualified flow name (`abc.`task_2``) is rejected —
        // the error message quotes the offending name.
        let sql = r"
CREATE FLOW abc.`task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());

        assert!(res.is_err());
        assert!(
            res.unwrap_err()
                .to_string()
                .contains("Invalid flow name: abc.`task_2`")
        );
    }
1260
1261 #[test]
1262 fn test_create_to_expr() {
1263 let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
1264 let stmt =
1265 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1266 .unwrap()
1267 .pop()
1268 .unwrap();
1269
1270 let Statement::CreateTable(create_table) = stmt else {
1271 unreachable!()
1272 };
1273 let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1274 assert_eq!("3days", expr.table_options.get("ttl").unwrap());
1275 assert_eq!(
1276 "1.0MiB",
1277 expr.table_options.get("write_buffer_size").unwrap()
1278 );
1279 }
1280
1281 #[test]
1282 fn test_invalid_create_to_expr() {
1283 let cases = [
1284 "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
1286 "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
1288 "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));",
1290 ];
1291
1292 for sql in cases {
1293 let stmt = ParserContext::create_with_dialect(
1294 sql,
1295 &GreptimeDbDialect {},
1296 ParseOptions::default(),
1297 )
1298 .unwrap()
1299 .pop()
1300 .unwrap();
1301 let Statement::CreateTable(create_table) = stmt else {
1302 unreachable!()
1303 };
1304 create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
1305 }
1306 }
1307
1308 #[test]
1309 fn test_create_to_expr_with_default_timestamp_value() {
1310 let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
1311 let stmt =
1312 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1313 .unwrap()
1314 .pop()
1315 .unwrap();
1316
1317 let Statement::CreateTable(create_table) = stmt else {
1318 unreachable!()
1319 };
1320
1321 let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1323 let ts_column = &expr.column_defs[1];
1324 let constraint = assert_ts_column(ts_column);
1325 assert!(
1326 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1327 if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1328 );
1329
1330 let ctx = QueryContextBuilder::default()
1332 .timezone(Timezone::from_tz_string("+08:00").unwrap())
1333 .build()
1334 .into();
1335 let expr = create_to_expr(&create_table, &ctx).unwrap();
1336 let ts_column = &expr.column_defs[1];
1337 let constraint = assert_ts_column(ts_column);
1338 assert!(
1339 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1340 if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1341 );
1342 }
1343
1344 fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
1345 assert_eq!("ts", ts_column.name);
1346 assert_eq!(
1347 ColumnDataType::TimestampMillisecond as i32,
1348 ts_column.data_type
1349 );
1350 assert!(!ts_column.default_constraint.is_empty());
1351
1352 ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
1353 }
1354
    #[test]
    fn test_to_alter_expr() {
        // Case 1: ALTER DATABASE ... SET maps to
        // AlterDatabaseKind::SetDatabaseOptions with key/value pairs kept in
        // declaration order.
        let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterDatabase(alter_database) = stmt else {
            unreachable!()
        };

        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
            set_database_options,
        }) = kind
        else {
            unreachable!()
        };

        assert_eq!(2, set_database_options.len());
        assert_eq!("key1", set_database_options[0].key);
        assert_eq!("value1", set_database_options[0].value);
        assert_eq!("key2", set_database_options[1].key);
        assert_eq!("value2", set_database_options[1].value);

        // Case 2: ALTER DATABASE ... UNSET maps to
        // AlterDatabaseKind::UnsetDatabaseOptions carrying only the keys.
        let sql = "ALTER DATABASE greptime UNSET key1, key2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterDatabase(alter_database) = stmt else {
            unreachable!()
        };

        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
            unreachable!()
        };

        assert_eq!(2, keys.len());
        assert!(keys.contains(&"key1".to_string()));
        assert!(keys.contains(&"key2".to_string()));

        // Case 3: ALTER TABLE ... ADD COLUMN with a naive timestamp default.
        // Under the default session context the decoded default renders with a
        // +0000 offset (no shift).
        let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterTable(alter_table) = stmt else {
            unreachable!()
        };

        // Cloned because the same statement is re-converted under a different
        // timezone in case 4 below.
        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
            unreachable!()
        };

        assert_eq!(1, add_columns.len());
        let ts_column = add_columns[0].column_def.clone().unwrap();
        let constraint = assert_ts_column(&ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
        );

        // Case 4: the same ADD COLUMN under a +08:00 session timezone — the
        // naive literal is shifted back by eight hours.
        let ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("+08:00").unwrap())
            .build()
            .into();
        let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
        let kind = expr.kind.unwrap();

        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
            unreachable!()
        };

        assert_eq!(1, add_columns.len());
        let ts_column = add_columns[0].column_def.clone().unwrap();
        let constraint = assert_ts_column(&ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
        );
    }
1454
1455 #[test]
1456 fn test_to_alter_modify_column_type_expr() {
1457 let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
1458 let stmt =
1459 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1460 .unwrap()
1461 .pop()
1462 .unwrap();
1463
1464 let Statement::AlterTable(alter_table) = stmt else {
1465 unreachable!()
1466 };
1467
1468 let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1470 let kind = expr.kind.unwrap();
1471
1472 let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
1473 modify_column_types,
1474 }) = kind
1475 else {
1476 unreachable!()
1477 };
1478
1479 assert_eq!(1, modify_column_types.len());
1480 let modify_column_type = &modify_column_types[0];
1481
1482 assert_eq!("mem_usage", modify_column_type.column_name);
1483 assert_eq!(
1484 ColumnDataType::String as i32,
1485 modify_column_type.target_type
1486 );
1487 assert!(modify_column_type.target_type_extension.is_none());
1488 }
1489
1490 #[test]
1491 fn test_to_repartition_request() {
1492 let sql = r#"
1493ALTER TABLE metrics REPARTITION (
1494 device_id < 100
1495) INTO (
1496 device_id < 100 AND area < 'South',
1497 device_id < 100 AND area >= 'South'
1498);"#;
1499 let stmt =
1500 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1501 .unwrap()
1502 .pop()
1503 .unwrap();
1504
1505 let Statement::AlterTable(alter_table) = stmt else {
1506 unreachable!()
1507 };
1508
1509 let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
1510 assert_eq!("greptime", request.catalog_name);
1511 assert_eq!("public", request.schema_name);
1512 assert_eq!("metrics", request.table_name);
1513 assert_eq!(
1514 request
1515 .from_exprs
1516 .into_iter()
1517 .map(|x| x.to_string())
1518 .collect::<Vec<_>>(),
1519 vec!["device_id < 100".to_string()]
1520 );
1521 assert_eq!(
1522 request
1523 .into_exprs
1524 .into_iter()
1525 .map(|x| x.to_string())
1526 .collect::<Vec<_>>(),
1527 vec![
1528 "device_id < 100 AND area < 'South'".to_string(),
1529 "device_id < 100 AND area >= 'South'".to_string()
1530 ]
1531 );
1532 }
1533
1534 fn new_test_table_names() -> Vec<TableName> {
1535 vec![
1536 TableName {
1537 catalog_name: "greptime".to_string(),
1538 schema_name: "public".to_string(),
1539 table_name: "a_table".to_string(),
1540 },
1541 TableName {
1542 catalog_name: "greptime".to_string(),
1543 schema_name: "public".to_string(),
1544 table_name: "b_table".to_string(),
1545 },
1546 ]
1547 }
1548
1549 #[test]
1550 fn test_to_create_view_expr() {
1551 let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1552 let stmt =
1553 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1554 .unwrap()
1555 .pop()
1556 .unwrap();
1557
1558 let Statement::CreateView(stmt) = stmt else {
1559 unreachable!()
1560 };
1561
1562 let logical_plan = vec![1, 2, 3];
1563 let table_names = new_test_table_names();
1564 let columns = vec!["a".to_string()];
1565 let plan_columns = vec!["number".to_string()];
1566
1567 let expr = to_create_view_expr(
1568 stmt,
1569 logical_plan.clone(),
1570 table_names.clone(),
1571 columns.clone(),
1572 plan_columns.clone(),
1573 sql.to_string(),
1574 QueryContext::arc(),
1575 )
1576 .unwrap();
1577
1578 assert_eq!("greptime", expr.catalog_name);
1579 assert_eq!("public", expr.schema_name);
1580 assert_eq!("test", expr.view_name);
1581 assert!(!expr.create_if_not_exists);
1582 assert!(!expr.or_replace);
1583 assert_eq!(logical_plan, expr.logical_plan);
1584 assert_eq!(table_names, expr.table_names);
1585 assert_eq!(sql, expr.definition);
1586 assert_eq!(columns, expr.columns);
1587 assert_eq!(plan_columns, expr.plan_columns);
1588 }
1589
1590 #[test]
1591 fn test_to_create_view_expr_complex() {
1592 let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
1593 let stmt =
1594 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1595 .unwrap()
1596 .pop()
1597 .unwrap();
1598
1599 let Statement::CreateView(stmt) = stmt else {
1600 unreachable!()
1601 };
1602
1603 let logical_plan = vec![1, 2, 3];
1604 let table_names = new_test_table_names();
1605 let columns = vec!["a".to_string()];
1606 let plan_columns = vec!["number".to_string()];
1607
1608 let expr = to_create_view_expr(
1609 stmt,
1610 logical_plan.clone(),
1611 table_names.clone(),
1612 columns.clone(),
1613 plan_columns.clone(),
1614 sql.to_string(),
1615 QueryContext::arc(),
1616 )
1617 .unwrap();
1618
1619 assert_eq!("greptime", expr.catalog_name);
1620 assert_eq!("test", expr.schema_name);
1621 assert_eq!("test_view", expr.view_name);
1622 assert!(expr.create_if_not_exists);
1623 assert!(expr.or_replace);
1624 assert_eq!(logical_plan, expr.logical_plan);
1625 assert_eq!(table_names, expr.table_names);
1626 assert_eq!(sql, expr.definition);
1627 assert_eq!(columns, expr.columns);
1628 assert_eq!(plan_columns, expr.plan_columns);
1629 }
1630
    #[test]
    fn test_expr_to_create() {
        // Round-trip check: parse a CREATE TABLE carrying skipping/inverted/
        // fulltext index options, convert it to a CreateTableExpr, render it
        // back to a statement, and require the pretty-printed SQL to match the
        // input byte-for-byte (so the raw string's exact layout matters).
        let sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
  `timestamp` TIMESTAMP(9) NOT NULL,
  `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
  `username` STRING NULL,
  `http_method` STRING NULL INVERTED INDEX,
  `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
  `protocol` STRING NULL,
  `status_code` INT NULL INVERTED INDEX,
  `response_size` BIGINT NULL,
  `message` STRING NULL,
  TIME INDEX (`timestamp`),
  PRIMARY KEY (`username`, `status_code`)
)
ENGINE=mito
WITH(
  append_mode = 'true'
)"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateTable(original_create) = stmt else {
            unreachable!()
        };

        let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();

        // Rebuild a CreateTable statement from the expr, quoting identifiers
        // with a backtick; `{:#}` selects the alternate (pretty) Display form.
        let create_table = expr_to_create(&expr, Some('`')).unwrap();
        let new_sql = format!("{:#}", create_table);
        assert_eq!(sql, new_sql);
    }
1667}