use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use api::helper::ColumnDataTypeWrapper;
use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
use api::v1::{
    column_def, AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
};
use catalog::CatalogManagerRef;
use chrono::Utc;
use common_catalog::consts::{is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::create_flow::FlowType;
use common_meta::ddl::ExecutorContext;
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
use common_meta::key::NAME_PATTERN;
use common_meta::rpc::ddl::{
    CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
    SubmitDdlTaskResponse,
};
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_query::Output;
use common_telemetry::{debug, info, tracing, warn};
use common_time::Timezone;
use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{RawSchema, Schema};
use datatypes::value::Value;
use lazy_static::lazy_static;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
use partition::partition::{PartitionBound, PartitionDef};
use query::parser::{QueryLanguageParser, QueryStatement};
use query::plan::extract_and_rewrite_full_table_names;
use query::query_engine::DefaultSerializer;
use query::sql::create_table_stmt;
use regex::Regex;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::alter::{AlterDatabase, AlterTable};
use sql::statements::create::{
    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
};
use sql::statements::sql_value_to_value;
use sql::statements::statement::Statement;
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{AlterKind, AlterTableRequest, TableOptions, COMMENT_KEY};
use table::table_name::TableName;
use table::TableRef;

use crate::error::{
    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
    DeserializePartitionSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
    FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
    SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
    TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
    UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
};
use crate::expr_helper;
use crate::statement::show::create_partitions_stmt;
use crate::statement::StatementExecutor;

lazy_static! {
    static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
}

impl StatementExecutor {
    pub fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }

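    /// Creates a table from a `CREATE TABLE` statement.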
    #[tracing::instrument(skip_all)]
    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
        self.create_table_inner(create_expr, stmt.partitions, ctx)
            .await
    }

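    /// Creates a table from a `CREATE TABLE ... LIKE ...` statement by copying
    /// the source table's schema, options, and partition rule.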
    #[tracing::instrument(skip_all)]
    pub async fn create_table_like(
        &self,
        stmt: CreateTableLike,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;
        let table_ref = self
            .catalog_manager
            .table(&catalog, &schema, &table, Some(&ctx))
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu { table_name: &table })?;
        let partitions = self
            .partition_manager
            .find_table_partitions(table_ref.table_info().table_id())
            .await
            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;

        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let quote_style = ctx.quote_style();
        let mut create_stmt =
            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
                .context(error::ParseQuerySnafu)?;
        create_stmt.name = stmt.table_name;
        create_stmt.if_not_exists = false;

        let partitions = create_partitions_stmt(partitions)?.and_then(|mut partitions| {
            if !partitions.column_list.is_empty() {
                partitions.set_quote(quote_style);
                Some(partitions)
            } else {
                None
            }
        });

        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
        self.create_table_inner(create_expr, partitions, ctx).await
    }

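    /// Creates an external table from a `CREATE EXTERNAL TABLE` statement.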
    #[tracing::instrument(skip_all)]
    pub async fn create_external_table(
        &self,
        create_expr: CreateExternalTable,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
        self.create_table_inner(create_expr, None, ctx).await
    }

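    /// Creates a table from a `CreateTableExpr`. Metric-engine logical tables
    /// are routed to the logical-table path; otherwise the schema and table
    /// name are validated before the DDL procedure is submitted.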
    #[tracing::instrument(skip_all)]
    pub async fn create_table_inner(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !is_readonly_schema(&create_table.schema_name),
            SchemaReadOnlySnafu {
                name: create_table.schema_name.clone()
            }
        );

        if create_table.engine == METRIC_ENGINE_NAME
            && create_table
                .table_options
                .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            return self
                .create_logical_tables(&[create_table.clone()], query_ctx)
                .await?
                .into_iter()
                .next()
                .context(error::UnexpectedSnafu {
                    violated: "expected to create a logical table",
                });
        }

        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();

        let schema = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey::new(
                &create_table.catalog_name,
                &create_table.schema_name,
            ))
            .await
            .context(TableMetadataManagerSnafu)?;

        ensure!(
            schema.is_some(),
            SchemaNotFoundSnafu {
                schema_info: &create_table.schema_name,
            }
        );

        if let Some(table) = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                &create_table.table_name,
                Some(&query_ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            return if create_table.create_if_not_exists {
                Ok(table)
            } else {
                TableAlreadyExistsSnafu {
                    table: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                }
                .fail()
            };
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&create_table.table_name),
            InvalidTableNameSnafu {
                table_name: &create_table.table_name,
            }
        );

        let table_name = TableName::new(
            &create_table.catalog_name,
            &create_table.schema_name,
            &create_table.table_name,
        );

        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
        let mut table_info = create_table_info(create_table, partition_cols)?;

        let resp = self
            .create_table_procedure(
                create_table.clone(),
                partitions,
                table_info.clone(),
                query_ctx,
            )
            .await?;

        let table_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created table '{table_name}' with table id {table_id}");

        table_info.ident.table_id = table_id;

        let table_info: Arc<TableInfo> =
            Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
        create_table.table_id = Some(api::v1::TableId { id: table_id });

        let table = DistTable::table(table_info);

        Ok(table)
    }

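    /// Creates multiple logical tables (metric engine) in a single DDL procedure.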
    #[tracing::instrument(skip_all)]
    pub async fn create_logical_tables(
        &self,
        create_table_exprs: &[CreateTableExpr],
        query_context: QueryContextRef,
    ) -> Result<Vec<TableRef>> {
        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
        ensure!(
            !create_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "create logical tables"
            }
        );

        for create_table in create_table_exprs {
            ensure!(
                NAME_PATTERN_REG.is_match(&create_table.table_name),
                InvalidTableNameSnafu {
                    table_name: &create_table.table_name,
                }
            );
        }

        let mut raw_tables_info = create_table_exprs
            .iter()
            .map(|create| create_table_info(create, vec![]))
            .collect::<Result<Vec<_>>>()?;
        let tables_data = create_table_exprs
            .iter()
            .cloned()
            .zip(raw_tables_info.iter().cloned())
            .collect::<Vec<_>>();

        let resp = self
            .create_logical_tables_procedure(tables_data, query_context)
            .await?;

        let table_ids = resp.table_ids;
        ensure!(
            table_ids.len() == raw_tables_info.len(),
            CreateLogicalTablesSnafu {
                reason: format!(
                    "The number of created tables is inconsistent with the expected number, expected: {}, actual: {}",
                    raw_tables_info.len(),
                    table_ids.len()
                )
            }
        );
        info!("Successfully created logical tables: {:?}", table_ids);

        for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
            table_info.ident.table_id = table_ids[i];
        }
        let tables_info = raw_tables_info
            .into_iter()
            .map(|x| x.try_into().context(CreateTableInfoSnafu))
            .collect::<Result<Vec<_>>>()?;

        Ok(tables_info
            .into_iter()
            .map(|x| DistTable::table(Arc::new(x)))
            .collect())
    }

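    /// Creates a flow from a `CREATE FLOW` statement.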
    #[tracing::instrument(skip_all)]
    pub async fn create_flow(
        &self,
        stmt: CreateFlow,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;

        self.create_flow_inner(expr, query_context).await
    }

    pub async fn create_flow_inner(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_flow_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    async fn create_flow_procedure(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let flow_type = self
            .determine_flow_type(&expr, query_context.clone())
            .await?;
        info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);

        let expr = {
            let mut expr = expr;
            expr.flow_options
                .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
            expr
        };

        let task = CreateFlowTask::try_from(PbCreateFlowTask {
            create_flow: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_flow(task),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

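    /// Determines the flow type. If any source table has `ttl = instant`, the
    /// flow falls back to streaming mode; otherwise the flow is batching when
    /// its query plan contains an aggregation or DISTINCT, and streaming when
    /// it does not.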
397
398 async fn determine_flow_type(
402 &self,
403 expr: &CreateFlowExpr,
404 query_ctx: QueryContextRef,
405 ) -> Result<FlowType> {
406 for src_table_name in &expr.source_table_names {
408 let table = self
409 .catalog_manager()
410 .table(
411 &src_table_name.catalog_name,
412 &src_table_name.schema_name,
413 &src_table_name.table_name,
414 Some(&query_ctx),
415 )
416 .await
417 .map_err(BoxedError::new)
418 .context(ExternalSnafu)?
419 .with_context(|| TableNotFoundSnafu {
420 table_name: format_full_table_name(
421 &src_table_name.catalog_name,
422 &src_table_name.schema_name,
423 &src_table_name.table_name,
424 ),
425 })?;
426
427 if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
429 warn!(
430 "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
431 format_full_table_name(
432 &src_table_name.catalog_name,
433 &src_table_name.schema_name,
434 &src_table_name.table_name
435 ),
436 expr.flow_name
437 );
438 return Ok(FlowType::Streaming);
439 }
440 }
441
442 let engine = &self.query_engine;
443 let stmt = QueryLanguageParser::parse_sql(&expr.sql, &query_ctx)
444 .map_err(BoxedError::new)
445 .context(ExternalSnafu)?;
446 let plan = engine
447 .planner()
448 .plan(&stmt, query_ctx)
449 .await
450 .map_err(BoxedError::new)
451 .context(ExternalSnafu)?;
452
453 struct FindAggr {
455 is_aggr: bool,
456 }
457
458 impl TreeNodeVisitor<'_> for FindAggr {
459 type Node = LogicalPlan;
460 fn f_down(
461 &mut self,
462 node: &Self::Node,
463 ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
464 {
465 match node {
466 LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
467 self.is_aggr = true;
468 return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
469 }
470 _ => (),
471 }
472 Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
473 }
474 }
475
476 let mut find_aggr = FindAggr { is_aggr: false };
477
478 plan.visit_with_subqueries(&mut find_aggr)
479 .context(BuildDfLogicalPlanSnafu)?;
480 if find_aggr.is_aggr {
481 Ok(FlowType::Batching)
482 } else {
483 Ok(FlowType::Streaming)
484 }
485 }
486
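    /// Creates a view from a `CREATE VIEW` statement: plans the query, checks
    /// the declared column list against the plan schema, and stores the
    /// Substrait-encoded logical plan in the view metadata.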
    #[tracing::instrument(skip_all)]
    pub async fn create_view(
        &self,
        create_view: CreateView,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let logical_plan = match &*create_view.query {
            Statement::Query(query) => {
                self.plan(
                    &QueryStatement::Sql(Statement::Query(query.clone())),
                    ctx.clone(),
                )
                .await?
            }
            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
            _ => {
                return InvalidViewStmtSnafu {}.fail();
            }
        };
        let definition = create_view.to_string();

        let schema: Schema = logical_plan
            .schema()
            .clone()
            .try_into()
            .context(ConvertSchemaSnafu)?;
        let plan_columns: Vec<_> = schema
            .column_schemas()
            .iter()
            .map(|c| c.name.clone())
            .collect();

        let columns: Vec<_> = create_view
            .columns
            .iter()
            .map(|ident| ident.to_string())
            .collect();

        if !columns.is_empty() {
            ensure!(
                columns.len() == plan_columns.len(),
                error::ViewColumnsMismatchSnafu {
                    view_name: create_view.name.to_string(),
                    expected: plan_columns.len(),
                    actual: columns.len(),
                }
            );
        }

        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
            .context(ExtractTableNamesSnafu)?;

        let table_names = table_names.into_iter().map(|t| t.into()).collect();

        let encoded_plan = DFLogicalSubstraitConvertor
            .encode(&plan, DefaultSerializer)
            .context(SubstraitCodecSnafu)?;

        let expr = expr_helper::to_create_view_expr(
            create_view,
            encoded_plan.to_vec(),
            table_names,
            columns,
            plan_columns,
            definition,
            ctx.clone(),
        )?;

        self.create_view_by_expr(expr, ctx).await
    }

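    /// Creates a view from a `CreateViewExpr`, handling the interaction between
    /// `IF NOT EXISTS` and `OR REPLACE` when a table or view with the same name
    /// already exists.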
    pub async fn create_view_by_expr(
        &self,
        expr: CreateViewExpr,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !(expr.create_if_not_exists && expr.or_replace),
            InvalidSqlSnafu {
                err_msg: "syntax error: `OR REPLACE` and `IF NOT EXISTS` cannot be used together",
            }
        );
        let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();

        let schema_exists = self
            .table_metadata_manager
            .schema_manager()
            .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
            .await
            .context(TableMetadataManagerSnafu)?;

        ensure!(
            schema_exists,
            SchemaNotFoundSnafu {
                schema_info: &expr.schema_name,
            }
        );

        if let Some(table) = self
            .catalog_manager
            .table(
                &expr.catalog_name,
                &expr.schema_name,
                &expr.view_name,
                Some(&ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            let table_type = table.table_info().table_type;

            match (table_type, expr.create_if_not_exists, expr.or_replace) {
                (TableType::View, true, false) => {
                    return Ok(table);
                }
                (TableType::View, false, false) => {
                    return ViewAlreadyExistsSnafu {
                        name: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
                (TableType::View, _, true) => {
                    // `OR REPLACE` on an existing view: fall through and replace it.
                }
                _ => {
                    return TableAlreadyExistsSnafu {
                        table: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
            }
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&expr.view_name),
            InvalidViewNameSnafu {
                name: expr.view_name.clone(),
            }
        );

        let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);

        let mut view_info = RawTableInfo {
            ident: metadata::TableIdent {
                // The view id is assigned by the DDL procedure below.
                table_id: 0,
                version: 0,
            },
            name: expr.view_name.clone(),
            desc: None,
            catalog_name: expr.catalog_name.clone(),
            schema_name: expr.schema_name.clone(),
            meta: RawTableMeta::default(),
            table_type: TableType::View,
        };

        let request = SubmitDdlTaskRequest {
            query_context: ctx,
            task: DdlTask::new_create_view(expr, view_info.clone()),
        };

        let resp = self
            .procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)?;

        debug!(
            "Submit creating view '{view_name}' task response: {:?}",
            resp
        );

        let view_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created view '{view_name}' with view id {view_id}");

        view_info.ident.table_id = view_id;

        let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);

        let table = DistTable::table(view_info);

        // Invalidate cached metadata for the new view's id and name.
        self.cache_invalidator
            .invalidate(
                &Context::default(),
                &[
                    CacheIdent::TableId(view_id),
                    CacheIdent::TableName(view_name.clone()),
                ],
            )
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(table)
    }

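    /// Drops a flow. A missing flow is an error unless `drop_if_exists` is set.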
    #[tracing::instrument(skip_all)]
    pub async fn drop_flow(
        &self,
        catalog_name: String,
        flow_name: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        if let Some(flow) = self
            .flow_metadata_manager
            .flow_name_manager()
            .get(&catalog_name, &flow_name)
            .await
            .context(error::TableMetadataManagerSnafu)?
        {
            let flow_id = flow.flow_id();
            let task = DropFlowTask {
                catalog_name,
                flow_name,
                flow_id,
                drop_if_exists,
            };
            self.drop_flow_procedure(task, query_context).await?;

            Ok(Output::new_with_affected_rows(0))
        } else if drop_if_exists {
            Ok(Output::new_with_affected_rows(0))
        } else {
            FlowNotFoundSnafu {
                flow_name: format_full_flow_name(&catalog_name, &flow_name),
            }
            .fail()
        }
    }

    async fn drop_flow_procedure(
        &self,
        expr: DropFlowTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_flow(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

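    /// Drops a view, verifying first that the target is actually a view and
    /// not a base table.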
    #[tracing::instrument(skip_all)]
    pub(crate) async fn drop_view(
        &self,
        catalog: String,
        schema: String,
        view: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let view_info = if let Some(view) = self
            .catalog_manager
            .table(&catalog, &schema, &view, None)
            .await
            .context(CatalogSnafu)?
        {
            view.table_info()
        } else if drop_if_exists {
            return Ok(Output::new_with_affected_rows(0));
        } else {
            return TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog, &schema, &view),
            }
            .fail();
        };

        ensure!(
            view_info.table_type == TableType::View,
            error::InvalidViewSnafu {
                msg: "not a view",
                view_name: format_full_table_name(&catalog, &schema, &view),
            }
        );

        let view_id = view_info.table_id();

        let task = DropViewTask {
            catalog,
            schema,
            view,
            view_id,
            drop_if_exists,
        };

        self.drop_view_procedure(task, query_context).await?;

        Ok(Output::new_with_affected_rows(0))
    }

    async fn drop_view_procedure(
        &self,
        expr: DropViewTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_view(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

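    /// Alters multiple logical tables in a single DDL procedure.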
    #[tracing::instrument(skip_all)]
    pub async fn alter_logical_tables(
        &self,
        alter_table_exprs: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
        ensure!(
            !alter_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "alter logical tables"
            }
        );

        self.alter_logical_tables_procedure(alter_table_exprs, query_context)
            .await?;

        Ok(Output::new_with_affected_rows(0))
    }

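    /// Drops a single table; see [`Self::drop_tables`].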
    #[tracing::instrument(skip_all)]
    pub async fn drop_table(
        &self,
        table_name: TableName,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.drop_tables(&[table_name], drop_if_exists, query_context)
            .await
    }

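    /// Drops the given tables. All names are resolved up front so a missing
    /// table fails the whole request (unless `drop_if_exists` is set) before
    /// anything is dropped; each dropped table's cache is then invalidated.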
    #[tracing::instrument(skip_all)]
    pub async fn drop_tables(
        &self,
        table_names: &[TableName],
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let mut tables = Vec::with_capacity(table_names.len());
        for table_name in table_names {
            ensure!(
                !is_readonly_schema(&table_name.schema_name),
                SchemaReadOnlySnafu {
                    name: table_name.schema_name.clone()
                }
            );

            if let Some(table) = self
                .catalog_manager
                .table(
                    &table_name.catalog_name,
                    &table_name.schema_name,
                    &table_name.table_name,
                    Some(&query_context),
                )
                .await
                .context(CatalogSnafu)?
            {
                // Record (name, id) pairs so that tables skipped below do not
                // shift the name-to-id mapping.
                tables.push((table_name, table.table_info().table_id()));
            } else if drop_if_exists {
                continue;
            } else {
                return TableNotFoundSnafu {
                    table_name: table_name.to_string(),
                }
                .fail();
            }
        }

        for (table_name, table_id) in tables {
            self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
                .await?;

            self.cache_invalidator
                .invalidate(
                    &Context::default(),
                    &[
                        CacheIdent::TableId(table_id),
                        CacheIdent::TableName(table_name.clone()),
                    ],
                )
                .await
                .context(error::InvalidateTableCacheSnafu)?;
        }
        Ok(Output::new_with_affected_rows(0))
    }

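    /// Drops a database (schema). Refuses to drop the schema currently in use
    /// by the session.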
    #[tracing::instrument(skip_all)]
    pub async fn drop_database(
        &self,
        catalog: String,
        schema: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&schema),
            SchemaReadOnlySnafu { name: schema }
        );

        if self
            .catalog_manager
            .schema_exists(&catalog, &schema, None)
            .await
            .context(CatalogSnafu)?
        {
            if schema == query_context.current_schema() {
                SchemaInUseSnafu { name: schema }.fail()
            } else {
                self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
                    .await?;

                Ok(Output::new_with_affected_rows(0))
            }
        } else if drop_if_exists {
            Ok(Output::new_with_affected_rows(0))
        } else {
            SchemaNotFoundSnafu {
                schema_info: schema,
            }
            .fail()
        }
    }

    #[tracing::instrument(skip_all)]
    pub async fn truncate_table(
        &self,
        table_name: TableName,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&table_name.schema_name),
            SchemaReadOnlySnafu {
                name: table_name.schema_name.clone()
            }
        );

        let table = self
            .catalog_manager
            .table(
                &table_name.catalog_name,
                &table_name.schema_name,
                &table_name.table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_name.to_string(),
            })?;
        let table_id = table.table_info().table_id();
        self.truncate_table_procedure(&table_name, table_id, query_context)
            .await?;

        Ok(Output::new_with_affected_rows(0))
    }

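    /// Verifies an alter expression and returns whether it actually needs to
    /// be applied: a rename must match the name pattern, and an `ADD COLUMN`
    /// that only re-adds existing columns with `IF NOT EXISTS` is a no-op. The
    /// new table metadata is dry-built to validate the request.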
    fn verify_alter(
        &self,
        table_id: TableId,
        table_info: Arc<TableInfo>,
        expr: AlterTableExpr,
    ) -> Result<bool> {
        let request: AlterTableRequest = common_grpc_expr::alter_expr_to_request(table_id, expr)
            .context(AlterExprToRequestSnafu)?;

        let AlterTableRequest {
            table_name,
            alter_kind,
            ..
        } = &request;

        if let AlterKind::RenameTable { new_table_name } = alter_kind {
            ensure!(
                NAME_PATTERN_REG.is_match(new_table_name),
                error::UnexpectedSnafu {
                    violated: format!("Invalid table name: {}", new_table_name)
                }
            );
        } else if let AlterKind::AddColumns { columns } = alter_kind {
            let column_names: HashSet<_> = table_info
                .meta
                .schema
                .column_schemas()
                .iter()
                .map(|schema| &schema.name)
                .collect();
            if columns.iter().all(|column| {
                column_names.contains(&column.column_schema.name) && column.add_if_not_exists
            }) {
                return Ok(false);
            }
        }

        let _ = table_info
            .meta
            .builder_with_alter_kind(table_name, &request.alter_kind)
            .context(error::TableSnafu)?
            .build()
            .context(error::BuildTableMetaSnafu { table_name })?;

        Ok(true)
    }

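    /// Alters a table from an `ALTER TABLE` statement.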
    #[tracing::instrument(skip_all)]
    pub async fn alter_table(
        &self,
        alter_table: AlterTable,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
        self.alter_table_inner(expr, query_context).await
    }

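    /// Alters a table from an `AlterTableExpr`. A no-op alter returns early;
    /// for a logical table the task is routed through its physical table, and
    /// the caches of both tables are invalidated afterwards.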
    #[tracing::instrument(skip_all)]
    pub async fn alter_table_inner(
        &self,
        expr: AlterTableExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&expr.schema_name),
            SchemaReadOnlySnafu {
                name: expr.schema_name.clone()
            }
        );

        let catalog_name = if expr.catalog_name.is_empty() {
            DEFAULT_CATALOG_NAME.to_string()
        } else {
            expr.catalog_name.clone()
        };

        let schema_name = if expr.schema_name.is_empty() {
            DEFAULT_SCHEMA_NAME.to_string()
        } else {
            expr.schema_name.clone()
        };

        let table_name = expr.table_name.clone();

        let table = self
            .catalog_manager
            .table(
                &catalog_name,
                &schema_name,
                &table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
            })?;

        let table_id = table.table_info().ident.table_id;
        let need_alter = self.verify_alter(table_id, table.table_info(), expr.clone())?;
        if !need_alter {
            return Ok(Output::new_with_affected_rows(0));
        }
        info!(
            "Table info before alter is {:?}, expr: {:?}",
            table.table_info(),
            expr
        );

        let physical_table_id = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_id(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        let (req, invalidate_keys) = if physical_table_id == table_id {
            let req = SubmitDdlTaskRequest {
                query_context,
                task: DdlTask::new_alter_table(expr),
            };

            let invalidate_keys = vec![
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            (req, invalidate_keys)
        } else {
            let req = SubmitDdlTaskRequest {
                query_context,
                task: DdlTask::new_alter_logical_tables(vec![expr]),
            };

            let mut invalidate_keys = vec![
                CacheIdent::TableId(physical_table_id),
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            let physical_table = self
                .table_metadata_manager
                .table_info_manager()
                .get(physical_table_id)
                .await
                .context(TableMetadataManagerSnafu)?
                .map(|x| x.into_inner());
            if let Some(physical_table) = physical_table {
                let physical_table_name = TableName::new(
                    physical_table.table_info.catalog_name,
                    physical_table.table_info.schema_name,
                    physical_table.table_info.name,
                );
                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
            }

            (req, invalidate_keys)
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }

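    /// Alters a database (schema) from an `ALTER DATABASE` statement.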
    #[tracing::instrument(skip_all)]
    pub async fn alter_database(
        &self,
        alter_expr: AlterDatabase,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
        self.alter_database_inner(alter_expr, query_context).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_database_inner(
        &self,
        alter_expr: AlterDatabaseExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&alter_expr.schema_name),
            SchemaReadOnlySnafu {
                // Report the schema being altered, not the session's current schema.
                name: alter_expr.schema_name.clone()
            }
        );

        let exists = self
            .catalog_manager
            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
            .await
            .context(CatalogSnafu)?;
        ensure!(
            exists,
            SchemaNotFoundSnafu {
                schema_info: alter_expr.schema_name,
            }
        );

        let cache_ident = [CacheIdent::SchemaName(SchemaName {
            catalog_name: alter_expr.catalog_name.clone(),
            schema_name: alter_expr.schema_name.clone(),
        })];

        self.alter_database_procedure(alter_expr, query_context)
            .await?;

        self.cache_invalidator
            .invalidate(&Context::default(), &cache_ident)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }

    async fn create_table_procedure(
        &self,
        create_table: CreateTableExpr,
        partitions: Vec<Partition>,
        table_info: RawTableInfo,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let partitions = partitions.into_iter().map(Into::into).collect();

        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_table(create_table, partitions, table_info),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn create_logical_tables_procedure(
        &self,
        tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_logical_tables(tables_data),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn alter_logical_tables_procedure(
        &self,
        tables_data: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_alter_logical_tables(tables_data),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn drop_table_procedure(
        &self,
        table_name: &TableName,
        table_id: TableId,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_table(
                table_name.catalog_name.to_string(),
                table_name.schema_name.to_string(),
                table_name.table_name.to_string(),
                table_id,
                drop_if_exists,
            ),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn drop_database_procedure(
        &self,
        catalog: String,
        schema: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_database(catalog, schema, drop_if_exists),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn alter_database_procedure(
        &self,
        alter_expr: AlterDatabaseExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_alter_database(alter_expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn truncate_table_procedure(
        &self,
        table_name: &TableName,
        table_id: TableId,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_truncate_table(
                table_name.catalog_name.to_string(),
                table_name.schema_name.to_string(),
                table_name.table_name.to_string(),
                table_id,
            ),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

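    /// Creates a database. Validates the catalog and database names, and fails
    /// with `SchemaExists` when the schema is present (or reserved) and
    /// `create_if_not_exists` is false.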
    #[tracing::instrument(skip_all)]
    pub async fn create_database(
        &self,
        database: &str,
        create_if_not_exists: bool,
        options: HashMap<String, String>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let catalog = query_context.current_catalog();
        ensure!(
            NAME_PATTERN_REG.is_match(catalog),
            error::UnexpectedSnafu {
                violated: format!("Invalid catalog name: {}", catalog)
            }
        );

        ensure!(
            NAME_PATTERN_REG.is_match(database),
            error::UnexpectedSnafu {
                violated: format!("Invalid database name: {}", database)
            }
        );

        if !self
            .catalog_manager
            .schema_exists(catalog, database, None)
            .await
            .context(CatalogSnafu)?
            && !self.catalog_manager.is_reserved_schema_name(database)
        {
            self.create_database_procedure(
                catalog.to_string(),
                database.to_string(),
                create_if_not_exists,
                options,
                query_context,
            )
            .await?;

            Ok(Output::new_with_affected_rows(1))
        } else if create_if_not_exists {
            Ok(Output::new_with_affected_rows(1))
        } else {
            error::SchemaExistsSnafu { name: database }.fail()
        }
    }

    async fn create_database_procedure(
        &self,
        catalog: String,
        database: String,
        create_if_not_exists: bool,
        options: HashMap<String, String>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }
}

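/// Parses the partition rule of a `CreateTableExpr` into meta partitions,
/// returning them together with the partition column names. The collected
/// exprs are validated by constructing a `MultiDimPartitionRule`.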
fn parse_partitions(
    create_table: &CreateTableExpr,
    partitions: Option<Partitions>,
    query_ctx: &QueryContextRef,
) -> Result<(Vec<MetaPartition>, Vec<String>)> {
    let partition_columns = find_partition_columns(&partitions)?;
    let partition_entries =
        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;

    let mut exprs = vec![];
    for partition in &partition_entries {
        for bound in partition {
            if let PartitionBound::Expr(expr) = bound {
                exprs.push(expr.clone());
            }
        }
    }
    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs)
        .context(InvalidPartitionSnafu)?;

    Ok((
        partition_entries
            .into_iter()
            .map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
            .collect::<std::result::Result<_, _>>()
            .context(DeserializePartitionSnafu)?,
        partition_columns,
    ))
}

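/// Builds the `RawTableInfo` for a `CreateTableExpr`, resolving the time
/// index, primary key, and partition key columns to their schema indices.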
fn create_table_info(
    create_table: &CreateTableExpr,
    partition_columns: Vec<String>,
) -> Result<RawTableInfo> {
    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
    let mut column_name_to_index_map = HashMap::new();

    for (idx, column) in create_table.column_defs.iter().enumerate() {
        let schema =
            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
                column: &column.name,
            })?;
        let schema = schema.with_time_index(column.name == create_table.time_index);

        column_schemas.push(schema);
        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
    }

    let timestamp_index = column_name_to_index_map
        .get(&create_table.time_index)
        .cloned();

    let raw_schema = RawSchema {
        column_schemas: column_schemas.clone(),
        timestamp_index,
        version: 0,
    };

    let primary_key_indices = create_table
        .primary_keys
        .iter()
        .map(|name| {
            column_name_to_index_map
                .get(name)
                .cloned()
                .context(ColumnNotFoundSnafu { msg: name })
        })
        .collect::<Result<Vec<_>>>()?;

    let partition_key_indices = partition_columns
        .into_iter()
        .map(|col_name| {
            column_name_to_index_map
                .get(&col_name)
                .cloned()
                .context(ColumnNotFoundSnafu { msg: col_name })
        })
        .collect::<Result<Vec<_>>>()?;

    let table_options = TableOptions::try_from_iter(&create_table.table_options)
        .context(UnrecognizedTableOptionSnafu)?;

    let meta = RawTableMeta {
        schema: raw_schema,
        primary_key_indices,
        value_indices: vec![],
        engine: create_table.engine.clone(),
        next_column_id: column_schemas.len() as u32,
        region_numbers: vec![],
        options: table_options,
        created_on: Utc::now(),
        partition_key_indices,
    };

    let desc = if create_table.desc.is_empty() {
        create_table.table_options.get(COMMENT_KEY).cloned()
    } else {
        Some(create_table.desc.clone())
    };

    let table_info = RawTableInfo {
        ident: metadata::TableIdent {
            table_id: 0,
            version: 0,
        },
        name: create_table.table_name.clone(),
        desc,
        catalog_name: create_table.catalog_name.clone(),
        schema_name: create_table.schema_name.clone(),
        meta,
        table_type: TableType::Base,
    };
    Ok(table_info)
}

fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
    let columns = if let Some(partitions) = partitions {
        partitions
            .column_list
            .iter()
            .map(|x| x.value.clone())
            .collect::<Vec<_>>()
    } else {
        vec![]
    };
    Ok(columns)
}

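/// Resolves the partition bounds for each partition expression in the
/// statement. A table without a partition rule (or with an empty one) gets a
/// single `MaxValue` bound, i.e. one partition covering everything.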
fn find_partition_entries(
    create_table: &CreateTableExpr,
    partitions: &Option<Partitions>,
    partition_columns: &[String],
    query_ctx: &QueryContextRef,
) -> Result<Vec<Vec<PartitionBound>>> {
    let entries = if let Some(partitions) = partitions {
        let column_defs = partition_columns
            .iter()
            .map(|pc| {
                create_table
                    .column_defs
                    .iter()
                    .find(|c| &c.name == pc)
                    // A partition column is expected to refer to a defined
                    // column of the same statement.
                    .unwrap()
            })
            .collect::<Vec<_>>();
        let mut column_name_and_type = HashMap::with_capacity(column_defs.len());
        for column in column_defs {
            let column_name = &column.name;
            let data_type = ConcreteDataType::from(
                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension)
                    .context(ColumnDataTypeSnafu)?,
            );
            column_name_and_type.insert(column_name, data_type);
        }

        let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
        for partition in &partitions.exprs {
            let partition_expr =
                convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
            partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]);
        }

        if partition_exprs.is_empty() {
            partition_exprs.push(vec![PartitionBound::MaxValue]);
        }

        partition_exprs
    } else {
        vec![vec![PartitionBound::MaxValue]]
    };
    Ok(entries)
}

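/// Converts a sqlparser binary expression into a `PartitionExpr`. Supported
/// shapes are `column op value`, `value op column` (with an optional unary
/// operator on the value), and binary expressions on both sides.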
fn convert_one_expr(
    expr: &Expr,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
    timezone: &Timezone,
) -> Result<PartitionExpr> {
    let Expr::BinaryOp { left, op, right } = expr else {
        return InvalidPartitionRuleSnafu {
            reason: "partition rule must be a binary expression",
        }
        .fail();
    };

    let op =
        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
            reason: format!("unsupported operator in partition expr {op}"),
        })?;

    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
        (Expr::Identifier(ident), Expr::Value(value)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(value, data_type, timezone, None)?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        (Expr::Value(value), Expr::Identifier(ident)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(value, data_type, timezone, None)?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
            (Operand::Expr(lhs), op, Operand::Expr(rhs))
        }
        _ => {
            return InvalidPartitionRuleSnafu {
                reason: format!("invalid partition expr {expr}"),
            }
            .fail();
        }
    };

    Ok(PartitionExpr::new(lhs, op, rhs))
}

fn convert_identifier(
    ident: &Ident,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
) -> Result<(String, ConcreteDataType)> {
    let column_name = ident.value.clone();
    let data_type = column_name_and_type
        .get(&column_name)
        .cloned()
        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
    Ok((column_name, data_type))
}

fn convert_value(
    value: &ParserValue,
    data_type: ConcreteDataType,
    timezone: &Timezone,
    unary_op: Option<UnaryOperator>,
) -> Result<Value> {
    sql_value_to_value(
        "<NONAME>",
        &data_type,
        value,
        Some(timezone),
        unary_op,
        false,
    )
    .context(ParseSqlValueSnafu)
}

#[cfg(test)]
mod test {
    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;

    use super::*;
    use crate::expr_helper;

    #[test]
    fn test_name_is_match() {
        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
        assert!(!NAME_PATTERN_REG.is_match("🈲"));
        assert!(NAME_PATTERN_REG.is_match("hello"));
        assert!(NAME_PATTERN_REG.is_match("test@"));
        assert!(!NAME_PATTERN_REG.is_match("@test"));
        assert!(NAME_PATTERN_REG.is_match("test#"));
        assert!(!NAME_PATTERN_REG.is_match("#test"));
        assert!(!NAME_PATTERN_REG.is_match("@"));
        assert!(!NAME_PATTERN_REG.is_match("#"));
    }

    #[tokio::test]
    #[ignore = "TODO(ruihang): WIP new partition rule"]
    async fn test_parse_partitions() {
        common_telemetry::init_default_ut_logging();
        let cases = [
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION ON COLUMNS (b) (
    b < 'hz',
    b >= 'hz' AND b < 'sh',
    b >= 'sh'
)
ENGINE=mito",
                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
            ),
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
    PARTITION r0 VALUES LESS THAN ('hz', 10),
    b < 'hz' AND a < 10,
    b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
    b >= 'sh' AND a >= 20
)
ENGINE=mito",
                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
            ),
        ];
        let ctx = QueryContextBuilder::default().build().into();
        for (sql, expected) in cases {
            let result = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            match &result[0] {
                Statement::CreateTable(c) => {
                    let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
                    let (partitions, _) =
                        parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
                    let json = serde_json::to_string(&partitions).unwrap();
                    assert_eq!(json, expected);
                }
                _ => unreachable!(),
            }
        }
    }
}
1774}