1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::helper::ColumnDataTypeWrapper;
20use api::v1::alter_table_expr::Kind;
21use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
22use api::v1::repartition::Source;
23use api::v1::{
24 AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
25 PartitionedSource, Repartition, TargetPartitionColumns, UnpartitionedSource, column_def,
26};
27#[cfg(feature = "enterprise")]
28use api::v1::{
29 CreateTriggerExpr as PbCreateTriggerExpr, meta::CreateTriggerTask as PbCreateTriggerTask,
30};
31use catalog::CatalogManagerRef;
32use chrono::Utc;
33use common_base::regex_pattern::NAME_PATTERN_REG;
34use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_readonly_schema};
35use common_catalog::{format_full_flow_name, format_full_table_name};
36use common_error::ext::BoxedError;
37use common_meta::cache_invalidator::Context;
38use common_meta::ddl::create_flow::{
39 DEFER_ON_MISSING_SOURCE_KEY, FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY, FlowType,
40};
41use common_meta::instruction::CacheIdent;
42use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
43use common_meta::procedure_executor::ExecutorContext;
44#[cfg(feature = "enterprise")]
45use common_meta::rpc::ddl::trigger::CreateTriggerTask;
46#[cfg(feature = "enterprise")]
47use common_meta::rpc::ddl::trigger::DropTriggerTask;
48use common_meta::rpc::ddl::{
49 CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
50 SubmitDdlTaskResponse,
51};
52use common_query::Output;
53use common_recordbatch::{RecordBatch, RecordBatches};
54use common_sql::convert::sql_value_to_value;
55use common_telemetry::{debug, info, tracing, warn};
56use common_time::{Timestamp, Timezone};
57use datafusion_common::tree_node::TreeNodeVisitor;
58use datafusion_expr::LogicalPlan;
59use datatypes::prelude::ConcreteDataType;
60use datatypes::schema::{ColumnSchema, Schema};
61use datatypes::value::Value;
62use datatypes::vectors::{StringVector, VectorRef};
63use humantime::parse_duration;
64use partition::expr::{Operand, PartitionExpr, RestrictedOp};
65use partition::multi_dim::MultiDimPartitionRule;
66use query::parser::QueryStatement;
67use query::plan::extract_and_rewrite_full_table_names;
68use query::query_engine::DefaultSerializer;
69use query::sql::create_table_stmt;
70use session::context::QueryContextRef;
71use session::table_name::table_idents_to_full_name;
72use snafu::{OptionExt, ResultExt, ensure};
73use sql::parser::{ParseOptions, ParserContext};
74use sql::parsers::utils::is_tql;
75use sql::statements::OptionMap;
76#[cfg(feature = "enterprise")]
77use sql::statements::alter::trigger::AlterTrigger;
78use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation};
79#[cfg(feature = "enterprise")]
80use sql::statements::create::trigger::CreateTrigger;
81use sql::statements::create::{
82 CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
83};
84use sql::statements::statement::Statement;
85use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
86use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
87use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
88use table::TableRef;
89use table::dist_table::DistTable;
90use table::metadata::{self, TableId, TableInfo, TableMeta, TableType};
91use table::requests::{
92 AlterKind, AlterTableRequest, COMMENT_KEY, DDL_TIMEOUT, DDL_WAIT, REPARTITION_COLUMN_HINT_KEY,
93 TableOptions,
94};
95use table::table_name::TableName;
96use table::table_reference::TableReference;
97
98use crate::error::{
99 self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
100 ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu,
101 DeserializePartitionExprSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
102 FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
103 InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu,
104 PartitionExprToPbSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
105 SerializePartitionExprSnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
106 TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
107 ViewAlreadyExistsSnafu,
108};
109use crate::expr_helper::{self, RepartitionRequest, RepartitionSource};
110use crate::statement::StatementExecutor;
111use crate::statement::show::create_partitions_stmt;
112use crate::utils::{to_meta_query_context, to_meta_query_context_with_origin_frontend};
113
/// Submission settings for a DDL task, as produced by `parse_ddl_options`.
#[derive(Clone, Copy, Debug)]
struct DdlSubmitOptions {
    /// Value of the `DDL_WAIT` option, or `SubmitDdlTaskRequest::default_wait()`
    /// when the option is absent.
    wait: bool,
    /// Value of the `DDL_TIMEOUT` option parsed as a humantime duration, or
    /// `SubmitDdlTaskRequest::default_timeout()` when the option is absent.
    timeout: Duration,
}
119
120const ALLOWED_FLOW_OPTIONS: [&str; 2] = [
121 DEFER_ON_MISSING_SOURCE_KEY,
122 FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY,
123];
124
125fn build_procedure_id_output(procedure_id: Vec<u8>) -> Result<Output> {
126 let procedure_id = String::from_utf8_lossy(&procedure_id).to_string();
127 let vector: VectorRef = Arc::new(StringVector::from(vec![procedure_id]));
128 let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
129 "Procedure ID",
130 vector.data_type(),
131 false,
132 )]));
133 let batch =
134 RecordBatch::new(schema.clone(), vec![vector]).context(error::BuildRecordBatchSnafu)?;
135 let batches =
136 RecordBatches::try_new(schema, vec![batch]).context(error::BuildRecordBatchSnafu)?;
137 Ok(Output::new_with_record_batches(batches))
138}
139
140fn parse_ddl_options(options: &OptionMap) -> Result<DdlSubmitOptions> {
141 let wait = match options.get(DDL_WAIT) {
142 Some(value) => value.parse::<bool>().map_err(|_| {
143 InvalidSqlSnafu {
144 err_msg: format!("invalid DDL option '{DDL_WAIT}': '{value}'"),
145 }
146 .build()
147 })?,
148 None => SubmitDdlTaskRequest::default_wait(),
149 };
150
151 let timeout = match options.get(DDL_TIMEOUT) {
152 Some(value) => parse_duration(value).map_err(|err| {
153 InvalidSqlSnafu {
154 err_msg: format!("invalid DDL option '{DDL_TIMEOUT}': '{value}': {err}"),
155 }
156 .build()
157 })?,
158 None => SubmitDdlTaskRequest::default_timeout(),
159 };
160
161 Ok(DdlSubmitOptions { wait, timeout })
162}
163
164fn supported_flow_options() -> String {
165 ALLOWED_FLOW_OPTIONS.join(", ")
166}
167
168fn normalize_flow_bool_option(key: &str, value: &str) -> Result<String> {
169 value
170 .trim()
171 .to_ascii_lowercase()
172 .parse::<bool>()
173 .map(|value| value.to_string())
174 .map_err(|_| {
175 InvalidSqlSnafu {
176 err_msg: format!("invalid flow option '{key}': '{value}'"),
177 }
178 .build()
179 })
180}
181
182fn validate_and_normalize_flow_options(
183 options: HashMap<String, String>,
184) -> Result<HashMap<String, String>> {
185 options
186 .into_iter()
187 .map(|(key, value)| {
188 if key == FlowType::FLOW_TYPE_KEY {
189 return InvalidSqlSnafu {
190 err_msg: format!("flow option '{key}' is reserved for internal use"),
191 }
192 .fail();
193 }
194
195 let normalized_value = match key.as_str() {
196 DEFER_ON_MISSING_SOURCE_KEY | FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY => {
197 normalize_flow_bool_option(&key, &value)?
198 }
199 _ => {
200 return InvalidSqlSnafu {
201 err_msg: format!(
202 "unknown flow option '{key}', supported options: {}",
203 supported_flow_options()
204 ),
205 }
206 .fail();
207 }
208 };
209
210 Ok((key, normalized_value))
211 })
212 .collect()
213}
214
215fn determine_flow_type_for_source_state(
216 flow_name: &str,
217 flow_options: &HashMap<String, String>,
218 has_missing_source_table: bool,
219 has_instant_ttl_source_table: bool,
220) -> Result<Option<FlowType>> {
221 if has_missing_source_table {
222 let defer_on_missing_source = flow_options
223 .get(DEFER_ON_MISSING_SOURCE_KEY)
224 .is_some_and(|value| value == "true");
225 ensure!(
226 defer_on_missing_source,
227 InvalidSqlSnafu {
228 err_msg: format!(
229 "missing source tables for flow '{}'; use WITH ({DEFER_ON_MISSING_SOURCE_KEY} = true) to create a pending flow",
230 flow_name
231 )
232 }
233 );
234 info!(
235 "Flow `{}` is created as a pending batching flow because source tables are missing and defer_on_missing_source=true",
236 flow_name
237 );
238 return Ok(Some(FlowType::Batching));
239 }
240
241 if has_instant_ttl_source_table {
242 return Ok(Some(FlowType::Streaming));
243 }
244
245 Ok(None)
246}
247
248impl StatementExecutor {
249 pub fn catalog_manager(&self) -> CatalogManagerRef {
250 self.catalog_manager.clone()
251 }
252
253 #[tracing::instrument(skip_all)]
254 pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
255 let (catalog, schema, _table) = table_idents_to_full_name(&stmt.name, &ctx)
256 .map_err(BoxedError::new)
257 .context(error::ExternalSnafu)?;
258
259 let schema_options = self
260 .table_metadata_manager
261 .schema_manager()
262 .get(SchemaNameKey {
263 catalog: &catalog,
264 schema: &schema,
265 })
266 .await
267 .context(TableMetadataManagerSnafu)?
268 .map(|v| v.into_inner());
269
270 let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
271 if let Some(schema_options) = schema_options {
274 for (key, value) in schema_options.extra_options.iter() {
275 if key.starts_with("compaction.") {
276 continue;
277 }
278 create_expr
279 .table_options
280 .entry(key.clone())
281 .or_insert(value.clone());
282 }
283 }
284
285 self.create_table_inner(create_expr, stmt.partitions, ctx)
286 .await
287 }
288
289 #[tracing::instrument(skip_all)]
290 pub async fn create_table_like(
291 &self,
292 stmt: CreateTableLike,
293 ctx: QueryContextRef,
294 ) -> Result<TableRef> {
295 let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
296 .map_err(BoxedError::new)
297 .context(error::ExternalSnafu)?;
298 let table_ref = self
299 .catalog_manager
300 .table(&catalog, &schema, &table, Some(&ctx))
301 .await
302 .context(CatalogSnafu)?
303 .context(TableNotFoundSnafu { table_name: &table })?;
304 let partition_info = self
305 .partition_manager
306 .find_physical_partition_info(table_ref.table_info().table_id())
307 .await
308 .context(error::FindTablePartitionRuleSnafu { table_name: table })?;
309
310 let schema_options = self
312 .table_metadata_manager
313 .schema_manager()
314 .get(SchemaNameKey {
315 catalog: &catalog,
316 schema: &schema,
317 })
318 .await
319 .context(TableMetadataManagerSnafu)?
320 .map(|v| v.into_inner());
321
322 let quote_style = ctx.quote_style();
323 let mut create_stmt =
324 create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
325 .context(error::ParseQuerySnafu)?;
326 create_stmt.name = stmt.table_name;
327 create_stmt.if_not_exists = false;
328
329 let table_info = table_ref.table_info();
330 let partitions = create_partitions_stmt(&table_info, &partition_info.partitions)?.and_then(
331 |mut partitions| {
332 if !partitions.column_list.is_empty() {
333 partitions.set_quote(quote_style);
334 Some(partitions)
335 } else {
336 None
337 }
338 },
339 );
340
341 let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
342 self.create_table_inner(create_expr, partitions, ctx).await
343 }
344
345 #[tracing::instrument(skip_all)]
346 pub async fn create_external_table(
347 &self,
348 create_expr: CreateExternalTable,
349 ctx: QueryContextRef,
350 ) -> Result<TableRef> {
351 let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
352 self.create_table_inner(create_expr, None, ctx).await
353 }
354
355 #[tracing::instrument(skip_all)]
356 pub async fn create_table_inner(
357 &self,
358 create_table: &mut CreateTableExpr,
359 partitions: Option<Partitions>,
360 query_ctx: QueryContextRef,
361 ) -> Result<TableRef> {
362 ensure!(
363 !is_readonly_schema(&create_table.schema_name),
364 SchemaReadOnlySnafu {
365 name: create_table.schema_name.clone()
366 }
367 );
368
369 if create_table.engine == METRIC_ENGINE_NAME
370 && create_table
371 .table_options
372 .contains_key(LOGICAL_TABLE_METADATA_KEY)
373 {
374 if let Some(partitions) = partitions.as_ref()
375 && !partitions.exprs.is_empty()
376 {
377 self.validate_logical_table_partition_rule(create_table, partitions, &query_ctx)
378 .await?;
379 }
380 self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
382 .await?
383 .into_iter()
384 .next()
385 .context(error::UnexpectedSnafu {
386 violated: "expected to create logical tables",
387 })
388 } else {
389 self.create_non_logic_table(create_table, partitions, query_ctx)
391 .await
392 }
393 }
394
395 #[tracing::instrument(skip_all)]
396 pub async fn create_non_logic_table(
397 &self,
398 create_table: &mut CreateTableExpr,
399 partitions: Option<Partitions>,
400 query_ctx: QueryContextRef,
401 ) -> Result<TableRef> {
402 let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
403
404 let schema = self
406 .table_metadata_manager
407 .schema_manager()
408 .get(SchemaNameKey::new(
409 &create_table.catalog_name,
410 &create_table.schema_name,
411 ))
412 .await
413 .context(TableMetadataManagerSnafu)?;
414 ensure!(
415 schema.is_some(),
416 SchemaNotFoundSnafu {
417 schema_info: &create_table.schema_name,
418 }
419 );
420
421 if let Some(table) = self
423 .catalog_manager
424 .table(
425 &create_table.catalog_name,
426 &create_table.schema_name,
427 &create_table.table_name,
428 Some(&query_ctx),
429 )
430 .await
431 .context(CatalogSnafu)?
432 {
433 return if create_table.create_if_not_exists {
434 Ok(table)
435 } else {
436 TableAlreadyExistsSnafu {
437 table: format_full_table_name(
438 &create_table.catalog_name,
439 &create_table.schema_name,
440 &create_table.table_name,
441 ),
442 }
443 .fail()
444 };
445 }
446
447 ensure!(
448 NAME_PATTERN_REG.is_match(&create_table.table_name),
449 InvalidTableNameSnafu {
450 table_name: &create_table.table_name,
451 }
452 );
453
454 let table_name = TableName::new(
455 &create_table.catalog_name,
456 &create_table.schema_name,
457 &create_table.table_name,
458 );
459
460 let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
461 let mut table_info = create_table_info(create_table, partition_cols)?;
462
463 let resp = self
464 .create_table_procedure(
465 create_table.clone(),
466 partitions,
467 table_info.clone(),
468 query_ctx,
469 )
470 .await?;
471
472 let table_id = resp
473 .table_ids
474 .into_iter()
475 .next()
476 .context(error::UnexpectedSnafu {
477 violated: "expected table_id",
478 })?;
479 info!("Successfully created table '{table_name}' with table id {table_id}");
480
481 table_info.ident.table_id = table_id;
482
483 let table_info = Arc::new(table_info);
484 create_table.table_id = Some(api::v1::TableId { id: table_id });
485
486 let table = DistTable::table(table_info);
487
488 Ok(table)
489 }
490
491 #[tracing::instrument(skip_all)]
492 pub async fn create_logical_tables(
493 &self,
494 create_table_exprs: &[CreateTableExpr],
495 query_context: QueryContextRef,
496 ) -> Result<Vec<TableRef>> {
497 let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
498 ensure!(
499 !create_table_exprs.is_empty(),
500 EmptyDdlExprSnafu {
501 name: "create logic tables"
502 }
503 );
504
505 for create_table in create_table_exprs {
507 ensure!(
508 NAME_PATTERN_REG.is_match(&create_table.table_name),
509 InvalidTableNameSnafu {
510 table_name: &create_table.table_name,
511 }
512 );
513 }
514
515 let raw_tables_info = create_table_exprs
516 .iter()
517 .map(|create| create_table_info(create, vec![]))
518 .collect::<Result<Vec<_>>>()?;
519 let tables_data = create_table_exprs
520 .iter()
521 .cloned()
522 .zip(raw_tables_info.iter().cloned())
523 .collect::<Vec<_>>();
524
525 let resp = self
526 .create_logical_tables_procedure(tables_data, query_context.clone())
527 .await?;
528
529 let table_ids = resp.table_ids;
530 ensure!(
531 table_ids.len() == raw_tables_info.len(),
532 CreateLogicalTablesSnafu {
533 reason: format!(
534 "The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}",
535 raw_tables_info.len(),
536 table_ids.len()
537 )
538 }
539 );
540 info!("Successfully created logical tables: {:?}", table_ids);
541
542 let mut tables_info = Vec::with_capacity(table_ids.len());
546 for (table_id, create_table) in table_ids.iter().zip(create_table_exprs.iter()) {
547 let table = self
548 .catalog_manager
549 .table(
550 &create_table.catalog_name,
551 &create_table.schema_name,
552 &create_table.table_name,
553 Some(&query_context),
554 )
555 .await
556 .context(CatalogSnafu)?
557 .with_context(|| TableNotFoundSnafu {
558 table_name: format_full_table_name(
559 &create_table.catalog_name,
560 &create_table.schema_name,
561 &create_table.table_name,
562 ),
563 })?;
564
565 let table_info = table.table_info();
566 ensure!(
568 table_info.table_id() == *table_id,
569 CreateLogicalTablesSnafu {
570 reason: format!(
571 "Table id mismatch after creation, expected {}, got {} for table {}",
572 table_id,
573 table_info.table_id(),
574 format_full_table_name(
575 &create_table.catalog_name,
576 &create_table.schema_name,
577 &create_table.table_name
578 )
579 )
580 }
581 );
582
583 tables_info.push(table_info);
584 }
585
586 Ok(tables_info.into_iter().map(DistTable::table).collect())
587 }
588
589 async fn validate_logical_table_partition_rule(
590 &self,
591 create_table: &CreateTableExpr,
592 partitions: &Partitions,
593 query_ctx: &QueryContextRef,
594 ) -> Result<()> {
595 let (_, mut logical_partition_exprs) =
596 parse_partitions_for_logical_validation(create_table, partitions, query_ctx)?;
597
598 let physical_table_name = create_table
599 .table_options
600 .get(LOGICAL_TABLE_METADATA_KEY)
601 .with_context(|| CreateLogicalTablesSnafu {
602 reason: format!(
603 "expect `{LOGICAL_TABLE_METADATA_KEY}` option on creating logical table"
604 ),
605 })?;
606
607 let physical_table = self
608 .catalog_manager
609 .table(
610 &create_table.catalog_name,
611 &create_table.schema_name,
612 physical_table_name,
613 Some(query_ctx),
614 )
615 .await
616 .context(CatalogSnafu)?
617 .context(TableNotFoundSnafu {
618 table_name: physical_table_name.clone(),
619 })?;
620
621 let physical_table_info = physical_table.table_info();
622 let (partition_rule, _) = self
623 .partition_manager
624 .find_table_partition_rule(&physical_table_info)
625 .await
626 .context(error::FindTablePartitionRuleSnafu {
627 table_name: physical_table_name.clone(),
628 })?;
629
630 let multi_dim_rule = partition_rule
631 .as_ref()
632 .as_any()
633 .downcast_ref::<MultiDimPartitionRule>()
634 .context(InvalidPartitionRuleSnafu {
635 reason: "physical table partition rule is not range-based",
636 })?;
637
638 let mut physical_partition_exprs = multi_dim_rule.exprs().to_vec();
640 logical_partition_exprs.sort_unstable();
641 physical_partition_exprs.sort_unstable();
642
643 ensure!(
644 physical_partition_exprs == logical_partition_exprs,
645 InvalidPartitionRuleSnafu {
646 reason: format!(
647 "logical table partition rule must match the corresponding physical table's\n logical table partition exprs:\t\t {:?}\n physical table partition exprs:\t {:?}",
648 logical_partition_exprs, physical_partition_exprs
649 ),
650 }
651 );
652
653 Ok(())
654 }
655
656 #[cfg(feature = "enterprise")]
657 #[tracing::instrument(skip_all)]
658 pub async fn create_trigger(
659 &self,
660 stmt: CreateTrigger,
661 query_context: QueryContextRef,
662 ) -> Result<Output> {
663 let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
664 self.create_trigger_inner(expr, query_context).await
665 }
666
667 #[cfg(feature = "enterprise")]
668 pub async fn create_trigger_inner(
669 &self,
670 expr: PbCreateTriggerExpr,
671 query_context: QueryContextRef,
672 ) -> Result<Output> {
673 self.create_trigger_procedure(expr, query_context).await?;
674 Ok(Output::new_with_affected_rows(0))
675 }
676
677 #[cfg(feature = "enterprise")]
678 async fn create_trigger_procedure(
679 &self,
680 expr: PbCreateTriggerExpr,
681 query_context: QueryContextRef,
682 ) -> Result<SubmitDdlTaskResponse> {
683 let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
684 create_trigger: Some(expr),
685 })
686 .context(error::InvalidExprSnafu)?;
687
688 let request = SubmitDdlTaskRequest::new(
689 to_meta_query_context(query_context),
690 DdlTask::new_create_trigger(task),
691 );
692
693 self.procedure_executor
694 .submit_ddl_task(&ExecutorContext::default(), request)
695 .await
696 .context(error::ExecuteDdlSnafu)
697 }
698
699 #[tracing::instrument(skip_all)]
700 pub async fn create_flow(
701 &self,
702 stmt: CreateFlow,
703 query_context: QueryContextRef,
704 ) -> Result<Output> {
705 let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;
707
708 self.create_flow_inner(expr, query_context).await
709 }
710
711 pub async fn create_flow_inner(
712 &self,
713 expr: CreateFlowExpr,
714 query_context: QueryContextRef,
715 ) -> Result<Output> {
716 self.create_flow_procedure(expr, query_context).await?;
717 Ok(Output::new_with_affected_rows(0))
718 }
719
720 async fn create_flow_procedure(
721 &self,
722 mut expr: CreateFlowExpr,
723 query_context: QueryContextRef,
724 ) -> Result<SubmitDdlTaskResponse> {
725 expr.flow_options = validate_and_normalize_flow_options(expr.flow_options)?;
726
727 let flow_type = self
728 .determine_flow_type(&expr, query_context.clone())
729 .await?;
730 info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
731
732 expr.flow_options
733 .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
734
735 let task = CreateFlowTask::try_from(PbCreateFlowTask {
736 create_flow: Some(expr),
737 })
738 .context(error::InvalidExprSnafu)?;
739 let request = SubmitDdlTaskRequest::new(
740 to_meta_query_context(query_context),
741 DdlTask::new_create_flow(task),
742 );
743
744 self.procedure_executor
745 .submit_ddl_task(&ExecutorContext::default(), request)
746 .await
747 .context(error::ExecuteDdlSnafu)
748 }
749
750 async fn determine_flow_type(
754 &self,
755 expr: &CreateFlowExpr,
756 query_ctx: QueryContextRef,
757 ) -> Result<FlowType> {
758 let mut has_missing_source_table = false;
759 let mut has_instant_ttl_source_table = false;
760
761 for src_table_name in &expr.source_table_names {
762 let table = self
763 .catalog_manager()
764 .table(
765 &src_table_name.catalog_name,
766 &src_table_name.schema_name,
767 &src_table_name.table_name,
768 Some(&query_ctx),
769 )
770 .await
771 .map_err(BoxedError::new)
772 .context(ExternalSnafu)?;
773
774 let Some(table) = table else {
775 has_missing_source_table = true;
776 continue;
777 };
778
779 if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
780 warn!(
781 "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
782 format_full_table_name(
783 &src_table_name.catalog_name,
784 &src_table_name.schema_name,
785 &src_table_name.table_name
786 ),
787 expr.flow_name
788 );
789 has_instant_ttl_source_table = true;
790 }
791 }
792
793 if let Some(flow_type) = determine_flow_type_for_source_state(
794 &expr.flow_name,
795 &expr.flow_options,
796 has_missing_source_table,
797 has_instant_ttl_source_table,
798 )? {
799 return Ok(flow_type);
800 }
801
802 let engine = &self.query_engine;
803 let stmts = ParserContext::create_with_dialect(
804 &expr.sql,
805 query_ctx.sql_dialect(),
806 ParseOptions::default(),
807 )
808 .map_err(BoxedError::new)
809 .context(ExternalSnafu)?;
810
811 ensure!(
812 stmts.len() == 1,
813 InvalidSqlSnafu {
814 err_msg: format!("Expect only one statement, found {}", stmts.len())
815 }
816 );
817 let stmt = &stmts[0];
818
819 if is_tql(query_ctx.sql_dialect(), &expr.sql)
820 .map_err(BoxedError::new)
821 .context(ExternalSnafu)?
822 {
823 return Ok(FlowType::Batching);
824 }
825
826 let plan = match stmt {
828 Statement::Tql(_) => return Ok(FlowType::Batching),
830 _ => engine
831 .planner()
832 .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
833 .await
834 .map_err(BoxedError::new)
835 .context(ExternalSnafu)?,
836 };
837
838 struct FindAggr {
840 is_aggr: bool,
841 }
842
843 impl TreeNodeVisitor<'_> for FindAggr {
844 type Node = LogicalPlan;
845 fn f_down(
846 &mut self,
847 node: &Self::Node,
848 ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
849 {
850 match node {
851 LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
852 self.is_aggr = true;
853 return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
854 }
855 _ => (),
856 }
857 Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
858 }
859 }
860
861 let mut find_aggr = FindAggr { is_aggr: false };
862
863 plan.visit_with_subqueries(&mut find_aggr)
864 .context(BuildDfLogicalPlanSnafu)?;
865 if find_aggr.is_aggr {
866 Ok(FlowType::Batching)
867 } else {
868 Ok(FlowType::Streaming)
869 }
870 }
871
872 #[tracing::instrument(skip_all)]
873 pub async fn create_view(
874 &self,
875 create_view: CreateView,
876 ctx: QueryContextRef,
877 ) -> Result<TableRef> {
878 let logical_plan = match &*create_view.query {
880 Statement::Query(query) => {
881 self.plan(
882 &QueryStatement::Sql(Statement::Query(query.clone())),
883 ctx.clone(),
884 )
885 .await?
886 }
887 Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
888 _ => {
889 return InvalidViewStmtSnafu {}.fail();
890 }
891 };
892 let definition = create_view.to_string();
894
895 let schema: Schema = logical_plan
898 .schema()
899 .clone()
900 .try_into()
901 .context(ConvertSchemaSnafu)?;
902 let plan_columns: Vec<_> = schema
903 .column_schemas()
904 .iter()
905 .map(|c| c.name.clone())
906 .collect();
907
908 let columns: Vec<_> = create_view
909 .columns
910 .iter()
911 .map(|ident| ident.to_string())
912 .collect();
913
914 if !columns.is_empty() {
916 ensure!(
917 columns.len() == plan_columns.len(),
918 error::ViewColumnsMismatchSnafu {
919 view_name: create_view.name.to_string(),
920 expected: plan_columns.len(),
921 actual: columns.len(),
922 }
923 );
924 }
925
926 let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
929 .context(ExtractTableNamesSnafu)?;
930
931 let table_names = table_names.into_iter().map(|t| t.into()).collect();
932
933 let encoded_plan = DFLogicalSubstraitConvertor
940 .encode(&plan, DefaultSerializer)
941 .context(SubstraitCodecSnafu)?;
942
943 let expr = expr_helper::to_create_view_expr(
944 create_view,
945 encoded_plan.to_vec(),
946 table_names,
947 columns,
948 plan_columns,
949 definition,
950 ctx.clone(),
951 )?;
952
953 self.create_view_by_expr(expr, ctx).await
955 }
956
957 pub async fn create_view_by_expr(
958 &self,
959 expr: CreateViewExpr,
960 ctx: QueryContextRef,
961 ) -> Result<TableRef> {
962 ensure! {
963 !(expr.create_if_not_exists & expr.or_replace),
964 InvalidSqlSnafu {
965 err_msg: "syntax error Create Or Replace and If Not Exist cannot be used together",
966 }
967 };
968 let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();
969
970 let schema_exists = self
971 .table_metadata_manager
972 .schema_manager()
973 .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
974 .await
975 .context(TableMetadataManagerSnafu)?;
976
977 ensure!(
978 schema_exists,
979 SchemaNotFoundSnafu {
980 schema_info: &expr.schema_name,
981 }
982 );
983
984 if let Some(table) = self
986 .catalog_manager
987 .table(
988 &expr.catalog_name,
989 &expr.schema_name,
990 &expr.view_name,
991 Some(&ctx),
992 )
993 .await
994 .context(CatalogSnafu)?
995 {
996 let table_type = table.table_info().table_type;
997
998 match (table_type, expr.create_if_not_exists, expr.or_replace) {
999 (TableType::View, true, false) => {
1000 return Ok(table);
1001 }
1002 (TableType::View, false, false) => {
1003 return ViewAlreadyExistsSnafu {
1004 name: format_full_table_name(
1005 &expr.catalog_name,
1006 &expr.schema_name,
1007 &expr.view_name,
1008 ),
1009 }
1010 .fail();
1011 }
1012 (TableType::View, _, true) => {
1013 }
1015 _ => {
1016 return TableAlreadyExistsSnafu {
1017 table: format_full_table_name(
1018 &expr.catalog_name,
1019 &expr.schema_name,
1020 &expr.view_name,
1021 ),
1022 }
1023 .fail();
1024 }
1025 }
1026 }
1027
1028 ensure!(
1029 NAME_PATTERN_REG.is_match(&expr.view_name),
1030 InvalidViewNameSnafu {
1031 name: expr.view_name.clone(),
1032 }
1033 );
1034
1035 let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);
1036
1037 let mut view_info = TableInfo {
1038 ident: metadata::TableIdent {
1039 table_id: 0,
1041 version: 0,
1042 },
1043 name: expr.view_name.clone(),
1044 desc: None,
1045 catalog_name: expr.catalog_name.clone(),
1046 schema_name: expr.schema_name.clone(),
1047 meta: TableMeta::empty(),
1049 table_type: TableType::View,
1050 };
1051
1052 let request = SubmitDdlTaskRequest::new(
1053 to_meta_query_context(ctx),
1054 DdlTask::new_create_view(expr, view_info.clone()),
1055 );
1056
1057 let resp = self
1058 .procedure_executor
1059 .submit_ddl_task(&ExecutorContext::default(), request)
1060 .await
1061 .context(error::ExecuteDdlSnafu)?;
1062
1063 debug!(
1064 "Submit creating view '{view_name}' task response: {:?}",
1065 resp
1066 );
1067
1068 let view_id = resp
1069 .table_ids
1070 .into_iter()
1071 .next()
1072 .context(error::UnexpectedSnafu {
1073 violated: "expected table_id",
1074 })?;
1075 info!("Successfully created view '{view_name}' with view id {view_id}");
1076
1077 view_info.ident.table_id = view_id;
1078
1079 let view_info = Arc::new(view_info);
1080
1081 let table = DistTable::table(view_info);
1082
1083 self.cache_invalidator
1085 .invalidate(
1086 &Context::default(),
1087 &[
1088 CacheIdent::TableId(view_id),
1089 CacheIdent::TableName(view_name.clone()),
1090 ],
1091 )
1092 .await
1093 .context(error::InvalidateTableCacheSnafu)?;
1094
1095 Ok(table)
1096 }
1097
1098 #[tracing::instrument(skip_all)]
1099 pub async fn drop_flow(
1100 &self,
1101 catalog_name: String,
1102 flow_name: String,
1103 drop_if_exists: bool,
1104 query_context: QueryContextRef,
1105 ) -> Result<Output> {
1106 if let Some(flow) = self
1107 .flow_metadata_manager
1108 .flow_name_manager()
1109 .get(&catalog_name, &flow_name)
1110 .await
1111 .context(error::TableMetadataManagerSnafu)?
1112 {
1113 let flow_id = flow.flow_id();
1114 let task = DropFlowTask {
1115 catalog_name,
1116 flow_name,
1117 flow_id,
1118 drop_if_exists,
1119 };
1120 self.drop_flow_procedure(task, query_context).await?;
1121
1122 Ok(Output::new_with_affected_rows(0))
1123 } else if drop_if_exists {
1124 Ok(Output::new_with_affected_rows(0))
1125 } else {
1126 FlowNotFoundSnafu {
1127 flow_name: format_full_flow_name(&catalog_name, &flow_name),
1128 }
1129 .fail()
1130 }
1131 }
1132
1133 async fn drop_flow_procedure(
1134 &self,
1135 expr: DropFlowTask,
1136 query_context: QueryContextRef,
1137 ) -> Result<SubmitDdlTaskResponse> {
1138 let request = SubmitDdlTaskRequest::new(
1139 to_meta_query_context(query_context),
1140 DdlTask::new_drop_flow(expr),
1141 );
1142
1143 self.procedure_executor
1144 .submit_ddl_task(&ExecutorContext::default(), request)
1145 .await
1146 .context(error::ExecuteDdlSnafu)
1147 }
1148
1149 #[cfg(feature = "enterprise")]
1150 #[tracing::instrument(skip_all)]
1151 pub(super) async fn drop_trigger(
1152 &self,
1153 catalog_name: String,
1154 trigger_name: String,
1155 drop_if_exists: bool,
1156 query_context: QueryContextRef,
1157 ) -> Result<Output> {
1158 let task = DropTriggerTask {
1159 catalog_name,
1160 trigger_name,
1161 drop_if_exists,
1162 };
1163 self.drop_trigger_procedure(task, query_context).await?;
1164 Ok(Output::new_with_affected_rows(0))
1165 }
1166
1167 #[cfg(feature = "enterprise")]
1168 async fn drop_trigger_procedure(
1169 &self,
1170 expr: DropTriggerTask,
1171 query_context: QueryContextRef,
1172 ) -> Result<SubmitDdlTaskResponse> {
1173 let request = SubmitDdlTaskRequest::new(
1174 to_meta_query_context(query_context),
1175 DdlTask::new_drop_trigger(expr),
1176 );
1177
1178 self.procedure_executor
1179 .submit_ddl_task(&ExecutorContext::default(), request)
1180 .await
1181 .context(error::ExecuteDdlSnafu)
1182 }
1183
1184 #[tracing::instrument(skip_all)]
1186 pub(crate) async fn drop_view(
1187 &self,
1188 catalog: String,
1189 schema: String,
1190 view: String,
1191 drop_if_exists: bool,
1192 query_context: QueryContextRef,
1193 ) -> Result<Output> {
1194 let view_info = if let Some(view) = self
1195 .catalog_manager
1196 .table(&catalog, &schema, &view, None)
1197 .await
1198 .context(CatalogSnafu)?
1199 {
1200 view.table_info()
1201 } else if drop_if_exists {
1202 return Ok(Output::new_with_affected_rows(0));
1204 } else {
1205 return TableNotFoundSnafu {
1206 table_name: format_full_table_name(&catalog, &schema, &view),
1207 }
1208 .fail();
1209 };
1210
1211 ensure!(
1213 view_info.table_type == TableType::View,
1214 error::InvalidViewSnafu {
1215 msg: "not a view",
1216 view_name: format_full_table_name(&catalog, &schema, &view),
1217 }
1218 );
1219
1220 let view_id = view_info.table_id();
1221
1222 let task = DropViewTask {
1223 catalog,
1224 schema,
1225 view,
1226 view_id,
1227 drop_if_exists,
1228 };
1229
1230 self.drop_view_procedure(task, query_context).await?;
1231
1232 Ok(Output::new_with_affected_rows(0))
1233 }
1234
1235 async fn drop_view_procedure(
1237 &self,
1238 expr: DropViewTask,
1239 query_context: QueryContextRef,
1240 ) -> Result<SubmitDdlTaskResponse> {
1241 let request = SubmitDdlTaskRequest::new(
1242 to_meta_query_context(query_context),
1243 DdlTask::new_drop_view(expr),
1244 );
1245
1246 self.procedure_executor
1247 .submit_ddl_task(&ExecutorContext::default(), request)
1248 .await
1249 .context(error::ExecuteDdlSnafu)
1250 }
1251
1252 #[tracing::instrument(skip_all)]
1253 pub async fn alter_logical_tables(
1254 &self,
1255 alter_table_exprs: Vec<AlterTableExpr>,
1256 query_context: QueryContextRef,
1257 ) -> Result<Output> {
1258 let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
1259 ensure!(
1260 !alter_table_exprs.is_empty(),
1261 EmptyDdlExprSnafu {
1262 name: "alter logical tables"
1263 }
1264 );
1265
1266 let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
1268 for expr in alter_table_exprs {
1269 let catalog = if expr.catalog_name.is_empty() {
1271 query_context.current_catalog()
1272 } else {
1273 &expr.catalog_name
1274 };
1275 let schema = if expr.schema_name.is_empty() {
1276 query_context.current_schema()
1277 } else {
1278 expr.schema_name.clone()
1279 };
1280 let table_name = &expr.table_name;
1281 let table = self
1282 .catalog_manager
1283 .table(catalog, &schema, table_name, Some(&query_context))
1284 .await
1285 .context(CatalogSnafu)?
1286 .with_context(|| TableNotFoundSnafu {
1287 table_name: format_full_table_name(catalog, &schema, table_name),
1288 })?;
1289 let table_id = table.table_info().ident.table_id;
1290 let physical_table_id = self
1291 .table_metadata_manager
1292 .table_route_manager()
1293 .get_physical_table_id(table_id)
1294 .await
1295 .context(TableMetadataManagerSnafu)?;
1296 groups.entry(physical_table_id).or_default().push(expr);
1297 }
1298
1299 let mut handles = Vec::with_capacity(groups.len());
1301 for (_physical_table_id, exprs) in groups {
1302 let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
1303 handles.push(fut);
1304 }
1305 let _results = futures::future::try_join_all(handles).await?;
1306
1307 Ok(Output::new_with_affected_rows(0))
1308 }
1309
1310 #[tracing::instrument(skip_all)]
1311 pub async fn drop_table(
1312 &self,
1313 table_name: TableName,
1314 drop_if_exists: bool,
1315 query_context: QueryContextRef,
1316 ) -> Result<Output> {
1317 self.drop_tables(&[table_name], drop_if_exists, query_context)
1319 .await
1320 }
1321
1322 #[tracing::instrument(skip_all)]
1323 pub async fn drop_tables(
1324 &self,
1325 table_names: &[TableName],
1326 drop_if_exists: bool,
1327 query_context: QueryContextRef,
1328 ) -> Result<Output> {
1329 let mut tables = Vec::with_capacity(table_names.len());
1330 for table_name in table_names {
1331 ensure!(
1332 !is_readonly_schema(&table_name.schema_name),
1333 SchemaReadOnlySnafu {
1334 name: table_name.schema_name.clone()
1335 }
1336 );
1337
1338 if let Some(table) = self
1339 .catalog_manager
1340 .table(
1341 &table_name.catalog_name,
1342 &table_name.schema_name,
1343 &table_name.table_name,
1344 Some(&query_context),
1345 )
1346 .await
1347 .context(CatalogSnafu)?
1348 {
1349 tables.push(table.table_info().table_id());
1350 } else if drop_if_exists {
1351 continue;
1353 } else {
1354 return TableNotFoundSnafu {
1355 table_name: table_name.to_string(),
1356 }
1357 .fail();
1358 }
1359 }
1360
1361 for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
1362 self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
1363 .await?;
1364
1365 self.cache_invalidator
1367 .invalidate(
1368 &Context::default(),
1369 &[
1370 CacheIdent::TableId(table_id),
1371 CacheIdent::TableName(table_name.clone()),
1372 ],
1373 )
1374 .await
1375 .context(error::InvalidateTableCacheSnafu)?;
1376 }
1377 Ok(Output::new_with_affected_rows(0))
1378 }
1379
1380 #[tracing::instrument(skip_all)]
1381 pub async fn drop_database(
1382 &self,
1383 catalog: String,
1384 schema: String,
1385 drop_if_exists: bool,
1386 query_context: QueryContextRef,
1387 ) -> Result<Output> {
1388 ensure!(
1389 !is_readonly_schema(&schema),
1390 SchemaReadOnlySnafu { name: schema }
1391 );
1392
1393 if self
1394 .catalog_manager
1395 .schema_exists(&catalog, &schema, None)
1396 .await
1397 .context(CatalogSnafu)?
1398 {
1399 if schema == query_context.current_schema() {
1400 SchemaInUseSnafu { name: schema }.fail()
1401 } else {
1402 self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
1403 .await?;
1404
1405 Ok(Output::new_with_affected_rows(0))
1406 }
1407 } else if drop_if_exists {
1408 Ok(Output::new_with_affected_rows(0))
1410 } else {
1411 SchemaNotFoundSnafu {
1412 schema_info: schema,
1413 }
1414 .fail()
1415 }
1416 }
1417
1418 #[tracing::instrument(skip_all)]
1419 pub async fn truncate_table(
1420 &self,
1421 table_name: TableName,
1422 time_ranges: Vec<(Timestamp, Timestamp)>,
1423 query_context: QueryContextRef,
1424 ) -> Result<Output> {
1425 ensure!(
1426 !is_readonly_schema(&table_name.schema_name),
1427 SchemaReadOnlySnafu {
1428 name: table_name.schema_name.clone()
1429 }
1430 );
1431
1432 let table = self
1433 .catalog_manager
1434 .table(
1435 &table_name.catalog_name,
1436 &table_name.schema_name,
1437 &table_name.table_name,
1438 Some(&query_context),
1439 )
1440 .await
1441 .context(CatalogSnafu)?
1442 .with_context(|| TableNotFoundSnafu {
1443 table_name: table_name.to_string(),
1444 })?;
1445 let table_id = table.table_info().table_id();
1446 self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
1447 .await?;
1448
1449 Ok(Output::new_with_affected_rows(0))
1450 }
1451
1452 #[tracing::instrument(skip_all)]
1453 pub async fn alter_table(
1454 &self,
1455 alter_table: AlterTable,
1456 query_context: QueryContextRef,
1457 ) -> Result<Output> {
1458 if matches!(
1459 alter_table.alter_operation(),
1460 AlterTableOperation::Repartition { .. } | AlterTableOperation::Partition { .. }
1461 ) {
1462 let request = expr_helper::to_repartition_request(alter_table, &query_context)?;
1463 return self.repartition_table(request, &query_context).await;
1464 }
1465
1466 let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
1467 self.alter_table_inner(expr, query_context).await
1468 }
1469
    /// Executes a repartition request (`ALTER TABLE ... REPARTITION` or
    /// `ALTER TABLE ... PARTITION ON COLUMNS`) against a physical table.
    ///
    /// Steps, in order:
    /// 1. Rejects read-only schemas and resolves the table. Logical tables are rejected:
    ///    only a table whose physical table id equals its own id is accepted.
    /// 2. Determines the target partition columns.
    ///    - `Partitions` source: the table must already have partition columns. An
    ///      explicit `ON COLUMNS` list (which must be non-empty) is used if given;
    ///      otherwise the existing partition columns are kept.
    ///    - `Unpartitioned` source: the table must have no partition columns, and the
    ///      requested non-empty column list must name existing columns.
    /// 3. Converts the `from` expressions against the existing partition columns and the
    ///    `into` expressions against the target columns, using the session timezone.
    ///    When several partitions are merged into a single one, that single `into`
    ///    expression is simplified.
    /// 4. Checks that every `from` expression matches a partition expression currently
    ///    stored on one of the table's regions.
    /// 5. Builds the resulting partition set (existing minus `from`, plus `into`). It
    ///    checks that the set only references target columns and validates it by
    ///    constructing a `MultiDimPartitionRule`.
    /// 6. Submits the repartition DDL task with the `wait`/`timeout` options parsed from
    ///    `request.options`.
    ///    - If not waiting, the procedure-id output is returned immediately.
    ///    - Otherwise the table's id/name cache entries are invalidated and 0 affected
    ///      rows are reported.
    #[tracing::instrument(skip_all)]
    pub async fn repartition_table(
        &self,
        request: RepartitionRequest,
        query_context: &QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&request.schema_name),
            SchemaReadOnlySnafu {
                name: request.schema_name.clone()
            }
        );

        let table_ref = TableReference::full(
            &request.catalog_name,
            &request.schema_name,
            &request.table_name,
        );
        let table = self
            .catalog_manager
            .table(
                &request.catalog_name,
                &request.schema_name,
                &request.table_name,
                Some(query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_ref.to_string(),
            })?;
        let table_id = table.table_info().ident.table_id;
        // The route's region list is used further down to read the partition
        // expressions currently stored on each region.
        let (physical_table_id, physical_table_route) = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        // A table whose physical table id differs from its own id is treated as a
        // logical table, which cannot be repartitioned directly.
        ensure!(
            physical_table_id == table_id,
            NotSupportedSnafu {
                feat: "REPARTITION on logical tables"
            }
        );

        let table_info = table.table_info();
        let existing_partition_columns = table_info.meta.partition_columns().collect::<Vec<_>>();
        let column_schemas = table_info.meta.schema.column_schemas();
        // Columns the partition rule will be defined on after the change.
        let target_partition_columns = match &request.source {
            RepartitionSource::Partitions {
                target_partition_columns,
                ..
            } => {
                ensure!(
                    !existing_partition_columns.is_empty(),
                    InvalidPartitionRuleSnafu {
                        reason: format!(
                            "table {} does not have partition columns, cannot repartition",
                            table_ref
                        )
                    }
                );

                // An explicit `ON COLUMNS` list replaces the existing partition columns.
                if let Some(target_partition_columns) = target_partition_columns {
                    ensure!(
                        !target_partition_columns.is_empty(),
                        InvalidPartitionRuleSnafu {
                            reason: "ON COLUMNS requires at least one partition column"
                        }
                    );
                    validate_and_collect_partition_columns(
                        target_partition_columns,
                        column_schemas,
                    )?
                } else {
                    existing_partition_columns.clone()
                }
            }
            RepartitionSource::Unpartitioned { partition_columns } => {
                ensure!(
                    !partition_columns.is_empty(),
                    InvalidPartitionRuleSnafu {
                        reason: "PARTITION ON COLUMNS requires at least one partition column"
                    }
                );
                ensure!(
                    existing_partition_columns.is_empty(),
                    InvalidPartitionRuleSnafu {
                        reason: format!("table {} already has partition columns", table_ref)
                    }
                );
                // Resolve each requested name to its column schema; unknown names fail.
                partition_columns
                    .iter()
                    .map(|column_name| {
                        column_schemas
                            .iter()
                            .find(|column| &column.name == column_name)
                            .with_context(|| ColumnNotFoundSnafu { msg: column_name })
                    })
                    .collect::<Result<Vec<_>>>()?
            }
        };

        let from_column_name_and_type = column_name_and_type(&existing_partition_columns);
        let target_column_name_and_type = column_name_and_type(&target_partition_columns);
        let target_partition_column_names = target_partition_columns
            .iter()
            .map(|column| column.name.clone())
            .collect::<Vec<_>>();
        let timezone = query_context.timezone();
        // `from` expressions describe partitions that exist today, so they are
        // interpreted against the current partition columns.
        let from_partition_exprs = match &request.source {
            RepartitionSource::Partitions { from_exprs, .. } => from_exprs
                .iter()
                .map(|expr| convert_one_expr(expr, &from_column_name_and_type, &timezone))
                .collect::<Result<Vec<_>>>()?,
            RepartitionSource::Unpartitioned { .. } => vec![],
        };

        // `into` expressions describe the new partitions, so they are interpreted
        // against the target partition columns.
        let mut into_partition_exprs = request
            .into_exprs
            .iter()
            .map(|expr| convert_one_expr(expr, &target_column_name_and_type, &timezone))
            .collect::<Result<Vec<_>>>()?;

        // Merging several existing partitions into exactly one: replace the single
        // `into` expression with its simplified form.
        if matches!(&request.source, RepartitionSource::Partitions { .. })
            && from_partition_exprs.len() > 1
            && into_partition_exprs.len() == 1
            && let Some(expr) = into_partition_exprs.pop()
        {
            into_partition_exprs.push(partition::simplify::simplify_merged_partition_expr(expr));
        }

        // Collect the partition expressions currently stored on the table's regions.
        // Regions with an empty expression string are skipped.
        let mut existing_partition_exprs =
            Vec::with_capacity(physical_table_route.region_routes.len());
        for route in &physical_table_route.region_routes {
            let expr_json = route.region.partition_expr();
            if !expr_json.is_empty() {
                match PartitionExpr::from_json_str(&expr_json) {
                    Ok(Some(expr)) => existing_partition_exprs.push(expr),
                    Ok(None) => {
                        // Parsed successfully but yielded no expression: nothing to collect.
                    }
                    Err(e) => {
                        return Err(e).context(DeserializePartitionExprSnafu);
                    }
                }
            }
        }

        // Every `from` expression must match a partition that currently exists.
        if matches!(&request.source, RepartitionSource::Partitions { .. }) {
            for from_expr in &from_partition_exprs {
                ensure!(
                    existing_partition_exprs.contains(from_expr),
                    InvalidPartitionRuleSnafu {
                        reason: format!(
                            "partition expression '{}' does not exist in table {}",
                            from_expr, table_ref
                        )
                    }
                );
            }
        }

        // Resulting partition set. For `Partitions` it is the existing expressions minus
        // the `from` ones, plus the `into` ones; for a previously unpartitioned table it
        // is just the `into` expressions.
        let new_partition_exprs: Vec<PartitionExpr> = match &request.source {
            RepartitionSource::Partitions { .. } => existing_partition_exprs
                .into_iter()
                .filter(|expr| !from_partition_exprs.contains(expr))
                .chain(into_partition_exprs.clone().into_iter())
                .collect(),
            RepartitionSource::Unpartitioned { .. } => into_partition_exprs.clone(),
        };
        ensure_partition_expr_columns_in_target(
            &new_partition_exprs,
            &target_partition_column_names.iter().collect(),
        )?;
        let new_partition_exprs_len = new_partition_exprs.len();
        let from_partition_exprs_len = from_partition_exprs.len();

        // Constructing the rule only validates the resulting partition set; the rule
        // value itself is discarded.
        let _ = MultiDimPartitionRule::try_new(
            target_partition_column_names,
            vec![],
            new_partition_exprs,
            true,
        )
        .context(InvalidPartitionSnafu)?;

        let ddl_options = parse_ddl_options(&request.options)?;
        let serialize_exprs = |exprs: Vec<PartitionExpr>| -> Result<Vec<String>> {
            let mut json_exprs = Vec::with_capacity(exprs.len());
            for expr in exprs {
                json_exprs.push(expr.as_json_str().context(SerializePartitionExprSnafu)?);
            }
            Ok(json_exprs)
        };
        let from_partition_exprs_json = serialize_exprs(from_partition_exprs)?;
        let into_partition_exprs_json = serialize_exprs(into_partition_exprs)?;
        let source = match &request.source {
            RepartitionSource::Partitions {
                target_partition_columns,
                ..
            } => Source::PartitionExprs(PartitionedSource {
                exprs: from_partition_exprs_json,
                target_partition_columns: target_partition_columns
                    .clone()
                    .map(|columns| TargetPartitionColumns { columns }),
            }),
            RepartitionSource::Unpartitioned { partition_columns } => {
                Source::Unpartitioned(UnpartitionedSource {
                    partition_columns: partition_columns.clone(),
                })
            }
        };
        let repartition = Repartition {
            into_partition_exprs: into_partition_exprs_json,
            source: Some(source),
            ..Default::default()
        };
        let mut req = SubmitDdlTaskRequest::new(
            to_meta_query_context(query_context.clone()),
            DdlTask::new_alter_table(AlterTableExpr {
                catalog_name: request.catalog_name.clone(),
                schema_name: request.schema_name.clone(),
                table_name: request.table_name.clone(),
                kind: Some(Kind::Repartition(repartition)),
            }),
        );
        req.wait = ddl_options.wait;
        req.timeout = ddl_options.timeout;

        // NOTE(review): "from" counts only the partitions being replaced, while "to"
        // counts all partitions the table will have afterwards.
        info!(
            "Submitting repartition task for table {} (table_id={}), from {} to {} partitions, timeout: {:?}, wait: {}",
            table_ref,
            table_id,
            from_partition_exprs_len,
            new_partition_exprs_len,
            ddl_options.timeout,
            ddl_options.wait
        );

        let response = self
            .procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        // Not waiting: return the procedure id; no cache invalidation happens here.
        if !ddl_options.wait {
            return build_procedure_id_output(response.key);
        }

        let invalidate_keys = vec![
            CacheIdent::TableId(table_id),
            CacheIdent::TableName(TableName::new(
                request.catalog_name,
                request.schema_name,
                request.table_name,
            )),
        ];

        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }
