1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::helper::ColumnDataTypeWrapper;
20use api::v1::alter_table_expr::Kind;
21use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
22use api::v1::{
23 AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
24 Repartition, column_def,
25};
26#[cfg(feature = "enterprise")]
27use api::v1::{
28 CreateTriggerExpr as PbCreateTriggerExpr, meta::CreateTriggerTask as PbCreateTriggerTask,
29};
30use catalog::CatalogManagerRef;
31use chrono::Utc;
32use common_base::regex_pattern::NAME_PATTERN_REG;
33use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_readonly_schema};
34use common_catalog::{format_full_flow_name, format_full_table_name};
35use common_error::ext::BoxedError;
36use common_meta::cache_invalidator::Context;
37use common_meta::ddl::create_flow::FlowType;
38use common_meta::instruction::CacheIdent;
39use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
40use common_meta::procedure_executor::ExecutorContext;
41#[cfg(feature = "enterprise")]
42use common_meta::rpc::ddl::trigger::CreateTriggerTask;
43#[cfg(feature = "enterprise")]
44use common_meta::rpc::ddl::trigger::DropTriggerTask;
45use common_meta::rpc::ddl::{
46 CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
47 SubmitDdlTaskResponse,
48};
49use common_query::Output;
50use common_recordbatch::{RecordBatch, RecordBatches};
51use common_sql::convert::sql_value_to_value;
52use common_telemetry::{debug, info, tracing, warn};
53use common_time::{Timestamp, Timezone};
54use datafusion_common::tree_node::TreeNodeVisitor;
55use datafusion_expr::LogicalPlan;
56use datatypes::prelude::ConcreteDataType;
57use datatypes::schema::{ColumnSchema, Schema};
58use datatypes::value::Value;
59use datatypes::vectors::{StringVector, VectorRef};
60use humantime::parse_duration;
61use partition::expr::{Operand, PartitionExpr, RestrictedOp};
62use partition::multi_dim::MultiDimPartitionRule;
63use query::parser::QueryStatement;
64use query::plan::extract_and_rewrite_full_table_names;
65use query::query_engine::DefaultSerializer;
66use query::sql::create_table_stmt;
67use session::context::QueryContextRef;
68use session::table_name::table_idents_to_full_name;
69use snafu::{OptionExt, ResultExt, ensure};
70use sql::parser::{ParseOptions, ParserContext};
71use sql::parsers::utils::is_tql;
72use sql::statements::OptionMap;
73#[cfg(feature = "enterprise")]
74use sql::statements::alter::trigger::AlterTrigger;
75use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation};
76#[cfg(feature = "enterprise")]
77use sql::statements::create::trigger::CreateTrigger;
78use sql::statements::create::{
79 CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
80};
81use sql::statements::statement::Statement;
82use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
83use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
84use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
85use table::TableRef;
86use table::dist_table::DistTable;
87use table::metadata::{self, TableId, TableInfo, TableMeta, TableType};
88use table::requests::{
89 AlterKind, AlterTableRequest, COMMENT_KEY, DDL_TIMEOUT, DDL_WAIT, TableOptions,
90};
91use table::table_name::TableName;
92use table::table_reference::TableReference;
93
94use crate::error::{
95 self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
96 ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu,
97 DeserializePartitionExprSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
98 FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
99 InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu,
100 PartitionExprToPbSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
101 SerializePartitionExprSnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
102 TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
103 ViewAlreadyExistsSnafu,
104};
105use crate::expr_helper::{self, RepartitionRequest};
106use crate::statement::StatementExecutor;
107use crate::statement::show::create_partitions_stmt;
108use crate::utils::to_meta_query_context;
109
/// Options controlling how a DDL task is submitted to the procedure executor.
#[derive(Debug, Clone, Copy)]
struct DdlSubmitOptions {
    // Whether the submission should wait for the DDL procedure to finish.
    wait: bool,
    // Maximum time to wait for the DDL procedure to complete.
    timeout: Duration,
}
115
116fn build_procedure_id_output(procedure_id: Vec<u8>) -> Result<Output> {
117 let procedure_id = String::from_utf8_lossy(&procedure_id).to_string();
118 let vector: VectorRef = Arc::new(StringVector::from(vec![procedure_id]));
119 let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
120 "Procedure ID",
121 vector.data_type(),
122 false,
123 )]));
124 let batch =
125 RecordBatch::new(schema.clone(), vec![vector]).context(error::BuildRecordBatchSnafu)?;
126 let batches =
127 RecordBatches::try_new(schema, vec![batch]).context(error::BuildRecordBatchSnafu)?;
128 Ok(Output::new_with_record_batches(batches))
129}
130
131fn parse_ddl_options(options: &OptionMap) -> Result<DdlSubmitOptions> {
132 let wait = match options.get(DDL_WAIT) {
133 Some(value) => value.parse::<bool>().map_err(|_| {
134 InvalidSqlSnafu {
135 err_msg: format!("invalid DDL option '{DDL_WAIT}': '{value}'"),
136 }
137 .build()
138 })?,
139 None => SubmitDdlTaskRequest::default_wait(),
140 };
141
142 let timeout = match options.get(DDL_TIMEOUT) {
143 Some(value) => parse_duration(value).map_err(|err| {
144 InvalidSqlSnafu {
145 err_msg: format!("invalid DDL option '{DDL_TIMEOUT}': '{value}': {err}"),
146 }
147 .build()
148 })?,
149 None => SubmitDdlTaskRequest::default_timeout(),
150 };
151
152 Ok(DdlSubmitOptions { wait, timeout })
153}
154
155impl StatementExecutor {
    /// Returns a cloned handle to the catalog manager.
    pub fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }
159
    /// Creates a table from a parsed `CREATE TABLE` statement.
    ///
    /// Schema-level extra options (except compaction-related ones) are merged
    /// into the table's options as defaults before creation.
    #[tracing::instrument(skip_all)]
    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
        // Resolve the (catalog, schema) the new table will live in.
        let (catalog, schema, _table) = table_idents_to_full_name(&stmt.name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;

        // Fetch schema-level options so they can act as table-option defaults.
        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
        if let Some(schema_options) = schema_options {
            for (key, value) in schema_options.extra_options.iter() {
                // Skip compaction-related options — presumably schema-scoped
                // only; they are deliberately not inherited here.
                if key.starts_with("compaction.") {
                    continue;
                }
                // Only fill in options the statement did not set explicitly.
                create_expr
                    .table_options
                    .entry(key.clone())
                    .or_insert(value.clone());
            }
        }

        self.create_table_inner(create_expr, stmt.partitions, ctx)
            .await
    }
195
    /// Creates a table mirroring an existing source table
    /// (`CREATE TABLE ... LIKE ...`), including its partition layout.
    #[tracing::instrument(skip_all)]
    pub async fn create_table_like(
        &self,
        stmt: CreateTableLike,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        // Resolve and load the source table being copied.
        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;
        let table_ref = self
            .catalog_manager
            .table(&catalog, &schema, &table, Some(&ctx))
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu { table_name: &table })?;
        // Partition layout of the source's physical table, to be replicated.
        let partition_info = self
            .partition_manager
            .find_physical_partition_info(table_ref.table_info().table_id())
            .await
            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;

        // Schema-level options feed into the regenerated CREATE statement.
        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        // Rebuild a CREATE TABLE statement from the source table, then
        // retarget it at the new table name.
        let quote_style = ctx.quote_style();
        let mut create_stmt =
            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
                .context(error::ParseQuerySnafu)?;
        create_stmt.name = stmt.table_name;
        create_stmt.if_not_exists = false;

        let table_info = table_ref.table_info();
        // Carry over the partition definition only when it is non-trivial.
        let partitions = create_partitions_stmt(&table_info, &partition_info.partitions)?.and_then(
            |mut partitions| {
                if !partitions.column_list.is_empty() {
                    partitions.set_quote(quote_style);
                    Some(partitions)
                } else {
                    None
                }
            },
        );

        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
        self.create_table_inner(create_expr, partitions, ctx).await
    }
251
252 #[tracing::instrument(skip_all)]
253 pub async fn create_external_table(
254 &self,
255 create_expr: CreateExternalTable,
256 ctx: QueryContextRef,
257 ) -> Result<TableRef> {
258 let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
259 self.create_table_inner(create_expr, None, ctx).await
260 }
261
    /// Creates a table from a prepared [`CreateTableExpr`], dispatching to the
    /// logical-table path for metric-engine logical tables and to the regular
    /// path otherwise.
    #[tracing::instrument(skip_all)]
    pub async fn create_table_inner(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        // Reject creation inside read-only schemas up front.
        ensure!(
            !is_readonly_schema(&create_table.schema_name),
            SchemaReadOnlySnafu {
                name: create_table.schema_name.clone()
            }
        );

        // A metric-engine table carrying the logical-table metadata key is a
        // logical table and goes through the dedicated creation procedure.
        if create_table.engine == METRIC_ENGINE_NAME
            && create_table
                .table_options
                .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            // A logical table's explicit partition rule must mirror its
            // physical table's rule.
            if let Some(partitions) = partitions.as_ref()
                && !partitions.exprs.is_empty()
            {
                self.validate_logical_table_partition_rule(create_table, partitions, &query_ctx)
                    .await?;
            }
            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
                .await?
                .into_iter()
                .next()
                .context(error::UnexpectedSnafu {
                    violated: "expected to create logical tables",
                })
        } else {
            self.create_non_logic_table(create_table, partitions, query_ctx)
                .await
        }
    }
