1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use api::helper::ColumnDataTypeWrapper;
19use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
20use api::v1::{
21 AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, column_def,
22};
23#[cfg(feature = "enterprise")]
24use api::v1::{
25 CreateTriggerExpr as PbCreateTriggerExpr, meta::CreateTriggerTask as PbCreateTriggerTask,
26};
27use catalog::CatalogManagerRef;
28use chrono::Utc;
29use common_base::regex_pattern::NAME_PATTERN_REG;
30use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_readonly_schema};
31use common_catalog::{format_full_flow_name, format_full_table_name};
32use common_error::ext::BoxedError;
33use common_meta::cache_invalidator::Context;
34use common_meta::ddl::create_flow::FlowType;
35use common_meta::instruction::CacheIdent;
36use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
37use common_meta::procedure_executor::ExecutorContext;
38#[cfg(feature = "enterprise")]
39use common_meta::rpc::ddl::trigger::CreateTriggerTask;
40#[cfg(feature = "enterprise")]
41use common_meta::rpc::ddl::trigger::DropTriggerTask;
42use common_meta::rpc::ddl::{
43 CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
44 SubmitDdlTaskResponse,
45};
46use common_query::Output;
47use common_sql::convert::sql_value_to_value;
48use common_telemetry::{debug, info, tracing, warn};
49use common_time::{Timestamp, Timezone};
50use datafusion_common::tree_node::TreeNodeVisitor;
51use datafusion_expr::LogicalPlan;
52use datatypes::prelude::ConcreteDataType;
53use datatypes::schema::{RawSchema, Schema};
54use datatypes::value::Value;
55use partition::expr::{Operand, PartitionExpr, RestrictedOp};
56use partition::multi_dim::MultiDimPartitionRule;
57use query::parser::QueryStatement;
58use query::plan::extract_and_rewrite_full_table_names;
59use query::query_engine::DefaultSerializer;
60use query::sql::create_table_stmt;
61use session::context::QueryContextRef;
62use session::table_name::table_idents_to_full_name;
63use snafu::{OptionExt, ResultExt, ensure};
64use sql::parser::{ParseOptions, ParserContext};
65#[cfg(feature = "enterprise")]
66use sql::statements::alter::trigger::AlterTrigger;
67use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation};
68#[cfg(feature = "enterprise")]
69use sql::statements::create::trigger::CreateTrigger;
70use sql::statements::create::{
71 CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
72};
73use sql::statements::statement::Statement;
74use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
75use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
76use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
77use table::TableRef;
78use table::dist_table::DistTable;
79use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
80use table::requests::{AlterKind, AlterTableRequest, COMMENT_KEY, TableOptions};
81use table::table_name::TableName;
82
83use crate::error::{
84 self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
85 ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
86 EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu,
87 InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu,
88 InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu, PartitionExprToPbSnafu, Result,
89 SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
90 TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
91 UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
92};
93use crate::expr_helper;
94use crate::statement::StatementExecutor;
95use crate::statement::show::create_partitions_stmt;
96
97impl StatementExecutor {
98 pub fn catalog_manager(&self) -> CatalogManagerRef {
99 self.catalog_manager.clone()
100 }
101
102 #[tracing::instrument(skip_all)]
103 pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
104 let (catalog, schema, _table) = table_idents_to_full_name(&stmt.name, &ctx)
105 .map_err(BoxedError::new)
106 .context(error::ExternalSnafu)?;
107
108 let schema_options = self
109 .table_metadata_manager
110 .schema_manager()
111 .get(SchemaNameKey {
112 catalog: &catalog,
113 schema: &schema,
114 })
115 .await
116 .context(TableMetadataManagerSnafu)?
117 .map(|v| v.into_inner());
118
119 let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
120 if let Some(schema_options) = schema_options {
123 for (key, value) in schema_options.extra_options.iter() {
124 create_expr
125 .table_options
126 .entry(key.clone())
127 .or_insert(value.clone());
128 }
129 }
130
131 self.create_table_inner(create_expr, stmt.partitions, ctx)
132 .await
133 }
134
135 #[tracing::instrument(skip_all)]
136 pub async fn create_table_like(
137 &self,
138 stmt: CreateTableLike,
139 ctx: QueryContextRef,
140 ) -> Result<TableRef> {
141 let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
142 .map_err(BoxedError::new)
143 .context(error::ExternalSnafu)?;
144 let table_ref = self
145 .catalog_manager
146 .table(&catalog, &schema, &table, Some(&ctx))
147 .await
148 .context(CatalogSnafu)?
149 .context(TableNotFoundSnafu { table_name: &table })?;
150 let partitions = self
151 .partition_manager
152 .find_table_partitions(table_ref.table_info().table_id())
153 .await
154 .context(error::FindTablePartitionRuleSnafu { table_name: table })?;
155
156 let schema_options = self
158 .table_metadata_manager
159 .schema_manager()
160 .get(SchemaNameKey {
161 catalog: &catalog,
162 schema: &schema,
163 })
164 .await
165 .context(TableMetadataManagerSnafu)?
166 .map(|v| v.into_inner());
167
168 let quote_style = ctx.quote_style();
169 let mut create_stmt =
170 create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
171 .context(error::ParseQuerySnafu)?;
172 create_stmt.name = stmt.table_name;
173 create_stmt.if_not_exists = false;
174
175 let table_info = table_ref.table_info();
176 let partitions =
177 create_partitions_stmt(&table_info, partitions)?.and_then(|mut partitions| {
178 if !partitions.column_list.is_empty() {
179 partitions.set_quote(quote_style);
180 Some(partitions)
181 } else {
182 None
183 }
184 });
185
186 let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
187 self.create_table_inner(create_expr, partitions, ctx).await
188 }
189
190 #[tracing::instrument(skip_all)]
191 pub async fn create_external_table(
192 &self,
193 create_expr: CreateExternalTable,
194 ctx: QueryContextRef,
195 ) -> Result<TableRef> {
196 let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
197 self.create_table_inner(create_expr, None, ctx).await
198 }
199
200 #[tracing::instrument(skip_all)]
201 pub async fn create_table_inner(
202 &self,
203 create_table: &mut CreateTableExpr,
204 partitions: Option<Partitions>,
205 query_ctx: QueryContextRef,
206 ) -> Result<TableRef> {
207 ensure!(
208 !is_readonly_schema(&create_table.schema_name),
209 SchemaReadOnlySnafu {
210 name: create_table.schema_name.clone()
211 }
212 );
213
214 if create_table.engine == METRIC_ENGINE_NAME
215 && create_table
216 .table_options
217 .contains_key(LOGICAL_TABLE_METADATA_KEY)
218 {
219 if let Some(partitions) = partitions.as_ref()
220 && !partitions.exprs.is_empty()
221 {
222 self.validate_logical_table_partition_rule(create_table, partitions, &query_ctx)
223 .await?;
224 }
225 self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
227 .await?
228 .into_iter()
229 .next()
230 .context(error::UnexpectedSnafu {
231 violated: "expected to create logical tables",
232 })
233 } else {
234 self.create_non_logic_table(create_table, partitions, query_ctx)
236 .await
237 }
238 }
239
240 #[tracing::instrument(skip_all)]
241 pub async fn create_non_logic_table(
242 &self,
243 create_table: &mut CreateTableExpr,
244 partitions: Option<Partitions>,
245 query_ctx: QueryContextRef,
246 ) -> Result<TableRef> {
247 let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
248
249 let schema = self
251 .table_metadata_manager
252 .schema_manager()
253 .get(SchemaNameKey::new(
254 &create_table.catalog_name,
255 &create_table.schema_name,
256 ))
257 .await
258 .context(TableMetadataManagerSnafu)?;
259 ensure!(
260 schema.is_some(),
261 SchemaNotFoundSnafu {
262 schema_info: &create_table.schema_name,
263 }
264 );
265
266 if let Some(table) = self
268 .catalog_manager
269 .table(
270 &create_table.catalog_name,
271 &create_table.schema_name,
272 &create_table.table_name,
273 Some(&query_ctx),
274 )
275 .await
276 .context(CatalogSnafu)?
277 {
278 return if create_table.create_if_not_exists {
279 Ok(table)
280 } else {
281 TableAlreadyExistsSnafu {
282 table: format_full_table_name(
283 &create_table.catalog_name,
284 &create_table.schema_name,
285 &create_table.table_name,
286 ),
287 }
288 .fail()
289 };
290 }
291
292 ensure!(
293 NAME_PATTERN_REG.is_match(&create_table.table_name),
294 InvalidTableNameSnafu {
295 table_name: &create_table.table_name,
296 }
297 );
298
299 let table_name = TableName::new(
300 &create_table.catalog_name,
301 &create_table.schema_name,
302 &create_table.table_name,
303 );
304
305 let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
306 let mut table_info = create_table_info(create_table, partition_cols)?;
307
308 let resp = self
309 .create_table_procedure(
310 create_table.clone(),
311 partitions,
312 table_info.clone(),
313 query_ctx,
314 )
315 .await?;
316
317 let table_id = resp
318 .table_ids
319 .into_iter()
320 .next()
321 .context(error::UnexpectedSnafu {
322 violated: "expected table_id",
323 })?;
324 info!("Successfully created table '{table_name}' with table id {table_id}");
325
326 table_info.ident.table_id = table_id;
327
328 let table_info: Arc<TableInfo> =
329 Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
330 create_table.table_id = Some(api::v1::TableId { id: table_id });
331
332 let table = DistTable::table(table_info);
333
334 Ok(table)
335 }
336
337 #[tracing::instrument(skip_all)]
338 pub async fn create_logical_tables(
339 &self,
340 create_table_exprs: &[CreateTableExpr],
341 query_context: QueryContextRef,
342 ) -> Result<Vec<TableRef>> {
343 let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
344 ensure!(
345 !create_table_exprs.is_empty(),
346 EmptyDdlExprSnafu {
347 name: "create logic tables"
348 }
349 );
350
351 for create_table in create_table_exprs {
353 ensure!(
354 NAME_PATTERN_REG.is_match(&create_table.table_name),
355 InvalidTableNameSnafu {
356 table_name: &create_table.table_name,
357 }
358 );
359 }
360
361 let mut raw_tables_info = create_table_exprs
362 .iter()
363 .map(|create| create_table_info(create, vec![]))
364 .collect::<Result<Vec<_>>>()?;
365 let tables_data = create_table_exprs
366 .iter()
367 .cloned()
368 .zip(raw_tables_info.iter().cloned())
369 .collect::<Vec<_>>();
370
371 let resp = self
372 .create_logical_tables_procedure(tables_data, query_context)
373 .await?;
374
375 let table_ids = resp.table_ids;
376 ensure!(
377 table_ids.len() == raw_tables_info.len(),
378 CreateLogicalTablesSnafu {
379 reason: format!(
380 "The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}",
381 raw_tables_info.len(),
382 table_ids.len()
383 )
384 }
385 );
386 info!("Successfully created logical tables: {:?}", table_ids);
387
388 for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
389 table_info.ident.table_id = table_ids[i];
390 }
391 let tables_info = raw_tables_info
392 .into_iter()
393 .map(|x| x.try_into().context(CreateTableInfoSnafu))
394 .collect::<Result<Vec<_>>>()?;
395
396 Ok(tables_info
397 .into_iter()
398 .map(|x| DistTable::table(Arc::new(x)))
399 .collect())
400 }
401
402 async fn validate_logical_table_partition_rule(
403 &self,
404 create_table: &CreateTableExpr,
405 partitions: &Partitions,
406 query_ctx: &QueryContextRef,
407 ) -> Result<()> {
408 let (_, mut logical_partition_exprs) =
409 parse_partitions_for_logical_validation(create_table, partitions, query_ctx)?;
410
411 let physical_table_name = create_table
412 .table_options
413 .get(LOGICAL_TABLE_METADATA_KEY)
414 .with_context(|| CreateLogicalTablesSnafu {
415 reason: format!(
416 "expect `{LOGICAL_TABLE_METADATA_KEY}` option on creating logical table"
417 ),
418 })?;
419
420 let physical_table = self
421 .catalog_manager
422 .table(
423 &create_table.catalog_name,
424 &create_table.schema_name,
425 physical_table_name,
426 Some(query_ctx),
427 )
428 .await
429 .context(CatalogSnafu)?
430 .context(TableNotFoundSnafu {
431 table_name: physical_table_name.clone(),
432 })?;
433
434 let physical_table_info = physical_table.table_info();
435 let partition_rule = self
436 .partition_manager
437 .find_table_partition_rule(&physical_table_info)
438 .await
439 .context(error::FindTablePartitionRuleSnafu {
440 table_name: physical_table_name.clone(),
441 })?;
442
443 let multi_dim_rule = partition_rule
444 .as_ref()
445 .as_any()
446 .downcast_ref::<MultiDimPartitionRule>()
447 .context(InvalidPartitionRuleSnafu {
448 reason: "physical table partition rule is not range-based",
449 })?;
450
451 let mut physical_partition_exprs = multi_dim_rule.exprs().to_vec();
453 logical_partition_exprs.sort_unstable();
454 physical_partition_exprs.sort_unstable();
455
456 ensure!(
457 physical_partition_exprs == logical_partition_exprs,
458 InvalidPartitionRuleSnafu {
459 reason: format!(
460 "logical table partition rule must match the corresponding physical table's\n logical table partition exprs:\t\t {:?}\n physical table partition exprs:\t {:?}",
461 logical_partition_exprs, physical_partition_exprs
462 ),
463 }
464 );
465
466 Ok(())
467 }
468
469 #[cfg(feature = "enterprise")]
470 #[tracing::instrument(skip_all)]
471 pub async fn create_trigger(
472 &self,
473 stmt: CreateTrigger,
474 query_context: QueryContextRef,
475 ) -> Result<Output> {
476 let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
477 self.create_trigger_inner(expr, query_context).await
478 }
479
480 #[cfg(feature = "enterprise")]
481 pub async fn create_trigger_inner(
482 &self,
483 expr: PbCreateTriggerExpr,
484 query_context: QueryContextRef,
485 ) -> Result<Output> {
486 self.create_trigger_procedure(expr, query_context).await?;
487 Ok(Output::new_with_affected_rows(0))
488 }
489
490 #[cfg(feature = "enterprise")]
491 async fn create_trigger_procedure(
492 &self,
493 expr: PbCreateTriggerExpr,
494 query_context: QueryContextRef,
495 ) -> Result<SubmitDdlTaskResponse> {
496 let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
497 create_trigger: Some(expr),
498 })
499 .context(error::InvalidExprSnafu)?;
500
501 let request = SubmitDdlTaskRequest {
502 query_context,
503 task: DdlTask::new_create_trigger(task),
504 };
505
506 self.procedure_executor
507 .submit_ddl_task(&ExecutorContext::default(), request)
508 .await
509 .context(error::ExecuteDdlSnafu)
510 }
511
512 #[tracing::instrument(skip_all)]
513 pub async fn create_flow(
514 &self,
515 stmt: CreateFlow,
516 query_context: QueryContextRef,
517 ) -> Result<Output> {
518 let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;
520
521 self.create_flow_inner(expr, query_context).await
522 }
523
524 pub async fn create_flow_inner(
525 &self,
526 expr: CreateFlowExpr,
527 query_context: QueryContextRef,
528 ) -> Result<Output> {
529 self.create_flow_procedure(expr, query_context).await?;
530 Ok(Output::new_with_affected_rows(0))
531 }
532
533 async fn create_flow_procedure(
534 &self,
535 expr: CreateFlowExpr,
536 query_context: QueryContextRef,
537 ) -> Result<SubmitDdlTaskResponse> {
538 let flow_type = self
539 .determine_flow_type(&expr, query_context.clone())
540 .await?;
541 info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
542
543 let expr = {
544 let mut expr = expr;
545 expr.flow_options
546 .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
547 expr
548 };
549
550 let task = CreateFlowTask::try_from(PbCreateFlowTask {
551 create_flow: Some(expr),
552 })
553 .context(error::InvalidExprSnafu)?;
554 let request = SubmitDdlTaskRequest {
555 query_context,
556 task: DdlTask::new_create_flow(task),
557 };
558
559 self.procedure_executor
560 .submit_ddl_task(&ExecutorContext::default(), request)
561 .await
562 .context(error::ExecuteDdlSnafu)
563 }
564
565 async fn determine_flow_type(
569 &self,
570 expr: &CreateFlowExpr,
571 query_ctx: QueryContextRef,
572 ) -> Result<FlowType> {
573 for src_table_name in &expr.source_table_names {
575 let table = self
576 .catalog_manager()
577 .table(
578 &src_table_name.catalog_name,
579 &src_table_name.schema_name,
580 &src_table_name.table_name,
581 Some(&query_ctx),
582 )
583 .await
584 .map_err(BoxedError::new)
585 .context(ExternalSnafu)?
586 .with_context(|| TableNotFoundSnafu {
587 table_name: format_full_table_name(
588 &src_table_name.catalog_name,
589 &src_table_name.schema_name,
590 &src_table_name.table_name,
591 ),
592 })?;
593
594 if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
596 warn!(
597 "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
598 format_full_table_name(
599 &src_table_name.catalog_name,
600 &src_table_name.schema_name,
601 &src_table_name.table_name
602 ),
603 expr.flow_name
604 );
605 return Ok(FlowType::Streaming);
606 }
607 }
608
609 let engine = &self.query_engine;
610 let stmts = ParserContext::create_with_dialect(
611 &expr.sql,
612 query_ctx.sql_dialect(),
613 ParseOptions::default(),
614 )
615 .map_err(BoxedError::new)
616 .context(ExternalSnafu)?;
617
618 ensure!(
619 stmts.len() == 1,
620 InvalidSqlSnafu {
621 err_msg: format!("Expect only one statement, found {}", stmts.len())
622 }
623 );
624 let stmt = &stmts[0];
625
626 let plan = match stmt {
628 Statement::Tql(_) => return Ok(FlowType::Batching),
630 _ => engine
631 .planner()
632 .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
633 .await
634 .map_err(BoxedError::new)
635 .context(ExternalSnafu)?,
636 };
637
638 struct FindAggr {
640 is_aggr: bool,
641 }
642
643 impl TreeNodeVisitor<'_> for FindAggr {
644 type Node = LogicalPlan;
645 fn f_down(
646 &mut self,
647 node: &Self::Node,
648 ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
649 {
650 match node {
651 LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
652 self.is_aggr = true;
653 return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
654 }
655 _ => (),
656 }
657 Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
658 }
659 }
660
661 let mut find_aggr = FindAggr { is_aggr: false };
662
663 plan.visit_with_subqueries(&mut find_aggr)
664 .context(BuildDfLogicalPlanSnafu)?;
665 if find_aggr.is_aggr {
666 Ok(FlowType::Batching)
667 } else {
668 Ok(FlowType::Streaming)
669 }
670 }
671
672 #[tracing::instrument(skip_all)]
673 pub async fn create_view(
674 &self,
675 create_view: CreateView,
676 ctx: QueryContextRef,
677 ) -> Result<TableRef> {
678 let logical_plan = match &*create_view.query {
680 Statement::Query(query) => {
681 self.plan(
682 &QueryStatement::Sql(Statement::Query(query.clone())),
683 ctx.clone(),
684 )
685 .await?
686 }
687 Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
688 _ => {
689 return InvalidViewStmtSnafu {}.fail();
690 }
691 };
692 let definition = create_view.to_string();
694
695 let schema: Schema = logical_plan
698 .schema()
699 .clone()
700 .try_into()
701 .context(ConvertSchemaSnafu)?;
702 let plan_columns: Vec<_> = schema
703 .column_schemas()
704 .iter()
705 .map(|c| c.name.clone())
706 .collect();
707
708 let columns: Vec<_> = create_view
709 .columns
710 .iter()
711 .map(|ident| ident.to_string())
712 .collect();
713
714 if !columns.is_empty() {
716 ensure!(
717 columns.len() == plan_columns.len(),
718 error::ViewColumnsMismatchSnafu {
719 view_name: create_view.name.to_string(),
720 expected: plan_columns.len(),
721 actual: columns.len(),
722 }
723 );
724 }
725
726 let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
729 .context(ExtractTableNamesSnafu)?;
730
731 let table_names = table_names.into_iter().map(|t| t.into()).collect();
732
733 let encoded_plan = DFLogicalSubstraitConvertor
740 .encode(&plan, DefaultSerializer)
741 .context(SubstraitCodecSnafu)?;
742
743 let expr = expr_helper::to_create_view_expr(
744 create_view,
745 encoded_plan.to_vec(),
746 table_names,
747 columns,
748 plan_columns,
749 definition,
750 ctx.clone(),
751 )?;
752
753 self.create_view_by_expr(expr, ctx).await
755 }
756
757 pub async fn create_view_by_expr(
758 &self,
759 expr: CreateViewExpr,
760 ctx: QueryContextRef,
761 ) -> Result<TableRef> {
762 ensure! {
763 !(expr.create_if_not_exists & expr.or_replace),
764 InvalidSqlSnafu {
765 err_msg: "syntax error Create Or Replace and If Not Exist cannot be used together",
766 }
767 };
768 let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();
769
770 let schema_exists = self
771 .table_metadata_manager
772 .schema_manager()
773 .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
774 .await
775 .context(TableMetadataManagerSnafu)?;
776
777 ensure!(
778 schema_exists,
779 SchemaNotFoundSnafu {
780 schema_info: &expr.schema_name,
781 }
782 );
783
784 if let Some(table) = self
786 .catalog_manager
787 .table(
788 &expr.catalog_name,
789 &expr.schema_name,
790 &expr.view_name,
791 Some(&ctx),
792 )
793 .await
794 .context(CatalogSnafu)?
795 {
796 let table_type = table.table_info().table_type;
797
798 match (table_type, expr.create_if_not_exists, expr.or_replace) {
799 (TableType::View, true, false) => {
800 return Ok(table);
801 }
802 (TableType::View, false, false) => {
803 return ViewAlreadyExistsSnafu {
804 name: format_full_table_name(
805 &expr.catalog_name,
806 &expr.schema_name,
807 &expr.view_name,
808 ),
809 }
810 .fail();
811 }
812 (TableType::View, _, true) => {
813 }
815 _ => {
816 return TableAlreadyExistsSnafu {
817 table: format_full_table_name(
818 &expr.catalog_name,
819 &expr.schema_name,
820 &expr.view_name,
821 ),
822 }
823 .fail();
824 }
825 }
826 }
827
828 ensure!(
829 NAME_PATTERN_REG.is_match(&expr.view_name),
830 InvalidViewNameSnafu {
831 name: expr.view_name.clone(),
832 }
833 );
834
835 let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);
836
837 let mut view_info = RawTableInfo {
838 ident: metadata::TableIdent {
839 table_id: 0,
841 version: 0,
842 },
843 name: expr.view_name.clone(),
844 desc: None,
845 catalog_name: expr.catalog_name.clone(),
846 schema_name: expr.schema_name.clone(),
847 meta: RawTableMeta::default(),
849 table_type: TableType::View,
850 };
851
852 let request = SubmitDdlTaskRequest {
853 query_context: ctx,
854 task: DdlTask::new_create_view(expr, view_info.clone()),
855 };
856
857 let resp = self
858 .procedure_executor
859 .submit_ddl_task(&ExecutorContext::default(), request)
860 .await
861 .context(error::ExecuteDdlSnafu)?;
862
863 debug!(
864 "Submit creating view '{view_name}' task response: {:?}",
865 resp
866 );
867
868 let view_id = resp
869 .table_ids
870 .into_iter()
871 .next()
872 .context(error::UnexpectedSnafu {
873 violated: "expected table_id",
874 })?;
875 info!("Successfully created view '{view_name}' with view id {view_id}");
876
877 view_info.ident.table_id = view_id;
878
879 let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);
880
881 let table = DistTable::table(view_info);
882
883 self.cache_invalidator
885 .invalidate(
886 &Context::default(),
887 &[
888 CacheIdent::TableId(view_id),
889 CacheIdent::TableName(view_name.clone()),
890 ],
891 )
892 .await
893 .context(error::InvalidateTableCacheSnafu)?;
894
895 Ok(table)
896 }
897
898 #[tracing::instrument(skip_all)]
899 pub async fn drop_flow(
900 &self,
901 catalog_name: String,
902 flow_name: String,
903 drop_if_exists: bool,
904 query_context: QueryContextRef,
905 ) -> Result<Output> {
906 if let Some(flow) = self
907 .flow_metadata_manager
908 .flow_name_manager()
909 .get(&catalog_name, &flow_name)
910 .await
911 .context(error::TableMetadataManagerSnafu)?
912 {
913 let flow_id = flow.flow_id();
914 let task = DropFlowTask {
915 catalog_name,
916 flow_name,
917 flow_id,
918 drop_if_exists,
919 };
920 self.drop_flow_procedure(task, query_context).await?;
921
922 Ok(Output::new_with_affected_rows(0))
923 } else if drop_if_exists {
924 Ok(Output::new_with_affected_rows(0))
925 } else {
926 FlowNotFoundSnafu {
927 flow_name: format_full_flow_name(&catalog_name, &flow_name),
928 }
929 .fail()
930 }
931 }
932
933 async fn drop_flow_procedure(
934 &self,
935 expr: DropFlowTask,
936 query_context: QueryContextRef,
937 ) -> Result<SubmitDdlTaskResponse> {
938 let request = SubmitDdlTaskRequest {
939 query_context,
940 task: DdlTask::new_drop_flow(expr),
941 };
942
943 self.procedure_executor
944 .submit_ddl_task(&ExecutorContext::default(), request)
945 .await
946 .context(error::ExecuteDdlSnafu)
947 }
948
949 #[cfg(feature = "enterprise")]
950 #[tracing::instrument(skip_all)]
951 pub(super) async fn drop_trigger(
952 &self,
953 catalog_name: String,
954 trigger_name: String,
955 drop_if_exists: bool,
956 query_context: QueryContextRef,
957 ) -> Result<Output> {
958 let task = DropTriggerTask {
959 catalog_name,
960 trigger_name,
961 drop_if_exists,
962 };
963 self.drop_trigger_procedure(task, query_context).await?;
964 Ok(Output::new_with_affected_rows(0))
965 }
966
967 #[cfg(feature = "enterprise")]
968 async fn drop_trigger_procedure(
969 &self,
970 expr: DropTriggerTask,
971 query_context: QueryContextRef,
972 ) -> Result<SubmitDdlTaskResponse> {
973 let request = SubmitDdlTaskRequest {
974 query_context,
975 task: DdlTask::new_drop_trigger(expr),
976 };
977
978 self.procedure_executor
979 .submit_ddl_task(&ExecutorContext::default(), request)
980 .await
981 .context(error::ExecuteDdlSnafu)
982 }
983
984 #[tracing::instrument(skip_all)]
986 pub(crate) async fn drop_view(
987 &self,
988 catalog: String,
989 schema: String,
990 view: String,
991 drop_if_exists: bool,
992 query_context: QueryContextRef,
993 ) -> Result<Output> {
994 let view_info = if let Some(view) = self
995 .catalog_manager
996 .table(&catalog, &schema, &view, None)
997 .await
998 .context(CatalogSnafu)?
999 {
1000 view.table_info()
1001 } else if drop_if_exists {
1002 return Ok(Output::new_with_affected_rows(0));
1004 } else {
1005 return TableNotFoundSnafu {
1006 table_name: format_full_table_name(&catalog, &schema, &view),
1007 }
1008 .fail();
1009 };
1010
1011 ensure!(
1013 view_info.table_type == TableType::View,
1014 error::InvalidViewSnafu {
1015 msg: "not a view",
1016 view_name: format_full_table_name(&catalog, &schema, &view),
1017 }
1018 );
1019
1020 let view_id = view_info.table_id();
1021
1022 let task = DropViewTask {
1023 catalog,
1024 schema,
1025 view,
1026 view_id,
1027 drop_if_exists,
1028 };
1029
1030 self.drop_view_procedure(task, query_context).await?;
1031
1032 Ok(Output::new_with_affected_rows(0))
1033 }
1034
1035 async fn drop_view_procedure(
1037 &self,
1038 expr: DropViewTask,
1039 query_context: QueryContextRef,
1040 ) -> Result<SubmitDdlTaskResponse> {
1041 let request = SubmitDdlTaskRequest {
1042 query_context,
1043 task: DdlTask::new_drop_view(expr),
1044 };
1045
1046 self.procedure_executor
1047 .submit_ddl_task(&ExecutorContext::default(), request)
1048 .await
1049 .context(error::ExecuteDdlSnafu)
1050 }
1051
1052 #[tracing::instrument(skip_all)]
1053 pub async fn alter_logical_tables(
1054 &self,
1055 alter_table_exprs: Vec<AlterTableExpr>,
1056 query_context: QueryContextRef,
1057 ) -> Result<Output> {
1058 let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
1059 ensure!(
1060 !alter_table_exprs.is_empty(),
1061 EmptyDdlExprSnafu {
1062 name: "alter logical tables"
1063 }
1064 );
1065
1066 let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
1068 for expr in alter_table_exprs {
1069 let catalog = if expr.catalog_name.is_empty() {
1071 query_context.current_catalog()
1072 } else {
1073 &expr.catalog_name
1074 };
1075 let schema = if expr.schema_name.is_empty() {
1076 query_context.current_schema()
1077 } else {
1078 expr.schema_name.clone()
1079 };
1080 let table_name = &expr.table_name;
1081 let table = self
1082 .catalog_manager
1083 .table(catalog, &schema, table_name, Some(&query_context))
1084 .await
1085 .context(CatalogSnafu)?
1086 .with_context(|| TableNotFoundSnafu {
1087 table_name: format_full_table_name(catalog, &schema, table_name),
1088 })?;
1089 let table_id = table.table_info().ident.table_id;
1090 let physical_table_id = self
1091 .table_metadata_manager
1092 .table_route_manager()
1093 .get_physical_table_id(table_id)
1094 .await
1095 .context(TableMetadataManagerSnafu)?;
1096 groups.entry(physical_table_id).or_default().push(expr);
1097 }
1098
1099 let mut handles = Vec::with_capacity(groups.len());
1101 for (_physical_table_id, exprs) in groups {
1102 let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
1103 handles.push(fut);
1104 }
1105 let _results = futures::future::try_join_all(handles).await?;
1106
1107 Ok(Output::new_with_affected_rows(0))
1108 }
1109
1110 #[tracing::instrument(skip_all)]
1111 pub async fn drop_table(
1112 &self,
1113 table_name: TableName,
1114 drop_if_exists: bool,
1115 query_context: QueryContextRef,
1116 ) -> Result<Output> {
1117 self.drop_tables(&[table_name], drop_if_exists, query_context)
1119 .await
1120 }
1121
1122 #[tracing::instrument(skip_all)]
1123 pub async fn drop_tables(
1124 &self,
1125 table_names: &[TableName],
1126 drop_if_exists: bool,
1127 query_context: QueryContextRef,
1128 ) -> Result<Output> {
1129 let mut tables = Vec::with_capacity(table_names.len());
1130 for table_name in table_names {
1131 ensure!(
1132 !is_readonly_schema(&table_name.schema_name),
1133 SchemaReadOnlySnafu {
1134 name: table_name.schema_name.clone()
1135 }
1136 );
1137
1138 if let Some(table) = self
1139 .catalog_manager
1140 .table(
1141 &table_name.catalog_name,
1142 &table_name.schema_name,
1143 &table_name.table_name,
1144 Some(&query_context),
1145 )
1146 .await
1147 .context(CatalogSnafu)?
1148 {
1149 tables.push(table.table_info().table_id());
1150 } else if drop_if_exists {
1151 continue;
1153 } else {
1154 return TableNotFoundSnafu {
1155 table_name: table_name.to_string(),
1156 }
1157 .fail();
1158 }
1159 }
1160
1161 for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
1162 self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
1163 .await?;
1164
1165 self.cache_invalidator
1167 .invalidate(
1168 &Context::default(),
1169 &[
1170 CacheIdent::TableId(table_id),
1171 CacheIdent::TableName(table_name.clone()),
1172 ],
1173 )
1174 .await
1175 .context(error::InvalidateTableCacheSnafu)?;
1176 }
1177 Ok(Output::new_with_affected_rows(0))
1178 }
1179
1180 #[tracing::instrument(skip_all)]
1181 pub async fn drop_database(
1182 &self,
1183 catalog: String,
1184 schema: String,
1185 drop_if_exists: bool,
1186 query_context: QueryContextRef,
1187 ) -> Result<Output> {
1188 ensure!(
1189 !is_readonly_schema(&schema),
1190 SchemaReadOnlySnafu { name: schema }
1191 );
1192
1193 if self
1194 .catalog_manager
1195 .schema_exists(&catalog, &schema, None)
1196 .await
1197 .context(CatalogSnafu)?
1198 {
1199 if schema == query_context.current_schema() {
1200 SchemaInUseSnafu { name: schema }.fail()
1201 } else {
1202 self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
1203 .await?;
1204
1205 Ok(Output::new_with_affected_rows(0))
1206 }
1207 } else if drop_if_exists {
1208 Ok(Output::new_with_affected_rows(0))
1210 } else {
1211 SchemaNotFoundSnafu {
1212 schema_info: schema,
1213 }
1214 .fail()
1215 }
1216 }
1217
1218 #[tracing::instrument(skip_all)]
1219 pub async fn truncate_table(
1220 &self,
1221 table_name: TableName,
1222 time_ranges: Vec<(Timestamp, Timestamp)>,
1223 query_context: QueryContextRef,
1224 ) -> Result<Output> {
1225 ensure!(
1226 !is_readonly_schema(&table_name.schema_name),
1227 SchemaReadOnlySnafu {
1228 name: table_name.schema_name.clone()
1229 }
1230 );
1231
1232 let table = self
1233 .catalog_manager
1234 .table(
1235 &table_name.catalog_name,
1236 &table_name.schema_name,
1237 &table_name.table_name,
1238 Some(&query_context),
1239 )
1240 .await
1241 .context(CatalogSnafu)?
1242 .with_context(|| TableNotFoundSnafu {
1243 table_name: table_name.to_string(),
1244 })?;
1245 let table_id = table.table_info().table_id();
1246 self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
1247 .await?;
1248
1249 Ok(Output::new_with_affected_rows(0))
1250 }
1251
1252 #[tracing::instrument(skip_all)]
1253 pub async fn alter_table(
1254 &self,
1255 alter_table: AlterTable,
1256 query_context: QueryContextRef,
1257 ) -> Result<Output> {
1258 if matches!(
1259 alter_table.alter_operation(),
1260 AlterTableOperation::Repartition { .. }
1261 ) {
1262 let _request = expr_helper::to_repartition_request(alter_table, &query_context)?;
1263 return NotSupportedSnafu {
1264 feat: "ALTER TABLE REPARTITION",
1265 }
1266 .fail();
1267 }
1268
1269 let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
1270 self.alter_table_inner(expr, query_context).await
1271 }
1272
1273 #[tracing::instrument(skip_all)]
1274 pub async fn alter_table_inner(
1275 &self,
1276 expr: AlterTableExpr,
1277 query_context: QueryContextRef,
1278 ) -> Result<Output> {
1279 ensure!(
1280 !is_readonly_schema(&expr.schema_name),
1281 SchemaReadOnlySnafu {
1282 name: expr.schema_name.clone()
1283 }
1284 );
1285
1286 let catalog_name = if expr.catalog_name.is_empty() {
1287 DEFAULT_CATALOG_NAME.to_string()
1288 } else {
1289 expr.catalog_name.clone()
1290 };
1291
1292 let schema_name = if expr.schema_name.is_empty() {
1293 DEFAULT_SCHEMA_NAME.to_string()
1294 } else {
1295 expr.schema_name.clone()
1296 };
1297
1298 let table_name = expr.table_name.clone();
1299
1300 let table = self
1301 .catalog_manager
1302 .table(
1303 &catalog_name,
1304 &schema_name,
1305 &table_name,
1306 Some(&query_context),
1307 )
1308 .await
1309 .context(CatalogSnafu)?
1310 .with_context(|| TableNotFoundSnafu {
1311 table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
1312 })?;
1313
1314 let table_id = table.table_info().ident.table_id;
1315 let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
1316 if !need_alter {
1317 return Ok(Output::new_with_affected_rows(0));
1318 }
1319 info!(
1320 "Table info before alter is {:?}, expr: {:?}",
1321 table.table_info(),
1322 expr
1323 );
1324
1325 let physical_table_id = self
1326 .table_metadata_manager
1327 .table_route_manager()
1328 .get_physical_table_id(table_id)
1329 .await
1330 .context(TableMetadataManagerSnafu)?;
1331
1332 let (req, invalidate_keys) = if physical_table_id == table_id {
1333 let req = SubmitDdlTaskRequest {
1335 query_context,
1336 task: DdlTask::new_alter_table(expr),
1337 };
1338
1339 let invalidate_keys = vec![
1340 CacheIdent::TableId(table_id),
1341 CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1342 ];
1343
1344 (req, invalidate_keys)
1345 } else {
1346 let req = SubmitDdlTaskRequest {
1348 query_context,
1349 task: DdlTask::new_alter_logical_tables(vec![expr]),
1350 };
1351
1352 let mut invalidate_keys = vec![
1353 CacheIdent::TableId(physical_table_id),
1354 CacheIdent::TableId(table_id),
1355 CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1356 ];
1357
1358 let physical_table = self
1359 .table_metadata_manager
1360 .table_info_manager()
1361 .get(physical_table_id)
1362 .await
1363 .context(TableMetadataManagerSnafu)?
1364 .map(|x| x.into_inner());
1365 if let Some(physical_table) = physical_table {
1366 let physical_table_name = TableName::new(
1367 physical_table.table_info.catalog_name,
1368 physical_table.table_info.schema_name,
1369 physical_table.table_info.name,
1370 );
1371 invalidate_keys.push(CacheIdent::TableName(physical_table_name));
1372 }
1373
1374 (req, invalidate_keys)
1375 };
1376
1377 self.procedure_executor
1378 .submit_ddl_task(&ExecutorContext::default(), req)
1379 .await
1380 .context(error::ExecuteDdlSnafu)?;
1381
1382 self.cache_invalidator
1384 .invalidate(&Context::default(), &invalidate_keys)
1385 .await
1386 .context(error::InvalidateTableCacheSnafu)?;
1387
1388 Ok(Output::new_with_affected_rows(0))
1389 }
1390
1391 #[cfg(feature = "enterprise")]
1392 #[tracing::instrument(skip_all)]
1393 pub async fn alter_trigger(
1394 &self,
1395 _alter_expr: AlterTrigger,
1396 _query_context: QueryContextRef,
1397 ) -> Result<Output> {
1398 crate::error::NotSupportedSnafu {
1399 feat: "alter trigger",
1400 }
1401 .fail()
1402 }
1403
1404 #[tracing::instrument(skip_all)]
1405 pub async fn alter_database(
1406 &self,
1407 alter_expr: AlterDatabase,
1408 query_context: QueryContextRef,
1409 ) -> Result<Output> {
1410 let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
1411 self.alter_database_inner(alter_expr, query_context).await
1412 }
1413
1414 #[tracing::instrument(skip_all)]
1415 pub async fn alter_database_inner(
1416 &self,
1417 alter_expr: AlterDatabaseExpr,
1418 query_context: QueryContextRef,
1419 ) -> Result<Output> {
1420 ensure!(
1421 !is_readonly_schema(&alter_expr.schema_name),
1422 SchemaReadOnlySnafu {
1423 name: query_context.current_schema().clone()
1424 }
1425 );
1426
1427 let exists = self
1428 .catalog_manager
1429 .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
1430 .await
1431 .context(CatalogSnafu)?;
1432 ensure!(
1433 exists,
1434 SchemaNotFoundSnafu {
1435 schema_info: alter_expr.schema_name,
1436 }
1437 );
1438
1439 let cache_ident = [CacheIdent::SchemaName(SchemaName {
1440 catalog_name: alter_expr.catalog_name.clone(),
1441 schema_name: alter_expr.schema_name.clone(),
1442 })];
1443
1444 self.alter_database_procedure(alter_expr, query_context)
1445 .await?;
1446
1447 self.cache_invalidator
1449 .invalidate(&Context::default(), &cache_ident)
1450 .await
1451 .context(error::InvalidateTableCacheSnafu)?;
1452
1453 Ok(Output::new_with_affected_rows(0))
1454 }
1455
1456 async fn create_table_procedure(
1457 &self,
1458 create_table: CreateTableExpr,
1459 partitions: Vec<PartitionExpr>,
1460 table_info: RawTableInfo,
1461 query_context: QueryContextRef,
1462 ) -> Result<SubmitDdlTaskResponse> {
1463 let partitions = partitions
1464 .into_iter()
1465 .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
1466 .collect::<Result<Vec<_>>>()?;
1467
1468 let request = SubmitDdlTaskRequest {
1469 query_context,
1470 task: DdlTask::new_create_table(create_table, partitions, table_info),
1471 };
1472
1473 self.procedure_executor
1474 .submit_ddl_task(&ExecutorContext::default(), request)
1475 .await
1476 .context(error::ExecuteDdlSnafu)
1477 }
1478
1479 async fn create_logical_tables_procedure(
1480 &self,
1481 tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
1482 query_context: QueryContextRef,
1483 ) -> Result<SubmitDdlTaskResponse> {
1484 let request = SubmitDdlTaskRequest {
1485 query_context,
1486 task: DdlTask::new_create_logical_tables(tables_data),
1487 };
1488
1489 self.procedure_executor
1490 .submit_ddl_task(&ExecutorContext::default(), request)
1491 .await
1492 .context(error::ExecuteDdlSnafu)
1493 }
1494
1495 async fn alter_logical_tables_procedure(
1496 &self,
1497 tables_data: Vec<AlterTableExpr>,
1498 query_context: QueryContextRef,
1499 ) -> Result<SubmitDdlTaskResponse> {
1500 let request = SubmitDdlTaskRequest {
1501 query_context,
1502 task: DdlTask::new_alter_logical_tables(tables_data),
1503 };
1504
1505 self.procedure_executor
1506 .submit_ddl_task(&ExecutorContext::default(), request)
1507 .await
1508 .context(error::ExecuteDdlSnafu)
1509 }
1510
1511 async fn drop_table_procedure(
1512 &self,
1513 table_name: &TableName,
1514 table_id: TableId,
1515 drop_if_exists: bool,
1516 query_context: QueryContextRef,
1517 ) -> Result<SubmitDdlTaskResponse> {
1518 let request = SubmitDdlTaskRequest {
1519 query_context,
1520 task: DdlTask::new_drop_table(
1521 table_name.catalog_name.clone(),
1522 table_name.schema_name.clone(),
1523 table_name.table_name.clone(),
1524 table_id,
1525 drop_if_exists,
1526 ),
1527 };
1528
1529 self.procedure_executor
1530 .submit_ddl_task(&ExecutorContext::default(), request)
1531 .await
1532 .context(error::ExecuteDdlSnafu)
1533 }
1534
1535 async fn drop_database_procedure(
1536 &self,
1537 catalog: String,
1538 schema: String,
1539 drop_if_exists: bool,
1540 query_context: QueryContextRef,
1541 ) -> Result<SubmitDdlTaskResponse> {
1542 let request = SubmitDdlTaskRequest {
1543 query_context,
1544 task: DdlTask::new_drop_database(catalog, schema, drop_if_exists),
1545 };
1546
1547 self.procedure_executor
1548 .submit_ddl_task(&ExecutorContext::default(), request)
1549 .await
1550 .context(error::ExecuteDdlSnafu)
1551 }
1552
1553 async fn alter_database_procedure(
1554 &self,
1555 alter_expr: AlterDatabaseExpr,
1556 query_context: QueryContextRef,
1557 ) -> Result<SubmitDdlTaskResponse> {
1558 let request = SubmitDdlTaskRequest {
1559 query_context,
1560 task: DdlTask::new_alter_database(alter_expr),
1561 };
1562
1563 self.procedure_executor
1564 .submit_ddl_task(&ExecutorContext::default(), request)
1565 .await
1566 .context(error::ExecuteDdlSnafu)
1567 }
1568
1569 async fn truncate_table_procedure(
1570 &self,
1571 table_name: &TableName,
1572 table_id: TableId,
1573 time_ranges: Vec<(Timestamp, Timestamp)>,
1574 query_context: QueryContextRef,
1575 ) -> Result<SubmitDdlTaskResponse> {
1576 let request = SubmitDdlTaskRequest {
1577 query_context,
1578 task: DdlTask::new_truncate_table(
1579 table_name.catalog_name.clone(),
1580 table_name.schema_name.clone(),
1581 table_name.table_name.clone(),
1582 table_id,
1583 time_ranges,
1584 ),
1585 };
1586
1587 self.procedure_executor
1588 .submit_ddl_task(&ExecutorContext::default(), request)
1589 .await
1590 .context(error::ExecuteDdlSnafu)
1591 }
1592
1593 #[tracing::instrument(skip_all)]
1594 pub async fn create_database(
1595 &self,
1596 database: &str,
1597 create_if_not_exists: bool,
1598 options: HashMap<String, String>,
1599 query_context: QueryContextRef,
1600 ) -> Result<Output> {
1601 let catalog = query_context.current_catalog();
1602 ensure!(
1603 NAME_PATTERN_REG.is_match(catalog),
1604 error::UnexpectedSnafu {
1605 violated: format!("Invalid catalog name: {}", catalog)
1606 }
1607 );
1608
1609 ensure!(
1610 NAME_PATTERN_REG.is_match(database),
1611 error::UnexpectedSnafu {
1612 violated: format!("Invalid database name: {}", database)
1613 }
1614 );
1615
1616 if !self
1617 .catalog_manager
1618 .schema_exists(catalog, database, None)
1619 .await
1620 .context(CatalogSnafu)?
1621 && !self.catalog_manager.is_reserved_schema_name(database)
1622 {
1623 self.create_database_procedure(
1624 catalog.to_string(),
1625 database.to_string(),
1626 create_if_not_exists,
1627 options,
1628 query_context,
1629 )
1630 .await?;
1631
1632 Ok(Output::new_with_affected_rows(1))
1633 } else if create_if_not_exists {
1634 Ok(Output::new_with_affected_rows(1))
1635 } else {
1636 error::SchemaExistsSnafu { name: database }.fail()
1637 }
1638 }
1639
1640 async fn create_database_procedure(
1641 &self,
1642 catalog: String,
1643 database: String,
1644 create_if_not_exists: bool,
1645 options: HashMap<String, String>,
1646 query_context: QueryContextRef,
1647 ) -> Result<SubmitDdlTaskResponse> {
1648 let request = SubmitDdlTaskRequest {
1649 query_context,
1650 task: DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
1651 };
1652
1653 self.procedure_executor
1654 .submit_ddl_task(&ExecutorContext::default(), request)
1655 .await
1656 .context(error::ExecuteDdlSnafu)
1657 }
1658}
1659
1660pub fn parse_partitions(
1662 create_table: &CreateTableExpr,
1663 partitions: Option<Partitions>,
1664 query_ctx: &QueryContextRef,
1665) -> Result<(Vec<PartitionExpr>, Vec<String>)> {
1666 let partition_columns = find_partition_columns(&partitions)?;
1669 let partition_exprs =
1670 find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
1671
1672 let exprs = partition_exprs.clone();
1674 MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
1675 .context(InvalidPartitionSnafu)?;
1676
1677 Ok((partition_exprs, partition_columns))
1678}
1679
1680fn parse_partitions_for_logical_validation(
1681 create_table: &CreateTableExpr,
1682 partitions: &Partitions,
1683 query_ctx: &QueryContextRef,
1684) -> Result<(Vec<String>, Vec<PartitionExpr>)> {
1685 let partition_columns = partitions
1686 .column_list
1687 .iter()
1688 .map(|ident| ident.value.clone())
1689 .collect::<Vec<_>>();
1690
1691 let column_name_and_type = partition_columns
1692 .iter()
1693 .map(|pc| {
1694 let column = create_table
1695 .column_defs
1696 .iter()
1697 .find(|c| &c.name == pc)
1698 .context(ColumnNotFoundSnafu { msg: pc.clone() })?;
1699 let column_name = &column.name;
1700 let data_type = ConcreteDataType::from(
1701 ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
1702 .context(ColumnDataTypeSnafu)?,
1703 );
1704 Ok((column_name, data_type))
1705 })
1706 .collect::<Result<HashMap<_, _>>>()?;
1707
1708 let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
1709 for expr in &partitions.exprs {
1710 let partition_expr = convert_one_expr(expr, &column_name_and_type, &query_ctx.timezone())?;
1711 partition_exprs.push(partition_expr);
1712 }
1713
1714 MultiDimPartitionRule::try_new(
1715 partition_columns.clone(),
1716 vec![],
1717 partition_exprs.clone(),
1718 true,
1719 )
1720 .context(InvalidPartitionSnafu)?;
1721
1722 Ok((partition_columns, partition_exprs))
1723}
1724
1725pub fn verify_alter(
1731 table_id: TableId,
1732 table_info: Arc<TableInfo>,
1733 expr: AlterTableExpr,
1734) -> Result<bool> {
1735 let request: AlterTableRequest =
1736 common_grpc_expr::alter_expr_to_request(table_id, expr, Some(&table_info.meta))
1737 .context(AlterExprToRequestSnafu)?;
1738
1739 let AlterTableRequest {
1740 table_name,
1741 alter_kind,
1742 ..
1743 } = &request;
1744
1745 if let AlterKind::RenameTable { new_table_name } = alter_kind {
1746 ensure!(
1747 NAME_PATTERN_REG.is_match(new_table_name),
1748 error::UnexpectedSnafu {
1749 violated: format!("Invalid table name: {}", new_table_name)
1750 }
1751 );
1752 } else if let AlterKind::AddColumns { columns } = alter_kind {
1753 let column_names: HashSet<_> = table_info
1756 .meta
1757 .schema
1758 .column_schemas()
1759 .iter()
1760 .map(|schema| &schema.name)
1761 .collect();
1762 if columns.iter().all(|column| {
1763 column_names.contains(&column.column_schema.name) && column.add_if_not_exists
1764 }) {
1765 return Ok(false);
1766 }
1767 }
1768
1769 let _ = table_info
1770 .meta
1771 .builder_with_alter_kind(table_name, &request.alter_kind)
1772 .context(error::TableSnafu)?
1773 .build()
1774 .context(error::BuildTableMetaSnafu { table_name })?;
1775
1776 Ok(true)
1777}
1778
1779pub fn create_table_info(
1780 create_table: &CreateTableExpr,
1781 partition_columns: Vec<String>,
1782) -> Result<RawTableInfo> {
1783 let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
1784 let mut column_name_to_index_map = HashMap::new();
1785
1786 for (idx, column) in create_table.column_defs.iter().enumerate() {
1787 let schema =
1788 column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
1789 column: &column.name,
1790 })?;
1791 let schema = schema.with_time_index(column.name == create_table.time_index);
1792
1793 column_schemas.push(schema);
1794 let _ = column_name_to_index_map.insert(column.name.clone(), idx);
1795 }
1796
1797 let timestamp_index = column_name_to_index_map
1798 .get(&create_table.time_index)
1799 .cloned();
1800
1801 let raw_schema = RawSchema {
1802 column_schemas: column_schemas.clone(),
1803 timestamp_index,
1804 version: 0,
1805 };
1806
1807 let primary_key_indices = create_table
1808 .primary_keys
1809 .iter()
1810 .map(|name| {
1811 column_name_to_index_map
1812 .get(name)
1813 .cloned()
1814 .context(ColumnNotFoundSnafu { msg: name })
1815 })
1816 .collect::<Result<Vec<_>>>()?;
1817
1818 let partition_key_indices = partition_columns
1819 .into_iter()
1820 .map(|col_name| {
1821 column_name_to_index_map
1822 .get(&col_name)
1823 .cloned()
1824 .context(ColumnNotFoundSnafu { msg: col_name })
1825 })
1826 .collect::<Result<Vec<_>>>()?;
1827
1828 let table_options = TableOptions::try_from_iter(&create_table.table_options)
1829 .context(UnrecognizedTableOptionSnafu)?;
1830
1831 let meta = RawTableMeta {
1832 schema: raw_schema,
1833 primary_key_indices,
1834 value_indices: vec![],
1835 engine: create_table.engine.clone(),
1836 next_column_id: column_schemas.len() as u32,
1837 region_numbers: vec![],
1838 options: table_options,
1839 created_on: Utc::now(),
1840 updated_on: Utc::now(),
1841 partition_key_indices,
1842 column_ids: vec![],
1843 };
1844
1845 let desc = if create_table.desc.is_empty() {
1846 create_table.table_options.get(COMMENT_KEY).cloned()
1847 } else {
1848 Some(create_table.desc.clone())
1849 };
1850
1851 let table_info = RawTableInfo {
1852 ident: metadata::TableIdent {
1853 table_id: 0,
1855 version: 0,
1856 },
1857 name: create_table.table_name.clone(),
1858 desc,
1859 catalog_name: create_table.catalog_name.clone(),
1860 schema_name: create_table.schema_name.clone(),
1861 meta,
1862 table_type: TableType::Base,
1863 };
1864 Ok(table_info)
1865}
1866
1867fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
1868 let columns = if let Some(partitions) = partitions {
1869 partitions
1870 .column_list
1871 .iter()
1872 .map(|x| x.value.clone())
1873 .collect::<Vec<_>>()
1874 } else {
1875 vec![]
1876 };
1877 Ok(columns)
1878}
1879
1880fn find_partition_entries(
1884 create_table: &CreateTableExpr,
1885 partitions: &Option<Partitions>,
1886 partition_columns: &[String],
1887 query_ctx: &QueryContextRef,
1888) -> Result<Vec<PartitionExpr>> {
1889 let Some(partitions) = partitions else {
1890 return Ok(vec![]);
1891 };
1892
1893 let column_name_and_type = partition_columns
1895 .iter()
1896 .map(|pc| {
1897 let column = create_table
1898 .column_defs
1899 .iter()
1900 .find(|c| &c.name == pc)
1901 .unwrap();
1903 let column_name = &column.name;
1904 let data_type = ConcreteDataType::from(
1905 ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
1906 .context(ColumnDataTypeSnafu)?,
1907 );
1908 Ok((column_name, data_type))
1909 })
1910 .collect::<Result<HashMap<_, _>>>()?;
1911
1912 let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
1914 for partition in &partitions.exprs {
1915 let partition_expr =
1916 convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
1917 partition_exprs.push(partition_expr);
1918 }
1919
1920 Ok(partition_exprs)
1921}
1922
1923fn convert_one_expr(
1924 expr: &Expr,
1925 column_name_and_type: &HashMap<&String, ConcreteDataType>,
1926 timezone: &Timezone,
1927) -> Result<PartitionExpr> {
1928 let Expr::BinaryOp { left, op, right } = expr else {
1929 return InvalidPartitionRuleSnafu {
1930 reason: "partition rule must be a binary expression",
1931 }
1932 .fail();
1933 };
1934
1935 let op =
1936 RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
1937 reason: format!("unsupported operator in partition expr {op}"),
1938 })?;
1939
1940 let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
1942 (Expr::Identifier(ident), Expr::Value(value)) => {
1944 let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1945 let value = convert_value(&value.value, data_type, timezone, None)?;
1946 (Operand::Column(column_name), op, Operand::Value(value))
1947 }
1948 (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
1949 if let Expr::Value(v) = &**expr =>
1950 {
1951 let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1952 let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
1953 (Operand::Column(column_name), op, Operand::Value(value))
1954 }
1955 (Expr::Value(value), Expr::Identifier(ident)) => {
1957 let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1958 let value = convert_value(&value.value, data_type, timezone, None)?;
1959 (Operand::Value(value), op, Operand::Column(column_name))
1960 }
1961 (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
1962 if let Expr::Value(v) = &**expr =>
1963 {
1964 let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1965 let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
1966 (Operand::Value(value), op, Operand::Column(column_name))
1967 }
1968 (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
1969 let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
1971 let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
1972 (Operand::Expr(lhs), op, Operand::Expr(rhs))
1973 }
1974 _ => {
1975 return InvalidPartitionRuleSnafu {
1976 reason: format!("invalid partition expr {expr}"),
1977 }
1978 .fail();
1979 }
1980 };
1981
1982 Ok(PartitionExpr::new(lhs, op, rhs))
1983}
1984
1985fn convert_identifier(
1986 ident: &Ident,
1987 column_name_and_type: &HashMap<&String, ConcreteDataType>,
1988) -> Result<(String, ConcreteDataType)> {
1989 let column_name = ident.value.clone();
1990 let data_type = column_name_and_type
1991 .get(&column_name)
1992 .cloned()
1993 .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
1994 Ok((column_name, data_type))
1995}
1996
1997fn convert_value(
1998 value: &ParserValue,
1999 data_type: ConcreteDataType,
2000 timezone: &Timezone,
2001 unary_op: Option<UnaryOperator>,
2002) -> Result<Value> {
2003 sql_value_to_value(
2004 "<NONAME>",
2005 &data_type,
2006 value,
2007 Some(timezone),
2008 unary_op,
2009 false,
2010 )
2011 .context(error::SqlCommonSnafu)
2012}
2013
2014#[cfg(test)]
2015mod test {
2016 use session::context::{QueryContext, QueryContextBuilder};
2017 use sql::dialect::GreptimeDbDialect;
2018 use sql::parser::{ParseOptions, ParserContext};
2019 use sql::statements::statement::Statement;
2020
2021 use super::*;
2022 use crate::expr_helper;
2023
2024 #[test]
2025 fn test_name_is_match() {
2026 assert!(!NAME_PATTERN_REG.is_match("/adaf"));
2027 assert!(!NAME_PATTERN_REG.is_match("🈲"));
2028 assert!(NAME_PATTERN_REG.is_match("hello"));
2029 assert!(NAME_PATTERN_REG.is_match("test@"));
2030 assert!(!NAME_PATTERN_REG.is_match("@test"));
2031 assert!(NAME_PATTERN_REG.is_match("test#"));
2032 assert!(!NAME_PATTERN_REG.is_match("#test"));
2033 assert!(!NAME_PATTERN_REG.is_match("@"));
2034 assert!(!NAME_PATTERN_REG.is_match("#"));
2035 }
2036
2037 #[tokio::test]
2038 #[ignore = "TODO(ruihang): WIP new partition rule"]
2039 async fn test_parse_partitions() {
2040 common_telemetry::init_default_ut_logging();
2041 let cases = [
2042 (
2043 r"
2044CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
2045PARTITION ON COLUMNS (b) (
2046 b < 'hz',
2047 b >= 'hz' AND b < 'sh',
2048 b >= 'sh'
2049)
2050ENGINE=mito",
2051 r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
2052 ),
2053 (
2054 r"
2055CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
2056PARTITION BY RANGE COLUMNS (b, a) (
2057 PARTITION r0 VALUES LESS THAN ('hz', 10),
2058 b < 'hz' AND a < 10,
2059 b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
2060 b >= 'sh' AND a >= 20
2061)
2062ENGINE=mito",
2063 r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
2064 ),
2065 ];
2066 let ctx = QueryContextBuilder::default().build().into();
2067 for (sql, expected) in cases {
2068 let result = ParserContext::create_with_dialect(
2069 sql,
2070 &GreptimeDbDialect {},
2071 ParseOptions::default(),
2072 )
2073 .unwrap();
2074 match &result[0] {
2075 Statement::CreateTable(c) => {
2076 let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
2077 let (partitions, _) =
2078 parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
2079 let json = serde_json::to_string(&partitions).unwrap();
2080 assert_eq!(json, expected);
2081 }
2082 _ => unreachable!(),
2083 }
2084 }
2085 }
2086}