1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
24use api::v1::{
25 AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
26 ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
27 DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
28 ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetDefaults, SetFulltext,
29 SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30 SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31 UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions, set_index,
32 unset_index,
33};
34use common_error::ext::BoxedError;
35use common_grpc_expr::util::ColumnExpr;
36use common_time::Timezone;
37use datafusion::sql::planner::object_name_to_table_reference;
38use datatypes::schema::{
39 COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
40 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
41 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
42 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
43 ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
44 SkippingIndexType,
45};
46use file_engine::FileOptions;
47use query::sql::{
48 check_file_to_table_schema_compatibility, file_column_schemas_to_table,
49 infer_file_table_schema, prepare_file_table_files,
50};
51use session::context::QueryContextRef;
52use session::table_name::table_idents_to_full_name;
53use snafu::{OptionExt, ResultExt, ensure};
54use sql::ast::{
55 ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
56};
57use sql::dialect::GreptimeDbDialect;
58use sql::parser::ParserContext;
59use sql::statements::alter::{
60 AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
61};
62use sql::statements::create::{
63 Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
64 CreateView, TableConstraint,
65};
66use sql::statements::{
67 OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
68 sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
69};
70use sql::util::extract_tables_from_query;
71use table::requests::{FILE_TABLE_META_KEY, TableOptions};
72use table::table_reference::TableReference;
73#[cfg(feature = "enterprise")]
74pub use trigger::to_create_trigger_task_expr;
75
76use crate::error::{
77 BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
78 ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
79 IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
80 InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
81 PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
82};
83
84pub fn create_table_expr_by_column_schemas(
85 table_name: &TableReference<'_>,
86 column_schemas: &[api::v1::ColumnSchema],
87 engine: &str,
88 desc: Option<&str>,
89) -> Result<CreateTableExpr> {
90 let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
91 let expr = common_grpc_expr::util::build_create_table_expr(
92 None,
93 table_name,
94 column_exprs,
95 engine,
96 desc.unwrap_or("Created on insertion"),
97 )
98 .context(BuildCreateExprOnInsertionSnafu)?;
99
100 validate_create_expr(&expr)?;
101 Ok(expr)
102}
103
104pub fn extract_add_columns_expr(
105 schema: &Schema,
106 column_exprs: Vec<ColumnExpr>,
107) -> Result<Option<AddColumns>> {
108 let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
109 .context(FindNewColumnsOnInsertionSnafu)?;
110 if let Some(add_columns) = &add_columns {
111 validate_add_columns_expr(add_columns)?;
112 }
113 Ok(add_columns)
114}
115
/// Converts a `CREATE EXTERNAL TABLE` statement into a [`CreateTableExpr`].
///
/// Resolves the table name against the session context, discovers the
/// external files named in the table options, and infers their schema. When
/// the statement declares columns they are used (and checked against the
/// file schema for compatibility); otherwise the table schema is derived
/// entirely from the files. The resolved file list and schema are serialized
/// into the table options under [`FILE_TABLE_META_KEY`].
pub(crate) async fn create_external_expr(
    create: CreateExternalTable,
    query_ctx: &QueryContextRef,
) -> Result<CreateTableExpr> {
    let (catalog_name, schema_name, table_name) =
        table_idents_to_full_name(&create.name, query_ctx)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    let mut table_options = create.options.into_map();

    // Resolve the object store and the concrete file list from the options.
    let (object_store, files) = prepare_file_table_files(&table_options)
        .await
        .context(PrepareFileTableSnafu)?;

    // Schema read from the files themselves (e.g. format headers/metadata).
    let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
        .await
        .context(InferFileTableSchemaSnafu)?
        .column_schemas;

    // Prefer the user-declared columns when present; otherwise fall back to
    // the inferred file schema with no primary keys.
    let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
        let time_index = find_time_index(&create.constraints)?;
        let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
        let column_schemas =
            columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
        (time_index, primary_keys, column_schemas)
    } else {
        let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
        let primary_keys = vec![];
        (time_index, primary_keys, column_schemas)
    };

    // Declared columns must be readable from the underlying file schema.
    check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
        .context(SchemaIncompatibleSnafu)?;

    // Persist the resolved file metadata so the file engine can open the table
    // later without re-listing/re-inferring.
    let meta = FileOptions {
        files,
        file_column_schemas,
    };
    table_options.insert(
        FILE_TABLE_META_KEY.to_string(),
        serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
    );

    let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
    let expr = CreateTableExpr {
        catalog_name,
        schema_name,
        table_name,
        desc: String::default(),
        column_defs,
        time_index,
        primary_keys,
        create_if_not_exists: create.if_not_exists,
        table_options,
        table_id: None,
        engine: create.engine.to_string(),
    };

    Ok(expr)
}
202
203pub fn create_to_expr(
205 create: &CreateTable,
206 query_ctx: &QueryContextRef,
207) -> Result<CreateTableExpr> {
208 let (catalog_name, schema_name, table_name) =
209 table_idents_to_full_name(&create.name, query_ctx)
210 .map_err(BoxedError::new)
211 .context(ExternalSnafu)?;
212
213 let time_index = find_time_index(&create.constraints)?;
214 let table_options = HashMap::from(
215 &TableOptions::try_from_iter(create.options.to_str_map())
216 .context(UnrecognizedTableOptionSnafu)?,
217 );
218
219 let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
220
221 let expr = CreateTableExpr {
222 catalog_name,
223 schema_name,
224 table_name,
225 desc: String::default(),
226 column_defs: columns_to_expr(
227 &create.columns,
228 &time_index,
229 &primary_keys,
230 Some(&query_ctx.timezone()),
231 )?,
232 time_index,
233 primary_keys,
234 create_if_not_exists: create.if_not_exists,
235 table_options,
236 table_id: None,
237 engine: create.engine.to_string(),
238 };
239
240 validate_create_expr(&expr)?;
241 Ok(expr)
242}
243
/// Reconstructs a SQL [`CreateTable`] statement from a [`CreateTableExpr`].
///
/// This is the inverse of [`create_to_expr`] (used e.g. to render
/// `SHOW CREATE TABLE`). Identifiers are quoted with `quote_style`,
/// defaulting to a backtick.
pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
    let quote_style = quote_style.unwrap_or('`');

    let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
        sql::ast::Ident::with_quote(quote_style, &expr.table_name),
    )]);

    let mut columns = Vec::with_capacity(expr.column_defs.len());
    for column_def in &expr.column_defs {
        // Recover the typed column schema from the wire-format column def.
        let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
            column: &column_def.name,
        })?;

        let mut options = Vec::new();

        // Nullability is always emitted explicitly.
        if column_def.is_nullable {
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::Null,
            });
        } else {
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::NotNull,
            });
        }

        // DEFAULT clause: either a literal value or a function expression
        // (e.g. `now()`) re-parsed into a SQL AST node.
        if let Some(default_constraint) = column_schema.default_constraint() {
            let expr = match default_constraint {
                ColumnDefaultConstraint::Value(v) => {
                    Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
                }
                ColumnDefaultConstraint::Function(func_expr) => {
                    ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
                        .context(ParseSqlSnafu)?
                }
            };
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::Default(expr),
            });
        }

        if !column_def.comment.is_empty() {
            options.push(ColumnOptionDef {
                name: None,
                option: ColumnOption::Comment(column_def.comment.clone()),
            });
        }

        // Index settings are carried in column extensions, not column options.
        let mut extensions = ColumnExtensions::default();

        if let Ok(Some(opt)) = column_schema.fulltext_options()
            && opt.enable
        {
            let mut map = HashMap::from([
                (
                    COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
                    opt.analyzer.to_string(),
                ),
                (
                    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
                    opt.case_sensitive.to_string(),
                ),
                (
                    COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
                    opt.backend.to_string(),
                ),
            ]);
            // Granularity/false-positive rate only apply to the bloom backend.
            if opt.backend == FulltextBackend::Bloom {
                map.insert(
                    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
                    opt.granularity.to_string(),
                );
                map.insert(
                    COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
                    opt.false_positive_rate().to_string(),
                );
            }
            extensions.fulltext_index_options = Some(map.into());
        }

        if let Ok(Some(opt)) = column_schema.skipping_index_options() {
            let map = HashMap::from([
                (
                    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
                    opt.granularity.to_string(),
                ),
                (
                    COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
                    opt.false_positive_rate().to_string(),
                ),
                (
                    COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
                    opt.index_type.to_string(),
                ),
            ]);
            extensions.skipping_index_options = Some(map.into());
        }

        if column_schema.is_inverted_indexed() {
            // The inverted index takes no options; an empty map marks it present.
            extensions.inverted_index_options = Some(HashMap::new().into());
        }

        let sql_column = SqlColumn {
            column_def: ColumnDef {
                name: Ident::with_quote(quote_style, &column_def.name),
                data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
                    .context(ParseSqlSnafu)?,
                options,
            },
            extensions,
        };

        columns.push(sql_column);
    }

    let mut constraints = Vec::new();

    // TIME INDEX is mandatory for every table.
    constraints.push(TableConstraint::TimeIndex {
        column: Ident::with_quote(quote_style, &expr.time_index),
    });

    if !expr.primary_keys.is_empty() {
        let primary_key_columns: Vec<Ident> = expr
            .primary_keys
            .iter()
            .map(|pk| Ident::with_quote(quote_style, pk))
            .collect();

        constraints.push(TableConstraint::PrimaryKey {
            columns: primary_key_columns,
        });
    }

    let mut options = OptionMap::default();
    for (key, value) in &expr.table_options {
        options.insert(key.clone(), value.clone());
    }

    Ok(CreateTable {
        if_not_exists: expr.create_if_not_exists,
        table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
        name: table_name,
        columns,
        engine: expr.engine.clone(),
        constraints,
        options,
        partitions: None,
    })
}
418
419pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
421 let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
423 for (idx, column) in create.column_defs.iter().enumerate() {
424 if let Some(indices) = column_to_indices.get(&column.name) {
425 return InvalidSqlSnafu {
426 err_msg: format!(
427 "column name `{}` is duplicated at index {} and {}",
428 column.name, indices, idx
429 ),
430 }
431 .fail();
432 }
433 column_to_indices.insert(&column.name, idx);
434 }
435
436 let _ = column_to_indices
438 .get(&create.time_index)
439 .with_context(|| InvalidSqlSnafu {
440 err_msg: format!(
441 "column name `{}` is not found in column list",
442 create.time_index
443 ),
444 })?;
445
446 for pk in &create.primary_keys {
448 let _ = column_to_indices
449 .get(&pk)
450 .with_context(|| InvalidSqlSnafu {
451 err_msg: format!("column name `{}` is not found in column list", pk),
452 })?;
453 }
454
455 let mut pk_set = HashSet::new();
457 for pk in &create.primary_keys {
458 if !pk_set.insert(pk) {
459 return InvalidSqlSnafu {
460 err_msg: format!("column name `{}` is duplicated in primary keys", pk),
461 }
462 .fail();
463 }
464 }
465
466 if pk_set.contains(&create.time_index) {
468 return InvalidSqlSnafu {
469 err_msg: format!(
470 "column name `{}` is both primary key and time index",
471 create.time_index
472 ),
473 }
474 .fail();
475 }
476
477 for column in &create.column_defs {
478 if is_interval_type(&column.data_type()) {
480 return InvalidSqlSnafu {
481 err_msg: format!(
482 "column name `{}` is interval type, which is not supported",
483 column.name
484 ),
485 }
486 .fail();
487 }
488 if is_date_time_type(&column.data_type()) {
490 return InvalidSqlSnafu {
491 err_msg: format!(
492 "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
493 column.name
494 ),
495 }
496 .fail();
497 }
498 }
499 Ok(())
500}
501
502fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
503 for add_column in &add_columns.add_columns {
504 let Some(column_def) = &add_column.column_def else {
505 continue;
506 };
507 if is_date_time_type(&column_def.data_type()) {
508 return InvalidSqlSnafu {
509 err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
510 }
511 .fail();
512 }
513 if is_interval_type(&column_def.data_type()) {
514 return InvalidSqlSnafu {
515 err_msg: format!(
516 "column name `{}` is interval type, which is not supported",
517 column_def.name
518 ),
519 }
520 .fail();
521 }
522 }
523 Ok(())
524}
525
526fn is_date_time_type(data_type: &ColumnDataType) -> bool {
527 matches!(data_type, ColumnDataType::Datetime)
528}
529
530fn is_interval_type(data_type: &ColumnDataType) -> bool {
531 matches!(
532 data_type,
533 ColumnDataType::IntervalYearMonth
534 | ColumnDataType::IntervalDayTime
535 | ColumnDataType::IntervalMonthDayNano
536 )
537}
538
539fn find_primary_keys(
540 columns: &[SqlColumn],
541 constraints: &[TableConstraint],
542) -> Result<Vec<String>> {
543 let columns_pk = columns
544 .iter()
545 .filter_map(|x| {
546 if x.options().iter().any(|o| {
547 matches!(
548 o.option,
549 ColumnOption::Unique {
550 is_primary: true,
551 ..
552 }
553 )
554 }) {
555 Some(x.name().value.clone())
556 } else {
557 None
558 }
559 })
560 .collect::<Vec<String>>();
561
562 ensure!(
563 columns_pk.len() <= 1,
564 IllegalPrimaryKeysDefSnafu {
565 msg: "not allowed to inline multiple primary keys in columns options"
566 }
567 );
568
569 let constraints_pk = constraints
570 .iter()
571 .filter_map(|constraint| match constraint {
572 TableConstraint::PrimaryKey { columns, .. } => {
573 Some(columns.iter().map(|ident| ident.value.clone()))
574 }
575 _ => None,
576 })
577 .flatten()
578 .collect::<Vec<String>>();
579
580 ensure!(
581 columns_pk.is_empty() || constraints_pk.is_empty(),
582 IllegalPrimaryKeysDefSnafu {
583 msg: "found definitions of primary keys in multiple places"
584 }
585 );
586
587 let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
588 primary_keys.extend(columns_pk);
589 primary_keys.extend(constraints_pk);
590 Ok(primary_keys)
591}
592
593pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
594 let time_index = constraints
595 .iter()
596 .filter_map(|constraint| match constraint {
597 TableConstraint::TimeIndex { column, .. } => Some(&column.value),
598 _ => None,
599 })
600 .collect::<Vec<&String>>();
601 ensure!(
602 time_index.len() == 1,
603 InvalidSqlSnafu {
604 err_msg: "must have one and only one TimeIndex columns",
605 }
606 );
607 Ok(time_index.first().unwrap().to_string())
608}
609
610fn columns_to_expr(
611 column_defs: &[SqlColumn],
612 time_index: &str,
613 primary_keys: &[String],
614 timezone: Option<&Timezone>,
615) -> Result<Vec<api::v1::ColumnDef>> {
616 let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
617 column_schemas_to_defs(column_schemas, primary_keys)
618}
619
620fn columns_to_column_schemas(
621 columns: &[SqlColumn],
622 time_index: &str,
623 timezone: Option<&Timezone>,
624) -> Result<Vec<ColumnSchema>> {
625 columns
626 .iter()
627 .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
628 .collect::<Result<Vec<ColumnSchema>>>()
629}
630
631pub fn column_schemas_to_defs(
633 column_schemas: Vec<ColumnSchema>,
634 primary_keys: &[String],
635) -> Result<Vec<api::v1::ColumnDef>> {
636 let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
637 .iter()
638 .map(|c| {
639 ColumnDataTypeWrapper::try_from(c.data_type.clone())
640 .map(|w| w.to_parts())
641 .context(ColumnDataTypeSnafu)
642 })
643 .collect::<Result<Vec<_>>>()?;
644
645 column_schemas
646 .iter()
647 .zip(column_datatypes)
648 .map(|(schema, datatype)| {
649 let semantic_type = if schema.is_time_index() {
650 SemanticType::Timestamp
651 } else if primary_keys.contains(&schema.name) {
652 SemanticType::Tag
653 } else {
654 SemanticType::Field
655 } as i32;
656 let comment = schema
657 .metadata()
658 .get(COMMENT_KEY)
659 .cloned()
660 .unwrap_or_default();
661
662 Ok(api::v1::ColumnDef {
663 name: schema.name.clone(),
664 data_type: datatype.0 as i32,
665 is_nullable: schema.is_nullable(),
666 default_constraint: match schema.default_constraint() {
667 None => vec![],
668 Some(v) => {
669 v.clone()
670 .try_into()
671 .context(ConvertColumnDefaultConstraintSnafu {
672 column_name: &schema.name,
673 })?
674 }
675 },
676 semantic_type,
677 comment,
678 datatype_extension: datatype.1,
679 options: options_from_column_schema(schema),
680 })
681 })
682 .collect()
683}
684
/// Converts a parsed `ALTER TABLE` statement into an [`AlterTableExpr`].
///
/// The table name is resolved against the session context; each supported
/// `ALTER` operation is mapped to the corresponding gRPC [`AlterTableKind`].
/// `ADD CONSTRAINT` is rejected, as are columns of interval type in `ADD
/// COLUMN` / `MODIFY COLUMN` operations.
pub(crate) fn to_alter_table_expr(
    alter_table: AlterTable,
    query_ctx: &QueryContextRef,
) -> Result<AlterTableExpr> {
    let (catalog_name, schema_name, table_name) =
        table_idents_to_full_name(alter_table.table_name(), query_ctx)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    let kind = match alter_table.alter_operation {
        AlterTableOperation::AddConstraint(_) => {
            return NotSupportedSnafu {
                feat: "ADD CONSTRAINT",
            }
            .fail();
        }
        // ADD COLUMN [...]: convert each SQL column def to a gRPC column def,
        // rejecting interval-typed columns.
        AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
            add_columns: add_columns
                .into_iter()
                .map(|add_column| {
                    let column_def = sql_column_def_to_grpc_column_def(
                        &add_column.column_def,
                        Some(&query_ctx.timezone()),
                    )
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;
                    if is_interval_type(&column_def.data_type()) {
                        return NotSupportedSnafu {
                            feat: "Add column with interval type",
                        }
                        .fail();
                    }
                    Ok(AddColumn {
                        column_def: Some(column_def),
                        location: add_column.location.as_ref().map(From::from),
                        add_if_not_exists: add_column.add_if_not_exists,
                    })
                })
                .collect::<Result<Vec<AddColumn>>>()?,
        }),
        // MODIFY COLUMN <name> <type>: resolve the target wire type,
        // rejecting interval types.
        AlterTableOperation::ModifyColumnType {
            column_name,
            target_type,
        } => {
            let target_type =
                sql_data_type_to_concrete_data_type(&target_type).context(ParseSqlSnafu)?;
            let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
                .map(|w| w.to_parts())
                .context(ColumnDataTypeSnafu)?;
            if is_interval_type(&target_type) {
                return NotSupportedSnafu {
                    feat: "Modify column type to interval type",
                }
                .fail();
            }
            AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
                modify_column_types: vec![ModifyColumnType {
                    column_name: column_name.value,
                    target_type: target_type as i32,
                    target_type_extension,
                }],
            })
        }
        AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
            drop_columns: vec![DropColumn {
                name: name.value.to_string(),
            }],
        }),
        AlterTableOperation::RenameTable { new_table_name } => {
            AlterTableKind::RenameTable(RenameTable {
                new_table_name: new_table_name.to_string(),
            })
        }
        AlterTableOperation::SetTableOptions { options } => {
            AlterTableKind::SetTableOptions(SetTableOptions {
                table_options: options.into_iter().map(Into::into).collect(),
            })
        }
        AlterTableOperation::UnsetTableOptions { keys } => {
            AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
        }
        // MODIFY COLUMN ... SET {FULLTEXT|INVERTED|SKIPPING} INDEX: translate
        // the per-index SQL options into the gRPC SetIndex variants.
        AlterTableOperation::SetIndex { options } => {
            let option = match options {
                sql::statements::alter::SetIndexOperation::Fulltext {
                    column_name,
                    options,
                } => SetIndex {
                    options: Some(set_index::Options::Fulltext(SetFulltext {
                        column_name: column_name.value,
                        enable: options.enable,
                        analyzer: match options.analyzer {
                            FulltextAnalyzer::English => Analyzer::English.into(),
                            FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
                        },
                        case_sensitive: options.case_sensitive,
                        backend: match options.backend {
                            FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
                            FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
                        },
                        granularity: options.granularity as u64,
                        false_positive_rate: options.false_positive_rate(),
                    })),
                },
                sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
                    options: Some(set_index::Options::Inverted(SetInverted {
                        column_name: column_name.value,
                    })),
                },
                sql::statements::alter::SetIndexOperation::Skipping {
                    column_name,
                    options,
                } => SetIndex {
                    options: Some(set_index::Options::Skipping(SetSkipping {
                        column_name: column_name.value,
                        enable: true,
                        granularity: options.granularity as u64,
                        false_positive_rate: options.false_positive_rate(),
                        skipping_index_type: match options.index_type {
                            SkippingIndexType::BloomFilter => {
                                PbSkippingIndexType::BloomFilter.into()
                            }
                        },
                    })),
                },
            };
            AlterTableKind::SetIndexes(SetIndexes {
                set_indexes: vec![option],
            })
        }
        // MODIFY COLUMN ... UNSET {FULLTEXT|INVERTED|SKIPPING} INDEX.
        AlterTableOperation::UnsetIndex { options } => {
            let option = match options {
                sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
                    UnsetIndex {
                        options: Some(unset_index::Options::Fulltext(UnsetFulltext {
                            column_name: column_name.value,
                        })),
                    }
                }
                sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
                    UnsetIndex {
                        options: Some(unset_index::Options::Inverted(UnsetInverted {
                            column_name: column_name.value,
                        })),
                    }
                }
                sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
                    UnsetIndex {
                        options: Some(unset_index::Options::Skipping(UnsetSkipping {
                            column_name: column_name.value,
                        })),
                    }
                }
            };

            AlterTableKind::UnsetIndexes(UnsetIndexes {
                unset_indexes: vec![option],
            })
        }
        AlterTableOperation::DropDefaults { columns } => {
            AlterTableKind::DropDefaults(DropDefaults {
                drop_defaults: columns
                    .into_iter()
                    .map(|col| {
                        let column_name = col.0.to_string();
                        Ok(api::v1::DropDefault { column_name })
                    })
                    .collect::<Result<Vec<_>>>()?,
            })
        }
        // SET DEFAULT: constraints are JSON-serialized for the wire.
        AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
            set_defaults: defaults
                .into_iter()
                .map(|col| {
                    let column_name = col.column_name.to_string();
                    let default_constraint = serde_json::to_string(&col.default_constraint)
                        .context(EncodeJsonSnafu)?
                        .into_bytes();
                    Ok(api::v1::SetDefault {
                        column_name,
                        default_constraint,
                    })
                })
                .collect::<Result<Vec<_>>>()?,
        }),
    };

    Ok(AlterTableExpr {
        catalog_name,
        schema_name,
        table_name,
        kind: Some(kind),
    })
}
879
880pub fn to_alter_database_expr(
882 alter_database: AlterDatabase,
883 query_ctx: &QueryContextRef,
884) -> Result<AlterDatabaseExpr> {
885 let catalog = query_ctx.current_catalog();
886 let schema = alter_database.database_name;
887
888 let kind = match alter_database.alter_operation {
889 AlterDatabaseOperation::SetDatabaseOption { options } => {
890 let options = options.into_iter().map(Into::into).collect();
891 AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
892 set_database_options: options,
893 })
894 }
895 AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
896 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
897 }
898 };
899
900 Ok(AlterDatabaseExpr {
901 catalog_name: catalog.to_string(),
902 schema_name: schema.to_string(),
903 kind: Some(kind),
904 })
905}
906
907pub fn to_create_view_expr(
909 stmt: CreateView,
910 logical_plan: Vec<u8>,
911 table_names: Vec<TableName>,
912 columns: Vec<String>,
913 plan_columns: Vec<String>,
914 definition: String,
915 query_ctx: QueryContextRef,
916) -> Result<CreateViewExpr> {
917 let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
918 .map_err(BoxedError::new)
919 .context(ExternalSnafu)?;
920
921 let expr = CreateViewExpr {
922 catalog_name,
923 schema_name,
924 view_name,
925 logical_plan,
926 create_if_not_exists: stmt.if_not_exists,
927 or_replace: stmt.or_replace,
928 table_names,
929 columns,
930 plan_columns,
931 definition,
932 };
933
934 Ok(expr)
935}
936
/// Converts a parsed `CREATE FLOW` statement into a [`CreateFlowExpr`].
///
/// Sink and source table names are resolved against the session's current
/// catalog/schema when not fully qualified; the flow name itself must be a
/// single unqualified identifier (enforced by [`sanitize_flow_name`]).
pub fn to_create_flow_task_expr(
    create_flow: CreateFlow,
    query_ctx: &QueryContextRef,
) -> Result<CreateFlowExpr> {
    // Resolve the sink table to a fully-qualified name, defaulting missing
    // catalog/schema parts from the session context.
    let sink_table_ref =
        object_name_to_table_reference(create_flow.sink_table_name.clone().into(), true)
            .with_context(|_| ConvertIdentifierSnafu {
                ident: create_flow.sink_table_name.to_string(),
            })?;
    let catalog = sink_table_ref
        .catalog()
        .unwrap_or(query_ctx.current_catalog())
        .to_string();
    let schema = sink_table_ref
        .schema()
        .map(|s| s.to_owned())
        .unwrap_or(query_ctx.current_schema());

    let sink_table_name = TableName {
        catalog_name: catalog,
        schema_name: schema,
        table_name: sink_table_ref.table().to_string(),
    };

    // Resolve every table referenced by the flow's query the same way.
    let source_table_names = extract_tables_from_query(&create_flow.query)
        .map(|name| {
            let reference = object_name_to_table_reference(name.clone().into(), true)
                .with_context(|_| ConvertIdentifierSnafu {
                    ident: name.to_string(),
                })?;
            let catalog = reference
                .catalog()
                .unwrap_or(query_ctx.current_catalog())
                .to_string();
            let schema = reference
                .schema()
                .map(|s| s.to_string())
                .unwrap_or(query_ctx.current_schema());

            let table_name = TableName {
                catalog_name: catalog,
                schema_name: schema,
                table_name: reference.table().to_string(),
            };
            Ok(table_name)
        })
        .collect::<Result<Vec<_>>>()?;

    let eval_interval = create_flow.eval_interval;

    Ok(CreateFlowExpr {
        catalog_name: query_ctx.current_catalog().to_string(),
        flow_name: sanitize_flow_name(create_flow.flow_name)?,
        source_table_names,
        sink_table_name: Some(sink_table_name),
        or_replace: create_flow.or_replace,
        create_if_not_exists: create_flow.if_not_exists,
        expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
        eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
        comment: create_flow.comment.unwrap_or_default(),
        sql: create_flow.query.to_string(),
        flow_options: Default::default(),
    })
}
1002
1003fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
1005 ensure!(
1006 flow_name.0.len() == 1,
1007 InvalidFlowNameSnafu {
1008 name: flow_name.to_string(),
1009 }
1010 );
1011 Ok(flow_name.0.swap_remove(0).to_string_unquoted())
1013}
1014
1015#[cfg(test)]
1016mod tests {
1017 use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
1018 use datatypes::value::Value;
1019 use session::context::{QueryContext, QueryContextBuilder};
1020 use sql::dialect::GreptimeDbDialect;
1021 use sql::parser::{ParseOptions, ParserContext};
1022 use sql::statements::statement::Statement;
1023 use store_api::storage::ColumnDefaultConstraint;
1024
1025 use super::*;
1026
1027 #[test]
1028 fn test_create_flow_tql_expr() {
1029 let sql = r#"
1030CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1031TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
1032 let stmt =
1033 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1034
1035 assert!(
1036 stmt.is_err(),
1037 "Expected error for invalid TQL EVAL parameters: {:#?}",
1038 stmt
1039 );
1040
1041 let sql = r#"
1042CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1043TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#;
1044 let stmt =
1045 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1046 .unwrap()
1047 .pop()
1048 .unwrap();
1049
1050 let Statement::CreateFlow(create_flow) = stmt else {
1051 unreachable!()
1052 };
1053 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1054
1055 let to_dot_sep =
1056 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1057 assert_eq!("calc_reqs", expr.flow_name);
1058 assert_eq!("greptime", expr.catalog_name);
1059 assert_eq!(
1060 "greptime.public.cnt_reqs",
1061 expr.sink_table_name.map(to_dot_sep).unwrap()
1062 );
1063 assert!(expr.source_table_names.is_empty());
1064 assert_eq!(
1065 r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#,
1066 expr.sql
1067 );
1068 }
1069
1070 #[test]
1071 fn test_create_flow_expr() {
1072 let sql = r"
1073CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
1074SELECT
1075 DISTINCT number as dis
1076FROM
1077 distinct_basic;";
1078 let stmt =
1079 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1080 .unwrap()
1081 .pop()
1082 .unwrap();
1083
1084 let Statement::CreateFlow(create_flow) = stmt else {
1085 unreachable!()
1086 };
1087 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1088
1089 let to_dot_sep =
1090 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1091 assert_eq!("test_distinct_basic", expr.flow_name);
1092 assert_eq!("greptime", expr.catalog_name);
1093 assert_eq!(
1094 "greptime.public.out_distinct_basic",
1095 expr.sink_table_name.map(to_dot_sep).unwrap()
1096 );
1097 assert_eq!(1, expr.source_table_names.len());
1098 assert_eq!(
1099 "greptime.public.distinct_basic",
1100 to_dot_sep(expr.source_table_names[0].clone())
1101 );
1102 assert_eq!(
1103 r"SELECT
1104 DISTINCT number as dis
1105FROM
1106 distinct_basic",
1107 expr.sql
1108 );
1109
1110 let sql = r"
1111CREATE FLOW `task_2`
1112SINK TO schema_1.table_1
1113AS
1114SELECT max(c1), min(c2) FROM schema_2.table_2;";
1115 let stmt =
1116 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1117 .unwrap()
1118 .pop()
1119 .unwrap();
1120
1121 let Statement::CreateFlow(create_flow) = stmt else {
1122 unreachable!()
1123 };
1124 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1125
1126 let to_dot_sep =
1127 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1128 assert_eq!("task_2", expr.flow_name);
1129 assert_eq!("greptime", expr.catalog_name);
1130 assert_eq!(
1131 "greptime.schema_1.table_1",
1132 expr.sink_table_name.map(to_dot_sep).unwrap()
1133 );
1134 assert_eq!(1, expr.source_table_names.len());
1135 assert_eq!(
1136 "greptime.schema_2.table_2",
1137 to_dot_sep(expr.source_table_names[0].clone())
1138 );
1139 assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
1140
1141 let sql = r"
1142CREATE FLOW abc.`task_2`
1143SINK TO schema_1.table_1
1144AS
1145SELECT max(c1), min(c2) FROM schema_2.table_2;";
1146 let stmt =
1147 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1148 .unwrap()
1149 .pop()
1150 .unwrap();
1151
1152 let Statement::CreateFlow(create_flow) = stmt else {
1153 unreachable!()
1154 };
1155 let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
1156
1157 assert!(res.is_err());
1158 assert!(
1159 res.unwrap_err()
1160 .to_string()
1161 .contains("Invalid flow name: abc.`task_2`")
1162 );
1163 }
1164
1165 #[test]
1166 fn test_create_to_expr() {
1167 let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
1168 let stmt =
1169 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1170 .unwrap()
1171 .pop()
1172 .unwrap();
1173
1174 let Statement::CreateTable(create_table) = stmt else {
1175 unreachable!()
1176 };
1177 let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1178 assert_eq!("3days", expr.table_options.get("ttl").unwrap());
1179 assert_eq!(
1180 "1.0MiB",
1181 expr.table_options.get("write_buffer_size").unwrap()
1182 );
1183 }
1184
1185 #[test]
1186 fn test_invalid_create_to_expr() {
1187 let cases = [
1188 "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
1190 "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
1192 "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));",
1194 ];
1195
1196 for sql in cases {
1197 let stmt = ParserContext::create_with_dialect(
1198 sql,
1199 &GreptimeDbDialect {},
1200 ParseOptions::default(),
1201 )
1202 .unwrap()
1203 .pop()
1204 .unwrap();
1205 let Statement::CreateTable(create_table) = stmt else {
1206 unreachable!()
1207 };
1208 create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
1209 }
1210 }
1211
1212 #[test]
1213 fn test_create_to_expr_with_default_timestamp_value() {
1214 let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
1215 let stmt =
1216 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1217 .unwrap()
1218 .pop()
1219 .unwrap();
1220
1221 let Statement::CreateTable(create_table) = stmt else {
1222 unreachable!()
1223 };
1224
1225 let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1227 let ts_column = &expr.column_defs[1];
1228 let constraint = assert_ts_column(ts_column);
1229 assert!(
1230 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1231 if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1232 );
1233
1234 let ctx = QueryContextBuilder::default()
1236 .timezone(Timezone::from_tz_string("+08:00").unwrap())
1237 .build()
1238 .into();
1239 let expr = create_to_expr(&create_table, &ctx).unwrap();
1240 let ts_column = &expr.column_defs[1];
1241 let constraint = assert_ts_column(ts_column);
1242 assert!(
1243 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1244 if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1245 );
1246 }
1247
1248 fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
1249 assert_eq!("ts", ts_column.name);
1250 assert_eq!(
1251 ColumnDataType::TimestampMillisecond as i32,
1252 ts_column.data_type
1253 );
1254 assert!(!ts_column.default_constraint.is_empty());
1255
1256 ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
1257 }
1258
    /// End-to-end checks for the ALTER conversions:
    /// `to_alter_database_expr` (SET / UNSET database options) and
    /// `to_alter_table_expr` (ADD COLUMN with a timezone-dependent default).
    #[test]
    fn test_to_alter_expr() {
        // ALTER DATABASE ... SET: options become ordered key/value pairs.
        let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterDatabase(alter_database) = stmt else {
            unreachable!()
        };

        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
            set_database_options,
        }) = kind
        else {
            unreachable!()
        };

        // Order of the pairs follows the statement order.
        assert_eq!(2, set_database_options.len());
        assert_eq!("key1", set_database_options[0].key);
        assert_eq!("value1", set_database_options[0].value);
        assert_eq!("key2", set_database_options[1].key);
        assert_eq!("value2", set_database_options[1].value);

        // ALTER DATABASE ... UNSET: only the option keys are carried over.
        let sql = "ALTER DATABASE greptime UNSET key1, key2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterDatabase(alter_database) = stmt else {
            unreachable!()
        };

        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
            unreachable!()
        };

        assert_eq!(2, keys.len());
        assert!(keys.contains(&"key1".to_string()));
        assert!(keys.contains(&"key2".to_string()));

        // ALTER TABLE ... ADD COLUMN with a string timestamp default: the
        // default is parsed in the session time zone (UTC for the default
        // context).
        let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::AlterTable(alter_table) = stmt else {
            unreachable!()
        };

        // Cloned because the same parsed statement is converted again below
        // under a different session time zone.
        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
        let kind = expr.kind.unwrap();

        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
            unreachable!()
        };

        assert_eq!(1, add_columns.len());
        let ts_column = add_columns[0].column_def.clone().unwrap();
        let constraint = assert_ts_column(&ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
            if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
        );

        // Same statement under +08:00: the stored UTC instant is 8 hours earlier.
        let ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("+08:00").unwrap())
            .build()
            .into();
        let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
        let kind = expr.kind.unwrap();

        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
            unreachable!()
        };

        assert_eq!(1, add_columns.len());
        let ts_column = add_columns[0].column_def.clone().unwrap();
        let constraint = assert_ts_column(&ts_column);
        assert!(
            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
            if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
        );
    }