301
    /// Creates a regular (non-logical) table via the distributed create-table
    /// procedure and returns a handle to the newly created table.
    #[tracing::instrument(skip_all)]
    pub async fn create_non_logic_table(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();

        // The target schema must already exist.
        let schema = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey::new(
                &create_table.catalog_name,
                &create_table.schema_name,
            ))
            .await
            .context(TableMetadataManagerSnafu)?;
        ensure!(
            schema.is_some(),
            SchemaNotFoundSnafu {
                schema_info: &create_table.schema_name,
            }
        );

        // If the table already exists, `IF NOT EXISTS` returns it unchanged;
        // otherwise this is an error.
        if let Some(table) = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                &create_table.table_name,
                Some(&query_ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            return if create_table.create_if_not_exists {
                Ok(table)
            } else {
                TableAlreadyExistsSnafu {
                    table: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                }
                .fail()
            };
        }

        // Validate the table name against the global naming pattern.
        ensure!(
            NAME_PATTERN_REG.is_match(&create_table.table_name),
            InvalidTableNameSnafu {
                table_name: &create_table.table_name,
            }
        );

        let table_name = TableName::new(
            &create_table.catalog_name,
            &create_table.schema_name,
            &create_table.table_name,
        );

        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
        let mut table_info = create_table_info(create_table, partition_cols)?;

        // Submit the create-table procedure; it allocates the table id.
        let resp = self
            .create_table_procedure(
                create_table.clone(),
                partitions,
                table_info.clone(),
                query_ctx,
            )
            .await?;

        let table_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created table '{table_name}' with table id {table_id}");

        // Back-fill the allocated id into the table info and the expression.
        table_info.ident.table_id = table_id;

        let table_info = Arc::new(table_info);
        create_table.table_id = Some(api::v1::TableId { id: table_id });

        let table = DistTable::table(table_info);

        Ok(table)
    }
397
    /// Creates a batch of logical tables through a single
    /// create-logical-tables procedure, then resolves and returns their
    /// table handles.
    #[tracing::instrument(skip_all)]
    pub async fn create_logical_tables(
        &self,
        create_table_exprs: &[CreateTableExpr],
        query_context: QueryContextRef,
    ) -> Result<Vec<TableRef>> {
        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
        ensure!(
            !create_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "create logic tables"
            }
        );

        // Validate every table name before submitting anything.
        for create_table in create_table_exprs {
            ensure!(
                NAME_PATTERN_REG.is_match(&create_table.table_name),
                InvalidTableNameSnafu {
                    table_name: &create_table.table_name,
                }
            );
        }

        let raw_tables_info = create_table_exprs
            .iter()
            .map(|create| create_table_info(create, vec![]))
            .collect::<Result<Vec<_>>>()?;
        let tables_data = create_table_exprs
            .iter()
            .cloned()
            .zip(raw_tables_info.iter().cloned())
            .collect::<Vec<_>>();

        let resp = self
            .create_logical_tables_procedure(tables_data, query_context.clone())
            .await?;

        // The procedure must report exactly one id per requested table.
        let table_ids = resp.table_ids;
        ensure!(
            table_ids.len() == raw_tables_info.len(),
            CreateLogicalTablesSnafu {
                reason: format!(
                    "The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}",
                    raw_tables_info.len(),
                    table_ids.len()
                )
            }
        );
        info!("Successfully created logical tables: {:?}", table_ids);

        // Re-resolve each table from the catalog and sanity-check that the
        // catalog's table id matches the one the procedure reported.
        let mut tables_info = Vec::with_capacity(table_ids.len());
        for (table_id, create_table) in table_ids.iter().zip(create_table_exprs.iter()) {
            let table = self
                .catalog_manager
                .table(
                    &create_table.catalog_name,
                    &create_table.schema_name,
                    &create_table.table_name,
                    Some(&query_context),
                )
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                })?;

            let table_info = table.table_info();
            ensure!(
                table_info.table_id() == *table_id,
                CreateLogicalTablesSnafu {
                    reason: format!(
                        "Table id mismatch after creation, expected {}, got {} for table {}",
                        table_id,
                        table_info.table_id(),
                        format_full_table_name(
                            &create_table.catalog_name,
                            &create_table.schema_name,
                            &create_table.table_name
                        )
                    )
                }
            );

            tables_info.push(table_info);
        }

        Ok(tables_info.into_iter().map(DistTable::table).collect())
    }
495
    /// Checks that a logical table's partition rule matches the partition rule
    /// of its backing physical table exactly (order-insensitively).
    async fn validate_logical_table_partition_rule(
        &self,
        create_table: &CreateTableExpr,
        partitions: &Partitions,
        query_ctx: &QueryContextRef,
    ) -> Result<()> {
        let (_, mut logical_partition_exprs) =
            parse_partitions_for_logical_validation(create_table, partitions, query_ctx)?;

        // The physical table's name is carried in the logical table's options.
        let physical_table_name = create_table
            .table_options
            .get(LOGICAL_TABLE_METADATA_KEY)
            .with_context(|| CreateLogicalTablesSnafu {
                reason: format!(
                    "expect `{LOGICAL_TABLE_METADATA_KEY}` option on creating logical table"
                ),
            })?;

        let physical_table = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                physical_table_name,
                Some(query_ctx),
            )
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu {
                table_name: physical_table_name.clone(),
            })?;

        let physical_table_info = physical_table.table_info();
        let (partition_rule, _) = self
            .partition_manager
            .find_table_partition_rule(&physical_table_info)
            .await
            .context(error::FindTablePartitionRuleSnafu {
                table_name: physical_table_name.clone(),
            })?;

        // Only multi-dimensional rules can be compared here; anything else is
        // rejected as invalid.
        let multi_dim_rule = partition_rule
            .as_ref()
            .as_any()
            .downcast_ref::<MultiDimPartitionRule>()
            .context(InvalidPartitionRuleSnafu {
                reason: "physical table partition rule is not range-based",
            })?;

        // Sort both sides so the comparison ignores declaration order.
        let mut physical_partition_exprs = multi_dim_rule.exprs().to_vec();
        logical_partition_exprs.sort_unstable();
        physical_partition_exprs.sort_unstable();

        ensure!(
            physical_partition_exprs == logical_partition_exprs,
            InvalidPartitionRuleSnafu {
                reason: format!(
                    "logical table partition rule must match the corresponding physical table's\n logical table partition exprs:\t\t {:?}\n physical table partition exprs:\t {:?}",
                    logical_partition_exprs, physical_partition_exprs
                ),
            }
        );

        Ok(())
    }
