1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use api::helper::ColumnDataTypeWrapper;
19use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
20use api::v1::{
21 column_def, AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
22};
23#[cfg(feature = "enterprise")]
24use api::v1::{
25 meta::CreateTriggerTask as PbCreateTriggerTask, CreateTriggerExpr as PbCreateTriggerExpr,
26};
27use catalog::CatalogManagerRef;
28use chrono::Utc;
29use common_catalog::consts::{is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
30use common_catalog::{format_full_flow_name, format_full_table_name};
31use common_error::ext::BoxedError;
32use common_meta::cache_invalidator::Context;
33use common_meta::ddl::create_flow::FlowType;
34use common_meta::ddl::ExecutorContext;
35use common_meta::instruction::CacheIdent;
36use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
37use common_meta::key::NAME_PATTERN;
38#[cfg(feature = "enterprise")]
39use common_meta::rpc::ddl::trigger::CreateTriggerTask;
40#[cfg(feature = "enterprise")]
41use common_meta::rpc::ddl::trigger::DropTriggerTask;
42use common_meta::rpc::ddl::{
43 CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
44 SubmitDdlTaskResponse,
45};
46use common_meta::rpc::router::{Partition, Partition as MetaPartition};
47use common_query::Output;
48use common_sql::convert::sql_value_to_value;
49use common_telemetry::{debug, info, tracing, warn};
50use common_time::Timezone;
51use datafusion_common::tree_node::TreeNodeVisitor;
52use datafusion_expr::LogicalPlan;
53use datatypes::prelude::ConcreteDataType;
54use datatypes::schema::{RawSchema, Schema};
55use datatypes::value::Value;
56use lazy_static::lazy_static;
57use partition::expr::{Operand, PartitionExpr, RestrictedOp};
58use partition::multi_dim::MultiDimPartitionRule;
59use partition::partition::{PartitionBound, PartitionDef};
60use query::parser::QueryStatement;
61use query::plan::extract_and_rewrite_full_table_names;
62use query::query_engine::DefaultSerializer;
63use query::sql::create_table_stmt;
64use regex::Regex;
65use session::context::QueryContextRef;
66use session::table_name::table_idents_to_full_name;
67use snafu::{ensure, OptionExt, ResultExt};
68use sql::parser::{ParseOptions, ParserContext};
69use sql::statements::alter::{AlterDatabase, AlterTable};
70#[cfg(feature = "enterprise")]
71use sql::statements::create::trigger::CreateTrigger;
72use sql::statements::create::{
73 CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
74};
75use sql::statements::statement::Statement;
76use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
77use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
78use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
79use table::dist_table::DistTable;
80use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
81use table::requests::{AlterKind, AlterTableRequest, TableOptions, COMMENT_KEY};
82use table::table_name::TableName;
83use table::TableRef;
84
85use crate::error::{
86 self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
87 ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
88 DeserializePartitionSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
89 FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
90 InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, Result, SchemaInUseSnafu,
91 SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
92 TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
93 ViewAlreadyExistsSnafu,
94};
95use crate::expr_helper;
96use crate::statement::show::create_partitions_stmt;
97use crate::statement::StatementExecutor;
98
99lazy_static! {
100 pub static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
101}
102
103impl StatementExecutor {
104 pub fn catalog_manager(&self) -> CatalogManagerRef {
105 self.catalog_manager.clone()
106 }
107
108 #[tracing::instrument(skip_all)]
109 pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
110 let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
111 self.create_table_inner(create_expr, stmt.partitions, ctx)
112 .await
113 }
114
115 #[tracing::instrument(skip_all)]
116 pub async fn create_table_like(
117 &self,
118 stmt: CreateTableLike,
119 ctx: QueryContextRef,
120 ) -> Result<TableRef> {
121 let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
122 .map_err(BoxedError::new)
123 .context(error::ExternalSnafu)?;
124 let table_ref = self
125 .catalog_manager
126 .table(&catalog, &schema, &table, Some(&ctx))
127 .await
128 .context(CatalogSnafu)?
129 .context(TableNotFoundSnafu { table_name: &table })?;
130 let partitions = self
131 .partition_manager
132 .find_table_partitions(table_ref.table_info().table_id())
133 .await
134 .context(error::FindTablePartitionRuleSnafu { table_name: table })?;
135
136 let schema_options = self
138 .table_metadata_manager
139 .schema_manager()
140 .get(SchemaNameKey {
141 catalog: &catalog,
142 schema: &schema,
143 })
144 .await
145 .context(TableMetadataManagerSnafu)?
146 .map(|v| v.into_inner());
147
148 let quote_style = ctx.quote_style();
149 let mut create_stmt =
150 create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
151 .context(error::ParseQuerySnafu)?;
152 create_stmt.name = stmt.table_name;
153 create_stmt.if_not_exists = false;
154
155 let partitions = create_partitions_stmt(partitions)?.and_then(|mut partitions| {
156 if !partitions.column_list.is_empty() {
157 partitions.set_quote(quote_style);
158 Some(partitions)
159 } else {
160 None
161 }
162 });
163
164 let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
165 self.create_table_inner(create_expr, partitions, ctx).await
166 }
167
168 #[tracing::instrument(skip_all)]
169 pub async fn create_external_table(
170 &self,
171 create_expr: CreateExternalTable,
172 ctx: QueryContextRef,
173 ) -> Result<TableRef> {
174 let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
175 self.create_table_inner(create_expr, None, ctx).await
176 }
177
178 #[tracing::instrument(skip_all)]
179 pub async fn create_table_inner(
180 &self,
181 create_table: &mut CreateTableExpr,
182 partitions: Option<Partitions>,
183 query_ctx: QueryContextRef,
184 ) -> Result<TableRef> {
185 ensure!(
186 !is_readonly_schema(&create_table.schema_name),
187 SchemaReadOnlySnafu {
188 name: create_table.schema_name.clone()
189 }
190 );
191
192 if create_table.engine == METRIC_ENGINE_NAME
193 && create_table
194 .table_options
195 .contains_key(LOGICAL_TABLE_METADATA_KEY)
196 {
197 ensure!(
199 partitions.is_none(),
200 InvalidPartitionRuleSnafu {
201 reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table",
202 }
203 );
204 self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
205 .await?
206 .into_iter()
207 .next()
208 .context(error::UnexpectedSnafu {
209 violated: "expected to create logical tables",
210 })
211 } else {
212 self.create_non_logic_table(create_table, partitions, query_ctx)
214 .await
215 }
216 }
217
218 #[tracing::instrument(skip_all)]
219 pub async fn create_non_logic_table(
220 &self,
221 create_table: &mut CreateTableExpr,
222 partitions: Option<Partitions>,
223 query_ctx: QueryContextRef,
224 ) -> Result<TableRef> {
225 let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
226
227 let schema = self
229 .table_metadata_manager
230 .schema_manager()
231 .get(SchemaNameKey::new(
232 &create_table.catalog_name,
233 &create_table.schema_name,
234 ))
235 .await
236 .context(TableMetadataManagerSnafu)?;
237 ensure!(
238 schema.is_some(),
239 SchemaNotFoundSnafu {
240 schema_info: &create_table.schema_name,
241 }
242 );
243
244 if let Some(table) = self
246 .catalog_manager
247 .table(
248 &create_table.catalog_name,
249 &create_table.schema_name,
250 &create_table.table_name,
251 Some(&query_ctx),
252 )
253 .await
254 .context(CatalogSnafu)?
255 {
256 return if create_table.create_if_not_exists {
257 Ok(table)
258 } else {
259 TableAlreadyExistsSnafu {
260 table: format_full_table_name(
261 &create_table.catalog_name,
262 &create_table.schema_name,
263 &create_table.table_name,
264 ),
265 }
266 .fail()
267 };
268 }
269
270 ensure!(
271 NAME_PATTERN_REG.is_match(&create_table.table_name),
272 InvalidTableNameSnafu {
273 table_name: &create_table.table_name,
274 }
275 );
276
277 let table_name = TableName::new(
278 &create_table.catalog_name,
279 &create_table.schema_name,
280 &create_table.table_name,
281 );
282
283 let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
284 let mut table_info = create_table_info(create_table, partition_cols)?;
285
286 let resp = self
287 .create_table_procedure(
288 create_table.clone(),
289 partitions,
290 table_info.clone(),
291 query_ctx,
292 )
293 .await?;
294
295 let table_id = resp
296 .table_ids
297 .into_iter()
298 .next()
299 .context(error::UnexpectedSnafu {
300 violated: "expected table_id",
301 })?;
302 info!("Successfully created table '{table_name}' with table id {table_id}");
303
304 table_info.ident.table_id = table_id;
305
306 let table_info: Arc<TableInfo> =
307 Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
308 create_table.table_id = Some(api::v1::TableId { id: table_id });
309
310 let table = DistTable::table(table_info);
311
312 Ok(table)
313 }
314
315 #[tracing::instrument(skip_all)]
316 pub async fn create_logical_tables(
317 &self,
318 create_table_exprs: &[CreateTableExpr],
319 query_context: QueryContextRef,
320 ) -> Result<Vec<TableRef>> {
321 let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
322 ensure!(
323 !create_table_exprs.is_empty(),
324 EmptyDdlExprSnafu {
325 name: "create logic tables"
326 }
327 );
328
329 for create_table in create_table_exprs {
331 ensure!(
332 NAME_PATTERN_REG.is_match(&create_table.table_name),
333 InvalidTableNameSnafu {
334 table_name: &create_table.table_name,
335 }
336 );
337 }
338
339 let mut raw_tables_info = create_table_exprs
340 .iter()
341 .map(|create| create_table_info(create, vec![]))
342 .collect::<Result<Vec<_>>>()?;
343 let tables_data = create_table_exprs
344 .iter()
345 .cloned()
346 .zip(raw_tables_info.iter().cloned())
347 .collect::<Vec<_>>();
348
349 let resp = self
350 .create_logical_tables_procedure(tables_data, query_context)
351 .await?;
352
353 let table_ids = resp.table_ids;
354 ensure!(table_ids.len() == raw_tables_info.len(), CreateLogicalTablesSnafu {
355 reason: format!("The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}", raw_tables_info.len(), table_ids.len())
356 });
357 info!("Successfully created logical tables: {:?}", table_ids);
358
359 for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
360 table_info.ident.table_id = table_ids[i];
361 }
362 let tables_info = raw_tables_info
363 .into_iter()
364 .map(|x| x.try_into().context(CreateTableInfoSnafu))
365 .collect::<Result<Vec<_>>>()?;
366
367 Ok(tables_info
368 .into_iter()
369 .map(|x| DistTable::table(Arc::new(x)))
370 .collect())
371 }
372
373 #[cfg(feature = "enterprise")]
374 #[tracing::instrument(skip_all)]
375 pub async fn create_trigger(
376 &self,
377 stmt: CreateTrigger,
378 query_context: QueryContextRef,
379 ) -> Result<Output> {
380 let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
381 self.create_trigger_inner(expr, query_context).await
382 }
383
384 #[cfg(feature = "enterprise")]
385 pub async fn create_trigger_inner(
386 &self,
387 expr: PbCreateTriggerExpr,
388 query_context: QueryContextRef,
389 ) -> Result<Output> {
390 self.create_trigger_procedure(expr, query_context).await?;
391 Ok(Output::new_with_affected_rows(0))
392 }
393
394 #[cfg(feature = "enterprise")]
395 async fn create_trigger_procedure(
396 &self,
397 expr: PbCreateTriggerExpr,
398 query_context: QueryContextRef,
399 ) -> Result<SubmitDdlTaskResponse> {
400 let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
401 create_trigger: Some(expr),
402 })
403 .context(error::InvalidExprSnafu)?;
404
405 let request = SubmitDdlTaskRequest {
406 query_context,
407 task: DdlTask::new_create_trigger(task),
408 };
409
410 self.procedure_executor
411 .submit_ddl_task(&ExecutorContext::default(), request)
412 .await
413 .context(error::ExecuteDdlSnafu)
414 }
415
416 #[tracing::instrument(skip_all)]
417 pub async fn create_flow(
418 &self,
419 stmt: CreateFlow,
420 query_context: QueryContextRef,
421 ) -> Result<Output> {
422 let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;
424
425 self.create_flow_inner(expr, query_context).await
426 }
427
428 pub async fn create_flow_inner(
429 &self,
430 expr: CreateFlowExpr,
431 query_context: QueryContextRef,
432 ) -> Result<Output> {
433 self.create_flow_procedure(expr, query_context).await?;
434 Ok(Output::new_with_affected_rows(0))
435 }
436
437 async fn create_flow_procedure(
438 &self,
439 expr: CreateFlowExpr,
440 query_context: QueryContextRef,
441 ) -> Result<SubmitDdlTaskResponse> {
442 let flow_type = self
443 .determine_flow_type(&expr, query_context.clone())
444 .await?;
445 info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
446
447 let expr = {
448 let mut expr = expr;
449 expr.flow_options
450 .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
451 expr
452 };
453
454 let task = CreateFlowTask::try_from(PbCreateFlowTask {
455 create_flow: Some(expr),
456 })
457 .context(error::InvalidExprSnafu)?;
458 let request = SubmitDdlTaskRequest {
459 query_context,
460 task: DdlTask::new_create_flow(task),
461 };
462
463 self.procedure_executor
464 .submit_ddl_task(&ExecutorContext::default(), request)
465 .await
466 .context(error::ExecuteDdlSnafu)
467 }
468
469 async fn determine_flow_type(
473 &self,
474 expr: &CreateFlowExpr,
475 query_ctx: QueryContextRef,
476 ) -> Result<FlowType> {
477 for src_table_name in &expr.source_table_names {
479 let table = self
480 .catalog_manager()
481 .table(
482 &src_table_name.catalog_name,
483 &src_table_name.schema_name,
484 &src_table_name.table_name,
485 Some(&query_ctx),
486 )
487 .await
488 .map_err(BoxedError::new)
489 .context(ExternalSnafu)?
490 .with_context(|| TableNotFoundSnafu {
491 table_name: format_full_table_name(
492 &src_table_name.catalog_name,
493 &src_table_name.schema_name,
494 &src_table_name.table_name,
495 ),
496 })?;
497
498 if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
500 warn!(
501 "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
502 format_full_table_name(
503 &src_table_name.catalog_name,
504 &src_table_name.schema_name,
505 &src_table_name.table_name
506 ),
507 expr.flow_name
508 );
509 return Ok(FlowType::Streaming);
510 }
511 }
512
513 let engine = &self.query_engine;
514 let stmts = ParserContext::create_with_dialect(
515 &expr.sql,
516 query_ctx.sql_dialect(),
517 ParseOptions::default(),
518 )
519 .map_err(BoxedError::new)
520 .context(ExternalSnafu)?;
521
522 ensure!(
523 stmts.len() == 1,
524 InvalidSqlSnafu {
525 err_msg: format!("Expect only one statement, found {}", stmts.len())
526 }
527 );
528 let stmt = &stmts[0];
529
530 let plan = match stmt {
532 Statement::Tql(_) => return Ok(FlowType::Batching),
534 _ => engine
535 .planner()
536 .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
537 .await
538 .map_err(BoxedError::new)
539 .context(ExternalSnafu)?,
540 };
541
542 struct FindAggr {
544 is_aggr: bool,
545 }
546
547 impl TreeNodeVisitor<'_> for FindAggr {
548 type Node = LogicalPlan;
549 fn f_down(
550 &mut self,
551 node: &Self::Node,
552 ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
553 {
554 match node {
555 LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
556 self.is_aggr = true;
557 return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
558 }
559 _ => (),
560 }
561 Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
562 }
563 }
564
565 let mut find_aggr = FindAggr { is_aggr: false };
566
567 plan.visit_with_subqueries(&mut find_aggr)
568 .context(BuildDfLogicalPlanSnafu)?;
569 if find_aggr.is_aggr {
570 Ok(FlowType::Batching)
571 } else {
572 Ok(FlowType::Streaming)
573 }
574 }
575
576 #[tracing::instrument(skip_all)]
577 pub async fn create_view(
578 &self,
579 create_view: CreateView,
580 ctx: QueryContextRef,
581 ) -> Result<TableRef> {
582 let logical_plan = match &*create_view.query {
584 Statement::Query(query) => {
585 self.plan(
586 &QueryStatement::Sql(Statement::Query(query.clone())),
587 ctx.clone(),
588 )
589 .await?
590 }
591 Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
592 _ => {
593 return InvalidViewStmtSnafu {}.fail();
594 }
595 };
596 let definition = create_view.to_string();
598
599 let schema: Schema = logical_plan
602 .schema()
603 .clone()
604 .try_into()
605 .context(ConvertSchemaSnafu)?;
606 let plan_columns: Vec<_> = schema
607 .column_schemas()
608 .iter()
609 .map(|c| c.name.clone())
610 .collect();
611
612 let columns: Vec<_> = create_view
613 .columns
614 .iter()
615 .map(|ident| ident.to_string())
616 .collect();
617
618 if !columns.is_empty() {
620 ensure!(
621 columns.len() == plan_columns.len(),
622 error::ViewColumnsMismatchSnafu {
623 view_name: create_view.name.to_string(),
624 expected: plan_columns.len(),
625 actual: columns.len(),
626 }
627 );
628 }
629
630 let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
633 .context(ExtractTableNamesSnafu)?;
634
635 let table_names = table_names.into_iter().map(|t| t.into()).collect();
636
637 let encoded_plan = DFLogicalSubstraitConvertor
644 .encode(&plan, DefaultSerializer)
645 .context(SubstraitCodecSnafu)?;
646
647 let expr = expr_helper::to_create_view_expr(
648 create_view,
649 encoded_plan.to_vec(),
650 table_names,
651 columns,
652 plan_columns,
653 definition,
654 ctx.clone(),
655 )?;
656
657 self.create_view_by_expr(expr, ctx).await
659 }
660
661 pub async fn create_view_by_expr(
662 &self,
663 expr: CreateViewExpr,
664 ctx: QueryContextRef,
665 ) -> Result<TableRef> {
666 ensure! {
667 !(expr.create_if_not_exists & expr.or_replace),
668 InvalidSqlSnafu {
669 err_msg: "syntax error Create Or Replace and If Not Exist cannot be used together",
670 }
671 };
672 let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();
673
674 let schema_exists = self
675 .table_metadata_manager
676 .schema_manager()
677 .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
678 .await
679 .context(TableMetadataManagerSnafu)?;
680
681 ensure!(
682 schema_exists,
683 SchemaNotFoundSnafu {
684 schema_info: &expr.schema_name,
685 }
686 );
687
688 if let Some(table) = self
690 .catalog_manager
691 .table(
692 &expr.catalog_name,
693 &expr.schema_name,
694 &expr.view_name,
695 Some(&ctx),
696 )
697 .await
698 .context(CatalogSnafu)?
699 {
700 let table_type = table.table_info().table_type;
701
702 match (table_type, expr.create_if_not_exists, expr.or_replace) {
703 (TableType::View, true, false) => {
704 return Ok(table);
705 }
706 (TableType::View, false, false) => {
707 return ViewAlreadyExistsSnafu {
708 name: format_full_table_name(
709 &expr.catalog_name,
710 &expr.schema_name,
711 &expr.view_name,
712 ),
713 }
714 .fail();
715 }
716 (TableType::View, _, true) => {
717 }
719 _ => {
720 return TableAlreadyExistsSnafu {
721 table: format_full_table_name(
722 &expr.catalog_name,
723 &expr.schema_name,
724 &expr.view_name,
725 ),
726 }
727 .fail();
728 }
729 }
730 }
731
732 ensure!(
733 NAME_PATTERN_REG.is_match(&expr.view_name),
734 InvalidViewNameSnafu {
735 name: expr.view_name.clone(),
736 }
737 );
738
739 let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);
740
741 let mut view_info = RawTableInfo {
742 ident: metadata::TableIdent {
743 table_id: 0,
745 version: 0,
746 },
747 name: expr.view_name.clone(),
748 desc: None,
749 catalog_name: expr.catalog_name.clone(),
750 schema_name: expr.schema_name.clone(),
751 meta: RawTableMeta::default(),
753 table_type: TableType::View,
754 };
755
756 let request = SubmitDdlTaskRequest {
757 query_context: ctx,
758 task: DdlTask::new_create_view(expr, view_info.clone()),
759 };
760
761 let resp = self
762 .procedure_executor
763 .submit_ddl_task(&ExecutorContext::default(), request)
764 .await
765 .context(error::ExecuteDdlSnafu)?;
766
767 debug!(
768 "Submit creating view '{view_name}' task response: {:?}",
769 resp
770 );
771
772 let view_id = resp
773 .table_ids
774 .into_iter()
775 .next()
776 .context(error::UnexpectedSnafu {
777 violated: "expected table_id",
778 })?;
779 info!("Successfully created view '{view_name}' with view id {view_id}");
780
781 view_info.ident.table_id = view_id;
782
783 let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);
784
785 let table = DistTable::table(view_info);
786
787 self.cache_invalidator
789 .invalidate(
790 &Context::default(),
791 &[
792 CacheIdent::TableId(view_id),
793 CacheIdent::TableName(view_name.clone()),
794 ],
795 )
796 .await
797 .context(error::InvalidateTableCacheSnafu)?;
798
799 Ok(table)
800 }
801
802 #[tracing::instrument(skip_all)]
803 pub async fn drop_flow(
804 &self,
805 catalog_name: String,
806 flow_name: String,
807 drop_if_exists: bool,
808 query_context: QueryContextRef,
809 ) -> Result<Output> {
810 if let Some(flow) = self
811 .flow_metadata_manager
812 .flow_name_manager()
813 .get(&catalog_name, &flow_name)
814 .await
815 .context(error::TableMetadataManagerSnafu)?
816 {
817 let flow_id = flow.flow_id();
818 let task = DropFlowTask {
819 catalog_name,
820 flow_name,
821 flow_id,
822 drop_if_exists,
823 };
824 self.drop_flow_procedure(task, query_context).await?;
825
826 Ok(Output::new_with_affected_rows(0))
827 } else if drop_if_exists {
828 Ok(Output::new_with_affected_rows(0))
829 } else {
830 FlowNotFoundSnafu {
831 flow_name: format_full_flow_name(&catalog_name, &flow_name),
832 }
833 .fail()
834 }
835 }
836
837 async fn drop_flow_procedure(
838 &self,
839 expr: DropFlowTask,
840 query_context: QueryContextRef,
841 ) -> Result<SubmitDdlTaskResponse> {
842 let request = SubmitDdlTaskRequest {
843 query_context,
844 task: DdlTask::new_drop_flow(expr),
845 };
846
847 self.procedure_executor
848 .submit_ddl_task(&ExecutorContext::default(), request)
849 .await
850 .context(error::ExecuteDdlSnafu)
851 }
852
853 #[cfg(feature = "enterprise")]
854 #[tracing::instrument(skip_all)]
855 pub(super) async fn drop_trigger(
856 &self,
857 catalog_name: String,
858 trigger_name: String,
859 drop_if_exists: bool,
860 query_context: QueryContextRef,
861 ) -> Result<Output> {
862 let task = DropTriggerTask {
863 catalog_name,
864 trigger_name,
865 drop_if_exists,
866 };
867 self.drop_trigger_procedure(task, query_context).await?;
868 Ok(Output::new_with_affected_rows(0))
869 }
870
871 #[cfg(feature = "enterprise")]
872 async fn drop_trigger_procedure(
873 &self,
874 expr: DropTriggerTask,
875 query_context: QueryContextRef,
876 ) -> Result<SubmitDdlTaskResponse> {
877 let request = SubmitDdlTaskRequest {
878 query_context,
879 task: DdlTask::new_drop_trigger(expr),
880 };
881
882 self.procedure_executor
883 .submit_ddl_task(&ExecutorContext::default(), request)
884 .await
885 .context(error::ExecuteDdlSnafu)
886 }
887
888 #[tracing::instrument(skip_all)]
890 pub(crate) async fn drop_view(
891 &self,
892 catalog: String,
893 schema: String,
894 view: String,
895 drop_if_exists: bool,
896 query_context: QueryContextRef,
897 ) -> Result<Output> {
898 let view_info = if let Some(view) = self
899 .catalog_manager
900 .table(&catalog, &schema, &view, None)
901 .await
902 .context(CatalogSnafu)?
903 {
904 view.table_info()
905 } else if drop_if_exists {
906 return Ok(Output::new_with_affected_rows(0));
908 } else {
909 return TableNotFoundSnafu {
910 table_name: format_full_table_name(&catalog, &schema, &view),
911 }
912 .fail();
913 };
914
915 ensure!(
917 view_info.table_type == TableType::View,
918 error::InvalidViewSnafu {
919 msg: "not a view",
920 view_name: format_full_table_name(&catalog, &schema, &view),
921 }
922 );
923
924 let view_id = view_info.table_id();
925
926 let task = DropViewTask {
927 catalog,
928 schema,
929 view,
930 view_id,
931 drop_if_exists,
932 };
933
934 self.drop_view_procedure(task, query_context).await?;
935
936 Ok(Output::new_with_affected_rows(0))
937 }
938
939 async fn drop_view_procedure(
941 &self,
942 expr: DropViewTask,
943 query_context: QueryContextRef,
944 ) -> Result<SubmitDdlTaskResponse> {
945 let request = SubmitDdlTaskRequest {
946 query_context,
947 task: DdlTask::new_drop_view(expr),
948 };
949
950 self.procedure_executor
951 .submit_ddl_task(&ExecutorContext::default(), request)
952 .await
953 .context(error::ExecuteDdlSnafu)
954 }
955
956 #[tracing::instrument(skip_all)]
957 pub async fn alter_logical_tables(
958 &self,
959 alter_table_exprs: Vec<AlterTableExpr>,
960 query_context: QueryContextRef,
961 ) -> Result<Output> {
962 let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
963 ensure!(
964 !alter_table_exprs.is_empty(),
965 EmptyDdlExprSnafu {
966 name: "alter logical tables"
967 }
968 );
969
970 let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
972 for expr in alter_table_exprs {
973 let catalog = if expr.catalog_name.is_empty() {
975 query_context.current_catalog()
976 } else {
977 &expr.catalog_name
978 };
979 let schema = if expr.schema_name.is_empty() {
980 query_context.current_schema()
981 } else {
982 expr.schema_name.to_string()
983 };
984 let table_name = &expr.table_name;
985 let table = self
986 .catalog_manager
987 .table(catalog, &schema, table_name, Some(&query_context))
988 .await
989 .context(CatalogSnafu)?
990 .with_context(|| TableNotFoundSnafu {
991 table_name: format_full_table_name(catalog, &schema, table_name),
992 })?;
993 let table_id = table.table_info().ident.table_id;
994 let physical_table_id = self
995 .table_metadata_manager
996 .table_route_manager()
997 .get_physical_table_id(table_id)
998 .await
999 .context(TableMetadataManagerSnafu)?;
1000 groups.entry(physical_table_id).or_default().push(expr);
1001 }
1002
1003 let mut handles = Vec::with_capacity(groups.len());
1005 for (_physical_table_id, exprs) in groups {
1006 let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
1007 handles.push(fut);
1008 }
1009 let _results = futures::future::try_join_all(handles).await?;
1010
1011 Ok(Output::new_with_affected_rows(0))
1012 }
1013
1014 #[tracing::instrument(skip_all)]
1015 pub async fn drop_table(
1016 &self,
1017 table_name: TableName,
1018 drop_if_exists: bool,
1019 query_context: QueryContextRef,
1020 ) -> Result<Output> {
1021 self.drop_tables(&[table_name], drop_if_exists, query_context)
1023 .await
1024 }
1025
1026 #[tracing::instrument(skip_all)]
1027 pub async fn drop_tables(
1028 &self,
1029 table_names: &[TableName],
1030 drop_if_exists: bool,
1031 query_context: QueryContextRef,
1032 ) -> Result<Output> {
1033 let mut tables = Vec::with_capacity(table_names.len());
1034 for table_name in table_names {
1035 ensure!(
1036 !is_readonly_schema(&table_name.schema_name),
1037 SchemaReadOnlySnafu {
1038 name: table_name.schema_name.clone()
1039 }
1040 );
1041
1042 if let Some(table) = self
1043 .catalog_manager
1044 .table(
1045 &table_name.catalog_name,
1046 &table_name.schema_name,
1047 &table_name.table_name,
1048 Some(&query_context),
1049 )
1050 .await
1051 .context(CatalogSnafu)?
1052 {
1053 tables.push(table.table_info().table_id());
1054 } else if drop_if_exists {
1055 continue;
1057 } else {
1058 return TableNotFoundSnafu {
1059 table_name: table_name.to_string(),
1060 }
1061 .fail();
1062 }
1063 }
1064
1065 for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
1066 self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
1067 .await?;
1068
1069 self.cache_invalidator
1071 .invalidate(
1072 &Context::default(),
1073 &[
1074 CacheIdent::TableId(table_id),
1075 CacheIdent::TableName(table_name.clone()),
1076 ],
1077 )
1078 .await
1079 .context(error::InvalidateTableCacheSnafu)?;
1080 }
1081 Ok(Output::new_with_affected_rows(0))
1082 }
1083
1084 #[tracing::instrument(skip_all)]
1085 pub async fn drop_database(
1086 &self,
1087 catalog: String,
1088 schema: String,
1089 drop_if_exists: bool,
1090 query_context: QueryContextRef,
1091 ) -> Result<Output> {
1092 ensure!(
1093 !is_readonly_schema(&schema),
1094 SchemaReadOnlySnafu { name: schema }
1095 );
1096
1097 if self
1098 .catalog_manager
1099 .schema_exists(&catalog, &schema, None)
1100 .await
1101 .context(CatalogSnafu)?
1102 {
1103 if schema == query_context.current_schema() {
1104 SchemaInUseSnafu { name: schema }.fail()
1105 } else {
1106 self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
1107 .await?;
1108
1109 Ok(Output::new_with_affected_rows(0))
1110 }
1111 } else if drop_if_exists {
1112 Ok(Output::new_with_affected_rows(0))
1114 } else {
1115 SchemaNotFoundSnafu {
1116 schema_info: schema,
1117 }
1118 .fail()
1119 }
1120 }
1121
1122 #[tracing::instrument(skip_all)]
1123 pub async fn truncate_table(
1124 &self,
1125 table_name: TableName,
1126 query_context: QueryContextRef,
1127 ) -> Result<Output> {
1128 ensure!(
1129 !is_readonly_schema(&table_name.schema_name),
1130 SchemaReadOnlySnafu {
1131 name: table_name.schema_name.clone()
1132 }
1133 );
1134
1135 let table = self
1136 .catalog_manager
1137 .table(
1138 &table_name.catalog_name,
1139 &table_name.schema_name,
1140 &table_name.table_name,
1141 Some(&query_context),
1142 )
1143 .await
1144 .context(CatalogSnafu)?
1145 .with_context(|| TableNotFoundSnafu {
1146 table_name: table_name.to_string(),
1147 })?;
1148 let table_id = table.table_info().table_id();
1149 self.truncate_table_procedure(&table_name, table_id, query_context)
1150 .await?;
1151
1152 Ok(Output::new_with_affected_rows(0))
1153 }
1154
1155 #[tracing::instrument(skip_all)]
1156 pub async fn alter_table(
1157 &self,
1158 alter_table: AlterTable,
1159 query_context: QueryContextRef,
1160 ) -> Result<Output> {
1161 let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
1162 self.alter_table_inner(expr, query_context).await
1163 }
1164
1165 #[tracing::instrument(skip_all)]
1166 pub async fn alter_table_inner(
1167 &self,
1168 expr: AlterTableExpr,
1169 query_context: QueryContextRef,
1170 ) -> Result<Output> {
1171 ensure!(
1172 !is_readonly_schema(&expr.schema_name),
1173 SchemaReadOnlySnafu {
1174 name: expr.schema_name.clone()
1175 }
1176 );
1177
1178 let catalog_name = if expr.catalog_name.is_empty() {
1179 DEFAULT_CATALOG_NAME.to_string()
1180 } else {
1181 expr.catalog_name.clone()
1182 };
1183
1184 let schema_name = if expr.schema_name.is_empty() {
1185 DEFAULT_SCHEMA_NAME.to_string()
1186 } else {
1187 expr.schema_name.clone()
1188 };
1189
1190 let table_name = expr.table_name.clone();
1191
1192 let table = self
1193 .catalog_manager
1194 .table(
1195 &catalog_name,
1196 &schema_name,
1197 &table_name,
1198 Some(&query_context),
1199 )
1200 .await
1201 .context(CatalogSnafu)?
1202 .with_context(|| TableNotFoundSnafu {
1203 table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
1204 })?;
1205
1206 let table_id = table.table_info().ident.table_id;
1207 let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
1208 if !need_alter {
1209 return Ok(Output::new_with_affected_rows(0));
1210 }
1211 info!(
1212 "Table info before alter is {:?}, expr: {:?}",
1213 table.table_info(),
1214 expr
1215 );
1216
1217 let physical_table_id = self
1218 .table_metadata_manager
1219 .table_route_manager()
1220 .get_physical_table_id(table_id)
1221 .await
1222 .context(TableMetadataManagerSnafu)?;
1223
1224 let (req, invalidate_keys) = if physical_table_id == table_id {
1225 let req = SubmitDdlTaskRequest {
1227 query_context,
1228 task: DdlTask::new_alter_table(expr),
1229 };
1230
1231 let invalidate_keys = vec![
1232 CacheIdent::TableId(table_id),
1233 CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1234 ];
1235
1236 (req, invalidate_keys)
1237 } else {
1238 let req = SubmitDdlTaskRequest {
1240 query_context,
1241 task: DdlTask::new_alter_logical_tables(vec![expr]),
1242 };
1243
1244 let mut invalidate_keys = vec![
1245 CacheIdent::TableId(physical_table_id),
1246 CacheIdent::TableId(table_id),
1247 CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1248 ];
1249
1250 let physical_table = self
1251 .table_metadata_manager
1252 .table_info_manager()
1253 .get(physical_table_id)
1254 .await
1255 .context(TableMetadataManagerSnafu)?
1256 .map(|x| x.into_inner());
1257 if let Some(physical_table) = physical_table {
1258 let physical_table_name = TableName::new(
1259 physical_table.table_info.catalog_name,
1260 physical_table.table_info.schema_name,
1261 physical_table.table_info.name,
1262 );
1263 invalidate_keys.push(CacheIdent::TableName(physical_table_name));
1264 }
1265
1266 (req, invalidate_keys)
1267 };
1268
1269 self.procedure_executor
1270 .submit_ddl_task(&ExecutorContext::default(), req)
1271 .await
1272 .context(error::ExecuteDdlSnafu)?;
1273
1274 self.cache_invalidator
1276 .invalidate(&Context::default(), &invalidate_keys)
1277 .await
1278 .context(error::InvalidateTableCacheSnafu)?;
1279
1280 Ok(Output::new_with_affected_rows(0))
1281 }
1282
1283 #[tracing::instrument(skip_all)]
1284 pub async fn alter_database(
1285 &self,
1286 alter_expr: AlterDatabase,
1287 query_context: QueryContextRef,
1288 ) -> Result<Output> {
1289 let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
1290 self.alter_database_inner(alter_expr, query_context).await
1291 }
1292
1293 #[tracing::instrument(skip_all)]
1294 pub async fn alter_database_inner(
1295 &self,
1296 alter_expr: AlterDatabaseExpr,
1297 query_context: QueryContextRef,
1298 ) -> Result<Output> {
1299 ensure!(
1300 !is_readonly_schema(&alter_expr.schema_name),
1301 SchemaReadOnlySnafu {
1302 name: query_context.current_schema().clone()
1303 }
1304 );
1305
1306 let exists = self
1307 .catalog_manager
1308 .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
1309 .await
1310 .context(CatalogSnafu)?;
1311 ensure!(
1312 exists,
1313 SchemaNotFoundSnafu {
1314 schema_info: alter_expr.schema_name,
1315 }
1316 );
1317
1318 let cache_ident = [CacheIdent::SchemaName(SchemaName {
1319 catalog_name: alter_expr.catalog_name.clone(),
1320 schema_name: alter_expr.schema_name.clone(),
1321 })];
1322
1323 self.alter_database_procedure(alter_expr, query_context)
1324 .await?;
1325
1326 self.cache_invalidator
1328 .invalidate(&Context::default(), &cache_ident)
1329 .await
1330 .context(error::InvalidateTableCacheSnafu)?;
1331
1332 Ok(Output::new_with_affected_rows(0))
1333 }
1334
1335 async fn create_table_procedure(
1336 &self,
1337 create_table: CreateTableExpr,
1338 partitions: Vec<Partition>,
1339 table_info: RawTableInfo,
1340 query_context: QueryContextRef,
1341 ) -> Result<SubmitDdlTaskResponse> {
1342 let partitions = partitions.into_iter().map(Into::into).collect();
1343
1344 let request = SubmitDdlTaskRequest {
1345 query_context,
1346 task: DdlTask::new_create_table(create_table, partitions, table_info),
1347 };
1348
1349 self.procedure_executor
1350 .submit_ddl_task(&ExecutorContext::default(), request)
1351 .await
1352 .context(error::ExecuteDdlSnafu)
1353 }
1354
1355 async fn create_logical_tables_procedure(
1356 &self,
1357 tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
1358 query_context: QueryContextRef,
1359 ) -> Result<SubmitDdlTaskResponse> {
1360 let request = SubmitDdlTaskRequest {
1361 query_context,
1362 task: DdlTask::new_create_logical_tables(tables_data),
1363 };
1364
1365 self.procedure_executor
1366 .submit_ddl_task(&ExecutorContext::default(), request)
1367 .await
1368 .context(error::ExecuteDdlSnafu)
1369 }
1370
1371 async fn alter_logical_tables_procedure(
1372 &self,
1373 tables_data: Vec<AlterTableExpr>,
1374 query_context: QueryContextRef,
1375 ) -> Result<SubmitDdlTaskResponse> {
1376 let request = SubmitDdlTaskRequest {
1377 query_context,
1378 task: DdlTask::new_alter_logical_tables(tables_data),
1379 };
1380
1381 self.procedure_executor
1382 .submit_ddl_task(&ExecutorContext::default(), request)
1383 .await
1384 .context(error::ExecuteDdlSnafu)
1385 }
1386
1387 async fn drop_table_procedure(
1388 &self,
1389 table_name: &TableName,
1390 table_id: TableId,
1391 drop_if_exists: bool,
1392 query_context: QueryContextRef,
1393 ) -> Result<SubmitDdlTaskResponse> {
1394 let request = SubmitDdlTaskRequest {
1395 query_context,
1396 task: DdlTask::new_drop_table(
1397 table_name.catalog_name.to_string(),
1398 table_name.schema_name.to_string(),
1399 table_name.table_name.to_string(),
1400 table_id,
1401 drop_if_exists,
1402 ),
1403 };
1404
1405 self.procedure_executor
1406 .submit_ddl_task(&ExecutorContext::default(), request)
1407 .await
1408 .context(error::ExecuteDdlSnafu)
1409 }
1410
1411 async fn drop_database_procedure(
1412 &self,
1413 catalog: String,
1414 schema: String,
1415 drop_if_exists: bool,
1416 query_context: QueryContextRef,
1417 ) -> Result<SubmitDdlTaskResponse> {
1418 let request = SubmitDdlTaskRequest {
1419 query_context,
1420 task: DdlTask::new_drop_database(catalog, schema, drop_if_exists),
1421 };
1422
1423 self.procedure_executor
1424 .submit_ddl_task(&ExecutorContext::default(), request)
1425 .await
1426 .context(error::ExecuteDdlSnafu)
1427 }
1428
1429 async fn alter_database_procedure(
1430 &self,
1431 alter_expr: AlterDatabaseExpr,
1432 query_context: QueryContextRef,
1433 ) -> Result<SubmitDdlTaskResponse> {
1434 let request = SubmitDdlTaskRequest {
1435 query_context,
1436 task: DdlTask::new_alter_database(alter_expr),
1437 };
1438
1439 self.procedure_executor
1440 .submit_ddl_task(&ExecutorContext::default(), request)
1441 .await
1442 .context(error::ExecuteDdlSnafu)
1443 }
1444
1445 async fn truncate_table_procedure(
1446 &self,
1447 table_name: &TableName,
1448 table_id: TableId,
1449 query_context: QueryContextRef,
1450 ) -> Result<SubmitDdlTaskResponse> {
1451 let request = SubmitDdlTaskRequest {
1452 query_context,
1453 task: DdlTask::new_truncate_table(
1454 table_name.catalog_name.to_string(),
1455 table_name.schema_name.to_string(),
1456 table_name.table_name.to_string(),
1457 table_id,
1458 ),
1459 };
1460
1461 self.procedure_executor
1462 .submit_ddl_task(&ExecutorContext::default(), request)
1463 .await
1464 .context(error::ExecuteDdlSnafu)
1465 }
1466
1467 #[tracing::instrument(skip_all)]
1468 pub async fn create_database(
1469 &self,
1470 database: &str,
1471 create_if_not_exists: bool,
1472 options: HashMap<String, String>,
1473 query_context: QueryContextRef,
1474 ) -> Result<Output> {
1475 let catalog = query_context.current_catalog();
1476 ensure!(
1477 NAME_PATTERN_REG.is_match(catalog),
1478 error::UnexpectedSnafu {
1479 violated: format!("Invalid catalog name: {}", catalog)
1480 }
1481 );
1482
1483 ensure!(
1484 NAME_PATTERN_REG.is_match(database),
1485 error::UnexpectedSnafu {
1486 violated: format!("Invalid database name: {}", database)
1487 }
1488 );
1489
1490 if !self
1491 .catalog_manager
1492 .schema_exists(catalog, database, None)
1493 .await
1494 .context(CatalogSnafu)?
1495 && !self.catalog_manager.is_reserved_schema_name(database)
1496 {
1497 self.create_database_procedure(
1498 catalog.to_string(),
1499 database.to_string(),
1500 create_if_not_exists,
1501 options,
1502 query_context,
1503 )
1504 .await?;
1505
1506 Ok(Output::new_with_affected_rows(1))
1507 } else if create_if_not_exists {
1508 Ok(Output::new_with_affected_rows(1))
1509 } else {
1510 error::SchemaExistsSnafu { name: database }.fail()
1511 }
1512 }
1513
1514 async fn create_database_procedure(
1515 &self,
1516 catalog: String,
1517 database: String,
1518 create_if_not_exists: bool,
1519 options: HashMap<String, String>,
1520 query_context: QueryContextRef,
1521 ) -> Result<SubmitDdlTaskResponse> {
1522 let request = SubmitDdlTaskRequest {
1523 query_context,
1524 task: DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
1525 };
1526
1527 self.procedure_executor
1528 .submit_ddl_task(&ExecutorContext::default(), request)
1529 .await
1530 .context(error::ExecuteDdlSnafu)
1531 }
1532}
1533
1534pub fn parse_partitions(
1536 create_table: &CreateTableExpr,
1537 partitions: Option<Partitions>,
1538 query_ctx: &QueryContextRef,
1539) -> Result<(Vec<MetaPartition>, Vec<String>)> {
1540 let partition_columns = find_partition_columns(&partitions)?;
1543 let partition_entries =
1544 find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
1545
1546 let mut exprs = vec![];
1548 for partition in &partition_entries {
1549 for bound in partition {
1550 if let PartitionBound::Expr(expr) = bound {
1551 exprs.push(expr.clone());
1552 }
1553 }
1554 }
1555 MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
1556 .context(InvalidPartitionSnafu)?;
1557
1558 Ok((
1559 partition_entries
1560 .into_iter()
1561 .map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
1562 .collect::<std::result::Result<_, _>>()
1563 .context(DeserializePartitionSnafu)?,
1564 partition_columns,
1565 ))
1566}
1567
1568pub fn verify_alter(
1574 table_id: TableId,
1575 table_info: Arc<TableInfo>,
1576 expr: AlterTableExpr,
1577) -> Result<bool> {
1578 let request: AlterTableRequest =
1579 common_grpc_expr::alter_expr_to_request(table_id, expr).context(AlterExprToRequestSnafu)?;
1580
1581 let AlterTableRequest {
1582 table_name,
1583 alter_kind,
1584 ..
1585 } = &request;
1586
1587 if let AlterKind::RenameTable { new_table_name } = alter_kind {
1588 ensure!(
1589 NAME_PATTERN_REG.is_match(new_table_name),
1590 error::UnexpectedSnafu {
1591 violated: format!("Invalid table name: {}", new_table_name)
1592 }
1593 );
1594 } else if let AlterKind::AddColumns { columns } = alter_kind {
1595 let column_names: HashSet<_> = table_info
1598 .meta
1599 .schema
1600 .column_schemas()
1601 .iter()
1602 .map(|schema| &schema.name)
1603 .collect();
1604 if columns.iter().all(|column| {
1605 column_names.contains(&column.column_schema.name) && column.add_if_not_exists
1606 }) {
1607 return Ok(false);
1608 }
1609 }
1610
1611 let _ = table_info
1612 .meta
1613 .builder_with_alter_kind(table_name, &request.alter_kind)
1614 .context(error::TableSnafu)?
1615 .build()
1616 .context(error::BuildTableMetaSnafu { table_name })?;
1617
1618 Ok(true)
1619}
1620
1621pub fn create_table_info(
1622 create_table: &CreateTableExpr,
1623 partition_columns: Vec<String>,
1624) -> Result<RawTableInfo> {
1625 let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
1626 let mut column_name_to_index_map = HashMap::new();
1627
1628 for (idx, column) in create_table.column_defs.iter().enumerate() {
1629 let schema =
1630 column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
1631 column: &column.name,
1632 })?;
1633 let schema = schema.with_time_index(column.name == create_table.time_index);
1634
1635 column_schemas.push(schema);
1636 let _ = column_name_to_index_map.insert(column.name.clone(), idx);
1637 }
1638
1639 let timestamp_index = column_name_to_index_map
1640 .get(&create_table.time_index)
1641 .cloned();
1642
1643 let raw_schema = RawSchema {
1644 column_schemas: column_schemas.clone(),
1645 timestamp_index,
1646 version: 0,
1647 };
1648
1649 let primary_key_indices = create_table
1650 .primary_keys
1651 .iter()
1652 .map(|name| {
1653 column_name_to_index_map
1654 .get(name)
1655 .cloned()
1656 .context(ColumnNotFoundSnafu { msg: name })
1657 })
1658 .collect::<Result<Vec<_>>>()?;
1659
1660 let partition_key_indices = partition_columns
1661 .into_iter()
1662 .map(|col_name| {
1663 column_name_to_index_map
1664 .get(&col_name)
1665 .cloned()
1666 .context(ColumnNotFoundSnafu { msg: col_name })
1667 })
1668 .collect::<Result<Vec<_>>>()?;
1669
1670 let table_options = TableOptions::try_from_iter(&create_table.table_options)
1671 .context(UnrecognizedTableOptionSnafu)?;
1672
1673 let meta = RawTableMeta {
1674 schema: raw_schema,
1675 primary_key_indices,
1676 value_indices: vec![],
1677 engine: create_table.engine.clone(),
1678 next_column_id: column_schemas.len() as u32,
1679 region_numbers: vec![],
1680 options: table_options,
1681 created_on: Utc::now(),
1682 partition_key_indices,
1683 column_ids: vec![],
1684 };
1685
1686 let desc = if create_table.desc.is_empty() {
1687 create_table.table_options.get(COMMENT_KEY).cloned()
1688 } else {
1689 Some(create_table.desc.clone())
1690 };
1691
1692 let table_info = RawTableInfo {
1693 ident: metadata::TableIdent {
1694 table_id: 0,
1696 version: 0,
1697 },
1698 name: create_table.table_name.clone(),
1699 desc,
1700 catalog_name: create_table.catalog_name.clone(),
1701 schema_name: create_table.schema_name.clone(),
1702 meta,
1703 table_type: TableType::Base,
1704 };
1705 Ok(table_info)
1706}
1707
1708fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
1709 let columns = if let Some(partitions) = partitions {
1710 partitions
1711 .column_list
1712 .iter()
1713 .map(|x| x.value.clone())
1714 .collect::<Vec<_>>()
1715 } else {
1716 vec![]
1717 };
1718 Ok(columns)
1719}
1720
1721fn find_partition_entries(
1725 create_table: &CreateTableExpr,
1726 partitions: &Option<Partitions>,
1727 partition_columns: &[String],
1728 query_ctx: &QueryContextRef,
1729) -> Result<Vec<Vec<PartitionBound>>> {
1730 let entries = if let Some(partitions) = partitions {
1731 let column_defs = partition_columns
1733 .iter()
1734 .map(|pc| {
1735 create_table
1736 .column_defs
1737 .iter()
1738 .find(|c| &c.name == pc)
1739 .unwrap()
1741 })
1742 .collect::<Vec<_>>();
1743 let mut column_name_and_type = HashMap::with_capacity(column_defs.len());
1744 for column in column_defs {
1745 let column_name = &column.name;
1746 let data_type = ConcreteDataType::from(
1747 ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension)
1748 .context(ColumnDataTypeSnafu)?,
1749 );
1750 column_name_and_type.insert(column_name, data_type);
1751 }
1752
1753 let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
1755 for partition in &partitions.exprs {
1756 let partition_expr =
1757 convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
1758 partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]);
1759 }
1760
1761 if partition_exprs.is_empty() {
1763 partition_exprs.push(vec![PartitionBound::MaxValue]);
1764 }
1765
1766 partition_exprs
1767 } else {
1768 vec![vec![PartitionBound::MaxValue]]
1769 };
1770 Ok(entries)
1771}
1772
1773fn convert_one_expr(
1774 expr: &Expr,
1775 column_name_and_type: &HashMap<&String, ConcreteDataType>,
1776 timezone: &Timezone,
1777) -> Result<PartitionExpr> {
1778 let Expr::BinaryOp { left, op, right } = expr else {
1779 return InvalidPartitionRuleSnafu {
1780 reason: "partition rule must be a binary expression",
1781 }
1782 .fail();
1783 };
1784
1785 let op =
1786 RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
1787 reason: format!("unsupported operator in partition expr {op}"),
1788 })?;
1789
1790 let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
1792 (Expr::Identifier(ident), Expr::Value(value)) => {
1794 let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1795 let value = convert_value(value, data_type, timezone, None)?;
1796 (Operand::Column(column_name), op, Operand::Value(value))
1797 }
1798 (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
1799 if let Expr::Value(v) = &**expr =>
1800 {
1801 let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1802 let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
1803 (Operand::Column(column_name), op, Operand::Value(value))
1804 }
1805 (Expr::Value(value), Expr::Identifier(ident)) => {
1807 let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1808 let value = convert_value(value, data_type, timezone, None)?;
1809 (Operand::Value(value), op, Operand::Column(column_name))
1810 }
1811 (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
1812 if let Expr::Value(v) = &**expr =>
1813 {
1814 let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1815 let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
1816 (Operand::Value(value), op, Operand::Column(column_name))
1817 }
1818 (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
1819 let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
1821 let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
1822 (Operand::Expr(lhs), op, Operand::Expr(rhs))
1823 }
1824 _ => {
1825 return InvalidPartitionRuleSnafu {
1826 reason: format!("invalid partition expr {expr}"),
1827 }
1828 .fail();
1829 }
1830 };
1831
1832 Ok(PartitionExpr::new(lhs, op, rhs))
1833}
1834
1835fn convert_identifier(
1836 ident: &Ident,
1837 column_name_and_type: &HashMap<&String, ConcreteDataType>,
1838) -> Result<(String, ConcreteDataType)> {
1839 let column_name = ident.value.clone();
1840 let data_type = column_name_and_type
1841 .get(&column_name)
1842 .cloned()
1843 .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
1844 Ok((column_name, data_type))
1845}
1846
1847fn convert_value(
1848 value: &ParserValue,
1849 data_type: ConcreteDataType,
1850 timezone: &Timezone,
1851 unary_op: Option<UnaryOperator>,
1852) -> Result<Value> {
1853 sql_value_to_value(
1854 "<NONAME>",
1855 &data_type,
1856 value,
1857 Some(timezone),
1858 unary_op,
1859 false,
1860 )
1861 .context(error::SqlCommonSnafu)
1862}
1863
1864#[cfg(test)]
1865mod test {
1866 use session::context::{QueryContext, QueryContextBuilder};
1867 use sql::dialect::GreptimeDbDialect;
1868 use sql::parser::{ParseOptions, ParserContext};
1869 use sql::statements::statement::Statement;
1870
1871 use super::*;
1872 use crate::expr_helper;
1873
1874 #[test]
1875 fn test_name_is_match() {
1876 assert!(!NAME_PATTERN_REG.is_match("/adaf"));
1877 assert!(!NAME_PATTERN_REG.is_match("🈲"));
1878 assert!(NAME_PATTERN_REG.is_match("hello"));
1879 assert!(NAME_PATTERN_REG.is_match("test@"));
1880 assert!(!NAME_PATTERN_REG.is_match("@test"));
1881 assert!(NAME_PATTERN_REG.is_match("test#"));
1882 assert!(!NAME_PATTERN_REG.is_match("#test"));
1883 assert!(!NAME_PATTERN_REG.is_match("@"));
1884 assert!(!NAME_PATTERN_REG.is_match("#"));
1885 }
1886
1887 #[tokio::test]
1888 #[ignore = "TODO(ruihang): WIP new partition rule"]
1889 async fn test_parse_partitions() {
1890 common_telemetry::init_default_ut_logging();
1891 let cases = [
1892 (
1893 r"
1894CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
1895PARTITION ON COLUMNS (b) (
1896 b < 'hz',
1897 b >= 'hz' AND b < 'sh',
1898 b >= 'sh'
1899)
1900ENGINE=mito",
1901 r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
1902 ),
1903 (
1904 r"
1905CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
1906PARTITION BY RANGE COLUMNS (b, a) (
1907 PARTITION r0 VALUES LESS THAN ('hz', 10),
1908 b < 'hz' AND a < 10,
1909 b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
1910 b >= 'sh' AND a >= 20
1911)
1912ENGINE=mito",
1913 r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
1914 ),
1915 ];
1916 let ctx = QueryContextBuilder::default().build().into();
1917 for (sql, expected) in cases {
1918 let result = ParserContext::create_with_dialect(
1919 sql,
1920 &GreptimeDbDialect {},
1921 ParseOptions::default(),
1922 )
1923 .unwrap();
1924 match &result[0] {
1925 Statement::CreateTable(c) => {
1926 let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
1927 let (partitions, _) =
1928 parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
1929 let json = serde_json::to_string(&partitions).unwrap();
1930 assert_eq!(json, expected);
1931 }
1932 _ => unreachable!(),
1933 }
1934 }
1935 }
1936}