use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_table_expr::Kind;
use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
use api::v1::{
    AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
    Repartition, column_def,
};
#[cfg(feature = "enterprise")]
use api::v1::{
    CreateTriggerExpr as PbCreateTriggerExpr, meta::CreateTriggerTask as PbCreateTriggerTask,
};
use catalog::CatalogManagerRef;
use chrono::Utc;
use common_base::regex_pattern::NAME_PATTERN_REG;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_readonly_schema};
use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::create_flow::FlowType;
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
use common_meta::procedure_executor::ExecutorContext;
#[cfg(feature = "enterprise")]
use common_meta::rpc::ddl::trigger::CreateTriggerTask;
#[cfg(feature = "enterprise")]
use common_meta::rpc::ddl::trigger::DropTriggerTask;
use common_meta::rpc::ddl::{
    CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
    SubmitDdlTaskResponse,
};
use common_query::Output;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_sql::convert::sql_value_to_value;
use common_telemetry::{debug, info, tracing, warn};
use common_time::{Timestamp, Timezone};
use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
use datatypes::value::Value;
use datatypes::vectors::{StringVector, VectorRef};
use humantime::parse_duration;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
use query::parser::QueryStatement;
use query::plan::extract_and_rewrite_full_table_names;
use query::query_engine::DefaultSerializer;
use query::sql::create_table_stmt;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::OptionMap;
#[cfg(feature = "enterprise")]
use sql::statements::alter::trigger::AlterTrigger;
use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation};
#[cfg(feature = "enterprise")]
use sql::statements::create::trigger::CreateTrigger;
use sql::statements::create::{
    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
};
use sql::statements::statement::Statement;
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::TableRef;
use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{
    AlterKind, AlterTableRequest, COMMENT_KEY, DDL_TIMEOUT, DDL_WAIT, TableOptions,
};
use table::table_name::TableName;
use table::table_reference::TableReference;

use crate::error::{
    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
    DeserializePartitionExprSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
    FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu,
    PartitionExprToPbSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
    SerializePartitionExprSnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
    TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
    ViewAlreadyExistsSnafu,
};
use crate::expr_helper::{self, RepartitionRequest};
use crate::statement::StatementExecutor;
use crate::statement::show::create_partitions_stmt;

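/// Options controlling how a DDL procedure is submitted: whether the call
/// waits for the procedure to finish, and for how long.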
#[derive(Debug, Clone, Copy)]
struct DdlSubmitOptions {
    wait: bool,
    timeout: Duration,
}

fn build_procedure_id_output(procedure_id: Vec<u8>) -> Result<Output> {
    let procedure_id = String::from_utf8_lossy(&procedure_id).to_string();
    let vector: VectorRef = Arc::new(StringVector::from(vec![procedure_id]));
    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
        "Procedure ID",
        vector.data_type(),
        false,
    )]));
    let batch =
        RecordBatch::new(schema.clone(), vec![vector]).context(error::BuildRecordBatchSnafu)?;
    let batches =
        RecordBatches::try_new(schema, vec![batch]).context(error::BuildRecordBatchSnafu)?;
    Ok(Output::new_with_record_batches(batches))
}

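/// Parses the `wait` and `timeout` DDL options, falling back to the request
/// defaults when an option is absent.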
fn parse_ddl_options(options: &OptionMap) -> Result<DdlSubmitOptions> {
    let wait = match options.get(DDL_WAIT) {
        Some(value) => value.parse::<bool>().map_err(|_| {
            InvalidSqlSnafu {
                err_msg: format!("invalid DDL option '{DDL_WAIT}': '{value}'"),
            }
            .build()
        })?,
        None => SubmitDdlTaskRequest::default_wait(),
    };

    let timeout = match options.get(DDL_TIMEOUT) {
        Some(value) => parse_duration(value).map_err(|err| {
            InvalidSqlSnafu {
                err_msg: format!("invalid DDL option '{DDL_TIMEOUT}': '{value}': {err}"),
            }
            .build()
        })?,
        None => SubmitDdlTaskRequest::default_timeout(),
    };

    Ok(DdlSubmitOptions { wait, timeout })
}

impl StatementExecutor {
    pub fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }

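    /// Creates a table from a `CREATE TABLE` statement. Schema-level options
    /// (except `compaction.*` ones) are inherited by the table unless the
    /// statement overrides them.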
    #[tracing::instrument(skip_all)]
    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
        let (catalog, schema, _table) = table_idents_to_full_name(&stmt.name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;

        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
        if let Some(schema_options) = schema_options {
            for (key, value) in schema_options.extra_options.iter() {
                if key.starts_with("compaction.") {
                    continue;
                }
                create_expr
                    .table_options
                    .entry(key.clone())
                    .or_insert(value.clone());
            }
        }

        self.create_table_inner(create_expr, stmt.partitions, ctx)
            .await
    }

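    /// Creates a table that reuses another table's schema and partition rule
    /// (`CREATE TABLE ... LIKE`).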
    #[tracing::instrument(skip_all)]
    pub async fn create_table_like(
        &self,
        stmt: CreateTableLike,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;
        let table_ref = self
            .catalog_manager
            .table(&catalog, &schema, &table, Some(&ctx))
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu { table_name: &table })?;
        let partition_info = self
            .partition_manager
            .find_physical_partition_info(table_ref.table_info().table_id())
            .await
            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;

        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let quote_style = ctx.quote_style();
        let mut create_stmt =
            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
                .context(error::ParseQuerySnafu)?;
        create_stmt.name = stmt.table_name;
        create_stmt.if_not_exists = false;

        let table_info = table_ref.table_info();
        let partitions = create_partitions_stmt(&table_info, &partition_info.partitions)?.and_then(
            |mut partitions| {
                if !partitions.column_list.is_empty() {
                    partitions.set_quote(quote_style);
                    Some(partitions)
                } else {
                    None
                }
            },
        );

        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
        self.create_table_inner(create_expr, partitions, ctx).await
    }

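    /// Creates an external table (`CREATE EXTERNAL TABLE`).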
    #[tracing::instrument(skip_all)]
    pub async fn create_external_table(
        &self,
        create_expr: CreateExternalTable,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
        self.create_table_inner(create_expr, None, ctx).await
    }

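    /// Dispatches table creation: metric-engine logical tables go through the
    /// logical-table path (validating any partition rule against the physical
    /// table), everything else goes through the generic path.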
    #[tracing::instrument(skip_all)]
    pub async fn create_table_inner(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !is_readonly_schema(&create_table.schema_name),
            SchemaReadOnlySnafu {
                name: create_table.schema_name.clone()
            }
        );

        if create_table.engine == METRIC_ENGINE_NAME
            && create_table
                .table_options
                .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            if let Some(partitions) = partitions.as_ref()
                && !partitions.exprs.is_empty()
            {
                self.validate_logical_table_partition_rule(create_table, partitions, &query_ctx)
                    .await?;
            }
            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
                .await?
                .into_iter()
                .next()
                .context(error::UnexpectedSnafu {
                    violated: "expected to create logical tables",
                })
        } else {
            self.create_non_logic_table(create_table, partitions, query_ctx)
                .await
        }
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_non_logic_table(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();

        let schema = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey::new(
                &create_table.catalog_name,
                &create_table.schema_name,
            ))
            .await
            .context(TableMetadataManagerSnafu)?;
        ensure!(
            schema.is_some(),
            SchemaNotFoundSnafu {
                schema_info: &create_table.schema_name,
            }
        );

        if let Some(table) = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                &create_table.table_name,
                Some(&query_ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            return if create_table.create_if_not_exists {
                Ok(table)
            } else {
                TableAlreadyExistsSnafu {
                    table: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                }
                .fail()
            };
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&create_table.table_name),
            InvalidTableNameSnafu {
                table_name: &create_table.table_name,
            }
        );

        let table_name = TableName::new(
            &create_table.catalog_name,
            &create_table.schema_name,
            &create_table.table_name,
        );

        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
        let mut table_info = create_table_info(create_table, partition_cols)?;

        let resp = self
            .create_table_procedure(
                create_table.clone(),
                partitions,
                table_info.clone(),
                query_ctx,
            )
            .await?;

        let table_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created table '{table_name}' with table id {table_id}");

        table_info.ident.table_id = table_id;

        let table_info: Arc<TableInfo> =
            Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
        create_table.table_id = Some(api::v1::TableId { id: table_id });

        let table = DistTable::table(table_info);

        Ok(table)
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_logical_tables(
        &self,
        create_table_exprs: &[CreateTableExpr],
        query_context: QueryContextRef,
    ) -> Result<Vec<TableRef>> {
        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
        ensure!(
            !create_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "create logical tables"
            }
        );

        for create_table in create_table_exprs {
            ensure!(
                NAME_PATTERN_REG.is_match(&create_table.table_name),
                InvalidTableNameSnafu {
                    table_name: &create_table.table_name,
                }
            );
        }

        let raw_tables_info = create_table_exprs
            .iter()
            .map(|create| create_table_info(create, vec![]))
            .collect::<Result<Vec<_>>>()?;
        let tables_data = create_table_exprs
            .iter()
            .cloned()
            .zip(raw_tables_info.iter().cloned())
            .collect::<Vec<_>>();

        let resp = self
            .create_logical_tables_procedure(tables_data, query_context.clone())
            .await?;

        let table_ids = resp.table_ids;
        ensure!(
            table_ids.len() == raw_tables_info.len(),
            CreateLogicalTablesSnafu {
                reason: format!(
                    "The number of created tables is inconsistent with the expected number, expected: {}, actual: {}",
                    raw_tables_info.len(),
                    table_ids.len()
                )
            }
        );
        info!("Successfully created logical tables: {:?}", table_ids);

        let mut tables_info = Vec::with_capacity(table_ids.len());
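        // Fetch the created tables from the catalog and verify that their ids
        // match what the procedure reported.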
        for (table_id, create_table) in table_ids.iter().zip(create_table_exprs.iter()) {
            let table = self
                .catalog_manager
                .table(
                    &create_table.catalog_name,
                    &create_table.schema_name,
                    &create_table.table_name,
                    Some(&query_context),
                )
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                })?;

            let table_info = table.table_info();
            ensure!(
                table_info.table_id() == *table_id,
                CreateLogicalTablesSnafu {
                    reason: format!(
                        "Table id mismatch after creation, expected {}, got {} for table {}",
                        table_id,
                        table_info.table_id(),
                        format_full_table_name(
                            &create_table.catalog_name,
                            &create_table.schema_name,
                            &create_table.table_name
                        )
                    )
                }
            );

            tables_info.push(table_info);
        }

        Ok(tables_info.into_iter().map(DistTable::table).collect())
    }

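    /// Ensures a logical table's partition rule matches the partition rule of
    /// its physical (metric engine) table.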
    async fn validate_logical_table_partition_rule(
        &self,
        create_table: &CreateTableExpr,
        partitions: &Partitions,
        query_ctx: &QueryContextRef,
    ) -> Result<()> {
        let (_, mut logical_partition_exprs) =
            parse_partitions_for_logical_validation(create_table, partitions, query_ctx)?;

        let physical_table_name = create_table
            .table_options
            .get(LOGICAL_TABLE_METADATA_KEY)
            .with_context(|| CreateLogicalTablesSnafu {
                reason: format!(
                    "expect `{LOGICAL_TABLE_METADATA_KEY}` option on creating logical table"
                ),
            })?;

        let physical_table = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                physical_table_name,
                Some(query_ctx),
            )
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu {
                table_name: physical_table_name.clone(),
            })?;

        let physical_table_info = physical_table.table_info();
        let (partition_rule, _) = self
            .partition_manager
            .find_table_partition_rule(&physical_table_info)
            .await
            .context(error::FindTablePartitionRuleSnafu {
                table_name: physical_table_name.clone(),
            })?;

        let multi_dim_rule = partition_rule
            .as_ref()
            .as_any()
            .downcast_ref::<MultiDimPartitionRule>()
            .context(InvalidPartitionRuleSnafu {
                reason: "physical table partition rule is not range-based",
            })?;

        let mut physical_partition_exprs = multi_dim_rule.exprs().to_vec();
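        // Sort both sides so the comparison ignores declaration order.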
        logical_partition_exprs.sort_unstable();
        physical_partition_exprs.sort_unstable();

        ensure!(
            physical_partition_exprs == logical_partition_exprs,
            InvalidPartitionRuleSnafu {
                reason: format!(
                    "logical table partition rule must match the corresponding physical table's\n logical table partition exprs:\t\t {:?}\n physical table partition exprs:\t {:?}",
                    logical_partition_exprs, physical_partition_exprs
                ),
            }
        );

        Ok(())
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub async fn create_trigger(
        &self,
        stmt: CreateTrigger,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
        self.create_trigger_inner(expr, query_context).await
    }

    #[cfg(feature = "enterprise")]
    pub async fn create_trigger_inner(
        &self,
        expr: PbCreateTriggerExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_trigger_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    async fn create_trigger_procedure(
        &self,
        expr: PbCreateTriggerExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
            create_trigger: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;

        let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_create_trigger(task));

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_flow(
        &self,
        stmt: CreateFlow,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;

        self.create_flow_inner(expr, query_context).await
    }

    pub async fn create_flow_inner(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_flow_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    async fn create_flow_procedure(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let flow_type = self
            .determine_flow_type(&expr, query_context.clone())
            .await?;
        info!("Determined type of flow {}: {:#?}", expr.flow_name, flow_type);

        let expr = {
            let mut expr = expr;
            expr.flow_options
                .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
            expr
        };

        let task = CreateFlowTask::try_from(PbCreateFlowTask {
            create_flow: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;
        let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_create_flow(task));

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

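    /// Determines the flow type: TQL statements and aggregate queries run as
    /// batching flows, while plain queries (or flows whose source table has
    /// `ttl=instant`) run as streaming flows.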
    async fn determine_flow_type(
        &self,
        expr: &CreateFlowExpr,
        query_ctx: QueryContextRef,
    ) -> Result<FlowType> {
        for src_table_name in &expr.source_table_names {
            let table = self
                .catalog_manager()
                .table(
                    &src_table_name.catalog_name,
                    &src_table_name.schema_name,
                    &src_table_name.table_name,
                    Some(&query_ctx),
                )
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name,
                    ),
                })?;

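            // A source table with ttl=instant keeps no data to batch over,
            // so the flow must run in streaming mode.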
            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
                warn!(
                    "Source table `{}` of flow `{}` has ttl=instant; falling back to streaming mode",
                    format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name
                    ),
                    expr.flow_name
                );
                return Ok(FlowType::Streaming);
            }
        }

        let engine = &self.query_engine;
        let stmts = ParserContext::create_with_dialect(
            &expr.sql,
            query_ctx.sql_dialect(),
            ParseOptions::default(),
        )
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

        ensure!(
            stmts.len() == 1,
            InvalidSqlSnafu {
                err_msg: format!("Expect only one statement, found {}", stmts.len())
            }
        );
        let stmt = &stmts[0];

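        // TQL flows are always batching; for SQL, plan the query and inspect
        // the logical plan for aggregation.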
        let plan = match stmt {
            Statement::Tql(_) => return Ok(FlowType::Batching),
            _ => engine
                .planner()
                .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?,
        };

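        // Visitor that walks the plan (including subqueries) looking for an
        // aggregate or distinct node.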
        struct FindAggr {
            is_aggr: bool,
        }

        impl TreeNodeVisitor<'_> for FindAggr {
            type Node = LogicalPlan;
            fn f_down(
                &mut self,
                node: &Self::Node,
            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion> {
                match node {
                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
                        self.is_aggr = true;
                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
                    }
                    _ => (),
                }
                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
            }
        }

        let mut find_aggr = FindAggr { is_aggr: false };

        plan.visit_with_subqueries(&mut find_aggr)
            .context(BuildDfLogicalPlanSnafu)?;
        if find_aggr.is_aggr {
            Ok(FlowType::Batching)
        } else {
            Ok(FlowType::Streaming)
        }
    }

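    /// Creates a view from a SQL query or TQL statement (`CREATE VIEW`).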
    #[tracing::instrument(skip_all)]
    pub async fn create_view(
        &self,
        create_view: CreateView,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let logical_plan = match &*create_view.query {
            Statement::Query(query) => {
                self.plan(
                    &QueryStatement::Sql(Statement::Query(query.clone())),
                    ctx.clone(),
                )
                .await?
            }
            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
            _ => {
                return InvalidViewStmtSnafu {}.fail();
            }
        };
        let definition = create_view.to_string();

        let schema: Schema = logical_plan
            .schema()
            .clone()
            .try_into()
            .context(ConvertSchemaSnafu)?;
        let plan_columns: Vec<_> = schema
            .column_schemas()
            .iter()
            .map(|c| c.name.clone())
            .collect();

        let columns: Vec<_> = create_view
            .columns
            .iter()
            .map(|ident| ident.to_string())
            .collect();

        if !columns.is_empty() {
            ensure!(
                columns.len() == plan_columns.len(),
                error::ViewColumnsMismatchSnafu {
                    view_name: create_view.name.to_string(),
                    expected: plan_columns.len(),
                    actual: columns.len(),
                }
            );
        }

        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
            .context(ExtractTableNamesSnafu)?;

        let table_names = table_names.into_iter().map(|t| t.into()).collect();

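        // Serialize the rewritten plan to Substrait bytes for the
        // create-view expression.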
        let encoded_plan = DFLogicalSubstraitConvertor
            .encode(&plan, DefaultSerializer)
            .context(SubstraitCodecSnafu)?;

        let expr = expr_helper::to_create_view_expr(
            create_view,
            encoded_plan.to_vec(),
            table_names,
            columns,
            plan_columns,
            definition,
            ctx.clone(),
        )?;

        self.create_view_by_expr(expr, ctx).await
    }

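    /// Submits the create-view procedure, handling the `IF NOT EXISTS` and
    /// `OR REPLACE` semantics.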
    pub async fn create_view_by_expr(
        &self,
        expr: CreateViewExpr,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure! {
            !(expr.create_if_not_exists && expr.or_replace),
            InvalidSqlSnafu {
                err_msg: "syntax error: `CREATE OR REPLACE` and `IF NOT EXISTS` cannot be used together",
            }
        };
        let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();

        let schema_exists = self
            .table_metadata_manager
            .schema_manager()
            .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
            .await
            .context(TableMetadataManagerSnafu)?;

        ensure!(
            schema_exists,
            SchemaNotFoundSnafu {
                schema_info: &expr.schema_name,
            }
        );

        if let Some(table) = self
            .catalog_manager
            .table(
                &expr.catalog_name,
                &expr.schema_name,
                &expr.view_name,
                Some(&ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            let table_type = table.table_info().table_type;

            match (table_type, expr.create_if_not_exists, expr.or_replace) {
                (TableType::View, true, false) => {
                    return Ok(table);
                }
                (TableType::View, false, false) => {
                    return ViewAlreadyExistsSnafu {
                        name: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
                (TableType::View, _, true) => {
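                    // OR REPLACE: fall through and resubmit the create-view
                    // task to replace the existing view.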
                }
                _ => {
                    return TableAlreadyExistsSnafu {
                        table: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
            }
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&expr.view_name),
            InvalidViewNameSnafu {
                name: expr.view_name.clone(),
            }
        );

        let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);

        let mut view_info = RawTableInfo {
            ident: metadata::TableIdent {
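                // The real view id is allocated by the create-view procedure
                // below.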
                table_id: 0,
                version: 0,
            },
            name: expr.view_name.clone(),
            desc: None,
            catalog_name: expr.catalog_name.clone(),
            schema_name: expr.schema_name.clone(),
            meta: RawTableMeta::default(),
            table_type: TableType::View,
        };

        let request =
            SubmitDdlTaskRequest::new(ctx, DdlTask::new_create_view(expr, view_info.clone()));

        let resp = self
            .procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)?;

        debug!(
            "Submit creating view '{view_name}' task response: {:?}",
            resp
        );

        let view_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created view '{view_name}' with view id {view_id}");

        view_info.ident.table_id = view_id;

        let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);

        let table = DistTable::table(view_info);

        self.cache_invalidator
            .invalidate(
                &Context::default(),
                &[
                    CacheIdent::TableId(view_id),
                    CacheIdent::TableName(view_name.clone()),
                ],
            )
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(table)
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_flow(
        &self,
        catalog_name: String,
        flow_name: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        if let Some(flow) = self
            .flow_metadata_manager
            .flow_name_manager()
            .get(&catalog_name, &flow_name)
            .await
            .context(error::TableMetadataManagerSnafu)?
        {
            let flow_id = flow.flow_id();
            let task = DropFlowTask {
                catalog_name,
                flow_name,
                flow_id,
                drop_if_exists,
            };
            self.drop_flow_procedure(task, query_context).await?;

            Ok(Output::new_with_affected_rows(0))
        } else if drop_if_exists {
            Ok(Output::new_with_affected_rows(0))
        } else {
            FlowNotFoundSnafu {
                flow_name: format_full_flow_name(&catalog_name, &flow_name),
            }
            .fail()
        }
    }

    async fn drop_flow_procedure(
        &self,
        expr: DropFlowTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_drop_flow(expr));

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub(super) async fn drop_trigger(
        &self,
        catalog_name: String,
        trigger_name: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let task = DropTriggerTask {
            catalog_name,
            trigger_name,
            drop_if_exists,
        };
        self.drop_trigger_procedure(task, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    async fn drop_trigger_procedure(
        &self,
        expr: DropTriggerTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_drop_trigger(expr));

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

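    /// Drops a view (`DROP VIEW`), verifying that the target is actually a
    /// view.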
    #[tracing::instrument(skip_all)]
    pub(crate) async fn drop_view(
        &self,
        catalog: String,
        schema: String,
        view: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let view_info = if let Some(view) = self
            .catalog_manager
            .table(&catalog, &schema, &view, None)
            .await
            .context(CatalogSnafu)?
        {
            view.table_info()
        } else if drop_if_exists {
            return Ok(Output::new_with_affected_rows(0));
        } else {
            return TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog, &schema, &view),
            }
            .fail();
        };

        ensure!(
            view_info.table_type == TableType::View,
            error::InvalidViewSnafu {
                msg: "not a view",
                view_name: format_full_table_name(&catalog, &schema, &view),
            }
        );

        let view_id = view_info.table_id();

        let task = DropViewTask {
            catalog,
            schema,
            view,
            view_id,
            drop_if_exists,
        };

        self.drop_view_procedure(task, query_context).await?;

        Ok(Output::new_with_affected_rows(0))
    }

    async fn drop_view_procedure(
        &self,
        expr: DropViewTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_drop_view(expr));

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

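    /// Alters multiple logical tables, grouping the expressions by their
    /// physical table so that each group is submitted as one procedure.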
    #[tracing::instrument(skip_all)]
    pub async fn alter_logical_tables(
        &self,
        alter_table_exprs: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
        ensure!(
            !alter_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "alter logical tables"
            }
        );

        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
        for expr in alter_table_exprs {
            let catalog = if expr.catalog_name.is_empty() {
                query_context.current_catalog()
            } else {
                &expr.catalog_name
            };
            let schema = if expr.schema_name.is_empty() {
                query_context.current_schema()
            } else {
                expr.schema_name.clone()
            };
            let table_name = &expr.table_name;
            let table = self
                .catalog_manager
                .table(catalog, &schema, table_name, Some(&query_context))
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(catalog, &schema, table_name),
                })?;
            let table_id = table.table_info().ident.table_id;
            let physical_table_id = self
                .table_metadata_manager
                .table_route_manager()
                .get_physical_table_id(table_id)
                .await
                .context(TableMetadataManagerSnafu)?;
            groups.entry(physical_table_id).or_default().push(expr);
        }

        let mut handles = Vec::with_capacity(groups.len());
        for (_physical_table_id, exprs) in groups {
            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
            handles.push(fut);
        }
        let _results = futures::future::try_join_all(handles).await?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_table(
        &self,
        table_name: TableName,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.drop_tables(&[table_name], drop_if_exists, query_context)
            .await
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_tables(
        &self,
        table_names: &[TableName],
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        // Collect (name, id) pairs so that tables skipped by IF EXISTS do not
        // shift the pairing between names and ids.
        let mut tables = Vec::with_capacity(table_names.len());
        for table_name in table_names {
            ensure!(
                !is_readonly_schema(&table_name.schema_name),
                SchemaReadOnlySnafu {
                    name: table_name.schema_name.clone()
                }
            );

            if let Some(table) = self
                .catalog_manager
                .table(
                    &table_name.catalog_name,
                    &table_name.schema_name,
                    &table_name.table_name,
                    Some(&query_context),
                )
                .await
                .context(CatalogSnafu)?
            {
                tables.push((table_name, table.table_info().table_id()));
            } else if drop_if_exists {
                continue;
            } else {
                return TableNotFoundSnafu {
                    table_name: table_name.to_string(),
                }
                .fail();
            }
        }

        for (table_name, table_id) in tables {
            self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
                .await?;

            self.cache_invalidator
                .invalidate(
                    &Context::default(),
                    &[
                        CacheIdent::TableId(table_id),
                        CacheIdent::TableName(table_name.clone()),
                    ],
                )
                .await
                .context(error::InvalidateTableCacheSnafu)?;
        }
        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_database(
        &self,
        catalog: String,
        schema: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&schema),
            SchemaReadOnlySnafu { name: schema }
        );

        if self
            .catalog_manager
            .schema_exists(&catalog, &schema, None)
            .await
            .context(CatalogSnafu)?
        {
            if schema == query_context.current_schema() {
                SchemaInUseSnafu { name: schema }.fail()
            } else {
                self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
                    .await?;

                Ok(Output::new_with_affected_rows(0))
            }
        } else if drop_if_exists {
            Ok(Output::new_with_affected_rows(0))
        } else {
            SchemaNotFoundSnafu {
                schema_info: schema,
            }
            .fail()
        }
    }

    #[tracing::instrument(skip_all)]
    pub async fn truncate_table(
        &self,
        table_name: TableName,
        time_ranges: Vec<(Timestamp, Timestamp)>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&table_name.schema_name),
            SchemaReadOnlySnafu {
                name: table_name.schema_name.clone()
            }
        );

        let table = self
            .catalog_manager
            .table(
                &table_name.catalog_name,
                &table_name.schema_name,
                &table_name.table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_name.to_string(),
            })?;
        let table_id = table.table_info().table_id();
        self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
            .await?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_table(
        &self,
        alter_table: AlterTable,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        if matches!(
            alter_table.alter_operation(),
            AlterTableOperation::Repartition { .. }
        ) {
            let request = expr_helper::to_repartition_request(alter_table, &query_context)?;
            return self.repartition_table(request, &query_context).await;
        }

        let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
        self.alter_table_inner(expr, query_context).await
    }

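    /// Handles `ALTER TABLE ... REPARTITION`: validates the requested change
    /// against the table's current partition expressions and submits the
    /// repartition DDL task.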
    #[tracing::instrument(skip_all)]
    pub async fn repartition_table(
        &self,
        request: RepartitionRequest,
        query_context: &QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&request.schema_name),
            SchemaReadOnlySnafu {
                name: request.schema_name.clone()
            }
        );

        let table_ref = TableReference::full(
            &request.catalog_name,
            &request.schema_name,
            &request.table_name,
        );
        let table = self
            .catalog_manager
            .table(
                &request.catalog_name,
                &request.schema_name,
                &request.table_name,
                Some(query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_ref.to_string(),
            })?;
        let table_id = table.table_info().ident.table_id;
        let (physical_table_id, physical_table_route) = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        ensure!(
            physical_table_id == table_id,
            NotSupportedSnafu {
                feat: "REPARTITION on logical tables"
            }
        );

        let table_info = table.table_info();
        let existing_partition_columns = table_info.meta.partition_columns().collect::<Vec<_>>();
        ensure!(
            !existing_partition_columns.is_empty(),
            InvalidPartitionRuleSnafu {
                reason: format!(
                    "table {} does not have partition columns, cannot repartition",
                    table_ref
                )
            }
        );

        let column_name_and_type = existing_partition_columns
            .iter()
            .map(|column| (&column.name, column.data_type.clone()))
            .collect();
        let timezone = query_context.timezone();
        let from_partition_exprs = request
            .from_exprs
            .iter()
            .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
            .collect::<Result<Vec<_>>>()?;

        let mut into_partition_exprs = request
            .into_exprs
            .iter()
            .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
            .collect::<Result<Vec<_>>>()?;

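        // When merging several partitions into one, simplify the single
        // resulting expression.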
        if from_partition_exprs.len() > 1
            && into_partition_exprs.len() == 1
            && let Some(expr) = into_partition_exprs.pop()
        {
            into_partition_exprs.push(partition::simplify::simplify_merged_partition_expr(expr));
        }

        let mut existing_partition_exprs =
            Vec::with_capacity(physical_table_route.region_routes.len());
        for route in &physical_table_route.region_routes {
            let expr_json = route.region.partition_expr();
            if !expr_json.is_empty() {
                match PartitionExpr::from_json_str(&expr_json) {
                    Ok(Some(expr)) => existing_partition_exprs.push(expr),
                    Ok(None) => {
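                        // Regions without a partition expression are skipped.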
                    }
                    Err(e) => {
                        return Err(e).context(DeserializePartitionExprSnafu);
                    }
                }
            }
        }

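        // Every `FROM` partition must exist in the table's current partition
        // expressions.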
        for from_expr in &from_partition_exprs {
            ensure!(
                existing_partition_exprs.contains(from_expr),
                InvalidPartitionRuleSnafu {
                    reason: format!(
                        "partition expression '{}' does not exist in table {}",
                        from_expr, table_ref
                    )
                }
            );
        }

        let new_partition_exprs: Vec<PartitionExpr> = existing_partition_exprs
            .into_iter()
            .filter(|expr| !from_partition_exprs.contains(expr))
            .chain(into_partition_exprs.clone())
            .collect();
        let new_partition_exprs_len = new_partition_exprs.len();
        let from_partition_exprs_len = from_partition_exprs.len();

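        // Dry-run the new rule to make sure it is well-formed before
        // submitting the repartition task.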
        let _ = MultiDimPartitionRule::try_new(
            existing_partition_columns
                .iter()
                .map(|c| c.name.clone())
                .collect(),
            vec![],
            new_partition_exprs,
            true,
        )
        .context(InvalidPartitionSnafu)?;

        let ddl_options = parse_ddl_options(&request.options)?;
        let serialize_exprs = |exprs: Vec<PartitionExpr>| -> Result<Vec<String>> {
            let mut json_exprs = Vec::with_capacity(exprs.len());
            for expr in exprs {
                json_exprs.push(expr.as_json_str().context(SerializePartitionExprSnafu)?);
            }
            Ok(json_exprs)
        };
        let from_partition_exprs_json = serialize_exprs(from_partition_exprs)?;
        let into_partition_exprs_json = serialize_exprs(into_partition_exprs)?;
        let mut req = SubmitDdlTaskRequest::new(
            query_context.clone(),
            DdlTask::new_alter_table(AlterTableExpr {
                catalog_name: request.catalog_name.clone(),
                schema_name: request.schema_name.clone(),
                table_name: request.table_name.clone(),
                kind: Some(Kind::Repartition(Repartition {
                    from_partition_exprs: from_partition_exprs_json,
                    into_partition_exprs: into_partition_exprs_json,
                })),
            }),
        );
        req.wait = ddl_options.wait;
        req.timeout = ddl_options.timeout;

        info!(
            "Submitting repartition task for table {} (table_id={}), from {} to {} partitions, timeout: {:?}, wait: {}",
            table_ref,
            table_id,
            from_partition_exprs_len,
            new_partition_exprs_len,
            ddl_options.timeout,
            ddl_options.wait
        );

        let response = self
            .procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        if !ddl_options.wait {
            return build_procedure_id_output(response.key);
        }

        let invalidate_keys = vec![
            CacheIdent::TableId(table_id),
            CacheIdent::TableName(TableName::new(
                request.catalog_name,
                request.schema_name,
                request.table_name,
            )),
        ];

        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_table_inner(
        &self,
        expr: AlterTableExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&expr.schema_name),
            SchemaReadOnlySnafu {
                name: expr.schema_name.clone()
            }
        );

        let catalog_name = if expr.catalog_name.is_empty() {
            DEFAULT_CATALOG_NAME.to_string()
        } else {
            expr.catalog_name.clone()
        };

        let schema_name = if expr.schema_name.is_empty() {
            DEFAULT_SCHEMA_NAME.to_string()
        } else {
            expr.schema_name.clone()
        };

        let table_name = expr.table_name.clone();

        let table = self
            .catalog_manager
            .table(
                &catalog_name,
                &schema_name,
                &table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
            })?;

        let table_id = table.table_info().ident.table_id;
        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
        if !need_alter {
            return Ok(Output::new_with_affected_rows(0));
        }
        info!(
            "Table info before alter is {:?}, expr: {:?}",
            table.table_info(),
            expr
        );

        let physical_table_id = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_id(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        let (req, invalidate_keys) = if physical_table_id == table_id {
            let req = SubmitDdlTaskRequest::new(query_context, DdlTask::new_alter_table(expr));

            let invalidate_keys = vec![
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            (req, invalidate_keys)
        } else {
            let req = SubmitDdlTaskRequest::new(
                query_context,
                DdlTask::new_alter_logical_tables(vec![expr]),
            );

            let mut invalidate_keys = vec![
                CacheIdent::TableId(physical_table_id),
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            let physical_table = self
                .table_metadata_manager
                .table_info_manager()
                .get(physical_table_id)
                .await
                .context(TableMetadataManagerSnafu)?
                .map(|x| x.into_inner());
            if let Some(physical_table) = physical_table {
                let physical_table_name = TableName::new(
                    physical_table.table_info.catalog_name,
                    physical_table.table_info.schema_name,
                    physical_table.table_info.name,
                );
                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
            }

            (req, invalidate_keys)
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub async fn alter_trigger(
        &self,
        _alter_expr: AlterTrigger,
        _query_context: QueryContextRef,
    ) -> Result<Output> {
        crate::error::NotSupportedSnafu {
            feat: "alter trigger",
        }
        .fail()
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_database(
        &self,
        alter_expr: AlterDatabase,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
        self.alter_database_inner(alter_expr, query_context).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_database_inner(
        &self,
        alter_expr: AlterDatabaseExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&alter_expr.schema_name),
            SchemaReadOnlySnafu {
                name: alter_expr.schema_name.clone()
            }
        );

        let exists = self
            .catalog_manager
            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
            .await
            .context(CatalogSnafu)?;
        ensure!(
            exists,
            SchemaNotFoundSnafu {
                schema_info: alter_expr.schema_name,
            }
        );

        let cache_ident = [CacheIdent::SchemaName(SchemaName {
            catalog_name: alter_expr.catalog_name.clone(),
            schema_name: alter_expr.schema_name.clone(),
        })];

        self.alter_database_procedure(alter_expr, query_context)
            .await?;

        self.cache_invalidator
            .invalidate(&Context::default(), &cache_ident)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }

    async fn create_table_procedure(
        &self,
        create_table: CreateTableExpr,
        partitions: Vec<PartitionExpr>,
        table_info: RawTableInfo,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let partitions = partitions
            .into_iter()
            .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
            .collect::<Result<Vec<_>>>()?;

        let request = SubmitDdlTaskRequest::new(
            query_context,
            DdlTask::new_create_table(create_table, partitions, table_info),
        );

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn create_logical_tables_procedure(
        &self,
        tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest::new(
            query_context,
            DdlTask::new_create_logical_tables(tables_data),
        );

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn alter_logical_tables_procedure(
        &self,
        tables_data: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest::new(
            query_context,
            DdlTask::new_alter_logical_tables(tables_data),
        );

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn drop_table_procedure(
        &self,
        table_name: &TableName,
        table_id: TableId,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest::new(
            query_context,
            DdlTask::new_drop_table(
                table_name.catalog_name.clone(),
                table_name.schema_name.clone(),
                table_name.table_name.clone(),
                table_id,
                drop_if_exists,
            ),
        );

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn drop_database_procedure(
        &self,
        catalog: String,
        schema: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest::new(
            query_context,
            DdlTask::new_drop_database(catalog, schema, drop_if_exists),
        );

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn alter_database_procedure(
        &self,
        alter_expr: AlterDatabaseExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request =
            SubmitDdlTaskRequest::new(query_context, DdlTask::new_alter_database(alter_expr));

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn truncate_table_procedure(
        &self,
        table_name: &TableName,
        table_id: TableId,
        time_ranges: Vec<(Timestamp, Timestamp)>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest::new(
            query_context,
            DdlTask::new_truncate_table(
                table_name.catalog_name.clone(),
                table_name.schema_name.clone(),
                table_name.table_name.clone(),
                table_id,
                time_ranges,
            ),
        );

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_database(
        &self,
        database: &str,
        create_if_not_exists: bool,
        options: HashMap<String, String>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let catalog = query_context.current_catalog();
        ensure!(
            NAME_PATTERN_REG.is_match(catalog),
            error::UnexpectedSnafu {
                violated: format!("Invalid catalog name: {}", catalog)
            }
        );

        ensure!(
            NAME_PATTERN_REG.is_match(database),
            error::UnexpectedSnafu {
                violated: format!("Invalid database name: {}", database)
            }
        );

        if !self
            .catalog_manager
            .schema_exists(catalog, database, None)
            .await
            .context(CatalogSnafu)?
            && !self.catalog_manager.is_reserved_schema_name(database)
        {
            self.create_database_procedure(
                catalog.to_string(),
                database.to_string(),
                create_if_not_exists,
                options,
                query_context,
            )
            .await?;

            Ok(Output::new_with_affected_rows(1))
        } else if create_if_not_exists {
            Ok(Output::new_with_affected_rows(1))
        } else {
            error::SchemaExistsSnafu { name: database }.fail()
        }
    }

    async fn create_database_procedure(
        &self,
        catalog: String,
        database: String,
        create_if_not_exists: bool,
        options: HashMap<String, String>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest::new(
            query_context,
            DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
        );

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }
}

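/// Parses the partition clause of a `CREATE TABLE` statement into partition
/// expressions and the list of partition column names, validating them as a
/// multi-dimensional partition rule.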
pub fn parse_partitions(
    create_table: &CreateTableExpr,
    partitions: Option<Partitions>,
    query_ctx: &QueryContextRef,
) -> Result<(Vec<PartitionExpr>, Vec<String>)> {
    let partition_columns = find_partition_columns(&partitions)?;
    let partition_exprs =
        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;

    let exprs = partition_exprs.clone();
    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
        .context(InvalidPartitionSnafu)?;

    Ok((partition_exprs, partition_columns))
}

fn parse_partitions_for_logical_validation(
    create_table: &CreateTableExpr,
    partitions: &Partitions,
    query_ctx: &QueryContextRef,
) -> Result<(Vec<String>, Vec<PartitionExpr>)> {
    let partition_columns = partitions
        .column_list
        .iter()
        .map(|ident| ident.value.clone())
        .collect::<Vec<_>>();

    let column_name_and_type = partition_columns
        .iter()
        .map(|pc| {
            let column = create_table
                .column_defs
                .iter()
                .find(|c| &c.name == pc)
                .context(ColumnNotFoundSnafu { msg: pc.clone() })?;
            let column_name = &column.name;
            let data_type = ConcreteDataType::from(
                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
                    .context(ColumnDataTypeSnafu)?,
            );
            Ok((column_name, data_type))
        })
        .collect::<Result<HashMap<_, _>>>()?;

    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
    for expr in &partitions.exprs {
        let partition_expr = convert_one_expr(expr, &column_name_and_type, &query_ctx.timezone())?;
        partition_exprs.push(partition_expr);
    }

    MultiDimPartitionRule::try_new(
        partition_columns.clone(),
        vec![],
        partition_exprs.clone(),
        true,
    )
    .context(InvalidPartitionSnafu)?;

    Ok((partition_columns, partition_exprs))
}

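/// Verifies an alter expression against the current table info and returns
/// whether it actually needs to be applied. Adding only columns that already
/// exist with `IF NOT EXISTS` set is reported as a no-op.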
pub fn verify_alter(
    table_id: TableId,
    table_info: Arc<TableInfo>,
    expr: AlterTableExpr,
) -> Result<bool> {
    let request: AlterTableRequest =
        common_grpc_expr::alter_expr_to_request(table_id, expr, Some(&table_info.meta))
            .context(AlterExprToRequestSnafu)?;

    let AlterTableRequest {
        table_name,
        alter_kind,
        ..
    } = &request;

    if let AlterKind::RenameTable { new_table_name } = alter_kind {
        ensure!(
            NAME_PATTERN_REG.is_match(new_table_name),
            error::UnexpectedSnafu {
                violated: format!("Invalid table name: {}", new_table_name)
            }
        );
    } else if let AlterKind::AddColumns { columns } = alter_kind {
        let column_names: HashSet<_> = table_info
            .meta
            .schema
            .column_schemas()
            .iter()
            .map(|schema| &schema.name)
            .collect();
        if columns.iter().all(|column| {
            column_names.contains(&column.column_schema.name) && column.add_if_not_exists
        }) {
            return Ok(false);
        }
    }

    let _ = table_info
        .meta
        .builder_with_alter_kind(table_name, &request.alter_kind)
        .context(error::TableSnafu)?
        .build()
        .context(error::BuildTableMetaSnafu { table_name })?;

    Ok(true)
}

pub fn create_table_info(
    create_table: &CreateTableExpr,
    partition_columns: Vec<String>,
) -> Result<RawTableInfo> {
    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
    let mut column_name_to_index_map = HashMap::new();

    for (idx, column) in create_table.column_defs.iter().enumerate() {
        let schema =
            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
                column: &column.name,
            })?;
        let schema = schema.with_time_index(column.name == create_table.time_index);

        column_schemas.push(schema);
        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
    }

    let timestamp_index = column_name_to_index_map
        .get(&create_table.time_index)
        .cloned();

    let raw_schema = RawSchema {
        column_schemas: column_schemas.clone(),
        timestamp_index,
        version: 0,
    };

    let primary_key_indices = create_table
        .primary_keys
        .iter()
        .map(|name| {
            column_name_to_index_map
                .get(name)
                .cloned()
                .context(ColumnNotFoundSnafu { msg: name })
        })
        .collect::<Result<Vec<_>>>()?;

    let partition_key_indices = partition_columns
        .into_iter()
        .map(|col_name| {
            column_name_to_index_map
                .get(&col_name)
                .cloned()
                .context(ColumnNotFoundSnafu { msg: col_name })
        })
        .collect::<Result<Vec<_>>>()?;

    let table_options = TableOptions::try_from_iter(&create_table.table_options)
        .context(UnrecognizedTableOptionSnafu)?;

    let meta = RawTableMeta {
        schema: raw_schema,
        primary_key_indices,
        value_indices: vec![],
        engine: create_table.engine.clone(),
        next_column_id: column_schemas.len() as u32,
        options: table_options,
        created_on: Utc::now(),
        updated_on: Utc::now(),
        partition_key_indices,
        column_ids: vec![],
    };

    let desc = if create_table.desc.is_empty() {
        create_table.table_options.get(COMMENT_KEY).cloned()
    } else {
        Some(create_table.desc.clone())
    };

    let table_info = RawTableInfo {
        ident: metadata::TableIdent {
            table_id: 0,
            version: 0,
        },
        name: create_table.table_name.clone(),
        desc,
        catalog_name: create_table.catalog_name.clone(),
        schema_name: create_table.schema_name.clone(),
        meta,
        table_type: TableType::Base,
    };
    Ok(table_info)
}

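/// Extracts the partition column names from the optional `PARTITION ON COLUMNS`
/// clause; returns an empty list when no partitioning is specified.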
fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
    let columns = if let Some(partitions) = partitions {
        partitions
            .column_list
            .iter()
            .map(|x| x.value.clone())
            .collect::<Vec<_>>()
    } else {
        vec![]
    };
    Ok(columns)
}

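/// Resolves each partition expression in the `PARTITION ON` clause against the
/// table's column definitions and converts it into a [PartitionExpr].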
fn find_partition_entries(
    create_table: &CreateTableExpr,
    partitions: &Option<Partitions>,
    partition_columns: &[String],
    query_ctx: &QueryContextRef,
) -> Result<Vec<PartitionExpr>> {
    let Some(partitions) = partitions else {
        return Ok(vec![]);
    };

    let column_name_and_type = partition_columns
        .iter()
        .map(|pc| {
            let column = create_table
                .column_defs
                .iter()
                .find(|c| &c.name == pc)
                .context(ColumnNotFoundSnafu { msg: pc.clone() })?;
            let column_name = &column.name;
            let data_type = ConcreteDataType::from(
                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
                    .context(ColumnDataTypeSnafu)?,
            );
            Ok((column_name, data_type))
        })
        .collect::<Result<HashMap<_, _>>>()?;

    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
    for partition in &partitions.exprs {
        let partition_expr =
            convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
        partition_exprs.push(partition_expr);
    }

    Ok(partition_exprs)
}

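/// Recursively converts one parsed SQL expression into a [PartitionExpr].
/// Accepted shapes are `column op literal`, `literal op column` (the literal
/// may carry a unary operator such as `-`), and a binary combination of two
/// such expressions; anything else is an invalid partition rule.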
fn convert_one_expr(
    expr: &Expr,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
    timezone: &Timezone,
) -> Result<PartitionExpr> {
    let Expr::BinaryOp { left, op, right } = expr else {
        return InvalidPartitionRuleSnafu {
            reason: "partition rule must be a binary expression",
        }
        .fail();
    };

    let op =
        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
            reason: format!("unsupported operator in partition expr {op}"),
        })?;

    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
        // column op literal
        (Expr::Identifier(ident), Expr::Value(value)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&value.value, data_type, timezone, None)?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        // column op (unary literal), e.g. `a > -1`
        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        // literal op column
        (Expr::Value(value), Expr::Identifier(ident)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&value.value, data_type, timezone, None)?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        // (unary literal) op column
        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        // nested binary expressions on both sides, converted recursively
        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
            (Operand::Expr(lhs), op, Operand::Expr(rhs))
        }
        _ => {
            return InvalidPartitionRuleSnafu {
                reason: format!("invalid partition expr {expr}"),
            }
            .fail();
        }
    };

    Ok(PartitionExpr::new(lhs, op, rhs))
}

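/// Resolves an identifier to its column name and [ConcreteDataType], failing
/// with a column-not-found error when it is not one of the partition columns.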
fn convert_identifier(
    ident: &Ident,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
) -> Result<(String, ConcreteDataType)> {
    let column_name = ident.value.clone();
    let data_type = column_name_and_type
        .get(&column_name)
        .cloned()
        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
    Ok((column_name, data_type))
}

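/// Converts a SQL literal into a typed [Value], honoring the session timezone
/// and an optional unary operator applied to the literal.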
fn convert_value(
    value: &ParserValue,
    data_type: ConcreteDataType,
    timezone: &Timezone,
    unary_op: Option<UnaryOperator>,
) -> Result<Value> {
    sql_value_to_value(
        &ColumnSchema::new("<NONAME>", data_type, true),
        value,
        Some(timezone),
        unary_op,
        false,
    )
    .context(error::SqlCommonSnafu)
}

#[cfg(test)]
mod test {
    use std::time::Duration;

    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;
    use sqlparser::parser::Parser;

    use super::*;
    use crate::expr_helper;

    #[test]
    fn test_parse_ddl_options() {
        let options = OptionMap::from([
            ("timeout".to_string(), "5m".to_string()),
            ("wait".to_string(), "false".to_string()),
        ]);
        let ddl_options = parse_ddl_options(&options).unwrap();
        assert!(!ddl_options.wait);
        assert_eq!(Duration::from_secs(300), ddl_options.timeout);
    }

    #[test]
    fn test_name_is_match() {
        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
        assert!(!NAME_PATTERN_REG.is_match("🈲"));
        assert!(NAME_PATTERN_REG.is_match("hello"));
        assert!(NAME_PATTERN_REG.is_match("test@"));
        assert!(!NAME_PATTERN_REG.is_match("@test"));
        assert!(NAME_PATTERN_REG.is_match("test#"));
        assert!(!NAME_PATTERN_REG.is_match("#test"));
        assert!(!NAME_PATTERN_REG.is_match("@"));
        assert!(!NAME_PATTERN_REG.is_match("#"));
    }

    #[test]
    fn test_partition_expr_equivalence_with_swapped_operands() {
        let column_name = "device_id".to_string();
        let column_name_and_type =
            HashMap::from([(&column_name, ConcreteDataType::int32_datatype())]);
        let timezone = Timezone::from_tz_string("UTC").unwrap();
        let dialect = GreptimeDbDialect {};

        let mut parser = Parser::new(&dialect)
            .try_with_sql("device_id < 100")
            .unwrap();
        let expr_left = parser.parse_expr().unwrap();

        let mut parser = Parser::new(&dialect)
            .try_with_sql("100 > device_id")
            .unwrap();
        let expr_right = parser.parse_expr().unwrap();

        let partition_left =
            convert_one_expr(&expr_left, &column_name_and_type, &timezone).unwrap();
        let partition_right =
            convert_one_expr(&expr_right, &column_name_and_type, &timezone).unwrap();

        assert_eq!(partition_left, partition_right);
        assert!([partition_left.clone()].contains(&partition_right));

        let mut physical_partition_exprs = vec![partition_left];
        let mut logical_partition_exprs = vec![partition_right];
        physical_partition_exprs.sort_unstable();
        logical_partition_exprs.sort_unstable();
        assert_eq!(physical_partition_exprs, logical_partition_exprs);
    }

    #[tokio::test]
    #[ignore = "TODO(ruihang): WIP new partition rule"]
    async fn test_parse_partitions() {
        common_telemetry::init_default_ut_logging();
        let cases = [
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION ON COLUMNS (b) (
  b < 'hz',
  b >= 'hz' AND b < 'sh',
  b >= 'sh'
)
ENGINE=mito",
                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
            ),
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
  PARTITION r0 VALUES LESS THAN ('hz', 10),
  b < 'hz' AND a < 10,
  b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
  b >= 'sh' AND a >= 20
)
ENGINE=mito",
                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
            ),
        ];
        let ctx = QueryContextBuilder::default().build().into();
        for (sql, expected) in cases {
            let result = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            match &result[0] {
                Statement::CreateTable(c) => {
                    let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
                    let (partitions, _) =
                        parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
                    let json = serde_json::to_string(&partitions).unwrap();
                    assert_eq!(json, expected);
                }
                _ => unreachable!(),
            }
        }
    }
}