use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use api::helper::ColumnDataTypeWrapper;
use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
use api::v1::{
    AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, column_def,
};
#[cfg(feature = "enterprise")]
use api::v1::{
    CreateTriggerExpr as PbCreateTriggerExpr, meta::CreateTriggerTask as PbCreateTriggerTask,
};
use catalog::CatalogManagerRef;
use chrono::Utc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_readonly_schema};
use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::create_flow::FlowType;
use common_meta::instruction::CacheIdent;
use common_meta::key::NAME_PATTERN;
use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
use common_meta::procedure_executor::ExecutorContext;
#[cfg(feature = "enterprise")]
use common_meta::rpc::ddl::trigger::CreateTriggerTask;
#[cfg(feature = "enterprise")]
use common_meta::rpc::ddl::trigger::DropTriggerTask;
use common_meta::rpc::ddl::{
    CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
    SubmitDdlTaskResponse,
};
use common_query::Output;
use common_sql::convert::sql_value_to_value;
use common_telemetry::{debug, info, tracing, warn};
use common_time::{Timestamp, Timezone};
use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{RawSchema, Schema};
use datatypes::value::Value;
use lazy_static::lazy_static;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
use query::parser::QueryStatement;
use query::plan::extract_and_rewrite_full_table_names;
use query::query_engine::DefaultSerializer;
use query::sql::create_table_stmt;
use regex::Regex;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parser::{ParseOptions, ParserContext};
#[cfg(feature = "enterprise")]
use sql::statements::alter::trigger::AlterTrigger;
use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation};
#[cfg(feature = "enterprise")]
use sql::statements::create::trigger::CreateTrigger;
use sql::statements::create::{
    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
};
use sql::statements::statement::Statement;
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::TableRef;
use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{AlterKind, AlterTableRequest, COMMENT_KEY, TableOptions};
use table::table_name::TableName;

use crate::error::{
    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
    EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu,
    InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu,
    InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu, PartitionExprToPbSnafu, Result,
    SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
    TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
    UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
};
use crate::expr_helper;
use crate::statement::StatementExecutor;
use crate::statement::show::create_partitions_stmt;

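// Matches valid object names (tables, views, flows, databases); built from the
// shared `NAME_PATTERN` fragment so validation stays consistent with the
// metadata layer.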
lazy_static! {
    pub static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
}

impl StatementExecutor {
    pub fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }

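    /// Creates a table from a `CREATE TABLE` statement, inheriting any
    /// schema-level options the statement does not set explicitly.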
    #[tracing::instrument(skip_all)]
    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
        let (catalog, schema, _table) = table_idents_to_full_name(&stmt.name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;

        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
        if let Some(schema_options) = schema_options {
            for (key, value) in schema_options.extra_options.iter() {
                create_expr
                    .table_options
                    .entry(key.clone())
                    .or_insert(value.clone());
            }
        }

        self.create_table_inner(create_expr, stmt.partitions, ctx)
            .await
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_table_like(
        &self,
        stmt: CreateTableLike,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;
        let table_ref = self
            .catalog_manager
            .table(&catalog, &schema, &table, Some(&ctx))
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu { table_name: &table })?;
        let partitions = self
            .partition_manager
            .find_table_partitions(table_ref.table_info().table_id())
            .await
            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;

        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let quote_style = ctx.quote_style();
        let mut create_stmt =
            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
                .context(error::ParseQuerySnafu)?;
        create_stmt.name = stmt.table_name;
        create_stmt.if_not_exists = false;

        let table_info = table_ref.table_info();
        let partitions =
            create_partitions_stmt(&table_info, partitions)?.and_then(|mut partitions| {
                if !partitions.column_list.is_empty() {
                    partitions.set_quote(quote_style);
                    Some(partitions)
                } else {
                    None
                }
            });

        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
        self.create_table_inner(create_expr, partitions, ctx).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_external_table(
        &self,
        create_expr: CreateExternalTable,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
        self.create_table_inner(create_expr, None, ctx).await
    }

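    /// Dispatches table creation: a metric-engine logical table (marked by
    /// `LOGICAL_TABLE_METADATA_KEY`) must not carry a partition rule and goes
    /// through the batched logical-table path; everything else is created as a
    /// regular table.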
    #[tracing::instrument(skip_all)]
    pub async fn create_table_inner(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !is_readonly_schema(&create_table.schema_name),
            SchemaReadOnlySnafu {
                name: create_table.schema_name.clone()
            }
        );

        if create_table.engine == METRIC_ENGINE_NAME
            && create_table
                .table_options
                .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            ensure!(
                partitions.is_none(),
                InvalidPartitionRuleSnafu {
                    reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table",
                }
            );
            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
                .await?
                .into_iter()
                .next()
                .context(error::UnexpectedSnafu {
                    violated: "expected to create logical tables",
                })
        } else {
            self.create_non_logic_table(create_table, partitions, query_ctx)
                .await
        }
    }

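    /// Creates a regular (non-logical) table: verifies the schema exists and the
    /// table name is valid and not taken, then submits a `CreateTable` DDL task.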
    #[tracing::instrument(skip_all)]
    pub async fn create_non_logic_table(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();

        let schema = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey::new(
                &create_table.catalog_name,
                &create_table.schema_name,
            ))
            .await
            .context(TableMetadataManagerSnafu)?;
        ensure!(
            schema.is_some(),
            SchemaNotFoundSnafu {
                schema_info: &create_table.schema_name,
            }
        );

        if let Some(table) = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                &create_table.table_name,
                Some(&query_ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            return if create_table.create_if_not_exists {
                Ok(table)
            } else {
                TableAlreadyExistsSnafu {
                    table: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                }
                .fail()
            };
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&create_table.table_name),
            InvalidTableNameSnafu {
                table_name: &create_table.table_name,
            }
        );

        let table_name = TableName::new(
            &create_table.catalog_name,
            &create_table.schema_name,
            &create_table.table_name,
        );

        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
        let mut table_info = create_table_info(create_table, partition_cols)?;

        let resp = self
            .create_table_procedure(
                create_table.clone(),
                partitions,
                table_info.clone(),
                query_ctx,
            )
            .await?;

        let table_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created table '{table_name}' with table id {table_id}");

        table_info.ident.table_id = table_id;

        let table_info: Arc<TableInfo> =
            Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
        create_table.table_id = Some(api::v1::TableId { id: table_id });

        let table = DistTable::table(table_info);

        Ok(table)
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_logical_tables(
        &self,
        create_table_exprs: &[CreateTableExpr],
        query_context: QueryContextRef,
    ) -> Result<Vec<TableRef>> {
        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
        ensure!(
            !create_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "create logic tables"
            }
        );

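        // Validate every table name up front before submitting the batched DDL task.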
        for create_table in create_table_exprs {
            ensure!(
                NAME_PATTERN_REG.is_match(&create_table.table_name),
                InvalidTableNameSnafu {
                    table_name: &create_table.table_name,
                }
            );
        }

        let mut raw_tables_info = create_table_exprs
            .iter()
            .map(|create| create_table_info(create, vec![]))
            .collect::<Result<Vec<_>>>()?;
        let tables_data = create_table_exprs
            .iter()
            .cloned()
            .zip(raw_tables_info.iter().cloned())
            .collect::<Vec<_>>();

        let resp = self
            .create_logical_tables_procedure(tables_data, query_context)
            .await?;

        let table_ids = resp.table_ids;
        ensure!(
            table_ids.len() == raw_tables_info.len(),
            CreateLogicalTablesSnafu {
                reason: format!(
                    "The number of created tables is inconsistent with the number requested, expected: {}, actual: {}",
                    raw_tables_info.len(),
                    table_ids.len()
                )
            }
        );
        info!("Successfully created logical tables: {:?}", table_ids);

        for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
            table_info.ident.table_id = table_ids[i];
        }
        let tables_info = raw_tables_info
            .into_iter()
            .map(|x| x.try_into().context(CreateTableInfoSnafu))
            .collect::<Result<Vec<_>>>()?;

        Ok(tables_info
            .into_iter()
            .map(|x| DistTable::table(Arc::new(x)))
            .collect())
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub async fn create_trigger(
        &self,
        stmt: CreateTrigger,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
        self.create_trigger_inner(expr, query_context).await
    }

    #[cfg(feature = "enterprise")]
    pub async fn create_trigger_inner(
        &self,
        expr: PbCreateTriggerExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_trigger_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    async fn create_trigger_procedure(
        &self,
        expr: PbCreateTriggerExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
            create_trigger: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;

        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_trigger(task),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

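    /// Creates a flow from a `CREATE FLOW` statement; the flow type (batching or
    /// streaming) is determined automatically, see `determine_flow_type`.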
    #[tracing::instrument(skip_all)]
    pub async fn create_flow(
        &self,
        stmt: CreateFlow,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;

        self.create_flow_inner(expr, query_context).await
    }

    pub async fn create_flow_inner(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_flow_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    async fn create_flow_procedure(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let flow_type = self
            .determine_flow_type(&expr, query_context.clone())
            .await?;
        info!("Determined type of flow {}: {:#?}", expr.flow_name, flow_type);

        let expr = {
            let mut expr = expr;
            expr.flow_options
                .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
            expr
        };

        let task = CreateFlowTask::try_from(PbCreateFlowTask {
            create_flow: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_flow(task),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

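    /// Determines the [`FlowType`] for a `CREATE FLOW` statement:
    /// - any source table with `ttl = 'instant'` forces [`FlowType::Streaming`];
    /// - TQL queries always run as [`FlowType::Batching`];
    /// - SQL plans containing an aggregation or `DISTINCT` run as
    ///   [`FlowType::Batching`]; everything else defaults to [`FlowType::Streaming`].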
    async fn determine_flow_type(
        &self,
        expr: &CreateFlowExpr,
        query_ctx: QueryContextRef,
    ) -> Result<FlowType> {
        for src_table_name in &expr.source_table_names {
            let table = self
                .catalog_manager()
                .table(
                    &src_table_name.catalog_name,
                    &src_table_name.schema_name,
                    &src_table_name.table_name,
                    Some(&query_ctx),
                )
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name,
                    ),
                })?;

            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
                warn!(
                    "Source table `{}` for flow `{}` has ttl=instant, falling back to streaming mode",
                    format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name
                    ),
                    expr.flow_name
                );
                return Ok(FlowType::Streaming);
            }
        }

        let engine = &self.query_engine;
        let stmts = ParserContext::create_with_dialect(
            &expr.sql,
            query_ctx.sql_dialect(),
            ParseOptions::default(),
        )
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

        ensure!(
            stmts.len() == 1,
            InvalidSqlSnafu {
                err_msg: format!("Expected exactly one statement, found {}", stmts.len())
            }
        );
        let stmt = &stmts[0];

        let plan = match stmt {
            Statement::Tql(_) => return Ok(FlowType::Batching),
            _ => engine
                .planner()
                .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?,
        };

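        // Visitor that walks the logical plan (including subqueries) and stops at
        // the first aggregate or DISTINCT node it finds.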
        struct FindAggr {
            is_aggr: bool,
        }

        impl TreeNodeVisitor<'_> for FindAggr {
            type Node = LogicalPlan;
            fn f_down(
                &mut self,
                node: &Self::Node,
            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
            {
                match node {
                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
                        self.is_aggr = true;
                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
                    }
                    _ => (),
                }
                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
            }
        }

        let mut find_aggr = FindAggr { is_aggr: false };

        plan.visit_with_subqueries(&mut find_aggr)
            .context(BuildDfLogicalPlanSnafu)?;
        if find_aggr.is_aggr {
            Ok(FlowType::Batching)
        } else {
            Ok(FlowType::Streaming)
        }
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_view(
        &self,
        create_view: CreateView,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let logical_plan = match &*create_view.query {
            Statement::Query(query) => {
                self.plan(
                    &QueryStatement::Sql(Statement::Query(query.clone())),
                    ctx.clone(),
                )
                .await?
            }
            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
            _ => {
                return InvalidViewStmtSnafu {}.fail();
            }
        };
        let definition = create_view.to_string();

        let schema: Schema = logical_plan
            .schema()
            .clone()
            .try_into()
            .context(ConvertSchemaSnafu)?;
        let plan_columns: Vec<_> = schema
            .column_schemas()
            .iter()
            .map(|c| c.name.clone())
            .collect();

        let columns: Vec<_> = create_view
            .columns
            .iter()
            .map(|ident| ident.to_string())
            .collect();

        if !columns.is_empty() {
            ensure!(
                columns.len() == plan_columns.len(),
                error::ViewColumnsMismatchSnafu {
                    view_name: create_view.name.to_string(),
                    expected: plan_columns.len(),
                    actual: columns.len(),
                }
            );
        }

        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
            .context(ExtractTableNamesSnafu)?;

        let table_names = table_names.into_iter().map(|t| t.into()).collect();

        let encoded_plan = DFLogicalSubstraitConvertor
            .encode(&plan, DefaultSerializer)
            .context(SubstraitCodecSnafu)?;

        let expr = expr_helper::to_create_view_expr(
            create_view,
            encoded_plan.to_vec(),
            table_names,
            columns,
            plan_columns,
            definition,
            ctx.clone(),
        )?;

        self.create_view_by_expr(expr, ctx).await
    }

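    /// Creates (or replaces) a view from a fully-built [`CreateViewExpr`],
    /// enforcing that `IF NOT EXISTS` and `OR REPLACE` are not combined.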
    pub async fn create_view_by_expr(
        &self,
        expr: CreateViewExpr,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !(expr.create_if_not_exists && expr.or_replace),
            InvalidSqlSnafu {
                err_msg: "syntax error: `CREATE OR REPLACE` and `IF NOT EXISTS` cannot be used together",
            }
        );
        let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();

        let schema_exists = self
            .table_metadata_manager
            .schema_manager()
            .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
            .await
            .context(TableMetadataManagerSnafu)?;

        ensure!(
            schema_exists,
            SchemaNotFoundSnafu {
                schema_info: &expr.schema_name,
            }
        );

        if let Some(table) = self
            .catalog_manager
            .table(
                &expr.catalog_name,
                &expr.schema_name,
                &expr.view_name,
                Some(&ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            let table_type = table.table_info().table_type;

            match (table_type, expr.create_if_not_exists, expr.or_replace) {
                (TableType::View, true, false) => {
                    return Ok(table);
                }
                (TableType::View, false, false) => {
                    return ViewAlreadyExistsSnafu {
                        name: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
                // `OR REPLACE`: fall through and submit the create-view task,
                // which replaces the existing view.
                (TableType::View, _, true) => {}
                _ => {
                    return TableAlreadyExistsSnafu {
                        table: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
            }
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&expr.view_name),
            InvalidViewNameSnafu {
                name: expr.view_name.clone(),
            }
        );

        let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);

        let mut view_info = RawTableInfo {
            ident: metadata::TableIdent {
                // The view id is assigned by the procedure; use a placeholder here.
                table_id: 0,
                version: 0,
            },
            name: expr.view_name.clone(),
            desc: None,
            catalog_name: expr.catalog_name.clone(),
            schema_name: expr.schema_name.clone(),
            meta: RawTableMeta::default(),
            table_type: TableType::View,
        };

        let request = SubmitDdlTaskRequest {
            query_context: ctx,
            task: DdlTask::new_create_view(expr, view_info.clone()),
        };

        let resp = self
            .procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)?;

        debug!(
            "Submit creating view '{view_name}' task response: {:?}",
            resp
        );

        let view_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created view '{view_name}' with view id {view_id}");

        view_info.ident.table_id = view_id;

        let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);

        let table = DistTable::table(view_info);

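        // Invalidate any stale cache entries for the new view's id and name.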
        self.cache_invalidator
            .invalidate(
                &Context::default(),
                &[
                    CacheIdent::TableId(view_id),
                    CacheIdent::TableName(view_name.clone()),
                ],
            )
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(table)
    }

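    /// Drops a flow by name; with `drop_if_exists`, a missing flow is not an error.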
    #[tracing::instrument(skip_all)]
    pub async fn drop_flow(
        &self,
        catalog_name: String,
        flow_name: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        if let Some(flow) = self
            .flow_metadata_manager
            .flow_name_manager()
            .get(&catalog_name, &flow_name)
            .await
            .context(error::TableMetadataManagerSnafu)?
        {
            let flow_id = flow.flow_id();
            let task = DropFlowTask {
                catalog_name,
                flow_name,
                flow_id,
                drop_if_exists,
            };
            self.drop_flow_procedure(task, query_context).await?;

            Ok(Output::new_with_affected_rows(0))
        } else if drop_if_exists {
            Ok(Output::new_with_affected_rows(0))
        } else {
            FlowNotFoundSnafu {
                flow_name: format_full_flow_name(&catalog_name, &flow_name),
            }
            .fail()
        }
    }

    async fn drop_flow_procedure(
        &self,
        expr: DropFlowTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_flow(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub(super) async fn drop_trigger(
        &self,
        catalog_name: String,
        trigger_name: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let task = DropTriggerTask {
            catalog_name,
            trigger_name,
            drop_if_exists,
        };
        self.drop_trigger_procedure(task, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    async fn drop_trigger_procedure(
        &self,
        expr: DropTriggerTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_trigger(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[tracing::instrument(skip_all)]
    pub(crate) async fn drop_view(
        &self,
        catalog: String,
        schema: String,
        view: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let view_info = if let Some(view) = self
            .catalog_manager
            .table(&catalog, &schema, &view, None)
            .await
            .context(CatalogSnafu)?
        {
            view.table_info()
        } else if drop_if_exists {
            return Ok(Output::new_with_affected_rows(0));
        } else {
            return TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog, &schema, &view),
            }
            .fail();
        };

        ensure!(
            view_info.table_type == TableType::View,
            error::InvalidViewSnafu {
                msg: "not a view",
                view_name: format_full_table_name(&catalog, &schema, &view),
            }
        );

        let view_id = view_info.table_id();

        let task = DropViewTask {
            catalog,
            schema,
            view,
            view_id,
            drop_if_exists,
        };

        self.drop_view_procedure(task, query_context).await?;

        Ok(Output::new_with_affected_rows(0))
    }

    async fn drop_view_procedure(
        &self,
        expr: DropViewTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_view(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

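    /// Alters a batch of logical tables. The alter expressions are grouped by
    /// their physical table id, and each group is submitted as one
    /// `AlterLogicalTables` procedure; the groups run concurrently.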
    #[tracing::instrument(skip_all)]
    pub async fn alter_logical_tables(
        &self,
        alter_table_exprs: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
        ensure!(
            !alter_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "alter logical tables"
            }
        );

        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
        for expr in alter_table_exprs {
            let catalog = if expr.catalog_name.is_empty() {
                query_context.current_catalog()
            } else {
                &expr.catalog_name
            };
            let schema = if expr.schema_name.is_empty() {
                query_context.current_schema()
            } else {
                expr.schema_name.clone()
            };
            let table_name = &expr.table_name;
            let table = self
                .catalog_manager
                .table(catalog, &schema, table_name, Some(&query_context))
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(catalog, &schema, table_name),
                })?;
            let table_id = table.table_info().ident.table_id;
            let physical_table_id = self
                .table_metadata_manager
                .table_route_manager()
                .get_physical_table_id(table_id)
                .await
                .context(TableMetadataManagerSnafu)?;
            groups.entry(physical_table_id).or_default().push(expr);
        }

        let mut handles = Vec::with_capacity(groups.len());
        for (_physical_table_id, exprs) in groups {
            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
            handles.push(fut);
        }
        let _results = futures::future::try_join_all(handles).await?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_table(
        &self,
        table_name: TableName,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.drop_tables(&[table_name], drop_if_exists, query_context)
            .await
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_tables(
        &self,
        table_names: &[TableName],
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let mut tables = Vec::with_capacity(table_names.len());
        for table_name in table_names {
            ensure!(
                !is_readonly_schema(&table_name.schema_name),
                SchemaReadOnlySnafu {
                    name: table_name.schema_name.clone()
                }
            );

            if let Some(table) = self
                .catalog_manager
                .table(
                    &table_name.catalog_name,
                    &table_name.schema_name,
                    &table_name.table_name,
                    Some(&query_context),
                )
                .await
                .context(CatalogSnafu)?
            {
                tables.push(table.table_info().table_id());
            } else if drop_if_exists {
                continue;
            } else {
                return TableNotFoundSnafu {
                    table_name: table_name.to_string(),
                }
                .fail();
            }
        }

        for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
            self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
                .await?;

            self.cache_invalidator
                .invalidate(
                    &Context::default(),
                    &[
                        CacheIdent::TableId(table_id),
                        CacheIdent::TableName(table_name.clone()),
                    ],
                )
                .await
                .context(error::InvalidateTableCacheSnafu)?;
        }
        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_database(
        &self,
        catalog: String,
        schema: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&schema),
            SchemaReadOnlySnafu { name: schema }
        );

        if self
            .catalog_manager
            .schema_exists(&catalog, &schema, None)
            .await
            .context(CatalogSnafu)?
        {
            if schema == query_context.current_schema() {
                SchemaInUseSnafu { name: schema }.fail()
            } else {
                self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
                    .await?;

                Ok(Output::new_with_affected_rows(0))
            }
        } else if drop_if_exists {
            Ok(Output::new_with_affected_rows(0))
        } else {
            SchemaNotFoundSnafu {
                schema_info: schema,
            }
            .fail()
        }
    }

    #[tracing::instrument(skip_all)]
    pub async fn truncate_table(
        &self,
        table_name: TableName,
        time_ranges: Vec<(Timestamp, Timestamp)>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&table_name.schema_name),
            SchemaReadOnlySnafu {
                name: table_name.schema_name.clone()
            }
        );

        let table = self
            .catalog_manager
            .table(
                &table_name.catalog_name,
                &table_name.schema_name,
                &table_name.table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_name.to_string(),
            })?;
        let table_id = table.table_info().table_id();
        self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
            .await?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_table(
        &self,
        alter_table: AlterTable,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        if matches!(
            alter_table.alter_operation(),
            AlterTableOperation::Repartition { .. }
        ) {
            let _request = expr_helper::to_repartition_request(alter_table, &query_context)?;
            return NotSupportedSnafu {
                feat: "ALTER TABLE REPARTITION",
            }
            .fail();
        }

        let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
        self.alter_table_inner(expr, query_context).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_table_inner(
        &self,
        expr: AlterTableExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&expr.schema_name),
            SchemaReadOnlySnafu {
                name: expr.schema_name.clone()
            }
        );

        let catalog_name = if expr.catalog_name.is_empty() {
            DEFAULT_CATALOG_NAME.to_string()
        } else {
            expr.catalog_name.clone()
        };

        let schema_name = if expr.schema_name.is_empty() {
            DEFAULT_SCHEMA_NAME.to_string()
        } else {
            expr.schema_name.clone()
        };

        let table_name = expr.table_name.clone();

        let table = self
            .catalog_manager
            .table(
                &catalog_name,
                &schema_name,
                &table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
            })?;

        let table_id = table.table_info().ident.table_id;
        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
        if !need_alter {
            return Ok(Output::new_with_affected_rows(0));
        }
        info!(
            "Table info before alter is {:?}, expr: {:?}",
            table.table_info(),
            expr
        );

        let physical_table_id = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_id(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

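        // Physical tables go through a plain `AlterTable` task; logical tables are
        // routed through `AlterLogicalTables`, and the physical table's cache
        // entries are invalidated alongside the logical table's.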
        let (req, invalidate_keys) = if physical_table_id == table_id {
            let req = SubmitDdlTaskRequest {
                query_context,
                task: DdlTask::new_alter_table(expr),
            };

            let invalidate_keys = vec![
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            (req, invalidate_keys)
        } else {
            let req = SubmitDdlTaskRequest {
                query_context,
                task: DdlTask::new_alter_logical_tables(vec![expr]),
            };

            let mut invalidate_keys = vec![
                CacheIdent::TableId(physical_table_id),
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            let physical_table = self
                .table_metadata_manager
                .table_info_manager()
                .get(physical_table_id)
                .await
                .context(TableMetadataManagerSnafu)?
                .map(|x| x.into_inner());
            if let Some(physical_table) = physical_table {
                let physical_table_name = TableName::new(
                    physical_table.table_info.catalog_name,
                    physical_table.table_info.schema_name,
                    physical_table.table_info.name,
                );
                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
            }

            (req, invalidate_keys)
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub async fn alter_trigger(
        &self,
        _alter_expr: AlterTrigger,
        _query_context: QueryContextRef,
    ) -> Result<Output> {
        crate::error::NotSupportedSnafu {
            feat: "alter trigger",
        }
        .fail()
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_database(
        &self,
        alter_expr: AlterDatabase,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
        self.alter_database_inner(alter_expr, query_context).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_database_inner(
        &self,
        alter_expr: AlterDatabaseExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&alter_expr.schema_name),
            SchemaReadOnlySnafu {
                name: alter_expr.schema_name.clone()
            }
        );

        let exists = self
            .catalog_manager
            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
            .await
            .context(CatalogSnafu)?;
        ensure!(
            exists,
            SchemaNotFoundSnafu {
                schema_info: alter_expr.schema_name,
            }
        );

        let cache_ident = [CacheIdent::SchemaName(SchemaName {
            catalog_name: alter_expr.catalog_name.clone(),
            schema_name: alter_expr.schema_name.clone(),
        })];

        self.alter_database_procedure(alter_expr, query_context)
            .await?;

        self.cache_invalidator
            .invalidate(&Context::default(), &cache_ident)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }

    async fn create_table_procedure(
        &self,
        create_table: CreateTableExpr,
        partitions: Vec<PartitionExpr>,
        table_info: RawTableInfo,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let partitions = partitions
            .into_iter()
            .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
            .collect::<Result<Vec<_>>>()?;

        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_table(create_table, partitions, table_info),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn create_logical_tables_procedure(
        &self,
        tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_logical_tables(tables_data),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn alter_logical_tables_procedure(
        &self,
        tables_data: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_alter_logical_tables(tables_data),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn drop_table_procedure(
        &self,
        table_name: &TableName,
        table_id: TableId,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_table(
                table_name.catalog_name.clone(),
                table_name.schema_name.clone(),
                table_name.table_name.clone(),
                table_id,
                drop_if_exists,
            ),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn drop_database_procedure(
        &self,
        catalog: String,
        schema: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_database(catalog, schema, drop_if_exists),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn alter_database_procedure(
        &self,
        alter_expr: AlterDatabaseExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_alter_database(alter_expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn truncate_table_procedure(
        &self,
        table_name: &TableName,
        table_id: TableId,
        time_ranges: Vec<(Timestamp, Timestamp)>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_truncate_table(
                table_name.catalog_name.clone(),
                table_name.schema_name.clone(),
                table_name.table_name.clone(),
                table_id,
                time_ranges,
            ),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

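    /// Creates a database after validating the catalog and database names; with
    /// `create_if_not_exists`, an existing schema is not an error. Reserved
    /// schema names are never created.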
    #[tracing::instrument(skip_all)]
    pub async fn create_database(
        &self,
        database: &str,
        create_if_not_exists: bool,
        options: HashMap<String, String>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let catalog = query_context.current_catalog();
        ensure!(
            NAME_PATTERN_REG.is_match(catalog),
            error::UnexpectedSnafu {
                violated: format!("Invalid catalog name: {}", catalog)
            }
        );

        ensure!(
            NAME_PATTERN_REG.is_match(database),
            error::UnexpectedSnafu {
                violated: format!("Invalid database name: {}", database)
            }
        );

        if !self
            .catalog_manager
            .schema_exists(catalog, database, None)
            .await
            .context(CatalogSnafu)?
            && !self.catalog_manager.is_reserved_schema_name(database)
        {
            self.create_database_procedure(
                catalog.to_string(),
                database.to_string(),
                create_if_not_exists,
                options,
                query_context,
            )
            .await?;

            Ok(Output::new_with_affected_rows(1))
        } else if create_if_not_exists {
            Ok(Output::new_with_affected_rows(1))
        } else {
            error::SchemaExistsSnafu { name: database }.fail()
        }
    }

    async fn create_database_procedure(
        &self,
        catalog: String,
        database: String,
        create_if_not_exists: bool,
        options: HashMap<String, String>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }
}

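/// Parses the optional `PARTITION ON COLUMNS (...)` clause of a `CREATE TABLE`
/// statement into partition expressions plus the partition column names, and
/// validates them by constructing a throw-away [`MultiDimPartitionRule`].
///
/// An illustrative sketch of the accepted shape (table and column names are
/// examples, not from the source):
///
/// ```sql
/// CREATE TABLE t (device STRING, ts TIMESTAMP, TIME INDEX (ts))
/// PARTITION ON COLUMNS (device) (
///     device < 'm',
///     device >= 'm'
/// )
/// ```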
pub fn parse_partitions(
    create_table: &CreateTableExpr,
    partitions: Option<Partitions>,
    query_ctx: &QueryContextRef,
) -> Result<(Vec<PartitionExpr>, Vec<String>)> {
    let partition_columns = find_partition_columns(&partitions)?;
    let partition_exprs =
        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;

    let exprs = partition_exprs.clone();
    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
        .context(InvalidPartitionSnafu)?;

    Ok((partition_exprs, partition_columns))
}

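/// Verifies an `ALTER TABLE` expression against the current table info and
/// returns whether it actually needs to be applied. Returns `Ok(false)` when
/// every column added with `ADD COLUMN IF NOT EXISTS` already exists, making the
/// alter a no-op; otherwise the new table metadata is built eagerly so errors
/// surface before any DDL task is submitted.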
pub fn verify_alter(
    table_id: TableId,
    table_info: Arc<TableInfo>,
    expr: AlterTableExpr,
) -> Result<bool> {
    let request: AlterTableRequest =
        common_grpc_expr::alter_expr_to_request(table_id, expr, Some(&table_info.meta))
            .context(AlterExprToRequestSnafu)?;

    let AlterTableRequest {
        table_name,
        alter_kind,
        ..
    } = &request;

    if let AlterKind::RenameTable { new_table_name } = alter_kind {
        ensure!(
            NAME_PATTERN_REG.is_match(new_table_name),
            error::UnexpectedSnafu {
                violated: format!("Invalid table name: {}", new_table_name)
            }
        );
    } else if let AlterKind::AddColumns { columns } = alter_kind {
        let column_names: HashSet<_> = table_info
            .meta
            .schema
            .column_schemas()
            .iter()
            .map(|schema| &schema.name)
            .collect();
        if columns.iter().all(|column| {
            column_names.contains(&column.column_schema.name) && column.add_if_not_exists
        }) {
            return Ok(false);
        }
    }

    let _ = table_info
        .meta
        .builder_with_alter_kind(table_name, &request.alter_kind)
        .context(error::TableSnafu)?
        .build()
        .context(error::BuildTableMetaSnafu { table_name })?;

    Ok(true)
}

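/// Builds the [`RawTableInfo`] for a `CREATE TABLE` expression. The table id is
/// left as a placeholder `0` and assigned later by the DDL procedure; the table
/// description falls back to the `COMMENT_KEY` table option when `desc` is empty.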
pub fn create_table_info(
    create_table: &CreateTableExpr,
    partition_columns: Vec<String>,
) -> Result<RawTableInfo> {
    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
    let mut column_name_to_index_map = HashMap::new();

    for (idx, column) in create_table.column_defs.iter().enumerate() {
        let schema =
            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
                column: &column.name,
            })?;
        let schema = schema.with_time_index(column.name == create_table.time_index);

        column_schemas.push(schema);
        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
    }

    let timestamp_index = column_name_to_index_map
        .get(&create_table.time_index)
        .cloned();

    let raw_schema = RawSchema {
        column_schemas: column_schemas.clone(),
        timestamp_index,
        version: 0,
    };

    let primary_key_indices = create_table
        .primary_keys
        .iter()
        .map(|name| {
            column_name_to_index_map
                .get(name)
                .cloned()
                .context(ColumnNotFoundSnafu { msg: name })
        })
        .collect::<Result<Vec<_>>>()?;

    let partition_key_indices = partition_columns
        .into_iter()
        .map(|col_name| {
            column_name_to_index_map
                .get(&col_name)
                .cloned()
                .context(ColumnNotFoundSnafu { msg: col_name })
        })
        .collect::<Result<Vec<_>>>()?;

    let table_options = TableOptions::try_from_iter(&create_table.table_options)
        .context(UnrecognizedTableOptionSnafu)?;

    let meta = RawTableMeta {
        schema: raw_schema,
        primary_key_indices,
        value_indices: vec![],
        engine: create_table.engine.clone(),
        next_column_id: column_schemas.len() as u32,
        region_numbers: vec![],
        options: table_options,
        created_on: Utc::now(),
        partition_key_indices,
        column_ids: vec![],
    };

    let desc = if create_table.desc.is_empty() {
        create_table.table_options.get(COMMENT_KEY).cloned()
    } else {
        Some(create_table.desc.clone())
    };

    let table_info = RawTableInfo {
        ident: metadata::TableIdent {
            table_id: 0,
            version: 0,
        },
        name: create_table.table_name.clone(),
        desc,
        catalog_name: create_table.catalog_name.clone(),
        schema_name: create_table.schema_name.clone(),
        meta,
        table_type: TableType::Base,
    };
    Ok(table_info)
}

fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
    let columns = if let Some(partitions) = partitions {
        partitions
            .column_list
            .iter()
            .map(|x| x.value.clone())
            .collect::<Vec<_>>()
    } else {
        vec![]
    };
    Ok(columns)
}

fn find_partition_entries(
    create_table: &CreateTableExpr,
    partitions: &Option<Partitions>,
    partition_columns: &[String],
    query_ctx: &QueryContextRef,
) -> Result<Vec<PartitionExpr>> {
    let Some(partitions) = partitions else {
        return Ok(vec![]);
    };

    let column_name_and_type = partition_columns
        .iter()
        .map(|pc| {
            let column = create_table
                .column_defs
                .iter()
                .find(|c| &c.name == pc)
                .unwrap();
            let column_name = &column.name;
            let data_type = ConcreteDataType::from(
                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
                    .context(ColumnDataTypeSnafu)?,
            );
            Ok((column_name, data_type))
        })
        .collect::<Result<HashMap<_, _>>>()?;

    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
    for partition in &partitions.exprs {
        let partition_expr =
            convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
        partition_exprs.push(partition_expr);
    }

    Ok(partition_exprs)
}

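/// Converts a single sqlparser binary expression into a [`PartitionExpr`].
/// Supported shapes are `column op literal` and `literal op column` (the literal
/// may carry a unary operator such as `-`), plus nested binary expressions
/// converted recursively, e.g. `device >= 'm' AND device < 'x'`.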
fn convert_one_expr(
    expr: &Expr,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
    timezone: &Timezone,
) -> Result<PartitionExpr> {
    let Expr::BinaryOp { left, op, right } = expr else {
        return InvalidPartitionRuleSnafu {
            reason: "partition rule must be a binary expression",
        }
        .fail();
    };

    let op =
        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
            reason: format!("unsupported operator in partition expr {op}"),
        })?;

    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
        // column op literal
        (Expr::Identifier(ident), Expr::Value(value)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&value.value, data_type, timezone, None)?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        // literal op column
        (Expr::Value(value), Expr::Identifier(ident)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&value.value, data_type, timezone, None)?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
            // Nested expressions, e.g. `a > 1 AND b < 2`: convert both sides recursively.
            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
            (Operand::Expr(lhs), op, Operand::Expr(rhs))
        }
        _ => {
            return InvalidPartitionRuleSnafu {
                reason: format!("invalid partition expr {expr}"),
            }
            .fail();
        }
    };

    Ok(PartitionExpr::new(lhs, op, rhs))
}

fn convert_identifier(
    ident: &Ident,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
) -> Result<(String, ConcreteDataType)> {
    let column_name = ident.value.clone();
    let data_type = column_name_and_type
        .get(&column_name)
        .cloned()
        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
    Ok((column_name, data_type))
}

fn convert_value(
    value: &ParserValue,
    data_type: ConcreteDataType,
    timezone: &Timezone,
    unary_op: Option<UnaryOperator>,
) -> Result<Value> {
    sql_value_to_value(
        "<NONAME>",
        &data_type,
        value,
        Some(timezone),
        unary_op,
        false,
    )
    .context(error::SqlCommonSnafu)
}

#[cfg(test)]
mod test {
    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;

    use super::*;
    use crate::expr_helper;

    #[test]
    fn test_name_is_match() {
        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
        assert!(!NAME_PATTERN_REG.is_match("🈲"));
        assert!(NAME_PATTERN_REG.is_match("hello"));
        assert!(NAME_PATTERN_REG.is_match("test@"));
        assert!(!NAME_PATTERN_REG.is_match("@test"));
        assert!(NAME_PATTERN_REG.is_match("test#"));
        assert!(!NAME_PATTERN_REG.is_match("#test"));
        assert!(!NAME_PATTERN_REG.is_match("@"));
        assert!(!NAME_PATTERN_REG.is_match("#"));
    }

    #[tokio::test]
    #[ignore = "TODO(ruihang): WIP new partition rule"]
    async fn test_parse_partitions() {
        common_telemetry::init_default_ut_logging();
        let cases = [
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION ON COLUMNS (b) (
    b < 'hz',
    b >= 'hz' AND b < 'sh',
    b >= 'sh'
)
ENGINE=mito",
                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
            ),
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
    PARTITION r0 VALUES LESS THAN ('hz', 10),
    b < 'hz' AND a < 10,
    b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
    b >= 'sh' AND a >= 20
)
ENGINE=mito",
                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
            ),
        ];
        let ctx = QueryContextBuilder::default().build().into();
        for (sql, expected) in cases {
            let result = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            match &result[0] {
                Statement::CreateTable(c) => {
                    let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
                    let (partitions, _) =
                        parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
                    let json = serde_json::to_string(&partitions).unwrap();
                    assert_eq!(json, expected);
                }
                _ => unreachable!(),
            }
        }
    }
}