1use std::collections::{HashMap, HashSet};
16
17use api::helper::ColumnDataTypeWrapper;
18use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
19use api::v1::alter_table_expr::Kind as AlterTableKind;
20use api::v1::column_def::options_from_column_schema;
21use api::v1::{
22 set_index, unset_index, AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer,
23 ColumnDataType, ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
24 DropColumn, DropColumns, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
25 ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetFulltext, SetIndex,
26 SetInverted, SetSkipping, SetTableOptions, SkippingIndexType as PbSkippingIndexType, TableName,
27 UnsetDatabaseOptions, UnsetFulltext, UnsetIndex, UnsetInverted, UnsetSkipping,
28 UnsetTableOptions,
29};
30use common_error::ext::BoxedError;
31use common_grpc_expr::util::ColumnExpr;
32use common_time::Timezone;
33use datafusion::sql::planner::object_name_to_table_reference;
34use datatypes::schema::{
35 ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SkippingIndexType, COMMENT_KEY,
36};
37use file_engine::FileOptions;
38use query::sql::{
39 check_file_to_table_schema_compatibility, file_column_schemas_to_table,
40 infer_file_table_schema, prepare_file_table_files,
41};
42use session::context::QueryContextRef;
43use session::table_name::table_idents_to_full_name;
44use snafu::{ensure, OptionExt, ResultExt};
45use sql::ast::{ColumnOption, ObjectName};
46use sql::statements::alter::{
47 AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
48};
49use sql::statements::create::{
50 Column as SqlColumn, CreateExternalTable, CreateFlow, CreateTable, CreateView, TableConstraint,
51};
52use sql::statements::{
53 column_to_schema, sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type,
54};
55use sql::util::extract_tables_from_query;
56use table::requests::{TableOptions, FILE_TABLE_META_KEY};
57use table::table_reference::TableReference;
58
59use crate::error::{
60 BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
61 ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
62 IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidFlowNameSnafu, InvalidSqlSnafu,
63 NotSupportedSnafu, ParseSqlSnafu, PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu,
64 UnrecognizedTableOptionSnafu,
65};
66
67pub fn create_table_expr_by_column_schemas(
68 table_name: &TableReference<'_>,
69 column_schemas: &[api::v1::ColumnSchema],
70 engine: &str,
71 desc: Option<&str>,
72) -> Result<CreateTableExpr> {
73 let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
74 let expr = common_grpc_expr::util::build_create_table_expr(
75 None,
76 table_name,
77 column_exprs,
78 engine,
79 desc.unwrap_or("Created on insertion"),
80 )
81 .context(BuildCreateExprOnInsertionSnafu)?;
82
83 validate_create_expr(&expr)?;
84 Ok(expr)
85}
86
87pub(crate) fn extract_add_columns_expr(
88 schema: &Schema,
89 column_exprs: Vec<ColumnExpr>,
90) -> Result<Option<AddColumns>> {
91 let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
92 .context(FindNewColumnsOnInsertionSnafu)?;
93 if let Some(add_columns) = &add_columns {
94 validate_add_columns_expr(add_columns)?;
95 }
96 Ok(add_columns)
97}
98
99pub(crate) async fn create_external_expr(
123 create: CreateExternalTable,
124 query_ctx: &QueryContextRef,
125) -> Result<CreateTableExpr> {
126 let (catalog_name, schema_name, table_name) =
127 table_idents_to_full_name(&create.name, query_ctx)
128 .map_err(BoxedError::new)
129 .context(ExternalSnafu)?;
130
131 let mut table_options = create.options.into_map();
132
133 let (object_store, files) = prepare_file_table_files(&table_options)
134 .await
135 .context(PrepareFileTableSnafu)?;
136
137 let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
138 .await
139 .context(InferFileTableSchemaSnafu)?
140 .column_schemas;
141
142 let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
143 let time_index = find_time_index(&create.constraints)?;
145 let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
146 let column_schemas =
147 columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
148 (time_index, primary_keys, column_schemas)
149 } else {
150 let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
152 let primary_keys = vec![];
153 (time_index, primary_keys, column_schemas)
154 };
155
156 check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
157 .context(SchemaIncompatibleSnafu)?;
158
159 let meta = FileOptions {
160 files,
161 file_column_schemas,
162 };
163 table_options.insert(
164 FILE_TABLE_META_KEY.to_string(),
165 serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
166 );
167
168 let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
169 let expr = CreateTableExpr {
170 catalog_name,
171 schema_name,
172 table_name,
173 desc: String::default(),
174 column_defs,
175 time_index,
176 primary_keys,
177 create_if_not_exists: create.if_not_exists,
178 table_options,
179 table_id: None,
180 engine: create.engine.to_string(),
181 };
182
183 Ok(expr)
184}
185
186pub fn create_to_expr(
188 create: &CreateTable,
189 query_ctx: &QueryContextRef,
190) -> Result<CreateTableExpr> {
191 let (catalog_name, schema_name, table_name) =
192 table_idents_to_full_name(&create.name, query_ctx)
193 .map_err(BoxedError::new)
194 .context(ExternalSnafu)?;
195
196 let time_index = find_time_index(&create.constraints)?;
197 let table_options = HashMap::from(
198 &TableOptions::try_from_iter(create.options.to_str_map())
199 .context(UnrecognizedTableOptionSnafu)?,
200 );
201
202 let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
203
204 let expr = CreateTableExpr {
205 catalog_name,
206 schema_name,
207 table_name,
208 desc: String::default(),
209 column_defs: columns_to_expr(
210 &create.columns,
211 &time_index,
212 &primary_keys,
213 Some(&query_ctx.timezone()),
214 )?,
215 time_index,
216 primary_keys,
217 create_if_not_exists: create.if_not_exists,
218 table_options,
219 table_id: None,
220 engine: create.engine.to_string(),
221 };
222
223 validate_create_expr(&expr)?;
224 Ok(expr)
225}
226
227pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
229 let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
231 for (idx, column) in create.column_defs.iter().enumerate() {
232 if let Some(indices) = column_to_indices.get(&column.name) {
233 return InvalidSqlSnafu {
234 err_msg: format!(
235 "column name `{}` is duplicated at index {} and {}",
236 column.name, indices, idx
237 ),
238 }
239 .fail();
240 }
241 column_to_indices.insert(&column.name, idx);
242 }
243
244 let _ = column_to_indices
246 .get(&create.time_index)
247 .with_context(|| InvalidSqlSnafu {
248 err_msg: format!(
249 "column name `{}` is not found in column list",
250 create.time_index
251 ),
252 })?;
253
254 for pk in &create.primary_keys {
256 let _ = column_to_indices
257 .get(&pk)
258 .with_context(|| InvalidSqlSnafu {
259 err_msg: format!("column name `{}` is not found in column list", pk),
260 })?;
261 }
262
263 let mut pk_set = HashSet::new();
265 for pk in &create.primary_keys {
266 if !pk_set.insert(pk) {
267 return InvalidSqlSnafu {
268 err_msg: format!("column name `{}` is duplicated in primary keys", pk),
269 }
270 .fail();
271 }
272 }
273
274 if pk_set.contains(&create.time_index) {
276 return InvalidSqlSnafu {
277 err_msg: format!(
278 "column name `{}` is both primary key and time index",
279 create.time_index
280 ),
281 }
282 .fail();
283 }
284
285 for column in &create.column_defs {
286 if is_interval_type(&column.data_type()) {
288 return InvalidSqlSnafu {
289 err_msg: format!(
290 "column name `{}` is interval type, which is not supported",
291 column.name
292 ),
293 }
294 .fail();
295 }
296 if is_date_time_type(&column.data_type()) {
298 return InvalidSqlSnafu {
299 err_msg: format!(
300 "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
301 column.name
302 ),
303 }
304 .fail();
305 }
306 }
307 Ok(())
308}
309
310fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
311 for add_column in &add_columns.add_columns {
312 let Some(column_def) = &add_column.column_def else {
313 continue;
314 };
315 if is_date_time_type(&column_def.data_type()) {
316 return InvalidSqlSnafu {
317 err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
318 }
319 .fail();
320 }
321 if is_interval_type(&column_def.data_type()) {
322 return InvalidSqlSnafu {
323 err_msg: format!(
324 "column name `{}` is interval type, which is not supported",
325 column_def.name
326 ),
327 }
328 .fail();
329 }
330 }
331 Ok(())
332}
333
334fn is_date_time_type(data_type: &ColumnDataType) -> bool {
335 matches!(data_type, ColumnDataType::Datetime)
336}
337
338fn is_interval_type(data_type: &ColumnDataType) -> bool {
339 matches!(
340 data_type,
341 ColumnDataType::IntervalYearMonth
342 | ColumnDataType::IntervalDayTime
343 | ColumnDataType::IntervalMonthDayNano
344 )
345}
346
347fn find_primary_keys(
348 columns: &[SqlColumn],
349 constraints: &[TableConstraint],
350) -> Result<Vec<String>> {
351 let columns_pk = columns
352 .iter()
353 .filter_map(|x| {
354 if x.options().iter().any(|o| {
355 matches!(
356 o.option,
357 ColumnOption::Unique {
358 is_primary: true,
359 ..
360 }
361 )
362 }) {
363 Some(x.name().value.clone())
364 } else {
365 None
366 }
367 })
368 .collect::<Vec<String>>();
369
370 ensure!(
371 columns_pk.len() <= 1,
372 IllegalPrimaryKeysDefSnafu {
373 msg: "not allowed to inline multiple primary keys in columns options"
374 }
375 );
376
377 let constraints_pk = constraints
378 .iter()
379 .filter_map(|constraint| match constraint {
380 TableConstraint::PrimaryKey { columns, .. } => {
381 Some(columns.iter().map(|ident| ident.value.clone()))
382 }
383 _ => None,
384 })
385 .flatten()
386 .collect::<Vec<String>>();
387
388 ensure!(
389 columns_pk.is_empty() || constraints_pk.is_empty(),
390 IllegalPrimaryKeysDefSnafu {
391 msg: "found definitions of primary keys in multiple places"
392 }
393 );
394
395 let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
396 primary_keys.extend(columns_pk);
397 primary_keys.extend(constraints_pk);
398 Ok(primary_keys)
399}
400
401pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
402 let time_index = constraints
403 .iter()
404 .filter_map(|constraint| match constraint {
405 TableConstraint::TimeIndex { column, .. } => Some(&column.value),
406 _ => None,
407 })
408 .collect::<Vec<&String>>();
409 ensure!(
410 time_index.len() == 1,
411 InvalidSqlSnafu {
412 err_msg: "must have one and only one TimeIndex columns",
413 }
414 );
415 Ok(time_index.first().unwrap().to_string())
416}
417
418fn columns_to_expr(
419 column_defs: &[SqlColumn],
420 time_index: &str,
421 primary_keys: &[String],
422 timezone: Option<&Timezone>,
423) -> Result<Vec<api::v1::ColumnDef>> {
424 let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
425 column_schemas_to_defs(column_schemas, primary_keys)
426}
427
428fn columns_to_column_schemas(
429 columns: &[SqlColumn],
430 time_index: &str,
431 timezone: Option<&Timezone>,
432) -> Result<Vec<ColumnSchema>> {
433 columns
434 .iter()
435 .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
436 .collect::<Result<Vec<ColumnSchema>>>()
437}
438
439pub fn column_schemas_to_defs(
440 column_schemas: Vec<ColumnSchema>,
441 primary_keys: &[String],
442) -> Result<Vec<api::v1::ColumnDef>> {
443 let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
444 .iter()
445 .map(|c| {
446 ColumnDataTypeWrapper::try_from(c.data_type.clone())
447 .map(|w| w.to_parts())
448 .context(ColumnDataTypeSnafu)
449 })
450 .collect::<Result<Vec<_>>>()?;
451
452 column_schemas
453 .iter()
454 .zip(column_datatypes)
455 .map(|(schema, datatype)| {
456 let semantic_type = if schema.is_time_index() {
457 SemanticType::Timestamp
458 } else if primary_keys.contains(&schema.name) {
459 SemanticType::Tag
460 } else {
461 SemanticType::Field
462 } as i32;
463 let comment = schema
464 .metadata()
465 .get(COMMENT_KEY)
466 .cloned()
467 .unwrap_or_default();
468
469 Ok(api::v1::ColumnDef {
470 name: schema.name.clone(),
471 data_type: datatype.0 as i32,
472 is_nullable: schema.is_nullable(),
473 default_constraint: match schema.default_constraint() {
474 None => vec![],
475 Some(v) => {
476 v.clone()
477 .try_into()
478 .context(ConvertColumnDefaultConstraintSnafu {
479 column_name: &schema.name,
480 })?
481 }
482 },
483 semantic_type,
484 comment,
485 datatype_extension: datatype.1,
486 options: options_from_column_schema(schema),
487 })
488 })
489 .collect()
490}
491
492pub(crate) fn to_alter_table_expr(
494 alter_table: AlterTable,
495 query_ctx: &QueryContextRef,
496) -> Result<AlterTableExpr> {
497 let (catalog_name, schema_name, table_name) =
498 table_idents_to_full_name(alter_table.table_name(), query_ctx)
499 .map_err(BoxedError::new)
500 .context(ExternalSnafu)?;
501
502 let kind = match alter_table.alter_operation {
503 AlterTableOperation::AddConstraint(_) => {
504 return NotSupportedSnafu {
505 feat: "ADD CONSTRAINT",
506 }
507 .fail();
508 }
509 AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
510 add_columns: add_columns
511 .into_iter()
512 .map(|add_column| {
513 let column_def = sql_column_def_to_grpc_column_def(
514 &add_column.column_def,
515 Some(&query_ctx.timezone()),
516 )
517 .map_err(BoxedError::new)
518 .context(ExternalSnafu)?;
519 if is_interval_type(&column_def.data_type()) {
520 return NotSupportedSnafu {
521 feat: "Add column with interval type",
522 }
523 .fail();
524 }
525 Ok(AddColumn {
526 column_def: Some(column_def),
527 location: add_column.location.as_ref().map(From::from),
528 add_if_not_exists: add_column.add_if_not_exists,
529 })
530 })
531 .collect::<Result<Vec<AddColumn>>>()?,
532 }),
533 AlterTableOperation::ModifyColumnType {
534 column_name,
535 target_type,
536 } => {
537 let target_type =
538 sql_data_type_to_concrete_data_type(&target_type).context(ParseSqlSnafu)?;
539 let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
540 .map(|w| w.to_parts())
541 .context(ColumnDataTypeSnafu)?;
542 if is_interval_type(&target_type) {
543 return NotSupportedSnafu {
544 feat: "Modify column type to interval type",
545 }
546 .fail();
547 }
548 AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
549 modify_column_types: vec![ModifyColumnType {
550 column_name: column_name.value,
551 target_type: target_type as i32,
552 target_type_extension,
553 }],
554 })
555 }
556 AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
557 drop_columns: vec![DropColumn {
558 name: name.value.to_string(),
559 }],
560 }),
561 AlterTableOperation::RenameTable { new_table_name } => {
562 AlterTableKind::RenameTable(RenameTable {
563 new_table_name: new_table_name.to_string(),
564 })
565 }
566 AlterTableOperation::SetTableOptions { options } => {
567 AlterTableKind::SetTableOptions(SetTableOptions {
568 table_options: options.into_iter().map(Into::into).collect(),
569 })
570 }
571 AlterTableOperation::UnsetTableOptions { keys } => {
572 AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
573 }
574 AlterTableOperation::SetIndex { options } => AlterTableKind::SetIndex(match options {
575 sql::statements::alter::SetIndexOperation::Fulltext {
576 column_name,
577 options,
578 } => SetIndex {
579 options: Some(set_index::Options::Fulltext(SetFulltext {
580 column_name: column_name.value,
581 enable: options.enable,
582 analyzer: match options.analyzer {
583 FulltextAnalyzer::English => Analyzer::English.into(),
584 FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
585 },
586 case_sensitive: options.case_sensitive,
587 backend: match options.backend {
588 FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
589 FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
590 },
591 })),
592 },
593 sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
594 options: Some(set_index::Options::Inverted(SetInverted {
595 column_name: column_name.value,
596 })),
597 },
598 sql::statements::alter::SetIndexOperation::Skipping {
599 column_name,
600 options,
601 } => SetIndex {
602 options: Some(set_index::Options::Skipping(SetSkipping {
603 column_name: column_name.value,
604 enable: true,
605 granularity: options.granularity as u64,
606 skipping_index_type: match options.index_type {
607 SkippingIndexType::BloomFilter => PbSkippingIndexType::BloomFilter.into(),
608 },
609 })),
610 },
611 }),
612 AlterTableOperation::UnsetIndex { options } => AlterTableKind::UnsetIndex(match options {
613 sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => UnsetIndex {
614 options: Some(unset_index::Options::Fulltext(UnsetFulltext {
615 column_name: column_name.value,
616 })),
617 },
618 sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => UnsetIndex {
619 options: Some(unset_index::Options::Inverted(UnsetInverted {
620 column_name: column_name.value,
621 })),
622 },
623 sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => UnsetIndex {
624 options: Some(unset_index::Options::Skipping(UnsetSkipping {
625 column_name: column_name.value,
626 })),
627 },
628 }),
629 };
630
631 Ok(AlterTableExpr {
632 catalog_name,
633 schema_name,
634 table_name,
635 kind: Some(kind),
636 })
637}
638
639pub fn to_alter_database_expr(
641 alter_database: AlterDatabase,
642 query_ctx: &QueryContextRef,
643) -> Result<AlterDatabaseExpr> {
644 let catalog = query_ctx.current_catalog();
645 let schema = alter_database.database_name;
646
647 let kind = match alter_database.alter_operation {
648 AlterDatabaseOperation::SetDatabaseOption { options } => {
649 let options = options.into_iter().map(Into::into).collect();
650 AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
651 set_database_options: options,
652 })
653 }
654 AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
655 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
656 }
657 };
658
659 Ok(AlterDatabaseExpr {
660 catalog_name: catalog.to_string(),
661 schema_name: schema.to_string(),
662 kind: Some(kind),
663 })
664}
665
666pub fn to_create_view_expr(
668 stmt: CreateView,
669 logical_plan: Vec<u8>,
670 table_names: Vec<TableName>,
671 columns: Vec<String>,
672 plan_columns: Vec<String>,
673 definition: String,
674 query_ctx: QueryContextRef,
675) -> Result<CreateViewExpr> {
676 let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
677 .map_err(BoxedError::new)
678 .context(ExternalSnafu)?;
679
680 let expr = CreateViewExpr {
681 catalog_name,
682 schema_name,
683 view_name,
684 logical_plan,
685 create_if_not_exists: stmt.if_not_exists,
686 or_replace: stmt.or_replace,
687 table_names,
688 columns,
689 plan_columns,
690 definition,
691 };
692
693 Ok(expr)
694}
695
696pub fn to_create_flow_task_expr(
697 create_flow: CreateFlow,
698 query_ctx: &QueryContextRef,
699) -> Result<CreateFlowExpr> {
700 let sink_table_ref =
702 object_name_to_table_reference(create_flow.sink_table_name.clone().into(), true)
703 .with_context(|_| ConvertIdentifierSnafu {
704 ident: create_flow.sink_table_name.to_string(),
705 })?;
706 let catalog = sink_table_ref
707 .catalog()
708 .unwrap_or(query_ctx.current_catalog())
709 .to_string();
710 let schema = sink_table_ref
711 .schema()
712 .map(|s| s.to_owned())
713 .unwrap_or(query_ctx.current_schema());
714
715 let sink_table_name = TableName {
716 catalog_name: catalog,
717 schema_name: schema,
718 table_name: sink_table_ref.table().to_string(),
719 };
720
721 let source_table_names = extract_tables_from_query(&create_flow.query)
722 .map(|name| {
723 let reference = object_name_to_table_reference(name.clone().into(), true)
724 .with_context(|_| ConvertIdentifierSnafu {
725 ident: name.to_string(),
726 })?;
727 let catalog = reference
728 .catalog()
729 .unwrap_or(query_ctx.current_catalog())
730 .to_string();
731 let schema = reference
732 .schema()
733 .map(|s| s.to_string())
734 .unwrap_or(query_ctx.current_schema());
735
736 let table_name = TableName {
737 catalog_name: catalog,
738 schema_name: schema,
739 table_name: reference.table().to_string(),
740 };
741 Ok(table_name)
742 })
743 .collect::<Result<Vec<_>>>()?;
744
745 Ok(CreateFlowExpr {
746 catalog_name: query_ctx.current_catalog().to_string(),
747 flow_name: sanitize_flow_name(create_flow.flow_name)?,
748 source_table_names,
749 sink_table_name: Some(sink_table_name),
750 or_replace: create_flow.or_replace,
751 create_if_not_exists: create_flow.if_not_exists,
752 expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
753 comment: create_flow.comment.unwrap_or_default(),
754 sql: create_flow.query.to_string(),
755 flow_options: HashMap::new(),
756 })
757}
758
759fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
761 ensure!(
762 flow_name.0.len() == 1,
763 InvalidFlowNameSnafu {
764 name: flow_name.to_string(),
765 }
766 );
767 Ok(flow_name.0.swap_remove(0).value)
769}
770
#[cfg(test)]
mod tests {
    use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
    use datatypes::value::Value;
    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;
    use store_api::storage::ColumnDefaultConstraint;

