1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::options_from_column_schema;
24use api::v1::{
25 set_index, unset_index, AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer,
26 ColumnDataType, ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
27 DropColumn, DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend,
28 ModifyColumnType, ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions,
29 SetDefaults, SetFulltext, SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30 SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31 UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions,
32};
33use common_error::ext::BoxedError;
34use common_grpc_expr::util::ColumnExpr;
35use common_time::Timezone;
36use datafusion::sql::planner::object_name_to_table_reference;
37use datatypes::schema::{
38 ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SkippingIndexType, COMMENT_KEY,
39};
40use file_engine::FileOptions;
41use query::sql::{
42 check_file_to_table_schema_compatibility, file_column_schemas_to_table,
43 infer_file_table_schema, prepare_file_table_files,
44};
45use session::context::QueryContextRef;
46use session::table_name::table_idents_to_full_name;
47use snafu::{ensure, OptionExt, ResultExt};
48use sql::ast::{ColumnOption, ObjectName};
49use sql::statements::alter::{
50 AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
51};
52use sql::statements::create::{
53 Column as SqlColumn, CreateExternalTable, CreateFlow, CreateTable, CreateView, TableConstraint,
54};
55use sql::statements::{
56 column_to_schema, sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type,
57};
58use sql::util::extract_tables_from_query;
59use table::requests::{TableOptions, FILE_TABLE_META_KEY};
60use table::table_reference::TableReference;
61#[cfg(feature = "enterprise")]
62pub use trigger::to_create_trigger_task_expr;
63
64use crate::error::{
65 BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
66 ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
67 IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidFlowNameSnafu, InvalidSqlSnafu,
68 NotSupportedSnafu, ParseSqlSnafu, PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu,
69 UnrecognizedTableOptionSnafu,
70};
71
72pub fn create_table_expr_by_column_schemas(
73 table_name: &TableReference<'_>,
74 column_schemas: &[api::v1::ColumnSchema],
75 engine: &str,
76 desc: Option<&str>,
77) -> Result<CreateTableExpr> {
78 let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
79 let expr = common_grpc_expr::util::build_create_table_expr(
80 None,
81 table_name,
82 column_exprs,
83 engine,
84 desc.unwrap_or("Created on insertion"),
85 )
86 .context(BuildCreateExprOnInsertionSnafu)?;
87
88 validate_create_expr(&expr)?;
89 Ok(expr)
90}
91
92pub fn extract_add_columns_expr(
93 schema: &Schema,
94 column_exprs: Vec<ColumnExpr>,
95) -> Result<Option<AddColumns>> {
96 let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
97 .context(FindNewColumnsOnInsertionSnafu)?;
98 if let Some(add_columns) = &add_columns {
99 validate_add_columns_expr(add_columns)?;
100 }
101 Ok(add_columns)
102}
103
104pub(crate) async fn create_external_expr(
128 create: CreateExternalTable,
129 query_ctx: &QueryContextRef,
130) -> Result<CreateTableExpr> {
131 let (catalog_name, schema_name, table_name) =
132 table_idents_to_full_name(&create.name, query_ctx)
133 .map_err(BoxedError::new)
134 .context(ExternalSnafu)?;
135
136 let mut table_options = create.options.into_map();
137
138 let (object_store, files) = prepare_file_table_files(&table_options)
139 .await
140 .context(PrepareFileTableSnafu)?;
141
142 let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
143 .await
144 .context(InferFileTableSchemaSnafu)?
145 .column_schemas;
146
147 let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
148 let time_index = find_time_index(&create.constraints)?;
150 let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
151 let column_schemas =
152 columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
153 (time_index, primary_keys, column_schemas)
154 } else {
155 let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
157 let primary_keys = vec![];
158 (time_index, primary_keys, column_schemas)
159 };
160
161 check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
162 .context(SchemaIncompatibleSnafu)?;
163
164 let meta = FileOptions {
165 files,
166 file_column_schemas,
167 };
168 table_options.insert(
169 FILE_TABLE_META_KEY.to_string(),
170 serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
171 );
172
173 let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
174 let expr = CreateTableExpr {
175 catalog_name,
176 schema_name,
177 table_name,
178 desc: String::default(),
179 column_defs,
180 time_index,
181 primary_keys,
182 create_if_not_exists: create.if_not_exists,
183 table_options,
184 table_id: None,
185 engine: create.engine.to_string(),
186 };
187
188 Ok(expr)
189}
190
191pub fn create_to_expr(
193 create: &CreateTable,
194 query_ctx: &QueryContextRef,
195) -> Result<CreateTableExpr> {
196 let (catalog_name, schema_name, table_name) =
197 table_idents_to_full_name(&create.name, query_ctx)
198 .map_err(BoxedError::new)
199 .context(ExternalSnafu)?;
200
201 let time_index = find_time_index(&create.constraints)?;
202 let table_options = HashMap::from(
203 &TableOptions::try_from_iter(create.options.to_str_map())
204 .context(UnrecognizedTableOptionSnafu)?,
205 );
206
207 let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
208
209 let expr = CreateTableExpr {
210 catalog_name,
211 schema_name,
212 table_name,
213 desc: String::default(),
214 column_defs: columns_to_expr(
215 &create.columns,
216 &time_index,
217 &primary_keys,
218 Some(&query_ctx.timezone()),
219 )?,
220 time_index,
221 primary_keys,
222 create_if_not_exists: create.if_not_exists,
223 table_options,
224 table_id: None,
225 engine: create.engine.to_string(),
226 };
227
228 validate_create_expr(&expr)?;
229 Ok(expr)
230}
231
232pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
234 let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
236 for (idx, column) in create.column_defs.iter().enumerate() {
237 if let Some(indices) = column_to_indices.get(&column.name) {
238 return InvalidSqlSnafu {
239 err_msg: format!(
240 "column name `{}` is duplicated at index {} and {}",
241 column.name, indices, idx
242 ),
243 }
244 .fail();
245 }
246 column_to_indices.insert(&column.name, idx);
247 }
248
249 let _ = column_to_indices
251 .get(&create.time_index)
252 .with_context(|| InvalidSqlSnafu {
253 err_msg: format!(
254 "column name `{}` is not found in column list",
255 create.time_index
256 ),
257 })?;
258
259 for pk in &create.primary_keys {
261 let _ = column_to_indices
262 .get(&pk)
263 .with_context(|| InvalidSqlSnafu {
264 err_msg: format!("column name `{}` is not found in column list", pk),
265 })?;
266 }
267
268 let mut pk_set = HashSet::new();
270 for pk in &create.primary_keys {
271 if !pk_set.insert(pk) {
272 return InvalidSqlSnafu {
273 err_msg: format!("column name `{}` is duplicated in primary keys", pk),
274 }
275 .fail();
276 }
277 }
278
279 if pk_set.contains(&create.time_index) {
281 return InvalidSqlSnafu {
282 err_msg: format!(
283 "column name `{}` is both primary key and time index",
284 create.time_index
285 ),
286 }
287 .fail();
288 }
289
290 for column in &create.column_defs {
291 if is_interval_type(&column.data_type()) {
293 return InvalidSqlSnafu {
294 err_msg: format!(
295 "column name `{}` is interval type, which is not supported",
296 column.name
297 ),
298 }
299 .fail();
300 }
301 if is_date_time_type(&column.data_type()) {
303 return InvalidSqlSnafu {
304 err_msg: format!(
305 "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
306 column.name
307 ),
308 }
309 .fail();
310 }
311 }
312 Ok(())
313}
314
315fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
316 for add_column in &add_columns.add_columns {
317 let Some(column_def) = &add_column.column_def else {
318 continue;
319 };
320 if is_date_time_type(&column_def.data_type()) {
321 return InvalidSqlSnafu {
322 err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
323 }
324 .fail();
325 }
326 if is_interval_type(&column_def.data_type()) {
327 return InvalidSqlSnafu {
328 err_msg: format!(
329 "column name `{}` is interval type, which is not supported",
330 column_def.name
331 ),
332 }
333 .fail();
334 }
335 }
336 Ok(())
337}
338
339fn is_date_time_type(data_type: &ColumnDataType) -> bool {
340 matches!(data_type, ColumnDataType::Datetime)
341}
342
343fn is_interval_type(data_type: &ColumnDataType) -> bool {
344 matches!(
345 data_type,
346 ColumnDataType::IntervalYearMonth
347 | ColumnDataType::IntervalDayTime
348 | ColumnDataType::IntervalMonthDayNano
349 )
350}
351
352fn find_primary_keys(
353 columns: &[SqlColumn],
354 constraints: &[TableConstraint],
355) -> Result<Vec<String>> {
356 let columns_pk = columns
357 .iter()
358 .filter_map(|x| {
359 if x.options().iter().any(|o| {
360 matches!(
361 o.option,
362 ColumnOption::Unique {
363 is_primary: true,
364 ..
365 }
366 )
367 }) {
368 Some(x.name().value.clone())
369 } else {
370 None
371 }
372 })
373 .collect::<Vec<String>>();
374
375 ensure!(
376 columns_pk.len() <= 1,
377 IllegalPrimaryKeysDefSnafu {
378 msg: "not allowed to inline multiple primary keys in columns options"
379 }
380 );
381
382 let constraints_pk = constraints
383 .iter()
384 .filter_map(|constraint| match constraint {
385 TableConstraint::PrimaryKey { columns, .. } => {
386 Some(columns.iter().map(|ident| ident.value.clone()))
387 }
388 _ => None,
389 })
390 .flatten()
391 .collect::<Vec<String>>();
392
393 ensure!(
394 columns_pk.is_empty() || constraints_pk.is_empty(),
395 IllegalPrimaryKeysDefSnafu {
396 msg: "found definitions of primary keys in multiple places"
397 }
398 );
399
400 let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
401 primary_keys.extend(columns_pk);
402 primary_keys.extend(constraints_pk);
403 Ok(primary_keys)
404}
405
406pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
407 let time_index = constraints
408 .iter()
409 .filter_map(|constraint| match constraint {
410 TableConstraint::TimeIndex { column, .. } => Some(&column.value),
411 _ => None,
412 })
413 .collect::<Vec<&String>>();
414 ensure!(
415 time_index.len() == 1,
416 InvalidSqlSnafu {
417 err_msg: "must have one and only one TimeIndex columns",
418 }
419 );
420 Ok(time_index.first().unwrap().to_string())
421}
422
423fn columns_to_expr(
424 column_defs: &[SqlColumn],
425 time_index: &str,
426 primary_keys: &[String],
427 timezone: Option<&Timezone>,
428) -> Result<Vec<api::v1::ColumnDef>> {
429 let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
430 column_schemas_to_defs(column_schemas, primary_keys)
431}
432
433fn columns_to_column_schemas(
434 columns: &[SqlColumn],
435 time_index: &str,
436 timezone: Option<&Timezone>,
437) -> Result<Vec<ColumnSchema>> {
438 columns
439 .iter()
440 .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
441 .collect::<Result<Vec<ColumnSchema>>>()
442}
443
444pub fn column_schemas_to_defs(
446 column_schemas: Vec<ColumnSchema>,
447 primary_keys: &[String],
448) -> Result<Vec<api::v1::ColumnDef>> {
449 let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
450 .iter()
451 .map(|c| {
452 ColumnDataTypeWrapper::try_from(c.data_type.clone())
453 .map(|w| w.to_parts())
454 .context(ColumnDataTypeSnafu)
455 })
456 .collect::<Result<Vec<_>>>()?;
457
458 column_schemas
459 .iter()
460 .zip(column_datatypes)
461 .map(|(schema, datatype)| {
462 let semantic_type = if schema.is_time_index() {
463 SemanticType::Timestamp
464 } else if primary_keys.contains(&schema.name) {
465 SemanticType::Tag
466 } else {
467 SemanticType::Field
468 } as i32;
469 let comment = schema
470 .metadata()
471 .get(COMMENT_KEY)
472 .cloned()
473 .unwrap_or_default();
474
475 Ok(api::v1::ColumnDef {
476 name: schema.name.clone(),
477 data_type: datatype.0 as i32,
478 is_nullable: schema.is_nullable(),
479 default_constraint: match schema.default_constraint() {
480 None => vec![],
481 Some(v) => {
482 v.clone()
483 .try_into()
484 .context(ConvertColumnDefaultConstraintSnafu {
485 column_name: &schema.name,
486 })?
487 }
488 },
489 semantic_type,
490 comment,
491 datatype_extension: datatype.1,
492 options: options_from_column_schema(schema),
493 })
494 })
495 .collect()
496}
497
498pub(crate) fn to_alter_table_expr(
500 alter_table: AlterTable,
501 query_ctx: &QueryContextRef,
502) -> Result<AlterTableExpr> {
503 let (catalog_name, schema_name, table_name) =
504 table_idents_to_full_name(alter_table.table_name(), query_ctx)
505 .map_err(BoxedError::new)
506 .context(ExternalSnafu)?;
507
508 let kind = match alter_table.alter_operation {
509 AlterTableOperation::AddConstraint(_) => {
510 return NotSupportedSnafu {
511 feat: "ADD CONSTRAINT",
512 }
513 .fail();
514 }
515 AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
516 add_columns: add_columns
517 .into_iter()
518 .map(|add_column| {
519 let column_def = sql_column_def_to_grpc_column_def(
520 &add_column.column_def,
521 Some(&query_ctx.timezone()),
522 )
523 .map_err(BoxedError::new)
524 .context(ExternalSnafu)?;
525 if is_interval_type(&column_def.data_type()) {
526 return NotSupportedSnafu {
527 feat: "Add column with interval type",
528 }
529 .fail();
530 }
531 Ok(AddColumn {
532 column_def: Some(column_def),
533 location: add_column.location.as_ref().map(From::from),
534 add_if_not_exists: add_column.add_if_not_exists,
535 })
536 })
537 .collect::<Result<Vec<AddColumn>>>()?,
538 }),
539 AlterTableOperation::ModifyColumnType {
540 column_name,
541 target_type,
542 } => {
543 let target_type =
544 sql_data_type_to_concrete_data_type(&target_type).context(ParseSqlSnafu)?;
545 let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
546 .map(|w| w.to_parts())
547 .context(ColumnDataTypeSnafu)?;
548 if is_interval_type(&target_type) {
549 return NotSupportedSnafu {
550 feat: "Modify column type to interval type",
551 }
552 .fail();
553 }
554 AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
555 modify_column_types: vec![ModifyColumnType {
556 column_name: column_name.value,
557 target_type: target_type as i32,
558 target_type_extension,
559 }],
560 })
561 }
562 AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
563 drop_columns: vec![DropColumn {
564 name: name.value.to_string(),
565 }],
566 }),
567 AlterTableOperation::RenameTable { new_table_name } => {
568 AlterTableKind::RenameTable(RenameTable {
569 new_table_name: new_table_name.to_string(),
570 })
571 }
572 AlterTableOperation::SetTableOptions { options } => {
573 AlterTableKind::SetTableOptions(SetTableOptions {
574 table_options: options.into_iter().map(Into::into).collect(),
575 })
576 }
577 AlterTableOperation::UnsetTableOptions { keys } => {
578 AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
579 }
580 AlterTableOperation::SetIndex { options } => {
581 let option = match options {
582 sql::statements::alter::SetIndexOperation::Fulltext {
583 column_name,
584 options,
585 } => SetIndex {
586 options: Some(set_index::Options::Fulltext(SetFulltext {
587 column_name: column_name.value,
588 enable: options.enable,
589 analyzer: match options.analyzer {
590 FulltextAnalyzer::English => Analyzer::English.into(),
591 FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
592 },
593 case_sensitive: options.case_sensitive,
594 backend: match options.backend {
595 FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
596 FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
597 },
598 granularity: options.granularity as u64,
599 false_positive_rate: options.false_positive_rate(),
600 })),
601 },
602 sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
603 options: Some(set_index::Options::Inverted(SetInverted {
604 column_name: column_name.value,
605 })),
606 },
607 sql::statements::alter::SetIndexOperation::Skipping {
608 column_name,
609 options,
610 } => SetIndex {
611 options: Some(set_index::Options::Skipping(SetSkipping {
612 column_name: column_name.value,
613 enable: true,
614 granularity: options.granularity as u64,
615 false_positive_rate: options.false_positive_rate(),
616 skipping_index_type: match options.index_type {
617 SkippingIndexType::BloomFilter => {
618 PbSkippingIndexType::BloomFilter.into()
619 }
620 },
621 })),
622 },
623 };
624 AlterTableKind::SetIndexes(SetIndexes {
625 set_indexes: vec![option],
626 })
627 }
628 AlterTableOperation::UnsetIndex { options } => {
629 let option = match options {
630 sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
631 UnsetIndex {
632 options: Some(unset_index::Options::Fulltext(UnsetFulltext {
633 column_name: column_name.value,
634 })),
635 }
636 }
637 sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
638 UnsetIndex {
639 options: Some(unset_index::Options::Inverted(UnsetInverted {
640 column_name: column_name.value,
641 })),
642 }
643 }
644 sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
645 UnsetIndex {
646 options: Some(unset_index::Options::Skipping(UnsetSkipping {
647 column_name: column_name.value,
648 })),
649 }
650 }
651 };
652
653 AlterTableKind::UnsetIndexes(UnsetIndexes {
654 unset_indexes: vec![option],
655 })
656 }
657 AlterTableOperation::DropDefaults { columns } => {
658 AlterTableKind::DropDefaults(DropDefaults {
659 drop_defaults: columns
660 .into_iter()
661 .map(|col| {
662 let column_name = col.0.to_string();
663 Ok(api::v1::DropDefault { column_name })
664 })
665 .collect::<Result<Vec<_>>>()?,
666 })
667 }
668 AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
669 set_defaults: defaults
670 .into_iter()
671 .map(|col| {
672 let column_name = col.column_name.to_string();
673 let default_constraint = serde_json::to_string(&col.default_constraint)
674 .context(EncodeJsonSnafu)?
675 .into_bytes();
676 Ok(api::v1::SetDefault {
677 column_name,
678 default_constraint,
679 })
680 })
681 .collect::<Result<Vec<_>>>()?,
682 }),
683 };
684
685 Ok(AlterTableExpr {
686 catalog_name,
687 schema_name,
688 table_name,
689 kind: Some(kind),
690 })
691}
692
693pub fn to_alter_database_expr(
695 alter_database: AlterDatabase,
696 query_ctx: &QueryContextRef,
697) -> Result<AlterDatabaseExpr> {
698 let catalog = query_ctx.current_catalog();
699 let schema = alter_database.database_name;
700
701 let kind = match alter_database.alter_operation {
702 AlterDatabaseOperation::SetDatabaseOption { options } => {
703 let options = options.into_iter().map(Into::into).collect();
704 AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
705 set_database_options: options,
706 })
707 }
708 AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
709 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
710 }
711 };
712
713 Ok(AlterDatabaseExpr {
714 catalog_name: catalog.to_string(),
715 schema_name: schema.to_string(),
716 kind: Some(kind),
717 })
718}
719
720pub fn to_create_view_expr(
722 stmt: CreateView,
723 logical_plan: Vec<u8>,
724 table_names: Vec<TableName>,
725 columns: Vec<String>,
726 plan_columns: Vec<String>,
727 definition: String,
728 query_ctx: QueryContextRef,
729) -> Result<CreateViewExpr> {
730 let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
731 .map_err(BoxedError::new)
732 .context(ExternalSnafu)?;
733
734 let expr = CreateViewExpr {
735 catalog_name,
736 schema_name,
737 view_name,
738 logical_plan,
739 create_if_not_exists: stmt.if_not_exists,
740 or_replace: stmt.or_replace,
741 table_names,
742 columns,
743 plan_columns,
744 definition,
745 };
746
747 Ok(expr)
748}
749
750pub fn to_create_flow_task_expr(
751 create_flow: CreateFlow,
752 query_ctx: &QueryContextRef,
753) -> Result<CreateFlowExpr> {
754 let sink_table_ref =
756 object_name_to_table_reference(create_flow.sink_table_name.clone().into(), true)
757 .with_context(|_| ConvertIdentifierSnafu {
758 ident: create_flow.sink_table_name.to_string(),
759 })?;
760 let catalog = sink_table_ref
761 .catalog()
762 .unwrap_or(query_ctx.current_catalog())
763 .to_string();
764 let schema = sink_table_ref
765 .schema()
766 .map(|s| s.to_owned())
767 .unwrap_or(query_ctx.current_schema());
768
769 let sink_table_name = TableName {
770 catalog_name: catalog,
771 schema_name: schema,
772 table_name: sink_table_ref.table().to_string(),
773 };
774
775 let source_table_names = extract_tables_from_query(&create_flow.query)
776 .map(|name| {
777 let reference = object_name_to_table_reference(name.clone().into(), true)
778 .with_context(|_| ConvertIdentifierSnafu {
779 ident: name.to_string(),
780 })?;
781 let catalog = reference
782 .catalog()
783 .unwrap_or(query_ctx.current_catalog())
784 .to_string();
785 let schema = reference
786 .schema()
787 .map(|s| s.to_string())
788 .unwrap_or(query_ctx.current_schema());
789
790 let table_name = TableName {
791 catalog_name: catalog,
792 schema_name: schema,
793 table_name: reference.table().to_string(),
794 };
795 Ok(table_name)
796 })
797 .collect::<Result<Vec<_>>>()?;
798
799 Ok(CreateFlowExpr {
800 catalog_name: query_ctx.current_catalog().to_string(),
801 flow_name: sanitize_flow_name(create_flow.flow_name)?,
802 source_table_names,
803 sink_table_name: Some(sink_table_name),
804 or_replace: create_flow.or_replace,
805 create_if_not_exists: create_flow.if_not_exists,
806 expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
807 comment: create_flow.comment.unwrap_or_default(),
808 sql: create_flow.query.to_string(),
809 flow_options: HashMap::new(),
810 })
811}
812
813fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
815 ensure!(
816 flow_name.0.len() == 1,
817 InvalidFlowNameSnafu {
818 name: flow_name.to_string(),
819 }
820 );
821 Ok(flow_name.0.swap_remove(0).value)
823}
824
825#[cfg(test)]
826mod tests {
827 use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
828 use datatypes::value::Value;
829 use session::context::{QueryContext, QueryContextBuilder};
830 use sql::dialect::GreptimeDbDialect;
831 use sql::parser::{ParseOptions, ParserContext};
832 use sql::statements::statement::Statement;
833 use store_api::storage::ColumnDefaultConstraint;
834
835 use super::*;
836
837 #[test]
838 fn test_create_flow_tql_expr() {
839 let sql = r#"
840CREATE FLOW calc_reqs SINK TO cnt_reqs AS
841TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
842 let stmt =
843 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
844 .unwrap()
845 .pop()
846 .unwrap();
847
848 let Statement::CreateFlow(create_flow) = stmt else {
849 unreachable!()
850 };
851 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
852
853 let to_dot_sep =
854 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
855 assert_eq!("calc_reqs", expr.flow_name);
856 assert_eq!("greptime", expr.catalog_name);
857 assert_eq!(
858 "greptime.public.cnt_reqs",
859 expr.sink_table_name.map(to_dot_sep).unwrap()
860 );
861 assert!(expr.source_table_names.is_empty());
862 assert_eq!(
863 r#"TQL EVAL (0, 15, '5s') count_values("status_code", http_requests)"#,
864 expr.sql
865 );
866 }
867
868 #[test]
869 fn test_create_flow_expr() {
870 let sql = r"
871CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
872SELECT
873 DISTINCT number as dis
874FROM
875 distinct_basic;";
876 let stmt =
877 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
878 .unwrap()
879 .pop()
880 .unwrap();
881
882 let Statement::CreateFlow(create_flow) = stmt else {
883 unreachable!()
884 };
885 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
886
887 let to_dot_sep =
888 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
889 assert_eq!("test_distinct_basic", expr.flow_name);
890 assert_eq!("greptime", expr.catalog_name);
891 assert_eq!(
892 "greptime.public.out_distinct_basic",
893 expr.sink_table_name.map(to_dot_sep).unwrap()
894 );
895 assert_eq!(1, expr.source_table_names.len());
896 assert_eq!(
897 "greptime.public.distinct_basic",
898 to_dot_sep(expr.source_table_names[0].clone())
899 );
900 assert_eq!(
901 r"SELECT
902 DISTINCT number as dis
903FROM
904 distinct_basic",
905 expr.sql
906 );
907
908 let sql = r"
909CREATE FLOW `task_2`
910SINK TO schema_1.table_1
911AS
912SELECT max(c1), min(c2) FROM schema_2.table_2;";
913 let stmt =
914 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
915 .unwrap()
916 .pop()
917 .unwrap();
918
919 let Statement::CreateFlow(create_flow) = stmt else {
920 unreachable!()
921 };
922 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
923
924 let to_dot_sep =
925 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
926 assert_eq!("task_2", expr.flow_name);
927 assert_eq!("greptime", expr.catalog_name);
928 assert_eq!(
929 "greptime.schema_1.table_1",
930 expr.sink_table_name.map(to_dot_sep).unwrap()
931 );
932 assert_eq!(1, expr.source_table_names.len());
933 assert_eq!(
934 "greptime.schema_2.table_2",
935 to_dot_sep(expr.source_table_names[0].clone())
936 );
937 assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
938
939 let sql = r"
940CREATE FLOW abc.`task_2`
941SINK TO schema_1.table_1
942AS
943SELECT max(c1), min(c2) FROM schema_2.table_2;";
944 let stmt =
945 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
946 .unwrap()
947 .pop()
948 .unwrap();
949
950 let Statement::CreateFlow(create_flow) = stmt else {
951 unreachable!()
952 };
953 let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
954
955 assert!(res.is_err());
956 assert!(res
957 .unwrap_err()
958 .to_string()
959 .contains("Invalid flow name: abc.`task_2`"));
960 }
961
962 #[test]
963 fn test_create_to_expr() {
964 let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
965 let stmt =
966 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
967 .unwrap()
968 .pop()
969 .unwrap();
970
971 let Statement::CreateTable(create_table) = stmt else {
972 unreachable!()
973 };
974 let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
975 assert_eq!("3days", expr.table_options.get("ttl").unwrap());
976 assert_eq!(
977 "1.0MiB",
978 expr.table_options.get("write_buffer_size").unwrap()
979 );
980 }
981
982 #[test]
983 fn test_invalid_create_to_expr() {
984 let cases = [
985 "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
987 "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
989 "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));"
991 ];
992
993 for sql in cases {
994 let stmt = ParserContext::create_with_dialect(
995 sql,
996 &GreptimeDbDialect {},
997 ParseOptions::default(),
998 )
999 .unwrap()
1000 .pop()
1001 .unwrap();
1002 let Statement::CreateTable(create_table) = stmt else {
1003 unreachable!()
1004 };
1005 create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
1006 }
1007 }
1008
1009 #[test]
1010 fn test_create_to_expr_with_default_timestamp_value() {
1011 let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
1012 let stmt =
1013 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1014 .unwrap()
1015 .pop()
1016 .unwrap();
1017
1018 let Statement::CreateTable(create_table) = stmt else {
1019 unreachable!()
1020 };
1021
1022 let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1024 let ts_column = &expr.column_defs[1];
1025 let constraint = assert_ts_column(ts_column);
1026 assert!(
1027 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1028 if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1029 );
1030
1031 let ctx = QueryContextBuilder::default()
1033 .timezone(Timezone::from_tz_string("+08:00").unwrap())
1034 .build()
1035 .into();
1036 let expr = create_to_expr(&create_table, &ctx).unwrap();
1037 let ts_column = &expr.column_defs[1];
1038 let constraint = assert_ts_column(ts_column);
1039 assert!(
1040 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1041 if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1042 );
1043 }
1044
1045 fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
1046 assert_eq!("ts", ts_column.name);
1047 assert_eq!(
1048 ColumnDataType::TimestampMillisecond as i32,
1049 ts_column.data_type
1050 );
1051 assert!(!ts_column.default_constraint.is_empty());
1052
1053 ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
1054 }
1055
1056 #[test]
1057 fn test_to_alter_expr() {
1058 let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
1059 let stmt =
1060 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1061 .unwrap()
1062 .pop()
1063 .unwrap();
1064
1065 let Statement::AlterDatabase(alter_database) = stmt else {
1066 unreachable!()
1067 };
1068
1069 let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1070 let kind = expr.kind.unwrap();
1071
1072 let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
1073 set_database_options,
1074 }) = kind
1075 else {
1076 unreachable!()
1077 };
1078
1079 assert_eq!(2, set_database_options.len());
1080 assert_eq!("key1", set_database_options[0].key);
1081 assert_eq!("value1", set_database_options[0].value);
1082 assert_eq!("key2", set_database_options[1].key);
1083 assert_eq!("value2", set_database_options[1].value);
1084
1085 let sql = "ALTER DATABASE greptime UNSET key1, key2;";
1086 let stmt =
1087 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1088 .unwrap()
1089 .pop()
1090 .unwrap();
1091
1092 let Statement::AlterDatabase(alter_database) = stmt else {
1093 unreachable!()
1094 };
1095
1096 let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1097 let kind = expr.kind.unwrap();
1098
1099 let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
1100 unreachable!()
1101 };
1102
1103 assert_eq!(2, keys.len());
1104 assert!(keys.contains(&"key1".to_string()));
1105 assert!(keys.contains(&"key2".to_string()));
1106
1107 let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
1108 let stmt =
1109 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1110 .unwrap()
1111 .pop()
1112 .unwrap();
1113
1114 let Statement::AlterTable(alter_table) = stmt else {
1115 unreachable!()
1116 };
1117
1118 let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1120 let kind = expr.kind.unwrap();
1121
1122 let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1123 unreachable!()
1124 };
1125
1126 assert_eq!(1, add_columns.len());
1127 let ts_column = add_columns[0].column_def.clone().unwrap();
1128 let constraint = assert_ts_column(&ts_column);
1129 assert!(
1130 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1131 if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1132 );
1133
1134 let ctx = QueryContextBuilder::default()
1137 .timezone(Timezone::from_tz_string("+08:00").unwrap())
1138 .build()
1139 .into();
1140 let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
1141 let kind = expr.kind.unwrap();
1142
1143 let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1144 unreachable!()
1145 };
1146
1147 assert_eq!(1, add_columns.len());
1148 let ts_column = add_columns[0].column_def.clone().unwrap();
1149 let constraint = assert_ts_column(&ts_column);
1150 assert!(
1151 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1152 if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1153 );
1154 }
1155
1156 #[test]
1157 fn test_to_alter_modify_column_type_expr() {
1158 let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
1159 let stmt =
1160 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1161 .unwrap()
1162 .pop()
1163 .unwrap();
1164
1165 let Statement::AlterTable(alter_table) = stmt else {
1166 unreachable!()
1167 };
1168
1169 let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1171 let kind = expr.kind.unwrap();
1172
1173 let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
1174 modify_column_types,
1175 }) = kind
1176 else {
1177 unreachable!()
1178 };
1179
1180 assert_eq!(1, modify_column_types.len());
1181 let modify_column_type = &modify_column_types[0];
1182
1183 assert_eq!("mem_usage", modify_column_type.column_name);
1184 assert_eq!(
1185 ColumnDataType::String as i32,
1186 modify_column_type.target_type
1187 );
1188 assert!(modify_column_type.target_type_extension.is_none());
1189 }
1190
1191 fn new_test_table_names() -> Vec<TableName> {
1192 vec![
1193 TableName {
1194 catalog_name: "greptime".to_string(),
1195 schema_name: "public".to_string(),
1196 table_name: "a_table".to_string(),
1197 },
1198 TableName {
1199 catalog_name: "greptime".to_string(),
1200 schema_name: "public".to_string(),
1201 table_name: "b_table".to_string(),
1202 },
1203 ]
1204 }
1205
1206 #[test]
1207 fn test_to_create_view_expr() {
1208 let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1209 let stmt =
1210 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1211 .unwrap()
1212 .pop()
1213 .unwrap();
1214
1215 let Statement::CreateView(stmt) = stmt else {
1216 unreachable!()
1217 };
1218
1219 let logical_plan = vec![1, 2, 3];
1220 let table_names = new_test_table_names();
1221 let columns = vec!["a".to_string()];
1222 let plan_columns = vec!["number".to_string()];
1223
1224 let expr = to_create_view_expr(
1225 stmt,
1226 logical_plan.clone(),
1227 table_names.clone(),
1228 columns.clone(),
1229 plan_columns.clone(),
1230 sql.to_string(),
1231 QueryContext::arc(),
1232 )
1233 .unwrap();
1234
1235 assert_eq!("greptime", expr.catalog_name);
1236 assert_eq!("public", expr.schema_name);
1237 assert_eq!("test", expr.view_name);
1238 assert!(!expr.create_if_not_exists);
1239 assert!(!expr.or_replace);
1240 assert_eq!(logical_plan, expr.logical_plan);
1241 assert_eq!(table_names, expr.table_names);
1242 assert_eq!(sql, expr.definition);
1243 assert_eq!(columns, expr.columns);
1244 assert_eq!(plan_columns, expr.plan_columns);
1245 }
1246
1247 #[test]
1248 fn test_to_create_view_expr_complex() {
1249 let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
1250 let stmt =
1251 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1252 .unwrap()
1253 .pop()
1254 .unwrap();
1255
1256 let Statement::CreateView(stmt) = stmt else {
1257 unreachable!()
1258 };
1259
1260 let logical_plan = vec![1, 2, 3];
1261 let table_names = new_test_table_names();
1262 let columns = vec!["a".to_string()];
1263 let plan_columns = vec!["number".to_string()];
1264
1265 let expr = to_create_view_expr(
1266 stmt,
1267 logical_plan.clone(),
1268 table_names.clone(),
1269 columns.clone(),
1270 plan_columns.clone(),
1271 sql.to_string(),
1272 QueryContext::arc(),
1273 )
1274 .unwrap();
1275
1276 assert_eq!("greptime", expr.catalog_name);
1277 assert_eq!("test", expr.schema_name);
1278 assert_eq!("test_view", expr.view_name);
1279 assert!(expr.create_if_not_exists);
1280 assert!(expr.or_replace);
1281 assert_eq!(logical_plan, expr.logical_plan);
1282 assert_eq!(table_names, expr.table_names);
1283 assert_eq!(sql, expr.definition);
1284 assert_eq!(columns, expr.columns);
1285 assert_eq!(plan_columns, expr.plan_columns);
1286 }
1287}