1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use api::helper::ColumnDataTypeWrapper;
19use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
20use api::v1::{
21 AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, column_def,
22};
23#[cfg(feature = "enterprise")]
24use api::v1::{
25 CreateTriggerExpr as PbCreateTriggerExpr, meta::CreateTriggerTask as PbCreateTriggerTask,
26};
27use catalog::CatalogManagerRef;
28use chrono::Utc;
29use common_base::regex_pattern::NAME_PATTERN_REG;
30use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_readonly_schema};
31use common_catalog::{format_full_flow_name, format_full_table_name};
32use common_error::ext::BoxedError;
33use common_meta::cache_invalidator::Context;
34use common_meta::ddl::create_flow::FlowType;
35use common_meta::instruction::CacheIdent;
36use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
37use common_meta::procedure_executor::ExecutorContext;
38#[cfg(feature = "enterprise")]
39use common_meta::rpc::ddl::trigger::CreateTriggerTask;
40#[cfg(feature = "enterprise")]
41use common_meta::rpc::ddl::trigger::DropTriggerTask;
42use common_meta::rpc::ddl::{
43 CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
44 SubmitDdlTaskResponse,
45};
46use common_query::Output;
47use common_sql::convert::sql_value_to_value;
48use common_telemetry::{debug, info, tracing, warn};
49use common_time::{Timestamp, Timezone};
50use datafusion_common::tree_node::TreeNodeVisitor;
51use datafusion_expr::LogicalPlan;
52use datatypes::prelude::ConcreteDataType;
53use datatypes::schema::{ColumnSchema, RawSchema, Schema};
54use datatypes::value::Value;
55use partition::expr::{Operand, PartitionExpr, RestrictedOp};
56use partition::multi_dim::MultiDimPartitionRule;
57use query::parser::QueryStatement;
58use query::plan::extract_and_rewrite_full_table_names;
59use query::query_engine::DefaultSerializer;
60use query::sql::create_table_stmt;
61use session::context::QueryContextRef;
62use session::table_name::table_idents_to_full_name;
63use snafu::{OptionExt, ResultExt, ensure};
64use sql::parser::{ParseOptions, ParserContext};
65#[cfg(feature = "enterprise")]
66use sql::statements::alter::trigger::AlterTrigger;
67use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation};
68#[cfg(feature = "enterprise")]
69use sql::statements::create::trigger::CreateTrigger;
70use sql::statements::create::{
71 CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
72};
73use sql::statements::statement::Statement;
74use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
75use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
76use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
77use table::TableRef;
78use table::dist_table::DistTable;
79use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
80use table::requests::{AlterKind, AlterTableRequest, COMMENT_KEY, TableOptions};
81use table::table_name::TableName;
82use table::table_reference::TableReference;
83
84use crate::error::{
85 self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
86 ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
87 DeserializePartitionExprSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
88 FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
89 InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu,
90 PartitionExprToPbSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
91 SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
92 UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
93};
94use crate::expr_helper::{self, RepartitionRequest};
95use crate::statement::StatementExecutor;
96use crate::statement::show::create_partitions_stmt;
97
98impl StatementExecutor {
99 pub fn catalog_manager(&self) -> CatalogManagerRef {
100 self.catalog_manager.clone()
101 }
102
103 #[tracing::instrument(skip_all)]
104 pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
105 let (catalog, schema, _table) = table_idents_to_full_name(&stmt.name, &ctx)
106 .map_err(BoxedError::new)
107 .context(error::ExternalSnafu)?;
108
109 let schema_options = self
110 .table_metadata_manager
111 .schema_manager()
112 .get(SchemaNameKey {
113 catalog: &catalog,
114 schema: &schema,
115 })
116 .await
117 .context(TableMetadataManagerSnafu)?
118 .map(|v| v.into_inner());
119
120 let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
121 if let Some(schema_options) = schema_options {
124 for (key, value) in schema_options.extra_options.iter() {
125 if key.starts_with("compaction.") {
126 continue;
127 }
128 create_expr
129 .table_options
130 .entry(key.clone())
131 .or_insert(value.clone());
132 }
133 }
134
135 self.create_table_inner(create_expr, stmt.partitions, ctx)
136 .await
137 }
138
139 #[tracing::instrument(skip_all)]
140 pub async fn create_table_like(
141 &self,
142 stmt: CreateTableLike,
143 ctx: QueryContextRef,
144 ) -> Result<TableRef> {
145 let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
146 .map_err(BoxedError::new)
147 .context(error::ExternalSnafu)?;
148 let table_ref = self
149 .catalog_manager
150 .table(&catalog, &schema, &table, Some(&ctx))
151 .await
152 .context(CatalogSnafu)?
153 .context(TableNotFoundSnafu { table_name: &table })?;
154 let partitions = self
155 .partition_manager
156 .find_table_partitions(table_ref.table_info().table_id())
157 .await
158 .context(error::FindTablePartitionRuleSnafu { table_name: table })?;
159
160 let schema_options = self
162 .table_metadata_manager
163 .schema_manager()
164 .get(SchemaNameKey {
165 catalog: &catalog,
166 schema: &schema,
167 })
168 .await
169 .context(TableMetadataManagerSnafu)?
170 .map(|v| v.into_inner());
171
172 let quote_style = ctx.quote_style();
173 let mut create_stmt =
174 create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
175 .context(error::ParseQuerySnafu)?;
176 create_stmt.name = stmt.table_name;
177 create_stmt.if_not_exists = false;
178
179 let table_info = table_ref.table_info();
180 let partitions =
181 create_partitions_stmt(&table_info, partitions)?.and_then(|mut partitions| {
182 if !partitions.column_list.is_empty() {
183 partitions.set_quote(quote_style);
184 Some(partitions)
185 } else {
186 None
187 }
188 });
189
190 let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
191 self.create_table_inner(create_expr, partitions, ctx).await
192 }
193
194 #[tracing::instrument(skip_all)]
195 pub async fn create_external_table(
196 &self,
197 create_expr: CreateExternalTable,
198 ctx: QueryContextRef,
199 ) -> Result<TableRef> {
200 let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
201 self.create_table_inner(create_expr, None, ctx).await
202 }
203
204 #[tracing::instrument(skip_all)]
205 pub async fn create_table_inner(
206 &self,
207 create_table: &mut CreateTableExpr,
208 partitions: Option<Partitions>,
209 query_ctx: QueryContextRef,
210 ) -> Result<TableRef> {
211 ensure!(
212 !is_readonly_schema(&create_table.schema_name),
213 SchemaReadOnlySnafu {
214 name: create_table.schema_name.clone()
215 }
216 );
217
218 if create_table.engine == METRIC_ENGINE_NAME
219 && create_table
220 .table_options
221 .contains_key(LOGICAL_TABLE_METADATA_KEY)
222 {
223 if let Some(partitions) = partitions.as_ref()
224 && !partitions.exprs.is_empty()
225 {
226 self.validate_logical_table_partition_rule(create_table, partitions, &query_ctx)
227 .await?;
228 }
229 self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
231 .await?
232 .into_iter()
233 .next()
234 .context(error::UnexpectedSnafu {
235 violated: "expected to create logical tables",
236 })
237 } else {
238 self.create_non_logic_table(create_table, partitions, query_ctx)
240 .await
241 }
242 }
243
244 #[tracing::instrument(skip_all)]
245 pub async fn create_non_logic_table(
246 &self,
247 create_table: &mut CreateTableExpr,
248 partitions: Option<Partitions>,
249 query_ctx: QueryContextRef,
250 ) -> Result<TableRef> {
251 let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
252
253 let schema = self
255 .table_metadata_manager
256 .schema_manager()
257 .get(SchemaNameKey::new(
258 &create_table.catalog_name,
259 &create_table.schema_name,
260 ))
261 .await
262 .context(TableMetadataManagerSnafu)?;
263 ensure!(
264 schema.is_some(),
265 SchemaNotFoundSnafu {
266 schema_info: &create_table.schema_name,
267 }
268 );
269
270 if let Some(table) = self
272 .catalog_manager
273 .table(
274 &create_table.catalog_name,
275 &create_table.schema_name,
276 &create_table.table_name,
277 Some(&query_ctx),
278 )
279 .await
280 .context(CatalogSnafu)?
281 {
282 return if create_table.create_if_not_exists {
283 Ok(table)
284 } else {
285 TableAlreadyExistsSnafu {
286 table: format_full_table_name(
287 &create_table.catalog_name,
288 &create_table.schema_name,
289 &create_table.table_name,
290 ),
291 }
292 .fail()
293 };
294 }
295
296 ensure!(
297 NAME_PATTERN_REG.is_match(&create_table.table_name),
298 InvalidTableNameSnafu {
299 table_name: &create_table.table_name,
300 }
301 );
302
303 let table_name = TableName::new(
304 &create_table.catalog_name,
305 &create_table.schema_name,
306 &create_table.table_name,
307 );
308
309 let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
310 let mut table_info = create_table_info(create_table, partition_cols)?;
311
312 let resp = self
313 .create_table_procedure(
314 create_table.clone(),
315 partitions,
316 table_info.clone(),
317 query_ctx,
318 )
319 .await?;
320
321 let table_id = resp
322 .table_ids
323 .into_iter()
324 .next()
325 .context(error::UnexpectedSnafu {
326 violated: "expected table_id",
327 })?;
328 info!("Successfully created table '{table_name}' with table id {table_id}");
329
330 table_info.ident.table_id = table_id;
331
332 let table_info: Arc<TableInfo> =
333 Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
334 create_table.table_id = Some(api::v1::TableId { id: table_id });
335
336 let table = DistTable::table(table_info);
337
338 Ok(table)
339 }
340
341 #[tracing::instrument(skip_all)]
342 pub async fn create_logical_tables(
343 &self,
344 create_table_exprs: &[CreateTableExpr],
345 query_context: QueryContextRef,
346 ) -> Result<Vec<TableRef>> {
347 let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
348 ensure!(
349 !create_table_exprs.is_empty(),
350 EmptyDdlExprSnafu {
351 name: "create logic tables"
352 }
353 );
354
355 for create_table in create_table_exprs {
357 ensure!(
358 NAME_PATTERN_REG.is_match(&create_table.table_name),
359 InvalidTableNameSnafu {
360 table_name: &create_table.table_name,
361 }
362 );
363 }
364
365 let mut raw_tables_info = create_table_exprs
366 .iter()
367 .map(|create| create_table_info(create, vec![]))
368 .collect::<Result<Vec<_>>>()?;
369 let tables_data = create_table_exprs
370 .iter()
371 .cloned()
372 .zip(raw_tables_info.iter().cloned())
373 .collect::<Vec<_>>();
374
375 let resp = self
376 .create_logical_tables_procedure(tables_data, query_context)
377 .await?;
378
379 let table_ids = resp.table_ids;
380 ensure!(
381 table_ids.len() == raw_tables_info.len(),
382 CreateLogicalTablesSnafu {
383 reason: format!(
384 "The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}",
385 raw_tables_info.len(),
386 table_ids.len()
387 )
388 }
389 );
390 info!("Successfully created logical tables: {:?}", table_ids);
391
392 for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
393 table_info.ident.table_id = table_ids[i];
394 }
395 let tables_info = raw_tables_info
396 .into_iter()
397 .map(|x| x.try_into().context(CreateTableInfoSnafu))
398 .collect::<Result<Vec<_>>>()?;
399
400 Ok(tables_info
401 .into_iter()
402 .map(|x| DistTable::table(Arc::new(x)))
403 .collect())
404 }
405
406 async fn validate_logical_table_partition_rule(
407 &self,
408 create_table: &CreateTableExpr,
409 partitions: &Partitions,
410 query_ctx: &QueryContextRef,
411 ) -> Result<()> {
412 let (_, mut logical_partition_exprs) =
413 parse_partitions_for_logical_validation(create_table, partitions, query_ctx)?;
414
415 let physical_table_name = create_table
416 .table_options
417 .get(LOGICAL_TABLE_METADATA_KEY)
418 .with_context(|| CreateLogicalTablesSnafu {
419 reason: format!(
420 "expect `{LOGICAL_TABLE_METADATA_KEY}` option on creating logical table"
421 ),
422 })?;
423
424 let physical_table = self
425 .catalog_manager
426 .table(
427 &create_table.catalog_name,
428 &create_table.schema_name,
429 physical_table_name,
430 Some(query_ctx),
431 )
432 .await
433 .context(CatalogSnafu)?
434 .context(TableNotFoundSnafu {
435 table_name: physical_table_name.clone(),
436 })?;
437
438 let physical_table_info = physical_table.table_info();
439 let partition_rule = self
440 .partition_manager
441 .find_table_partition_rule(&physical_table_info)
442 .await
443 .context(error::FindTablePartitionRuleSnafu {
444 table_name: physical_table_name.clone(),
445 })?;
446
447 let multi_dim_rule = partition_rule
448 .as_ref()
449 .as_any()
450 .downcast_ref::<MultiDimPartitionRule>()
451 .context(InvalidPartitionRuleSnafu {
452 reason: "physical table partition rule is not range-based",
453 })?;
454
455 let mut physical_partition_exprs = multi_dim_rule.exprs().to_vec();
457 logical_partition_exprs.sort_unstable();
458 physical_partition_exprs.sort_unstable();
459
460 ensure!(
461 physical_partition_exprs == logical_partition_exprs,
462 InvalidPartitionRuleSnafu {
463 reason: format!(
464 "logical table partition rule must match the corresponding physical table's\n logical table partition exprs:\t\t {:?}\n physical table partition exprs:\t {:?}",
465 logical_partition_exprs, physical_partition_exprs
466 ),
467 }
468 );
469
470 Ok(())
471 }
472
473 #[cfg(feature = "enterprise")]
474 #[tracing::instrument(skip_all)]
475 pub async fn create_trigger(
476 &self,
477 stmt: CreateTrigger,
478 query_context: QueryContextRef,
479 ) -> Result<Output> {
480 let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
481 self.create_trigger_inner(expr, query_context).await
482 }
483
484 #[cfg(feature = "enterprise")]
485 pub async fn create_trigger_inner(
486 &self,
487 expr: PbCreateTriggerExpr,
488 query_context: QueryContextRef,
489 ) -> Result<Output> {
490 self.create_trigger_procedure(expr, query_context).await?;
491 Ok(Output::new_with_affected_rows(0))
492 }
493
494 #[cfg(feature = "enterprise")]
495 async fn create_trigger_procedure(
496 &self,
497 expr: PbCreateTriggerExpr,
498 query_context: QueryContextRef,
499 ) -> Result<SubmitDdlTaskResponse> {
500 let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
501 create_trigger: Some(expr),
502 })
503 .context(error::InvalidExprSnafu)?;
504
505 let request = SubmitDdlTaskRequest {
506 query_context,
507 task: DdlTask::new_create_trigger(task),
508 };
509
510 self.procedure_executor
511 .submit_ddl_task(&ExecutorContext::default(), request)
512 .await
513 .context(error::ExecuteDdlSnafu)
514 }
515
516 #[tracing::instrument(skip_all)]
517 pub async fn create_flow(
518 &self,
519 stmt: CreateFlow,
520 query_context: QueryContextRef,
521 ) -> Result<Output> {
522 let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;
524
525 self.create_flow_inner(expr, query_context).await
526 }
527
528 pub async fn create_flow_inner(
529 &self,
530 expr: CreateFlowExpr,
531 query_context: QueryContextRef,
532 ) -> Result<Output> {
533 self.create_flow_procedure(expr, query_context).await?;
534 Ok(Output::new_with_affected_rows(0))
535 }
536
537 async fn create_flow_procedure(
538 &self,
539 expr: CreateFlowExpr,
540 query_context: QueryContextRef,
541 ) -> Result<SubmitDdlTaskResponse> {
542 let flow_type = self
543 .determine_flow_type(&expr, query_context.clone())
544 .await?;
545 info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
546
547 let expr = {
548 let mut expr = expr;
549 expr.flow_options
550 .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
551 expr
552 };
553
554 let task = CreateFlowTask::try_from(PbCreateFlowTask {
555 create_flow: Some(expr),
556 })
557 .context(error::InvalidExprSnafu)?;
558 let request = SubmitDdlTaskRequest {
559 query_context,
560 task: DdlTask::new_create_flow(task),
561 };
562
563 self.procedure_executor
564 .submit_ddl_task(&ExecutorContext::default(), request)
565 .await
566 .context(error::ExecuteDdlSnafu)
567 }
568
569 async fn determine_flow_type(
573 &self,
574 expr: &CreateFlowExpr,
575 query_ctx: QueryContextRef,
576 ) -> Result<FlowType> {
577 for src_table_name in &expr.source_table_names {
579 let table = self
580 .catalog_manager()
581 .table(
582 &src_table_name.catalog_name,
583 &src_table_name.schema_name,
584 &src_table_name.table_name,
585 Some(&query_ctx),
586 )
587 .await
588 .map_err(BoxedError::new)
589 .context(ExternalSnafu)?
590 .with_context(|| TableNotFoundSnafu {
591 table_name: format_full_table_name(
592 &src_table_name.catalog_name,
593 &src_table_name.schema_name,
594 &src_table_name.table_name,
595 ),
596 })?;
597
598 if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
600 warn!(
601 "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
602 format_full_table_name(
603 &src_table_name.catalog_name,
604 &src_table_name.schema_name,
605 &src_table_name.table_name
606 ),
607 expr.flow_name
608 );
609 return Ok(FlowType::Streaming);
610 }
611 }
612
613 let engine = &self.query_engine;
614 let stmts = ParserContext::create_with_dialect(
615 &expr.sql,
616 query_ctx.sql_dialect(),
617 ParseOptions::default(),
618 )
619 .map_err(BoxedError::new)
620 .context(ExternalSnafu)?;
621
622 ensure!(
623 stmts.len() == 1,
624 InvalidSqlSnafu {
625 err_msg: format!("Expect only one statement, found {}", stmts.len())
626 }
627 );
628 let stmt = &stmts[0];
629
630 let plan = match stmt {
632 Statement::Tql(_) => return Ok(FlowType::Batching),
634 _ => engine
635 .planner()
636 .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
637 .await
638 .map_err(BoxedError::new)
639 .context(ExternalSnafu)?,
640 };
641
642 struct FindAggr {
644 is_aggr: bool,
645 }
646
647 impl TreeNodeVisitor<'_> for FindAggr {
648 type Node = LogicalPlan;
649 fn f_down(
650 &mut self,
651 node: &Self::Node,
652 ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
653 {
654 match node {
655 LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
656 self.is_aggr = true;
657 return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
658 }
659 _ => (),
660 }
661 Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
662 }
663 }
664
665 let mut find_aggr = FindAggr { is_aggr: false };
666
667 plan.visit_with_subqueries(&mut find_aggr)
668 .context(BuildDfLogicalPlanSnafu)?;
669 if find_aggr.is_aggr {
670 Ok(FlowType::Batching)
671 } else {
672 Ok(FlowType::Streaming)
673 }
674 }
675
676 #[tracing::instrument(skip_all)]
677 pub async fn create_view(
678 &self,
679 create_view: CreateView,
680 ctx: QueryContextRef,
681 ) -> Result<TableRef> {
682 let logical_plan = match &*create_view.query {
684 Statement::Query(query) => {
685 self.plan(
686 &QueryStatement::Sql(Statement::Query(query.clone())),
687 ctx.clone(),
688 )
689 .await?
690 }
691 Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
692 _ => {
693 return InvalidViewStmtSnafu {}.fail();
694 }
695 };
696 let definition = create_view.to_string();
698
699 let schema: Schema = logical_plan
702 .schema()
703 .clone()
704 .try_into()
705 .context(ConvertSchemaSnafu)?;
706 let plan_columns: Vec<_> = schema
707 .column_schemas()
708 .iter()
709 .map(|c| c.name.clone())
710 .collect();
711
712 let columns: Vec<_> = create_view
713 .columns
714 .iter()
715 .map(|ident| ident.to_string())
716 .collect();
717
718 if !columns.is_empty() {
720 ensure!(
721 columns.len() == plan_columns.len(),
722 error::ViewColumnsMismatchSnafu {
723 view_name: create_view.name.to_string(),
724 expected: plan_columns.len(),
725 actual: columns.len(),
726 }
727 );
728 }
729
730 let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
733 .context(ExtractTableNamesSnafu)?;
734
735 let table_names = table_names.into_iter().map(|t| t.into()).collect();
736
737 let encoded_plan = DFLogicalSubstraitConvertor
744 .encode(&plan, DefaultSerializer)
745 .context(SubstraitCodecSnafu)?;
746
747 let expr = expr_helper::to_create_view_expr(
748 create_view,
749 encoded_plan.to_vec(),
750 table_names,
751 columns,
752 plan_columns,
753 definition,
754 ctx.clone(),
755 )?;
756
757 self.create_view_by_expr(expr, ctx).await
759 }
760
761 pub async fn create_view_by_expr(
762 &self,
763 expr: CreateViewExpr,
764 ctx: QueryContextRef,
765 ) -> Result<TableRef> {
766 ensure! {
767 !(expr.create_if_not_exists & expr.or_replace),
768 InvalidSqlSnafu {
769 err_msg: "syntax error Create Or Replace and If Not Exist cannot be used together",
770 }
771 };
772 let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();
773
774 let schema_exists = self
775 .table_metadata_manager
776 .schema_manager()
777 .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
778 .await
779 .context(TableMetadataManagerSnafu)?;
780
781 ensure!(
782 schema_exists,
783 SchemaNotFoundSnafu {
784 schema_info: &expr.schema_name,
785 }
786 );
787
788 if let Some(table) = self
790 .catalog_manager
791 .table(
792 &expr.catalog_name,
793 &expr.schema_name,
794 &expr.view_name,
795 Some(&ctx),
796 )
797 .await
798 .context(CatalogSnafu)?
799 {
800 let table_type = table.table_info().table_type;
801
802 match (table_type, expr.create_if_not_exists, expr.or_replace) {
803 (TableType::View, true, false) => {
804 return Ok(table);
805 }
806 (TableType::View, false, false) => {
807 return ViewAlreadyExistsSnafu {
808 name: format_full_table_name(
809 &expr.catalog_name,
810 &expr.schema_name,
811 &expr.view_name,
812 ),
813 }
814 .fail();
815 }
816 (TableType::View, _, true) => {
817 }
819 _ => {
820 return TableAlreadyExistsSnafu {
821 table: format_full_table_name(
822 &expr.catalog_name,
823 &expr.schema_name,
824 &expr.view_name,
825 ),
826 }
827 .fail();
828 }
829 }
830 }
831
832 ensure!(
833 NAME_PATTERN_REG.is_match(&expr.view_name),
834 InvalidViewNameSnafu {
835 name: expr.view_name.clone(),
836 }
837 );
838
839 let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);
840
841 let mut view_info = RawTableInfo {
842 ident: metadata::TableIdent {
843 table_id: 0,
845 version: 0,
846 },
847 name: expr.view_name.clone(),
848 desc: None,
849 catalog_name: expr.catalog_name.clone(),
850 schema_name: expr.schema_name.clone(),
851 meta: RawTableMeta::default(),
853 table_type: TableType::View,
854 };
855
856 let request = SubmitDdlTaskRequest {
857 query_context: ctx,
858 task: DdlTask::new_create_view(expr, view_info.clone()),
859 };
860
861 let resp = self
862 .procedure_executor
863 .submit_ddl_task(&ExecutorContext::default(), request)
864 .await
865 .context(error::ExecuteDdlSnafu)?;
866
867 debug!(
868 "Submit creating view '{view_name}' task response: {:?}",
869 resp
870 );
871
872 let view_id = resp
873 .table_ids
874 .into_iter()
875 .next()
876 .context(error::UnexpectedSnafu {
877 violated: "expected table_id",
878 })?;
879 info!("Successfully created view '{view_name}' with view id {view_id}");
880
881 view_info.ident.table_id = view_id;
882
883 let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);
884
885 let table = DistTable::table(view_info);
886
887 self.cache_invalidator
889 .invalidate(
890 &Context::default(),
891 &[
892 CacheIdent::TableId(view_id),
893 CacheIdent::TableName(view_name.clone()),
894 ],
895 )
896 .await
897 .context(error::InvalidateTableCacheSnafu)?;
898
899 Ok(table)
900 }
901
902 #[tracing::instrument(skip_all)]
903 pub async fn drop_flow(
904 &self,
905 catalog_name: String,
906 flow_name: String,
907 drop_if_exists: bool,
908 query_context: QueryContextRef,
909 ) -> Result<Output> {
910 if let Some(flow) = self
911 .flow_metadata_manager
912 .flow_name_manager()
913 .get(&catalog_name, &flow_name)
914 .await
915 .context(error::TableMetadataManagerSnafu)?
916 {
917 let flow_id = flow.flow_id();
918 let task = DropFlowTask {
919 catalog_name,
920 flow_name,
921 flow_id,
922 drop_if_exists,
923 };
924 self.drop_flow_procedure(task, query_context).await?;
925
926 Ok(Output::new_with_affected_rows(0))
927 } else if drop_if_exists {
928 Ok(Output::new_with_affected_rows(0))
929 } else {
930 FlowNotFoundSnafu {
931 flow_name: format_full_flow_name(&catalog_name, &flow_name),
932 }
933 .fail()
934 }
935 }
936
937 async fn drop_flow_procedure(
938 &self,
939 expr: DropFlowTask,
940 query_context: QueryContextRef,
941 ) -> Result<SubmitDdlTaskResponse> {
942 let request = SubmitDdlTaskRequest {
943 query_context,
944 task: DdlTask::new_drop_flow(expr),
945 };
946
947 self.procedure_executor
948 .submit_ddl_task(&ExecutorContext::default(), request)
949 .await
950 .context(error::ExecuteDdlSnafu)
951 }
952
953 #[cfg(feature = "enterprise")]
954 #[tracing::instrument(skip_all)]
955 pub(super) async fn drop_trigger(
956 &self,
957 catalog_name: String,
958 trigger_name: String,
959 drop_if_exists: bool,
960 query_context: QueryContextRef,
961 ) -> Result<Output> {
962 let task = DropTriggerTask {
963 catalog_name,
964 trigger_name,
965 drop_if_exists,
966 };
967 self.drop_trigger_procedure(task, query_context).await?;
968 Ok(Output::new_with_affected_rows(0))
969 }
970
971 #[cfg(feature = "enterprise")]
972 async fn drop_trigger_procedure(
973 &self,
974 expr: DropTriggerTask,
975 query_context: QueryContextRef,
976 ) -> Result<SubmitDdlTaskResponse> {
977 let request = SubmitDdlTaskRequest {
978 query_context,
979 task: DdlTask::new_drop_trigger(expr),
980 };
981
982 self.procedure_executor
983 .submit_ddl_task(&ExecutorContext::default(), request)
984 .await
985 .context(error::ExecuteDdlSnafu)
986 }
987
988 #[tracing::instrument(skip_all)]
990 pub(crate) async fn drop_view(
991 &self,
992 catalog: String,
993 schema: String,
994 view: String,
995 drop_if_exists: bool,
996 query_context: QueryContextRef,
997 ) -> Result<Output> {
998 let view_info = if let Some(view) = self
999 .catalog_manager
1000 .table(&catalog, &schema, &view, None)
1001 .await
1002 .context(CatalogSnafu)?
1003 {
1004 view.table_info()
1005 } else if drop_if_exists {
1006 return Ok(Output::new_with_affected_rows(0));
1008 } else {
1009 return TableNotFoundSnafu {
1010 table_name: format_full_table_name(&catalog, &schema, &view),
1011 }
1012 .fail();
1013 };
1014
1015 ensure!(
1017 view_info.table_type == TableType::View,
1018 error::InvalidViewSnafu {
1019 msg: "not a view",
1020 view_name: format_full_table_name(&catalog, &schema, &view),
1021 }
1022 );
1023
1024 let view_id = view_info.table_id();
1025
1026 let task = DropViewTask {
1027 catalog,
1028 schema,
1029 view,
1030 view_id,
1031 drop_if_exists,
1032 };
1033
1034 self.drop_view_procedure(task, query_context).await?;
1035
1036 Ok(Output::new_with_affected_rows(0))
1037 }
1038
1039 async fn drop_view_procedure(
1041 &self,
1042 expr: DropViewTask,
1043 query_context: QueryContextRef,
1044 ) -> Result<SubmitDdlTaskResponse> {
1045 let request = SubmitDdlTaskRequest {
1046 query_context,
1047 task: DdlTask::new_drop_view(expr),
1048 };
1049
1050 self.procedure_executor
1051 .submit_ddl_task(&ExecutorContext::default(), request)
1052 .await
1053 .context(error::ExecuteDdlSnafu)
1054 }
1055
1056 #[tracing::instrument(skip_all)]
1057 pub async fn alter_logical_tables(
1058 &self,
1059 alter_table_exprs: Vec<AlterTableExpr>,
1060 query_context: QueryContextRef,
1061 ) -> Result<Output> {
1062 let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
1063 ensure!(
1064 !alter_table_exprs.is_empty(),
1065 EmptyDdlExprSnafu {
1066 name: "alter logical tables"
1067 }
1068 );
1069
1070 let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
1072 for expr in alter_table_exprs {
1073 let catalog = if expr.catalog_name.is_empty() {
1075 query_context.current_catalog()
1076 } else {
1077 &expr.catalog_name
1078 };
1079 let schema = if expr.schema_name.is_empty() {
1080 query_context.current_schema()
1081 } else {
1082 expr.schema_name.clone()
1083 };
1084 let table_name = &expr.table_name;
1085 let table = self
1086 .catalog_manager
1087 .table(catalog, &schema, table_name, Some(&query_context))
1088 .await
1089 .context(CatalogSnafu)?
1090 .with_context(|| TableNotFoundSnafu {
1091 table_name: format_full_table_name(catalog, &schema, table_name),
1092 })?;
1093 let table_id = table.table_info().ident.table_id;
1094 let physical_table_id = self
1095 .table_metadata_manager
1096 .table_route_manager()
1097 .get_physical_table_id(table_id)
1098 .await
1099 .context(TableMetadataManagerSnafu)?;
1100 groups.entry(physical_table_id).or_default().push(expr);
1101 }
1102
1103 let mut handles = Vec::with_capacity(groups.len());
1105 for (_physical_table_id, exprs) in groups {
1106 let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
1107 handles.push(fut);
1108 }
1109 let _results = futures::future::try_join_all(handles).await?;
1110
1111 Ok(Output::new_with_affected_rows(0))
1112 }
1113
1114 #[tracing::instrument(skip_all)]
1115 pub async fn drop_table(
1116 &self,
1117 table_name: TableName,
1118 drop_if_exists: bool,
1119 query_context: QueryContextRef,
1120 ) -> Result<Output> {
1121 self.drop_tables(&[table_name], drop_if_exists, query_context)
1123 .await
1124 }
1125
1126 #[tracing::instrument(skip_all)]
1127 pub async fn drop_tables(
1128 &self,
1129 table_names: &[TableName],
1130 drop_if_exists: bool,
1131 query_context: QueryContextRef,
1132 ) -> Result<Output> {
1133 let mut tables = Vec::with_capacity(table_names.len());
1134 for table_name in table_names {
1135 ensure!(
1136 !is_readonly_schema(&table_name.schema_name),
1137 SchemaReadOnlySnafu {
1138 name: table_name.schema_name.clone()
1139 }
1140 );
1141
1142 if let Some(table) = self
1143 .catalog_manager
1144 .table(
1145 &table_name.catalog_name,
1146 &table_name.schema_name,
1147 &table_name.table_name,
1148 Some(&query_context),
1149 )
1150 .await
1151 .context(CatalogSnafu)?
1152 {
1153 tables.push(table.table_info().table_id());
1154 } else if drop_if_exists {
1155 continue;
1157 } else {
1158 return TableNotFoundSnafu {
1159 table_name: table_name.to_string(),
1160 }
1161 .fail();
1162 }
1163 }
1164
1165 for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
1166 self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
1167 .await?;
1168
1169 self.cache_invalidator
1171 .invalidate(
1172 &Context::default(),
1173 &[
1174 CacheIdent::TableId(table_id),
1175 CacheIdent::TableName(table_name.clone()),
1176 ],
1177 )
1178 .await
1179 .context(error::InvalidateTableCacheSnafu)?;
1180 }
1181 Ok(Output::new_with_affected_rows(0))
1182 }
1183
1184 #[tracing::instrument(skip_all)]
1185 pub async fn drop_database(
1186 &self,
1187 catalog: String,
1188 schema: String,
1189 drop_if_exists: bool,
1190 query_context: QueryContextRef,
1191 ) -> Result<Output> {
1192 ensure!(
1193 !is_readonly_schema(&schema),
1194 SchemaReadOnlySnafu { name: schema }
1195 );
1196
1197 if self
1198 .catalog_manager
1199 .schema_exists(&catalog, &schema, None)
1200 .await
1201 .context(CatalogSnafu)?
1202 {
1203 if schema == query_context.current_schema() {
1204 SchemaInUseSnafu { name: schema }.fail()
1205 } else {
1206 self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
1207 .await?;
1208
1209 Ok(Output::new_with_affected_rows(0))
1210 }
1211 } else if drop_if_exists {
1212 Ok(Output::new_with_affected_rows(0))
1214 } else {
1215 SchemaNotFoundSnafu {
1216 schema_info: schema,
1217 }
1218 .fail()
1219 }
1220 }
1221
1222 #[tracing::instrument(skip_all)]
1223 pub async fn truncate_table(
1224 &self,
1225 table_name: TableName,
1226 time_ranges: Vec<(Timestamp, Timestamp)>,
1227 query_context: QueryContextRef,
1228 ) -> Result<Output> {
1229 ensure!(
1230 !is_readonly_schema(&table_name.schema_name),
1231 SchemaReadOnlySnafu {
1232 name: table_name.schema_name.clone()
1233 }
1234 );
1235
1236 let table = self
1237 .catalog_manager
1238 .table(
1239 &table_name.catalog_name,
1240 &table_name.schema_name,
1241 &table_name.table_name,
1242 Some(&query_context),
1243 )
1244 .await
1245 .context(CatalogSnafu)?
1246 .with_context(|| TableNotFoundSnafu {
1247 table_name: table_name.to_string(),
1248 })?;
1249 let table_id = table.table_info().table_id();
1250 self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
1251 .await?;
1252
1253 Ok(Output::new_with_affected_rows(0))
1254 }
1255
1256 #[tracing::instrument(skip_all)]
1257 pub async fn alter_table(
1258 &self,
1259 alter_table: AlterTable,
1260 query_context: QueryContextRef,
1261 ) -> Result<Output> {
1262 if matches!(
1263 alter_table.alter_operation(),
1264 AlterTableOperation::Repartition { .. }
1265 ) {
1266 let request = expr_helper::to_repartition_request(alter_table, &query_context)?;
1267 return self.repartition_table(request, &query_context).await;
1268 }
1269
1270 let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
1271 self.alter_table_inner(expr, query_context).await
1272 }
1273
1274 #[tracing::instrument(skip_all)]
1275 pub async fn repartition_table(
1276 &self,
1277 request: RepartitionRequest,
1278 query_context: &QueryContextRef,
1279 ) -> Result<Output> {
1280 ensure!(
1282 !is_readonly_schema(&request.schema_name),
1283 SchemaReadOnlySnafu {
1284 name: request.schema_name.clone()
1285 }
1286 );
1287
1288 let table_ref = TableReference::full(
1289 &request.catalog_name,
1290 &request.schema_name,
1291 &request.table_name,
1292 );
1293 let table = self
1295 .catalog_manager
1296 .table(
1297 &request.catalog_name,
1298 &request.schema_name,
1299 &request.table_name,
1300 Some(query_context),
1301 )
1302 .await
1303 .context(CatalogSnafu)?
1304 .with_context(|| TableNotFoundSnafu {
1305 table_name: table_ref.to_string(),
1306 })?;
1307 let table_id = table.table_info().ident.table_id;
1308 let (physical_table_id, physical_table_route) = self
1310 .table_metadata_manager
1311 .table_route_manager()
1312 .get_physical_table_route(table_id)
1313 .await
1314 .context(TableMetadataManagerSnafu)?;
1315
1316 ensure!(
1317 physical_table_id == table_id,
1318 NotSupportedSnafu {
1319 feat: "REPARTITION on logical tables"
1320 }
1321 );
1322
1323 let table_info = table.table_info();
1324 let existing_partition_columns = table_info.meta.partition_columns().collect::<Vec<_>>();
1326 ensure!(
1328 !existing_partition_columns.is_empty(),
1329 InvalidPartitionRuleSnafu {
1330 reason: format!(
1331 "table {} does not have partition columns, cannot repartition",
1332 table_ref
1333 )
1334 }
1335 );
1336
1337 let column_name_and_type = existing_partition_columns
1340 .iter()
1341 .map(|column| (&column.name, column.data_type.clone()))
1342 .collect();
1343 let timezone = query_context.timezone();
1344 let from_partition_exprs = request
1346 .from_exprs
1347 .iter()
1348 .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
1349 .collect::<Result<Vec<_>>>()?;
1350
1351 let into_partition_exprs = request
1352 .into_exprs
1353 .iter()
1354 .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
1355 .collect::<Result<Vec<_>>>()?;
1356
1357 let mut existing_partition_exprs =
1359 Vec::with_capacity(physical_table_route.region_routes.len());
1360 for route in &physical_table_route.region_routes {
1361 let expr_json = route.region.partition_expr();
1362 if !expr_json.is_empty() {
1363 match PartitionExpr::from_json_str(&expr_json) {
1364 Ok(Some(expr)) => existing_partition_exprs.push(expr),
1365 Ok(None) => {
1366 }
1368 Err(e) => {
1369 return Err(e).context(DeserializePartitionExprSnafu);
1370 }
1371 }
1372 }
1373 }
1374
1375 for from_expr in &from_partition_exprs {
1378 ensure!(
1379 existing_partition_exprs.contains(from_expr),
1380 InvalidPartitionRuleSnafu {
1381 reason: format!(
1382 "partition expression '{}' does not exist in table {}",
1383 from_expr, table_ref
1384 )
1385 }
1386 );
1387 }
1388
1389 let new_partition_exprs: Vec<PartitionExpr> = existing_partition_exprs
1392 .into_iter()
1393 .filter(|expr| !from_partition_exprs.contains(expr))
1394 .chain(into_partition_exprs.clone().into_iter())
1395 .collect();
1396 let new_partition_exprs_len = new_partition_exprs.len();
1397
1398 let _ = MultiDimPartitionRule::try_new(
1400 existing_partition_columns
1401 .iter()
1402 .map(|c| c.name.clone())
1403 .collect(),
1404 vec![],
1405 new_partition_exprs,
1406 true,
1407 )
1408 .context(InvalidPartitionSnafu)?;
1409
1410 info!(
1411 "Repartition table {} (table_id={}) from {:?} to {:?}, new partition count: {}",
1412 table_ref,
1413 table_id,
1414 from_partition_exprs,
1415 into_partition_exprs,
1416 new_partition_exprs_len
1417 );
1418
1419 NotSupportedSnafu {
1422 feat: "ALTER TABLE REPARTITION",
1423 }
1424 .fail()
1425 }
1426
1427 #[tracing::instrument(skip_all)]
1428 pub async fn alter_table_inner(
1429 &self,
1430 expr: AlterTableExpr,
1431 query_context: QueryContextRef,
1432 ) -> Result<Output> {
1433 ensure!(
1434 !is_readonly_schema(&expr.schema_name),
1435 SchemaReadOnlySnafu {
1436 name: expr.schema_name.clone()
1437 }
1438 );
1439
1440 let catalog_name = if expr.catalog_name.is_empty() {
1441 DEFAULT_CATALOG_NAME.to_string()
1442 } else {
1443 expr.catalog_name.clone()
1444 };
1445
1446 let schema_name = if expr.schema_name.is_empty() {
1447 DEFAULT_SCHEMA_NAME.to_string()
1448 } else {
1449 expr.schema_name.clone()
1450 };
1451
1452 let table_name = expr.table_name.clone();
1453
1454 let table = self
1455 .catalog_manager
1456 .table(
1457 &catalog_name,
1458 &schema_name,
1459 &table_name,
1460 Some(&query_context),
1461 )
1462 .await
1463 .context(CatalogSnafu)?
1464 .with_context(|| TableNotFoundSnafu {
1465 table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
1466 })?;
1467
1468 let table_id = table.table_info().ident.table_id;
1469 let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
1470 if !need_alter {
1471 return Ok(Output::new_with_affected_rows(0));
1472 }
1473 info!(
1474 "Table info before alter is {:?}, expr: {:?}",
1475 table.table_info(),
1476 expr
1477 );
1478
1479 let physical_table_id = self
1480 .table_metadata_manager
1481 .table_route_manager()
1482 .get_physical_table_id(table_id)
1483 .await
1484 .context(TableMetadataManagerSnafu)?;
1485
1486 let (req, invalidate_keys) = if physical_table_id == table_id {
1487 let req = SubmitDdlTaskRequest {
1489 query_context,
1490 task: DdlTask::new_alter_table(expr),
1491 };
1492
1493 let invalidate_keys = vec![
1494 CacheIdent::TableId(table_id),
1495 CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1496 ];
1497
1498 (req, invalidate_keys)
1499 } else {
1500 let req = SubmitDdlTaskRequest {
1502 query_context,
1503 task: DdlTask::new_alter_logical_tables(vec![expr]),
1504 };
1505
1506 let mut invalidate_keys = vec![
1507 CacheIdent::TableId(physical_table_id),
1508 CacheIdent::TableId(table_id),
1509 CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1510 ];
1511
1512 let physical_table = self
1513 .table_metadata_manager
1514 .table_info_manager()
1515 .get(physical_table_id)
1516 .await
1517 .context(TableMetadataManagerSnafu)?
1518 .map(|x| x.into_inner());
1519 if let Some(physical_table) = physical_table {
1520 let physical_table_name = TableName::new(
1521 physical_table.table_info.catalog_name,
1522 physical_table.table_info.schema_name,
1523 physical_table.table_info.name,
1524 );
1525 invalidate_keys.push(CacheIdent::TableName(physical_table_name));
1526 }
1527
1528 (req, invalidate_keys)
1529 };
1530
1531 self.procedure_executor
1532 .submit_ddl_task(&ExecutorContext::default(), req)
1533 .await
1534 .context(error::ExecuteDdlSnafu)?;
1535
1536 self.cache_invalidator
1538 .invalidate(&Context::default(), &invalidate_keys)
1539 .await
1540 .context(error::InvalidateTableCacheSnafu)?;
1541
1542 Ok(Output::new_with_affected_rows(0))
1543 }
1544
/// Placeholder for `ALTER TRIGGER`: both arguments are ignored and the call always fails
/// with a `NotSupported` error.
#[cfg(feature = "enterprise")]
#[tracing::instrument(skip_all)]
pub async fn alter_trigger(
    &self,
    _alter_expr: AlterTrigger,
    _query_context: QueryContextRef,
) -> Result<Output> {
    let unsupported = crate::error::NotSupportedSnafu {
        feat: "alter trigger",
    };
    unsupported.fail()
}
1557
1558 #[tracing::instrument(skip_all)]
1559 pub async fn alter_database(
1560 &self,
1561 alter_expr: AlterDatabase,
1562 query_context: QueryContextRef,
1563 ) -> Result<Output> {
1564 let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
1565 self.alter_database_inner(alter_expr, query_context).await
1566 }
1567
1568 #[tracing::instrument(skip_all)]
1569 pub async fn alter_database_inner(
1570 &self,
1571 alter_expr: AlterDatabaseExpr,
1572 query_context: QueryContextRef,
1573 ) -> Result<Output> {
1574 ensure!(
1575 !is_readonly_schema(&alter_expr.schema_name),
1576 SchemaReadOnlySnafu {
1577 name: query_context.current_schema().clone()
1578 }
1579 );
1580
1581 let exists = self
1582 .catalog_manager
1583 .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
1584 .await
1585 .context(CatalogSnafu)?;
1586 ensure!(
1587 exists,
1588 SchemaNotFoundSnafu {
1589 schema_info: alter_expr.schema_name,
1590 }
1591 );
1592
1593 let cache_ident = [CacheIdent::SchemaName(SchemaName {
1594 catalog_name: alter_expr.catalog_name.clone(),
1595 schema_name: alter_expr.schema_name.clone(),
1596 })];
1597
1598 self.alter_database_procedure(alter_expr, query_context)
1599 .await?;
1600
1601 self.cache_invalidator
1603 .invalidate(&Context::default(), &cache_ident)
1604 .await
1605 .context(error::InvalidateTableCacheSnafu)?;
1606
1607 Ok(Output::new_with_affected_rows(0))
1608 }
1609
1610 async fn create_table_procedure(
1611 &self,
1612 create_table: CreateTableExpr,
1613 partitions: Vec<PartitionExpr>,
1614 table_info: RawTableInfo,
1615 query_context: QueryContextRef,
1616 ) -> Result<SubmitDdlTaskResponse> {
1617 let partitions = partitions
1618 .into_iter()
1619 .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
1620 .collect::<Result<Vec<_>>>()?;
1621
1622 let request = SubmitDdlTaskRequest {
1623 query_context,
1624 task: DdlTask::new_create_table(create_table, partitions, table_info),
1625 };
1626
1627 self.procedure_executor
1628 .submit_ddl_task(&ExecutorContext::default(), request)
1629 .await
1630 .context(error::ExecuteDdlSnafu)
1631 }
1632
1633 async fn create_logical_tables_procedure(
1634 &self,
1635 tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
1636 query_context: QueryContextRef,
1637 ) -> Result<SubmitDdlTaskResponse> {
1638 let request = SubmitDdlTaskRequest {
1639 query_context,
1640 task: DdlTask::new_create_logical_tables(tables_data),
1641 };
1642
1643 self.procedure_executor
1644 .submit_ddl_task(&ExecutorContext::default(), request)
1645 .await
1646 .context(error::ExecuteDdlSnafu)
1647 }
1648
1649 async fn alter_logical_tables_procedure(
1650 &self,
1651 tables_data: Vec<AlterTableExpr>,
1652 query_context: QueryContextRef,
1653 ) -> Result<SubmitDdlTaskResponse> {
1654 let request = SubmitDdlTaskRequest {
1655 query_context,
1656 task: DdlTask::new_alter_logical_tables(tables_data),
1657 };
1658
1659 self.procedure_executor
1660 .submit_ddl_task(&ExecutorContext::default(), request)
1661 .await
1662 .context(error::ExecuteDdlSnafu)
1663 }
1664
1665 async fn drop_table_procedure(
1666 &self,
1667 table_name: &TableName,
1668 table_id: TableId,
1669 drop_if_exists: bool,
1670 query_context: QueryContextRef,
1671 ) -> Result<SubmitDdlTaskResponse> {
1672 let request = SubmitDdlTaskRequest {
1673 query_context,
1674 task: DdlTask::new_drop_table(
1675 table_name.catalog_name.clone(),
1676 table_name.schema_name.clone(),
1677 table_name.table_name.clone(),
1678 table_id,
1679 drop_if_exists,
1680 ),
1681 };
1682
1683 self.procedure_executor
1684 .submit_ddl_task(&ExecutorContext::default(), request)
1685 .await
1686 .context(error::ExecuteDdlSnafu)
1687 }
1688
1689 async fn drop_database_procedure(
1690 &self,
1691 catalog: String,
1692 schema: String,
1693 drop_if_exists: bool,
1694 query_context: QueryContextRef,
1695 ) -> Result<SubmitDdlTaskResponse> {
1696 let request = SubmitDdlTaskRequest {
1697 query_context,
1698 task: DdlTask::new_drop_database(catalog, schema, drop_if_exists),
1699 };
1700
1701 self.procedure_executor
1702 .submit_ddl_task(&ExecutorContext::default(), request)
1703 .await
1704 .context(error::ExecuteDdlSnafu)
1705 }
1706
1707 async fn alter_database_procedure(
1708 &self,
1709 alter_expr: AlterDatabaseExpr,
1710 query_context: QueryContextRef,
1711 ) -> Result<SubmitDdlTaskResponse> {
1712 let request = SubmitDdlTaskRequest {
1713 query_context,
1714 task: DdlTask::new_alter_database(alter_expr),
1715 };
1716
1717 self.procedure_executor
1718 .submit_ddl_task(&ExecutorContext::default(), request)
1719 .await
1720 .context(error::ExecuteDdlSnafu)
1721 }
1722
1723 async fn truncate_table_procedure(
1724 &self,
1725 table_name: &TableName,
1726 table_id: TableId,
1727 time_ranges: Vec<(Timestamp, Timestamp)>,
1728 query_context: QueryContextRef,
1729 ) -> Result<SubmitDdlTaskResponse> {
1730 let request = SubmitDdlTaskRequest {
1731 query_context,
1732 task: DdlTask::new_truncate_table(
1733 table_name.catalog_name.clone(),
1734 table_name.schema_name.clone(),
1735 table_name.table_name.clone(),
1736 table_id,
1737 time_ranges,
1738 ),
1739 };
1740
1741 self.procedure_executor
1742 .submit_ddl_task(&ExecutorContext::default(), request)
1743 .await
1744 .context(error::ExecuteDdlSnafu)
1745 }
1746
1747 #[tracing::instrument(skip_all)]
1748 pub async fn create_database(
1749 &self,
1750 database: &str,
1751 create_if_not_exists: bool,
1752 options: HashMap<String, String>,
1753 query_context: QueryContextRef,
1754 ) -> Result<Output> {
1755 let catalog = query_context.current_catalog();
1756 ensure!(
1757 NAME_PATTERN_REG.is_match(catalog),
1758 error::UnexpectedSnafu {
1759 violated: format!("Invalid catalog name: {}", catalog)
1760 }
1761 );
1762
1763 ensure!(
1764 NAME_PATTERN_REG.is_match(database),
1765 error::UnexpectedSnafu {
1766 violated: format!("Invalid database name: {}", database)
1767 }
1768 );
1769
1770 if !self
1771 .catalog_manager
1772 .schema_exists(catalog, database, None)
1773 .await
1774 .context(CatalogSnafu)?
1775 && !self.catalog_manager.is_reserved_schema_name(database)
1776 {
1777 self.create_database_procedure(
1778 catalog.to_string(),
1779 database.to_string(),
1780 create_if_not_exists,
1781 options,
1782 query_context,
1783 )
1784 .await?;
1785
1786 Ok(Output::new_with_affected_rows(1))
1787 } else if create_if_not_exists {
1788 Ok(Output::new_with_affected_rows(1))
1789 } else {
1790 error::SchemaExistsSnafu { name: database }.fail()
1791 }
1792 }
1793
1794 async fn create_database_procedure(
1795 &self,
1796 catalog: String,
1797 database: String,
1798 create_if_not_exists: bool,
1799 options: HashMap<String, String>,
1800 query_context: QueryContextRef,
1801 ) -> Result<SubmitDdlTaskResponse> {
1802 let request = SubmitDdlTaskRequest {
1803 query_context,
1804 task: DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
1805 };
1806
1807 self.procedure_executor
1808 .submit_ddl_task(&ExecutorContext::default(), request)
1809 .await
1810 .context(error::ExecuteDdlSnafu)
1811 }
1812}
1813
1814pub fn parse_partitions(
1816 create_table: &CreateTableExpr,
1817 partitions: Option<Partitions>,
1818 query_ctx: &QueryContextRef,
1819) -> Result<(Vec<PartitionExpr>, Vec<String>)> {
1820 let partition_columns = find_partition_columns(&partitions)?;
1823 let partition_exprs =
1824 find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
1825
1826 let exprs = partition_exprs.clone();
1828 MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
1829 .context(InvalidPartitionSnafu)?;
1830
1831 Ok((partition_exprs, partition_columns))
1832}
1833
1834fn parse_partitions_for_logical_validation(
1835 create_table: &CreateTableExpr,
1836 partitions: &Partitions,
1837 query_ctx: &QueryContextRef,
1838) -> Result<(Vec<String>, Vec<PartitionExpr>)> {
1839 let partition_columns = partitions
1840 .column_list
1841 .iter()
1842 .map(|ident| ident.value.clone())
1843 .collect::<Vec<_>>();
1844
1845 let column_name_and_type = partition_columns
1846 .iter()
1847 .map(|pc| {
1848 let column = create_table
1849 .column_defs
1850 .iter()
1851 .find(|c| &c.name == pc)
1852 .context(ColumnNotFoundSnafu { msg: pc.clone() })?;
1853 let column_name = &column.name;
1854 let data_type = ConcreteDataType::from(
1855 ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
1856 .context(ColumnDataTypeSnafu)?,
1857 );
1858 Ok((column_name, data_type))
1859 })
1860 .collect::<Result<HashMap<_, _>>>()?;
1861
1862 let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
1863 for expr in &partitions.exprs {
1864 let partition_expr = convert_one_expr(expr, &column_name_and_type, &query_ctx.timezone())?;
1865 partition_exprs.push(partition_expr);
1866 }
1867
1868 MultiDimPartitionRule::try_new(
1869 partition_columns.clone(),
1870 vec![],
1871 partition_exprs.clone(),
1872 true,
1873 )
1874 .context(InvalidPartitionSnafu)?;
1875
1876 Ok((partition_columns, partition_exprs))
1877}
1878
1879pub fn verify_alter(
1885 table_id: TableId,
1886 table_info: Arc<TableInfo>,
1887 expr: AlterTableExpr,
1888) -> Result<bool> {
1889 let request: AlterTableRequest =
1890 common_grpc_expr::alter_expr_to_request(table_id, expr, Some(&table_info.meta))
1891 .context(AlterExprToRequestSnafu)?;
1892
1893 let AlterTableRequest {
1894 table_name,
1895 alter_kind,
1896 ..
1897 } = &request;
1898
1899 if let AlterKind::RenameTable { new_table_name } = alter_kind {
1900 ensure!(
1901 NAME_PATTERN_REG.is_match(new_table_name),
1902 error::UnexpectedSnafu {
1903 violated: format!("Invalid table name: {}", new_table_name)
1904 }
1905 );
1906 } else if let AlterKind::AddColumns { columns } = alter_kind {
1907 let column_names: HashSet<_> = table_info
1910 .meta
1911 .schema
1912 .column_schemas()
1913 .iter()
1914 .map(|schema| &schema.name)
1915 .collect();
1916 if columns.iter().all(|column| {
1917 column_names.contains(&column.column_schema.name) && column.add_if_not_exists
1918 }) {
1919 return Ok(false);
1920 }
1921 }
1922
1923 let _ = table_info
1924 .meta
1925 .builder_with_alter_kind(table_name, &request.alter_kind)
1926 .context(error::TableSnafu)?
1927 .build()
1928 .context(error::BuildTableMetaSnafu { table_name })?;
1929
1930 Ok(true)
1931}
1932
1933pub fn create_table_info(
1934 create_table: &CreateTableExpr,
1935 partition_columns: Vec<String>,
1936) -> Result<RawTableInfo> {
1937 let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
1938 let mut column_name_to_index_map = HashMap::new();
1939
1940 for (idx, column) in create_table.column_defs.iter().enumerate() {
1941 let schema =
1942 column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
1943 column: &column.name,
1944 })?;
1945 let schema = schema.with_time_index(column.name == create_table.time_index);
1946
1947 column_schemas.push(schema);
1948 let _ = column_name_to_index_map.insert(column.name.clone(), idx);
1949 }
1950
1951 let timestamp_index = column_name_to_index_map
1952 .get(&create_table.time_index)
1953 .cloned();
1954
1955 let raw_schema = RawSchema {
1956 column_schemas: column_schemas.clone(),
1957 timestamp_index,
1958 version: 0,
1959 };
1960
1961 let primary_key_indices = create_table
1962 .primary_keys
1963 .iter()
1964 .map(|name| {
1965 column_name_to_index_map
1966 .get(name)
1967 .cloned()
1968 .context(ColumnNotFoundSnafu { msg: name })
1969 })
1970 .collect::<Result<Vec<_>>>()?;
1971
1972 let partition_key_indices = partition_columns
1973 .into_iter()
1974 .map(|col_name| {
1975 column_name_to_index_map
1976 .get(&col_name)
1977 .cloned()
1978 .context(ColumnNotFoundSnafu { msg: col_name })
1979 })
1980 .collect::<Result<Vec<_>>>()?;
1981
1982 let table_options = TableOptions::try_from_iter(&create_table.table_options)
1983 .context(UnrecognizedTableOptionSnafu)?;
1984
1985 let meta = RawTableMeta {
1986 schema: raw_schema,
1987 primary_key_indices,
1988 value_indices: vec![],
1989 engine: create_table.engine.clone(),
1990 next_column_id: column_schemas.len() as u32,
1991 options: table_options,
1992 created_on: Utc::now(),
1993 updated_on: Utc::now(),
1994 partition_key_indices,
1995 column_ids: vec![],
1996 };
1997
1998 let desc = if create_table.desc.is_empty() {
1999 create_table.table_options.get(COMMENT_KEY).cloned()
2000 } else {
2001 Some(create_table.desc.clone())
2002 };
2003
2004 let table_info = RawTableInfo {
2005 ident: metadata::TableIdent {
2006 table_id: 0,
2008 version: 0,
2009 },
2010 name: create_table.table_name.clone(),
2011 desc,
2012 catalog_name: create_table.catalog_name.clone(),
2013 schema_name: create_table.schema_name.clone(),
2014 meta,
2015 table_type: TableType::Base,
2016 };
2017 Ok(table_info)
2018}
2019
2020fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
2021 let columns = if let Some(partitions) = partitions {
2022 partitions
2023 .column_list
2024 .iter()
2025 .map(|x| x.value.clone())
2026 .collect::<Vec<_>>()
2027 } else {
2028 vec![]
2029 };
2030 Ok(columns)
2031}
2032
2033fn find_partition_entries(
2037 create_table: &CreateTableExpr,
2038 partitions: &Option<Partitions>,
2039 partition_columns: &[String],
2040 query_ctx: &QueryContextRef,
2041) -> Result<Vec<PartitionExpr>> {
2042 let Some(partitions) = partitions else {
2043 return Ok(vec![]);
2044 };
2045
2046 let column_name_and_type = partition_columns
2048 .iter()
2049 .map(|pc| {
2050 let column = create_table
2051 .column_defs
2052 .iter()
2053 .find(|c| &c.name == pc)
2054 .unwrap();
2056 let column_name = &column.name;
2057 let data_type = ConcreteDataType::from(
2058 ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
2059 .context(ColumnDataTypeSnafu)?,
2060 );
2061 Ok((column_name, data_type))
2062 })
2063 .collect::<Result<HashMap<_, _>>>()?;
2064
2065 let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
2067 for partition in &partitions.exprs {
2068 let partition_expr =
2069 convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
2070 partition_exprs.push(partition_expr);
2071 }
2072
2073 Ok(partition_exprs)
2074}
2075
2076fn convert_one_expr(
2077 expr: &Expr,
2078 column_name_and_type: &HashMap<&String, ConcreteDataType>,
2079 timezone: &Timezone,
2080) -> Result<PartitionExpr> {
2081 let Expr::BinaryOp { left, op, right } = expr else {
2082 return InvalidPartitionRuleSnafu {
2083 reason: "partition rule must be a binary expression",
2084 }
2085 .fail();
2086 };
2087
2088 let op =
2089 RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
2090 reason: format!("unsupported operator in partition expr {op}"),
2091 })?;
2092
2093 let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
2095 (Expr::Identifier(ident), Expr::Value(value)) => {
2097 let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2098 let value = convert_value(&value.value, data_type, timezone, None)?;
2099 (Operand::Column(column_name), op, Operand::Value(value))
2100 }
2101 (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
2102 if let Expr::Value(v) = &**expr =>
2103 {
2104 let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2105 let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
2106 (Operand::Column(column_name), op, Operand::Value(value))
2107 }
2108 (Expr::Value(value), Expr::Identifier(ident)) => {
2110 let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2111 let value = convert_value(&value.value, data_type, timezone, None)?;
2112 (Operand::Value(value), op, Operand::Column(column_name))
2113 }
2114 (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
2115 if let Expr::Value(v) = &**expr =>
2116 {
2117 let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2118 let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
2119 (Operand::Value(value), op, Operand::Column(column_name))
2120 }
2121 (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
2122 let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
2124 let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
2125 (Operand::Expr(lhs), op, Operand::Expr(rhs))
2126 }
2127 _ => {
2128 return InvalidPartitionRuleSnafu {
2129 reason: format!("invalid partition expr {expr}"),
2130 }
2131 .fail();
2132 }
2133 };
2134
2135 Ok(PartitionExpr::new(lhs, op, rhs))
2136}
2137
2138fn convert_identifier(
2139 ident: &Ident,
2140 column_name_and_type: &HashMap<&String, ConcreteDataType>,
2141) -> Result<(String, ConcreteDataType)> {
2142 let column_name = ident.value.clone();
2143 let data_type = column_name_and_type
2144 .get(&column_name)
2145 .cloned()
2146 .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
2147 Ok((column_name, data_type))
2148}
2149
2150fn convert_value(
2151 value: &ParserValue,
2152 data_type: ConcreteDataType,
2153 timezone: &Timezone,
2154 unary_op: Option<UnaryOperator>,
2155) -> Result<Value> {
2156 sql_value_to_value(
2157 &ColumnSchema::new("<NONAME>", data_type, true),
2158 value,
2159 Some(timezone),
2160 unary_op,
2161 false,
2162 )
2163 .context(error::SqlCommonSnafu)
2164}
2165
#[cfg(test)]
mod test {
    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;