    use super::*;

    /// Parses `sql` with the GreptimeDB dialect and returns the last statement produced.
    fn parse_statement(sql: &str) -> Statement {
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap()
            .pop()
            .unwrap()
    }

    /// Formats a [`TableName`] as `catalog.schema.table`.
    fn dotted(name: TableName) -> String {
        format!(
            "{}.{}.{}",
            name.catalog_name, name.schema_name, name.table_name
        )
    }

    /// Checks that `ts_column` is the millisecond timestamp column `ts` with a
    /// non-empty default constraint, and returns the decoded constraint.
    fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
        assert_eq!("ts", ts_column.name);
        assert_eq!(
            ColumnDataType::TimestampMillisecond as i32,
            ts_column.data_type
        );
        assert!(!ts_column.default_constraint.is_empty());

        ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
    }

    /// Checks that the default of `ts_column` is a timestamp value whose ISO 8601
    /// rendering equals `expected`.
    fn assert_ts_default(ts_column: &api::v1::ColumnDef, expected: &str) {
        let constraint = assert_ts_column(ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
                if ts.to_iso8601_string() == expected)
        );
    }

    /// Table names shared by the `CREATE VIEW` tests.
    fn new_test_table_names() -> Vec<TableName> {
        ["a_table", "b_table"]
            .into_iter()
            .map(|table| TableName {
                catalog_name: "greptime".to_string(),
                schema_name: "public".to_string(),
                table_name: table.to_string(),
            })
            .collect()
    }

    /// Converts the `CREATE VIEW` statement in `sql` and checks every field of the
    /// resulting expression.
    fn check_create_view_expr(
        sql: &str,
        expected_schema: &str,
        expected_view: &str,
        expected_if_not_exists: bool,
        expected_or_replace: bool,
    ) {
        let Statement::CreateView(create_view) = parse_statement(sql) else {
            unreachable!()
        };

        let logical_plan = vec![1, 2, 3];
        let table_names = new_test_table_names();
        let columns = vec!["a".to_string()];
        let plan_columns = vec!["number".to_string()];

        let expr = to_create_view_expr(
            create_view,
            logical_plan.clone(),
            table_names.clone(),
            columns.clone(),
            plan_columns.clone(),
            sql.to_string(),
            QueryContext::arc(),
        )
        .unwrap();

        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(expected_schema, expr.schema_name);
        assert_eq!(expected_view, expr.view_name);
        assert_eq!(expected_if_not_exists, expr.create_if_not_exists);
        assert_eq!(expected_or_replace, expr.or_replace);
        assert_eq!(logical_plan, expr.logical_plan);
        assert_eq!(table_names, expr.table_names);
        assert_eq!(sql, expr.definition);
        assert_eq!(columns, expr.columns);
        assert_eq!(plan_columns, expr.plan_columns);
    }

    #[test]
    fn test_create_flow_expr() {
        let sql = r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let Statement::CreateFlow(create_flow) = parse_statement(sql) else {
            unreachable!()
        };
        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

        assert_eq!("task_2", expr.flow_name);
        assert_eq!("greptime", expr.catalog_name);
        assert_eq!(
            "greptime.schema_1.table_1",
            expr.sink_table_name.map(dotted).unwrap()
        );
        assert_eq!(1, expr.source_table_names.len());
        assert_eq!(
            "greptime.schema_2.table_2",
            dotted(expr.source_table_names[0].clone())
        );
        assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);

        // A qualified flow name must be rejected.
        let sql = r"