1755
    /// Executes a (non-repartition) `ALTER TABLE` expression.
    ///
    /// The alteration is first checked locally with [`verify_alter`]. If that reports
    /// nothing to change, the call returns 0 affected rows without submitting anything.
    /// Otherwise the task type depends on the table:
    /// - A physical table (its physical table id equals its own id) gets an
    ///   `alter table` task.
    /// - Any other table goes through an `alter logical tables` task. In that case the
    ///   physical table's id and name cache entries are invalidated as well.
    ///
    /// # Errors
    /// - `SchemaReadOnly` for a read-only schema.
    /// - `TableNotFound` if the table does not exist.
    /// - Verification, metadata lookup, DDL submission or cache-invalidation failures.
    #[tracing::instrument(skip_all)]
    pub async fn alter_table_inner(
        &self,
        expr: AlterTableExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        // NOTE(review): this checks the raw `expr.schema_name`, before the empty-name
        // default below is applied.
        ensure!(
            !is_readonly_schema(&expr.schema_name),
            SchemaReadOnlySnafu {
                name: expr.schema_name.clone()
            }
        );

        // NOTE(review): empty names fall back to the global defaults here, whereas
        // `alter_logical_tables` uses the session's current catalog/schema — confirm
        // this difference is intended.
        let catalog_name = if expr.catalog_name.is_empty() {
            DEFAULT_CATALOG_NAME.to_string()
        } else {
            expr.catalog_name.clone()
        };

        let schema_name = if expr.schema_name.is_empty() {
            DEFAULT_SCHEMA_NAME.to_string()
        } else {
            expr.schema_name.clone()
        };

        let table_name = expr.table_name.clone();

        let table = self
            .catalog_manager
            .table(
                &catalog_name,
                &schema_name,
                &table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
            })?;

        let table_id = table.table_info().ident.table_id;
        // `verify_alter` returns false when the alteration would change nothing.
        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
        if !need_alter {
            return Ok(Output::new_with_affected_rows(0));
        }
        info!(
            "Table info before alter is {:?}, expr: {:?}",
            table.table_info(),
            expr
        );

        let physical_table_id = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_id(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        // Choose the DDL task and the cache keys to invalidate once it succeeds.
        let (req, invalidate_keys) = if physical_table_id == table_id {
            let req = SubmitDdlTaskRequest::new(
                to_meta_query_context(query_context),
                DdlTask::new_alter_table(expr),
            );

            let invalidate_keys = vec![
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            (req, invalidate_keys)
        } else {
            let req = SubmitDdlTaskRequest::new(
                to_meta_query_context(query_context),
                DdlTask::new_alter_logical_tables(vec![expr]),
            );

            let mut invalidate_keys = vec![
                CacheIdent::TableId(physical_table_id),
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            // Also invalidate the physical table's name entry. This is skipped when its
            // table info is absent from metadata.
            let physical_table = self
                .table_metadata_manager
                .table_info_manager()
                .get(physical_table_id)
                .await
                .context(TableMetadataManagerSnafu)?
                .map(|x| x.into_inner());
            if let Some(physical_table) = physical_table {
                let physical_table_name = TableName::new(
                    physical_table.table_info.catalog_name,
                    physical_table.table_info.schema_name,
                    physical_table.table_info.name,
                );
                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
            }

            (req, invalidate_keys)
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }
1873
1874 #[cfg(feature = "enterprise")]
1875 #[tracing::instrument(skip_all)]
1876 pub async fn alter_trigger(
1877 &self,
1878 _alter_expr: AlterTrigger,
1879 _query_context: QueryContextRef,
1880 ) -> Result<Output> {
1881 crate::error::NotSupportedSnafu {
1882 feat: "alter trigger",
1883 }
1884 .fail()
1885 }
1886
1887 #[tracing::instrument(skip_all)]
1888 pub async fn alter_database(
1889 &self,
1890 alter_expr: AlterDatabase,
1891 query_context: QueryContextRef,
1892 ) -> Result<Output> {
1893 let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
1894 self.alter_database_inner(alter_expr, query_context).await
1895 }
1896
1897 #[tracing::instrument(skip_all)]
1898 pub async fn alter_database_inner(
1899 &self,
1900 alter_expr: AlterDatabaseExpr,
1901 query_context: QueryContextRef,
1902 ) -> Result<Output> {
1903 ensure!(
1904 !is_readonly_schema(&alter_expr.schema_name),
1905 SchemaReadOnlySnafu {
1906 name: query_context.current_schema().clone()
1907 }
1908 );
1909
1910 let exists = self
1911 .catalog_manager
1912 .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
1913 .await
1914 .context(CatalogSnafu)?;
1915 ensure!(
1916 exists,
1917 SchemaNotFoundSnafu {
1918 schema_info: alter_expr.schema_name,
1919 }
1920 );
1921
1922 let cache_ident = [CacheIdent::SchemaName(SchemaName {
1923 catalog_name: alter_expr.catalog_name.clone(),
1924 schema_name: alter_expr.schema_name.clone(),
1925 })];
1926
1927 self.alter_database_procedure(alter_expr, query_context)
1928 .await?;
1929
1930 self.cache_invalidator
1932 .invalidate(&Context::default(), &cache_ident)
1933 .await
1934 .context(error::InvalidateTableCacheSnafu)?;
1935
1936 Ok(Output::new_with_affected_rows(0))
1937 }
1938
1939 async fn create_table_procedure(
1940 &self,
1941 create_table: CreateTableExpr,
1942 partitions: Vec<PartitionExpr>,
1943 table_info: TableInfo,
1944 query_context: QueryContextRef,
1945 ) -> Result<SubmitDdlTaskResponse> {
1946 let partitions = partitions
1947 .into_iter()
1948 .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
1949 .collect::<Result<Vec<_>>>()?;
1950
1951 let request = SubmitDdlTaskRequest::new(
1952 to_meta_query_context_with_origin_frontend(query_context, &self.origin_frontend_addr),
1953 DdlTask::new_create_table(create_table, partitions, table_info),
1954 );
1955
1956 self.procedure_executor
1957 .submit_ddl_task(&ExecutorContext::default(), request)
1958 .await
1959 .context(error::ExecuteDdlSnafu)
1960 }
1961
1962 async fn create_logical_tables_procedure(
1963 &self,
1964 tables_data: Vec<(CreateTableExpr, TableInfo)>,
1965 query_context: QueryContextRef,
1966 ) -> Result<SubmitDdlTaskResponse> {
1967 let request = SubmitDdlTaskRequest::new(
1968 to_meta_query_context_with_origin_frontend(query_context, &self.origin_frontend_addr),
1969 DdlTask::new_create_logical_tables(tables_data),
1970 );
1971
1972 self.procedure_executor
1973 .submit_ddl_task(&ExecutorContext::default(), request)
1974 .await
1975 .context(error::ExecuteDdlSnafu)
1976 }
1977
1978 async fn alter_logical_tables_procedure(
1979 &self,
1980 tables_data: Vec<AlterTableExpr>,
1981 query_context: QueryContextRef,
1982 ) -> Result<SubmitDdlTaskResponse> {
1983 let request = SubmitDdlTaskRequest::new(
1984 to_meta_query_context(query_context),
1985 DdlTask::new_alter_logical_tables(tables_data),
1986 );
1987
1988 self.procedure_executor
1989 .submit_ddl_task(&ExecutorContext::default(), request)
1990 .await
1991 .context(error::ExecuteDdlSnafu)
1992 }
1993
1994 async fn drop_table_procedure(
1995 &self,
1996 table_name: &TableName,
1997 table_id: TableId,
1998 drop_if_exists: bool,
1999 query_context: QueryContextRef,
2000 ) -> Result<SubmitDdlTaskResponse> {
2001 let request = SubmitDdlTaskRequest::new(
2002 to_meta_query_context(query_context),
2003 DdlTask::new_drop_table(
2004 table_name.catalog_name.clone(),
2005 table_name.schema_name.clone(),
2006 table_name.table_name.clone(),
2007 table_id,
2008 drop_if_exists,
2009 ),
2010 );
2011
2012 self.procedure_executor
2013 .submit_ddl_task(&ExecutorContext::default(), request)
2014 .await
2015 .context(error::ExecuteDdlSnafu)
2016 }
2017
2018 async fn drop_database_procedure(
2019 &self,
2020 catalog: String,
2021 schema: String,
2022 drop_if_exists: bool,
2023 query_context: QueryContextRef,
2024 ) -> Result<SubmitDdlTaskResponse> {
2025 let request = SubmitDdlTaskRequest::new(
2026 to_meta_query_context(query_context),
2027 DdlTask::new_drop_database(catalog, schema, drop_if_exists),
2028 );
2029
2030 self.procedure_executor
2031 .submit_ddl_task(&ExecutorContext::default(), request)
2032 .await
2033 .context(error::ExecuteDdlSnafu)
2034 }
2035
2036 async fn alter_database_procedure(
2037 &self,
2038 alter_expr: AlterDatabaseExpr,
2039 query_context: QueryContextRef,
2040 ) -> Result<SubmitDdlTaskResponse> {
2041 let request = SubmitDdlTaskRequest::new(
2042 to_meta_query_context(query_context),
2043 DdlTask::new_alter_database(alter_expr),
2044 );
2045
2046 self.procedure_executor
2047 .submit_ddl_task(&ExecutorContext::default(), request)
2048 .await
2049 .context(error::ExecuteDdlSnafu)
2050 }
2051
2052 async fn truncate_table_procedure(
2053 &self,
2054 table_name: &TableName,
2055 table_id: TableId,
2056 time_ranges: Vec<(Timestamp, Timestamp)>,
2057 query_context: QueryContextRef,
2058 ) -> Result<SubmitDdlTaskResponse> {
2059 let request = SubmitDdlTaskRequest::new(
2060 to_meta_query_context(query_context),
2061 DdlTask::new_truncate_table(
2062 table_name.catalog_name.clone(),
2063 table_name.schema_name.clone(),
2064 table_name.table_name.clone(),
2065 table_id,
2066 time_ranges,
2067 ),
2068 );
2069
2070 self.procedure_executor
2071 .submit_ddl_task(&ExecutorContext::default(), request)
2072 .await
2073 .context(error::ExecuteDdlSnafu)
2074 }
2075
2076 #[tracing::instrument(skip_all)]
2077 pub async fn create_database(
2078 &self,
2079 database: &str,
2080 create_if_not_exists: bool,
2081 options: HashMap<String, String>,
2082 query_context: QueryContextRef,
2083 ) -> Result<Output> {
2084 let catalog = query_context.current_catalog();
2085 ensure!(
2086 NAME_PATTERN_REG.is_match(catalog),
2087 error::UnexpectedSnafu {
2088 violated: format!("Invalid catalog name: {}", catalog)
2089 }
2090 );
2091
2092 ensure!(
2093 NAME_PATTERN_REG.is_match(database),
2094 error::UnexpectedSnafu {
2095 violated: format!("Invalid database name: {}", database)
2096 }
2097 );
2098
2099 if !self
2100 .catalog_manager
2101 .schema_exists(catalog, database, None)
2102 .await
2103 .context(CatalogSnafu)?
2104 && !self.catalog_manager.is_reserved_schema_name(database)
2105 {
2106 self.create_database_procedure(
2107 catalog.to_string(),
2108 database.to_string(),
2109 create_if_not_exists,
2110 options,
2111 query_context,
2112 )
2113 .await?;
2114
2115 Ok(Output::new_with_affected_rows(1))
2116 } else if create_if_not_exists {
2117 Ok(Output::new_with_affected_rows(1))
2118 } else {
2119 error::SchemaExistsSnafu { name: database }.fail()
2120 }
2121 }
2122
2123 async fn create_database_procedure(
2124 &self,
2125 catalog: String,
2126 database: String,
2127 create_if_not_exists: bool,
2128 options: HashMap<String, String>,
2129 query_context: QueryContextRef,
2130 ) -> Result<SubmitDdlTaskResponse> {
2131 let request = SubmitDdlTaskRequest::new(
2132 to_meta_query_context(query_context),
2133 DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
2134 );
2135
2136 self.procedure_executor
2137 .submit_ddl_task(&ExecutorContext::default(), request)
2138 .await
2139 .context(error::ExecuteDdlSnafu)
2140 }
2141}
2142
2143pub fn parse_partitions(
2145 create_table: &CreateTableExpr,
2146 partitions: Option<Partitions>,
2147 query_ctx: &QueryContextRef,
2148) -> Result<(Vec<PartitionExpr>, Vec<String>)> {
2149 let partition_columns = find_partition_columns(&partitions)?;
2152 let partition_exprs =
2153 find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
2154
2155 let exprs = partition_exprs.clone();
2157 MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
2158 .context(InvalidPartitionSnafu)?;
2159
2160 Ok((partition_exprs, partition_columns))
2161}
2162
2163fn parse_partitions_for_logical_validation(
2164 create_table: &CreateTableExpr,
2165 partitions: &Partitions,
2166 query_ctx: &QueryContextRef,
2167) -> Result<(Vec<String>, Vec<PartitionExpr>)> {
2168 let partition_columns = partitions
2169 .column_list
2170 .iter()
2171 .map(|ident| ident.value.clone())
2172 .collect::<Vec<_>>();
2173
2174 let column_name_and_type = partition_columns
2175 .iter()
2176 .map(|pc| {
2177 let column = create_table
2178 .column_defs
2179 .iter()
2180 .find(|c| &c.name == pc)
2181 .context(ColumnNotFoundSnafu { msg: pc.clone() })?;
2182 let column_name = &column.name;
2183 let data_type = ConcreteDataType::from(
2184 ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
2185 .context(ColumnDataTypeSnafu)?,
2186 );
2187 Ok((column_name, data_type))
2188 })
2189 .collect::<Result<HashMap<_, _>>>()?;
2190
2191 let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
2192 for expr in &partitions.exprs {
2193 let partition_expr = convert_one_expr(expr, &column_name_and_type, &query_ctx.timezone())?;
2194 partition_exprs.push(partition_expr);
2195 }
2196
2197 MultiDimPartitionRule::try_new(
2198 partition_columns.clone(),
2199 vec![],
2200 partition_exprs.clone(),
2201 true,
2202 )
2203 .context(InvalidPartitionSnafu)?;
2204
2205 Ok((partition_columns, partition_exprs))
2206}
2207
2208pub fn verify_alter(
2214 table_id: TableId,
2215 table_info: Arc<TableInfo>,
2216 expr: AlterTableExpr,
2217) -> Result<bool> {
2218 let request: AlterTableRequest =
2219 common_grpc_expr::alter_expr_to_request(table_id, expr, Some(&table_info.meta))
2220 .context(AlterExprToRequestSnafu)?;
2221
2222 let AlterTableRequest {
2223 table_name,
2224 alter_kind,
2225 ..
2226 } = &request;
2227
2228 if let AlterKind::RenameTable { new_table_name } = alter_kind {
2229 ensure!(
2230 NAME_PATTERN_REG.is_match(new_table_name),
2231 error::UnexpectedSnafu {
2232 violated: format!("Invalid table name: {}", new_table_name)
2233 }
2234 );
2235 } else if let AlterKind::AddColumns { columns } = alter_kind {
2236 let column_names: HashSet<_> = table_info
2239 .meta
2240 .schema
2241 .column_schemas()
2242 .iter()
2243 .map(|schema| &schema.name)
2244 .collect();
2245 if columns.iter().all(|column| {
2246 column_names.contains(&column.column_schema.name) && column.add_if_not_exists
2247 }) {
2248 return Ok(false);
2249 }
2250 }
2251
2252 let _ = table_info
2253 .meta
2254 .builder_with_alter_kind(table_name, &request.alter_kind)
2255 .context(error::TableSnafu)?
2256 .build()
2257 .context(error::BuildTableMetaSnafu { table_name })?;
2258
2259 Ok(true)
2260}
2261
2262pub fn create_table_info(
2263 create_table: &CreateTableExpr,
2264 partition_columns: Vec<String>,
2265) -> Result<TableInfo> {
2266 let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
2267 let mut column_name_to_index_map = HashMap::new();
2268
2269 for (idx, column) in create_table.column_defs.iter().enumerate() {
2270 let schema =
2271 column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
2272 column: &column.name,
2273 })?;
2274 let schema = schema.with_time_index(column.name == create_table.time_index);
2275
2276 column_schemas.push(schema);
2277 let _ = column_name_to_index_map.insert(column.name.clone(), idx);
2278 }
2279
2280 let next_column_id = column_schemas.len() as u32;
2281 let schema = Arc::new(Schema::new(column_schemas));
2282
2283 let primary_key_indices = create_table
2284 .primary_keys
2285 .iter()
2286 .map(|name| {
2287 column_name_to_index_map
2288 .get(name)
2289 .cloned()
2290 .context(ColumnNotFoundSnafu { msg: name })
2291 })
2292 .collect::<Result<Vec<_>>>()?;
2293
2294 let partition_key_indices = partition_columns
2295 .into_iter()
2296 .map(|col_name| {
2297 column_name_to_index_map
2298 .get(&col_name)
2299 .cloned()
2300 .context(ColumnNotFoundSnafu { msg: col_name })
2301 })
2302 .collect::<Result<Vec<_>>>()?;
2303
2304 let mut table_options = TableOptions::try_from_iter(&create_table.table_options)
2305 .context(UnrecognizedTableOptionSnafu)?;
2306
2307 validate_repartition_column_hint(
2308 &mut table_options,
2309 &column_name_to_index_map,
2310 &partition_key_indices,
2311 &create_table.time_index,
2312 )?;
2313
2314 let meta = TableMeta {
2315 schema,
2316 primary_key_indices,
2317 value_indices: vec![],
2318 engine: create_table.engine.clone(),
2319 next_column_id,
2320 options: table_options,
2321 created_on: Utc::now(),
2322 updated_on: Utc::now(),
2323 partition_key_indices,
2324 column_ids: vec![],
2325 };
2326
2327 let desc = if create_table.desc.is_empty() {
2328 create_table.table_options.get(COMMENT_KEY).cloned()
2329 } else {
2330 Some(create_table.desc.clone())
2331 };
2332
2333 let table_info = TableInfo {
2334 ident: metadata::TableIdent {
2335 table_id: 0,
2337 version: 0,
2338 },
2339 name: create_table.table_name.clone(),
2340 desc,
2341 catalog_name: create_table.catalog_name.clone(),
2342 schema_name: create_table.schema_name.clone(),
2343 meta,
2344 table_type: TableType::Base,
2345 };
2346 Ok(table_info)
2347}
2348
2349fn validate_repartition_column_hint(
2350 table_options: &mut TableOptions,
2351 column_name_to_index_map: &HashMap<String, usize>,
2352 partition_key_indices: &[usize],
2353 time_index: &str,
2354) -> Result<()> {
2355 let Some(column_name) = table_options
2356 .extra_options
2357 .get(REPARTITION_COLUMN_HINT_KEY)
2358 .map(|value| value.trim().to_string())
2359 else {
2360 return Ok(());
2361 };
2362
2363 ensure!(
2364 !column_name.is_empty(),
2365 InvalidPartitionRuleSnafu {
2366 reason: format!("{REPARTITION_COLUMN_HINT_KEY} expects exactly one column name"),
2367 }
2368 );
2369
2370 ensure!(
2371 !column_name.contains(','),
2372 InvalidPartitionRuleSnafu {
2373 reason: format!("{REPARTITION_COLUMN_HINT_KEY} expects exactly one column name"),
2374 }
2375 );
2376
2377 ensure!(
2378 partition_key_indices.is_empty(),
2379 InvalidPartitionRuleSnafu {
2380 reason: format!(
2381 "cannot set {REPARTITION_COLUMN_HINT_KEY} on a table with partition metadata"
2382 ),
2383 }
2384 );
2385
2386 column_name_to_index_map
2387 .get(&column_name)
2388 .context(ColumnNotFoundSnafu { msg: &column_name })?;
2389
2390 ensure!(
2391 column_name != time_index,
2392 InvalidPartitionRuleSnafu {
2393 reason: format!("cannot set {REPARTITION_COLUMN_HINT_KEY} to the time index column"),
2394 }
2395 );
2396
2397 table_options
2398 .extra_options
2399 .insert(REPARTITION_COLUMN_HINT_KEY.to_string(), column_name);
2400
2401 Ok(())
2402}
2403
2404fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
2405 let columns = if let Some(partitions) = partitions {
2406 partitions
2407 .column_list
2408 .iter()
2409 .map(|x| x.value.clone())
2410 .collect::<Vec<_>>()
2411 } else {
2412 vec![]
2413 };
2414 Ok(columns)
2415}
2416
2417fn find_partition_entries(
2421 create_table: &CreateTableExpr,
2422 partitions: &Option<Partitions>,
2423 partition_columns: &[String],
2424 query_ctx: &QueryContextRef,
2425) -> Result<Vec<PartitionExpr>> {
2426 let Some(partitions) = partitions else {
2427 return Ok(vec![]);
2428 };
2429
2430 let column_name_and_type = partition_columns
2432 .iter()
2433 .map(|pc| {
2434 let column = create_table
2435 .column_defs
2436 .iter()
2437 .find(|c| &c.name == pc)
2438 .unwrap();
2440 let column_name = &column.name;
2441 let data_type = ConcreteDataType::from(
2442 ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
2443 .context(ColumnDataTypeSnafu)?,
2444 );
2445 Ok((column_name, data_type))
2446 })
2447 .collect::<Result<HashMap<_, _>>>()?;
2448
2449 let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
2451 for partition in &partitions.exprs {
2452 let partition_expr =
2453 convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
2454 partition_exprs.push(partition_expr);
2455 }
2456
2457 Ok(partition_exprs)
2458}
2459
2460fn column_name_and_type<'a>(
2461 partition_columns: &'a [&'a ColumnSchema],
2462) -> HashMap<&'a String, ConcreteDataType> {
2463 partition_columns
2464 .iter()
2465 .map(|column| (&column.name, column.data_type.clone()))
2466 .collect()
2467}
2468
2469fn validate_and_collect_partition_columns<'a>(
2470 column_names: &[String],
2471 column_schemas: &'a [ColumnSchema],
2472) -> Result<Vec<&'a ColumnSchema>> {
2473 let mut seen = HashSet::with_capacity(column_names.len());
2474 column_names
2475 .iter()
2476 .map(|column_name| {
2477 ensure!(
2478 seen.insert(column_name),
2479 InvalidPartitionRuleSnafu {
2480 reason: format!("duplicate partition column '{}'", column_name)
2481 }
2482 );
2483 column_schemas
2484 .iter()
2485 .find(|column| &column.name == column_name)
2486 .with_context(|| ColumnNotFoundSnafu { msg: column_name })
2487 })
2488 .collect()
2489}
2490
2491fn ensure_partition_expr_columns_in_target(
2492 partition_exprs: &[PartitionExpr],
2493 target_partition_columns: &HashSet<&String>,
2494) -> Result<()> {
2495 for expr in partition_exprs {
2496 ensure_partition_operand_columns_in_target(&expr.lhs, target_partition_columns)?;
2497 ensure_partition_operand_columns_in_target(&expr.rhs, target_partition_columns)?;
2498 }
2499
2500 Ok(())
2501}
2502
2503fn ensure_partition_operand_columns_in_target(
2504 operand: &Operand,
2505 target_partition_columns: &HashSet<&String>,
2506) -> Result<()> {
2507 match operand {
2508 Operand::Column(column) => ensure!(
2509 target_partition_columns.contains(column),
2510 InvalidPartitionRuleSnafu {
2511 reason: format!(
2512 "partition expression references column '{}' that is not in target partition columns",
2513 column
2514 )
2515 }
2516 ),
2517 Operand::Expr(expr) => {
2518 ensure_partition_operand_columns_in_target(&expr.lhs, target_partition_columns)?;
2519 ensure_partition_operand_columns_in_target(&expr.rhs, target_partition_columns)?;
2520 }
2521 Operand::Value(_) => {}
2522 }
2523
2524 Ok(())
2525}
2526
2527fn convert_one_expr(
2528 expr: &Expr,
2529 column_name_and_type: &HashMap<&String, ConcreteDataType>,
2530 timezone: &Timezone,
2531) -> Result<PartitionExpr> {
2532 let Expr::BinaryOp { left, op, right } = expr else {
2533 return InvalidPartitionRuleSnafu {
2534 reason: "partition rule must be a binary expression",
2535 }
2536 .fail();
2537 };
2538
2539 let op =
2540 RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
2541 reason: format!("unsupported operator in partition expr {op}"),
2542 })?;
2543
2544 let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
2546 (Expr::Identifier(ident), Expr::Value(value)) => {
2548 let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2549 let value = convert_value(&value.value, data_type, timezone, None)?;
2550 (Operand::Column(column_name), op, Operand::Value(value))
2551 }
2552 (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
2553 if let Expr::Value(v) = &**expr =>
2554 {
2555 let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2556 let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
2557 (Operand::Column(column_name), op, Operand::Value(value))
2558 }
2559 (Expr::Value(value), Expr::Identifier(ident)) => {
2561 let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2562 let value = convert_value(&value.value, data_type, timezone, None)?;
2563 (Operand::Value(value), op, Operand::Column(column_name))
2564 }
2565 (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
2566 if let Expr::Value(v) = &**expr =>
2567 {
2568 let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2569 let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
2570 (Operand::Value(value), op, Operand::Column(column_name))
2571 }
2572 (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
2573 let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
2575 let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
2576 (Operand::Expr(lhs), op, Operand::Expr(rhs))
2577 }
2578 _ => {
2579 return InvalidPartitionRuleSnafu {
2580 reason: format!("invalid partition expr {expr}"),
2581 }
2582 .fail();
2583 }
2584 };
2585
2586 Ok(PartitionExpr::new(lhs, op, rhs))
2587}
2588
2589fn convert_identifier(
2590 ident: &Ident,
2591 column_name_and_type: &HashMap<&String, ConcreteDataType>,
2592) -> Result<(String, ConcreteDataType)> {
2593 let column_name = ident.value.clone();
2594 let data_type = column_name_and_type
2595 .get(&column_name)
2596 .cloned()
2597 .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
2598 Ok((column_name, data_type))
2599}
2600
2601fn convert_value(
2602 value: &ParserValue,
2603 data_type: ConcreteDataType,
2604 timezone: &Timezone,
2605 unary_op: Option<UnaryOperator>,
2606) -> Result<Value> {
2607 sql_value_to_value(
2608 &ColumnSchema::new("<NONAME>", data_type, true),
2609 value,
2610 Some(timezone),
2611 unary_op,
2612 false,
2613 )
2614 .context(error::SqlCommonSnafu)
2615}
2616
#[cfg(test)]
mod test {
    // Unit tests for DDL option parsing, flow option validation and flow-type selection,
    // partition rule conversion/validation, and the `repartition.column.hint` table option.
    use std::time::Duration;

    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;
    use sqlparser::parser::Parser;

