1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::helper::ColumnDataTypeWrapper;
20use api::v1::alter_table_expr::Kind;
21use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
22use api::v1::{
23 AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
24 Repartition, column_def,
25};
26#[cfg(feature = "enterprise")]
27use api::v1::{
28 CreateTriggerExpr as PbCreateTriggerExpr, meta::CreateTriggerTask as PbCreateTriggerTask,
29};
30use catalog::CatalogManagerRef;
31use chrono::Utc;
32use common_base::regex_pattern::NAME_PATTERN_REG;
33use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_readonly_schema};
34use common_catalog::{format_full_flow_name, format_full_table_name};
35use common_error::ext::BoxedError;
36use common_meta::cache_invalidator::Context;
37use common_meta::ddl::create_flow::FlowType;
38use common_meta::instruction::CacheIdent;
39use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
40use common_meta::procedure_executor::ExecutorContext;
41#[cfg(feature = "enterprise")]
42use common_meta::rpc::ddl::trigger::CreateTriggerTask;
43#[cfg(feature = "enterprise")]
44use common_meta::rpc::ddl::trigger::DropTriggerTask;
45use common_meta::rpc::ddl::{
46 CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
47 SubmitDdlTaskResponse,
48};
49use common_query::Output;
50use common_recordbatch::{RecordBatch, RecordBatches};
51use common_sql::convert::sql_value_to_value;
52use common_telemetry::{debug, info, tracing, warn};
53use common_time::{Timestamp, Timezone};
54use datafusion_common::tree_node::TreeNodeVisitor;
55use datafusion_expr::LogicalPlan;
56use datatypes::prelude::ConcreteDataType;
57use datatypes::schema::{ColumnSchema, Schema};
58use datatypes::value::Value;
59use datatypes::vectors::{StringVector, VectorRef};
60use humantime::parse_duration;
61use partition::expr::{Operand, PartitionExpr, RestrictedOp};
62use partition::multi_dim::MultiDimPartitionRule;
63use query::parser::QueryStatement;
64use query::plan::extract_and_rewrite_full_table_names;
65use query::query_engine::DefaultSerializer;
66use query::sql::create_table_stmt;
67use session::context::QueryContextRef;
68use session::table_name::table_idents_to_full_name;
69use snafu::{OptionExt, ResultExt, ensure};
70use sql::parser::{ParseOptions, ParserContext};
71use sql::statements::OptionMap;
72#[cfg(feature = "enterprise")]
73use sql::statements::alter::trigger::AlterTrigger;
74use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation};
75#[cfg(feature = "enterprise")]
76use sql::statements::create::trigger::CreateTrigger;
77use sql::statements::create::{
78 CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
79};
80use sql::statements::statement::Statement;
81use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
82use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
83use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
84use table::TableRef;
85use table::dist_table::DistTable;
86use table::metadata::{self, TableId, TableInfo, TableMeta, TableType};
87use table::requests::{
88 AlterKind, AlterTableRequest, COMMENT_KEY, DDL_TIMEOUT, DDL_WAIT, TableOptions,
89};
90use table::table_name::TableName;
91use table::table_reference::TableReference;
92
93use crate::error::{
94 self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
95 ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu,
96 DeserializePartitionExprSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
97 FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
98 InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu,
99 PartitionExprToPbSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
100 SerializePartitionExprSnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
101 TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
102 ViewAlreadyExistsSnafu,
103};
104use crate::expr_helper::{self, RepartitionRequest};
105use crate::statement::StatementExecutor;
106use crate::statement::show::create_partitions_stmt;
107use crate::utils::to_meta_query_context;
108
/// Options controlling how a DDL task is submitted to the procedure executor,
/// parsed from the statement's `DDL_WAIT` / `DDL_TIMEOUT` options.
#[derive(Debug, Clone, Copy)]
struct DdlSubmitOptions {
    // Whether the submission should wait for the procedure to finish
    // (parsed from the `DDL_WAIT` statement option).
    wait: bool,
    // Timeout applied to the submission (parsed from `DDL_TIMEOUT`).
    timeout: Duration,
}
114
115fn build_procedure_id_output(procedure_id: Vec<u8>) -> Result<Output> {
116 let procedure_id = String::from_utf8_lossy(&procedure_id).to_string();
117 let vector: VectorRef = Arc::new(StringVector::from(vec![procedure_id]));
118 let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
119 "Procedure ID",
120 vector.data_type(),
121 false,
122 )]));
123 let batch =
124 RecordBatch::new(schema.clone(), vec![vector]).context(error::BuildRecordBatchSnafu)?;
125 let batches =
126 RecordBatches::try_new(schema, vec![batch]).context(error::BuildRecordBatchSnafu)?;
127 Ok(Output::new_with_record_batches(batches))
128}
129
130fn parse_ddl_options(options: &OptionMap) -> Result<DdlSubmitOptions> {
131 let wait = match options.get(DDL_WAIT) {
132 Some(value) => value.parse::<bool>().map_err(|_| {
133 InvalidSqlSnafu {
134 err_msg: format!("invalid DDL option '{DDL_WAIT}': '{value}'"),
135 }
136 .build()
137 })?,
138 None => SubmitDdlTaskRequest::default_wait(),
139 };
140
141 let timeout = match options.get(DDL_TIMEOUT) {
142 Some(value) => parse_duration(value).map_err(|err| {
143 InvalidSqlSnafu {
144 err_msg: format!("invalid DDL option '{DDL_TIMEOUT}': '{value}': {err}"),
145 }
146 .build()
147 })?,
148 None => SubmitDdlTaskRequest::default_timeout(),
149 };
150
151 Ok(DdlSubmitOptions { wait, timeout })
152}
153
154impl StatementExecutor {
    /// Returns a cloned handle to the catalog manager.
    pub fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }
158
159 #[tracing::instrument(skip_all)]
160 pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
161 let (catalog, schema, _table) = table_idents_to_full_name(&stmt.name, &ctx)
162 .map_err(BoxedError::new)
163 .context(error::ExternalSnafu)?;
164
165 let schema_options = self
166 .table_metadata_manager
167 .schema_manager()
168 .get(SchemaNameKey {
169 catalog: &catalog,
170 schema: &schema,
171 })
172 .await
173 .context(TableMetadataManagerSnafu)?
174 .map(|v| v.into_inner());
175
176 let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
177 if let Some(schema_options) = schema_options {
180 for (key, value) in schema_options.extra_options.iter() {
181 if key.starts_with("compaction.") {
182 continue;
183 }
184 create_expr
185 .table_options
186 .entry(key.clone())
187 .or_insert(value.clone());
188 }
189 }
190
191 self.create_table_inner(create_expr, stmt.partitions, ctx)
192 .await
193 }
194
    /// Creates a table mirroring an existing table's definition
    /// (`CREATE TABLE ... LIKE ...`), including its partition clause.
    #[tracing::instrument(skip_all)]
    pub async fn create_table_like(
        &self,
        stmt: CreateTableLike,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        // Resolve and fetch the source table.
        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;
        let table_ref = self
            .catalog_manager
            .table(&catalog, &schema, &table, Some(&ctx))
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu { table_name: &table })?;
        // Physical partition layout of the source table, copied below.
        let partition_info = self
            .partition_manager
            .find_physical_partition_info(table_ref.table_info().table_id())
            .await
            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;

        // Schema-level options participate in rendering the CREATE statement.
        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        // Rebuild a CREATE TABLE statement from the source table's metadata,
        // then retarget it at the new table name.
        let quote_style = ctx.quote_style();
        let mut create_stmt =
            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
                .context(error::ParseQuerySnafu)?;
        create_stmt.name = stmt.table_name;
        create_stmt.if_not_exists = false;

        let table_info = table_ref.table_info();
        // Carry over the partition clause only when the source actually
        // declares partition columns.
        let partitions = create_partitions_stmt(&table_info, &partition_info.partitions)?.and_then(
            |mut partitions| {
                if !partitions.column_list.is_empty() {
                    partitions.set_quote(quote_style);
                    Some(partitions)
                } else {
                    None
                }
            },
        );

        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
        self.create_table_inner(create_expr, partitions, ctx).await
    }