562
563 #[cfg(feature = "enterprise")]
564 #[tracing::instrument(skip_all)]
565 pub async fn create_trigger(
566 &self,
567 stmt: CreateTrigger,
568 query_context: QueryContextRef,
569 ) -> Result<Output> {
570 let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
571 self.create_trigger_inner(expr, query_context).await
572 }
573
574 #[cfg(feature = "enterprise")]
575 pub async fn create_trigger_inner(
576 &self,
577 expr: PbCreateTriggerExpr,
578 query_context: QueryContextRef,
579 ) -> Result<Output> {
580 self.create_trigger_procedure(expr, query_context).await?;
581 Ok(Output::new_with_affected_rows(0))
582 }
583
584 #[cfg(feature = "enterprise")]
585 async fn create_trigger_procedure(
586 &self,
587 expr: PbCreateTriggerExpr,
588 query_context: QueryContextRef,
589 ) -> Result<SubmitDdlTaskResponse> {
590 let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
591 create_trigger: Some(expr),
592 })
593 .context(error::InvalidExprSnafu)?;
594
595 let request = SubmitDdlTaskRequest::new(
596 to_meta_query_context(query_context),
597 DdlTask::new_create_trigger(task),
598 );
599
600 self.procedure_executor
601 .submit_ddl_task(&ExecutorContext::default(), request)
602 .await
603 .context(error::ExecuteDdlSnafu)
604 }
605
606 #[tracing::instrument(skip_all)]
607 pub async fn create_flow(
608 &self,
609 stmt: CreateFlow,
610 query_context: QueryContextRef,
611 ) -> Result<Output> {
612 let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;
614
615 self.create_flow_inner(expr, query_context).await
616 }
617
618 pub async fn create_flow_inner(
619 &self,
620 expr: CreateFlowExpr,
621 query_context: QueryContextRef,
622 ) -> Result<Output> {
623 self.create_flow_procedure(expr, query_context).await?;
624 Ok(Output::new_with_affected_rows(0))
625 }
626
627 async fn create_flow_procedure(
628 &self,
629 expr: CreateFlowExpr,
630 query_context: QueryContextRef,
631 ) -> Result<SubmitDdlTaskResponse> {
632 let flow_type = self
633 .determine_flow_type(&expr, query_context.clone())
634 .await?;
635 info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
636
637 let expr = {
638 let mut expr = expr;
639 expr.flow_options
640 .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
641 expr
642 };
643
644 let task = CreateFlowTask::try_from(PbCreateFlowTask {
645 create_flow: Some(expr),
646 })
647 .context(error::InvalidExprSnafu)?;
648 let request = SubmitDdlTaskRequest::new(
649 to_meta_query_context(query_context),
650 DdlTask::new_create_flow(task),
651 );
652
653 self.procedure_executor
654 .submit_ddl_task(&ExecutorContext::default(), request)
655 .await
656 .context(error::ExecuteDdlSnafu)
657 }
658
    /// Decides whether a flow should run in batching or streaming mode.
    ///
    /// Rules, in order:
    /// - any source table with `ttl = instant` falls back to streaming mode
    ///   (with a warning);
    /// - TQL queries are always batching;
    /// - SQL plans containing an aggregate or distinct node are batching;
    /// - everything else is streaming.
    async fn determine_flow_type(
        &self,
        expr: &CreateFlowExpr,
        query_ctx: QueryContextRef,
    ) -> Result<FlowType> {
        // Check source tables first; a ttl=instant source forces streaming.
        for src_table_name in &expr.source_table_names {
            let table = self
                .catalog_manager()
                .table(
                    &src_table_name.catalog_name,
                    &src_table_name.schema_name,
                    &src_table_name.table_name,
                    Some(&query_ctx),
                )
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name,
                    ),
                })?;

            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
                warn!(
                    "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
                    format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name
                    ),
                    expr.flow_name
                );
                return Ok(FlowType::Streaming);
            }
        }

        // Parse the flow's query; exactly one statement is expected.
        let engine = &self.query_engine;
        let stmts = ParserContext::create_with_dialect(
            &expr.sql,
            query_ctx.sql_dialect(),
            ParseOptions::default(),
        )
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

        ensure!(
            stmts.len() == 1,
            InvalidSqlSnafu {
                err_msg: format!("Expect only one statement, found {}", stmts.len())
            }
        );
        let stmt = &stmts[0];

        // TQL queries always run as batching flows.
        if is_tql(query_ctx.sql_dialect(), &expr.sql)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?
        {
            return Ok(FlowType::Batching);
        }

        // Plan the SQL statement so the logical plan can be inspected.
        let plan = match stmt {
            Statement::Tql(_) => return Ok(FlowType::Batching),
            _ => engine
                .planner()
                .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?,
        };

        // Visitor detecting aggregate/distinct nodes anywhere in the plan,
        // including subqueries; stops at the first hit.
        struct FindAggr {
            is_aggr: bool,
        }

        impl TreeNodeVisitor<'_> for FindAggr {
            type Node = LogicalPlan;
            fn f_down(
                &mut self,
                node: &Self::Node,
            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
            {
                match node {
                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
                        self.is_aggr = true;
                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
                    }
                    _ => (),
                }
                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
            }
        }

        let mut find_aggr = FindAggr { is_aggr: false };

        plan.visit_with_subqueries(&mut find_aggr)
            .context(BuildDfLogicalPlanSnafu)?;
        if find_aggr.is_aggr {
            Ok(FlowType::Batching)
        } else {
            Ok(FlowType::Streaming)
        }
    }
772
    /// Creates a view from a parsed `CREATE VIEW` statement.
    ///
    /// Plans the underlying query, validates any explicit column list against
    /// the plan's columns, encodes the (table-name-rewritten) logical plan via
    /// substrait, and submits the resulting create-view expression.
    #[tracing::instrument(skip_all)]
    pub async fn create_view(
        &self,
        create_view: CreateView,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        // Only SQL queries and TQL queries may back a view.
        let logical_plan = match &*create_view.query {
            Statement::Query(query) => {
                self.plan(
                    &QueryStatement::Sql(Statement::Query(query.clone())),
                    ctx.clone(),
                )
                .await?
            }
            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
            _ => {
                return InvalidViewStmtSnafu {}.fail();
            }
        };
        // The original statement text is stored as the view's definition.
        let definition = create_view.to_string();

        let schema: Schema = logical_plan
            .schema()
            .clone()
            .try_into()
            .context(ConvertSchemaSnafu)?;
        let plan_columns: Vec<_> = schema
            .column_schemas()
            .iter()
            .map(|c| c.name.clone())
            .collect();

        let columns: Vec<_> = create_view
            .columns
            .iter()
            .map(|ident| ident.to_string())
            .collect();

        // An explicit column list must cover the plan's columns one-to-one.
        if !columns.is_empty() {
            ensure!(
                columns.len() == plan_columns.len(),
                error::ViewColumnsMismatchSnafu {
                    view_name: create_view.name.to_string(),
                    expected: plan_columns.len(),
                    actual: columns.len(),
                }
            );
        }

        // Collect referenced table names and rewrite them to fully-qualified
        // form inside the plan.
        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
            .context(ExtractTableNamesSnafu)?;

        let table_names = table_names.into_iter().map(|t| t.into()).collect();

        // Persist the plan in substrait encoding.
        let encoded_plan = DFLogicalSubstraitConvertor
            .encode(&plan, DefaultSerializer)
            .context(SubstraitCodecSnafu)?;

        let expr = expr_helper::to_create_view_expr(
            create_view,
            encoded_plan.to_vec(),
            table_names,
            columns,
            plan_columns,
            definition,
            ctx.clone(),
        )?;

        self.create_view_by_expr(expr, ctx).await
    }
