#[cfg(feature = "enterprise")]
pub mod trigger;

use std::collections::{HashMap, HashSet};

use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
use api::v1::alter_table_expr::Kind as AlterTableKind;
use api::v1::column_def::options_from_column_schema;
use api::v1::{
    set_index, unset_index, AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer,
    ColumnDataType, ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
    DropColumn, DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend,
    ModifyColumnType, ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions,
    SetFulltext, SetIndex, SetInverted, SetSkipping, SetTableOptions,
    SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
    UnsetIndex, UnsetInverted, UnsetSkipping, UnsetTableOptions,
};
use common_error::ext::BoxedError;
use common_grpc_expr::util::ColumnExpr;
use common_time::Timezone;
use datafusion::sql::planner::object_name_to_table_reference;
use datatypes::schema::{
    ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SkippingIndexType, COMMENT_KEY,
};
use file_engine::FileOptions;
use query::sql::{
    check_file_to_table_schema_compatibility, file_column_schemas_to_table,
    infer_file_table_schema, prepare_file_table_files,
};
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::{ColumnOption, ObjectName};
use sql::statements::alter::{
    AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
};
use sql::statements::create::{
    Column as SqlColumn, CreateExternalTable, CreateFlow, CreateTable, CreateView, TableConstraint,
};
use sql::statements::{
    column_to_schema, sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type,
};
use sql::util::extract_tables_from_query;
use table::requests::{TableOptions, FILE_TABLE_META_KEY};
use table::table_reference::TableReference;
#[cfg(feature = "enterprise")]
pub use trigger::to_create_trigger_task_expr;

use crate::error::{
    BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
    ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
    IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidFlowNameSnafu, InvalidSqlSnafu,
    NotSupportedSnafu, ParseSqlSnafu, PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu,
    UnrecognizedTableOptionSnafu,
};

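/// Builds a [CreateTableExpr] from gRPC column schemas for the given table and
/// engine, using `desc` (or a default description) and validating the result.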
pub fn create_table_expr_by_column_schemas(
    table_name: &TableReference<'_>,
    column_schemas: &[api::v1::ColumnSchema],
    engine: &str,
    desc: Option<&str>,
) -> Result<CreateTableExpr> {
    let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
    let expr = common_grpc_expr::util::build_create_table_expr(
        None,
        table_name,
        column_exprs,
        engine,
        desc.unwrap_or("Created on insertion"),
    )
    .context(BuildCreateExprOnInsertionSnafu)?;

    validate_create_expr(&expr)?;
    Ok(expr)
}

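/// Finds columns in `column_exprs` that are missing from `schema` and builds an
/// [AddColumns] expression for them; returns `None` when there is nothing to add.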
pub(crate) fn extract_add_columns_expr(
    schema: &Schema,
    column_exprs: Vec<ColumnExpr>,
) -> Result<Option<AddColumns>> {
    let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
        .context(FindNewColumnsOnInsertionSnafu)?;
    if let Some(add_columns) = &add_columns {
        validate_add_columns_expr(add_columns)?;
    }
    Ok(add_columns)
}

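/// Converts a `CREATE EXTERNAL TABLE` statement into a [CreateTableExpr],
/// inferring the table schema from the external files when no columns are given
/// and storing the file metadata in the table options.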
pub(crate) async fn create_external_expr(
    create: CreateExternalTable,
    query_ctx: &QueryContextRef,
) -> Result<CreateTableExpr> {
    let (catalog_name, schema_name, table_name) =
        table_idents_to_full_name(&create.name, query_ctx)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    let mut table_options = create.options.into_map();

    let (object_store, files) = prepare_file_table_files(&table_options)
        .await
        .context(PrepareFileTableSnafu)?;

    let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
        .await
        .context(InferFileTableSchemaSnafu)?
        .column_schemas;

    let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
        let time_index = find_time_index(&create.constraints)?;
        let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
        let column_schemas =
            columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
        (time_index, primary_keys, column_schemas)
    } else {
        let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
        let primary_keys = vec![];
        (time_index, primary_keys, column_schemas)
    };

    check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
        .context(SchemaIncompatibleSnafu)?;

    let meta = FileOptions {
        files,
        file_column_schemas,
    };
    table_options.insert(
        FILE_TABLE_META_KEY.to_string(),
        serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
    );

    let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
    let expr = CreateTableExpr {
        catalog_name,
        schema_name,
        table_name,
        desc: String::default(),
        column_defs,
        time_index,
        primary_keys,
        create_if_not_exists: create.if_not_exists,
        table_options,
        table_id: None,
        engine: create.engine.to_string(),
    };

    Ok(expr)
}

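/// Converts a parsed `CREATE TABLE` statement into a validated [CreateTableExpr].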
pub fn create_to_expr(
    create: &CreateTable,
    query_ctx: &QueryContextRef,
) -> Result<CreateTableExpr> {
    let (catalog_name, schema_name, table_name) =
        table_idents_to_full_name(&create.name, query_ctx)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    let time_index = find_time_index(&create.constraints)?;
    let table_options = HashMap::from(
        &TableOptions::try_from_iter(create.options.to_str_map())
            .context(UnrecognizedTableOptionSnafu)?,
    );

    let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;

    let expr = CreateTableExpr {
        catalog_name,
        schema_name,
        table_name,
        desc: String::default(),
        column_defs: columns_to_expr(
            &create.columns,
            &time_index,
            &primary_keys,
            Some(&query_ctx.timezone()),
        )?,
        time_index,
        primary_keys,
        create_if_not_exists: create.if_not_exists,
        table_options,
        table_id: None,
        engine: create.engine.to_string(),
    };

    validate_create_expr(&expr)?;
    Ok(expr)
}

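/// Validates a [CreateTableExpr]: column names must be unique, the time index and
/// primary keys must refer to existing columns, primary keys must not repeat or
/// overlap the time index, and interval/datetime columns are rejected.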
pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
    let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
    for (idx, column) in create.column_defs.iter().enumerate() {
        if let Some(indices) = column_to_indices.get(&column.name) {
            return InvalidSqlSnafu {
                err_msg: format!(
                    "column name `{}` is duplicated at index {} and {}",
                    column.name, indices, idx
                ),
            }
            .fail();
        }
        column_to_indices.insert(&column.name, idx);
    }

    let _ = column_to_indices
        .get(&create.time_index)
        .with_context(|| InvalidSqlSnafu {
            err_msg: format!(
                "column name `{}` is not found in column list",
                create.time_index
            ),
        })?;

    for pk in &create.primary_keys {
        let _ = column_to_indices
            .get(&pk)
            .with_context(|| InvalidSqlSnafu {
                err_msg: format!("column name `{}` is not found in column list", pk),
            })?;
    }

    let mut pk_set = HashSet::new();
    for pk in &create.primary_keys {
        if !pk_set.insert(pk) {
            return InvalidSqlSnafu {
                err_msg: format!("column name `{}` is duplicated in primary keys", pk),
            }
            .fail();
        }
    }

    if pk_set.contains(&create.time_index) {
        return InvalidSqlSnafu {
            err_msg: format!(
                "column name `{}` is both primary key and time index",
                create.time_index
            ),
        }
        .fail();
    }

    for column in &create.column_defs {
        if is_interval_type(&column.data_type()) {
            return InvalidSqlSnafu {
                err_msg: format!(
                    "column name `{}` is interval type, which is not supported",
                    column.name
                ),
            }
            .fail();
        }
        if is_date_time_type(&column.data_type()) {
            return InvalidSqlSnafu {
                err_msg: format!(
                    "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
                    column.name
                ),
            }
            .fail();
        }
    }
    Ok(())
}

fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
    for add_column in &add_columns.add_columns {
        let Some(column_def) = &add_column.column_def else {
            continue;
        };
        if is_date_time_type(&column_def.data_type()) {
            return InvalidSqlSnafu {
                err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
            }
            .fail();
        }
        if is_interval_type(&column_def.data_type()) {
            return InvalidSqlSnafu {
                err_msg: format!(
                    "column name `{}` is interval type, which is not supported",
                    column_def.name
                ),
            }
            .fail();
        }
    }
    Ok(())
}

fn is_date_time_type(data_type: &ColumnDataType) -> bool {
    matches!(data_type, ColumnDataType::Datetime)
}

fn is_interval_type(data_type: &ColumnDataType) -> bool {
    matches!(
        data_type,
        ColumnDataType::IntervalYearMonth
            | ColumnDataType::IntervalDayTime
            | ColumnDataType::IntervalMonthDayNano
    )
}

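/// Collects primary key columns, either inlined in the column options or declared
/// in a `PRIMARY KEY` table constraint, rejecting definitions in both places.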
fn find_primary_keys(
    columns: &[SqlColumn],
    constraints: &[TableConstraint],
) -> Result<Vec<String>> {
    let columns_pk = columns
        .iter()
        .filter_map(|x| {
            if x.options().iter().any(|o| {
                matches!(
                    o.option,
                    ColumnOption::Unique {
                        is_primary: true,
                        ..
                    }
                )
            }) {
                Some(x.name().value.clone())
            } else {
                None
            }
        })
        .collect::<Vec<String>>();

    ensure!(
        columns_pk.len() <= 1,
        IllegalPrimaryKeysDefSnafu {
            msg: "not allowed to inline multiple primary keys in columns options"
        }
    );

    let constraints_pk = constraints
        .iter()
        .filter_map(|constraint| match constraint {
            TableConstraint::PrimaryKey { columns, .. } => {
                Some(columns.iter().map(|ident| ident.value.clone()))
            }
            _ => None,
        })
        .flatten()
        .collect::<Vec<String>>();

    ensure!(
        columns_pk.is_empty() || constraints_pk.is_empty(),
        IllegalPrimaryKeysDefSnafu {
            msg: "found definitions of primary keys in multiple places"
        }
    );

    let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
    primary_keys.extend(columns_pk);
    primary_keys.extend(constraints_pk);
    Ok(primary_keys)
}

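/// Returns the single time index column declared in the table constraints.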
pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
    let time_index = constraints
        .iter()
        .filter_map(|constraint| match constraint {
            TableConstraint::TimeIndex { column, .. } => Some(&column.value),
            _ => None,
        })
        .collect::<Vec<&String>>();
    ensure!(
        time_index.len() == 1,
        InvalidSqlSnafu {
            err_msg: "must have one and only one TimeIndex columns",
        }
    );
    Ok(time_index.first().unwrap().to_string())
}

fn columns_to_expr(
    column_defs: &[SqlColumn],
    time_index: &str,
    primary_keys: &[String],
    timezone: Option<&Timezone>,
) -> Result<Vec<api::v1::ColumnDef>> {
    let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
    column_schemas_to_defs(column_schemas, primary_keys)
}

fn columns_to_column_schemas(
    columns: &[SqlColumn],
    time_index: &str,
    timezone: Option<&Timezone>,
) -> Result<Vec<ColumnSchema>> {
    columns
        .iter()
        .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
        .collect::<Result<Vec<ColumnSchema>>>()
}

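/// Converts [ColumnSchema]s into gRPC [api::v1::ColumnDef]s, deriving the semantic
/// type (timestamp, tag, or field) from the time index and primary keys.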
pub fn column_schemas_to_defs(
    column_schemas: Vec<ColumnSchema>,
    primary_keys: &[String],
) -> Result<Vec<api::v1::ColumnDef>> {
    let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
        .iter()
        .map(|c| {
            ColumnDataTypeWrapper::try_from(c.data_type.clone())
                .map(|w| w.to_parts())
                .context(ColumnDataTypeSnafu)
        })
        .collect::<Result<Vec<_>>>()?;

    column_schemas
        .iter()
        .zip(column_datatypes)
        .map(|(schema, datatype)| {
            let semantic_type = if schema.is_time_index() {
                SemanticType::Timestamp
            } else if primary_keys.contains(&schema.name) {
                SemanticType::Tag
            } else {
                SemanticType::Field
            } as i32;
            let comment = schema
                .metadata()
                .get(COMMENT_KEY)
                .cloned()
                .unwrap_or_default();

            Ok(api::v1::ColumnDef {
                name: schema.name.clone(),
                data_type: datatype.0 as i32,
                is_nullable: schema.is_nullable(),
                default_constraint: match schema.default_constraint() {
                    None => vec![],
                    Some(v) => {
                        v.clone()
                            .try_into()
                            .context(ConvertColumnDefaultConstraintSnafu {
                                column_name: &schema.name,
                            })?
                    }
                },
                semantic_type,
                comment,
                datatype_extension: datatype.1,
                options: options_from_column_schema(schema),
            })
        })
        .collect()
}

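/// Converts a parsed `ALTER TABLE` statement into an [AlterTableExpr], rejecting
/// unsupported operations such as `ADD CONSTRAINT` and interval-typed columns.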
pub(crate) fn to_alter_table_expr(
    alter_table: AlterTable,
    query_ctx: &QueryContextRef,
) -> Result<AlterTableExpr> {
    let (catalog_name, schema_name, table_name) =
        table_idents_to_full_name(alter_table.table_name(), query_ctx)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    let kind = match alter_table.alter_operation {
        AlterTableOperation::AddConstraint(_) => {
            return NotSupportedSnafu {
                feat: "ADD CONSTRAINT",
            }
            .fail();
        }
        AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
            add_columns: add_columns
                .into_iter()
                .map(|add_column| {
                    let column_def = sql_column_def_to_grpc_column_def(
                        &add_column.column_def,
                        Some(&query_ctx.timezone()),
                    )
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;
                    if is_interval_type(&column_def.data_type()) {
                        return NotSupportedSnafu {
                            feat: "Add column with interval type",
                        }
                        .fail();
                    }
                    Ok(AddColumn {
                        column_def: Some(column_def),
                        location: add_column.location.as_ref().map(From::from),
                        add_if_not_exists: add_column.add_if_not_exists,
                    })
                })
                .collect::<Result<Vec<AddColumn>>>()?,
        }),
        AlterTableOperation::ModifyColumnType {
            column_name,
            target_type,
        } => {
            let target_type =
                sql_data_type_to_concrete_data_type(&target_type).context(ParseSqlSnafu)?;
            let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
                .map(|w| w.to_parts())
                .context(ColumnDataTypeSnafu)?;
            if is_interval_type(&target_type) {
                return NotSupportedSnafu {
                    feat: "Modify column type to interval type",
                }
                .fail();
            }
            AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
                modify_column_types: vec![ModifyColumnType {
                    column_name: column_name.value,
                    target_type: target_type as i32,
                    target_type_extension,
                }],
            })
        }
        AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
            drop_columns: vec![DropColumn {
                name: name.value.to_string(),
            }],
        }),
        AlterTableOperation::RenameTable { new_table_name } => {
            AlterTableKind::RenameTable(RenameTable {
                new_table_name: new_table_name.to_string(),
            })
        }
        AlterTableOperation::SetTableOptions { options } => {
            AlterTableKind::SetTableOptions(SetTableOptions {
                table_options: options.into_iter().map(Into::into).collect(),
            })
        }
        AlterTableOperation::UnsetTableOptions { keys } => {
            AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
        }
        AlterTableOperation::SetIndex { options } => AlterTableKind::SetIndex(match options {
            sql::statements::alter::SetIndexOperation::Fulltext {
                column_name,
                options,
            } => SetIndex {
                options: Some(set_index::Options::Fulltext(SetFulltext {
                    column_name: column_name.value,
                    enable: options.enable,
                    analyzer: match options.analyzer {
                        FulltextAnalyzer::English => Analyzer::English.into(),
                        FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
                    },
                    case_sensitive: options.case_sensitive,
                    backend: match options.backend {
                        FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
                        FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
                    },
                })),
            },
            sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
                options: Some(set_index::Options::Inverted(SetInverted {
                    column_name: column_name.value,
                })),
            },
            sql::statements::alter::SetIndexOperation::Skipping {
                column_name,
                options,
            } => SetIndex {
                options: Some(set_index::Options::Skipping(SetSkipping {
                    column_name: column_name.value,
                    enable: true,
                    granularity: options.granularity as u64,
                    skipping_index_type: match options.index_type {
                        SkippingIndexType::BloomFilter => PbSkippingIndexType::BloomFilter.into(),
                    },
                })),
            },
        }),
        AlterTableOperation::UnsetIndex { options } => AlterTableKind::UnsetIndex(match options {
            sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => UnsetIndex {
                options: Some(unset_index::Options::Fulltext(UnsetFulltext {
                    column_name: column_name.value,
                })),
            },
            sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => UnsetIndex {
                options: Some(unset_index::Options::Inverted(UnsetInverted {
                    column_name: column_name.value,
                })),
            },
            sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => UnsetIndex {
                options: Some(unset_index::Options::Skipping(UnsetSkipping {
                    column_name: column_name.value,
                })),
            },
        }),
        AlterTableOperation::DropDefaults { columns } => {
            AlterTableKind::DropDefaults(DropDefaults {
                drop_defaults: columns
                    .into_iter()
                    .map(|col| {
                        let column_name = col.0.to_string();
                        Ok(api::v1::DropDefault { column_name })
                    })
                    .collect::<Result<Vec<_>>>()?,
            })
        }
    };

    Ok(AlterTableExpr {
        catalog_name,
        schema_name,
        table_name,
        kind: Some(kind),
    })
}

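/// Converts a parsed `ALTER DATABASE` statement into an [AlterDatabaseExpr].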
pub fn to_alter_database_expr(
    alter_database: AlterDatabase,
    query_ctx: &QueryContextRef,
) -> Result<AlterDatabaseExpr> {
    let catalog = query_ctx.current_catalog();
    let schema = alter_database.database_name;

    let kind = match alter_database.alter_operation {
        AlterDatabaseOperation::SetDatabaseOption { options } => {
            let options = options.into_iter().map(Into::into).collect();
            AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
                set_database_options: options,
            })
        }
        AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
            AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
        }
    };

    Ok(AlterDatabaseExpr {
        catalog_name: catalog.to_string(),
        schema_name: schema.to_string(),
        kind: Some(kind),
    })
}

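/// Builds a [CreateViewExpr] from a `CREATE VIEW` statement, its encoded logical
/// plan, the referenced table names, and the view definition.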
pub fn to_create_view_expr(
    stmt: CreateView,
    logical_plan: Vec<u8>,
    table_names: Vec<TableName>,
    columns: Vec<String>,
    plan_columns: Vec<String>,
    definition: String,
    query_ctx: QueryContextRef,
) -> Result<CreateViewExpr> {
    let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

    let expr = CreateViewExpr {
        catalog_name,
        schema_name,
        view_name,
        logical_plan,
        create_if_not_exists: stmt.if_not_exists,
        or_replace: stmt.or_replace,
        table_names,
        columns,
        plan_columns,
        definition,
    };

    Ok(expr)
}

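/// Converts a `CREATE FLOW` statement into a [CreateFlowExpr], resolving the sink
/// and source table names against the current catalog and schema.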
pub fn to_create_flow_task_expr(
    create_flow: CreateFlow,
    query_ctx: &QueryContextRef,
) -> Result<CreateFlowExpr> {
    let sink_table_ref =
        object_name_to_table_reference(create_flow.sink_table_name.clone().into(), true)
            .with_context(|_| ConvertIdentifierSnafu {
                ident: create_flow.sink_table_name.to_string(),
            })?;
    let catalog = sink_table_ref
        .catalog()
        .unwrap_or(query_ctx.current_catalog())
        .to_string();
    let schema = sink_table_ref
        .schema()
        .map(|s| s.to_owned())
        .unwrap_or(query_ctx.current_schema());

    let sink_table_name = TableName {
        catalog_name: catalog,
        schema_name: schema,
        table_name: sink_table_ref.table().to_string(),
    };

    let source_table_names = extract_tables_from_query(&create_flow.query)
        .map(|name| {
            let reference = object_name_to_table_reference(name.clone().into(), true)
                .with_context(|_| ConvertIdentifierSnafu {
                    ident: name.to_string(),
                })?;
            let catalog = reference
                .catalog()
                .unwrap_or(query_ctx.current_catalog())
                .to_string();
            let schema = reference
                .schema()
                .map(|s| s.to_string())
                .unwrap_or(query_ctx.current_schema());

            let table_name = TableName {
                catalog_name: catalog,
                schema_name: schema,
                table_name: reference.table().to_string(),
            };
            Ok(table_name)
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(CreateFlowExpr {
        catalog_name: query_ctx.current_catalog().to_string(),
        flow_name: sanitize_flow_name(create_flow.flow_name)?,
        source_table_names,
        sink_table_name: Some(sink_table_name),
        or_replace: create_flow.or_replace,
        create_if_not_exists: create_flow.if_not_exists,
        expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
        comment: create_flow.comment.unwrap_or_default(),
        sql: create_flow.query.to_string(),
        flow_options: HashMap::new(),
    })
}

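/// Ensures the flow name is a single, unqualified identifier and returns its value.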
fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
    ensure!(
        flow_name.0.len() == 1,
        InvalidFlowNameSnafu {
            name: flow_name.to_string(),
        }
    );
    Ok(flow_name.0.swap_remove(0).value)
}

#[cfg(test)]
mod tests {
    use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
    use datatypes::value::Value;
    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;
    use store_api::storage::ColumnDefaultConstraint;

    use super::*;

    #[test]
    fn test_create_flow_tql_expr() {
        let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        assert_eq!("calc_reqs", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.public.cnt_reqs",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert!(expr.source_table_names.is_empty());
        assert_eq!(
            r#"TQL EVAL (0, 15, '5s') count_values("status_code", http_requests)"#,
            expr.sql
        );
    }

    #[test]
    fn test_create_flow_expr() {
        let sql = r"
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
    DISTINCT number as dis
FROM
    distinct_basic;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        assert_eq!("test_distinct_basic", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.public.out_distinct_basic",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.public.distinct_basic",
            to_dot_sep(expr.source_table_names[0].clone())
        );
        assert_eq!(
            r"SELECT
    DISTINCT number as dis
FROM
    distinct_basic",
            expr.sql
        );

        let sql = r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        let to_dot_sep =
            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
        assert_eq!("task_2", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.schema_1.table_1",
            expr.sink_table_name.map(to_dot_sep).unwrap()
        );
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.schema_2.table_2",
            to_dot_sep(expr.source_table_names[0].clone())
        );
        assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);

        let sql = r"
CREATE FLOW abc.`task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());

        assert!(res.is_err());
        assert!(res
            .unwrap_err()
            .to_string()
            .contains("Invalid flow name: abc.`task_2`"));
    }

    #[test]
    fn test_create_to_expr() {
        let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateTable(create_table) = stmt else {
            unreachable!()
        };
        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
        assert_eq!("3days", expr.table_options.get("ttl").unwrap());
        assert_eq!(
            "1.0MiB",
            expr.table_options.get("write_buffer_size").unwrap()
        );
    }

    #[test]
    fn test_invalid_create_to_expr() {
        let cases = [
            "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));",
        ];

        for sql in cases {
            let stmt = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap()
            .pop()
            .unwrap();
            let Statement::CreateTable(create_table) = stmt else {
                unreachable!()
            };
            create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
        }
    }

    #[test]
    fn test_create_to_expr_with_default_timestamp_value() {
        let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateTable(create_table) = stmt else {
            unreachable!()
        };

        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
        let ts_column = &expr.column_defs[1];
        let constraint = assert_ts_column(ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                     if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
        );

        let ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("+08:00").unwrap())
            .build()
            .into();
        let expr = create_to_expr(&create_table, &ctx).unwrap();
        let ts_column = &expr.column_defs[1];
        let constraint = assert_ts_column(ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                     if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
        );
    }

    fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
        assert_eq!("ts", ts_column.name);
        assert_eq!(
            ColumnDataType::TimestampMillisecond as i32,
            ts_column.data_type
        );
        assert!(!ts_column.default_constraint.is_empty());

        ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
    }

    #[test]
    fn test_to_alter_expr() {
        let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterDatabase(alter_database) = stmt else {
            unreachable!()
        };

        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
            set_database_options,
        }) = kind
        else {
            unreachable!()
        };

        assert_eq!(2, set_database_options.len());
        assert_eq!("key1", set_database_options[0].key);
        assert_eq!("value1", set_database_options[0].value);
        assert_eq!("key2", set_database_options[1].key);
        assert_eq!("value2", set_database_options[1].value);

        let sql = "ALTER DATABASE greptime UNSET key1, key2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterDatabase(alter_database) = stmt else {
            unreachable!()
        };

        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
            unreachable!()
        };

        assert_eq!(2, keys.len());
        assert!(keys.contains(&"key1".to_string()));
        assert!(keys.contains(&"key2".to_string()));

        let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterTable(alter_table) = stmt else {
            unreachable!()
        };

        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
            unreachable!()
        };

        assert_eq!(1, add_columns.len());
        let ts_column = add_columns[0].column_def.clone().unwrap();
        let constraint = assert_ts_column(&ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                     if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
        );

        let ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("+08:00").unwrap())
            .build()
            .into();
        let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
        let kind = expr.kind.unwrap();

        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
            unreachable!()
        };

        assert_eq!(1, add_columns.len());
        let ts_column = add_columns[0].column_def.clone().unwrap();
        let constraint = assert_ts_column(&ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                     if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
        );
    }

    #[test]
    fn test_to_alter_modify_column_type_expr() {
        let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterTable(alter_table) = stmt else {
            unreachable!()
        };

        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
            modify_column_types,
        }) = kind
        else {
            unreachable!()
        };

        assert_eq!(1, modify_column_types.len());
        let modify_column_type = &modify_column_types[0];

        assert_eq!("mem_usage", modify_column_type.column_name);
        assert_eq!(
            ColumnDataType::String as i32,
            modify_column_type.target_type
        );
        assert!(modify_column_type.target_type_extension.is_none());
    }

    fn new_test_table_names() -> Vec<TableName> {
        vec![
            TableName {
                catalog_name: "greptime".to_string(),
                schema_name: "public".to_string(),
                table_name: "a_table".to_string(),
            },
            TableName {
                catalog_name: "greptime".to_string(),
                schema_name: "public".to_string(),
                table_name: "b_table".to_string(),
            },
        ]
    }

    #[test]
    fn test_to_create_view_expr() {
        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateView(stmt) = stmt else {
            unreachable!()
        };

        let logical_plan = vec![1, 2, 3];
        let table_names = new_test_table_names();
        let columns = vec!["a".to_string()];
        let plan_columns = vec!["number".to_string()];

        let expr = to_create_view_expr(
            stmt,
            logical_plan.clone(),
            table_names.clone(),
            columns.clone(),
            plan_columns.clone(),
            sql.to_string(),
            QueryContext::arc(),
        )
        .unwrap();

        assert_eq!("greptime", expr.catalog_name);
        assert_eq!("public", expr.schema_name);
        assert_eq!("test", expr.view_name);
        assert!(!expr.create_if_not_exists);
        assert!(!expr.or_replace);
        assert_eq!(logical_plan, expr.logical_plan);
        assert_eq!(table_names, expr.table_names);
        assert_eq!(sql, expr.definition);
        assert_eq!(columns, expr.columns);
        assert_eq!(plan_columns, expr.plan_columns);
    }

    #[test]
    fn test_to_create_view_expr_complex() {
        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateView(stmt) = stmt else {
            unreachable!()
        };

        let logical_plan = vec![1, 2, 3];
        let table_names = new_test_table_names();
        let columns = vec!["a".to_string()];
        let plan_columns = vec!["number".to_string()];

        let expr = to_create_view_expr(
            stmt,
            logical_plan.clone(),
            table_names.clone(),
            columns.clone(),
            plan_columns.clone(),
            sql.to_string(),
            QueryContext::arc(),
        )
        .unwrap();

        assert_eq!("greptime", expr.catalog_name);
        assert_eq!("test", expr.schema_name);
        assert_eq!("test_view", expr.view_name);
        assert!(expr.create_if_not_exists);
        assert!(expr.or_replace);
        assert_eq!(logical_plan, expr.logical_plan);
        assert_eq!(table_names, expr.table_names);
        assert_eq!(sql, expr.definition);
        assert_eq!(columns, expr.columns);
        assert_eq!(plan_columns, expr.plan_columns);
    }
}