use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use api::helper::ColumnDataTypeWrapper;
use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
use api::v1::{
    column_def, AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
};
#[cfg(feature = "enterprise")]
use api::v1::{
    meta::CreateTriggerTask as PbCreateTriggerTask, CreateTriggerExpr as PbCreateTriggerExpr,
};
use catalog::CatalogManagerRef;
use chrono::Utc;
use common_catalog::consts::{is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::create_flow::FlowType;
use common_meta::ddl::ExecutorContext;
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
use common_meta::key::NAME_PATTERN;
#[cfg(feature = "enterprise")]
use common_meta::rpc::ddl::trigger::CreateTriggerTask;
use common_meta::rpc::ddl::{
    CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
    SubmitDdlTaskResponse,
};
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_query::Output;
use common_telemetry::{debug, info, tracing, warn};
use common_time::Timezone;
use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{RawSchema, Schema};
use datatypes::value::Value;
use lazy_static::lazy_static;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
use partition::partition::{PartitionBound, PartitionDef};
use query::parser::QueryStatement;
use query::plan::extract_and_rewrite_full_table_names;
use query::query_engine::DefaultSerializer;
use query::sql::create_table_stmt;
use regex::Regex;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{ensure, OptionExt, ResultExt};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::alter::{AlterDatabase, AlterTable};
#[cfg(feature = "enterprise")]
use sql::statements::create::trigger::CreateTrigger;
use sql::statements::create::{
    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
};
use sql::statements::sql_value_to_value;
use sql::statements::statement::Statement;
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{AlterKind, AlterTableRequest, TableOptions, COMMENT_KEY};
use table::table_name::TableName;
use table::TableRef;

use crate::error::{
    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
    DeserializePartitionSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
    FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
    SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
    TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
    UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
};
use crate::expr_helper;
use crate::statement::show::create_partitions_stmt;
use crate::statement::StatementExecutor;

lazy_static! {
    static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
}

impl StatementExecutor {
    pub fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }

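    /// Creates a table from a `CREATE TABLE` statement, using the partition
    /// rules carried on the statement.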
    #[tracing::instrument(skip_all)]
    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
        self.create_table_inner(create_expr, stmt.partitions, ctx)
            .await
    }

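    /// Creates a table from a `CREATE TABLE LIKE` statement, copying the
    /// source table's schema, options, and partition rules.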
    #[tracing::instrument(skip_all)]
    pub async fn create_table_like(
        &self,
        stmt: CreateTableLike,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;
        let table_ref = self
            .catalog_manager
            .table(&catalog, &schema, &table, Some(&ctx))
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu { table_name: &table })?;
        let partitions = self
            .partition_manager
            .find_table_partitions(table_ref.table_info().table_id())
            .await
            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;

        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let quote_style = ctx.quote_style();
        let mut create_stmt =
            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
                .context(error::ParseQuerySnafu)?;
        create_stmt.name = stmt.table_name;
        create_stmt.if_not_exists = false;

        let partitions = create_partitions_stmt(partitions)?.and_then(|mut partitions| {
            if !partitions.column_list.is_empty() {
                partitions.set_quote(quote_style);
                Some(partitions)
            } else {
                None
            }
        });

        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
        self.create_table_inner(create_expr, partitions, ctx).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_external_table(
        &self,
        create_expr: CreateExternalTable,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
        self.create_table_inner(create_expr, None, ctx).await
    }

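    /// Creates a table from a `CreateTableExpr`. Logical tables on the metric
    /// engine are routed to `create_logical_tables` (their partition rule is
    /// inherited from the physical table); all other tables go through
    /// `create_non_logic_table`.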
    #[tracing::instrument(skip_all)]
    pub async fn create_table_inner(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !is_readonly_schema(&create_table.schema_name),
            SchemaReadOnlySnafu {
                name: create_table.schema_name.clone()
            }
        );

        if create_table.engine == METRIC_ENGINE_NAME
            && create_table
                .table_options
                .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            ensure!(
                partitions.is_none(),
                InvalidPartitionRuleSnafu {
                    reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table",
                }
            );
            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
                .await?
                .into_iter()
                .next()
                .context(error::UnexpectedSnafu {
                    violated: "expected to create logical tables",
                })
        } else {
            self.create_non_logic_table(create_table, partitions, query_ctx)
                .await
        }
    }

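    /// Creates a non-logical table by submitting a create-table DDL procedure,
    /// after validating the schema, table name, and partition rules.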
    #[tracing::instrument(skip_all)]
    pub async fn create_non_logic_table(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();

        let schema = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey::new(
                &create_table.catalog_name,
                &create_table.schema_name,
            ))
            .await
            .context(TableMetadataManagerSnafu)?;
        ensure!(
            schema.is_some(),
            SchemaNotFoundSnafu {
                schema_info: &create_table.schema_name,
            }
        );

        if let Some(table) = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                &create_table.table_name,
                Some(&query_ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            return if create_table.create_if_not_exists {
                Ok(table)
            } else {
                TableAlreadyExistsSnafu {
                    table: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                }
                .fail()
            };
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&create_table.table_name),
            InvalidTableNameSnafu {
                table_name: &create_table.table_name,
            }
        );

        let table_name = TableName::new(
            &create_table.catalog_name,
            &create_table.schema_name,
            &create_table.table_name,
        );

        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
        let mut table_info = create_table_info(create_table, partition_cols)?;

        let resp = self
            .create_table_procedure(
                create_table.clone(),
                partitions,
                table_info.clone(),
                query_ctx,
            )
            .await?;

        let table_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created table '{table_name}' with table id {table_id}");

        table_info.ident.table_id = table_id;

        let table_info: Arc<TableInfo> =
            Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
        create_table.table_id = Some(api::v1::TableId { id: table_id });

        let table = DistTable::table(table_info);

        Ok(table)
    }

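    /// Creates a batch of logical tables on the metric engine in a single DDL
    /// procedure. The returned tables are in the same order as the input
    /// expressions.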
    #[tracing::instrument(skip_all)]
    pub async fn create_logical_tables(
        &self,
        create_table_exprs: &[CreateTableExpr],
        query_context: QueryContextRef,
    ) -> Result<Vec<TableRef>> {
        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
        ensure!(
            !create_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "create logic tables"
            }
        );

        for create_table in create_table_exprs {
            ensure!(
                NAME_PATTERN_REG.is_match(&create_table.table_name),
                InvalidTableNameSnafu {
                    table_name: &create_table.table_name,
                }
            );
        }

        let mut raw_tables_info = create_table_exprs
            .iter()
            .map(|create| create_table_info(create, vec![]))
            .collect::<Result<Vec<_>>>()?;
        let tables_data = create_table_exprs
            .iter()
            .cloned()
            .zip(raw_tables_info.iter().cloned())
            .collect::<Vec<_>>();

        let resp = self
            .create_logical_tables_procedure(tables_data, query_context)
            .await?;

        let table_ids = resp.table_ids;
        ensure!(
            table_ids.len() == raw_tables_info.len(),
            CreateLogicalTablesSnafu {
                reason: format!(
                    "The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}",
                    raw_tables_info.len(),
                    table_ids.len()
                )
            }
        );
        info!("Successfully created logical tables: {:?}", table_ids);

        for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
            table_info.ident.table_id = table_ids[i];
        }
        let tables_info = raw_tables_info
            .into_iter()
            .map(|x| x.try_into().context(CreateTableInfoSnafu))
            .collect::<Result<Vec<_>>>()?;

        Ok(tables_info
            .into_iter()
            .map(|x| DistTable::table(Arc::new(x)))
            .collect())
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub async fn create_trigger(
        &self,
        stmt: CreateTrigger,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
        self.create_trigger_inner(expr, query_context).await
    }

    #[cfg(feature = "enterprise")]
    pub async fn create_trigger_inner(
        &self,
        expr: PbCreateTriggerExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_trigger_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    async fn create_trigger_procedure(
        &self,
        expr: PbCreateTriggerExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
            create_trigger: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;

        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_trigger(task),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_flow(
        &self,
        stmt: CreateFlow,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;

        self.create_flow_inner(expr, query_context).await
    }

    pub async fn create_flow_inner(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_flow_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    async fn create_flow_procedure(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let flow_type = self
            .determine_flow_type(&expr, query_context.clone())
            .await?;
        info!("Determined flow '{}' type: {:?}", expr.flow_name, flow_type);

        let expr = {
            let mut expr = expr;
            expr.flow_options
                .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
            expr
        };

        let task = CreateFlowTask::try_from(PbCreateFlowTask {
            create_flow: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_flow(task),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

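    /// Determines the flow type for a `CREATE FLOW` expression: streaming if
    /// any source table has `ttl = instant`, batching for TQL queries and for
    /// plans containing aggregation or `DISTINCT`, and streaming otherwise.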
    async fn determine_flow_type(
        &self,
        expr: &CreateFlowExpr,
        query_ctx: QueryContextRef,
    ) -> Result<FlowType> {
        for src_table_name in &expr.source_table_names {
            let table = self
                .catalog_manager()
                .table(
                    &src_table_name.catalog_name,
                    &src_table_name.schema_name,
                    &src_table_name.table_name,
                    Some(&query_ctx),
                )
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name,
                    ),
                })?;

            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
                warn!(
                    "Source table `{}` of flow `{}` has ttl=instant; falling back to streaming mode",
                    format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name
                    ),
                    expr.flow_name
                );
                return Ok(FlowType::Streaming);
            }
        }

        let engine = &self.query_engine;
        let stmts = ParserContext::create_with_dialect(
            &expr.sql,
            query_ctx.sql_dialect(),
            ParseOptions::default(),
        )
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

        ensure!(
            stmts.len() == 1,
            InvalidSqlSnafu {
                err_msg: format!("Expect only one statement, found {}", stmts.len())
            }
        );
        let stmt = &stmts[0];

        let plan = match stmt {
            // TQL queries always run in batching mode.
            Statement::Tql(_) => return Ok(FlowType::Batching),
            _ => engine
                .planner()
                .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?,
        };

        struct FindAggr {
            is_aggr: bool,
        }

        impl TreeNodeVisitor<'_> for FindAggr {
            type Node = LogicalPlan;
            fn f_down(
                &mut self,
                node: &Self::Node,
            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
            {
                match node {
                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
                        self.is_aggr = true;
                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
                    }
                    _ => (),
                }
                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
            }
        }

        let mut find_aggr = FindAggr { is_aggr: false };

        plan.visit_with_subqueries(&mut find_aggr)
            .context(BuildDfLogicalPlanSnafu)?;
        if find_aggr.is_aggr {
            Ok(FlowType::Batching)
        } else {
            Ok(FlowType::Streaming)
        }
    }

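    /// Creates a view from a `CREATE VIEW` statement: plans the query, checks
    /// the optional column list against the plan schema, and submits the plan
    /// encoded as Substrait along with the view definition.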
    #[tracing::instrument(skip_all)]
    pub async fn create_view(
        &self,
        create_view: CreateView,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let logical_plan = match &*create_view.query {
            Statement::Query(query) => {
                self.plan(
                    &QueryStatement::Sql(Statement::Query(query.clone())),
                    ctx.clone(),
                )
                .await?
            }
            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
            _ => {
                return InvalidViewStmtSnafu {}.fail();
            }
        };
        let definition = create_view.to_string();

        let schema: Schema = logical_plan
            .schema()
            .clone()
            .try_into()
            .context(ConvertSchemaSnafu)?;
        let plan_columns: Vec<_> = schema
            .column_schemas()
            .iter()
            .map(|c| c.name.clone())
            .collect();

        let columns: Vec<_> = create_view
            .columns
            .iter()
            .map(|ident| ident.to_string())
            .collect();

        if !columns.is_empty() {
            ensure!(
                columns.len() == plan_columns.len(),
                error::ViewColumnsMismatchSnafu {
                    view_name: create_view.name.to_string(),
                    expected: plan_columns.len(),
                    actual: columns.len(),
                }
            );
        }

        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
            .context(ExtractTableNamesSnafu)?;

        let table_names = table_names.into_iter().map(|t| t.into()).collect();

        let encoded_plan = DFLogicalSubstraitConvertor
            .encode(&plan, DefaultSerializer)
            .context(SubstraitCodecSnafu)?;

        let expr = expr_helper::to_create_view_expr(
            create_view,
            encoded_plan.to_vec(),
            table_names,
            columns,
            plan_columns,
            definition,
            ctx.clone(),
        )?;

        self.create_view_by_expr(expr, ctx).await
    }

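    /// Submits the DDL procedure for a `CreateViewExpr` and invalidates stale
    /// cache entries for the view name and id.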
    pub async fn create_view_by_expr(
        &self,
        expr: CreateViewExpr,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !(expr.create_if_not_exists && expr.or_replace),
            InvalidSqlSnafu {
                err_msg: "syntax error: `OR REPLACE` and `IF NOT EXISTS` cannot be used together",
            }
        );
        let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();

        let schema_exists = self
            .table_metadata_manager
            .schema_manager()
            .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
            .await
            .context(TableMetadataManagerSnafu)?;

        ensure!(
            schema_exists,
            SchemaNotFoundSnafu {
                schema_info: &expr.schema_name,
            }
        );

        if let Some(table) = self
            .catalog_manager
            .table(
                &expr.catalog_name,
                &expr.schema_name,
                &expr.view_name,
                Some(&ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            let table_type = table.table_info().table_type;

            match (table_type, expr.create_if_not_exists, expr.or_replace) {
                (TableType::View, true, false) => {
                    return Ok(table);
                }
                (TableType::View, false, false) => {
                    return ViewAlreadyExistsSnafu {
                        name: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
                (TableType::View, _, true) => {
                    // `OR REPLACE` on an existing view: fall through and
                    // resubmit the DDL task to replace its definition.
                }
                _ => {
                    return TableAlreadyExistsSnafu {
                        table: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
            }
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&expr.view_name),
            InvalidViewNameSnafu {
                name: expr.view_name.clone(),
            }
        );

        let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);

        let mut view_info = RawTableInfo {
            ident: metadata::TableIdent {
                table_id: 0,
                version: 0,
            },
            name: expr.view_name.clone(),
            desc: None,
            catalog_name: expr.catalog_name.clone(),
            schema_name: expr.schema_name.clone(),
            meta: RawTableMeta::default(),
            table_type: TableType::View,
        };

        let request = SubmitDdlTaskRequest {
            query_context: ctx,
            task: DdlTask::new_create_view(expr, view_info.clone()),
        };

        let resp = self
            .procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)?;

        debug!(
            "Submit creating view '{view_name}' task response: {:?}",
            resp
        );

        let view_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created view '{view_name}' with view id {view_id}");

        view_info.ident.table_id = view_id;

        let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);

        let table = DistTable::table(view_info);

        self.cache_invalidator
            .invalidate(
                &Context::default(),
                &[
                    CacheIdent::TableId(view_id),
                    CacheIdent::TableName(view_name.clone()),
                ],
            )
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(table)
    }

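    /// Drops a flow by name. Succeeds without error when the flow does not
    /// exist and `drop_if_exists` is set.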
    #[tracing::instrument(skip_all)]
    pub async fn drop_flow(
        &self,
        catalog_name: String,
        flow_name: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        if let Some(flow) = self
            .flow_metadata_manager
            .flow_name_manager()
            .get(&catalog_name, &flow_name)
            .await
            .context(error::TableMetadataManagerSnafu)?
        {
            let flow_id = flow.flow_id();
            let task = DropFlowTask {
                catalog_name,
                flow_name,
                flow_id,
                drop_if_exists,
            };
            self.drop_flow_procedure(task, query_context).await?;

            Ok(Output::new_with_affected_rows(0))
        } else if drop_if_exists {
            Ok(Output::new_with_affected_rows(0))
        } else {
            FlowNotFoundSnafu {
                flow_name: format_full_flow_name(&catalog_name, &flow_name),
            }
            .fail()
        }
    }

    async fn drop_flow_procedure(
        &self,
        expr: DropFlowTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_flow(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[tracing::instrument(skip_all)]
    pub(crate) async fn drop_view(
        &self,
        catalog: String,
        schema: String,
        view: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let view_info = if let Some(view) = self
            .catalog_manager
            .table(&catalog, &schema, &view, None)
            .await
            .context(CatalogSnafu)?
        {
            view.table_info()
        } else if drop_if_exists {
            return Ok(Output::new_with_affected_rows(0));
        } else {
            return TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog, &schema, &view),
            }
            .fail();
        };

        ensure!(
            view_info.table_type == TableType::View,
            error::InvalidViewSnafu {
                msg: "not a view",
                view_name: format_full_table_name(&catalog, &schema, &view),
            }
        );

        let view_id = view_info.table_id();

        let task = DropViewTask {
            catalog,
            schema,
            view,
            view_id,
            drop_if_exists,
        };

        self.drop_view_procedure(task, query_context).await?;

        Ok(Output::new_with_affected_rows(0))
    }

    async fn drop_view_procedure(
        &self,
        expr: DropViewTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_view(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

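    /// Alters a batch of logical tables, grouping the expressions by their
    /// physical table and submitting one DDL procedure per group.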
    #[tracing::instrument(skip_all)]
    pub async fn alter_logical_tables(
        &self,
        alter_table_exprs: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
        ensure!(
            !alter_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "alter logical tables"
            }
        );

        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
        for expr in alter_table_exprs {
            let catalog = if expr.catalog_name.is_empty() {
                query_context.current_catalog()
            } else {
                &expr.catalog_name
            };
            let schema = if expr.schema_name.is_empty() {
                query_context.current_schema()
            } else {
                expr.schema_name.to_string()
            };
            let table_name = &expr.table_name;
            let table = self
                .catalog_manager
                .table(catalog, &schema, table_name, Some(&query_context))
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(catalog, &schema, table_name),
                })?;
            let table_id = table.table_info().ident.table_id;
            let physical_table_id = self
                .table_metadata_manager
                .table_route_manager()
                .get_physical_table_id(table_id)
                .await
                .context(TableMetadataManagerSnafu)?;
            groups.entry(physical_table_id).or_default().push(expr);
        }

        let mut handles = Vec::with_capacity(groups.len());
        for (_physical_table_id, exprs) in groups {
            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
            handles.push(fut);
        }
        let _results = futures::future::try_join_all(handles).await?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_table(
        &self,
        table_name: TableName,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.drop_tables(&[table_name], drop_if_exists, query_context)
            .await
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_tables(
        &self,
        table_names: &[TableName],
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let mut tables = Vec::with_capacity(table_names.len());
        for table_name in table_names {
            ensure!(
                !is_readonly_schema(&table_name.schema_name),
                SchemaReadOnlySnafu {
                    name: table_name.schema_name.clone()
                }
            );

            if let Some(table) = self
                .catalog_manager
                .table(
                    &table_name.catalog_name,
                    &table_name.schema_name,
                    &table_name.table_name,
                    Some(&query_context),
                )
                .await
                .context(CatalogSnafu)?
            {
                tables.push(table.table_info().table_id());
            } else if drop_if_exists {
                continue;
            } else {
                return TableNotFoundSnafu {
                    table_name: table_name.to_string(),
                }
                .fail();
            }
        }

        for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
            self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
                .await?;

            self.cache_invalidator
                .invalidate(
                    &Context::default(),
                    &[
                        CacheIdent::TableId(table_id),
                        CacheIdent::TableName(table_name.clone()),
                    ],
                )
                .await
                .context(error::InvalidateTableCacheSnafu)?;
        }
        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_database(
        &self,
        catalog: String,
        schema: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&schema),
            SchemaReadOnlySnafu { name: schema }
        );

        if self
            .catalog_manager
            .schema_exists(&catalog, &schema, None)
            .await
            .context(CatalogSnafu)?
        {
            if schema == query_context.current_schema() {
                SchemaInUseSnafu { name: schema }.fail()
            } else {
                self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
                    .await?;

                Ok(Output::new_with_affected_rows(0))
            }
        } else if drop_if_exists {
            Ok(Output::new_with_affected_rows(0))
        } else {
            SchemaNotFoundSnafu {
                schema_info: schema,
            }
            .fail()
        }
    }

    #[tracing::instrument(skip_all)]
    pub async fn truncate_table(
        &self,
        table_name: TableName,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&table_name.schema_name),
            SchemaReadOnlySnafu {
                name: table_name.schema_name.clone()
            }
        );

        let table = self
            .catalog_manager
            .table(
                &table_name.catalog_name,
                &table_name.schema_name,
                &table_name.table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_name.to_string(),
            })?;
        let table_id = table.table_info().table_id();
        self.truncate_table_procedure(&table_name, table_id, query_context)
            .await?;

        Ok(Output::new_with_affected_rows(0))
    }

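    /// Verifies an alter expression and returns whether it actually needs to
    /// be applied: adding only columns that already exist with
    /// `add_if_not_exists` set is a no-op (`Ok(false)`); renames are checked
    /// against the name pattern, and the remaining kinds are validated by
    /// building the new table metadata.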
    fn verify_alter(
        &self,
        table_id: TableId,
        table_info: Arc<TableInfo>,
        expr: AlterTableExpr,
    ) -> Result<bool> {
        let request: AlterTableRequest = common_grpc_expr::alter_expr_to_request(table_id, expr)
            .context(AlterExprToRequestSnafu)?;

        let AlterTableRequest {
            table_name,
            alter_kind,
            ..
        } = &request;

        if let AlterKind::RenameTable { new_table_name } = alter_kind {
            ensure!(
                NAME_PATTERN_REG.is_match(new_table_name),
                error::UnexpectedSnafu {
                    violated: format!("Invalid table name: {}", new_table_name)
                }
            );
        } else if let AlterKind::AddColumns { columns } = alter_kind {
            let column_names: HashSet<_> = table_info
                .meta
                .schema
                .column_schemas()
                .iter()
                .map(|schema| &schema.name)
                .collect();
            if columns.iter().all(|column| {
                column_names.contains(&column.column_schema.name) && column.add_if_not_exists
            }) {
                return Ok(false);
            }
        }

        let _ = table_info
            .meta
            .builder_with_alter_kind(table_name, &request.alter_kind)
            .context(error::TableSnafu)?
            .build()
            .context(error::BuildTableMetaSnafu { table_name })?;

        Ok(true)
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_table(
        &self,
        alter_table: AlterTable,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
        self.alter_table_inner(expr, query_context).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_table_inner(
        &self,
        expr: AlterTableExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&expr.schema_name),
            SchemaReadOnlySnafu {
                name: expr.schema_name.clone()
            }
        );

        let catalog_name = if expr.catalog_name.is_empty() {
            DEFAULT_CATALOG_NAME.to_string()
        } else {
            expr.catalog_name.clone()
        };

        let schema_name = if expr.schema_name.is_empty() {
            DEFAULT_SCHEMA_NAME.to_string()
        } else {
            expr.schema_name.clone()
        };

        let table_name = expr.table_name.clone();

        let table = self
            .catalog_manager
            .table(
                &catalog_name,
                &schema_name,
                &table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
            })?;

        let table_id = table.table_info().ident.table_id;
        let need_alter = self.verify_alter(table_id, table.table_info(), expr.clone())?;
        if !need_alter {
            return Ok(Output::new_with_affected_rows(0));
        }
        info!(
            "Table info before alter is {:?}, expr: {:?}",
            table.table_info(),
            expr
        );

        let physical_table_id = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_id(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        let (req, invalidate_keys) = if physical_table_id == table_id {
            let req = SubmitDdlTaskRequest {
                query_context,
                task: DdlTask::new_alter_table(expr),
            };

            let invalidate_keys = vec![
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            (req, invalidate_keys)
        } else {
            let req = SubmitDdlTaskRequest {
                query_context,
                task: DdlTask::new_alter_logical_tables(vec![expr]),
            };

            let mut invalidate_keys = vec![
                CacheIdent::TableId(physical_table_id),
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            let physical_table = self
                .table_metadata_manager
                .table_info_manager()
                .get(physical_table_id)
                .await
                .context(TableMetadataManagerSnafu)?
                .map(|x| x.into_inner());
            if let Some(physical_table) = physical_table {
                let physical_table_name = TableName::new(
                    physical_table.table_info.catalog_name,
                    physical_table.table_info.schema_name,
                    physical_table.table_info.name,
                );
                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
            }

            (req, invalidate_keys)
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_database(
        &self,
        alter_expr: AlterDatabase,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
        self.alter_database_inner(alter_expr, query_context).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_database_inner(
        &self,
        alter_expr: AlterDatabaseExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&alter_expr.schema_name),
            SchemaReadOnlySnafu {
                name: alter_expr.schema_name.clone()
            }
        );

        let exists = self
            .catalog_manager
            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
            .await
            .context(CatalogSnafu)?;
        ensure!(
            exists,
            SchemaNotFoundSnafu {
                schema_info: alter_expr.schema_name,
            }
        );

        let cache_ident = [CacheIdent::SchemaName(SchemaName {
            catalog_name: alter_expr.catalog_name.clone(),
            schema_name: alter_expr.schema_name.clone(),
        })];

        self.alter_database_procedure(alter_expr, query_context)
            .await?;

        self.cache_invalidator
            .invalidate(&Context::default(), &cache_ident)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }

    async fn create_table_procedure(
        &self,
        create_table: CreateTableExpr,
        partitions: Vec<Partition>,
        table_info: RawTableInfo,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let partitions = partitions.into_iter().map(Into::into).collect();

        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_table(create_table, partitions, table_info),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn create_logical_tables_procedure(
        &self,
        tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_logical_tables(tables_data),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn alter_logical_tables_procedure(
        &self,
        tables_data: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_alter_logical_tables(tables_data),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn drop_table_procedure(
        &self,
        table_name: &TableName,
        table_id: TableId,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_table(
                table_name.catalog_name.to_string(),
                table_name.schema_name.to_string(),
                table_name.table_name.to_string(),
                table_id,
                drop_if_exists,
            ),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn drop_database_procedure(
        &self,
        catalog: String,
        schema: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_database(catalog, schema, drop_if_exists),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn alter_database_procedure(
        &self,
        alter_expr: AlterDatabaseExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_alter_database(alter_expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn truncate_table_procedure(
        &self,
        table_name: &TableName,
        table_id: TableId,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_truncate_table(
                table_name.catalog_name.to_string(),
                table_name.schema_name.to_string(),
                table_name.table_name.to_string(),
                table_id,
            ),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_database(
        &self,
        database: &str,
        create_if_not_exists: bool,
        options: HashMap<String, String>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let catalog = query_context.current_catalog();
        ensure!(
            NAME_PATTERN_REG.is_match(catalog),
            error::UnexpectedSnafu {
                violated: format!("Invalid catalog name: {}", catalog)
            }
        );

        ensure!(
            NAME_PATTERN_REG.is_match(database),
            error::UnexpectedSnafu {
                violated: format!("Invalid database name: {}", database)
            }
        );

        if !self
            .catalog_manager
            .schema_exists(catalog, database, None)
            .await
            .context(CatalogSnafu)?
            && !self.catalog_manager.is_reserved_schema_name(database)
        {
            self.create_database_procedure(
                catalog.to_string(),
                database.to_string(),
                create_if_not_exists,
                options,
                query_context,
            )
            .await?;

            Ok(Output::new_with_affected_rows(1))
        } else if create_if_not_exists {
            Ok(Output::new_with_affected_rows(1))
        } else {
            error::SchemaExistsSnafu { name: database }.fail()
        }
    }

    async fn create_database_procedure(
        &self,
        catalog: String,
        database: String,
        create_if_not_exists: bool,
        options: HashMap<String, String>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }
}

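/// Parses the partition clause of a `CREATE TABLE` statement into metadata
/// partitions and the list of partition column names. Defaults to a single
/// `MaxValue` partition when no partition rule is given.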
fn parse_partitions(
    create_table: &CreateTableExpr,
    partitions: Option<Partitions>,
    query_ctx: &QueryContextRef,
) -> Result<(Vec<MetaPartition>, Vec<String>)> {
    let partition_columns = find_partition_columns(&partitions)?;
    let partition_entries =
        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;

    let mut exprs = vec![];
    for partition in &partition_entries {
        for bound in partition {
            if let PartitionBound::Expr(expr) = bound {
                exprs.push(expr.clone());
            }
        }
    }
    // Validate the partition rule before submitting it to the procedure.
    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs)
        .context(InvalidPartitionSnafu)?;

    Ok((
        partition_entries
            .into_iter()
            .map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
            .collect::<std::result::Result<_, _>>()
            .context(DeserializePartitionSnafu)?,
        partition_columns,
    ))
}

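/// Builds the `RawTableInfo` for a `CreateTableExpr`, resolving the time
/// index, primary key, and partition key columns to their schema indices.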
fn create_table_info(
    create_table: &CreateTableExpr,
    partition_columns: Vec<String>,
) -> Result<RawTableInfo> {
    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
    let mut column_name_to_index_map = HashMap::new();

    for (idx, column) in create_table.column_defs.iter().enumerate() {
        let schema =
            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
                column: &column.name,
            })?;
        let schema = schema.with_time_index(column.name == create_table.time_index);

        column_schemas.push(schema);
        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
    }

    let timestamp_index = column_name_to_index_map
        .get(&create_table.time_index)
        .cloned();

    let raw_schema = RawSchema {
        column_schemas: column_schemas.clone(),
        timestamp_index,
        version: 0,
    };

    let primary_key_indices = create_table
        .primary_keys
        .iter()
        .map(|name| {
            column_name_to_index_map
                .get(name)
                .cloned()
                .context(ColumnNotFoundSnafu { msg: name })
        })
        .collect::<Result<Vec<_>>>()?;

    let partition_key_indices = partition_columns
        .into_iter()
        .map(|col_name| {
            column_name_to_index_map
                .get(&col_name)
                .cloned()
                .context(ColumnNotFoundSnafu { msg: col_name })
        })
        .collect::<Result<Vec<_>>>()?;

    let table_options = TableOptions::try_from_iter(&create_table.table_options)
        .context(UnrecognizedTableOptionSnafu)?;

    let meta = RawTableMeta {
        schema: raw_schema,
        primary_key_indices,
        value_indices: vec![],
        engine: create_table.engine.clone(),
        next_column_id: column_schemas.len() as u32,
        region_numbers: vec![],
        options: table_options,
        created_on: Utc::now(),
        partition_key_indices,
    };

    let desc = if create_table.desc.is_empty() {
        create_table.table_options.get(COMMENT_KEY).cloned()
    } else {
        Some(create_table.desc.clone())
    };

    let table_info = RawTableInfo {
        ident: metadata::TableIdent {
            table_id: 0,
            version: 0,
        },
        name: create_table.table_name.clone(),
        desc,
        catalog_name: create_table.catalog_name.clone(),
        schema_name: create_table.schema_name.clone(),
        meta,
        table_type: TableType::Base,
    };
    Ok(table_info)
}

fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
    let columns = if let Some(partitions) = partitions {
        partitions
            .column_list
            .iter()
            .map(|x| x.value.clone())
            .collect::<Vec<_>>()
    } else {
        vec![]
    };
    Ok(columns)
}

fn find_partition_entries(
    create_table: &CreateTableExpr,
    partitions: &Option<Partitions>,
    partition_columns: &[String],
    query_ctx: &QueryContextRef,
) -> Result<Vec<Vec<PartitionBound>>> {
    let entries = if let Some(partitions) = partitions {
        let column_defs = partition_columns
            .iter()
            .map(|pc| {
                create_table
                    .column_defs
                    .iter()
                    .find(|c| &c.name == pc)
                    // Safe to unwrap: partition columns are validated to be
                    // defined in the table's column definitions.
                    .unwrap()
            })
            .collect::<Vec<_>>();
        let mut column_name_and_type = HashMap::with_capacity(column_defs.len());
        for column in column_defs {
            let column_name = &column.name;
            let data_type = ConcreteDataType::from(
                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension)
                    .context(ColumnDataTypeSnafu)?,
            );
            column_name_and_type.insert(column_name, data_type);
        }

        let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
        for partition in &partitions.exprs {
            let partition_expr =
                convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
            partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]);
        }

        if partition_exprs.is_empty() {
            partition_exprs.push(vec![PartitionBound::MaxValue]);
        }

        partition_exprs
    } else {
        vec![vec![PartitionBound::MaxValue]]
    };
    Ok(entries)
}

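/// Converts one `sqlparser` binary expression into a `PartitionExpr`,
/// recursing into nested binary expressions and coercing literal operands to
/// the partition column's data type using the session timezone.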
fn convert_one_expr(
    expr: &Expr,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
    timezone: &Timezone,
) -> Result<PartitionExpr> {
    let Expr::BinaryOp { left, op, right } = expr else {
        return InvalidPartitionRuleSnafu {
            reason: "partition rule must be a binary expression",
        }
        .fail();
    };

    let op =
        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
            reason: format!("unsupported operator in partition expr {op}"),
        })?;

    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
        (Expr::Identifier(ident), Expr::Value(value)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(value, data_type, timezone, None)?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        (Expr::Value(value), Expr::Identifier(ident)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(value, data_type, timezone, None)?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
            (Operand::Expr(lhs), op, Operand::Expr(rhs))
        }
        _ => {
            return InvalidPartitionRuleSnafu {
                reason: format!("invalid partition expr {expr}"),
            }
            .fail();
        }
    };

    Ok(PartitionExpr::new(lhs, op, rhs))
}

fn convert_identifier(
    ident: &Ident,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
) -> Result<(String, ConcreteDataType)> {
    let column_name = ident.value.clone();
    let data_type = column_name_and_type
        .get(&column_name)
        .cloned()
        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
    Ok((column_name, data_type))
}

fn convert_value(
    value: &ParserValue,
    data_type: ConcreteDataType,
    timezone: &Timezone,
    unary_op: Option<UnaryOperator>,
) -> Result<Value> {
    sql_value_to_value(
        "<NONAME>",
        &data_type,
        value,
        Some(timezone),
        unary_op,
        false,
    )
    .context(ParseSqlValueSnafu)
}

#[cfg(test)]
mod test {
    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;

    use super::*;
    use crate::expr_helper;

    #[test]
    fn test_name_is_match() {
        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
        assert!(!NAME_PATTERN_REG.is_match("🈲"));
        assert!(NAME_PATTERN_REG.is_match("hello"));
        assert!(NAME_PATTERN_REG.is_match("test@"));
        assert!(!NAME_PATTERN_REG.is_match("@test"));
        assert!(NAME_PATTERN_REG.is_match("test#"));
        assert!(!NAME_PATTERN_REG.is_match("#test"));
        assert!(!NAME_PATTERN_REG.is_match("@"));
        assert!(!NAME_PATTERN_REG.is_match("#"));
    }

    #[tokio::test]
    #[ignore = "TODO(ruihang): WIP new partition rule"]
    async fn test_parse_partitions() {
        common_telemetry::init_default_ut_logging();
        let cases = [
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION ON COLUMNS (b) (
  b < 'hz',
  b >= 'hz' AND b < 'sh',
  b >= 'sh'
)
ENGINE=mito",
                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
            ),
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
  PARTITION r0 VALUES LESS THAN ('hz', 10),
  b < 'hz' AND a < 10,
  b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
  b >= 'sh' AND a >= 20
)
ENGINE=mito",
                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
            ),
        ];
        let ctx = QueryContextBuilder::default().build().into();
        for (sql, expected) in cases {
            let result = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            match &result[0] {
                Statement::CreateTable(c) => {
                    let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
                    let (partitions, _) =
                        parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
                    let json = serde_json::to_string(&partitions).unwrap();
                    assert_eq!(json, expected);
                }
                _ => unreachable!(),
            }
        }
    }
}