CREATE FLOW abc.`task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let Statement::CreateFlow(create_flow) = parse_statement(sql) else {
            unreachable!()
        };
        let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());

        assert!(res.is_err());
        assert!(res
            .unwrap_err()
            .to_string()
            .contains("Invalid flow name: abc.`task_2`"));
    }

    #[test]
    fn test_create_to_expr() {
        let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
        let Statement::CreateTable(create_table) = parse_statement(sql) else {
            unreachable!()
        };

        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
        assert_eq!("3days", expr.table_options.get("ttl").unwrap());
        assert_eq!(
            "1.0MiB",
            expr.table_options.get("write_buffer_size").unwrap()
        );
    }

    #[test]
    fn test_invalid_create_to_expr() {
        let cases = [
            "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));"
        ];

        for sql in cases {
            let Statement::CreateTable(create_table) = parse_statement(sql) else {
                unreachable!()
            };
            create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
        }
    }

    #[test]
    fn test_create_to_expr_with_default_timestamp_value() {
        let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
        let Statement::CreateTable(create_table) = parse_statement(sql) else {
            unreachable!()
        };

        // Default query context.
        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
        assert_ts_default(&expr.column_defs[1], "2024-01-30 00:01:01+0000");

        // Query context with timezone `+08:00`.
        let ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("+08:00").unwrap())
            .build()
            .into();
        let expr = create_to_expr(&create_table, &ctx).unwrap();
        assert_ts_default(&expr.column_defs[1], "2024-01-29 16:01:01+0000");
    }

    #[test]
    fn test_to_alter_expr() {
        let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
        let Statement::AlterDatabase(alter_database) = parse_statement(sql) else {
            unreachable!()
        };

        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
        let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
            set_database_options,
        }) = expr.kind.unwrap()
        else {
            unreachable!()
        };

        assert_eq!(2, set_database_options.len());
        assert_eq!("key1", set_database_options[0].key);
        assert_eq!("value1", set_database_options[0].value);
        assert_eq!("key2", set_database_options[1].key);
        assert_eq!("value2", set_database_options[1].value);

        let sql = "ALTER DATABASE greptime UNSET key1, key2;";
        let Statement::AlterDatabase(alter_database) = parse_statement(sql) else {
            unreachable!()
        };

        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
        let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) =
            expr.kind.unwrap()
        else {
            unreachable!()
        };

        assert_eq!(2, keys.len());
        assert!(keys.contains(&"key1".to_string()));
        assert!(keys.contains(&"key2".to_string()));

        let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
        let Statement::AlterTable(alter_table) = parse_statement(sql) else {
            unreachable!()
        };

        // Default query context.
        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = expr.kind.unwrap()
        else {
            unreachable!()
        };

        assert_eq!(1, add_columns.len());
        let ts_column = add_columns[0].column_def.clone().unwrap();
        assert_ts_default(&ts_column, "2024-01-30 00:01:01+0000");

        // Query context with timezone `+08:00`.
        let ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("+08:00").unwrap())
            .build()
            .into();
        let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = expr.kind.unwrap()
        else {
            unreachable!()
        };

        assert_eq!(1, add_columns.len());
        let ts_column = add_columns[0].column_def.clone().unwrap();
        assert_ts_default(&ts_column, "2024-01-29 16:01:01+0000");
    }

    #[test]
    fn test_to_alter_modify_column_type_expr() {
        let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
        let Statement::AlterTable(alter_table) = parse_statement(sql) else {
            unreachable!()
        };

        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
        let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
            modify_column_types,
        }) = expr.kind.unwrap()
        else {
            unreachable!()
        };

        assert_eq!(1, modify_column_types.len());
        let modification = &modify_column_types[0];

        assert_eq!("mem_usage", modification.column_name);
        assert_eq!(ColumnDataType::String as i32, modification.target_type);
        assert!(modification.target_type_extension.is_none());
    }

    #[test]
    fn test_to_create_view_expr() {
        check_create_view_expr(
            "CREATE VIEW test AS SELECT * FROM NUMBERS",
            "public",
            "test",
            false,
            false,
        );
    }

    #[test]
    fn test_to_create_view_expr_complex() {
        check_create_view_expr(
            "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS",
            "test",
            "test_view",
            true,
            true,
        );
    }
}