1358
1359 #[test]
1360 fn test_to_alter_modify_column_type_expr() {
1361 let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
1362 let stmt =
1363 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1364 .unwrap()
1365 .pop()
1366 .unwrap();
1367
1368 let Statement::AlterTable(alter_table) = stmt else {
1369 unreachable!()
1370 };
1371
1372 let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1374 let kind = expr.kind.unwrap();
1375
1376 let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
1377 modify_column_types,
1378 }) = kind
1379 else {
1380 unreachable!()
1381 };
1382
1383 assert_eq!(1, modify_column_types.len());
1384 let modify_column_type = &modify_column_types[0];
1385
1386 assert_eq!("mem_usage", modify_column_type.column_name);
1387 assert_eq!(
1388 ColumnDataType::String as i32,
1389 modify_column_type.target_type
1390 );
1391 assert!(modify_column_type.target_type_extension.is_none());
1392 }
1393
1394 fn new_test_table_names() -> Vec<TableName> {
1395 vec![
1396 TableName {
1397 catalog_name: "greptime".to_string(),
1398 schema_name: "public".to_string(),
1399 table_name: "a_table".to_string(),
1400 },
1401 TableName {
1402 catalog_name: "greptime".to_string(),
1403 schema_name: "public".to_string(),
1404 table_name: "b_table".to_string(),
1405 },
1406 ]
1407 }
1408
1409 #[test]
1410 fn test_to_create_view_expr() {
1411 let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1412 let stmt =
1413 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1414 .unwrap()
1415 .pop()
1416 .unwrap();
1417
1418 let Statement::CreateView(stmt) = stmt else {
1419 unreachable!()
1420 };
1421
1422 let logical_plan = vec![1, 2, 3];
1423 let table_names = new_test_table_names();
1424 let columns = vec!["a".to_string()];
1425 let plan_columns = vec!["number".to_string()];
1426
1427 let expr = to_create_view_expr(
1428 stmt,
1429 logical_plan.clone(),
1430 table_names.clone(),
1431 columns.clone(),
1432 plan_columns.clone(),
1433 sql.to_string(),
1434 QueryContext::arc(),
1435 )
1436 .unwrap();
1437
1438 assert_eq!("greptime", expr.catalog_name);
1439 assert_eq!("public", expr.schema_name);
1440 assert_eq!("test", expr.view_name);
1441 assert!(!expr.create_if_not_exists);
1442 assert!(!expr.or_replace);
1443 assert_eq!(logical_plan, expr.logical_plan);
1444 assert_eq!(table_names, expr.table_names);
1445 assert_eq!(sql, expr.definition);
1446 assert_eq!(columns, expr.columns);
1447 assert_eq!(plan_columns, expr.plan_columns);
1448 }
1449
1450 #[test]
1451 fn test_to_create_view_expr_complex() {
1452 let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
1453 let stmt =
1454 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1455 .unwrap()
1456 .pop()
1457 .unwrap();
1458
1459 let Statement::CreateView(stmt) = stmt else {
1460 unreachable!()
1461 };
1462
1463 let logical_plan = vec![1, 2, 3];
1464 let table_names = new_test_table_names();
1465 let columns = vec!["a".to_string()];
1466 let plan_columns = vec!["number".to_string()];
1467
1468 let expr = to_create_view_expr(
1469 stmt,
1470 logical_plan.clone(),
1471 table_names.clone(),
1472 columns.clone(),
1473 plan_columns.clone(),
1474 sql.to_string(),
1475 QueryContext::arc(),
1476 )
1477 .unwrap();
1478
1479 assert_eq!("greptime", expr.catalog_name);
1480 assert_eq!("test", expr.schema_name);
1481 assert_eq!("test_view", expr.view_name);
1482 assert!(expr.create_if_not_exists);
1483 assert!(expr.or_replace);
1484 assert_eq!(logical_plan, expr.logical_plan);
1485 assert_eq!(table_names, expr.table_names);
1486 assert_eq!(sql, expr.definition);
1487 assert_eq!(columns, expr.columns);
1488 assert_eq!(plan_columns, expr.plan_columns);
1489 }
1490
    /// Round-trips a fully-featured CREATE TABLE (skipping index, inverted
    /// index, fulltext index, table options) through `create_to_expr` and
    /// back via `expr_to_create`, asserting the re-rendered SQL is
    /// byte-identical to the canonically formatted input.
    #[test]
    fn test_expr_to_create() {
        let sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
    `timestamp` TIMESTAMP(9) NOT NULL,
    `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
    `username` STRING NULL,
    `http_method` STRING NULL INVERTED INDEX,
    `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
    `protocol` STRING NULL,
    `status_code` INT NULL INVERTED INDEX,
    `response_size` BIGINT NULL,
    `message` STRING NULL,
    TIME INDEX (`timestamp`),
    PRIMARY KEY (`username`, `status_code`)
)
ENGINE=mito
WITH(
    append_mode = 'true'
)"#;
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateTable(original_create) = stmt else {
            unreachable!()
        };

        let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();

        // Re-render with backquote identifier quoting; `{:#}` selects the
        // pretty (multi-line) format the input string above was written in.
        let create_table = expr_to_create(&expr, Some('`')).unwrap();
        let new_sql = format!("{:#}", create_table);
        assert_eq!(sql, new_sql);
    }
1527}