250
251 #[tracing::instrument(skip_all)]
252 pub async fn create_external_table(
253 &self,
254 create_expr: CreateExternalTable,
255 ctx: QueryContextRef,
256 ) -> Result<TableRef> {
257 let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
258 self.create_table_inner(create_expr, None, ctx).await
259 }
260
    /// Creates a table from a prepared [`CreateTableExpr`], routing
    /// metric-engine logical tables through the batch logical-table path and
    /// everything else through the regular path.
    #[tracing::instrument(skip_all)]
    pub async fn create_table_inner(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        // Reject creation inside read-only schemas.
        ensure!(
            !is_readonly_schema(&create_table.schema_name),
            SchemaReadOnlySnafu {
                name: create_table.schema_name.clone()
            }
        );

        // A metric-engine table carrying the logical-table metadata key is a
        // logical table attached to a physical metric table.
        if create_table.engine == METRIC_ENGINE_NAME
            && create_table
                .table_options
                .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            // A logical table may declare partition rules only when they match
            // its physical table's rules; validate before creating.
            if let Some(partitions) = partitions.as_ref()
                && !partitions.exprs.is_empty()
            {
                self.validate_logical_table_partition_rule(create_table, partitions, &query_ctx)
                    .await?;
            }
            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
                .await?
                .into_iter()
                .next()
                .context(error::UnexpectedSnafu {
                    violated: "expected to create logical tables",
                })
        } else {
            self.create_non_logic_table(create_table, partitions, query_ctx)
                .await
        }
    }
300
    /// Creates a regular (non-logical) distributed table via the DDL
    /// procedure executor.
    #[tracing::instrument(skip_all)]
    pub async fn create_non_logic_table(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();

        // The target schema must already exist.
        let schema = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey::new(
                &create_table.catalog_name,
                &create_table.schema_name,
            ))
            .await
            .context(TableMetadataManagerSnafu)?;
        ensure!(
            schema.is_some(),
            SchemaNotFoundSnafu {
                schema_info: &create_table.schema_name,
            }
        );

        // If the table already exists, IF NOT EXISTS returns it as-is;
        // otherwise creating it again is an error.
        if let Some(table) = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                &create_table.table_name,
                Some(&query_ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            return if create_table.create_if_not_exists {
                Ok(table)
            } else {
                TableAlreadyExistsSnafu {
                    table: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                }
                .fail()
            };
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&create_table.table_name),
            InvalidTableNameSnafu {
                table_name: &create_table.table_name,
            }
        );

        let table_name = TableName::new(
            &create_table.catalog_name,
            &create_table.schema_name,
            &create_table.table_name,
        );

        // Build partition descriptors and the initial table metadata.
        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
        let mut table_info = create_table_info(create_table, partition_cols)?;

        // Submit the create-table procedure; the response carries the
        // allocated table id.
        let resp = self
            .create_table_procedure(
                create_table.clone(),
                partitions,
                table_info.clone(),
                query_ctx,
            )
            .await?;

        let table_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created table '{table_name}' with table id {table_id}");

        // Patch the freshly allocated id back into the local metadata copy
        // and into the expression (callers may read it back).
        table_info.ident.table_id = table_id;

        let table_info = Arc::new(table_info);
        create_table.table_id = Some(api::v1::TableId { id: table_id });

        let table = DistTable::table(table_info);

        Ok(table)
    }
396
    /// Creates a batch of metric-engine logical tables in a single DDL
    /// procedure, then verifies each created table against the catalog.
    #[tracing::instrument(skip_all)]
    pub async fn create_logical_tables(
        &self,
        create_table_exprs: &[CreateTableExpr],
        query_context: QueryContextRef,
    ) -> Result<Vec<TableRef>> {
        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
        ensure!(
            !create_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "create logic tables"
            }
        );

        // Validate every table name up front so the batch fails fast.
        for create_table in create_table_exprs {
            ensure!(
                NAME_PATTERN_REG.is_match(&create_table.table_name),
                InvalidTableNameSnafu {
                    table_name: &create_table.table_name,
                }
            );
        }

        // Logical tables carry no partition columns of their own.
        let raw_tables_info = create_table_exprs
            .iter()
            .map(|create| create_table_info(create, vec![]))
            .collect::<Result<Vec<_>>>()?;
        let tables_data = create_table_exprs
            .iter()
            .cloned()
            .zip(raw_tables_info.iter().cloned())
            .collect::<Vec<_>>();

        let resp = self
            .create_logical_tables_procedure(tables_data, query_context.clone())
            .await?;

        // The procedure must report exactly one id per requested table.
        let table_ids = resp.table_ids;
        ensure!(
            table_ids.len() == raw_tables_info.len(),
            CreateLogicalTablesSnafu {
                reason: format!(
                    "The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}",
                    raw_tables_info.len(),
                    table_ids.len()
                )
            }
        );
        info!("Successfully created logical tables: {:?}", table_ids);

        // Re-read each table from the catalog and check that its id matches
        // what the procedure reported.
        let mut tables_info = Vec::with_capacity(table_ids.len());
        for (table_id, create_table) in table_ids.iter().zip(create_table_exprs.iter()) {
            let table = self
                .catalog_manager
                .table(
                    &create_table.catalog_name,
                    &create_table.schema_name,
                    &create_table.table_name,
                    Some(&query_context),
                )
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                })?;

            let table_info = table.table_info();
            ensure!(
                table_info.table_id() == *table_id,
                CreateLogicalTablesSnafu {
                    reason: format!(
                        "Table id mismatch after creation, expected {}, got {} for table {}",
                        table_id,
                        table_info.table_id(),
                        format_full_table_name(
                            &create_table.catalog_name,
                            &create_table.schema_name,
                            &create_table.table_name
                        )
                    )
                }
            );

            tables_info.push(table_info);
        }

        Ok(tables_info.into_iter().map(DistTable::table).collect())
    }
494
    /// Ensures a logical table's declared partition rule matches (up to
    /// ordering) the rule of the physical metric table it attaches to.
    async fn validate_logical_table_partition_rule(
        &self,
        create_table: &CreateTableExpr,
        partitions: &Partitions,
        query_ctx: &QueryContextRef,
    ) -> Result<()> {
        let (_, mut logical_partition_exprs) =
            parse_partitions_for_logical_validation(create_table, partitions, query_ctx)?;

        // The physical table's name is carried in the logical table's options.
        let physical_table_name = create_table
            .table_options
            .get(LOGICAL_TABLE_METADATA_KEY)
            .with_context(|| CreateLogicalTablesSnafu {
                reason: format!(
                    "expect `{LOGICAL_TABLE_METADATA_KEY}` option on creating logical table"
                ),
            })?;

        let physical_table = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                physical_table_name,
                Some(query_ctx),
            )
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu {
                table_name: physical_table_name.clone(),
            })?;

        let physical_table_info = physical_table.table_info();
        let (partition_rule, _) = self
            .partition_manager
            .find_table_partition_rule(&physical_table_info)
            .await
            .context(error::FindTablePartitionRuleSnafu {
                table_name: physical_table_name.clone(),
            })?;

        // Only expression-based (multi-dimensional) rules can be compared.
        let multi_dim_rule = partition_rule
            .as_ref()
            .as_any()
            .downcast_ref::<MultiDimPartitionRule>()
            .context(InvalidPartitionRuleSnafu {
                reason: "physical table partition rule is not range-based",
            })?;

        // Sort both sides so the comparison is order-insensitive.
        let mut physical_partition_exprs = multi_dim_rule.exprs().to_vec();
        logical_partition_exprs.sort_unstable();
        physical_partition_exprs.sort_unstable();

        ensure!(
            physical_partition_exprs == logical_partition_exprs,
            InvalidPartitionRuleSnafu {
                reason: format!(
                    "logical table partition rule must match the corresponding physical table's\n logical table partition exprs:\t\t {:?}\n physical table partition exprs:\t {:?}",
                    logical_partition_exprs, physical_partition_exprs
                ),
            }
        );

        Ok(())
    }