    use super::*;
    use crate::expr_helper;

    /// Pins which names the shared name pattern accepts and rejects.
    #[test]
    fn test_name_is_match() {
        let cases = [
            ("/adaf", false),
            ("🈲", false),
            ("hello", true),
            ("test@", true),
            ("@test", false),
            ("test#", true),
            ("#test", false),
            ("@", false),
            ("#", false),
        ];
        for (name, expected) in cases {
            assert_eq!(NAME_PATTERN_REG.is_match(name), expected);
        }
    }

    /// Parses `CREATE TABLE` statements with partition clauses and compares the JSON form
    /// of the parsed partitions with the expected output. Currently ignored.
    #[tokio::test]
    #[ignore = "TODO(ruihang): WIP new partition rule"]
    async fn test_parse_partitions() {
        common_telemetry::init_default_ut_logging();
        // Each case is (SQL statement, expected JSON of the parsed partitions).
        let cases = [
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION ON COLUMNS (b) (
 b < 'hz',
 b >= 'hz' AND b < 'sh',
 b >= 'sh'
)
ENGINE=mito",
                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
            ),
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
 PARTITION r0 VALUES LESS THAN ('hz', 10),
 b < 'hz' AND a < 10,
 b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
 b >= 'sh' AND a >= 20
)
ENGINE=mito",
                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
            ),
        ];
        let ctx = QueryContextBuilder::default().build().into();
        for (sql, expected) in cases {
            let statements = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            let Statement::CreateTable(c) = &statements[0] else {
                unreachable!()
            };

            let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
            let (partitions, _) = parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
            let json = serde_json::to_string(&partitions).unwrap();
            assert_eq!(json, expected);
        }
    }
}