857
858 pub async fn create_view_by_expr(
859 &self,
860 expr: CreateViewExpr,
861 ctx: QueryContextRef,
862 ) -> Result<TableRef> {
863 ensure! {
864 !(expr.create_if_not_exists & expr.or_replace),
865 InvalidSqlSnafu {
866 err_msg: "syntax error Create Or Replace and If Not Exist cannot be used together",
867 }
868 };
869 let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();
870
871 let schema_exists = self
872 .table_metadata_manager
873 .schema_manager()
874 .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
875 .await
876 .context(TableMetadataManagerSnafu)?;
877
878 ensure!(
879 schema_exists,
880 SchemaNotFoundSnafu {
881 schema_info: &expr.schema_name,
882 }
883 );
884
885 if let Some(table) = self
887 .catalog_manager
888 .table(
889 &expr.catalog_name,
890 &expr.schema_name,
891 &expr.view_name,
892 Some(&ctx),
893 )
894 .await
895 .context(CatalogSnafu)?
896 {
897 let table_type = table.table_info().table_type;
898
899 match (table_type, expr.create_if_not_exists, expr.or_replace) {
900 (TableType::View, true, false) => {
901 return Ok(table);
902 }
903 (TableType::View, false, false) => {
904 return ViewAlreadyExistsSnafu {
905 name: format_full_table_name(
906 &expr.catalog_name,
907 &expr.schema_name,
908 &expr.view_name,
909 ),
910 }
911 .fail();
912 }
913 (TableType::View, _, true) => {
914 }
916 _ => {
917 return TableAlreadyExistsSnafu {
918 table: format_full_table_name(
919 &expr.catalog_name,
920 &expr.schema_name,
921 &expr.view_name,
922 ),
923 }
924 .fail();
925 }
926 }
927 }
928
929 ensure!(
930 NAME_PATTERN_REG.is_match(&expr.view_name),
931 InvalidViewNameSnafu {
932 name: expr.view_name.clone(),
933 }
934 );
935
936 let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);
937
938 let mut view_info = TableInfo {
939 ident: metadata::TableIdent {
940 table_id: 0,
942 version: 0,
943 },
944 name: expr.view_name.clone(),
945 desc: None,
946 catalog_name: expr.catalog_name.clone(),
947 schema_name: expr.schema_name.clone(),
948 meta: TableMeta::empty(),
950 table_type: TableType::View,
951 };
952
953 let request = SubmitDdlTaskRequest::new(
954 to_meta_query_context(ctx),
955 DdlTask::new_create_view(expr, view_info.clone()),
956 );
957
958 let resp = self
959 .procedure_executor
960 .submit_ddl_task(&ExecutorContext::default(), request)
961 .await
962 .context(error::ExecuteDdlSnafu)?;
963
964 debug!(
965 "Submit creating view '{view_name}' task response: {:?}",
966 resp
967 );
968
969 let view_id = resp
970 .table_ids
971 .into_iter()
972 .next()
973 .context(error::UnexpectedSnafu {
974 violated: "expected table_id",
975 })?;
976 info!("Successfully created view '{view_name}' with view id {view_id}");
977
978 view_info.ident.table_id = view_id;
979
980 let view_info = Arc::new(view_info);
981
982 let table = DistTable::table(view_info);
983
984 self.cache_invalidator
986 .invalidate(
987 &Context::default(),
988 &[
989 CacheIdent::TableId(view_id),
990 CacheIdent::TableName(view_name.clone()),
991 ],
992 )
993 .await
994 .context(error::InvalidateTableCacheSnafu)?;
995
996 Ok(table)
997 }
998
999 #[tracing::instrument(skip_all)]
1000 pub async fn drop_flow(
1001 &self,
1002 catalog_name: String,
1003 flow_name: String,
1004 drop_if_exists: bool,
1005 query_context: QueryContextRef,
1006 ) -> Result<Output> {
1007 if let Some(flow) = self
1008 .flow_metadata_manager
1009 .flow_name_manager()
1010 .get(&catalog_name, &flow_name)
1011 .await
1012 .context(error::TableMetadataManagerSnafu)?
1013 {
1014 let flow_id = flow.flow_id();
1015 let task = DropFlowTask {
1016 catalog_name,
1017 flow_name,
1018 flow_id,
1019 drop_if_exists,
1020 };
1021 self.drop_flow_procedure(task, query_context).await?;
1022
1023 Ok(Output::new_with_affected_rows(0))
1024 } else if drop_if_exists {
1025 Ok(Output::new_with_affected_rows(0))
1026 } else {
1027 FlowNotFoundSnafu {
1028 flow_name: format_full_flow_name(&catalog_name, &flow_name),
1029 }
1030 .fail()
1031 }
1032 }
1033
1034 async fn drop_flow_procedure(
1035 &self,
1036 expr: DropFlowTask,
1037 query_context: QueryContextRef,
1038 ) -> Result<SubmitDdlTaskResponse> {
1039 let request = SubmitDdlTaskRequest::new(
1040 to_meta_query_context(query_context),
1041 DdlTask::new_drop_flow(expr),
1042 );
1043
1044 self.procedure_executor
1045 .submit_ddl_task(&ExecutorContext::default(), request)
1046 .await
1047 .context(error::ExecuteDdlSnafu)
1048 }
1049
1050 #[cfg(feature = "enterprise")]
1051 #[tracing::instrument(skip_all)]
1052 pub(super) async fn drop_trigger(
1053 &self,
1054 catalog_name: String,
1055 trigger_name: String,
1056 drop_if_exists: bool,
1057 query_context: QueryContextRef,
1058 ) -> Result<Output> {
1059 let task = DropTriggerTask {
1060 catalog_name,
1061 trigger_name,
1062 drop_if_exists,
1063 };
1064 self.drop_trigger_procedure(task, query_context).await?;
1065 Ok(Output::new_with_affected_rows(0))
1066 }
1067
1068 #[cfg(feature = "enterprise")]
1069 async fn drop_trigger_procedure(
1070 &self,
1071 expr: DropTriggerTask,
1072 query_context: QueryContextRef,
1073 ) -> Result<SubmitDdlTaskResponse> {
1074 let request = SubmitDdlTaskRequest::new(
1075 to_meta_query_context(query_context),
1076 DdlTask::new_drop_trigger(expr),
1077 );
1078
1079 self.procedure_executor
1080 .submit_ddl_task(&ExecutorContext::default(), request)
1081 .await
1082 .context(error::ExecuteDdlSnafu)
1083 }
1084
    /// Drops a view by name.
    ///
    /// When the view is missing, succeeds silently if `drop_if_exists` is set
    /// and errors otherwise; dropping a non-view table through this path is
    /// rejected.
    #[tracing::instrument(skip_all)]
    pub(crate) async fn drop_view(
        &self,
        catalog: String,
        schema: String,
        view: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let view_info = if let Some(view) = self
            .catalog_manager
            .table(&catalog, &schema, &view, None)
            .await
            .context(CatalogSnafu)?
        {
            view.table_info()
        } else if drop_if_exists {
            // DROP VIEW IF EXISTS on a missing view is a no-op.
            return Ok(Output::new_with_affected_rows(0));
        } else {
            return TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog, &schema, &view),
            }
            .fail();
        };

        // Ensure the target really is a view and not a base table.
        ensure!(
            view_info.table_type == TableType::View,
            error::InvalidViewSnafu {
                msg: "not a view",
                view_name: format_full_table_name(&catalog, &schema, &view),
            }
        );

        let view_id = view_info.table_id();

        let task = DropViewTask {
            catalog,
            schema,
            view,
            view_id,
            drop_if_exists,
        };

        self.drop_view_procedure(task, query_context).await?;

        Ok(Output::new_with_affected_rows(0))
    }
1135
1136 async fn drop_view_procedure(
1138 &self,
1139 expr: DropViewTask,
1140 query_context: QueryContextRef,
1141 ) -> Result<SubmitDdlTaskResponse> {
1142 let request = SubmitDdlTaskRequest::new(
1143 to_meta_query_context(query_context),
1144 DdlTask::new_drop_view(expr),
1145 );
1146
1147 self.procedure_executor
1148 .submit_ddl_task(&ExecutorContext::default(), request)
1149 .await
1150 .context(error::ExecuteDdlSnafu)
1151 }
1152
    /// Alters a batch of logical tables.
    ///
    /// Expressions are grouped by the physical table backing each logical
    /// table and one alter procedure is submitted per physical table; the
    /// procedures run concurrently.
    #[tracing::instrument(skip_all)]
    pub async fn alter_logical_tables(
        &self,
        alter_table_exprs: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
        ensure!(
            !alter_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "alter logical tables"
            }
        );

        // Group the alter expressions by their physical table id.
        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
        for expr in alter_table_exprs {
            // Fall back to the session's current catalog/schema when the
            // expression leaves them unspecified.
            let catalog = if expr.catalog_name.is_empty() {
                query_context.current_catalog()
            } else {
                &expr.catalog_name
            };
            let schema = if expr.schema_name.is_empty() {
                query_context.current_schema()
            } else {
                expr.schema_name.clone()
            };
            let table_name = &expr.table_name;
            let table = self
                .catalog_manager
                .table(catalog, &schema, table_name, Some(&query_context))
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(catalog, &schema, table_name),
                })?;
            let table_id = table.table_info().ident.table_id;
            let physical_table_id = self
                .table_metadata_manager
                .table_route_manager()
                .get_physical_table_id(table_id)
                .await
                .context(TableMetadataManagerSnafu)?;
            groups.entry(physical_table_id).or_default().push(expr);
        }

        // Submit one alter procedure per physical table and await them all.
        let mut handles = Vec::with_capacity(groups.len());
        for (_physical_table_id, exprs) in groups {
            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
            handles.push(fut);
        }
        let _results = futures::future::try_join_all(handles).await?;

        Ok(Output::new_with_affected_rows(0))
    }