    use super::*;
    use crate::expr_helper;

    #[test]
    fn test_parse_ddl_options() {
        let options = OptionMap::from([
            ("timeout".to_string(), "5m".to_string()),
            ("wait".to_string(), "false".to_string()),
        ]);
        let ddl_options = parse_ddl_options(&options).unwrap();
        assert!(!ddl_options.wait);
        assert_eq!(Duration::from_secs(300), ddl_options.timeout);
    }

    #[test]
    fn test_validate_and_normalize_flow_options_empty() {
        assert!(
            validate_and_normalize_flow_options(HashMap::new())
                .unwrap()
                .is_empty()
        );
    }

    #[test]
    fn test_validate_and_normalize_flow_options_valid() {
        let options = HashMap::from([
            (DEFER_ON_MISSING_SOURCE_KEY.to_string(), "TRUE".to_string()),
            (
                FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY.to_string(),
                "FALSE".to_string(),
            ),
        ]);

        assert_eq!(
            validate_and_normalize_flow_options(options).unwrap(),
            HashMap::from([
                (DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string(),),
                (
                    FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY.to_string(),
                    "false".to_string(),
                )
            ])
        );
    }

    #[test]
    fn test_validate_and_normalize_flow_options_unknown_option() {
        let err = validate_and_normalize_flow_options(HashMap::from([(
            "foo".to_string(),
            "bar".to_string(),
        )]))
        .unwrap_err();

        assert!(
            err.to_string()
                .contains("unknown flow option 'foo', supported options: defer_on_missing_source, experimental_enable_incremental_read")
        );
    }

