1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
24use api::v1::{
25 AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
26 ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
27 DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
28 ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetDefaults, SetFulltext,
29 SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30 SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31 UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions, set_index,
32 unset_index,
33};
34use common_error::ext::BoxedError;
35use common_grpc_expr::util::ColumnExpr;
36use common_time::Timezone;
37use datafusion::sql::planner::object_name_to_table_reference;
38use datatypes::schema::{
39 COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
40 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
41 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
42 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
43 ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
44 SkippingIndexType,
45};
46use file_engine::FileOptions;
47use query::sql::{
48 check_file_to_table_schema_compatibility, file_column_schemas_to_table,
49 infer_file_table_schema, prepare_file_table_files,
50};
51use session::context::QueryContextRef;
52use session::table_name::table_idents_to_full_name;
53use snafu::{OptionExt, ResultExt, ensure};
54use sql::ast::{
55 ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
56};
57use sql::dialect::GreptimeDbDialect;
58use sql::parser::ParserContext;
59use sql::statements::alter::{
60 AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
61};
62use sql::statements::create::{
63 Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
64 CreateView, TableConstraint,
65};
66use sql::statements::{
67 OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
68 sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
69};
70use sql::util::extract_tables_from_query;
71use table::requests::{FILE_TABLE_META_KEY, TableOptions};
72use table::table_reference::TableReference;
73#[cfg(feature = "enterprise")]
74pub use trigger::to_create_trigger_task_expr;
75
76use crate::error::{
77 BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
78 ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
79 IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
80 InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
81 PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
82};
83
84pub fn create_table_expr_by_column_schemas(
85 table_name: &TableReference<'_>,
86 column_schemas: &[api::v1::ColumnSchema],
87 engine: &str,
88 desc: Option<&str>,
89) -> Result<CreateTableExpr> {
90 let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
91 let expr = common_grpc_expr::util::build_create_table_expr(
92 None,
93 table_name,
94 column_exprs,
95 engine,
96 desc.unwrap_or("Created on insertion"),
97 )
98 .context(BuildCreateExprOnInsertionSnafu)?;
99
100 validate_create_expr(&expr)?;
101 Ok(expr)
102}
103
104pub fn extract_add_columns_expr(
105 schema: &Schema,
106 column_exprs: Vec<ColumnExpr>,
107) -> Result<Option<AddColumns>> {
108 let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
109 .context(FindNewColumnsOnInsertionSnafu)?;
110 if let Some(add_columns) = &add_columns {
111 validate_add_columns_expr(add_columns)?;
112 }
113 Ok(add_columns)
114}
115
116pub(crate) async fn create_external_expr(
140 create: CreateExternalTable,
141 query_ctx: &QueryContextRef,
142) -> Result<CreateTableExpr> {
143 let (catalog_name, schema_name, table_name) =
144 table_idents_to_full_name(&create.name, query_ctx)
145 .map_err(BoxedError::new)
146 .context(ExternalSnafu)?;
147
148 let mut table_options = create.options.into_map();
149
150 let (object_store, files) = prepare_file_table_files(&table_options)
151 .await
152 .context(PrepareFileTableSnafu)?;
153
154 let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
155 .await
156 .context(InferFileTableSchemaSnafu)?
157 .column_schemas;
158
159 let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
160 let time_index = find_time_index(&create.constraints)?;
162 let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
163 let column_schemas =
164 columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
165 (time_index, primary_keys, column_schemas)
166 } else {
167 let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
169 let primary_keys = vec![];
170 (time_index, primary_keys, column_schemas)
171 };
172
173 check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
174 .context(SchemaIncompatibleSnafu)?;
175
176 let meta = FileOptions {
177 files,
178 file_column_schemas,
179 };
180 table_options.insert(
181 FILE_TABLE_META_KEY.to_string(),
182 serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
183 );
184
185 let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
186 let expr = CreateTableExpr {
187 catalog_name,
188 schema_name,
189 table_name,
190 desc: String::default(),
191 column_defs,
192 time_index,
193 primary_keys,
194 create_if_not_exists: create.if_not_exists,
195 table_options,
196 table_id: None,
197 engine: create.engine.clone(),
198 };
199
200 Ok(expr)
201}
202
203pub fn create_to_expr(
205 create: &CreateTable,
206 query_ctx: &QueryContextRef,
207) -> Result<CreateTableExpr> {
208 let (catalog_name, schema_name, table_name) =
209 table_idents_to_full_name(&create.name, query_ctx)
210 .map_err(BoxedError::new)
211 .context(ExternalSnafu)?;
212
213 let time_index = find_time_index(&create.constraints)?;
214 let table_options = HashMap::from(
215 &TableOptions::try_from_iter(create.options.to_str_map())
216 .context(UnrecognizedTableOptionSnafu)?,
217 );
218
219 let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
220
221 let expr = CreateTableExpr {
222 catalog_name,
223 schema_name,
224 table_name,
225 desc: String::default(),
226 column_defs: columns_to_expr(
227 &create.columns,
228 &time_index,
229 &primary_keys,
230 Some(&query_ctx.timezone()),
231 )?,
232 time_index,
233 primary_keys,
234 create_if_not_exists: create.if_not_exists,
235 table_options,
236 table_id: None,
237 engine: create.engine.clone(),
238 };
239
240 validate_create_expr(&expr)?;
241 Ok(expr)
242}
243
244pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
252 let quote_style = quote_style.unwrap_or('`');
253
254 let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
256 sql::ast::Ident::with_quote(quote_style, &expr.table_name),
257 )]);
258
259 let mut columns = Vec::with_capacity(expr.column_defs.len());
261 for column_def in &expr.column_defs {
262 let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
263 column: &column_def.name,
264 })?;
265
266 let mut options = Vec::new();
267
268 if column_def.is_nullable {
270 options.push(ColumnOptionDef {
271 name: None,
272 option: ColumnOption::Null,
273 });
274 } else {
275 options.push(ColumnOptionDef {
276 name: None,
277 option: ColumnOption::NotNull,
278 });
279 }
280
281 if let Some(default_constraint) = column_schema.default_constraint() {
283 let expr = match default_constraint {
284 ColumnDefaultConstraint::Value(v) => {
285 Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
286 }
287 ColumnDefaultConstraint::Function(func_expr) => {
288 ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
289 .context(ParseSqlSnafu)?
290 }
291 };
292 options.push(ColumnOptionDef {
293 name: None,
294 option: ColumnOption::Default(expr),
295 });
296 }
297
298 if !column_def.comment.is_empty() {
300 options.push(ColumnOptionDef {
301 name: None,
302 option: ColumnOption::Comment(column_def.comment.clone()),
303 });
304 }
305
306 let mut extensions = ColumnExtensions::default();
311
312 if let Ok(Some(opt)) = column_schema.fulltext_options()
314 && opt.enable
315 {
316 let mut map = HashMap::from([
317 (
318 COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
319 opt.analyzer.to_string(),
320 ),
321 (
322 COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
323 opt.case_sensitive.to_string(),
324 ),
325 (
326 COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
327 opt.backend.to_string(),
328 ),
329 ]);
330 if opt.backend == FulltextBackend::Bloom {
331 map.insert(
332 COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
333 opt.granularity.to_string(),
334 );
335 map.insert(
336 COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
337 opt.false_positive_rate().to_string(),
338 );
339 }
340 extensions.fulltext_index_options = Some(map.into());
341 }
342
343 if let Ok(Some(opt)) = column_schema.skipping_index_options() {
345 let map = HashMap::from([
346 (
347 COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
348 opt.granularity.to_string(),
349 ),
350 (
351 COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
352 opt.false_positive_rate().to_string(),
353 ),
354 (
355 COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
356 opt.index_type.to_string(),
357 ),
358 ]);
359 extensions.skipping_index_options = Some(map.into());
360 }
361
362 if column_schema.is_inverted_indexed() {
364 extensions.inverted_index_options = Some(HashMap::new().into());
365 }
366
367 let sql_column = SqlColumn {
368 column_def: ColumnDef {
369 name: Ident::with_quote(quote_style, &column_def.name),
370 data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
371 .context(ParseSqlSnafu)?,
372 options,
373 },
374 extensions,
375 };
376
377 columns.push(sql_column);
378 }
379
380 let mut constraints = Vec::new();
382
383 constraints.push(TableConstraint::TimeIndex {
385 column: Ident::with_quote(quote_style, &expr.time_index),
386 });
387
388 if !expr.primary_keys.is_empty() {
390 let primary_key_columns: Vec<Ident> = expr
391 .primary_keys
392 .iter()
393 .map(|pk| Ident::with_quote(quote_style, pk))
394 .collect();
395
396 constraints.push(TableConstraint::PrimaryKey {
397 columns: primary_key_columns,
398 });
399 }
400
401 let mut options = OptionMap::default();
403 for (key, value) in &expr.table_options {
404 options.insert(key.clone(), value.clone());
405 }
406
407 Ok(CreateTable {
408 if_not_exists: expr.create_if_not_exists,
409 table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
410 name: table_name,
411 columns,
412 engine: expr.engine.clone(),
413 constraints,
414 options,
415 partitions: None,
416 })
417}
418
419pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
421 let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
423 for (idx, column) in create.column_defs.iter().enumerate() {
424 if let Some(indices) = column_to_indices.get(&column.name) {
425 return InvalidSqlSnafu {
426 err_msg: format!(
427 "column name `{}` is duplicated at index {} and {}",
428 column.name, indices, idx
429 ),
430 }
431 .fail();
432 }
433 column_to_indices.insert(&column.name, idx);
434 }
435
436 let _ = column_to_indices
438 .get(&create.time_index)
439 .with_context(|| InvalidSqlSnafu {
440 err_msg: format!(
441 "column name `{}` is not found in column list",
442 create.time_index
443 ),
444 })?;
445
446 for pk in &create.primary_keys {
448 let _ = column_to_indices
449 .get(&pk)
450 .with_context(|| InvalidSqlSnafu {
451 err_msg: format!("column name `{}` is not found in column list", pk),
452 })?;
453 }
454
455 let mut pk_set = HashSet::new();
457 for pk in &create.primary_keys {
458 if !pk_set.insert(pk) {
459 return InvalidSqlSnafu {
460 err_msg: format!("column name `{}` is duplicated in primary keys", pk),
461 }
462 .fail();
463 }
464 }
465
466 if pk_set.contains(&create.time_index) {
468 return InvalidSqlSnafu {
469 err_msg: format!(
470 "column name `{}` is both primary key and time index",
471 create.time_index
472 ),
473 }
474 .fail();
475 }
476
477 for column in &create.column_defs {
478 if is_interval_type(&column.data_type()) {
480 return InvalidSqlSnafu {
481 err_msg: format!(
482 "column name `{}` is interval type, which is not supported",
483 column.name
484 ),
485 }
486 .fail();
487 }
488 if is_date_time_type(&column.data_type()) {
490 return InvalidSqlSnafu {
491 err_msg: format!(
492 "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
493 column.name
494 ),
495 }
496 .fail();
497 }
498 }
499 Ok(())
500}
501
502fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
503 for add_column in &add_columns.add_columns {
504 let Some(column_def) = &add_column.column_def else {
505 continue;
506 };
507 if is_date_time_type(&column_def.data_type()) {
508 return InvalidSqlSnafu {
509 err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
510 }
511 .fail();
512 }
513 if is_interval_type(&column_def.data_type()) {
514 return InvalidSqlSnafu {
515 err_msg: format!(
516 "column name `{}` is interval type, which is not supported",
517 column_def.name
518 ),
519 }
520 .fail();
521 }
522 }
523 Ok(())
524}
525
526fn is_date_time_type(data_type: &ColumnDataType) -> bool {
527 matches!(data_type, ColumnDataType::Datetime)
528}
529
530fn is_interval_type(data_type: &ColumnDataType) -> bool {
531 matches!(
532 data_type,
533 ColumnDataType::IntervalYearMonth
534 | ColumnDataType::IntervalDayTime
535 | ColumnDataType::IntervalMonthDayNano
536 )
537}
538
539fn find_primary_keys(
540 columns: &[SqlColumn],
541 constraints: &[TableConstraint],
542) -> Result<Vec<String>> {
543 let columns_pk = columns
544 .iter()
545 .filter_map(|x| {
546 if x.options().iter().any(|o| {
547 matches!(
548 o.option,
549 ColumnOption::Unique {
550 is_primary: true,
551 ..
552 }
553 )
554 }) {
555 Some(x.name().value.clone())
556 } else {
557 None
558 }
559 })
560 .collect::<Vec<String>>();
561
562 ensure!(
563 columns_pk.len() <= 1,
564 IllegalPrimaryKeysDefSnafu {
565 msg: "not allowed to inline multiple primary keys in columns options"
566 }
567 );
568
569 let constraints_pk = constraints
570 .iter()
571 .filter_map(|constraint| match constraint {
572 TableConstraint::PrimaryKey { columns, .. } => {
573 Some(columns.iter().map(|ident| ident.value.clone()))
574 }
575 _ => None,
576 })
577 .flatten()
578 .collect::<Vec<String>>();
579
580 ensure!(
581 columns_pk.is_empty() || constraints_pk.is_empty(),
582 IllegalPrimaryKeysDefSnafu {
583 msg: "found definitions of primary keys in multiple places"
584 }
585 );
586
587 let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
588 primary_keys.extend(columns_pk);
589 primary_keys.extend(constraints_pk);
590 Ok(primary_keys)
591}
592
593pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
594 let time_index = constraints
595 .iter()
596 .filter_map(|constraint| match constraint {
597 TableConstraint::TimeIndex { column, .. } => Some(&column.value),
598 _ => None,
599 })
600 .collect::<Vec<&String>>();
601 ensure!(
602 time_index.len() == 1,
603 InvalidSqlSnafu {
604 err_msg: "must have one and only one TimeIndex columns",
605 }
606 );
607 Ok(time_index[0].clone())
608}
609
610fn columns_to_expr(
611 column_defs: &[SqlColumn],
612 time_index: &str,
613 primary_keys: &[String],
614 timezone: Option<&Timezone>,
615) -> Result<Vec<api::v1::ColumnDef>> {
616 let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
617 column_schemas_to_defs(column_schemas, primary_keys)
618}
619
620fn columns_to_column_schemas(
621 columns: &[SqlColumn],
622 time_index: &str,
623 timezone: Option<&Timezone>,
624) -> Result<Vec<ColumnSchema>> {
625 columns
626 .iter()
627 .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
628 .collect::<Result<Vec<ColumnSchema>>>()
629}
630
631pub fn column_schemas_to_defs(
633 column_schemas: Vec<ColumnSchema>,
634 primary_keys: &[String],
635) -> Result<Vec<api::v1::ColumnDef>> {
636 let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
637 .iter()
638 .map(|c| {
639 ColumnDataTypeWrapper::try_from(c.data_type.clone())
640 .map(|w| w.to_parts())
641 .context(ColumnDataTypeSnafu)
642 })
643 .collect::<Result<Vec<_>>>()?;
644
645 column_schemas
646 .iter()
647 .zip(column_datatypes)
648 .map(|(schema, datatype)| {
649 let semantic_type = if schema.is_time_index() {
650 SemanticType::Timestamp
651 } else if primary_keys.contains(&schema.name) {
652 SemanticType::Tag
653 } else {
654 SemanticType::Field
655 } as i32;
656 let comment = schema
657 .metadata()
658 .get(COMMENT_KEY)
659 .cloned()
660 .unwrap_or_default();
661
662 Ok(api::v1::ColumnDef {
663 name: schema.name.clone(),
664 data_type: datatype.0 as i32,
665 is_nullable: schema.is_nullable(),
666 default_constraint: match schema.default_constraint() {
667 None => vec![],
668 Some(v) => {
669 v.clone()
670 .try_into()
671 .context(ConvertColumnDefaultConstraintSnafu {
672 column_name: &schema.name,
673 })?
674 }
675 },
676 semantic_type,
677 comment,
678 datatype_extension: datatype.1,
679 options: options_from_column_schema(schema),
680 })
681 })
682 .collect()
683}
684
685#[derive(Debug, Clone, PartialEq, Eq)]
686pub struct RepartitionRequest {
687 pub catalog_name: String,
688 pub schema_name: String,
689 pub table_name: String,
690 pub from_exprs: Vec<Expr>,
691 pub into_exprs: Vec<Expr>,
692}
693
694pub(crate) fn to_repartition_request(
695 alter_table: AlterTable,
696 query_ctx: &QueryContextRef,
697) -> Result<RepartitionRequest> {
698 let (catalog_name, schema_name, table_name) =
699 table_idents_to_full_name(alter_table.table_name(), query_ctx)
700 .map_err(BoxedError::new)
701 .context(ExternalSnafu)?;
702
703 let AlterTableOperation::Repartition { operation } = alter_table.alter_operation else {
704 return InvalidSqlSnafu {
705 err_msg: "expected REPARTITION operation",
706 }
707 .fail();
708 };
709
710 Ok(RepartitionRequest {
711 catalog_name,
712 schema_name,
713 table_name,
714 from_exprs: operation.from_exprs,
715 into_exprs: operation.into_exprs,
716 })
717}
718
719pub(crate) fn to_alter_table_expr(
721 alter_table: AlterTable,
722 query_ctx: &QueryContextRef,
723) -> Result<AlterTableExpr> {
724 let (catalog_name, schema_name, table_name) =
725 table_idents_to_full_name(alter_table.table_name(), query_ctx)
726 .map_err(BoxedError::new)
727 .context(ExternalSnafu)?;
728
729 let kind = match alter_table.alter_operation {
730 AlterTableOperation::AddConstraint(_) => {
731 return NotSupportedSnafu {
732 feat: "ADD CONSTRAINT",
733 }
734 .fail();
735 }
736 AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
737 add_columns: add_columns
738 .into_iter()
739 .map(|add_column| {
740 let column_def = sql_column_def_to_grpc_column_def(
741 &add_column.column_def,
742 Some(&query_ctx.timezone()),
743 )
744 .map_err(BoxedError::new)
745 .context(ExternalSnafu)?;
746 if is_interval_type(&column_def.data_type()) {
747 return NotSupportedSnafu {
748 feat: "Add column with interval type",
749 }
750 .fail();
751 }
752 Ok(AddColumn {
753 column_def: Some(column_def),
754 location: add_column.location.as_ref().map(From::from),
755 add_if_not_exists: add_column.add_if_not_exists,
756 })
757 })
758 .collect::<Result<Vec<AddColumn>>>()?,
759 }),
760 AlterTableOperation::ModifyColumnType {
761 column_name,
762 target_type,
763 } => {
764 let target_type =
765 sql_data_type_to_concrete_data_type(&target_type).context(ParseSqlSnafu)?;
766 let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
767 .map(|w| w.to_parts())
768 .context(ColumnDataTypeSnafu)?;
769 if is_interval_type(&target_type) {
770 return NotSupportedSnafu {
771 feat: "Modify column type to interval type",
772 }
773 .fail();
774 }
775 AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
776 modify_column_types: vec![ModifyColumnType {
777 column_name: column_name.value,
778 target_type: target_type as i32,
779 target_type_extension,
780 }],
781 })
782 }
783 AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
784 drop_columns: vec![DropColumn {
785 name: name.value.clone(),
786 }],
787 }),
788 AlterTableOperation::RenameTable { new_table_name } => {
789 AlterTableKind::RenameTable(RenameTable {
790 new_table_name: new_table_name.clone(),
791 })
792 }
793 AlterTableOperation::SetTableOptions { options } => {
794 AlterTableKind::SetTableOptions(SetTableOptions {
795 table_options: options.into_iter().map(Into::into).collect(),
796 })
797 }
798 AlterTableOperation::UnsetTableOptions { keys } => {
799 AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
800 }
801 AlterTableOperation::Repartition { .. } => {
802 return NotSupportedSnafu {
803 feat: "ALTER TABLE ... REPARTITION",
804 }
805 .fail();
806 }
807 AlterTableOperation::SetIndex { options } => {
808 let option = match options {
809 sql::statements::alter::SetIndexOperation::Fulltext {
810 column_name,
811 options,
812 } => SetIndex {
813 options: Some(set_index::Options::Fulltext(SetFulltext {
814 column_name: column_name.value,
815 enable: options.enable,
816 analyzer: match options.analyzer {
817 FulltextAnalyzer::English => Analyzer::English.into(),
818 FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
819 },
820 case_sensitive: options.case_sensitive,
821 backend: match options.backend {
822 FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
823 FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
824 },
825 granularity: options.granularity as u64,
826 false_positive_rate: options.false_positive_rate(),
827 })),
828 },
829 sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
830 options: Some(set_index::Options::Inverted(SetInverted {
831 column_name: column_name.value,
832 })),
833 },
834 sql::statements::alter::SetIndexOperation::Skipping {
835 column_name,
836 options,
837 } => SetIndex {
838 options: Some(set_index::Options::Skipping(SetSkipping {
839 column_name: column_name.value,
840 enable: true,
841 granularity: options.granularity as u64,
842 false_positive_rate: options.false_positive_rate(),
843 skipping_index_type: match options.index_type {
844 SkippingIndexType::BloomFilter => {
845 PbSkippingIndexType::BloomFilter.into()
846 }
847 },
848 })),
849 },
850 };
851 AlterTableKind::SetIndexes(SetIndexes {
852 set_indexes: vec![option],
853 })
854 }
855 AlterTableOperation::UnsetIndex { options } => {
856 let option = match options {
857 sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
858 UnsetIndex {
859 options: Some(unset_index::Options::Fulltext(UnsetFulltext {
860 column_name: column_name.value,
861 })),
862 }
863 }
864 sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
865 UnsetIndex {
866 options: Some(unset_index::Options::Inverted(UnsetInverted {
867 column_name: column_name.value,
868 })),
869 }
870 }
871 sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
872 UnsetIndex {
873 options: Some(unset_index::Options::Skipping(UnsetSkipping {
874 column_name: column_name.value,
875 })),
876 }
877 }
878 };
879
880 AlterTableKind::UnsetIndexes(UnsetIndexes {
881 unset_indexes: vec![option],
882 })
883 }
884 AlterTableOperation::DropDefaults { columns } => {
885 AlterTableKind::DropDefaults(DropDefaults {
886 drop_defaults: columns
887 .into_iter()
888 .map(|col| {
889 let column_name = col.0.to_string();
890 Ok(api::v1::DropDefault { column_name })
891 })
892 .collect::<Result<Vec<_>>>()?,
893 })
894 }
895 AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
896 set_defaults: defaults
897 .into_iter()
898 .map(|col| {
899 let column_name = col.column_name.to_string();
900 let default_constraint = serde_json::to_string(&col.default_constraint)
901 .context(EncodeJsonSnafu)?
902 .into_bytes();
903 Ok(api::v1::SetDefault {
904 column_name,
905 default_constraint,
906 })
907 })
908 .collect::<Result<Vec<_>>>()?,
909 }),
910 };
911
912 Ok(AlterTableExpr {
913 catalog_name,
914 schema_name,
915 table_name,
916 kind: Some(kind),
917 })
918}
919
920pub fn to_alter_database_expr(
922 alter_database: AlterDatabase,
923 query_ctx: &QueryContextRef,
924) -> Result<AlterDatabaseExpr> {
925 let catalog = query_ctx.current_catalog();
926 let schema = alter_database.database_name;
927
928 let kind = match alter_database.alter_operation {
929 AlterDatabaseOperation::SetDatabaseOption { options } => {
930 let options = options.into_iter().map(Into::into).collect();
931 AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
932 set_database_options: options,
933 })
934 }
935 AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
936 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
937 }
938 };
939
940 Ok(AlterDatabaseExpr {
941 catalog_name: catalog.to_string(),
942 schema_name: schema.to_string(),
943 kind: Some(kind),
944 })
945}
946
947pub fn to_create_view_expr(
949 stmt: CreateView,
950 logical_plan: Vec<u8>,
951 table_names: Vec<TableName>,
952 columns: Vec<String>,
953 plan_columns: Vec<String>,
954 definition: String,
955 query_ctx: QueryContextRef,
956) -> Result<CreateViewExpr> {
957 let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
958 .map_err(BoxedError::new)
959 .context(ExternalSnafu)?;
960
961 let expr = CreateViewExpr {
962 catalog_name,
963 schema_name,
964 view_name,
965 logical_plan,
966 create_if_not_exists: stmt.if_not_exists,
967 or_replace: stmt.or_replace,
968 table_names,
969 columns,
970 plan_columns,
971 definition,
972 };
973
974 Ok(expr)
975}
976
977pub fn to_create_flow_task_expr(
978 create_flow: CreateFlow,
979 query_ctx: &QueryContextRef,
980) -> Result<CreateFlowExpr> {
981 let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone(), true)
983 .with_context(|_| ConvertIdentifierSnafu {
984 ident: create_flow.sink_table_name.to_string(),
985 })?;
986 let catalog = sink_table_ref
987 .catalog()
988 .unwrap_or(query_ctx.current_catalog())
989 .to_string();
990 let schema = sink_table_ref
991 .schema()
992 .map(|s| s.to_owned())
993 .unwrap_or(query_ctx.current_schema());
994
995 let sink_table_name = TableName {
996 catalog_name: catalog,
997 schema_name: schema,
998 table_name: sink_table_ref.table().to_string(),
999 };
1000
1001 let source_table_names = extract_tables_from_query(&create_flow.query)
1002 .map(|name| {
1003 let reference =
1004 object_name_to_table_reference(name.clone(), true).with_context(|_| {
1005 ConvertIdentifierSnafu {
1006 ident: name.to_string(),
1007 }
1008 })?;
1009 let catalog = reference
1010 .catalog()
1011 .unwrap_or(query_ctx.current_catalog())
1012 .to_string();
1013 let schema = reference
1014 .schema()
1015 .map(|s| s.to_string())
1016 .unwrap_or(query_ctx.current_schema());
1017
1018 let table_name = TableName {
1019 catalog_name: catalog,
1020 schema_name: schema,
1021 table_name: reference.table().to_string(),
1022 };
1023 Ok(table_name)
1024 })
1025 .collect::<Result<Vec<_>>>()?;
1026
1027 let eval_interval = create_flow.eval_interval;
1028
1029 Ok(CreateFlowExpr {
1030 catalog_name: query_ctx.current_catalog().to_string(),
1031 flow_name: sanitize_flow_name(create_flow.flow_name)?,
1032 source_table_names,
1033 sink_table_name: Some(sink_table_name),
1034 or_replace: create_flow.or_replace,
1035 create_if_not_exists: create_flow.if_not_exists,
1036 expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
1037 eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
1038 comment: create_flow.comment.unwrap_or_default(),
1039 sql: create_flow.query.to_string(),
1040 flow_options: Default::default(),
1041 })
1042}
1043
1044fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
1046 ensure!(
1047 flow_name.0.len() == 1,
1048 InvalidFlowNameSnafu {
1049 name: flow_name.to_string(),
1050 }
1051 );
1052 Ok(flow_name.0.swap_remove(0).to_string_unquoted())
1054}
1055
1056#[cfg(test)]
1057mod tests {
1058 use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
1059 use datatypes::value::Value;
1060 use session::context::{QueryContext, QueryContextBuilder};
1061 use sql::dialect::GreptimeDbDialect;
1062 use sql::parser::{ParseOptions, ParserContext};
1063 use sql::statements::statement::Statement;
1064 use store_api::storage::ColumnDefaultConstraint;
1065
1066 use super::*;
1067
1068 #[test]
1069 fn test_create_flow_tql_expr() {
1070 let sql = r#"
1071CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1072TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
1073 let stmt =
1074 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1075
1076 assert!(
1077 stmt.is_err(),
1078 "Expected error for invalid TQL EVAL parameters: {:#?}",
1079 stmt
1080 );
1081
1082 let sql = r#"
1083CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1084TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#;
1085 let stmt =
1086 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1087 .unwrap()
1088 .pop()
1089 .unwrap();
1090
1091 let Statement::CreateFlow(create_flow) = stmt else {
1092 unreachable!()
1093 };
1094 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1095
1096 let to_dot_sep =
1097 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1098 assert_eq!("calc_reqs", expr.flow_name);
1099 assert_eq!("greptime", expr.catalog_name);
1100 assert_eq!(
1101 "greptime.public.cnt_reqs",
1102 expr.sink_table_name.map(to_dot_sep).unwrap()
1103 );
1104 assert!(expr.source_table_names.is_empty());
1105 assert_eq!(
1106 r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#,
1107 expr.sql
1108 );
1109 }
1110
1111 #[test]
1112 fn test_create_flow_expr() {
1113 let sql = r"
1114CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
1115SELECT
1116 DISTINCT number as dis
1117FROM
1118 distinct_basic;";
1119 let stmt =
1120 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1121 .unwrap()
1122 .pop()
1123 .unwrap();
1124
1125 let Statement::CreateFlow(create_flow) = stmt else {
1126 unreachable!()
1127 };
1128 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1129
1130 let to_dot_sep =
1131 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1132 assert_eq!("test_distinct_basic", expr.flow_name);
1133 assert_eq!("greptime", expr.catalog_name);
1134 assert_eq!(
1135 "greptime.public.out_distinct_basic",
1136 expr.sink_table_name.map(to_dot_sep).unwrap()
1137 );
1138 assert_eq!(1, expr.source_table_names.len());
1139 assert_eq!(
1140 "greptime.public.distinct_basic",
1141 to_dot_sep(expr.source_table_names[0].clone())
1142 );
1143 assert_eq!(
1144 r"SELECT
1145 DISTINCT number as dis
1146FROM
1147 distinct_basic",
1148 expr.sql
1149 );
1150
1151 let sql = r"
1152CREATE FLOW `task_2`
1153SINK TO schema_1.table_1
1154AS
1155SELECT max(c1), min(c2) FROM schema_2.table_2;";
1156 let stmt =
1157 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1158 .unwrap()
1159 .pop()
1160 .unwrap();
1161
1162 let Statement::CreateFlow(create_flow) = stmt else {
1163 unreachable!()
1164 };
1165 let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1166
1167 let to_dot_sep =
1168 |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1169 assert_eq!("task_2", expr.flow_name);
1170 assert_eq!("greptime", expr.catalog_name);
1171 assert_eq!(
1172 "greptime.schema_1.table_1",
1173 expr.sink_table_name.map(to_dot_sep).unwrap()
1174 );
1175 assert_eq!(1, expr.source_table_names.len());
1176 assert_eq!(
1177 "greptime.schema_2.table_2",
1178 to_dot_sep(expr.source_table_names[0].clone())
1179 );
1180 assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
1181
1182 let sql = r"
1183CREATE FLOW abc.`task_2`
1184SINK TO schema_1.table_1
1185AS
1186SELECT max(c1), min(c2) FROM schema_2.table_2;";
1187 let stmt =
1188 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1189 .unwrap()
1190 .pop()
1191 .unwrap();
1192
1193 let Statement::CreateFlow(create_flow) = stmt else {
1194 unreachable!()
1195 };
1196 let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
1197
1198 assert!(res.is_err());
1199 assert!(
1200 res.unwrap_err()
1201 .to_string()
1202 .contains("Invalid flow name: abc.`task_2`")
1203 );
1204 }
1205
1206 #[test]
1207 fn test_create_to_expr() {
1208 let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
1209 let stmt =
1210 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1211 .unwrap()
1212 .pop()
1213 .unwrap();
1214
1215 let Statement::CreateTable(create_table) = stmt else {
1216 unreachable!()
1217 };
1218 let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1219 assert_eq!("3days", expr.table_options.get("ttl").unwrap());
1220 assert_eq!(
1221 "1.0MiB",
1222 expr.table_options.get("write_buffer_size").unwrap()
1223 );
1224 }
1225
1226 #[test]
1227 fn test_invalid_create_to_expr() {
1228 let cases = [
1229 "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
1231 "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
1233 "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));",
1235 ];
1236
1237 for sql in cases {
1238 let stmt = ParserContext::create_with_dialect(
1239 sql,
1240 &GreptimeDbDialect {},
1241 ParseOptions::default(),
1242 )
1243 .unwrap()
1244 .pop()
1245 .unwrap();
1246 let Statement::CreateTable(create_table) = stmt else {
1247 unreachable!()
1248 };
1249 create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
1250 }
1251 }
1252
1253 #[test]
1254 fn test_create_to_expr_with_default_timestamp_value() {
1255 let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
1256 let stmt =
1257 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1258 .unwrap()
1259 .pop()
1260 .unwrap();
1261
1262 let Statement::CreateTable(create_table) = stmt else {
1263 unreachable!()
1264 };
1265
1266 let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1268 let ts_column = &expr.column_defs[1];
1269 let constraint = assert_ts_column(ts_column);
1270 assert!(
1271 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1272 if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1273 );
1274
1275 let ctx = QueryContextBuilder::default()
1277 .timezone(Timezone::from_tz_string("+08:00").unwrap())
1278 .build()
1279 .into();
1280 let expr = create_to_expr(&create_table, &ctx).unwrap();
1281 let ts_column = &expr.column_defs[1];
1282 let constraint = assert_ts_column(ts_column);
1283 assert!(
1284 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1285 if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1286 );
1287 }
1288
1289 fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
1290 assert_eq!("ts", ts_column.name);
1291 assert_eq!(
1292 ColumnDataType::TimestampMillisecond as i32,
1293 ts_column.data_type
1294 );
1295 assert!(!ts_column.default_constraint.is_empty());
1296
1297 ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
1298 }
1299
1300 #[test]
1301 fn test_to_alter_expr() {
1302 let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
1303 let stmt =
1304 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1305 .unwrap()
1306 .pop()
1307 .unwrap();
1308
1309 let Statement::AlterDatabase(alter_database) = stmt else {
1310 unreachable!()
1311 };
1312
1313 let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1314 let kind = expr.kind.unwrap();
1315
1316 let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
1317 set_database_options,
1318 }) = kind
1319 else {
1320 unreachable!()
1321 };
1322
1323 assert_eq!(2, set_database_options.len());
1324 assert_eq!("key1", set_database_options[0].key);
1325 assert_eq!("value1", set_database_options[0].value);
1326 assert_eq!("key2", set_database_options[1].key);
1327 assert_eq!("value2", set_database_options[1].value);
1328
1329 let sql = "ALTER DATABASE greptime UNSET key1, key2;";
1330 let stmt =
1331 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1332 .unwrap()
1333 .pop()
1334 .unwrap();
1335
1336 let Statement::AlterDatabase(alter_database) = stmt else {
1337 unreachable!()
1338 };
1339
1340 let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1341 let kind = expr.kind.unwrap();
1342
1343 let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
1344 unreachable!()
1345 };
1346
1347 assert_eq!(2, keys.len());
1348 assert!(keys.contains(&"key1".to_string()));
1349 assert!(keys.contains(&"key2".to_string()));
1350
1351 let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
1352 let stmt =
1353 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1354 .unwrap()
1355 .pop()
1356 .unwrap();
1357
1358 let Statement::AlterTable(alter_table) = stmt else {
1359 unreachable!()
1360 };
1361
1362 let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1364 let kind = expr.kind.unwrap();
1365
1366 let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1367 unreachable!()
1368 };
1369
1370 assert_eq!(1, add_columns.len());
1371 let ts_column = add_columns[0].column_def.clone().unwrap();
1372 let constraint = assert_ts_column(&ts_column);
1373 assert!(
1374 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1375 if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1376 );
1377
1378 let ctx = QueryContextBuilder::default()
1381 .timezone(Timezone::from_tz_string("+08:00").unwrap())
1382 .build()
1383 .into();
1384 let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
1385 let kind = expr.kind.unwrap();
1386
1387 let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1388 unreachable!()
1389 };
1390
1391 assert_eq!(1, add_columns.len());
1392 let ts_column = add_columns[0].column_def.clone().unwrap();
1393 let constraint = assert_ts_column(&ts_column);
1394 assert!(
1395 matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1396 if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1397 );
1398 }
1399
1400 #[test]
1401 fn test_to_alter_modify_column_type_expr() {
1402 let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
1403 let stmt =
1404 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1405 .unwrap()
1406 .pop()
1407 .unwrap();
1408
1409 let Statement::AlterTable(alter_table) = stmt else {
1410 unreachable!()
1411 };
1412
1413 let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1415 let kind = expr.kind.unwrap();
1416
1417 let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
1418 modify_column_types,
1419 }) = kind
1420 else {
1421 unreachable!()
1422 };
1423
1424 assert_eq!(1, modify_column_types.len());
1425 let modify_column_type = &modify_column_types[0];
1426
1427 assert_eq!("mem_usage", modify_column_type.column_name);
1428 assert_eq!(
1429 ColumnDataType::String as i32,
1430 modify_column_type.target_type
1431 );
1432 assert!(modify_column_type.target_type_extension.is_none());
1433 }
1434
1435 #[test]
1436 fn test_to_repartition_request() {
1437 let sql = r#"
1438ALTER TABLE metrics REPARTITION (
1439 device_id < 100
1440) INTO (
1441 device_id < 100 AND area < 'South',
1442 device_id < 100 AND area >= 'South'
1443);"#;
1444 let stmt =
1445 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1446 .unwrap()
1447 .pop()
1448 .unwrap();
1449
1450 let Statement::AlterTable(alter_table) = stmt else {
1451 unreachable!()
1452 };
1453
1454 let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
1455 assert_eq!("greptime", request.catalog_name);
1456 assert_eq!("public", request.schema_name);
1457 assert_eq!("metrics", request.table_name);
1458 assert_eq!(
1459 request
1460 .from_exprs
1461 .into_iter()
1462 .map(|x| x.to_string())
1463 .collect::<Vec<_>>(),
1464 vec!["device_id < 100".to_string()]
1465 );
1466 assert_eq!(
1467 request
1468 .into_exprs
1469 .into_iter()
1470 .map(|x| x.to_string())
1471 .collect::<Vec<_>>(),
1472 vec![
1473 "device_id < 100 AND area < 'South'".to_string(),
1474 "device_id < 100 AND area >= 'South'".to_string()
1475 ]
1476 );
1477 }
1478
1479 fn new_test_table_names() -> Vec<TableName> {
1480 vec![
1481 TableName {
1482 catalog_name: "greptime".to_string(),
1483 schema_name: "public".to_string(),
1484 table_name: "a_table".to_string(),
1485 },
1486 TableName {
1487 catalog_name: "greptime".to_string(),
1488 schema_name: "public".to_string(),
1489 table_name: "b_table".to_string(),
1490 },
1491 ]
1492 }
1493
1494 #[test]
1495 fn test_to_create_view_expr() {
1496 let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1497 let stmt =
1498 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1499 .unwrap()
1500 .pop()
1501 .unwrap();
1502
1503 let Statement::CreateView(stmt) = stmt else {
1504 unreachable!()
1505 };
1506
1507 let logical_plan = vec![1, 2, 3];
1508 let table_names = new_test_table_names();
1509 let columns = vec!["a".to_string()];
1510 let plan_columns = vec!["number".to_string()];
1511
1512 let expr = to_create_view_expr(
1513 stmt,
1514 logical_plan.clone(),
1515 table_names.clone(),
1516 columns.clone(),
1517 plan_columns.clone(),
1518 sql.to_string(),
1519 QueryContext::arc(),
1520 )
1521 .unwrap();
1522
1523 assert_eq!("greptime", expr.catalog_name);
1524 assert_eq!("public", expr.schema_name);
1525 assert_eq!("test", expr.view_name);
1526 assert!(!expr.create_if_not_exists);
1527 assert!(!expr.or_replace);
1528 assert_eq!(logical_plan, expr.logical_plan);
1529 assert_eq!(table_names, expr.table_names);
1530 assert_eq!(sql, expr.definition);
1531 assert_eq!(columns, expr.columns);
1532 assert_eq!(plan_columns, expr.plan_columns);
1533 }
1534
1535 #[test]
1536 fn test_to_create_view_expr_complex() {
1537 let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
1538 let stmt =
1539 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1540 .unwrap()
1541 .pop()
1542 .unwrap();
1543
1544 let Statement::CreateView(stmt) = stmt else {
1545 unreachable!()
1546 };
1547
1548 let logical_plan = vec![1, 2, 3];
1549 let table_names = new_test_table_names();
1550 let columns = vec!["a".to_string()];
1551 let plan_columns = vec!["number".to_string()];
1552
1553 let expr = to_create_view_expr(
1554 stmt,
1555 logical_plan.clone(),
1556 table_names.clone(),
1557 columns.clone(),
1558 plan_columns.clone(),
1559 sql.to_string(),
1560 QueryContext::arc(),
1561 )
1562 .unwrap();
1563
1564 assert_eq!("greptime", expr.catalog_name);
1565 assert_eq!("test", expr.schema_name);
1566 assert_eq!("test_view", expr.view_name);
1567 assert!(expr.create_if_not_exists);
1568 assert!(expr.or_replace);
1569 assert_eq!(logical_plan, expr.logical_plan);
1570 assert_eq!(table_names, expr.table_names);
1571 assert_eq!(sql, expr.definition);
1572 assert_eq!(columns, expr.columns);
1573 assert_eq!(plan_columns, expr.plan_columns);
1574 }
1575
1576 #[test]
1577 fn test_expr_to_create() {
1578 let sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
1579 `timestamp` TIMESTAMP(9) NOT NULL,
1580 `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
1581 `username` STRING NULL,
1582 `http_method` STRING NULL INVERTED INDEX,
1583 `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
1584 `protocol` STRING NULL,
1585 `status_code` INT NULL INVERTED INDEX,
1586 `response_size` BIGINT NULL,
1587 `message` STRING NULL,
1588 TIME INDEX (`timestamp`),
1589 PRIMARY KEY (`username`, `status_code`)
1590)
1591ENGINE=mito
1592WITH(
1593 append_mode = 'true'
1594)"#;
1595 let stmt =
1596 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1597 .unwrap()
1598 .pop()
1599 .unwrap();
1600
1601 let Statement::CreateTable(original_create) = stmt else {
1602 unreachable!()
1603 };
1604
1605 let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();
1607
1608 let create_table = expr_to_create(&expr, Some('`')).unwrap();
1609 let new_sql = format!("{:#}", create_table);
1610 assert_eq!(sql, new_sql);
1611 }
1612}