1210
1211 #[tracing::instrument(skip_all)]
1212 pub async fn drop_table(
1213 &self,
1214 table_name: TableName,
1215 drop_if_exists: bool,
1216 query_context: QueryContextRef,
1217 ) -> Result<Output> {
1218 self.drop_tables(&[table_name], drop_if_exists, query_context)
1220 .await
1221 }
1222
1223 #[tracing::instrument(skip_all)]
1224 pub async fn drop_tables(
1225 &self,
1226 table_names: &[TableName],
1227 drop_if_exists: bool,
1228 query_context: QueryContextRef,
1229 ) -> Result<Output> {
1230 let mut tables = Vec::with_capacity(table_names.len());
1231 for table_name in table_names {
1232 ensure!(
1233 !is_readonly_schema(&table_name.schema_name),
1234 SchemaReadOnlySnafu {
1235 name: table_name.schema_name.clone()
1236 }
1237 );
1238
1239 if let Some(table) = self
1240 .catalog_manager
1241 .table(
1242 &table_name.catalog_name,
1243 &table_name.schema_name,
1244 &table_name.table_name,
1245 Some(&query_context),
1246 )
1247 .await
1248 .context(CatalogSnafu)?
1249 {
1250 tables.push(table.table_info().table_id());
1251 } else if drop_if_exists {
1252 continue;
1254 } else {
1255 return TableNotFoundSnafu {
1256 table_name: table_name.to_string(),
1257 }
1258 .fail();
1259 }
1260 }
1261
1262 for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
1263 self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
1264 .await?;
1265
1266 self.cache_invalidator
1268 .invalidate(
1269 &Context::default(),
1270 &[
1271 CacheIdent::TableId(table_id),
1272 CacheIdent::TableName(table_name.clone()),
1273 ],
1274 )
1275 .await
1276 .context(error::InvalidateTableCacheSnafu)?;
1277 }
1278 Ok(Output::new_with_affected_rows(0))
1279 }
1280
1281 #[tracing::instrument(skip_all)]
1282 pub async fn drop_database(
1283 &self,
1284 catalog: String,
1285 schema: String,
1286 drop_if_exists: bool,
1287 query_context: QueryContextRef,
1288 ) -> Result<Output> {
1289 ensure!(
1290 !is_readonly_schema(&schema),
1291 SchemaReadOnlySnafu { name: schema }
1292 );
1293
1294 if self
1295 .catalog_manager
1296 .schema_exists(&catalog, &schema, None)
1297 .await
1298 .context(CatalogSnafu)?
1299 {
1300 if schema == query_context.current_schema() {
1301 SchemaInUseSnafu { name: schema }.fail()
1302 } else {
1303 self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
1304 .await?;
1305
1306 Ok(Output::new_with_affected_rows(0))
1307 }
1308 } else if drop_if_exists {
1309 Ok(Output::new_with_affected_rows(0))
1311 } else {
1312 SchemaNotFoundSnafu {
1313 schema_info: schema,
1314 }
1315 .fail()
1316 }
1317 }
1318
1319 #[tracing::instrument(skip_all)]
1320 pub async fn truncate_table(
1321 &self,
1322 table_name: TableName,
1323 time_ranges: Vec<(Timestamp, Timestamp)>,
1324 query_context: QueryContextRef,
1325 ) -> Result<Output> {
1326 ensure!(
1327 !is_readonly_schema(&table_name.schema_name),
1328 SchemaReadOnlySnafu {
1329 name: table_name.schema_name.clone()
1330 }
1331 );
1332
1333 let table = self
1334 .catalog_manager
1335 .table(
1336 &table_name.catalog_name,
1337 &table_name.schema_name,
1338 &table_name.table_name,
1339 Some(&query_context),
1340 )
1341 .await
1342 .context(CatalogSnafu)?
1343 .with_context(|| TableNotFoundSnafu {
1344 table_name: table_name.to_string(),
1345 })?;
1346 let table_id = table.table_info().table_id();
1347 self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
1348 .await?;
1349
1350 Ok(Output::new_with_affected_rows(0))
1351 }
1352
1353 #[tracing::instrument(skip_all)]
1354 pub async fn alter_table(
1355 &self,
1356 alter_table: AlterTable,
1357 query_context: QueryContextRef,
1358 ) -> Result<Output> {
1359 if matches!(
1360 alter_table.alter_operation(),
1361 AlterTableOperation::Repartition { .. }
1362 ) {
1363 let request = expr_helper::to_repartition_request(alter_table, &query_context)?;
1364 return self.repartition_table(request, &query_context).await;
1365 }
1366
1367 let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
1368 self.alter_table_inner(expr, query_context).await
1369 }
1370
    /// Handles `ALTER TABLE ... REPARTITION`.
    ///
    /// Validates that the table is a physical (non-logical) partitioned
    /// table, converts and checks the requested `from`/`into` partition
    /// expressions against the table's current partitioning, then submits a
    /// `Repartition` alter task. When the request doesn't wait for
    /// completion, the procedure id is returned immediately and cache
    /// invalidation is skipped.
    #[tracing::instrument(skip_all)]
    pub async fn repartition_table(
        &self,
        request: RepartitionRequest,
        query_context: &QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&request.schema_name),
            SchemaReadOnlySnafu {
                name: request.schema_name.clone()
            }
        );

        let table_ref = TableReference::full(
            &request.catalog_name,
            &request.schema_name,
            &request.table_name,
        );
        let table = self
            .catalog_manager
            .table(
                &request.catalog_name,
                &request.schema_name,
                &request.table_name,
                Some(query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_ref.to_string(),
            })?;
        let table_id = table.table_info().ident.table_id;
        let (physical_table_id, physical_table_route) = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        // A logical table routes to a different physical table id; only
        // physical tables can be repartitioned.
        ensure!(
            physical_table_id == table_id,
            NotSupportedSnafu {
                feat: "REPARTITION on logical tables"
            }
        );

        let table_info = table.table_info();
        let existing_partition_columns = table_info.meta.partition_columns().collect::<Vec<_>>();
        ensure!(
            !existing_partition_columns.is_empty(),
            InvalidPartitionRuleSnafu {
                reason: format!(
                    "table {} does not have partition columns, cannot repartition",
                    table_ref
                )
            }
        );

        // Column name -> data type, used to type-check the SQL expressions.
        let column_name_and_type = existing_partition_columns
            .iter()
            .map(|column| (&column.name, column.data_type.clone()))
            .collect();
        let timezone = query_context.timezone();
        let from_partition_exprs = request
            .from_exprs
            .iter()
            .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
            .collect::<Result<Vec<_>>>()?;

        let mut into_partition_exprs = request
            .into_exprs
            .iter()
            .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
            .collect::<Result<Vec<_>>>()?;

        // Many-to-one repartition (a merge): simplify the single resulting
        // expression before comparing/serializing it.
        if from_partition_exprs.len() > 1
            && into_partition_exprs.len() == 1
            && let Some(expr) = into_partition_exprs.pop()
        {
            into_partition_exprs.push(partition::simplify::simplify_merged_partition_expr(expr));
        }

        // Collect the partition expressions currently attached to the
        // table's region routes.
        let mut existing_partition_exprs =
            Vec::with_capacity(physical_table_route.region_routes.len());
        for route in &physical_table_route.region_routes {
            let expr_json = route.region.partition_expr();
            if !expr_json.is_empty() {
                match PartitionExpr::from_json_str(&expr_json) {
                    Ok(Some(expr)) => existing_partition_exprs.push(expr),
                    Ok(None) => {
                        // No partition expression encoded for this region
                        // route; nothing to collect.
                    }
                    Err(e) => {
                        return Err(e).context(DeserializePartitionExprSnafu);
                    }
                }
            }
        }

        // Every `from` expression must match one of the table's current
        // partition expressions.
        for from_expr in &from_partition_exprs {
            ensure!(
                existing_partition_exprs.contains(from_expr),
                InvalidPartitionRuleSnafu {
                    reason: format!(
                        "partition expression '{}' does not exist in table {}",
                        from_expr, table_ref
                    )
                }
            );
        }

        // Resulting partition set: existing minus `from`, plus `into`.
        let new_partition_exprs: Vec<PartitionExpr> = existing_partition_exprs
            .into_iter()
            .filter(|expr| !from_partition_exprs.contains(expr))
            .chain(into_partition_exprs.clone().into_iter())
            .collect();
        let new_partition_exprs_len = new_partition_exprs.len();
        let from_partition_exprs_len = from_partition_exprs.len();

        // Validation only: the constructed rule is discarded, errors abort.
        let _ = MultiDimPartitionRule::try_new(
            existing_partition_columns
                .iter()
                .map(|c| c.name.clone())
                .collect(),
            vec![],
            new_partition_exprs,
            true,
        )
        .context(InvalidPartitionSnafu)?;

        let ddl_options = parse_ddl_options(&request.options)?;
        let serialize_exprs = |exprs: Vec<PartitionExpr>| -> Result<Vec<String>> {
            let mut json_exprs = Vec::with_capacity(exprs.len());
            for expr in exprs {
                json_exprs.push(expr.as_json_str().context(SerializePartitionExprSnafu)?);
            }
            Ok(json_exprs)
        };
        let from_partition_exprs_json = serialize_exprs(from_partition_exprs)?;
        let into_partition_exprs_json = serialize_exprs(into_partition_exprs)?;
        let mut req = SubmitDdlTaskRequest::new(
            to_meta_query_context(query_context.clone()),
            DdlTask::new_alter_table(AlterTableExpr {
                catalog_name: request.catalog_name.clone(),
                schema_name: request.schema_name.clone(),
                table_name: request.table_name.clone(),
                kind: Some(Kind::Repartition(Repartition {
                    from_partition_exprs: from_partition_exprs_json,
                    into_partition_exprs: into_partition_exprs_json,
                })),
            }),
        );
        req.wait = ddl_options.wait;
        req.timeout = ddl_options.timeout;

        info!(
            "Submitting repartition task for table {} (table_id={}), from {} to {} partitions, timeout: {:?}, wait: {}",
            table_ref,
            table_id,
            from_partition_exprs_len,
            new_partition_exprs_len,
            ddl_options.timeout,
            ddl_options.wait
        );

        let response = self
            .procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        // Fire-and-forget mode: hand the procedure id back to the client;
        // caches are not invalidated here since the procedure may still run.
        if !ddl_options.wait {
            return build_procedure_id_output(response.key);
        }

        let invalidate_keys = vec![
            CacheIdent::TableId(table_id),
            CacheIdent::TableName(TableName::new(
                request.catalog_name,
                request.schema_name,
                request.table_name,
            )),
        ];

        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }
1580
    /// Applies an `AlterTableExpr`.
    ///
    /// Resolves default catalog/schema, short-circuits when the alter is a
    /// no-op (see [`verify_alter`]), then submits either a plain alter-table
    /// task (physical table) or an alter-logical-tables task (logical
    /// table), and finally invalidates the affected cache entries.
    #[tracing::instrument(skip_all)]
    pub async fn alter_table_inner(
        &self,
        expr: AlterTableExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&expr.schema_name),
            SchemaReadOnlySnafu {
                name: expr.schema_name.clone()
            }
        );

        // Empty catalog/schema fall back to the defaults (not the session's
        // current ones).
        let catalog_name = if expr.catalog_name.is_empty() {
            DEFAULT_CATALOG_NAME.to_string()
        } else {
            expr.catalog_name.clone()
        };

        let schema_name = if expr.schema_name.is_empty() {
            DEFAULT_SCHEMA_NAME.to_string()
        } else {
            expr.schema_name.clone()
        };

        let table_name = expr.table_name.clone();

        let table = self
            .catalog_manager
            .table(
                &catalog_name,
                &schema_name,
                &table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
            })?;

        let table_id = table.table_info().ident.table_id;
        // A no-op alter (e.g. ADD COLUMN IF NOT EXISTS for columns that all
        // exist) returns early without submitting anything.
        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
        if !need_alter {
            return Ok(Output::new_with_affected_rows(0));
        }
        info!(
            "Table info before alter is {:?}, expr: {:?}",
            table.table_info(),
            expr
        );

        let physical_table_id = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_id(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        let (req, invalidate_keys) = if physical_table_id == table_id {
            // Physical table: plain alter-table task; invalidate the table's
            // own id and name entries.
            let req = SubmitDdlTaskRequest::new(
                to_meta_query_context(query_context),
                DdlTask::new_alter_table(expr),
            );

            let invalidate_keys = vec![
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            (req, invalidate_keys)
        } else {
            // Logical table: route through the alter-logical-tables task and
            // additionally invalidate the backing physical table's entries.
            let req = SubmitDdlTaskRequest::new(
                to_meta_query_context(query_context),
                DdlTask::new_alter_logical_tables(vec![expr]),
            );

            let mut invalidate_keys = vec![
                CacheIdent::TableId(physical_table_id),
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            // Best effort: also invalidate the physical table's name entry
            // when its info is available.
            let physical_table = self
                .table_metadata_manager
                .table_info_manager()
                .get(physical_table_id)
                .await
                .context(TableMetadataManagerSnafu)?
                .map(|x| x.into_inner());
            if let Some(physical_table) = physical_table {
                let physical_table_name = TableName::new(
                    physical_table.table_info.catalog_name,
                    physical_table.table_info.schema_name,
                    physical_table.table_info.name,
                );
                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
            }

            (req, invalidate_keys)
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        // Invalidate only after the procedure succeeded.
        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }
1698
1699 #[cfg(feature = "enterprise")]
1700 #[tracing::instrument(skip_all)]
1701 pub async fn alter_trigger(
1702 &self,
1703 _alter_expr: AlterTrigger,
1704 _query_context: QueryContextRef,
1705 ) -> Result<Output> {
1706 crate::error::NotSupportedSnafu {
1707 feat: "alter trigger",
1708 }
1709 .fail()
1710 }
1711
1712 #[tracing::instrument(skip_all)]
1713 pub async fn alter_database(
1714 &self,
1715 alter_expr: AlterDatabase,
1716 query_context: QueryContextRef,
1717 ) -> Result<Output> {
1718 let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
1719 self.alter_database_inner(alter_expr, query_context).await
1720 }
1721
1722 #[tracing::instrument(skip_all)]
1723 pub async fn alter_database_inner(
1724 &self,
1725 alter_expr: AlterDatabaseExpr,
1726 query_context: QueryContextRef,
1727 ) -> Result<Output> {
1728 ensure!(
1729 !is_readonly_schema(&alter_expr.schema_name),
1730 SchemaReadOnlySnafu {
1731 name: query_context.current_schema().clone()
1732 }
1733 );
1734
1735 let exists = self
1736 .catalog_manager
1737 .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
1738 .await
1739 .context(CatalogSnafu)?;
1740 ensure!(
1741 exists,
1742 SchemaNotFoundSnafu {
1743 schema_info: alter_expr.schema_name,
1744 }
1745 );
1746
1747 let cache_ident = [CacheIdent::SchemaName(SchemaName {
1748 catalog_name: alter_expr.catalog_name.clone(),
1749 schema_name: alter_expr.schema_name.clone(),
1750 })];
1751
1752 self.alter_database_procedure(alter_expr, query_context)
1753 .await?;
1754
1755 self.cache_invalidator
1757 .invalidate(&Context::default(), &cache_ident)
1758 .await
1759 .context(error::InvalidateTableCacheSnafu)?;
1760
1761 Ok(Output::new_with_affected_rows(0))
1762 }
1763
1764 async fn create_table_procedure(
1765 &self,
1766 create_table: CreateTableExpr,
1767 partitions: Vec<PartitionExpr>,
1768 table_info: TableInfo,
1769 query_context: QueryContextRef,
1770 ) -> Result<SubmitDdlTaskResponse> {
1771 let partitions = partitions
1772 .into_iter()
1773 .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
1774 .collect::<Result<Vec<_>>>()?;
1775
1776 let request = SubmitDdlTaskRequest::new(
1777 to_meta_query_context(query_context),
1778 DdlTask::new_create_table(create_table, partitions, table_info),
1779 );
1780
1781 self.procedure_executor
1782 .submit_ddl_task(&ExecutorContext::default(), request)
1783 .await
1784 .context(error::ExecuteDdlSnafu)
1785 }
1786
1787 async fn create_logical_tables_procedure(
1788 &self,
1789 tables_data: Vec<(CreateTableExpr, TableInfo)>,
1790 query_context: QueryContextRef,
1791 ) -> Result<SubmitDdlTaskResponse> {
1792 let request = SubmitDdlTaskRequest::new(
1793 to_meta_query_context(query_context),
1794 DdlTask::new_create_logical_tables(tables_data),
1795 );
1796
1797 self.procedure_executor
1798 .submit_ddl_task(&ExecutorContext::default(), request)
1799 .await
1800 .context(error::ExecuteDdlSnafu)
1801 }
1802
1803 async fn alter_logical_tables_procedure(
1804 &self,
1805 tables_data: Vec<AlterTableExpr>,
1806 query_context: QueryContextRef,
1807 ) -> Result<SubmitDdlTaskResponse> {
1808 let request = SubmitDdlTaskRequest::new(
1809 to_meta_query_context(query_context),
1810 DdlTask::new_alter_logical_tables(tables_data),
1811 );
1812
1813 self.procedure_executor
1814 .submit_ddl_task(&ExecutorContext::default(), request)
1815 .await
1816 .context(error::ExecuteDdlSnafu)
1817 }
1818
1819 async fn drop_table_procedure(
1820 &self,
1821 table_name: &TableName,
1822 table_id: TableId,
1823 drop_if_exists: bool,
1824 query_context: QueryContextRef,
1825 ) -> Result<SubmitDdlTaskResponse> {
1826 let request = SubmitDdlTaskRequest::new(
1827 to_meta_query_context(query_context),
1828 DdlTask::new_drop_table(
1829 table_name.catalog_name.clone(),
1830 table_name.schema_name.clone(),
1831 table_name.table_name.clone(),
1832 table_id,
1833 drop_if_exists,
1834 ),
1835 );
1836
1837 self.procedure_executor
1838 .submit_ddl_task(&ExecutorContext::default(), request)
1839 .await
1840 .context(error::ExecuteDdlSnafu)
1841 }
1842
1843 async fn drop_database_procedure(
1844 &self,
1845 catalog: String,
1846 schema: String,
1847 drop_if_exists: bool,
1848 query_context: QueryContextRef,
1849 ) -> Result<SubmitDdlTaskResponse> {
1850 let request = SubmitDdlTaskRequest::new(
1851 to_meta_query_context(query_context),
1852 DdlTask::new_drop_database(catalog, schema, drop_if_exists),
1853 );
1854
1855 self.procedure_executor
1856 .submit_ddl_task(&ExecutorContext::default(), request)
1857 .await
1858 .context(error::ExecuteDdlSnafu)
1859 }
1860
1861 async fn alter_database_procedure(
1862 &self,
1863 alter_expr: AlterDatabaseExpr,
1864 query_context: QueryContextRef,
1865 ) -> Result<SubmitDdlTaskResponse> {
1866 let request = SubmitDdlTaskRequest::new(
1867 to_meta_query_context(query_context),
1868 DdlTask::new_alter_database(alter_expr),
1869 );
1870
1871 self.procedure_executor
1872 .submit_ddl_task(&ExecutorContext::default(), request)
1873 .await
1874 .context(error::ExecuteDdlSnafu)
1875 }
1876
1877 async fn truncate_table_procedure(
1878 &self,
1879 table_name: &TableName,
1880 table_id: TableId,
1881 time_ranges: Vec<(Timestamp, Timestamp)>,
1882 query_context: QueryContextRef,
1883 ) -> Result<SubmitDdlTaskResponse> {
1884 let request = SubmitDdlTaskRequest::new(
1885 to_meta_query_context(query_context),
1886 DdlTask::new_truncate_table(
1887 table_name.catalog_name.clone(),
1888 table_name.schema_name.clone(),
1889 table_name.table_name.clone(),
1890 table_id,
1891 time_ranges,
1892 ),
1893 );
1894
1895 self.procedure_executor
1896 .submit_ddl_task(&ExecutorContext::default(), request)
1897 .await
1898 .context(error::ExecuteDdlSnafu)
1899 }
1900
1901 #[tracing::instrument(skip_all)]
1902 pub async fn create_database(
1903 &self,
1904 database: &str,
1905 create_if_not_exists: bool,
1906 options: HashMap<String, String>,
1907 query_context: QueryContextRef,
1908 ) -> Result<Output> {
1909 let catalog = query_context.current_catalog();
1910 ensure!(
1911 NAME_PATTERN_REG.is_match(catalog),
1912 error::UnexpectedSnafu {
1913 violated: format!("Invalid catalog name: {}", catalog)
1914 }
1915 );
1916
1917 ensure!(
1918 NAME_PATTERN_REG.is_match(database),
1919 error::UnexpectedSnafu {
1920 violated: format!("Invalid database name: {}", database)
1921 }
1922 );
1923
1924 if !self
1925 .catalog_manager
1926 .schema_exists(catalog, database, None)
1927 .await
1928 .context(CatalogSnafu)?
1929 && !self.catalog_manager.is_reserved_schema_name(database)
1930 {
1931 self.create_database_procedure(
1932 catalog.to_string(),
1933 database.to_string(),
1934 create_if_not_exists,
1935 options,
1936 query_context,
1937 )
1938 .await?;
1939
1940 Ok(Output::new_with_affected_rows(1))
1941 } else if create_if_not_exists {
1942 Ok(Output::new_with_affected_rows(1))
1943 } else {
1944 error::SchemaExistsSnafu { name: database }.fail()
1945 }
1946 }
1947
1948 async fn create_database_procedure(
1949 &self,
1950 catalog: String,
1951 database: String,
1952 create_if_not_exists: bool,
1953 options: HashMap<String, String>,
1954 query_context: QueryContextRef,
1955 ) -> Result<SubmitDdlTaskResponse> {
1956 let request = SubmitDdlTaskRequest::new(
1957 to_meta_query_context(query_context),
1958 DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
1959 );
1960
1961 self.procedure_executor
1962 .submit_ddl_task(&ExecutorContext::default(), request)
1963 .await
1964 .context(error::ExecuteDdlSnafu)
1965 }
1966}
1967
1968pub fn parse_partitions(
1970 create_table: &CreateTableExpr,
1971 partitions: Option<Partitions>,
1972 query_ctx: &QueryContextRef,
1973) -> Result<(Vec<PartitionExpr>, Vec<String>)> {
1974 let partition_columns = find_partition_columns(&partitions)?;
1977 let partition_exprs =
1978 find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
1979
1980 let exprs = partition_exprs.clone();
1982 MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
1983 .context(InvalidPartitionSnafu)?;
1984
1985 Ok((partition_exprs, partition_columns))
1986}
1987
1988fn parse_partitions_for_logical_validation(
1989 create_table: &CreateTableExpr,
1990 partitions: &Partitions,
1991 query_ctx: &QueryContextRef,
1992) -> Result<(Vec<String>, Vec<PartitionExpr>)> {
1993 let partition_columns = partitions
1994 .column_list
1995 .iter()
1996 .map(|ident| ident.value.clone())
1997 .collect::<Vec<_>>();
1998
1999 let column_name_and_type = partition_columns
2000 .iter()
2001 .map(|pc| {
2002 let column = create_table
2003 .column_defs
2004 .iter()
2005 .find(|c| &c.name == pc)
2006 .context(ColumnNotFoundSnafu { msg: pc.clone() })?;
2007 let column_name = &column.name;
2008 let data_type = ConcreteDataType::from(
2009 ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
2010 .context(ColumnDataTypeSnafu)?,
2011 );
2012 Ok((column_name, data_type))
2013 })
2014 .collect::<Result<HashMap<_, _>>>()?;
2015
2016 let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
2017 for expr in &partitions.exprs {
2018 let partition_expr = convert_one_expr(expr, &column_name_and_type, &query_ctx.timezone())?;
2019 partition_exprs.push(partition_expr);
2020 }
2021
2022 MultiDimPartitionRule::try_new(
2023 partition_columns.clone(),
2024 vec![],
2025 partition_exprs.clone(),
2026 true,
2027 )
2028 .context(InvalidPartitionSnafu)?;
2029
2030 Ok((partition_columns, partition_exprs))
2031}
2032
/// Validates an `AlterTableExpr` against the table's current metadata.
///
/// Returns `Ok(false)` when the alter is a no-op — i.e. an `ADD COLUMNS`
/// where every column already exists and carries `add_if_not_exists` — so
/// the caller can skip submitting a procedure. Returns `Ok(true)` when the
/// alter should proceed; a dry-run build of the new table meta is performed
/// first so invalid alterations fail here rather than mid-procedure.
pub fn verify_alter(
    table_id: TableId,
    table_info: Arc<TableInfo>,
    expr: AlterTableExpr,
) -> Result<bool> {
    // Convert the gRPC expr into an internal alter request, checked against
    // the current table meta.
    let request: AlterTableRequest =
        common_grpc_expr::alter_expr_to_request(table_id, expr, Some(&table_info.meta))
            .context(AlterExprToRequestSnafu)?;

    let AlterTableRequest {
        table_name,
        alter_kind,
        ..
    } = &request;

    if let AlterKind::RenameTable { new_table_name } = alter_kind {
        // The new name must satisfy the same pattern as other identifiers.
        ensure!(
            NAME_PATTERN_REG.is_match(new_table_name),
            error::UnexpectedSnafu {
                violated: format!("Invalid table name: {}", new_table_name)
            }
        );
    } else if let AlterKind::AddColumns { columns } = alter_kind {
        // No-op detection: if every added column already exists AND is
        // marked `add_if_not_exists`, there is nothing to do.
        let column_names: HashSet<_> = table_info
            .meta
            .schema
            .column_schemas()
            .iter()
            .map(|schema| &schema.name)
            .collect();
        if columns.iter().all(|column| {
            column_names.contains(&column.column_schema.name) && column.add_if_not_exists
        }) {
            return Ok(false);
        }
    }

    // Dry-run: build the would-be table meta to surface invalid alterations
    // early; the result itself is discarded.
    let _ = table_info
        .meta
        .builder_with_alter_kind(table_name, &request.alter_kind)
        .context(error::TableSnafu)?
        .build()
        .context(error::BuildTableMetaSnafu { table_name })?;

    Ok(true)
}
2086
/// Builds a [`TableInfo`] from a `CreateTableExpr` and the resolved
/// partition column names.
///
/// The returned ident uses `table_id = 0` and `version = 0` as
/// placeholders (NOTE(review): presumably assigned by the create-table
/// procedure later — confirm against the DDL executor).
pub fn create_table_info(
    create_table: &CreateTableExpr,
    partition_columns: Vec<String>,
) -> Result<TableInfo> {
    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
    let mut column_name_to_index_map = HashMap::new();

    for (idx, column) in create_table.column_defs.iter().enumerate() {
        let schema =
            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
                column: &column.name,
            })?;
        // Mark exactly the declared time-index column.
        let schema = schema.with_time_index(column.name == create_table.time_index);

        column_schemas.push(schema);
        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
    }

    // Column ids start after the initially defined columns.
    let next_column_id = column_schemas.len() as u32;
    let schema = Arc::new(Schema::new(column_schemas));

    // Resolve primary-key and partition-key column names to their indices;
    // an unknown name is an error.
    let primary_key_indices = create_table
        .primary_keys
        .iter()
        .map(|name| {
            column_name_to_index_map
                .get(name)
                .cloned()
                .context(ColumnNotFoundSnafu { msg: name })
        })
        .collect::<Result<Vec<_>>>()?;

    let partition_key_indices = partition_columns
        .into_iter()
        .map(|col_name| {
            column_name_to_index_map
                .get(&col_name)
                .cloned()
                .context(ColumnNotFoundSnafu { msg: col_name })
        })
        .collect::<Result<Vec<_>>>()?;

    let table_options = TableOptions::try_from_iter(&create_table.table_options)
        .context(UnrecognizedTableOptionSnafu)?;

    let meta = TableMeta {
        schema,
        primary_key_indices,
        value_indices: vec![],
        engine: create_table.engine.clone(),
        next_column_id,
        options: table_options,
        created_on: Utc::now(),
        updated_on: Utc::now(),
        partition_key_indices,
        column_ids: vec![],
    };

    // An explicit `desc` wins; otherwise fall back to the COMMENT option.
    let desc = if create_table.desc.is_empty() {
        create_table.table_options.get(COMMENT_KEY).cloned()
    } else {
        Some(create_table.desc.clone())
    };

    let table_info = TableInfo {
        ident: metadata::TableIdent {
            table_id: 0,
            version: 0,
        },
        name: create_table.table_name.clone(),
        desc,
        catalog_name: create_table.catalog_name.clone(),
        schema_name: create_table.schema_name.clone(),
        meta,
        table_type: TableType::Base,
    };
    Ok(table_info)
}
2166
2167fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
2168 let columns = if let Some(partitions) = partitions {
2169 partitions
2170 .column_list
2171 .iter()
2172 .map(|x| x.value.clone())
2173 .collect::<Vec<_>>()
2174 } else {
2175 vec![]
2176 };
2177 Ok(columns)
2178}
2179
2180fn find_partition_entries(
2184 create_table: &CreateTableExpr,
2185 partitions: &Option<Partitions>,
2186 partition_columns: &[String],
2187 query_ctx: &QueryContextRef,
2188) -> Result<Vec<PartitionExpr>> {
2189 let Some(partitions) = partitions else {
2190 return Ok(vec![]);
2191 };
2192
2193 let column_name_and_type = partition_columns
2195 .iter()
2196 .map(|pc| {
2197 let column = create_table
2198 .column_defs
2199 .iter()
2200 .find(|c| &c.name == pc)
2201 .unwrap();
2203 let column_name = &column.name;
2204 let data_type = ConcreteDataType::from(
2205 ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
2206 .context(ColumnDataTypeSnafu)?,
2207 );
2208 Ok((column_name, data_type))
2209 })
2210 .collect::<Result<HashMap<_, _>>>()?;
2211
2212 let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
2214 for partition in &partitions.exprs {
2215 let partition_expr =
2216 convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
2217 partition_exprs.push(partition_expr);
2218 }
2219
2220 Ok(partition_exprs)
2221}
2222
/// Converts one sqlparser binary expression into a [`PartitionExpr`].
///
/// Supported shapes: `column op value`, `value op column` (each with an
/// optional unary operator on the value side), and `binary op binary` for
/// nested expressions. Anything else is rejected as an invalid partition
/// rule.
fn convert_one_expr(
    expr: &Expr,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
    timezone: &Timezone,
) -> Result<PartitionExpr> {
    let Expr::BinaryOp { left, op, right } = expr else {
        return InvalidPartitionRuleSnafu {
            reason: "partition rule must be a binary expression",
        }
        .fail();
    };

    // Only the restricted operator set is allowed in partition rules.
    let op =
        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
            reason: format!("unsupported operator in partition expr {op}"),
        })?;

    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
        // col <op> value
        (Expr::Identifier(ident), Expr::Value(value)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&value.value, data_type, timezone, None)?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        // col <op> <unary><value>, e.g. `col < -1`
        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        // value <op> col
        (Expr::Value(value), Expr::Identifier(ident)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&value.value, data_type, timezone, None)?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        // <unary><value> <op> col
        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        // Nested expressions: recurse on both sides.
        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
            (Operand::Expr(lhs), op, Operand::Expr(rhs))
        }
        _ => {
            return InvalidPartitionRuleSnafu {
                reason: format!("invalid partition expr {expr}"),
            }
            .fail();
        }
    };

    Ok(PartitionExpr::new(lhs, op, rhs))
}
2284
2285fn convert_identifier(
2286 ident: &Ident,
2287 column_name_and_type: &HashMap<&String, ConcreteDataType>,
2288) -> Result<(String, ConcreteDataType)> {
2289 let column_name = ident.value.clone();
2290 let data_type = column_name_and_type
2291 .get(&column_name)
2292 .cloned()
2293 .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
2294 Ok((column_name, data_type))
2295}
2296
2297fn convert_value(
2298 value: &ParserValue,
2299 data_type: ConcreteDataType,
2300 timezone: &Timezone,
2301 unary_op: Option<UnaryOperator>,
2302) -> Result<Value> {
2303 sql_value_to_value(
2304 &ColumnSchema::new("<NONAME>", data_type, true),
2305 value,
2306 Some(timezone),
2307 unary_op,
2308 false,
2309 )
2310 .context(error::SqlCommonSnafu)
2311}
2312
2313#[cfg(test)]
2314mod test {
2315 use std::time::Duration;
2316
2317 use session::context::{QueryContext, QueryContextBuilder};
2318 use sql::dialect::GreptimeDbDialect;
2319 use sql::parser::{ParseOptions, ParserContext};
2320 use sql::statements::statement::Statement;
2321 use sqlparser::parser::Parser;
2322
2323 use super::*;
2324 use crate::expr_helper;
2325
    // "5m" must parse to 300 seconds and wait="false" must disable waiting.
    #[test]
    fn test_parse_ddl_options() {
        let options = OptionMap::from([
            ("timeout".to_string(), "5m".to_string()),
            ("wait".to_string(), "false".to_string()),
        ]);
        let ddl_options = parse_ddl_options(&options).unwrap();
        assert!(!ddl_options.wait);
        assert_eq!(Duration::from_secs(300), ddl_options.timeout);
    }
2336
    // NAME_PATTERN_REG: names may contain '@' or '#' but must not start
    // with them; slashes and non-ASCII symbols are rejected.
    #[test]
    fn test_name_is_match() {
        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
        assert!(!NAME_PATTERN_REG.is_match("🈲"));
        assert!(NAME_PATTERN_REG.is_match("hello"));
        assert!(NAME_PATTERN_REG.is_match("test@"));
        assert!(!NAME_PATTERN_REG.is_match("@test"));
        assert!(NAME_PATTERN_REG.is_match("test#"));
        assert!(!NAME_PATTERN_REG.is_match("#test"));
        assert!(!NAME_PATTERN_REG.is_match("@"));
        assert!(!NAME_PATTERN_REG.is_match("#"));
    }
2349
2350 #[test]
2351 fn test_partition_expr_equivalence_with_swapped_operands() {
2352 let column_name = "device_id".to_string();
2353 let column_name_and_type =
2354 HashMap::from([(&column_name, ConcreteDataType::int32_datatype())]);
2355 let timezone = Timezone::from_tz_string("UTC").unwrap();
2356 let dialect = GreptimeDbDialect {};
2357
2358 let mut parser = Parser::new(&dialect)
2359 .try_with_sql("device_id < 100")
2360 .unwrap();
2361 let expr_left = parser.parse_expr().unwrap();
2362
2363 let mut parser = Parser::new(&dialect)
2364 .try_with_sql("100 > device_id")
2365 .unwrap();
2366 let expr_right = parser.parse_expr().unwrap();
2367
2368 let partition_left =
2369 convert_one_expr(&expr_left, &column_name_and_type, &timezone).unwrap();
2370 let partition_right =
2371 convert_one_expr(&expr_right, &column_name_and_type, &timezone).unwrap();
2372
2373 assert_eq!(partition_left, partition_right);
2374 assert!([partition_left.clone()].contains(&partition_right));
2375
2376 let mut physical_partition_exprs = vec![partition_left];
2377 let mut logical_partition_exprs = vec![partition_right];
2378 physical_partition_exprs.sort_unstable();
2379 logical_partition_exprs.sort_unstable();
2380 assert_eq!(physical_partition_exprs, logical_partition_exprs);
2381 }
2382
2383 #[tokio::test]
2384 #[ignore = "TODO(ruihang): WIP new partition rule"]
2385 async fn test_parse_partitions() {
2386 common_telemetry::init_default_ut_logging();
2387 let cases = [
2388 (
2389 r"
2390CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
2391PARTITION ON COLUMNS (b) (
2392 b < 'hz',
2393 b >= 'hz' AND b < 'sh',
2394 b >= 'sh'
2395)
2396ENGINE=mito",
2397 r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
2398 ),
2399 (
2400 r"
2401CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
2402PARTITION BY RANGE COLUMNS (b, a) (
2403 PARTITION r0 VALUES LESS THAN ('hz', 10),
2404 b < 'hz' AND a < 10,
2405 b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
2406 b >= 'sh' AND a >= 20
2407)
2408ENGINE=mito",
2409 r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
2410 ),
2411 ];
2412 let ctx = QueryContextBuilder::default().build().into();
2413 for (sql, expected) in cases {
2414 let result = ParserContext::create_with_dialect(
2415 sql,
2416 &GreptimeDbDialect {},
2417 ParseOptions::default(),
2418 )
2419 .unwrap();
2420 match &result[0] {
2421 Statement::CreateTable(c) => {
2422 let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
2423 let (partitions, _) =
2424 parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
2425 let json = serde_json::to_string(&partitions).unwrap();
2426 assert_eq!(json, expected);
2427 }
2428 _ => unreachable!(),
2429 }
2430 }
2431 }
2432}