    #[test]
    fn test_validate_and_normalize_flow_options_reserved_option() {
        let err = validate_and_normalize_flow_options(HashMap::from([(
            FlowType::FLOW_TYPE_KEY.to_string(),
            FlowType::BATCHING.to_string(),
        )]))
        .unwrap_err();

        assert!(
            err.to_string()
                .contains("flow option 'flow_type' is reserved for internal use")
        );
    }

    #[test]
    fn test_validate_and_normalize_flow_options_invalid_bool() {
        let err = validate_and_normalize_flow_options(HashMap::from([(
            DEFER_ON_MISSING_SOURCE_KEY.to_string(),
            "not-a-bool".to_string(),
        )]))
        .unwrap_err();

        assert!(
            err.to_string()
                .contains("invalid flow option 'defer_on_missing_source': 'not-a-bool'")
        );
    }

    #[test]
    fn test_validate_and_normalize_flow_options_rejects_redacted_invalid_input() {
        let sql = r"
CREATE FLOW task_6
SINK TO schema_1.table_1
WITH (access_key_id = [true])
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr =
            expr_helper::to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
        let err = validate_and_normalize_flow_options(expr.flow_options).unwrap_err();

        // Pin the full list of supported options, matching the unknown-option test above.
        assert!(err.to_string().contains(
            "unknown flow option 'access_key_id', supported options: defer_on_missing_source, experimental_enable_incremental_read"
        ));
    }