561
562 #[cfg(feature = "enterprise")]
563 #[tracing::instrument(skip_all)]
564 pub async fn create_trigger(
565 &self,
566 stmt: CreateTrigger,
567 query_context: QueryContextRef,
568 ) -> Result<Output> {
569 let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
570 self.create_trigger_inner(expr, query_context).await
571 }
572
573 #[cfg(feature = "enterprise")]
574 pub async fn create_trigger_inner(
575 &self,
576 expr: PbCreateTriggerExpr,
577 query_context: QueryContextRef,
578 ) -> Result<Output> {
579 self.create_trigger_procedure(expr, query_context).await?;
580 Ok(Output::new_with_affected_rows(0))
581 }
582
583 #[cfg(feature = "enterprise")]
584 async fn create_trigger_procedure(
585 &self,
586 expr: PbCreateTriggerExpr,
587 query_context: QueryContextRef,
588 ) -> Result<SubmitDdlTaskResponse> {
589 let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
590 create_trigger: Some(expr),
591 })
592 .context(error::InvalidExprSnafu)?;
593
594 let request = SubmitDdlTaskRequest::new(
595 to_meta_query_context(query_context),
596 DdlTask::new_create_trigger(task),
597 );
598
599 self.procedure_executor
600 .submit_ddl_task(&ExecutorContext::default(), request)
601 .await
602 .context(error::ExecuteDdlSnafu)
603 }
604
605 #[tracing::instrument(skip_all)]
606 pub async fn create_flow(
607 &self,
608 stmt: CreateFlow,
609 query_context: QueryContextRef,
610 ) -> Result<Output> {
611 let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;
613
614 self.create_flow_inner(expr, query_context).await
615 }
616
617 pub async fn create_flow_inner(
618 &self,
619 expr: CreateFlowExpr,
620 query_context: QueryContextRef,
621 ) -> Result<Output> {
622 self.create_flow_procedure(expr, query_context).await?;
623 Ok(Output::new_with_affected_rows(0))
624 }
625
626 async fn create_flow_procedure(
627 &self,
628 expr: CreateFlowExpr,
629 query_context: QueryContextRef,
630 ) -> Result<SubmitDdlTaskResponse> {
631 let flow_type = self
632 .determine_flow_type(&expr, query_context.clone())
633 .await?;
634 info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
635
636 let expr = {
637 let mut expr = expr;
638 expr.flow_options
639 .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
640 expr
641 };
642
643 let task = CreateFlowTask::try_from(PbCreateFlowTask {
644 create_flow: Some(expr),
645 })
646 .context(error::InvalidExprSnafu)?;
647 let request = SubmitDdlTaskRequest::new(
648 to_meta_query_context(query_context),
649 DdlTask::new_create_flow(task),
650 );
651
652 self.procedure_executor
653 .submit_ddl_task(&ExecutorContext::default(), request)
654 .await
655 .context(error::ExecuteDdlSnafu)
656 }
657
    /// Decides whether a flow should run in batching or streaming mode.
    ///
    /// Any source table with `ttl = instant` forces streaming mode. Otherwise
    /// TQL statements and plans containing aggregate/distinct nodes use
    /// batching, and everything else streams.
    async fn determine_flow_type(
        &self,
        expr: &CreateFlowExpr,
        query_ctx: QueryContextRef,
    ) -> Result<FlowType> {
        // A ttl=instant source table forces streaming mode.
        for src_table_name in &expr.source_table_names {
            let table = self
                .catalog_manager()
                .table(
                    &src_table_name.catalog_name,
                    &src_table_name.schema_name,
                    &src_table_name.table_name,
                    Some(&query_ctx),
                )
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name,
                    ),
                })?;

            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
                warn!(
                    "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
                    format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name
                    ),
                    expr.flow_name
                );
                return Ok(FlowType::Streaming);
            }
        }

        // Parse the flow's SQL; exactly one statement is expected.
        let engine = &self.query_engine;
        let stmts = ParserContext::create_with_dialect(
            &expr.sql,
            query_ctx.sql_dialect(),
            ParseOptions::default(),
        )
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

        ensure!(
            stmts.len() == 1,
            InvalidSqlSnafu {
                err_msg: format!("Expect only one statement, found {}", stmts.len())
            }
        );
        let stmt = &stmts[0];

        // TQL flows are always batching; SQL statements are planned so the
        // logical plan can be inspected for aggregations.
        let plan = match stmt {
            Statement::Tql(_) => return Ok(FlowType::Batching),
            _ => engine
                .planner()
                .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?,
        };

        // Visitor that detects any Aggregate or Distinct node in the plan,
        // including inside subqueries.
        struct FindAggr {
            is_aggr: bool,
        }

        impl TreeNodeVisitor<'_> for FindAggr {
            type Node = LogicalPlan;
            fn f_down(
                &mut self,
                node: &Self::Node,
            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
            {
                match node {
                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
                        self.is_aggr = true;
                        // One hit is enough; stop the traversal early.
                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
                    }
                    _ => (),
                }
                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
            }
        }

        let mut find_aggr = FindAggr { is_aggr: false };

        plan.visit_with_subqueries(&mut find_aggr)
            .context(BuildDfLogicalPlanSnafu)?;
        if find_aggr.is_aggr {
            Ok(FlowType::Batching)
        } else {
            Ok(FlowType::Streaming)
        }
    }
764
    /// Creates a view from a `CREATE VIEW` statement: plans the defining
    /// query, validates any explicit column list, and submits the view
    /// creation with the substrait-encoded plan.
    #[tracing::instrument(skip_all)]
    pub async fn create_view(
        &self,
        create_view: CreateView,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        // Plan the defining query; only SELECT and TQL are valid view bodies.
        let logical_plan = match &*create_view.query {
            Statement::Query(query) => {
                self.plan(
                    &QueryStatement::Sql(Statement::Query(query.clone())),
                    ctx.clone(),
                )
                .await?
            }
            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
            _ => {
                return InvalidViewStmtSnafu {}.fail();
            }
        };
        // The original statement text is stored as the view definition.
        let definition = create_view.to_string();

        let schema: Schema = logical_plan
            .schema()
            .clone()
            .try_into()
            .context(ConvertSchemaSnafu)?;
        let plan_columns: Vec<_> = schema
            .column_schemas()
            .iter()
            .map(|c| c.name.clone())
            .collect();

        let columns: Vec<_> = create_view
            .columns
            .iter()
            .map(|ident| ident.to_string())
            .collect();

        // An explicit column list must cover every output column of the plan.
        if !columns.is_empty() {
            ensure!(
                columns.len() == plan_columns.len(),
                error::ViewColumnsMismatchSnafu {
                    view_name: create_view.name.to_string(),
                    expected: plan_columns.len(),
                    actual: columns.len(),
                }
            );
        }

        // Fully qualify the referenced table names and collect them so the
        // view's dependencies are tracked.
        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
            .context(ExtractTableNamesSnafu)?;

        let table_names = table_names.into_iter().map(|t| t.into()).collect();

        // Persist the plan in substrait form.
        let encoded_plan = DFLogicalSubstraitConvertor
            .encode(&plan, DefaultSerializer)
            .context(SubstraitCodecSnafu)?;

        let expr = expr_helper::to_create_view_expr(
            create_view,
            encoded_plan.to_vec(),
            table_names,
            columns,
            plan_columns,
            definition,
            ctx.clone(),
        )?;

        self.create_view_by_expr(expr, ctx).await
    }
