1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
24use api::v1::{
25 AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
26 ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
27 DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
28 ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetDefaults, SetFulltext,
29 SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30 SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31 UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions, set_index,
32 unset_index,
33};
34use common_error::ext::BoxedError;
35use common_grpc_expr::util::ColumnExpr;
36use common_time::Timezone;
37use datafusion::sql::planner::object_name_to_table_reference;
38use datatypes::schema::{
39 COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
40 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
41 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
42 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
43 ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
44 SkippingIndexType,
45};
46use file_engine::FileOptions;
47use query::sql::{
48 check_file_to_table_schema_compatibility, file_column_schemas_to_table,
49 infer_file_table_schema, prepare_file_table_files,
50};
51use session::context::QueryContextRef;
52use session::table_name::table_idents_to_full_name;
53use snafu::{OptionExt, ResultExt, ensure};
54use sql::ast::{
55 ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
56};
57use sql::dialect::GreptimeDbDialect;
58use sql::parser::ParserContext;
59use sql::statements::alter::{
60 AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
61};
62use sql::statements::create::{
63 Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
64 CreateView, TableConstraint,
65};
66use sql::statements::{
67 OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
68 sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
69};
70use sql::util::extract_tables_from_query;
71use table::requests::{FILE_TABLE_META_KEY, TableOptions};
72use table::table_reference::TableReference;
73#[cfg(feature = "enterprise")]
74pub use trigger::to_create_trigger_task_expr;
75
76use crate::error::{
77 BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
78 ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
79 IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
80 InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
81 PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
82};
83
84pub fn create_table_expr_by_column_schemas(
85 table_name: &TableReference<'_>,
86 column_schemas: &[api::v1::ColumnSchema],
87 engine: &str,
88 desc: Option<&str>,
89) -> Result<CreateTableExpr> {
90 let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
91 let expr = common_grpc_expr::util::build_create_table_expr(
92 None,
93 table_name,
94 column_exprs,
95 engine,
96 desc.unwrap_or("Created on insertion"),
97 )
98 .context(BuildCreateExprOnInsertionSnafu)?;
99
100 validate_create_expr(&expr)?;
101 Ok(expr)
102}
103
104pub fn extract_add_columns_expr(
105 schema: &Schema,
106 column_exprs: Vec<ColumnExpr>,
107) -> Result<Option<AddColumns>> {
108 let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
109 .context(FindNewColumnsOnInsertionSnafu)?;
110 if let Some(add_columns) = &add_columns {
111 validate_add_columns_expr(add_columns)?;
112 }
113 Ok(add_columns)
114}
115
116pub(crate) async fn create_external_expr(
140 create: CreateExternalTable,
141 query_ctx: &QueryContextRef,
142) -> Result<CreateTableExpr> {
143 let (catalog_name, schema_name, table_name) =
144 table_idents_to_full_name(&create.name, query_ctx)
145 .map_err(BoxedError::new)
146 .context(ExternalSnafu)?;
147
148 let mut table_options = create.options.into_map();
149
150 let (object_store, files) = prepare_file_table_files(&table_options)
151 .await
152 .context(PrepareFileTableSnafu)?;
153
154 let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
155 .await
156 .context(InferFileTableSchemaSnafu)?
157 .column_schemas;
158
159 let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
160 let time_index = find_time_index(&create.constraints)?;
162 let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
163 let column_schemas =
164 columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
165 (time_index, primary_keys, column_schemas)
166 } else {
167 let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
169 let primary_keys = vec![];
170 (time_index, primary_keys, column_schemas)
171 };
172
173 check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
174 .context(SchemaIncompatibleSnafu)?;
175
176 let meta = FileOptions {
177 files,
178 file_column_schemas,
179 };
180 table_options.insert(
181 FILE_TABLE_META_KEY.to_string(),
182 serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
183 );
184
185 let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
186 let expr = CreateTableExpr {
187 catalog_name,
188 schema_name,
189 table_name,
190 desc: String::default(),
191 column_defs,
192 time_index,
193 primary_keys,
194 create_if_not_exists: create.if_not_exists,
195 table_options,
196 table_id: None,
197 engine: create.engine.clone(),
198 };
199
200 Ok(expr)
201}
202
203pub fn create_to_expr(
205 create: &CreateTable,
206 query_ctx: &QueryContextRef,
207) -> Result<CreateTableExpr> {
208 let (catalog_name, schema_name, table_name) =
209 table_idents_to_full_name(&create.name, query_ctx)
210 .map_err(BoxedError::new)
211 .context(ExternalSnafu)?;
212
213 let time_index = find_time_index(&create.constraints)?;
214 let table_options = HashMap::from(
215 &TableOptions::try_from_iter(create.options.to_str_map())
216 .context(UnrecognizedTableOptionSnafu)?,
217 );
218
219 let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
220
221 let expr = CreateTableExpr {
222 catalog_name,
223 schema_name,
224 table_name,
225 desc: String::default(),
226 column_defs: columns_to_expr(
227 &create.columns,
228 &time_index,
229 &primary_keys,
230 Some(&query_ctx.timezone()),
231 )?,
232 time_index,
233 primary_keys,
234 create_if_not_exists: create.if_not_exists,
235 table_options,
236 table_id: None,
237 engine: create.engine.clone(),
238 };
239
240 validate_create_expr(&expr)?;
241 Ok(expr)
242}
243
244pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
252 let quote_style = quote_style.unwrap_or('`');
253
254 let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
256 sql::ast::Ident::with_quote(quote_style, &expr.table_name),
257 )]);
258
259 let mut columns = Vec::with_capacity(expr.column_defs.len());
261 for column_def in &expr.column_defs {
262 let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
263 column: &column_def.name,
264 })?;
265
266 let mut options = Vec::new();
267
268 if column_def.is_nullable {
270 options.push(ColumnOptionDef {
271 name: None,
272 option: ColumnOption::Null,
273 });
274 } else {
275 options.push(ColumnOptionDef {
276 name: None,
277 option: ColumnOption::NotNull,
278 });
279 }
280
281 if let Some(default_constraint) = column_schema.default_constraint() {
283 let expr = match default_constraint {
284 ColumnDefaultConstraint::Value(v) => {
285 Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
286 }
287 ColumnDefaultConstraint::Function(func_expr) => {
288 ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
289 .context(ParseSqlSnafu)?
290 }
291 };
292 options.push(ColumnOptionDef {
293 name: None,
294 option: ColumnOption::Default(expr),
295 });
296 }
297
298 if !column_def.comment.is_empty() {
300 options.push(ColumnOptionDef {
301 name: None,
302 option: ColumnOption::Comment(column_def.comment.clone()),
303 });
304 }
305
306 let mut extensions = ColumnExtensions::default();
311
312 if let Ok(Some(opt)) = column_schema.fulltext_options()
314 && opt.enable
315 {
316 let mut map = HashMap::from([
317 (
318 COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
319 opt.analyzer.to_string(),
320 ),
321 (
322 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
323 opt.case_sensitive.to_string(),
324 ),
325 (
326 COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
327 opt.backend.to_string(),
328 ),
329 ]);
330 if opt.backend == FulltextBackend::Bloom {
331 map.insert(
332 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
333 opt.granularity.to_string(),
334 );
335 map.insert(
336 COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
337 opt.false_positive_rate().to_string(),
338 );
339 }
340 extensions.fulltext_index_options = Some(map.into());
341 }
342
343 if let Ok(Some(opt)) = column_schema.skipping_index_options() {
345 let map = HashMap::from([
346 (
347 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
348 opt.granularity.to_string(),
349 ),
350 (
351 COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
352 opt.false_positive_rate().to_string(),
353 ),
354 (
355 COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
356 opt.index_type.to_string(),
357 ),
358 ]);
359 extensions.skipping_index_options = Some(map.into());
360 }
361
362 if column_schema.is_inverted_indexed() {
364 extensions.inverted_index_options = Some(HashMap::new().into());
365 }
366
367 let sql_column = SqlColumn {
368 column_def: ColumnDef {
369 name: Ident::with_quote(quote_style, &column_def.name),
370 data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
371 .context(ParseSqlSnafu)?,
372 options,
373 },
374 extensions,
375 };
376
377 columns.push(sql_column);
378 }
379
380 let mut constraints = Vec::new();
382
383 constraints.push(TableConstraint::TimeIndex {
385 column: Ident::with_quote(quote_style, &expr.time_index),
386 });
387
388 if !expr.primary_keys.is_empty() {
390 let primary_key_columns: Vec<Ident> = expr
391 .primary_keys
392 .iter()
393 .map(|pk| Ident::with_quote(quote_style, pk))
394 .collect();
395
396 constraints.push(TableConstraint::PrimaryKey {
397 columns: primary_key_columns,
398 });
399 }
400
401 let mut options = OptionMap::default();
403 for (key, value) in &expr.table_options {
404 options.insert(key.clone(), value.clone());
405 }
406
407 Ok(CreateTable {
408 if_not_exists: expr.create_if_not_exists,
409 table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
410 name: table_name,
411 columns,
412 engine: expr.engine.clone(),
413 constraints,
414 options,
415 partitions: None,
416 })
417}
418
419pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
421 let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
423 for (idx, column) in create.column_defs.iter().enumerate() {
424 if let Some(indices) = column_to_indices.get(&column.name) {
425 return InvalidSqlSnafu {
426 err_msg: format!(
427 "column name `{}` is duplicated at index {} and {}",
428 column.name, indices, idx
429 ),
430 }
431 .fail();
432 }
433 column_to_indices.insert(&column.name, idx);
434 }
435
436 let _ = column_to_indices
438 .get(&create.time_index)
439 .with_context(|| InvalidSqlSnafu {
440 err_msg: format!(
441 "column name `{}` is not found in column list",
442 create.time_index
443 ),
444 })?;
445
446 for pk in &create.primary_keys {
448 let _ = column_to_indices
449 .get(&pk)
450 .with_context(|| InvalidSqlSnafu {
451 err_msg: format!("column name `{}` is not found in column list", pk),
452 })?;
453 }
454
455 let mut pk_set = HashSet::new();
457 for pk in &create.primary_keys {
458 if !pk_set.insert(pk) {
459 return InvalidSqlSnafu {
460 err_msg: format!("column name `{}` is duplicated in primary keys", pk),
461 }
462 .fail();
463 }
464 }
465
466 if pk_set.contains(&create.time_index) {
468 return InvalidSqlSnafu {
469 err_msg: format!(
470 "column name `{}` is both primary key and time index",
471 create.time_index
472 ),
473 }
474 .fail();
475 }
476
477 for column in &create.column_defs {
478 if is_interval_type(&column.data_type()) {
480 return InvalidSqlSnafu {
481 err_msg: format!(
482 "column name `{}` is interval type, which is not supported",
483 column.name
484 ),
485 }
486 .fail();
487 }
488 if is_date_time_type(&column.data_type()) {
490 return InvalidSqlSnafu {
491 err_msg: format!(
492 "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
493 column.name
494 ),
495 }
496 .fail();
497 }
498 }
499 Ok(())
500}
501
502fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
503 for add_column in &add_columns.add_columns {
504 let Some(column_def) = &add_column.column_def else {
505 continue;
506 };
507 if is_date_time_type(&column_def.data_type()) {
508 return InvalidSqlSnafu {
509 err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
510 }
511 .fail();
512 }
513 if is_interval_type(&column_def.data_type()) {
514 return InvalidSqlSnafu {
515 err_msg: format!(
516 "column name `{}` is interval type, which is not supported",
517 column_def.name
518 ),
519 }
520 .fail();
521 }
522 }
523 Ok(())
524}
525
526fn is_date_time_type(data_type: &ColumnDataType) -> bool {
527 matches!(data_type, ColumnDataType::Datetime)
528}
529
530fn is_interval_type(data_type: &ColumnDataType) -> bool {
531 matches!(
532 data_type,
533 ColumnDataType::IntervalYearMonth
534 | ColumnDataType::IntervalDayTime
535 | ColumnDataType::IntervalMonthDayNano
536 )
537}
538
539fn find_primary_keys(
540 columns: &[SqlColumn],
541 constraints: &[TableConstraint],
542) -> Result<Vec<String>> {
543 let columns_pk = columns
544 .iter()
545 .filter_map(|x| {
546 if x.options().iter().any(|o| {
547 matches!(
548 o.option,
549 ColumnOption::Unique {
550 is_primary: true,
551 ..
552 }
553 )
554 }) {
555 Some(x.name().value.clone())
556 } else {
557 None
558 }
559 })
560 .collect::<Vec<String>>();
561
562 ensure!(
563 columns_pk.len() <= 1,
564 IllegalPrimaryKeysDefSnafu {
565 msg: "not allowed to inline multiple primary keys in columns options"
566 }
567 );
568
569 let constraints_pk = constraints
570 .iter()
571 .filter_map(|constraint| match constraint {
572 TableConstraint::PrimaryKey { columns, .. } => {
573 Some(columns.iter().map(|ident| ident.value.clone()))
574 }
575 _ => None,
576 })
577 .flatten()
578 .collect::<Vec<String>>();
579
580 ensure!(
581 columns_pk.is_empty() || constraints_pk.is_empty(),
582 IllegalPrimaryKeysDefSnafu {
583 msg: "found definitions of primary keys in multiple places"
584 }
585 );
586
587 let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
588 primary_keys.extend(columns_pk);
589 primary_keys.extend(constraints_pk);
590 Ok(primary_keys)
591}
592
593pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
594 let time_index = constraints
595 .iter()
596 .filter_map(|constraint| match constraint {
597 TableConstraint::TimeIndex { column, .. } => Some(&column.value),
598 _ => None,
599 })
600 .collect::<Vec<&String>>();
601 ensure!(
602 time_index.len() == 1,
603 InvalidSqlSnafu {
604 err_msg: "must have one and only one TimeIndex columns",
605 }
606 );
607 Ok(time_index[0].clone())
608}
609
610fn columns_to_expr(
611 column_defs: &[SqlColumn],
612 time_index: &str,
613 primary_keys: &[String],
614 timezone: Option<&Timezone>,
615) -> Result<Vec<api::v1::ColumnDef>> {
616 let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
617 column_schemas_to_defs(column_schemas, primary_keys)
618}
619
620fn columns_to_column_schemas(
621 columns: &[SqlColumn],
622 time_index: &str,
623 timezone: Option<&Timezone>,
624) -> Result<Vec<ColumnSchema>> {
625 columns
626 .iter()
627 .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
628 .collect::<Result<Vec<ColumnSchema>>>()
629}
630
631pub fn column_schemas_to_defs(
633 column_schemas: Vec<ColumnSchema>,
634 primary_keys: &[String],
635) -> Result<Vec<api::v1::ColumnDef>> {
636 let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
637 .iter()
638 .map(|c| {
639 ColumnDataTypeWrapper::try_from(c.data_type.clone())
640 .map(|w| w.to_parts())
641 .context(ColumnDataTypeSnafu)
642 })
643 .collect::<Result<Vec<_>>>()?;
644
645 column_schemas
646 .iter()
647 .zip(column_datatypes)
648 .map(|(schema, datatype)| {
649 let semantic_type = if schema.is_time_index() {
650 SemanticType::Timestamp
651 } else if primary_keys.contains(&schema.name) {
652 SemanticType::Tag
653 } else {
654 SemanticType::Field
655 } as i32;
656 let comment = schema
657 .metadata()
658 .get(COMMENT_KEY)
659 .cloned()
660 .unwrap_or_default();
661
662 Ok(api::v1::ColumnDef {
663 name: schema.name.clone(),
664 data_type: datatype.0 as i32,
665 is_nullable: schema.is_nullable(),
666 default_constraint: match schema.default_constraint() {
667 None => vec![],
668 Some(v) => {
669 v.clone()
670 .try_into()
671 .context(ConvertColumnDefaultConstraintSnafu {
672 column_name: &schema.name,
673 })?
674 }
675 },
676 semantic_type,
677 comment,
678 datatype_extension: datatype.1,
679 options: options_from_column_schema(schema),
680 })
681 })
682 .collect()
683}
684
685#[derive(Debug, Clone, PartialEq, Eq)]
686pub struct RepartitionRequest {
687 pub catalog_name: String,
688 pub schema_name: String,
689 pub table_name: String,
690 pub from_exprs: Vec<Expr>,
691 pub into_exprs: Vec<Expr>,
692}
693
694pub(crate) fn to_repartition_request(
695 alter_table: AlterTable,
696 query_ctx: &QueryContextRef,
697) -> Result<RepartitionRequest> {
698 let (catalog_name, schema_name, table_name) =
699 table_idents_to_full_name(alter_table.table_name(), query_ctx)
700 .map_err(BoxedError::new)
701 .context(ExternalSnafu)?;
702
703 let AlterTableOperation::Repartition { operation } = alter_table.alter_operation else {
704 return InvalidSqlSnafu {
705 err_msg: "expected REPARTITION operation",
706 }
707 .fail();
708 };
709
710 Ok(RepartitionRequest {
711 catalog_name,
712 schema_name,
713 table_name,
714 from_exprs: operation.from_exprs,
715 into_exprs: operation.into_exprs,
716 })
717}
718
719pub(crate) fn to_alter_table_expr(
721 alter_table: AlterTable,
722 query_ctx: &QueryContextRef,
723) -> Result<AlterTableExpr> {
724 let (catalog_name, schema_name, table_name) =
725 table_idents_to_full_name(alter_table.table_name(), query_ctx)
726 .map_err(BoxedError::new)
727 .context(ExternalSnafu)?;
728
729 let kind = match alter_table.alter_operation {
730 AlterTableOperation::AddConstraint(_) => {
731 return NotSupportedSnafu {
732 feat: "ADD CONSTRAINT",
733 }
734 .fail();
735 }
736 AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
737 add_columns: add_columns
738 .into_iter()
739 .map(|add_column| {
740 let column_def = sql_column_def_to_grpc_column_def(
741 &add_column.column_def,
742 Some(&query_ctx.timezone()),
743 )
744 .map_err(BoxedError::new)
745 .context(ExternalSnafu)?;
746 if is_interval_type(&column_def.data_type()) {
747 return NotSupportedSnafu {
748 feat: "Add column with interval type",
749 }
750 .fail();
751 }
752 Ok(AddColumn {
753 column_def: Some(column_def),
754 location: add_column.location.as_ref().map(From::from),
755 add_if_not_exists: add_column.add_if_not_exists,
756 })
757 })
758 .collect::<Result<Vec<AddColumn>>>()?,
759 }),
760 AlterTableOperation::ModifyColumnType {
761 column_name,
762 target_type,
763 } => {
764 let target_type =
765 sql_data_type_to_concrete_data_type(&target_type, &Default::default())
766 .context(ParseSqlSnafu)?;
767 let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
768 .map(|w| w.to_parts())
769 .context(ColumnDataTypeSnafu)?;
770 if is_interval_type(&target_type) {
771 return NotSupportedSnafu {
772 feat: "Modify column type to interval type",
773 }
774 .fail();
775 }
776 AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
777 modify_column_types: vec![ModifyColumnType {
778 column_name: column_name.value,
779 target_type: target_type as i32,
780 target_type_extension,
781 }],
782 })
783 }
784 AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
785 drop_columns: vec![DropColumn {
786 name: name.value.clone(),
787 }],
788 }),
789 AlterTableOperation::RenameTable { new_table_name } => {
790 AlterTableKind::RenameTable(RenameTable {
791 new_table_name: new_table_name.clone(),
792 })
793 }
794 AlterTableOperation::SetTableOptions { options } => {
795 AlterTableKind::SetTableOptions(SetTableOptions {
796 table_options: options.into_iter().map(Into::into).collect(),
797 })
798 }
799 AlterTableOperation::UnsetTableOptions { keys } => {
800 AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
801 }
802 AlterTableOperation::Repartition { .. } => {
803 return NotSupportedSnafu {
804 feat: "ALTER TABLE ... REPARTITION",
805 }
806 .fail();
807 }
808 AlterTableOperation::SetIndex { options } => {
809 let option = match options {
810 sql::statements::alter::SetIndexOperation::Fulltext {
811 column_name,
812 options,
813 } => SetIndex {
814 options: Some(set_index::Options::Fulltext(SetFulltext {
815 column_name: column_name.value,
816 enable: options.enable,
817 analyzer: match options.analyzer {
818 FulltextAnalyzer::English => Analyzer::English.into(),
819 FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
820 },
821 case_sensitive: options.case_sensitive,
822 backend: match options.backend {
823 FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
824 FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
825 },
826 granularity: options.granularity as u64,
827 false_positive_rate: options.false_positive_rate(),
828 })),
829 },
830 sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
831 options: Some(set_index::Options::Inverted(SetInverted {
832 column_name: column_name.value,
833 })),
834 },
835 sql::statements::alter::SetIndexOperation::Skipping {
836 column_name,
837 options,
838 } => SetIndex {
839 options: Some(set_index::Options::Skipping(SetSkipping {
840 column_name: column_name.value,
841 enable: true,
842 granularity: options.granularity as u64,
843 false_positive_rate: options.false_positive_rate(),
844 skipping_index_type: match options.index_type {
845 SkippingIndexType::BloomFilter => {
846 PbSkippingIndexType::BloomFilter.into()
847 }
848 },
849 })),
850 },
851 };
852 AlterTableKind::SetIndexes(SetIndexes {
853 set_indexes: vec![option],
854 })
855 }
856 AlterTableOperation::UnsetIndex { options } => {
857 let option = match options {
858 sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
859 UnsetIndex {
860 options: Some(unset_index::Options::Fulltext(UnsetFulltext {
861 column_name: column_name.value,
862 })),
863 }
864 }
865 sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
866 UnsetIndex {
867 options: Some(unset_index::Options::Inverted(UnsetInverted {
868 column_name: column_name.value,
869 })),
870 }
871 }
872 sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
873 UnsetIndex {
874 options: Some(unset_index::Options::Skipping(UnsetSkipping {
875 column_name: column_name.value,
876 })),
877 }
878 }
879 };
880
881 AlterTableKind::UnsetIndexes(UnsetIndexes {
882 unset_indexes: vec![option],
883 })
884 }
885 AlterTableOperation::DropDefaults { columns } => {
886 AlterTableKind::DropDefaults(DropDefaults {
887 drop_defaults: columns
888 .into_iter()
889 .map(|col| {
890 let column_name = col.0.to_string();
891 Ok(api::v1::DropDefault { column_name })
892 })
893 .collect::<Result<Vec<_>>>()?,
894 })
895 }
896 AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
897 set_defaults: defaults
898 .into_iter()
899 .map(|col| {
900 let column_name = col.column_name.to_string();
901 let default_constraint = serde_json::to_string(&col.default_constraint)
902 .context(EncodeJsonSnafu)?
903 .into_bytes();
904 Ok(api::v1::SetDefault {
905 column_name,
906 default_constraint,
907 })
908 })
909 .collect::<Result<Vec<_>>>()?,
910 }),
911 };
912
913 Ok(AlterTableExpr {
914 catalog_name,
915 schema_name,
916 table_name,
917 kind: Some(kind),
918 })
919}
920
921pub fn to_alter_database_expr(
923 alter_database: AlterDatabase,
924 query_ctx: &QueryContextRef,
925) -> Result<AlterDatabaseExpr> {
926 let catalog = query_ctx.current_catalog();
927 let schema = alter_database.database_name;
928
929 let kind = match alter_database.alter_operation {
930 AlterDatabaseOperation::SetDatabaseOption { options } => {
931 let options = options.into_iter().map(Into::into).collect();
932 AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
933 set_database_options: options,
934 })
935 }
936 AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
937 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
938 }
939 };
940
941 Ok(AlterDatabaseExpr {
942 catalog_name: catalog.to_string(),
943 schema_name: schema.to_string(),
944 kind: Some(kind),
945 })
946}
947
948pub fn to_create_view_expr(
950 stmt: CreateView,
951 logical_plan: Vec<u8>,
952 table_names: Vec<TableName>,
953 columns: Vec<String>,
954 plan_columns: Vec<String>,
955 definition: String,
956 query_ctx: QueryContextRef,
957) -> Result<CreateViewExpr> {
958 let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
959 .map_err(BoxedError::new)
960 .context(ExternalSnafu)?;
961
962 let expr = CreateViewExpr {
963 catalog_name,
964 schema_name,
965 view_name,
966 logical_plan,
967 create_if_not_exists: stmt.if_not_exists,
968 or_replace: stmt.or_replace,
969 table_names,
970 columns,
971 plan_columns,
972 definition,
973 };
974
975 Ok(expr)
976}
977
978pub fn to_create_flow_task_expr(
979 create_flow: CreateFlow,
980 query_ctx: &QueryContextRef,
981) -> Result<CreateFlowExpr> {
982 let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone(), true)
984 .with_context(|_| ConvertIdentifierSnafu {
985 ident: create_flow.sink_table_name.to_string(),
986 })?;
987 let catalog = sink_table_ref
988 .catalog()
989 .unwrap_or(query_ctx.current_catalog())
990 .to_string();
991 let schema = sink_table_ref
992 .schema()
993 .map(|s| s.to_owned())
994 .unwrap_or(query_ctx.current_schema());
995
996 let sink_table_name = TableName {
997 catalog_name: catalog,
998 schema_name: schema,
999 table_name: sink_table_ref.table().to_string(),
1000 };
1001
1002 let source_table_names = extract_tables_from_query(&create_flow.query)
1003 .map(|name| {
1004 let reference =
1005 object_name_to_table_reference(name.clone(), true).with_context(|_| {
1006 ConvertIdentifierSnafu {
1007 ident: name.to_string(),
1008 }
1009 })?;
1010 let catalog = reference
1011 .catalog()
1012 .unwrap_or(query_ctx.current_catalog())
1013 .to_string();
1014 let schema = reference
1015 .schema()
1016 .map(|s| s.to_string())
1017 .unwrap_or(query_ctx.current_schema());
1018
1019 let table_name = TableName {
1020 catalog_name: catalog,
1021 schema_name: schema,
1022 table_name: reference.table().to_string(),
1023 };
1024 Ok(table_name)
1025 })
1026 .collect::<Result<Vec<_>>>()?;
1027
1028 let eval_interval = create_flow.eval_interval;
1029
1030 Ok(CreateFlowExpr {
1031 catalog_name: query_ctx.current_catalog().to_string(),
1032 flow_name: sanitize_flow_name(create_flow.flow_name)?,
1033 source_table_names,
1034 sink_table_name: Some(sink_table_name),
1035 or_replace: create_flow.or_replace,
1036 create_if_not_exists: create_flow.if_not_exists,
1037 expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
1038 eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
1039 comment: create_flow.comment.unwrap_or_default(),
1040 sql: create_flow.query.to_string(),
1041 flow_options: Default::default(),
1042 })
1043}
1044
1045fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
1047 ensure!(
1048 flow_name.0.len() == 1,
1049 InvalidFlowNameSnafu {
1050 name: flow_name.to_string(),
1051 }
1052 );
1053 Ok(flow_name.0.swap_remove(0).to_string_unquoted())
1055}
1056
1057#[cfg(test)]
1058mod tests {
1059 use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
1060 use datatypes::value::Value;
1061 use session::context::{QueryContext, QueryContextBuilder};
1062 use sql::dialect::GreptimeDbDialect;
1063 use sql::parser::{ParseOptions, ParserContext};
1064 use sql::statements::statement::Statement;
1065 use store_api::storage::ColumnDefaultConstraint;
1066
1067 use super::*;
1068
1069 #[test]
1070 fn test_create_flow_tql_expr() {
1071 let sql = r#"
1072CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1073TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
1074 let stmt =
1075 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1076
1077 assert!(
1078 stmt.is_err(),
1079 "Expected error for invalid TQL EVAL parameters: {:#?}",
1080 stmt
1081 );
1082
1083 let sql = r#"
1084CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1085TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#;
1086 let stmt =
1087 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1088 .unwrap()
1089 .pop()
1090 .unwrap();
1091
1092 let Statement::CreateFlow(create_flow) = stmt else {
1093 unreachable!()
1094 };
1095 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1096
1097 let to_dot_sep =
1098 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1099 assert_eq!("calc_reqs", expr.flow_name);
1100 assert_eq!("greptime", expr.catalog_name);
1101 assert_eq!(
1102 "greptime.public.cnt_reqs",
1103 expr.sink_table_name.map(to_dot_sep).unwrap()
1104 );
1105 assert!(expr.source_table_names.is_empty());
1106 assert_eq!(
1107 r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#,
1108 expr.sql
1109 );
1110 }
1111
1112 #[test]
1113 fn test_create_flow_expr() {
1114 let sql = r"
1115CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
1116SELECT
1117 DISTINCT number as dis
1118FROM
1119 distinct_basic;";
1120 let stmt =
1121 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1122 .unwrap()
1123 .pop()
1124 .unwrap();
1125
1126 let Statement::CreateFlow(create_flow) = stmt else {
1127 unreachable!()
1128 };
1129 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1130
1131 let to_dot_sep =
1132 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1133 assert_eq!("test_distinct_basic", expr.flow_name);
1134 assert_eq!("greptime", expr.catalog_name);
1135 assert_eq!(
1136 "greptime.public.out_distinct_basic",
1137 expr.sink_table_name.map(to_dot_sep).unwrap()
1138 );
1139 assert_eq!(1, expr.source_table_names.len());
1140 assert_eq!(
1141 "greptime.public.distinct_basic",
1142 to_dot_sep(expr.source_table_names[0].clone())
1143 );
1144 assert_eq!(
1145 r"SELECT
1146 DISTINCT number as dis
1147FROM
1148 distinct_basic",
1149 expr.sql
1150 );
1151
1152 let sql = r"
1153CREATE FLOW `task_2`
1154SINK TO schema_1.table_1
1155AS
1156SELECT max(c1), min(c2) FROM schema_2.table_2;";
1157 let stmt =
1158 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1159 .unwrap()
1160 .pop()
1161 .unwrap();
1162
1163 let Statement::CreateFlow(create_flow) = stmt else {
1164 unreachable!()
1165 };
1166 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1167
1168 let to_dot_sep =
1169 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1170 assert_eq!("task_2", expr.flow_name);
1171 assert_eq!("greptime", expr.catalog_name);
1172 assert_eq!(
1173 "greptime.schema_1.table_1",
1174 expr.sink_table_name.map(to_dot_sep).unwrap()
1175 );
1176 assert_eq!(1, expr.source_table_names.len());
1177 assert_eq!(
1178 "greptime.schema_2.table_2",
1179 to_dot_sep(expr.source_table_names[0].clone())
1180 );
1181 assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
1182
1183 let sql = r"
1184CREATE FLOW abc.`task_2`
1185SINK TO schema_1.table_1
1186AS
1187SELECT max(c1), min(c2) FROM schema_2.table_2;";
1188 let stmt =
1189 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1190 .unwrap()
1191 .pop()
1192 .unwrap();
1193
1194 let Statement::CreateFlow(create_flow) = stmt else {
1195 unreachable!()
1196 };
1197 let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
1198
1199 assert!(res.is_err());
1200 assert!(
1201 res.unwrap_err()
1202 .to_string()
1203 .contains("Invalid flow name: abc.`task_2`")
1204 );
1205 }
1206
1207 #[test]
1208 fn test_create_to_expr() {
1209 let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
1210 let stmt =
1211 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1212 .unwrap()
1213 .pop()
1214 .unwrap();
1215
1216 let Statement::CreateTable(create_table) = stmt else {
1217 unreachable!()
1218 };
1219 let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1220 assert_eq!("3days", expr.table_options.get("ttl").unwrap());
1221 assert_eq!(
1222 "1.0MiB",
1223 expr.table_options.get("write_buffer_size").unwrap()
1224 );
1225 }
1226
1227 #[test]
1228 fn test_invalid_create_to_expr() {
1229 let cases = [
1230 "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
1232 "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
1234 "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));",
1236 ];
1237
1238 for sql in cases {
1239 let stmt = ParserContext::create_with_dialect(
1240 sql,
1241 &GreptimeDbDialect {},
1242 ParseOptions::default(),
1243 )
1244 .unwrap()
1245 .pop()
1246 .unwrap();
1247 let Statement::CreateTable(create_table) = stmt else {
1248 unreachable!()
1249 };
1250 create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
1251 }
1252 }
1253
1254 #[test]
1255 fn test_create_to_expr_with_default_timestamp_value() {
1256 let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
1257 let stmt =
1258 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1259 .unwrap()
1260 .pop()
1261 .unwrap();
1262
1263 let Statement::CreateTable(create_table) = stmt else {
1264 unreachable!()
1265 };
1266
1267 let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1269 let ts_column = &expr.column_defs[1];
1270 let constraint = assert_ts_column(ts_column);
1271 assert!(
1272 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1273 if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1274 );
1275
1276 let ctx = QueryContextBuilder::default()
1278 .timezone(Timezone::from_tz_string("+08:00").unwrap())
1279 .build()
1280 .into();
1281 let expr = create_to_expr(&create_table, &ctx).unwrap();
1282 let ts_column = &expr.column_defs[1];
1283 let constraint = assert_ts_column(ts_column);
1284 assert!(
1285 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1286 if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1287 );
1288 }
1289
1290 fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
1291 assert_eq!("ts", ts_column.name);
1292 assert_eq!(
1293 ColumnDataType::TimestampMillisecond as i32,
1294 ts_column.data_type
1295 );
1296 assert!(!ts_column.default_constraint.is_empty());
1297
1298 ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
1299 }
1300
1301 #[test]
1302 fn test_to_alter_expr() {
1303 let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
1304 let stmt =
1305 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1306 .unwrap()
1307 .pop()
1308 .unwrap();
1309
1310 let Statement::AlterDatabase(alter_database) = stmt else {
1311 unreachable!()
1312 };
1313
1314 let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1315 let kind = expr.kind.unwrap();
1316
1317 let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
1318 set_database_options,
1319 }) = kind
1320 else {
1321 unreachable!()
1322 };
1323
1324 assert_eq!(2, set_database_options.len());
1325 assert_eq!("key1", set_database_options[0].key);
1326 assert_eq!("value1", set_database_options[0].value);
1327 assert_eq!("key2", set_database_options[1].key);
1328 assert_eq!("value2", set_database_options[1].value);
1329
1330 let sql = "ALTER DATABASE greptime UNSET key1, key2;";
1331 let stmt =
1332 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1333 .unwrap()
1334 .pop()
1335 .unwrap();
1336
1337 let Statement::AlterDatabase(alter_database) = stmt else {
1338 unreachable!()
1339 };
1340
1341 let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1342 let kind = expr.kind.unwrap();
1343
1344 let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
1345 unreachable!()
1346 };
1347
1348 assert_eq!(2, keys.len());
1349 assert!(keys.contains(&"key1".to_string()));
1350 assert!(keys.contains(&"key2".to_string()));
1351
1352 let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
1353 let stmt =
1354 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1355 .unwrap()
1356 .pop()
1357 .unwrap();
1358
1359 let Statement::AlterTable(alter_table) = stmt else {
1360 unreachable!()
1361 };
1362
1363 let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1365 let kind = expr.kind.unwrap();
1366
1367 let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1368 unreachable!()
1369 };
1370
1371 assert_eq!(1, add_columns.len());
1372 let ts_column = add_columns[0].column_def.clone().unwrap();
1373 let constraint = assert_ts_column(&ts_column);
1374 assert!(
1375 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1376 if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1377 );
1378
1379 let ctx = QueryContextBuilder::default()
1382 .timezone(Timezone::from_tz_string("+08:00").unwrap())
1383 .build()
1384 .into();
1385 let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
1386 let kind = expr.kind.unwrap();
1387
1388 let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1389 unreachable!()
1390 };
1391
1392 assert_eq!(1, add_columns.len());
1393 let ts_column = add_columns[0].column_def.clone().unwrap();
1394 let constraint = assert_ts_column(&ts_column);
1395 assert!(
1396 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1397 if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1398 );
1399 }
1400
1401 #[test]
1402 fn test_to_alter_modify_column_type_expr() {
1403 let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
1404 let stmt =
1405 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1406 .unwrap()
1407 .pop()
1408 .unwrap();
1409
1410 let Statement::AlterTable(alter_table) = stmt else {
1411 unreachable!()
1412 };
1413
1414 let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1416 let kind = expr.kind.unwrap();
1417
1418 let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
1419 modify_column_types,
1420 }) = kind
1421 else {
1422 unreachable!()
1423 };
1424
1425 assert_eq!(1, modify_column_types.len());
1426 let modify_column_type = &modify_column_types[0];
1427
1428 assert_eq!("mem_usage", modify_column_type.column_name);
1429 assert_eq!(
1430 ColumnDataType::String as i32,
1431 modify_column_type.target_type
1432 );
1433 assert!(modify_column_type.target_type_extension.is_none());
1434 }
1435
1436 #[test]
1437 fn test_to_repartition_request() {
1438 let sql = r#"
1439ALTER TABLE metrics REPARTITION (
1440 device_id < 100
1441) INTO (
1442 device_id < 100 AND area < 'South',
1443 device_id < 100 AND area >= 'South'
1444);"#;
1445 let stmt =
1446 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1447 .unwrap()
1448 .pop()
1449 .unwrap();
1450
1451 let Statement::AlterTable(alter_table) = stmt else {
1452 unreachable!()
1453 };
1454
1455 let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
1456 assert_eq!("greptime", request.catalog_name);
1457 assert_eq!("public", request.schema_name);
1458 assert_eq!("metrics", request.table_name);
1459 assert_eq!(
1460 request
1461 .from_exprs
1462 .into_iter()
1463 .map(|x| x.to_string())
1464 .collect::<Vec<_>>(),
1465 vec!["device_id < 100".to_string()]
1466 );
1467 assert_eq!(
1468 request
1469 .into_exprs
1470 .into_iter()
1471 .map(|x| x.to_string())
1472 .collect::<Vec<_>>(),
1473 vec![
1474 "device_id < 100 AND area < 'South'".to_string(),
1475 "device_id < 100 AND area >= 'South'".to_string()
1476 ]
1477 );
1478 }
1479
1480 fn new_test_table_names() -> Vec<TableName> {
1481 vec![
1482 TableName {
1483 catalog_name: "greptime".to_string(),
1484 schema_name: "public".to_string(),
1485 table_name: "a_table".to_string(),
1486 },
1487 TableName {
1488 catalog_name: "greptime".to_string(),
1489 schema_name: "public".to_string(),
1490 table_name: "b_table".to_string(),
1491 },
1492 ]
1493 }
1494
1495 #[test]
1496 fn test_to_create_view_expr() {
1497 let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1498 let stmt =
1499 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1500 .unwrap()
1501 .pop()
1502 .unwrap();
1503
1504 let Statement::CreateView(stmt) = stmt else {
1505 unreachable!()
1506 };
1507
1508 let logical_plan = vec![1, 2, 3];
1509 let table_names = new_test_table_names();
1510 let columns = vec!["a".to_string()];
1511 let plan_columns = vec!["number".to_string()];
1512
1513 let expr = to_create_view_expr(
1514 stmt,
1515 logical_plan.clone(),
1516 table_names.clone(),
1517 columns.clone(),
1518 plan_columns.clone(),
1519 sql.to_string(),
1520 QueryContext::arc(),
1521 )
1522 .unwrap();
1523
1524 assert_eq!("greptime", expr.catalog_name);
1525 assert_eq!("public", expr.schema_name);
1526 assert_eq!("test", expr.view_name);
1527 assert!(!expr.create_if_not_exists);
1528 assert!(!expr.or_replace);
1529 assert_eq!(logical_plan, expr.logical_plan);
1530 assert_eq!(table_names, expr.table_names);
1531 assert_eq!(sql, expr.definition);
1532 assert_eq!(columns, expr.columns);
1533 assert_eq!(plan_columns, expr.plan_columns);
1534 }
1535
1536 #[test]
1537 fn test_to_create_view_expr_complex() {
1538 let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
1539 let stmt =
1540 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1541 .unwrap()
1542 .pop()
1543 .unwrap();
1544
1545 let Statement::CreateView(stmt) = stmt else {
1546 unreachable!()
1547 };
1548
1549 let logical_plan = vec![1, 2, 3];
1550 let table_names = new_test_table_names();
1551 let columns = vec!["a".to_string()];
1552 let plan_columns = vec!["number".to_string()];
1553
1554 let expr = to_create_view_expr(
1555 stmt,
1556 logical_plan.clone(),
1557 table_names.clone(),
1558 columns.clone(),
1559 plan_columns.clone(),
1560 sql.to_string(),
1561 QueryContext::arc(),
1562 )
1563 .unwrap();
1564
1565 assert_eq!("greptime", expr.catalog_name);
1566 assert_eq!("test", expr.schema_name);
1567 assert_eq!("test_view", expr.view_name);
1568 assert!(expr.create_if_not_exists);
1569 assert!(expr.or_replace);
1570 assert_eq!(logical_plan, expr.logical_plan);
1571 assert_eq!(table_names, expr.table_names);
1572 assert_eq!(sql, expr.definition);
1573 assert_eq!(columns, expr.columns);
1574 assert_eq!(plan_columns, expr.plan_columns);
1575 }
1576
1577 #[test]
1578 fn test_expr_to_create() {
1579 let sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
1580 `timestamp` TIMESTAMP(9) NOT NULL,
1581 `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
1582 `username` STRING NULL,
1583 `http_method` STRING NULL INVERTED INDEX,
1584 `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
1585 `protocol` STRING NULL,
1586 `status_code` INT NULL INVERTED INDEX,
1587 `response_size` BIGINT NULL,
1588 `message` STRING NULL,
1589 TIME INDEX (`timestamp`),
1590 PRIMARY KEY (`username`, `status_code`)
1591)
1592ENGINE=mito
1593WITH(
1594 append_mode = 'true'
1595)"#;
1596 let stmt =
1597 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1598 .unwrap()
1599 .pop()
1600 .unwrap();
1601
1602 let Statement::CreateTable(original_create) = stmt else {
1603 unreachable!()
1604 };
1605
1606 let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();
1608
1609 let create_table = expr_to_create(&expr, Some('`')).unwrap();
1610 let new_sql = format!("{:#}", create_table);
1611 assert_eq!(sql, new_sql);
1612 }
1613}