    #[test]
    fn test_determine_flow_type_for_source_state_missing_sources_require_opt_in() {
        let err = determine_flow_type_for_source_state("my_flow", &HashMap::new(), true, false)
            .unwrap_err();

        assert!(err.to_string().contains(
            "missing source tables for flow 'my_flow'; use WITH (defer_on_missing_source = true) to create a pending flow"
        ));
    }

    #[test]
    fn test_determine_flow_type_for_source_state_missing_sources_prefer_batching() {
        let flow_options =
            HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string())]);

        assert_eq!(
            determine_flow_type_for_source_state("my_flow", &flow_options, true, true).unwrap(),
            Some(FlowType::Batching)
        );
    }

    #[test]
    fn test_determine_flow_type_for_source_state_instant_ttl_without_missing_sources() {
        assert_eq!(
            determine_flow_type_for_source_state("my_flow", &HashMap::new(), false, true).unwrap(),
            Some(FlowType::Streaming)
        );
    }

    #[test]
    fn test_name_is_match() {
        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
        assert!(!NAME_PATTERN_REG.is_match("🈲"));
        assert!(NAME_PATTERN_REG.is_match("hello"));
        assert!(NAME_PATTERN_REG.is_match("test@"));
        assert!(!NAME_PATTERN_REG.is_match("@test"));
        assert!(NAME_PATTERN_REG.is_match("test#"));
        assert!(!NAME_PATTERN_REG.is_match("#test"));
        assert!(!NAME_PATTERN_REG.is_match("@"));
        assert!(!NAME_PATTERN_REG.is_match("#"));
    }

    #[test]
    fn test_partition_expr_equivalence_with_swapped_operands() {
        let column_name = "device_id".to_string();
        let column_name_and_type =
            HashMap::from([(&column_name, ConcreteDataType::int32_datatype())]);
        let timezone = Timezone::from_tz_string("UTC").unwrap();
        let dialect = GreptimeDbDialect {};

        let mut parser = Parser::new(&dialect)
            .try_with_sql("device_id < 100")
            .unwrap();
        let expr_left = parser.parse_expr().unwrap();

        let mut parser = Parser::new(&dialect)
            .try_with_sql("100 > device_id")
            .unwrap();
        let expr_right = parser.parse_expr().unwrap();

        let partition_left =
            convert_one_expr(&expr_left, &column_name_and_type, &timezone).unwrap();
        let partition_right =
            convert_one_expr(&expr_right, &column_name_and_type, &timezone).unwrap();

        assert_eq!(partition_left, partition_right);
        assert!([partition_left.clone()].contains(&partition_right));

        let mut physical_partition_exprs = vec![partition_left];
        let mut logical_partition_exprs = vec![partition_right];
        physical_partition_exprs.sort_unstable();
        logical_partition_exprs.sort_unstable();
        assert_eq!(physical_partition_exprs, logical_partition_exprs);
    }

    #[test]
    fn test_repartition_target_partition_columns_are_overwrite_context() {
        let device_id = ColumnSchema::new("device_id", ConcreteDataType::int32_datatype(), true);
        let area = ColumnSchema::new("area", ConcreteDataType::string_datatype(), true);
        let existing_partition_columns = vec![&device_id];
        let target_partition_columns = vec![&device_id, &area];
        let existing_column_name_and_type = column_name_and_type(&existing_partition_columns);
        let target_column_name_and_type = column_name_and_type(&target_partition_columns);
        let timezone = Timezone::from_tz_string("UTC").unwrap();
        let dialect = GreptimeDbDialect {};

        let mut parser = Parser::new(&dialect)
            .try_with_sql("device_id < 100 AND area < 'South'")
            .unwrap();
        let expr = parser.parse_expr().unwrap();

        let err = convert_one_expr(&expr, &existing_column_name_and_type, &timezone).unwrap_err();
        assert!(err.to_string().contains("area"));

        let partition_expr = convert_one_expr(&expr, &target_column_name_and_type, &timezone)
            .expect("target columns should overwrite the conversion context");
        let partition_expr = partition_expr.to_string();
        assert!(partition_expr.contains("device_id"));
        assert!(partition_expr.contains("area"));
        assert!(partition_expr.contains("South"));
    }

    #[test]
    fn test_repartition_rejects_remaining_expr_outside_target_columns() {
        let device_id = "device_id".to_string();
        let area = "area".to_string();
        let timezone = Timezone::from_tz_string("UTC").unwrap();
        let column_name_and_type = HashMap::from([
            (&device_id, ConcreteDataType::int32_datatype()),
            (&area, ConcreteDataType::string_datatype()),
        ]);
        let dialect = GreptimeDbDialect {};
        let mut parser = Parser::new(&dialect)
            .try_with_sql("device_id >= 100")
            .unwrap();
        let remaining_old_expr = convert_one_expr(
            &parser.parse_expr().unwrap(),
            &column_name_and_type,
            &timezone,
        )
        .unwrap();
        let target_partition_columns = HashSet::from([&area]);

        let err = ensure_partition_expr_columns_in_target(
            &[remaining_old_expr],
            &target_partition_columns,
        )
        .unwrap_err();

        assert!(err.to_string().contains("device_id"));
        assert!(err.to_string().contains("target partition columns"));
    }

    #[test]
    fn test_repartition_rejects_duplicate_target_partition_columns() {
        let device_id = ColumnSchema::new("device_id", ConcreteDataType::int32_datatype(), true);
        let column_schemas = vec![device_id];
        let target_partition_columns = vec!["device_id".to_string(), "device_id".to_string()];

        let err =
            validate_and_collect_partition_columns(&target_partition_columns, &column_schemas)
                .unwrap_err();

        assert!(err.to_string().contains("duplicate partition column"));
        assert!(err.to_string().contains("device_id"));
    }

    /// Parses a single `CREATE TABLE` statement and converts it into a `CreateTableExpr`.
    fn create_expr_from_sql(sql: &str) -> CreateTableExpr {
        let result =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap();

        match &result[0] {
            Statement::CreateTable(create) => {
                expr_helper::create_to_expr(create, &QueryContext::arc()).unwrap()
            }
            _ => unreachable!(),
        }
    }

    #[test]
    fn test_create_table_with_repartition_column_hint() {
        let expr = create_expr_from_sql(
            r"
CREATE TABLE metrics (
    host STRING,
    ts TIMESTAMP TIME INDEX,
    cpu DOUBLE,
    PRIMARY KEY(host)
)
WITH ('repartition.column.hint' = ' host ')",
        );

        let table_info = create_table_info(&expr, vec![]).unwrap();
        assert_eq!(
            table_info
                .meta
                .options
                .extra_options
                .get(REPARTITION_COLUMN_HINT_KEY),
            Some(&"host".to_string())
        );
    }

    #[test]
    fn test_create_table_with_empty_repartition_column_hint() {
        let expr = create_expr_from_sql(
            r"
CREATE TABLE metrics (
    host STRING,
    ts TIMESTAMP TIME INDEX,
    cpu DOUBLE,
    PRIMARY KEY(host)
)
WITH ('repartition.column.hint' = '')",
        );

        let err = create_table_info(&expr, vec![]).unwrap_err();
        assert!(
            err.to_string()
                .contains("repartition.column.hint expects exactly one column name")
        );
    }

    #[test]
    fn test_create_table_with_multiple_repartition_column_hints() {
        let expr = create_expr_from_sql(
            r"
CREATE TABLE metrics (
    host STRING,
    region_id STRING,
    ts TIMESTAMP TIME INDEX,
    cpu DOUBLE,
    PRIMARY KEY(host)
)
WITH ('repartition.column.hint' = 'host,region_id')",
        );

        let err = create_table_info(&expr, vec![]).unwrap_err();
        assert!(
            err.to_string()
                .contains("repartition.column.hint expects exactly one column name")
        );
    }

    #[test]
    fn test_create_table_with_missing_repartition_column_hint() {
        let expr = create_expr_from_sql(
            r"
CREATE TABLE metrics (
    host STRING,
    ts TIMESTAMP TIME INDEX,
    cpu DOUBLE,
    PRIMARY KEY(host)
)
WITH ('repartition.column.hint' = 'region_id')",
        );

        let err = create_table_info(&expr, vec![]).unwrap_err();
        // Assert the full missing column name, not just a prefix of it.
        assert!(
            err.to_string()
                .contains("Cannot find column by name: region_id")
        );
    }

    #[test]
    fn test_create_table_with_time_index_repartition_column_hint() {
        let expr = create_expr_from_sql(
            r"
CREATE TABLE metrics (
    host STRING,
    ts TIMESTAMP TIME INDEX,
    cpu DOUBLE,
    PRIMARY KEY(host)
)
WITH ('repartition.column.hint' = 'ts')",
        );

        let err = create_table_info(&expr, vec![]).unwrap_err();
        assert!(
            err.to_string()
                .contains("cannot set repartition.column.hint to the time index column")
        );
    }

    #[test]
    fn test_create_partitioned_table_with_repartition_column_hint() {
        let expr = create_expr_from_sql(
            r"
CREATE TABLE metrics (
    host STRING,
    ts TIMESTAMP TIME INDEX,
    cpu DOUBLE,
    PRIMARY KEY(host)
)
WITH ('repartition.column.hint' = 'host')",
        );

        let err = create_table_info(&expr, vec!["host".to_string()]).unwrap_err();
        assert!(
            err.to_string()
                .contains("cannot set repartition.column.hint on a table with partition metadata")
        );
    }

    #[tokio::test]
    #[ignore = "TODO(ruihang): WIP new partition rule"]
    async fn test_parse_partitions() {
        common_telemetry::init_default_ut_logging();
        let cases = [
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION ON COLUMNS (b) (
  b < 'hz',
  b >= 'hz' AND b < 'sh',
  b >= 'sh'
)
ENGINE=mito",
                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
            ),
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
  PARTITION r0 VALUES LESS THAN ('hz', 10),
  b < 'hz' AND a < 10,
  b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
  b >= 'sh' AND a >= 20
)
ENGINE=mito",
                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
            ),
        ];
        let ctx = QueryContextBuilder::default().build().into();
        for (sql, expected) in cases {
            let result = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            match &result[0] {
                Statement::CreateTable(c) => {
                    let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
                    let (partitions, _) =
                        parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
                    let json = serde_json::to_string(&partitions).unwrap();
                    assert_eq!(json, expected);
                }
                _ => unreachable!(),
            }
        }
    }
}