849
850 pub async fn create_view_by_expr(
851 &self,
852 expr: CreateViewExpr,
853 ctx: QueryContextRef,
854 ) -> Result<TableRef> {
855 ensure! {
856 !(expr.create_if_not_exists & expr.or_replace),
857 InvalidSqlSnafu {
858 err_msg: "syntax error Create Or Replace and If Not Exist cannot be used together",
859 }
860 };
861 let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();
862
863 let schema_exists = self
864 .table_metadata_manager
865 .schema_manager()
866 .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
867 .await
868 .context(TableMetadataManagerSnafu)?;
869
870 ensure!(
871 schema_exists,
872 SchemaNotFoundSnafu {
873 schema_info: &expr.schema_name,
874 }
875 );
876
877 if let Some(table) = self
879 .catalog_manager
880 .table(
881 &expr.catalog_name,
882 &expr.schema_name,
883 &expr.view_name,
884 Some(&ctx),
885 )
886 .await
887 .context(CatalogSnafu)?
888 {
889 let table_type = table.table_info().table_type;
890
891 match (table_type, expr.create_if_not_exists, expr.or_replace) {
892 (TableType::View, true, false) => {
893 return Ok(table);
894 }
895 (TableType::View, false, false) => {
896 return ViewAlreadyExistsSnafu {
897 name: format_full_table_name(
898 &expr.catalog_name,
899 &expr.schema_name,
900 &expr.view_name,
901 ),
902 }
903 .fail();
904 }
905 (TableType::View, _, true) => {
906 }
908 _ => {
909 return TableAlreadyExistsSnafu {
910 table: format_full_table_name(
911 &expr.catalog_name,
912 &expr.schema_name,
913 &expr.view_name,
914 ),
915 }
916 .fail();
917 }
918 }
919 }
920
921 ensure!(
922 NAME_PATTERN_REG.is_match(&expr.view_name),
923 InvalidViewNameSnafu {
924 name: expr.view_name.clone(),
925 }
926 );
927
928 let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);
929
930 let mut view_info = TableInfo {
931 ident: metadata::TableIdent {
932 table_id: 0,
934 version: 0,
935 },
936 name: expr.view_name.clone(),
937 desc: None,
938 catalog_name: expr.catalog_name.clone(),
939 schema_name: expr.schema_name.clone(),
940 meta: TableMeta::empty(),
942 table_type: TableType::View,
943 };
944
945 let request = SubmitDdlTaskRequest::new(
946 to_meta_query_context(ctx),
947 DdlTask::new_create_view(expr, view_info.clone()),
948 );
949
950 let resp = self
951 .procedure_executor
952 .submit_ddl_task(&ExecutorContext::default(), request)
953 .await
954 .context(error::ExecuteDdlSnafu)?;
955
956 debug!(
957 "Submit creating view '{view_name}' task response: {:?}",
958 resp
959 );
960
961 let view_id = resp
962 .table_ids
963 .into_iter()
964 .next()
965 .context(error::UnexpectedSnafu {
966 violated: "expected table_id",
967 })?;
968 info!("Successfully created view '{view_name}' with view id {view_id}");
969
970 view_info.ident.table_id = view_id;
971
972 let view_info = Arc::new(view_info);
973
974 let table = DistTable::table(view_info);
975
976 self.cache_invalidator
978 .invalidate(
979 &Context::default(),
980 &[
981 CacheIdent::TableId(view_id),
982 CacheIdent::TableName(view_name.clone()),
983 ],
984 )
985 .await
986 .context(error::InvalidateTableCacheSnafu)?;
987
988 Ok(table)
989 }
990
991 #[tracing::instrument(skip_all)]
992 pub async fn drop_flow(
993 &self,
994 catalog_name: String,
995 flow_name: String,
996 drop_if_exists: bool,
997 query_context: QueryContextRef,
998 ) -> Result<Output> {
999 if let Some(flow) = self
1000 .flow_metadata_manager
1001 .flow_name_manager()
1002 .get(&catalog_name, &flow_name)
1003 .await
1004 .context(error::TableMetadataManagerSnafu)?
1005 {
1006 let flow_id = flow.flow_id();
1007 let task = DropFlowTask {
1008 catalog_name,
1009 flow_name,
1010 flow_id,
1011 drop_if_exists,
1012 };
1013 self.drop_flow_procedure(task, query_context).await?;
1014
1015 Ok(Output::new_with_affected_rows(0))
1016 } else if drop_if_exists {
1017 Ok(Output::new_with_affected_rows(0))
1018 } else {
1019 FlowNotFoundSnafu {
1020 flow_name: format_full_flow_name(&catalog_name, &flow_name),
1021 }
1022 .fail()
1023 }
1024 }
1025
1026 async fn drop_flow_procedure(
1027 &self,
1028 expr: DropFlowTask,
1029 query_context: QueryContextRef,
1030 ) -> Result<SubmitDdlTaskResponse> {
1031 let request = SubmitDdlTaskRequest::new(
1032 to_meta_query_context(query_context),
1033 DdlTask::new_drop_flow(expr),
1034 );
1035
1036 self.procedure_executor
1037 .submit_ddl_task(&ExecutorContext::default(), request)
1038 .await
1039 .context(error::ExecuteDdlSnafu)
1040 }
1041
1042 #[cfg(feature = "enterprise")]
1043 #[tracing::instrument(skip_all)]
1044 pub(super) async fn drop_trigger(
1045 &self,
1046 catalog_name: String,
1047 trigger_name: String,
1048 drop_if_exists: bool,
1049 query_context: QueryContextRef,
1050 ) -> Result<Output> {
1051 let task = DropTriggerTask {
1052 catalog_name,
1053 trigger_name,
1054 drop_if_exists,
1055 };
1056 self.drop_trigger_procedure(task, query_context).await?;
1057 Ok(Output::new_with_affected_rows(0))
1058 }
1059
1060 #[cfg(feature = "enterprise")]
1061 async fn drop_trigger_procedure(
1062 &self,
1063 expr: DropTriggerTask,
1064 query_context: QueryContextRef,
1065 ) -> Result<SubmitDdlTaskResponse> {
1066 let request = SubmitDdlTaskRequest::new(
1067 to_meta_query_context(query_context),
1068 DdlTask::new_drop_trigger(expr),
1069 );
1070
1071 self.procedure_executor
1072 .submit_ddl_task(&ExecutorContext::default(), request)
1073 .await
1074 .context(error::ExecuteDdlSnafu)
1075 }
1076
    /// Drops a view. With `drop_if_exists`, a missing view is a no-op; a
    /// non-view table with the same name is rejected.
    #[tracing::instrument(skip_all)]
    pub(crate) async fn drop_view(
        &self,
        catalog: String,
        schema: String,
        view: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let view_info = if let Some(view) = self
            .catalog_manager
            .table(&catalog, &schema, &view, None)
            .await
            .context(CatalogSnafu)?
        {
            view.table_info()
        } else if drop_if_exists {
            // DROP VIEW IF EXISTS tolerates a missing view.
            return Ok(Output::new_with_affected_rows(0));
        } else {
            return TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog, &schema, &view),
            }
            .fail();
        };

        // Refuse to drop base tables through DROP VIEW.
        ensure!(
            view_info.table_type == TableType::View,
            error::InvalidViewSnafu {
                msg: "not a view",
                view_name: format_full_table_name(&catalog, &schema, &view),
            }
        );

        let view_id = view_info.table_id();

        let task = DropViewTask {
            catalog,
            schema,
            view,
            view_id,
            drop_if_exists,
        };

        self.drop_view_procedure(task, query_context).await?;

        Ok(Output::new_with_affected_rows(0))
    }
1127
1128 async fn drop_view_procedure(
1130 &self,
1131 expr: DropViewTask,
1132 query_context: QueryContextRef,
1133 ) -> Result<SubmitDdlTaskResponse> {
1134 let request = SubmitDdlTaskRequest::new(
1135 to_meta_query_context(query_context),
1136 DdlTask::new_drop_view(expr),
1137 );
1138
1139 self.procedure_executor
1140 .submit_ddl_task(&ExecutorContext::default(), request)
1141 .await
1142 .context(error::ExecuteDdlSnafu)
1143 }
1144
    /// Alters a batch of logical tables, grouping them by their backing
    /// physical table so each physical table receives a single procedure.
    #[tracing::instrument(skip_all)]
    pub async fn alter_logical_tables(
        &self,
        alter_table_exprs: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
        ensure!(
            !alter_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "alter logical tables"
            }
        );

        // Group the alterations by the physical table backing each logical table.
        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
        for expr in alter_table_exprs {
            // Fall back to the session's current catalog/schema when the
            // expression leaves them empty.
            let catalog = if expr.catalog_name.is_empty() {
                query_context.current_catalog()
            } else {
                &expr.catalog_name
            };
            let schema = if expr.schema_name.is_empty() {
                query_context.current_schema()
            } else {
                expr.schema_name.clone()
            };
            let table_name = &expr.table_name;
            let table = self
                .catalog_manager
                .table(catalog, &schema, table_name, Some(&query_context))
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(catalog, &schema, table_name),
                })?;
            let table_id = table.table_info().ident.table_id;
            let physical_table_id = self
                .table_metadata_manager
                .table_route_manager()
                .get_physical_table_id(table_id)
                .await
                .context(TableMetadataManagerSnafu)?;
            groups.entry(physical_table_id).or_default().push(expr);
        }

        // Submit one alter procedure per physical table and await them all;
        // the first failure aborts the batch.
        let mut handles = Vec::with_capacity(groups.len());
        for (_physical_table_id, exprs) in groups {
            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
            handles.push(fut);
        }
        let _results = futures::future::try_join_all(handles).await?;

        Ok(Output::new_with_affected_rows(0))
    }
