use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use api::helper::ColumnDataTypeWrapper;
use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
use api::v1::{
    column_def, AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
};
#[cfg(feature = "enterprise")]
use api::v1::{
    meta::CreateTriggerTask as PbCreateTriggerTask, CreateTriggerExpr as PbCreateTriggerExpr,
};
use catalog::CatalogManagerRef;
use chrono::Utc;
use common_catalog::consts::{is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::create_flow::FlowType;
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
use common_meta::key::NAME_PATTERN;
use common_meta::procedure_executor::ExecutorContext;
#[cfg(feature = "enterprise")]
use common_meta::rpc::ddl::trigger::CreateTriggerTask;
#[cfg(feature = "enterprise")]
use common_meta::rpc::ddl::trigger::DropTriggerTask;
use common_meta::rpc::ddl::{
    CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
    SubmitDdlTaskResponse,
};
use common_query::Output;
use common_sql::convert::sql_value_to_value;
use common_telemetry::{debug, info, tracing, warn};
use common_time::{Timestamp, Timezone};
use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{RawSchema, Schema};
use datatypes::value::Value;
use lazy_static::lazy_static;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
use query::parser::QueryStatement;
use query::plan::extract_and_rewrite_full_table_names;
use query::query_engine::DefaultSerializer;
use query::sql::create_table_stmt;
use regex::Regex;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{ensure, OptionExt, ResultExt};
use sql::parser::{ParseOptions, ParserContext};
#[cfg(feature = "enterprise")]
use sql::statements::alter::trigger::AlterTrigger;
use sql::statements::alter::{AlterDatabase, AlterTable};
#[cfg(feature = "enterprise")]
use sql::statements::create::trigger::CreateTrigger;
use sql::statements::create::{
    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
};
use sql::statements::statement::Statement;
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{AlterKind, AlterTableRequest, TableOptions, COMMENT_KEY};
use table::table_name::TableName;
use table::TableRef;

use crate::error::{
    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
    EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu,
    InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu,
    InvalidViewNameSnafu, InvalidViewStmtSnafu, PartitionExprToPbSnafu, Result, SchemaInUseSnafu,
    SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
    TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
    ViewAlreadyExistsSnafu,
};
use crate::expr_helper;
use crate::statement::show::create_partitions_stmt;
use crate::statement::StatementExecutor;

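/// Matches valid catalog, schema, table, and view names
/// (the anchored form of `NAME_PATTERN`).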
lazy_static! {
    pub static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
}

impl StatementExecutor {
    pub fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }

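    /// Creates a table by a `CREATE TABLE` statement. Schema-level options
    /// are merged into the table options (table options take precedence)
    /// before the create-table procedure is submitted.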
    #[tracing::instrument(skip_all)]
    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
        let (catalog, schema, _table) = table_idents_to_full_name(&stmt.name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;

        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
        // Inherit schema-level options that are not overridden on the table.
        if let Some(schema_options) = schema_options {
            for (key, value) in schema_options.extra_options.iter() {
                create_expr
                    .table_options
                    .entry(key.clone())
                    .or_insert(value.clone());
            }
        }

        self.create_table_inner(create_expr, stmt.partitions, ctx)
            .await
    }

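    /// Creates a new table with the same schema, engine, options and
    /// partition rule as an existing source table (`CREATE TABLE ... LIKE`).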
    #[tracing::instrument(skip_all)]
    pub async fn create_table_like(
        &self,
        stmt: CreateTableLike,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;
        let table_ref = self
            .catalog_manager
            .table(&catalog, &schema, &table, Some(&ctx))
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu { table_name: &table })?;
        let partitions = self
            .partition_manager
            .find_table_partitions(table_ref.table_info().table_id())
            .await
            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;

        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let quote_style = ctx.quote_style();
        let mut create_stmt =
            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
                .context(error::ParseQuerySnafu)?;
        create_stmt.name = stmt.table_name;
        create_stmt.if_not_exists = false;

        let table_info = table_ref.table_info();
        let partitions =
            create_partitions_stmt(&table_info, partitions)?.and_then(|mut partitions| {
                if !partitions.column_list.is_empty() {
                    partitions.set_quote(quote_style);
                    Some(partitions)
                } else {
                    None
                }
            });

        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
        self.create_table_inner(create_expr, partitions, ctx).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_external_table(
        &self,
        create_expr: CreateExternalTable,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
        self.create_table_inner(create_expr, None, ctx).await
    }

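    /// Dispatches table creation: a metric-engine logical table goes through
    /// the batched logical-table procedure (its partition rule is inherited
    /// from the physical table), everything else is created directly.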
    #[tracing::instrument(skip_all)]
    pub async fn create_table_inner(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !is_readonly_schema(&create_table.schema_name),
            SchemaReadOnlySnafu {
                name: create_table.schema_name.clone()
            }
        );

        if create_table.engine == METRIC_ENGINE_NAME
            && create_table
                .table_options
                .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            ensure!(
                partitions.is_none(),
                InvalidPartitionRuleSnafu {
                    reason: "a logical table in the metric engine cannot have a partition rule; it is inherited from the physical table",
                }
            );
            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
                .await?
                .into_iter()
                .next()
                .context(error::UnexpectedSnafu {
                    violated: "expected to create logical tables",
                })
        } else {
            self.create_non_logic_table(create_table, partitions, query_ctx)
                .await
        }
    }

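    /// Creates an ordinary (non-logical) table: validates the schema and
    /// table name, parses the partition rule, and submits the create-table
    /// DDL procedure.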
    #[tracing::instrument(skip_all)]
    pub async fn create_non_logic_table(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();

        let schema = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey::new(
                &create_table.catalog_name,
                &create_table.schema_name,
            ))
            .await
            .context(TableMetadataManagerSnafu)?;
        ensure!(
            schema.is_some(),
            SchemaNotFoundSnafu {
                schema_info: &create_table.schema_name,
            }
        );

        if let Some(table) = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                &create_table.table_name,
                Some(&query_ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            return if create_table.create_if_not_exists {
                Ok(table)
            } else {
                TableAlreadyExistsSnafu {
                    table: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                }
                .fail()
            };
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&create_table.table_name),
            InvalidTableNameSnafu {
                table_name: &create_table.table_name,
            }
        );

        let table_name = TableName::new(
            &create_table.catalog_name,
            &create_table.schema_name,
            &create_table.table_name,
        );

        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
        let mut table_info = create_table_info(create_table, partition_cols)?;

        let resp = self
            .create_table_procedure(
                create_table.clone(),
                partitions,
                table_info.clone(),
                query_ctx,
            )
            .await?;

        let table_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created table '{table_name}' with table id {table_id}");

        table_info.ident.table_id = table_id;

        let table_info: Arc<TableInfo> =
            Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
        create_table.table_id = Some(api::v1::TableId { id: table_id });

        let table = DistTable::table(table_info);

        Ok(table)
    }

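    /// Creates a batch of logical tables in the metric engine with a single
    /// DDL procedure. All table names are validated up front, and the
    /// returned table ids are assigned back to the corresponding table infos.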
    #[tracing::instrument(skip_all)]
    pub async fn create_logical_tables(
        &self,
        create_table_exprs: &[CreateTableExpr],
        query_context: QueryContextRef,
    ) -> Result<Vec<TableRef>> {
        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
        ensure!(
            !create_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "create logical tables"
            }
        );

        for create_table in create_table_exprs {
            ensure!(
                NAME_PATTERN_REG.is_match(&create_table.table_name),
                InvalidTableNameSnafu {
                    table_name: &create_table.table_name,
                }
            );
        }

        let mut raw_tables_info = create_table_exprs
            .iter()
            .map(|create| create_table_info(create, vec![]))
            .collect::<Result<Vec<_>>>()?;
        let tables_data = create_table_exprs
            .iter()
            .cloned()
            .zip(raw_tables_info.iter().cloned())
            .collect::<Vec<_>>();

        let resp = self
            .create_logical_tables_procedure(tables_data, query_context)
            .await?;

        let table_ids = resp.table_ids;
        ensure!(
            table_ids.len() == raw_tables_info.len(),
            CreateLogicalTablesSnafu {
                reason: format!(
                    "The number of created tables is inconsistent with the number requested, expected: {}, actual: {}",
                    raw_tables_info.len(),
                    table_ids.len()
                )
            }
        );
        info!("Successfully created logical tables: {:?}", table_ids);

        for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
            table_info.ident.table_id = table_ids[i];
        }
        let tables_info = raw_tables_info
            .into_iter()
            .map(|x| x.try_into().context(CreateTableInfoSnafu))
            .collect::<Result<Vec<_>>>()?;

        Ok(tables_info
            .into_iter()
            .map(|x| DistTable::table(Arc::new(x)))
            .collect())
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub async fn create_trigger(
        &self,
        stmt: CreateTrigger,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
        self.create_trigger_inner(expr, query_context).await
    }

    #[cfg(feature = "enterprise")]
    pub async fn create_trigger_inner(
        &self,
        expr: PbCreateTriggerExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_trigger_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    async fn create_trigger_procedure(
        &self,
        expr: PbCreateTriggerExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
            create_trigger: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;

        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_trigger(task),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_flow(
        &self,
        stmt: CreateFlow,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;

        self.create_flow_inner(expr, query_context).await
    }

    pub async fn create_flow_inner(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_flow_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    async fn create_flow_procedure(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let flow_type = self
            .determine_flow_type(&expr, query_context.clone())
            .await?;
        info!("Determined type of flow {}: {:#?}", expr.flow_name, flow_type);

        let expr = {
            let mut expr = expr;
            expr.flow_options
                .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
            expr
        };

        let task = CreateFlowTask::try_from(PbCreateFlowTask {
            create_flow: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_flow(task),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

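    /// Determines how a flow should be executed:
    /// - any source table with `ttl = instant` forces streaming mode, since
    ///   its data is gone before a batch run could read it;
    /// - TQL queries always run in batching mode;
    /// - SQL plans containing an aggregate or distinct node run in batching
    ///   mode, everything else in streaming mode.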
    async fn determine_flow_type(
        &self,
        expr: &CreateFlowExpr,
        query_ctx: QueryContextRef,
    ) -> Result<FlowType> {
        for src_table_name in &expr.source_table_names {
            let table = self
                .catalog_manager()
                .table(
                    &src_table_name.catalog_name,
                    &src_table_name.schema_name,
                    &src_table_name.table_name,
                    Some(&query_ctx),
                )
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name,
                    ),
                })?;

            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
                warn!(
                    "Source table `{}` of flow `{}` has ttl=instant; falling back to streaming mode",
                    format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name
                    ),
                    expr.flow_name
                );
                return Ok(FlowType::Streaming);
            }
        }

        let engine = &self.query_engine;
        let stmts = ParserContext::create_with_dialect(
            &expr.sql,
            query_ctx.sql_dialect(),
            ParseOptions::default(),
        )
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

        ensure!(
            stmts.len() == 1,
            InvalidSqlSnafu {
                err_msg: format!("Expected exactly one statement, found {}", stmts.len())
            }
        );
        let stmt = &stmts[0];

        let plan = match stmt {
            // TQL queries always run in batching mode.
            Statement::Tql(_) => return Ok(FlowType::Batching),
            _ => engine
                .planner()
                .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?,
        };

        // Visitor that flags aggregate or distinct nodes in the plan.
        struct FindAggr {
            is_aggr: bool,
        }

        impl TreeNodeVisitor<'_> for FindAggr {
            type Node = LogicalPlan;
            fn f_down(
                &mut self,
                node: &Self::Node,
            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
            {
                match node {
                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
                        self.is_aggr = true;
                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
                    }
                    _ => (),
                }
                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
            }
        }

        let mut find_aggr = FindAggr { is_aggr: false };

        plan.visit_with_subqueries(&mut find_aggr)
            .context(BuildDfLogicalPlanSnafu)?;
        if find_aggr.is_aggr {
            Ok(FlowType::Batching)
        } else {
            Ok(FlowType::Streaming)
        }
    }

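    /// Creates a view from a `CREATE VIEW` statement: plans the defining
    /// query, checks the optional column list against the plan schema,
    /// rewrites table names to fully qualified form, and stores the plan
    /// encoded as substrait.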
    #[tracing::instrument(skip_all)]
    pub async fn create_view(
        &self,
        create_view: CreateView,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let logical_plan = match &*create_view.query {
            Statement::Query(query) => {
                self.plan(
                    &QueryStatement::Sql(Statement::Query(query.clone())),
                    ctx.clone(),
                )
                .await?
            }
            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
            _ => {
                return InvalidViewStmtSnafu {}.fail();
            }
        };
        let definition = create_view.to_string();

        let schema: Schema = logical_plan
            .schema()
            .clone()
            .try_into()
            .context(ConvertSchemaSnafu)?;
        let plan_columns: Vec<_> = schema
            .column_schemas()
            .iter()
            .map(|c| c.name.clone())
            .collect();

        let columns: Vec<_> = create_view
            .columns
            .iter()
            .map(|ident| ident.to_string())
            .collect();

        // An explicit column list must match the plan's column count.
        if !columns.is_empty() {
            ensure!(
                columns.len() == plan_columns.len(),
                error::ViewColumnsMismatchSnafu {
                    view_name: create_view.name.to_string(),
                    expected: plan_columns.len(),
                    actual: columns.len(),
                }
            );
        }

        // Extract the referenced table names and rewrite them to fully
        // qualified form in the plan.
        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
            .context(ExtractTableNamesSnafu)?;

        let table_names = table_names.into_iter().map(|t| t.into()).collect();

        // Persist the plan encoded as substrait.
        let encoded_plan = DFLogicalSubstraitConvertor
            .encode(&plan, DefaultSerializer)
            .context(SubstraitCodecSnafu)?;

        let expr = expr_helper::to_create_view_expr(
            create_view,
            encoded_plan.to_vec(),
            table_names,
            columns,
            plan_columns,
            definition,
            ctx.clone(),
        )?;

        self.create_view_by_expr(expr, ctx).await
    }

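    /// Submits the create-view DDL procedure and invalidates stale cache
    /// entries for the view. `OR REPLACE` and `IF NOT EXISTS` are mutually
    /// exclusive; an existing non-view table with the same name is always an
    /// error.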
    pub async fn create_view_by_expr(
        &self,
        expr: CreateViewExpr,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !(expr.create_if_not_exists && expr.or_replace),
            InvalidSqlSnafu {
                err_msg: "syntax error: CREATE OR REPLACE and IF NOT EXISTS cannot be used together",
            }
        );
        let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();

        let schema_exists = self
            .table_metadata_manager
            .schema_manager()
            .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
            .await
            .context(TableMetadataManagerSnafu)?;

        ensure!(
            schema_exists,
            SchemaNotFoundSnafu {
                schema_info: &expr.schema_name,
            }
        );

        if let Some(table) = self
            .catalog_manager
            .table(
                &expr.catalog_name,
                &expr.schema_name,
                &expr.view_name,
                Some(&ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            let table_type = table.table_info().table_type;

            match (table_type, expr.create_if_not_exists, expr.or_replace) {
                (TableType::View, true, false) => {
                    return Ok(table);
                }
                (TableType::View, false, false) => {
                    return ViewAlreadyExistsSnafu {
                        name: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
                (TableType::View, _, true) => {
                    // OR REPLACE: fall through and submit the procedure that
                    // replaces the existing view.
                }
                _ => {
                    return TableAlreadyExistsSnafu {
                        table: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
            }
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&expr.view_name),
            InvalidViewNameSnafu {
                name: expr.view_name.clone(),
            }
        );

        let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);

        let mut view_info = RawTableInfo {
            ident: metadata::TableIdent {
                // The view id is assigned by the procedure; 0 is a placeholder.
                table_id: 0,
                version: 0,
            },
            name: expr.view_name.clone(),
            desc: None,
            catalog_name: expr.catalog_name.clone(),
            schema_name: expr.schema_name.clone(),
            // Views carry no table meta; keep the default placeholder.
            meta: RawTableMeta::default(),
            table_type: TableType::View,
        };

        let request = SubmitDdlTaskRequest {
            query_context: ctx,
            task: DdlTask::new_create_view(expr, view_info.clone()),
        };

        let resp = self
            .procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)?;

        debug!(
            "Submit creating view '{view_name}' task response: {:?}",
            resp
        );

        let view_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created view '{view_name}' with view id {view_id}");

        view_info.ident.table_id = view_id;

        let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);

        let table = DistTable::table(view_info);

        // Invalidate cache entries that may hold a stale view definition.
        self.cache_invalidator
            .invalidate(
                &Context::default(),
                &[
                    CacheIdent::TableId(view_id),
                    CacheIdent::TableName(view_name.clone()),
                ],
            )
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(table)
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_flow(
        &self,
        catalog_name: String,
        flow_name: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        if let Some(flow) = self
            .flow_metadata_manager
            .flow_name_manager()
            .get(&catalog_name, &flow_name)
            .await
            .context(error::TableMetadataManagerSnafu)?
        {
            let flow_id = flow.flow_id();
            let task = DropFlowTask {
                catalog_name,
                flow_name,
                flow_id,
                drop_if_exists,
            };
            self.drop_flow_procedure(task, query_context).await?;

            Ok(Output::new_with_affected_rows(0))
        } else if drop_if_exists {
            Ok(Output::new_with_affected_rows(0))
        } else {
            FlowNotFoundSnafu {
                flow_name: format_full_flow_name(&catalog_name, &flow_name),
            }
            .fail()
        }
    }

    async fn drop_flow_procedure(
        &self,
        expr: DropFlowTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_flow(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub(super) async fn drop_trigger(
        &self,
        catalog_name: String,
        trigger_name: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let task = DropTriggerTask {
            catalog_name,
            trigger_name,
            drop_if_exists,
        };
        self.drop_trigger_procedure(task, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    async fn drop_trigger_procedure(
        &self,
        expr: DropTriggerTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_trigger(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[tracing::instrument(skip_all)]
    pub(crate) async fn drop_view(
        &self,
        catalog: String,
        schema: String,
        view: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let view_info = if let Some(view) = self
            .catalog_manager
            .table(&catalog, &schema, &view, None)
            .await
            .context(CatalogSnafu)?
        {
            view.table_info()
        } else if drop_if_exists {
            return Ok(Output::new_with_affected_rows(0));
        } else {
            return TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog, &schema, &view),
            }
            .fail();
        };

        // Ensure the target is a view, not a base table.
        ensure!(
            view_info.table_type == TableType::View,
            error::InvalidViewSnafu {
                msg: "not a view",
                view_name: format_full_table_name(&catalog, &schema, &view),
            }
        );

        let view_id = view_info.table_id();

        let task = DropViewTask {
            catalog,
            schema,
            view,
            view_id,
            drop_if_exists,
        };

        self.drop_view_procedure(task, query_context).await?;

        Ok(Output::new_with_affected_rows(0))
    }

    async fn drop_view_procedure(
        &self,
        expr: DropViewTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_view(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

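    /// Alters a batch of logical tables. The expressions are grouped by the
    /// physical table they belong to, and one alter procedure is submitted
    /// per physical table, concurrently.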
    #[tracing::instrument(skip_all)]
    pub async fn alter_logical_tables(
        &self,
        alter_table_exprs: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
        ensure!(
            !alter_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "alter logical tables"
            }
        );

        // Group the alter expressions by the physical table id of each
        // logical table.
        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
        for expr in alter_table_exprs {
            let catalog = if expr.catalog_name.is_empty() {
                query_context.current_catalog()
            } else {
                &expr.catalog_name
            };
            let schema = if expr.schema_name.is_empty() {
                query_context.current_schema()
            } else {
                expr.schema_name.to_string()
            };
            let table_name = &expr.table_name;
            let table = self
                .catalog_manager
                .table(catalog, &schema, table_name, Some(&query_context))
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(catalog, &schema, table_name),
                })?;
            let table_id = table.table_info().ident.table_id;
            let physical_table_id = self
                .table_metadata_manager
                .table_route_manager()
                .get_physical_table_id(table_id)
                .await
                .context(TableMetadataManagerSnafu)?;
            groups.entry(physical_table_id).or_default().push(expr);
        }

        // Submit one alter procedure per physical table and wait for all of
        // them to finish.
        let mut handles = Vec::with_capacity(groups.len());
        for (_physical_table_id, exprs) in groups {
            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
            handles.push(fut);
        }
        let _results = futures::future::try_join_all(handles).await?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_table(
        &self,
        table_name: TableName,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.drop_tables(&[table_name], drop_if_exists, query_context)
            .await
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_tables(
        &self,
        table_names: &[TableName],
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let mut tables = Vec::with_capacity(table_names.len());
        for table_name in table_names {
            ensure!(
                !is_readonly_schema(&table_name.schema_name),
                SchemaReadOnlySnafu {
                    name: table_name.schema_name.clone()
                }
            );

            if let Some(table) = self
                .catalog_manager
                .table(
                    &table_name.catalog_name,
                    &table_name.schema_name,
                    &table_name.table_name,
                    Some(&query_context),
                )
                .await
                .context(CatalogSnafu)?
            {
                tables.push(table.table_info().table_id());
            } else if drop_if_exists {
                // DROP TABLE IF EXISTS: skip missing tables.
                continue;
            } else {
                return TableNotFoundSnafu {
                    table_name: table_name.to_string(),
                }
                .fail();
            }
        }

        for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
            self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
                .await?;

            // Invalidate the cached table metadata.
            self.cache_invalidator
                .invalidate(
                    &Context::default(),
                    &[
                        CacheIdent::TableId(table_id),
                        CacheIdent::TableName(table_name.clone()),
                    ],
                )
                .await
                .context(error::InvalidateTableCacheSnafu)?;
        }
        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_database(
        &self,
        catalog: String,
        schema: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&schema),
            SchemaReadOnlySnafu { name: schema }
        );

        if self
            .catalog_manager
            .schema_exists(&catalog, &schema, None)
            .await
            .context(CatalogSnafu)?
        {
            if schema == query_context.current_schema() {
                SchemaInUseSnafu { name: schema }.fail()
            } else {
                self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
                    .await?;

                Ok(Output::new_with_affected_rows(0))
            }
        } else if drop_if_exists {
            Ok(Output::new_with_affected_rows(0))
        } else {
            SchemaNotFoundSnafu {
                schema_info: schema,
            }
            .fail()
        }
    }

    #[tracing::instrument(skip_all)]
    pub async fn truncate_table(
        &self,
        table_name: TableName,
        time_ranges: Vec<(Timestamp, Timestamp)>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&table_name.schema_name),
            SchemaReadOnlySnafu {
                name: table_name.schema_name.clone()
            }
        );

        let table = self
            .catalog_manager
            .table(
                &table_name.catalog_name,
                &table_name.schema_name,
                &table_name.table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_name.to_string(),
            })?;
        let table_id = table.table_info().table_id();
        self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
            .await?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_table(
        &self,
        alter_table: AlterTable,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
        self.alter_table_inner(expr, query_context).await
    }

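    /// Alters a table by an `AlterTableExpr`. No-op alters (e.g. adding only
    /// columns that already exist with `ADD COLUMN IF NOT EXISTS`) return
    /// early. An alter on a logical table goes through the logical-tables
    /// procedure, and the physical table's caches are invalidated as well.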
    #[tracing::instrument(skip_all)]
    pub async fn alter_table_inner(
        &self,
        expr: AlterTableExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&expr.schema_name),
            SchemaReadOnlySnafu {
                name: expr.schema_name.clone()
            }
        );

        let catalog_name = if expr.catalog_name.is_empty() {
            DEFAULT_CATALOG_NAME.to_string()
        } else {
            expr.catalog_name.clone()
        };

        let schema_name = if expr.schema_name.is_empty() {
            DEFAULT_SCHEMA_NAME.to_string()
        } else {
            expr.schema_name.clone()
        };

        let table_name = expr.table_name.clone();

        let table = self
            .catalog_manager
            .table(
                &catalog_name,
                &schema_name,
                &table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
            })?;

        let table_id = table.table_info().ident.table_id;
        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
        if !need_alter {
            return Ok(Output::new_with_affected_rows(0));
        }
        info!(
            "Table info before alter is {:?}, expr: {:?}",
            table.table_info(),
            expr
        );

        let physical_table_id = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_id(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        let (req, invalidate_keys) = if physical_table_id == table_id {
            // This is a physical table; alter it directly.
            let req = SubmitDdlTaskRequest {
                query_context,
                task: DdlTask::new_alter_table(expr),
            };

            let invalidate_keys = vec![
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            (req, invalidate_keys)
        } else {
            // This is a logical table; route the alter through the
            // logical-tables procedure.
            let req = SubmitDdlTaskRequest {
                query_context,
                task: DdlTask::new_alter_logical_tables(vec![expr]),
            };

            let mut invalidate_keys = vec![
                CacheIdent::TableId(physical_table_id),
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            // Also invalidate the physical table's name cache.
            let physical_table = self
                .table_metadata_manager
                .table_info_manager()
                .get(physical_table_id)
                .await
                .context(TableMetadataManagerSnafu)?
                .map(|x| x.into_inner());
            if let Some(physical_table) = physical_table {
                let physical_table_name = TableName::new(
                    physical_table.table_info.catalog_name,
                    physical_table.table_info.schema_name,
                    physical_table.table_info.name,
                );
                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
            }

            (req, invalidate_keys)
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        // Invalidate the cached table metadata after a successful alter.
        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub async fn alter_trigger(
        &self,
        _alter_expr: AlterTrigger,
        _query_context: QueryContextRef,
    ) -> Result<Output> {
        crate::error::NotSupportedSnafu {
            feat: "alter trigger",
        }
        .fail()
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_database(
        &self,
        alter_expr: AlterDatabase,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
        self.alter_database_inner(alter_expr, query_context).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_database_inner(
        &self,
        alter_expr: AlterDatabaseExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&alter_expr.schema_name),
            SchemaReadOnlySnafu {
                name: query_context.current_schema().clone()
            }
        );

        let exists = self
            .catalog_manager
            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
            .await
            .context(CatalogSnafu)?;
        ensure!(
            exists,
            SchemaNotFoundSnafu {
                schema_info: alter_expr.schema_name,
            }
        );

        let cache_ident = [CacheIdent::SchemaName(SchemaName {
            catalog_name: alter_expr.catalog_name.clone(),
            schema_name: alter_expr.schema_name.clone(),
        })];

        self.alter_database_procedure(alter_expr, query_context)
            .await?;

        // Invalidate the schema cache after the procedure completes.
        self.cache_invalidator
            .invalidate(&Context::default(), &cache_ident)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }

    async fn create_table_procedure(
        &self,
        create_table: CreateTableExpr,
        partitions: Vec<PartitionExpr>,
        table_info: RawTableInfo,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let partitions = partitions
            .into_iter()
            .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
            .collect::<Result<Vec<_>>>()?;

        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_table(create_table, partitions, table_info),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn create_logical_tables_procedure(
        &self,
        tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_logical_tables(tables_data),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn alter_logical_tables_procedure(
        &self,
        tables_data: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_alter_logical_tables(tables_data),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn drop_table_procedure(
        &self,
        table_name: &TableName,
        table_id: TableId,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_table(
                table_name.catalog_name.to_string(),
                table_name.schema_name.to_string(),
                table_name.table_name.to_string(),
                table_id,
                drop_if_exists,
            ),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn drop_database_procedure(
        &self,
        catalog: String,
        schema: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_database(catalog, schema, drop_if_exists),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn alter_database_procedure(
        &self,
        alter_expr: AlterDatabaseExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_alter_database(alter_expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn truncate_table_procedure(
        &self,
        table_name: &TableName,
        table_id: TableId,
        time_ranges: Vec<(Timestamp, Timestamp)>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_truncate_table(
                table_name.catalog_name.to_string(),
                table_name.schema_name.to_string(),
                table_name.table_name.to_string(),
                table_id,
                time_ranges,
            ),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_database(
        &self,
        database: &str,
        create_if_not_exists: bool,
        options: HashMap<String, String>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let catalog = query_context.current_catalog();
        ensure!(
            NAME_PATTERN_REG.is_match(catalog),
            error::UnexpectedSnafu {
                violated: format!("Invalid catalog name: {}", catalog)
            }
        );

        ensure!(
            NAME_PATTERN_REG.is_match(database),
            error::UnexpectedSnafu {
                violated: format!("Invalid database name: {}", database)
            }
        );

        if !self
            .catalog_manager
            .schema_exists(catalog, database, None)
            .await
            .context(CatalogSnafu)?
            && !self.catalog_manager.is_reserved_schema_name(database)
        {
            self.create_database_procedure(
                catalog.to_string(),
                database.to_string(),
                create_if_not_exists,
                options,
                query_context,
            )
            .await?;

            Ok(Output::new_with_affected_rows(1))
        } else if create_if_not_exists {
            Ok(Output::new_with_affected_rows(1))
        } else {
            error::SchemaExistsSnafu { name: database }.fail()
        }
    }

    async fn create_database_procedure(
        &self,
        catalog: String,
        database: String,
        create_if_not_exists: bool,
        options: HashMap<String, String>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }
}

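/// Parses a `PARTITION ON COLUMNS` clause into partition expressions and the
/// list of partition column names. The expressions are validated by building
/// a `MultiDimPartitionRule` before they are returned.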
pub fn parse_partitions(
    create_table: &CreateTableExpr,
    partitions: Option<Partitions>,
    query_ctx: &QueryContextRef,
) -> Result<(Vec<PartitionExpr>, Vec<String>)> {
    let partition_columns = find_partition_columns(&partitions)?;
    let partition_exprs =
        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;

    // Validate the partition rule by constructing it; the rule itself is
    // discarded here.
    let exprs = partition_exprs.clone();
    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
        .context(InvalidPartitionSnafu)?;

    Ok((partition_exprs, partition_columns))
}

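/// Verifies an alter request and checks whether it actually needs to be
/// applied. Returns `Ok(false)` for a no-op alter, e.g. `ADD COLUMN IF NOT
/// EXISTS` where every column already exists.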
pub fn verify_alter(
    table_id: TableId,
    table_info: Arc<TableInfo>,
    expr: AlterTableExpr,
) -> Result<bool> {
    let request: AlterTableRequest =
        common_grpc_expr::alter_expr_to_request(table_id, expr, Some(&table_info.meta))
            .context(AlterExprToRequestSnafu)?;

    let AlterTableRequest {
        table_name,
        alter_kind,
        ..
    } = &request;

    if let AlterKind::RenameTable { new_table_name } = alter_kind {
        ensure!(
            NAME_PATTERN_REG.is_match(new_table_name),
            error::UnexpectedSnafu {
                violated: format!("Invalid table name: {}", new_table_name)
            }
        );
    } else if let AlterKind::AddColumns { columns } = alter_kind {
        // When every column is marked `add_if_not_exists` and already
        // exists, the alter is a no-op.
        let column_names: HashSet<_> = table_info
            .meta
            .schema
            .column_schemas()
            .iter()
            .map(|schema| &schema.name)
            .collect();
        if columns.iter().all(|column| {
            column_names.contains(&column.column_schema.name) && column.add_if_not_exists
        }) {
            return Ok(false);
        }
    }

    // Dry-run build of the new table metadata to surface invalid alters.
    let _ = table_info
        .meta
        .builder_with_alter_kind(table_name, &request.alter_kind)
        .context(error::TableSnafu)?
        .build()
        .context(error::BuildTableMetaSnafu { table_name })?;

    Ok(true)
}

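/// Builds the `RawTableInfo` for a new table from its `CreateTableExpr` and
/// the partition column names. The table id is left as 0 and assigned later
/// by the create-table procedure.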
pub fn create_table_info(
    create_table: &CreateTableExpr,
    partition_columns: Vec<String>,
) -> Result<RawTableInfo> {
    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
    let mut column_name_to_index_map = HashMap::new();

    for (idx, column) in create_table.column_defs.iter().enumerate() {
        let schema =
            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
                column: &column.name,
            })?;
        let schema = schema.with_time_index(column.name == create_table.time_index);

        column_schemas.push(schema);
        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
    }

    let timestamp_index = column_name_to_index_map
        .get(&create_table.time_index)
        .cloned();

    let raw_schema = RawSchema {
        column_schemas: column_schemas.clone(),
        timestamp_index,
        version: 0,
    };

    let primary_key_indices = create_table
        .primary_keys
        .iter()
        .map(|name| {
            column_name_to_index_map
                .get(name)
                .cloned()
                .context(ColumnNotFoundSnafu { msg: name })
        })
        .collect::<Result<Vec<_>>>()?;

    let partition_key_indices = partition_columns
        .into_iter()
        .map(|col_name| {
            column_name_to_index_map
                .get(&col_name)
                .cloned()
                .context(ColumnNotFoundSnafu { msg: col_name })
        })
        .collect::<Result<Vec<_>>>()?;

    let table_options = TableOptions::try_from_iter(&create_table.table_options)
        .context(UnrecognizedTableOptionSnafu)?;

    let meta = RawTableMeta {
        schema: raw_schema,
        primary_key_indices,
        value_indices: vec![],
        engine: create_table.engine.clone(),
        next_column_id: column_schemas.len() as u32,
        region_numbers: vec![],
        options: table_options,
        created_on: Utc::now(),
        partition_key_indices,
        column_ids: vec![],
    };

    let desc = if create_table.desc.is_empty() {
        create_table.table_options.get(COMMENT_KEY).cloned()
    } else {
        Some(create_table.desc.clone())
    };

    let table_info = RawTableInfo {
        ident: metadata::TableIdent {
            // The table id is assigned by the procedure; 0 is a placeholder.
            table_id: 0,
            version: 0,
        },
        name: create_table.table_name.clone(),
        desc,
        catalog_name: create_table.catalog_name.clone(),
        schema_name: create_table.schema_name.clone(),
        meta,
        table_type: TableType::Base,
    };
    Ok(table_info)
}

fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
    let columns = if let Some(partitions) = partitions {
        partitions
            .column_list
            .iter()
            .map(|x| x.value.clone())
            .collect::<Vec<_>>()
    } else {
        vec![]
    };
    Ok(columns)
}

fn find_partition_entries(
    create_table: &CreateTableExpr,
    partitions: &Option<Partitions>,
    partition_columns: &[String],
    query_ctx: &QueryContextRef,
) -> Result<Vec<PartitionExpr>> {
    let Some(partitions) = partitions else {
        return Ok(vec![]);
    };

    // Collect the name and data type of each partition column.
    let column_name_and_type = partition_columns
        .iter()
        .map(|pc| {
            let column = create_table
                .column_defs
                .iter()
                .find(|c| &c.name == pc)
                // Safe to unwrap: a partition column must be defined in
                // `column_defs`.
                .unwrap();
            let column_name = &column.name;
            let data_type = ConcreteDataType::from(
                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension)
                    .context(ColumnDataTypeSnafu)?,
            );
            Ok((column_name, data_type))
        })
        .collect::<Result<HashMap<_, _>>>()?;

    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
    for partition in &partitions.exprs {
        let partition_expr =
            convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
        partition_exprs.push(partition_expr);
    }

    Ok(partition_exprs)
}

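/// Converts a sqlparser binary expression from the partition clause into a
/// `PartitionExpr`, recursing into nested binary expressions and folding
/// unary +/- into the literal operand.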
fn convert_one_expr(
    expr: &Expr,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
    timezone: &Timezone,
) -> Result<PartitionExpr> {
    let Expr::BinaryOp { left, op, right } = expr else {
        return InvalidPartitionRuleSnafu {
            reason: "partition rule must be a binary expression",
        }
        .fail();
    };

    let op =
        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
            reason: format!("unsupported operator in partition expr {op}"),
        })?;

    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
        // col, val
        (Expr::Identifier(ident), Expr::Value(value)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&value.value, data_type, timezone, None)?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        // val, col
        (Expr::Value(value), Expr::Identifier(ident)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&value.value, data_type, timezone, None)?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        // nested binary expressions, e.g. `a > 1 AND a < 10`
        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
            (Operand::Expr(lhs), op, Operand::Expr(rhs))
        }
        _ => {
            return InvalidPartitionRuleSnafu {
                reason: format!("invalid partition expr {expr}"),
            }
            .fail();
        }
    };

    Ok(PartitionExpr::new(lhs, op, rhs))
}

fn convert_identifier(
    ident: &Ident,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
) -> Result<(String, ConcreteDataType)> {
    let column_name = ident.value.clone();
    let data_type = column_name_and_type
        .get(&column_name)
        .cloned()
        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
    Ok((column_name, data_type))
}

fn convert_value(
    value: &ParserValue,
    data_type: ConcreteDataType,
    timezone: &Timezone,
    unary_op: Option<UnaryOperator>,
) -> Result<Value> {
    sql_value_to_value(
        "<NONAME>",
        &data_type,
        value,
        Some(timezone),
        unary_op,
        false,
    )
    .context(error::SqlCommonSnafu)
}

#[cfg(test)]
mod test {
    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;

    use super::*;
    use crate::expr_helper;

    #[test]
    fn test_name_is_match() {
        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
        assert!(!NAME_PATTERN_REG.is_match("🈲"));
        assert!(NAME_PATTERN_REG.is_match("hello"));
        assert!(NAME_PATTERN_REG.is_match("test@"));
        assert!(!NAME_PATTERN_REG.is_match("@test"));
        assert!(NAME_PATTERN_REG.is_match("test#"));
        assert!(!NAME_PATTERN_REG.is_match("#test"));
        assert!(!NAME_PATTERN_REG.is_match("@"));
        assert!(!NAME_PATTERN_REG.is_match("#"));
    }

    #[tokio::test]
    #[ignore = "TODO(ruihang): WIP new partition rule"]
    async fn test_parse_partitions() {
        common_telemetry::init_default_ut_logging();
        let cases = [
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION ON COLUMNS (b) (
    b < 'hz',
    b >= 'hz' AND b < 'sh',
    b >= 'sh'
)
ENGINE=mito",
                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
            ),
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
    PARTITION r0 VALUES LESS THAN ('hz', 10),
    b < 'hz' AND a < 10,
    b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
    b >= 'sh' AND a >= 20
)
ENGINE=mito",
                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
            ),
        ];
        let ctx = QueryContextBuilder::default().build().into();
        for (sql, expected) in cases {
            let result = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            match &result[0] {
                Statement::CreateTable(c) => {
                    let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
                    let (partitions, _) =
                        parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
                    let json = serde_json::to_string(&partitions).unwrap();
                    assert_eq!(json, expected);
                }
                _ => unreachable!(),
            }
        }
    }
}