1202
1203 #[tracing::instrument(skip_all)]
1204 pub async fn drop_table(
1205 &self,
1206 table_name: TableName,
1207 drop_if_exists: bool,
1208 query_context: QueryContextRef,
1209 ) -> Result<Output> {
1210 self.drop_tables(&[table_name], drop_if_exists, query_context)
1212 .await
1213 }
1214
1215 #[tracing::instrument(skip_all)]
1216 pub async fn drop_tables(
1217 &self,
1218 table_names: &[TableName],
1219 drop_if_exists: bool,
1220 query_context: QueryContextRef,
1221 ) -> Result<Output> {
1222 let mut tables = Vec::with_capacity(table_names.len());
1223 for table_name in table_names {
1224 ensure!(
1225 !is_readonly_schema(&table_name.schema_name),
1226 SchemaReadOnlySnafu {
1227 name: table_name.schema_name.clone()
1228 }
1229 );
1230
1231 if let Some(table) = self
1232 .catalog_manager
1233 .table(
1234 &table_name.catalog_name,
1235 &table_name.schema_name,
1236 &table_name.table_name,
1237 Some(&query_context),
1238 )
1239 .await
1240 .context(CatalogSnafu)?
1241 {
1242 tables.push(table.table_info().table_id());
1243 } else if drop_if_exists {
1244 continue;
1246 } else {
1247 return TableNotFoundSnafu {
1248 table_name: table_name.to_string(),
1249 }
1250 .fail();
1251 }
1252 }
1253
1254 for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
1255 self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
1256 .await?;
1257
1258 self.cache_invalidator
1260 .invalidate(
1261 &Context::default(),
1262 &[
1263 CacheIdent::TableId(table_id),
1264 CacheIdent::TableName(table_name.clone()),
1265 ],
1266 )
1267 .await
1268 .context(error::InvalidateTableCacheSnafu)?;
1269 }
1270 Ok(Output::new_with_affected_rows(0))
1271 }
1272
1273 #[tracing::instrument(skip_all)]
1274 pub async fn drop_database(
1275 &self,
1276 catalog: String,
1277 schema: String,
1278 drop_if_exists: bool,
1279 query_context: QueryContextRef,
1280 ) -> Result<Output> {
1281 ensure!(
1282 !is_readonly_schema(&schema),
1283 SchemaReadOnlySnafu { name: schema }
1284 );
1285
1286 if self
1287 .catalog_manager
1288 .schema_exists(&catalog, &schema, None)
1289 .await
1290 .context(CatalogSnafu)?
1291 {
1292 if schema == query_context.current_schema() {
1293 SchemaInUseSnafu { name: schema }.fail()
1294 } else {
1295 self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
1296 .await?;
1297
1298 Ok(Output::new_with_affected_rows(0))
1299 }
1300 } else if drop_if_exists {
1301 Ok(Output::new_with_affected_rows(0))
1303 } else {
1304 SchemaNotFoundSnafu {
1305 schema_info: schema,
1306 }
1307 .fail()
1308 }
1309 }
1310
1311 #[tracing::instrument(skip_all)]
1312 pub async fn truncate_table(
1313 &self,
1314 table_name: TableName,
1315 time_ranges: Vec<(Timestamp, Timestamp)>,
1316 query_context: QueryContextRef,
1317 ) -> Result<Output> {
1318 ensure!(
1319 !is_readonly_schema(&table_name.schema_name),
1320 SchemaReadOnlySnafu {
1321 name: table_name.schema_name.clone()
1322 }
1323 );
1324
1325 let table = self
1326 .catalog_manager
1327 .table(
1328 &table_name.catalog_name,
1329 &table_name.schema_name,
1330 &table_name.table_name,
1331 Some(&query_context),
1332 )
1333 .await
1334 .context(CatalogSnafu)?
1335 .with_context(|| TableNotFoundSnafu {
1336 table_name: table_name.to_string(),
1337 })?;
1338 let table_id = table.table_info().table_id();
1339 self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
1340 .await?;
1341
1342 Ok(Output::new_with_affected_rows(0))
1343 }
1344
1345 #[tracing::instrument(skip_all)]
1346 pub async fn alter_table(
1347 &self,
1348 alter_table: AlterTable,
1349 query_context: QueryContextRef,
1350 ) -> Result<Output> {
1351 if matches!(
1352 alter_table.alter_operation(),
1353 AlterTableOperation::Repartition { .. }
1354 ) {
1355 let request = expr_helper::to_repartition_request(alter_table, &query_context)?;
1356 return self.repartition_table(request, &query_context).await;
1357 }
1358
1359 let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
1360 self.alter_table_inner(expr, query_context).await
1361 }
1362
    /// Repartitions a physical table by replacing the `from` partition
    /// expressions with the `into` expressions.
    ///
    /// Validates that: the schema is writable, the table exists and is a
    /// physical (not logical) table, it already has partition columns, and
    /// every `from` expression matches an existing region's partition
    /// expression. The resulting full partition set is dry-run validated via
    /// [`MultiDimPartitionRule`] before the DDL task is submitted.
    ///
    /// Honors the `wait`/`timeout` DDL options: when `wait` is false the
    /// procedure id is returned immediately and caches are NOT invalidated.
    #[tracing::instrument(skip_all)]
    pub async fn repartition_table(
        &self,
        request: RepartitionRequest,
        query_context: &QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&request.schema_name),
            SchemaReadOnlySnafu {
                name: request.schema_name.clone()
            }
        );

        let table_ref = TableReference::full(
            &request.catalog_name,
            &request.schema_name,
            &request.table_name,
        );
        let table = self
            .catalog_manager
            .table(
                &request.catalog_name,
                &request.schema_name,
                &request.table_name,
                Some(query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_ref.to_string(),
            })?;
        let table_id = table.table_info().ident.table_id;
        let (physical_table_id, physical_table_route) = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        // Logical tables share their physical table's regions, so only the
        // physical table itself may be repartitioned.
        ensure!(
            physical_table_id == table_id,
            NotSupportedSnafu {
                feat: "REPARTITION on logical tables"
            }
        );

        let table_info = table.table_info();
        let existing_partition_columns = table_info.meta.partition_columns().collect::<Vec<_>>();
        ensure!(
            !existing_partition_columns.is_empty(),
            InvalidPartitionRuleSnafu {
                reason: format!(
                    "table {} does not have partition columns, cannot repartition",
                    table_ref
                )
            }
        );

        // Column name -> data type map used to type-check the SQL expressions.
        let column_name_and_type = existing_partition_columns
            .iter()
            .map(|column| (&column.name, column.data_type.clone()))
            .collect();
        let timezone = query_context.timezone();
        let from_partition_exprs = request
            .from_exprs
            .iter()
            .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
            .collect::<Result<Vec<_>>>()?;

        let mut into_partition_exprs = request
            .into_exprs
            .iter()
            .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
            .collect::<Result<Vec<_>>>()?;

        // Merging many partitions into one: simplify the single target
        // expression (e.g. collapse redundant range bounds).
        if from_partition_exprs.len() > 1
            && into_partition_exprs.len() == 1
            && let Some(expr) = into_partition_exprs.pop()
        {
            into_partition_exprs.push(partition::simplify::simplify_merged_partition_expr(expr));
        }

        // Collect the partition expressions currently attached to regions.
        let mut existing_partition_exprs =
            Vec::with_capacity(physical_table_route.region_routes.len());
        for route in &physical_table_route.region_routes {
            let expr_json = route.region.partition_expr();
            if !expr_json.is_empty() {
                match PartitionExpr::from_json_str(&expr_json) {
                    Ok(Some(expr)) => existing_partition_exprs.push(expr),
                    Ok(None) => {
                        // No structured expression for this region; skip it.
                    }
                    Err(e) => {
                        return Err(e).context(DeserializePartitionExprSnafu);
                    }
                }
            }
        }

        // Every `from` expression must identify an existing partition.
        for from_expr in &from_partition_exprs {
            ensure!(
                existing_partition_exprs.contains(from_expr),
                InvalidPartitionRuleSnafu {
                    reason: format!(
                        "partition expression '{}' does not exist in table {}",
                        from_expr, table_ref
                    )
                }
            );
        }

        // New partition set = (existing - from) + into.
        let new_partition_exprs: Vec<PartitionExpr> = existing_partition_exprs
            .into_iter()
            .filter(|expr| !from_partition_exprs.contains(expr))
            .chain(into_partition_exprs.clone().into_iter())
            .collect();
        let new_partition_exprs_len = new_partition_exprs.len();
        let from_partition_exprs_len = from_partition_exprs.len();

        // Dry-run: building the rule validates the combined partition set;
        // the rule itself is discarded.
        let _ = MultiDimPartitionRule::try_new(
            existing_partition_columns
                .iter()
                .map(|c| c.name.clone())
                .collect(),
            vec![],
            new_partition_exprs,
            true,
        )
        .context(InvalidPartitionSnafu)?;

        let ddl_options = parse_ddl_options(&request.options)?;
        let serialize_exprs = |exprs: Vec<PartitionExpr>| -> Result<Vec<String>> {
            let mut json_exprs = Vec::with_capacity(exprs.len());
            for expr in exprs {
                json_exprs.push(expr.as_json_str().context(SerializePartitionExprSnafu)?);
            }
            Ok(json_exprs)
        };
        let from_partition_exprs_json = serialize_exprs(from_partition_exprs)?;
        let into_partition_exprs_json = serialize_exprs(into_partition_exprs)?;
        let mut req = SubmitDdlTaskRequest::new(
            to_meta_query_context(query_context.clone()),
            DdlTask::new_alter_table(AlterTableExpr {
                catalog_name: request.catalog_name.clone(),
                schema_name: request.schema_name.clone(),
                table_name: request.table_name.clone(),
                kind: Some(Kind::Repartition(Repartition {
                    from_partition_exprs: from_partition_exprs_json,
                    into_partition_exprs: into_partition_exprs_json,
                })),
            }),
        );
        req.wait = ddl_options.wait;
        req.timeout = ddl_options.timeout;

        info!(
            "Submitting repartition task for table {} (table_id={}), from {} to {} partitions, timeout: {:?}, wait: {}",
            table_ref,
            table_id,
            from_partition_exprs_len,
            new_partition_exprs_len,
            ddl_options.timeout,
            ddl_options.wait
        );

        let response = self
            .procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        // Async mode: hand the caller the procedure id without waiting.
        if !ddl_options.wait {
            return build_procedure_id_output(response.key);
        }

        let invalidate_keys = vec![
            CacheIdent::TableId(table_id),
            CacheIdent::TableName(TableName::new(
                request.catalog_name,
                request.schema_name,
                request.table_name,
            )),
        ];

        // Evict stale routing/metadata now that the repartition completed.
        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }
1572
    /// Applies an `ALTER TABLE` expression.
    ///
    /// Verifies the alteration is valid (and not a no-op) via [`verify_alter`],
    /// then submits either a plain alter-table task (physical table) or an
    /// alter-logical-tables task (logical table), and finally invalidates the
    /// affected metadata cache entries.
    #[tracing::instrument(skip_all)]
    pub async fn alter_table_inner(
        &self,
        expr: AlterTableExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&expr.schema_name),
            SchemaReadOnlySnafu {
                name: expr.schema_name.clone()
            }
        );

        // Empty catalog/schema fall back to the defaults, not the session's
        // current ones.
        let catalog_name = if expr.catalog_name.is_empty() {
            DEFAULT_CATALOG_NAME.to_string()
        } else {
            expr.catalog_name.clone()
        };

        let schema_name = if expr.schema_name.is_empty() {
            DEFAULT_SCHEMA_NAME.to_string()
        } else {
            expr.schema_name.clone()
        };

        let table_name = expr.table_name.clone();

        let table = self
            .catalog_manager
            .table(
                &catalog_name,
                &schema_name,
                &table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
            })?;

        let table_id = table.table_info().ident.table_id;
        // Skip submitting a procedure when the alteration is a no-op.
        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
        if !need_alter {
            return Ok(Output::new_with_affected_rows(0));
        }
        info!(
            "Table info before alter is {:?}, expr: {:?}",
            table.table_info(),
            expr
        );

        let physical_table_id = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_id(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        // Physical tables go through the plain alter path; logical tables
        // must additionally invalidate their physical table's cache entries.
        let (req, invalidate_keys) = if physical_table_id == table_id {
            let req = SubmitDdlTaskRequest::new(
                to_meta_query_context(query_context),
                DdlTask::new_alter_table(expr),
            );

            let invalidate_keys = vec![
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            (req, invalidate_keys)
        } else {
            let req = SubmitDdlTaskRequest::new(
                to_meta_query_context(query_context),
                DdlTask::new_alter_logical_tables(vec![expr]),
            );

            let mut invalidate_keys = vec![
                CacheIdent::TableId(physical_table_id),
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            // Best-effort: also invalidate by the physical table's name if
            // its info is still available.
            let physical_table = self
                .table_metadata_manager
                .table_info_manager()
                .get(physical_table_id)
                .await
                .context(TableMetadataManagerSnafu)?
                .map(|x| x.into_inner());
            if let Some(physical_table) = physical_table {
                let physical_table_name = TableName::new(
                    physical_table.table_info.catalog_name,
                    physical_table.table_info.schema_name,
                    physical_table.table_info.name,
                );
                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
            }

            (req, invalidate_keys)
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }
1690
1691 #[cfg(feature = "enterprise")]
1692 #[tracing::instrument(skip_all)]
1693 pub async fn alter_trigger(
1694 &self,
1695 _alter_expr: AlterTrigger,
1696 _query_context: QueryContextRef,
1697 ) -> Result<Output> {
1698 crate::error::NotSupportedSnafu {
1699 feat: "alter trigger",
1700 }
1701 .fail()
1702 }
1703
1704 #[tracing::instrument(skip_all)]
1705 pub async fn alter_database(
1706 &self,
1707 alter_expr: AlterDatabase,
1708 query_context: QueryContextRef,
1709 ) -> Result<Output> {
1710 let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
1711 self.alter_database_inner(alter_expr, query_context).await
1712 }
1713
1714 #[tracing::instrument(skip_all)]
1715 pub async fn alter_database_inner(
1716 &self,
1717 alter_expr: AlterDatabaseExpr,
1718 query_context: QueryContextRef,
1719 ) -> Result<Output> {
1720 ensure!(
1721 !is_readonly_schema(&alter_expr.schema_name),
1722 SchemaReadOnlySnafu {
1723 name: query_context.current_schema().clone()
1724 }
1725 );
1726
1727 let exists = self
1728 .catalog_manager
1729 .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
1730 .await
1731 .context(CatalogSnafu)?;
1732 ensure!(
1733 exists,
1734 SchemaNotFoundSnafu {
1735 schema_info: alter_expr.schema_name,
1736 }
1737 );
1738
1739 let cache_ident = [CacheIdent::SchemaName(SchemaName {
1740 catalog_name: alter_expr.catalog_name.clone(),
1741 schema_name: alter_expr.schema_name.clone(),
1742 })];
1743
1744 self.alter_database_procedure(alter_expr, query_context)
1745 .await?;
1746
1747 self.cache_invalidator
1749 .invalidate(&Context::default(), &cache_ident)
1750 .await
1751 .context(error::InvalidateTableCacheSnafu)?;
1752
1753 Ok(Output::new_with_affected_rows(0))
1754 }
1755
1756 async fn create_table_procedure(
1757 &self,
1758 create_table: CreateTableExpr,
1759 partitions: Vec<PartitionExpr>,
1760 table_info: TableInfo,
1761 query_context: QueryContextRef,
1762 ) -> Result<SubmitDdlTaskResponse> {
1763 let partitions = partitions
1764 .into_iter()
1765 .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
1766 .collect::<Result<Vec<_>>>()?;
1767
1768 let request = SubmitDdlTaskRequest::new(
1769 to_meta_query_context(query_context),
1770 DdlTask::new_create_table(create_table, partitions, table_info),
1771 );
1772
1773 self.procedure_executor
1774 .submit_ddl_task(&ExecutorContext::default(), request)
1775 .await
1776 .context(error::ExecuteDdlSnafu)
1777 }
1778
1779 async fn create_logical_tables_procedure(
1780 &self,
1781 tables_data: Vec<(CreateTableExpr, TableInfo)>,
1782 query_context: QueryContextRef,
1783 ) -> Result<SubmitDdlTaskResponse> {
1784 let request = SubmitDdlTaskRequest::new(
1785 to_meta_query_context(query_context),
1786 DdlTask::new_create_logical_tables(tables_data),
1787 );
1788
1789 self.procedure_executor
1790 .submit_ddl_task(&ExecutorContext::default(), request)
1791 .await
1792 .context(error::ExecuteDdlSnafu)
1793 }
1794
1795 async fn alter_logical_tables_procedure(
1796 &self,
1797 tables_data: Vec<AlterTableExpr>,
1798 query_context: QueryContextRef,
1799 ) -> Result<SubmitDdlTaskResponse> {
1800 let request = SubmitDdlTaskRequest::new(
1801 to_meta_query_context(query_context),
1802 DdlTask::new_alter_logical_tables(tables_data),
1803 );
1804
1805 self.procedure_executor
1806 .submit_ddl_task(&ExecutorContext::default(), request)
1807 .await
1808 .context(error::ExecuteDdlSnafu)
1809 }
1810
1811 async fn drop_table_procedure(
1812 &self,
1813 table_name: &TableName,
1814 table_id: TableId,
1815 drop_if_exists: bool,
1816 query_context: QueryContextRef,
1817 ) -> Result<SubmitDdlTaskResponse> {
1818 let request = SubmitDdlTaskRequest::new(
1819 to_meta_query_context(query_context),
1820 DdlTask::new_drop_table(
1821 table_name.catalog_name.clone(),
1822 table_name.schema_name.clone(),
1823 table_name.table_name.clone(),
1824 table_id,
1825 drop_if_exists,
1826 ),
1827 );
1828
1829 self.procedure_executor
1830 .submit_ddl_task(&ExecutorContext::default(), request)
1831 .await
1832 .context(error::ExecuteDdlSnafu)
1833 }
1834
1835 async fn drop_database_procedure(
1836 &self,
1837 catalog: String,
1838 schema: String,
1839 drop_if_exists: bool,
1840 query_context: QueryContextRef,
1841 ) -> Result<SubmitDdlTaskResponse> {
1842 let request = SubmitDdlTaskRequest::new(
1843 to_meta_query_context(query_context),
1844 DdlTask::new_drop_database(catalog, schema, drop_if_exists),
1845 );
1846
1847 self.procedure_executor
1848 .submit_ddl_task(&ExecutorContext::default(), request)
1849 .await
1850 .context(error::ExecuteDdlSnafu)
1851 }
1852
1853 async fn alter_database_procedure(
1854 &self,
1855 alter_expr: AlterDatabaseExpr,
1856 query_context: QueryContextRef,
1857 ) -> Result<SubmitDdlTaskResponse> {
1858 let request = SubmitDdlTaskRequest::new(
1859 to_meta_query_context(query_context),
1860 DdlTask::new_alter_database(alter_expr),
1861 );
1862
1863 self.procedure_executor
1864 .submit_ddl_task(&ExecutorContext::default(), request)
1865 .await
1866 .context(error::ExecuteDdlSnafu)
1867 }
1868
1869 async fn truncate_table_procedure(
1870 &self,
1871 table_name: &TableName,
1872 table_id: TableId,
1873 time_ranges: Vec<(Timestamp, Timestamp)>,
1874 query_context: QueryContextRef,
1875 ) -> Result<SubmitDdlTaskResponse> {
1876 let request = SubmitDdlTaskRequest::new(
1877 to_meta_query_context(query_context),
1878 DdlTask::new_truncate_table(
1879 table_name.catalog_name.clone(),
1880 table_name.schema_name.clone(),
1881 table_name.table_name.clone(),
1882 table_id,
1883 time_ranges,
1884 ),
1885 );
1886
1887 self.procedure_executor
1888 .submit_ddl_task(&ExecutorContext::default(), request)
1889 .await
1890 .context(error::ExecuteDdlSnafu)
1891 }
1892
1893 #[tracing::instrument(skip_all)]
1894 pub async fn create_database(
1895 &self,
1896 database: &str,
1897 create_if_not_exists: bool,
1898 options: HashMap<String, String>,
1899 query_context: QueryContextRef,
1900 ) -> Result<Output> {
1901 let catalog = query_context.current_catalog();
1902 ensure!(
1903 NAME_PATTERN_REG.is_match(catalog),
1904 error::UnexpectedSnafu {
1905 violated: format!("Invalid catalog name: {}", catalog)
1906 }
1907 );
1908
1909 ensure!(
1910 NAME_PATTERN_REG.is_match(database),
1911 error::UnexpectedSnafu {
1912 violated: format!("Invalid database name: {}", database)
1913 }
1914 );
1915
1916 if !self
1917 .catalog_manager
1918 .schema_exists(catalog, database, None)
1919 .await
1920 .context(CatalogSnafu)?
1921 && !self.catalog_manager.is_reserved_schema_name(database)
1922 {
1923 self.create_database_procedure(
1924 catalog.to_string(),
1925 database.to_string(),
1926 create_if_not_exists,
1927 options,
1928 query_context,
1929 )
1930 .await?;
1931
1932 Ok(Output::new_with_affected_rows(1))
1933 } else if create_if_not_exists {
1934 Ok(Output::new_with_affected_rows(1))
1935 } else {
1936 error::SchemaExistsSnafu { name: database }.fail()
1937 }
1938 }
1939
1940 async fn create_database_procedure(
1941 &self,
1942 catalog: String,
1943 database: String,
1944 create_if_not_exists: bool,
1945 options: HashMap<String, String>,
1946 query_context: QueryContextRef,
1947 ) -> Result<SubmitDdlTaskResponse> {
1948 let request = SubmitDdlTaskRequest::new(
1949 to_meta_query_context(query_context),
1950 DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
1951 );
1952
1953 self.procedure_executor
1954 .submit_ddl_task(&ExecutorContext::default(), request)
1955 .await
1956 .context(error::ExecuteDdlSnafu)
1957 }
1958}
1959
1960pub fn parse_partitions(
1962 create_table: &CreateTableExpr,
1963 partitions: Option<Partitions>,
1964 query_ctx: &QueryContextRef,
1965) -> Result<(Vec<PartitionExpr>, Vec<String>)> {
1966 let partition_columns = find_partition_columns(&partitions)?;
1969 let partition_exprs =
1970 find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
1971
1972 let exprs = partition_exprs.clone();
1974 MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
1975 .context(InvalidPartitionSnafu)?;
1976
1977 Ok((partition_exprs, partition_columns))
1978}
1979
1980fn parse_partitions_for_logical_validation(
1981 create_table: &CreateTableExpr,
1982 partitions: &Partitions,
1983 query_ctx: &QueryContextRef,
1984) -> Result<(Vec<String>, Vec<PartitionExpr>)> {
1985 let partition_columns = partitions
1986 .column_list
1987 .iter()
1988 .map(|ident| ident.value.clone())
1989 .collect::<Vec<_>>();
1990
1991 let column_name_and_type = partition_columns
1992 .iter()
1993 .map(|pc| {
1994 let column = create_table
1995 .column_defs
1996 .iter()
1997 .find(|c| &c.name == pc)
1998 .context(ColumnNotFoundSnafu { msg: pc.clone() })?;
1999 let column_name = &column.name;
2000 let data_type = ConcreteDataType::from(
2001 ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
2002 .context(ColumnDataTypeSnafu)?,
2003 );
2004 Ok((column_name, data_type))
2005 })
2006 .collect::<Result<HashMap<_, _>>>()?;
2007
2008 let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
2009 for expr in &partitions.exprs {
2010 let partition_expr = convert_one_expr(expr, &column_name_and_type, &query_ctx.timezone())?;
2011 partition_exprs.push(partition_expr);
2012 }
2013
2014 MultiDimPartitionRule::try_new(
2015 partition_columns.clone(),
2016 vec![],
2017 partition_exprs.clone(),
2018 true,
2019 )
2020 .context(InvalidPartitionSnafu)?;
2021
2022 Ok((partition_columns, partition_exprs))
2023}
2024
2025pub fn verify_alter(
2031 table_id: TableId,
2032 table_info: Arc<TableInfo>,
2033 expr: AlterTableExpr,
2034) -> Result<bool> {
2035 let request: AlterTableRequest =
2036 common_grpc_expr::alter_expr_to_request(table_id, expr, Some(&table_info.meta))
2037 .context(AlterExprToRequestSnafu)?;
2038
2039 let AlterTableRequest {
2040 table_name,
2041 alter_kind,
2042 ..
2043 } = &request;
2044
2045 if let AlterKind::RenameTable { new_table_name } = alter_kind {
2046 ensure!(
2047 NAME_PATTERN_REG.is_match(new_table_name),
2048 error::UnexpectedSnafu {
2049 violated: format!("Invalid table name: {}", new_table_name)
2050 }
2051 );
2052 } else if let AlterKind::AddColumns { columns } = alter_kind {
2053 let column_names: HashSet<_> = table_info
2056 .meta
2057 .schema
2058 .column_schemas()
2059 .iter()
2060 .map(|schema| &schema.name)
2061 .collect();
2062 if columns.iter().all(|column| {
2063 column_names.contains(&column.column_schema.name) && column.add_if_not_exists
2064 }) {
2065 return Ok(false);
2066 }
2067 }
2068
2069 let _ = table_info
2070 .meta
2071 .builder_with_alter_kind(table_name, &request.alter_kind)
2072 .context(error::TableSnafu)?
2073 .build()
2074 .context(error::BuildTableMetaSnafu { table_name })?;
2075
2076 Ok(true)
2077}
2078
2079pub fn create_table_info(
2080 create_table: &CreateTableExpr,
2081 partition_columns: Vec<String>,
2082) -> Result<TableInfo> {
2083 let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
2084 let mut column_name_to_index_map = HashMap::new();
2085
2086 for (idx, column) in create_table.column_defs.iter().enumerate() {
2087 let schema =
2088 column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
2089 column: &column.name,
2090 })?;
2091 let schema = schema.with_time_index(column.name == create_table.time_index);
2092
2093 column_schemas.push(schema);
2094 let _ = column_name_to_index_map.insert(column.name.clone(), idx);
2095 }
2096
2097 let next_column_id = column_schemas.len() as u32;
2098 let schema = Arc::new(Schema::new(column_schemas));
2099
2100 let primary_key_indices = create_table
2101 .primary_keys
2102 .iter()
2103 .map(|name| {
2104 column_name_to_index_map
2105 .get(name)
2106 .cloned()
2107 .context(ColumnNotFoundSnafu { msg: name })
2108 })
2109 .collect::<Result<Vec<_>>>()?;
2110
2111 let partition_key_indices = partition_columns
2112 .into_iter()
2113 .map(|col_name| {
2114 column_name_to_index_map
2115 .get(&col_name)
2116 .cloned()
2117 .context(ColumnNotFoundSnafu { msg: col_name })
2118 })
2119 .collect::<Result<Vec<_>>>()?;
2120
2121 let table_options = TableOptions::try_from_iter(&create_table.table_options)
2122 .context(UnrecognizedTableOptionSnafu)?;
2123
2124 let meta = TableMeta {
2125 schema,
2126 primary_key_indices,
2127 value_indices: vec![],
2128 engine: create_table.engine.clone(),
2129 next_column_id,
2130 options: table_options,
2131 created_on: Utc::now(),
2132 updated_on: Utc::now(),
2133 partition_key_indices,
2134 column_ids: vec![],
2135 };
2136
2137 let desc = if create_table.desc.is_empty() {
2138 create_table.table_options.get(COMMENT_KEY).cloned()
2139 } else {
2140 Some(create_table.desc.clone())
2141 };
2142
2143 let table_info = TableInfo {
2144 ident: metadata::TableIdent {
2145 table_id: 0,
2147 version: 0,
2148 },
2149 name: create_table.table_name.clone(),
2150 desc,
2151 catalog_name: create_table.catalog_name.clone(),
2152 schema_name: create_table.schema_name.clone(),
2153 meta,
2154 table_type: TableType::Base,
2155 };
2156 Ok(table_info)
2157}
2158
2159fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
2160 let columns = if let Some(partitions) = partitions {
2161 partitions
2162 .column_list
2163 .iter()
2164 .map(|x| x.value.clone())
2165 .collect::<Vec<_>>()
2166 } else {
2167 vec![]
2168 };
2169 Ok(columns)
2170}
2171
2172fn find_partition_entries(
2176 create_table: &CreateTableExpr,
2177 partitions: &Option<Partitions>,
2178 partition_columns: &[String],
2179 query_ctx: &QueryContextRef,
2180) -> Result<Vec<PartitionExpr>> {
2181 let Some(partitions) = partitions else {
2182 return Ok(vec![]);
2183 };
2184
2185 let column_name_and_type = partition_columns
2187 .iter()
2188 .map(|pc| {
2189 let column = create_table
2190 .column_defs
2191 .iter()
2192 .find(|c| &c.name == pc)
2193 .unwrap();
2195 let column_name = &column.name;
2196 let data_type = ConcreteDataType::from(
2197 ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
2198 .context(ColumnDataTypeSnafu)?,
2199 );
2200 Ok((column_name, data_type))
2201 })
2202 .collect::<Result<HashMap<_, _>>>()?;
2203
2204 let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
2206 for partition in &partitions.exprs {
2207 let partition_expr =
2208 convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
2209 partition_exprs.push(partition_expr);
2210 }
2211
2212 Ok(partition_exprs)
2213}
2214
/// Converts a SQL binary expression from a partition clause into a
/// [`PartitionExpr`], recursing into nested binary expressions.
///
/// Accepted shapes: `column <op> literal`, `literal <op> column` (with an
/// optional unary sign on the literal), and `(<expr>) <op> (<expr>)` where
/// both sides are themselves binary expressions. Anything else is rejected
/// with `InvalidPartitionRule`.
fn convert_one_expr(
    expr: &Expr,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
    timezone: &Timezone,
) -> Result<PartitionExpr> {
    let Expr::BinaryOp { left, op, right } = expr else {
        return InvalidPartitionRuleSnafu {
            reason: "partition rule must be a binary expression",
        }
        .fail();
    };

    // Only the restricted operator set (comparisons, AND/OR, ...) is allowed.
    let op =
        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
            reason: format!("unsupported operator in partition expr {op}"),
        })?;

    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
        // column <op> literal
        (Expr::Identifier(ident), Expr::Value(value)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&value.value, data_type, timezone, None)?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        // column <op> (+|-)literal
        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        // literal <op> column
        (Expr::Value(value), Expr::Identifier(ident)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&value.value, data_type, timezone, None)?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        // (+|-)literal <op> column
        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        // Nested expressions on both sides, e.g. `a < 1 AND b >= 2`.
        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
            (Operand::Expr(lhs), op, Operand::Expr(rhs))
        }
        _ => {
            return InvalidPartitionRuleSnafu {
                reason: format!("invalid partition expr {expr}"),
            }
            .fail();
        }
    };

    Ok(PartitionExpr::new(lhs, op, rhs))
}
2276
2277fn convert_identifier(
2278 ident: &Ident,
2279 column_name_and_type: &HashMap<&String, ConcreteDataType>,
2280) -> Result<(String, ConcreteDataType)> {
2281 let column_name = ident.value.clone();
2282 let data_type = column_name_and_type
2283 .get(&column_name)
2284 .cloned()
2285 .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
2286 Ok((column_name, data_type))
2287}
2288
2289fn convert_value(
2290 value: &ParserValue,
2291 data_type: ConcreteDataType,
2292 timezone: &Timezone,
2293 unary_op: Option<UnaryOperator>,
2294) -> Result<Value> {
2295 sql_value_to_value(
2296 &ColumnSchema::new("<NONAME>", data_type, true),
2297 value,
2298 Some(timezone),
2299 unary_op,
2300 false,
2301 )
2302 .context(error::SqlCommonSnafu)
2303}
2304
2305#[cfg(test)]
2306mod test {
2307 use std::time::Duration;
2308
2309 use session::context::{QueryContext, QueryContextBuilder};
2310 use sql::dialect::GreptimeDbDialect;
2311 use sql::parser::{ParseOptions, ParserContext};
2312 use sql::statements::statement::Statement;
2313 use sqlparser::parser::Parser;
2314
2315 use super::*;
2316 use crate::expr_helper;
2317
    /// `parse_ddl_options` should honor explicit `timeout` and `wait` options.
    #[test]
    fn test_parse_ddl_options() {
        let options = OptionMap::from([
            ("timeout".to_string(), "5m".to_string()),
            ("wait".to_string(), "false".to_string()),
        ]);
        let ddl_options = parse_ddl_options(&options).unwrap();
        assert!(!ddl_options.wait);
        // "5m" parses to 300 seconds.
        assert_eq!(Duration::from_secs(300), ddl_options.timeout);
    }
2328
    /// Spot-checks `NAME_PATTERN_REG`: names may not start with punctuation
    /// or symbols, while `@`/`#` are accepted after the first character.
    #[test]
    fn test_name_is_match() {
        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
        assert!(!NAME_PATTERN_REG.is_match("🈲"));
        assert!(NAME_PATTERN_REG.is_match("hello"));
        assert!(NAME_PATTERN_REG.is_match("test@"));
        assert!(!NAME_PATTERN_REG.is_match("@test"));
        assert!(NAME_PATTERN_REG.is_match("test#"));
        assert!(!NAME_PATTERN_REG.is_match("#test"));
        assert!(!NAME_PATTERN_REG.is_match("@"));
        assert!(!NAME_PATTERN_REG.is_match("#"));
    }
2341
2342 #[test]
2343 fn test_partition_expr_equivalence_with_swapped_operands() {
2344 let column_name = "device_id".to_string();
2345 let column_name_and_type =
2346 HashMap::from([(&column_name, ConcreteDataType::int32_datatype())]);
2347 let timezone = Timezone::from_tz_string("UTC").unwrap();
2348 let dialect = GreptimeDbDialect {};
2349
2350 let mut parser = Parser::new(&dialect)
2351 .try_with_sql("device_id < 100")
2352 .unwrap();
2353 let expr_left = parser.parse_expr().unwrap();
2354
2355 let mut parser = Parser::new(&dialect)
2356 .try_with_sql("100 > device_id")
2357 .unwrap();
2358 let expr_right = parser.parse_expr().unwrap();
2359
2360 let partition_left =
2361 convert_one_expr(&expr_left, &column_name_and_type, &timezone).unwrap();
2362 let partition_right =
2363 convert_one_expr(&expr_right, &column_name_and_type, &timezone).unwrap();
2364
2365 assert_eq!(partition_left, partition_right);
2366 assert!([partition_left.clone()].contains(&partition_right));
2367
2368 let mut physical_partition_exprs = vec![partition_left];
2369 let mut logical_partition_exprs = vec![partition_right];
2370 physical_partition_exprs.sort_unstable();
2371 logical_partition_exprs.sort_unstable();
2372 assert_eq!(physical_partition_exprs, logical_partition_exprs);
2373 }
2374
2375 #[tokio::test]
2376 #[ignore = "TODO(ruihang): WIP new partition rule"]
2377 async fn test_parse_partitions() {
2378 common_telemetry::init_default_ut_logging();
2379 let cases = [
2380 (
2381 r"
2382CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
2383PARTITION ON COLUMNS (b) (
2384 b < 'hz',
2385 b >= 'hz' AND b < 'sh',
2386 b >= 'sh'
2387)
2388ENGINE=mito",
2389 r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
2390 ),
2391 (
2392 r"
2393CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
2394PARTITION BY RANGE COLUMNS (b, a) (
2395 PARTITION r0 VALUES LESS THAN ('hz', 10),
2396 b < 'hz' AND a < 10,
2397 b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
2398 b >= 'sh' AND a >= 20
2399)
2400ENGINE=mito",
2401 r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
2402 ),
2403 ];
2404 let ctx = QueryContextBuilder::default().build().into();
2405 for (sql, expected) in cases {
2406 let result = ParserContext::create_with_dialect(
2407 sql,
2408 &GreptimeDbDialect {},
2409 ParseOptions::default(),
2410 )
2411 .unwrap();
2412 match &result[0] {
2413 Statement::CreateTable(c) => {
2414 let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
2415 let (partitions, _) =
2416 parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
2417 let json = serde_json::to_string(&partitions).unwrap();
2418 assert_eq!(json, expected);
2419 }
2420 _ => unreachable!(),
2421 }
2422 }
2423 }
2424}