Skip to main content

operator/statement/
ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::helper::ColumnDataTypeWrapper;
20use api::v1::alter_table_expr::Kind;
21use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
22use api::v1::repartition::Source;
23use api::v1::{
24    AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
25    PartitionedSource, Repartition, TargetPartitionColumns, UnpartitionedSource, column_def,
26};
27#[cfg(feature = "enterprise")]
28use api::v1::{
29    CreateTriggerExpr as PbCreateTriggerExpr, meta::CreateTriggerTask as PbCreateTriggerTask,
30};
31use catalog::CatalogManagerRef;
32use chrono::Utc;
33use common_base::regex_pattern::NAME_PATTERN_REG;
34use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_readonly_schema};
35use common_catalog::{format_full_flow_name, format_full_table_name};
36use common_error::ext::BoxedError;
37use common_meta::cache_invalidator::Context;
38use common_meta::ddl::create_flow::{
39    DEFER_ON_MISSING_SOURCE_KEY, FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY, FlowType,
40};
41use common_meta::instruction::CacheIdent;
42use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
43use common_meta::procedure_executor::ExecutorContext;
44#[cfg(feature = "enterprise")]
45use common_meta::rpc::ddl::trigger::CreateTriggerTask;
46#[cfg(feature = "enterprise")]
47use common_meta::rpc::ddl::trigger::DropTriggerTask;
48use common_meta::rpc::ddl::{
49    CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
50    SubmitDdlTaskResponse,
51};
52use common_query::Output;
53use common_recordbatch::{RecordBatch, RecordBatches};
54use common_sql::convert::sql_value_to_value;
55use common_telemetry::{debug, info, tracing, warn};
56use common_time::{Timestamp, Timezone};
57use datafusion_common::tree_node::TreeNodeVisitor;
58use datafusion_expr::LogicalPlan;
59use datatypes::prelude::ConcreteDataType;
60use datatypes::schema::{ColumnSchema, Schema};
61use datatypes::value::Value;
62use datatypes::vectors::{StringVector, VectorRef};
63use humantime::parse_duration;
64use partition::expr::{Operand, PartitionExpr, RestrictedOp};
65use partition::multi_dim::MultiDimPartitionRule;
66use query::parser::QueryStatement;
67use query::plan::extract_and_rewrite_full_table_names;
68use query::query_engine::DefaultSerializer;
69use query::sql::create_table_stmt;
70use session::context::QueryContextRef;
71use session::table_name::table_idents_to_full_name;
72use snafu::{OptionExt, ResultExt, ensure};
73use sql::parser::{ParseOptions, ParserContext};
74use sql::parsers::utils::is_tql;
75use sql::statements::OptionMap;
76#[cfg(feature = "enterprise")]
77use sql::statements::alter::trigger::AlterTrigger;
78use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation};
79#[cfg(feature = "enterprise")]
80use sql::statements::create::trigger::CreateTrigger;
81use sql::statements::create::{
82    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
83};
84use sql::statements::statement::Statement;
85use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
86use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
87use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
88use table::TableRef;
89use table::dist_table::DistTable;
90use table::metadata::{self, TableId, TableInfo, TableMeta, TableType};
91use table::requests::{
92    AlterKind, AlterTableRequest, COMMENT_KEY, DDL_TIMEOUT, DDL_WAIT, REPARTITION_COLUMN_HINT_KEY,
93    TableOptions,
94};
95use table::table_name::TableName;
96use table::table_reference::TableReference;
97
98use crate::error::{
99    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
100    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu,
101    DeserializePartitionExprSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
102    FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
103    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu,
104    PartitionExprToPbSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
105    SerializePartitionExprSnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
106    TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
107    ViewAlreadyExistsSnafu,
108};
109use crate::expr_helper::{self, RepartitionRequest, RepartitionSource};
110use crate::statement::StatementExecutor;
111use crate::statement::show::create_partitions_stmt;
112use crate::utils::{to_meta_query_context, to_meta_query_context_with_origin_frontend};
113
/// Submission settings for a DDL task, as produced by `parse_ddl_options`.
#[derive(Debug, Clone, Copy)]
struct DdlSubmitOptions {
    /// Value of the `DDL_WAIT` option, or `SubmitDdlTaskRequest::default_wait()` when unset.
    /// NOTE(review): presumably controls whether the caller blocks until the task finishes — confirm.
    wait: bool,
    /// Value of the `DDL_TIMEOUT` option (parsed with `humantime::parse_duration`), or
    /// `SubmitDdlTaskRequest::default_timeout()` when unset.
    timeout: Duration,
}
119
/// Flow option keys listed to the user in the "unknown flow option" error message.
///
/// Keep this in sync with the keys accepted by `validate_and_normalize_flow_options`.
const ALLOWED_FLOW_OPTIONS: [&str; 2] = [
    DEFER_ON_MISSING_SOURCE_KEY,
    FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY,
];
124
125fn build_procedure_id_output(procedure_id: Vec<u8>) -> Result<Output> {
126    let procedure_id = String::from_utf8_lossy(&procedure_id).to_string();
127    let vector: VectorRef = Arc::new(StringVector::from(vec![procedure_id]));
128    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
129        "Procedure ID",
130        vector.data_type(),
131        false,
132    )]));
133    let batch =
134        RecordBatch::new(schema.clone(), vec![vector]).context(error::BuildRecordBatchSnafu)?;
135    let batches =
136        RecordBatches::try_new(schema, vec![batch]).context(error::BuildRecordBatchSnafu)?;
137    Ok(Output::new_with_record_batches(batches))
138}
139
140fn parse_ddl_options(options: &OptionMap) -> Result<DdlSubmitOptions> {
141    let wait = match options.get(DDL_WAIT) {
142        Some(value) => value.parse::<bool>().map_err(|_| {
143            InvalidSqlSnafu {
144                err_msg: format!("invalid DDL option '{DDL_WAIT}': '{value}'"),
145            }
146            .build()
147        })?,
148        None => SubmitDdlTaskRequest::default_wait(),
149    };
150
151    let timeout = match options.get(DDL_TIMEOUT) {
152        Some(value) => parse_duration(value).map_err(|err| {
153            InvalidSqlSnafu {
154                err_msg: format!("invalid DDL option '{DDL_TIMEOUT}': '{value}': {err}"),
155            }
156            .build()
157        })?,
158        None => SubmitDdlTaskRequest::default_timeout(),
159    };
160
161    Ok(DdlSubmitOptions { wait, timeout })
162}
163
164fn supported_flow_options() -> String {
165    ALLOWED_FLOW_OPTIONS.join(", ")
166}
167
168fn normalize_flow_bool_option(key: &str, value: &str) -> Result<String> {
169    value
170        .trim()
171        .to_ascii_lowercase()
172        .parse::<bool>()
173        .map(|value| value.to_string())
174        .map_err(|_| {
175            InvalidSqlSnafu {
176                err_msg: format!("invalid flow option '{key}': '{value}'"),
177            }
178            .build()
179        })
180}
181
182fn validate_and_normalize_flow_options(
183    options: HashMap<String, String>,
184) -> Result<HashMap<String, String>> {
185    options
186        .into_iter()
187        .map(|(key, value)| {
188            if key == FlowType::FLOW_TYPE_KEY {
189                return InvalidSqlSnafu {
190                    err_msg: format!("flow option '{key}' is reserved for internal use"),
191                }
192                .fail();
193            }
194
195            let normalized_value = match key.as_str() {
196                DEFER_ON_MISSING_SOURCE_KEY | FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY => {
197                    normalize_flow_bool_option(&key, &value)?
198                }
199                _ => {
200                    return InvalidSqlSnafu {
201                        err_msg: format!(
202                            "unknown flow option '{key}', supported options: {}",
203                            supported_flow_options()
204                        ),
205                    }
206                    .fail();
207                }
208            };
209
210            Ok((key, normalized_value))
211        })
212        .collect()
213}
214
215fn determine_flow_type_for_source_state(
216    flow_name: &str,
217    flow_options: &HashMap<String, String>,
218    has_missing_source_table: bool,
219    has_instant_ttl_source_table: bool,
220) -> Result<Option<FlowType>> {
221    if has_missing_source_table {
222        let defer_on_missing_source = flow_options
223            .get(DEFER_ON_MISSING_SOURCE_KEY)
224            .is_some_and(|value| value == "true");
225        ensure!(
226            defer_on_missing_source,
227            InvalidSqlSnafu {
228                err_msg: format!(
229                    "missing source tables for flow '{}'; use WITH ({DEFER_ON_MISSING_SOURCE_KEY} = true) to create a pending flow",
230                    flow_name
231                )
232            }
233        );
234        info!(
235            "Flow `{}` is created as a pending batching flow because source tables are missing and defer_on_missing_source=true",
236            flow_name
237        );
238        return Ok(Some(FlowType::Batching));
239    }
240
241    if has_instant_ttl_source_table {
242        return Ok(Some(FlowType::Streaming));
243    }
244
245    Ok(None)
246}
247
248impl StatementExecutor {
249    pub fn catalog_manager(&self) -> CatalogManagerRef {
250        self.catalog_manager.clone()
251    }
252
253    #[tracing::instrument(skip_all)]
254    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
255        let (catalog, schema, _table) = table_idents_to_full_name(&stmt.name, &ctx)
256            .map_err(BoxedError::new)
257            .context(error::ExternalSnafu)?;
258
259        let schema_options = self
260            .table_metadata_manager
261            .schema_manager()
262            .get(SchemaNameKey {
263                catalog: &catalog,
264                schema: &schema,
265            })
266            .await
267            .context(TableMetadataManagerSnafu)?
268            .map(|v| v.into_inner());
269
270        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
271        // Don't inherit schema-level TTL/compaction options into table options:
272        // TTL is applied during compaction, and `compaction.*` is handled separately.
273        if let Some(schema_options) = schema_options {
274            for (key, value) in schema_options.extra_options.iter() {
275                if key.starts_with("compaction.") {
276                    continue;
277                }
278                create_expr
279                    .table_options
280                    .entry(key.clone())
281                    .or_insert(value.clone());
282            }
283        }
284
285        self.create_table_inner(create_expr, stmt.partitions, ctx)
286            .await
287    }
288
289    #[tracing::instrument(skip_all)]
290    pub async fn create_table_like(
291        &self,
292        stmt: CreateTableLike,
293        ctx: QueryContextRef,
294    ) -> Result<TableRef> {
295        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
296            .map_err(BoxedError::new)
297            .context(error::ExternalSnafu)?;
298        let table_ref = self
299            .catalog_manager
300            .table(&catalog, &schema, &table, Some(&ctx))
301            .await
302            .context(CatalogSnafu)?
303            .context(TableNotFoundSnafu { table_name: &table })?;
304        let partition_info = self
305            .partition_manager
306            .find_physical_partition_info(table_ref.table_info().table_id())
307            .await
308            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;
309
310        // CREATE TABLE LIKE also inherits database level options.
311        let schema_options = self
312            .table_metadata_manager
313            .schema_manager()
314            .get(SchemaNameKey {
315                catalog: &catalog,
316                schema: &schema,
317            })
318            .await
319            .context(TableMetadataManagerSnafu)?
320            .map(|v| v.into_inner());
321
322        let quote_style = ctx.quote_style();
323        let mut create_stmt =
324            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
325                .context(error::ParseQuerySnafu)?;
326        create_stmt.name = stmt.table_name;
327        create_stmt.if_not_exists = false;
328
329        let table_info = table_ref.table_info();
330        let partitions = create_partitions_stmt(&table_info, &partition_info.partitions)?.and_then(
331            |mut partitions| {
332                if !partitions.column_list.is_empty() {
333                    partitions.set_quote(quote_style);
334                    Some(partitions)
335                } else {
336                    None
337                }
338            },
339        );
340
341        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
342        self.create_table_inner(create_expr, partitions, ctx).await
343    }
344
345    #[tracing::instrument(skip_all)]
346    pub async fn create_external_table(
347        &self,
348        create_expr: CreateExternalTable,
349        ctx: QueryContextRef,
350    ) -> Result<TableRef> {
351        let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
352        self.create_table_inner(create_expr, None, ctx).await
353    }
354
355    #[tracing::instrument(skip_all)]
356    pub async fn create_table_inner(
357        &self,
358        create_table: &mut CreateTableExpr,
359        partitions: Option<Partitions>,
360        query_ctx: QueryContextRef,
361    ) -> Result<TableRef> {
362        ensure!(
363            !is_readonly_schema(&create_table.schema_name),
364            SchemaReadOnlySnafu {
365                name: create_table.schema_name.clone()
366            }
367        );
368
369        if create_table.engine == METRIC_ENGINE_NAME
370            && create_table
371                .table_options
372                .contains_key(LOGICAL_TABLE_METADATA_KEY)
373        {
374            if let Some(partitions) = partitions.as_ref()
375                && !partitions.exprs.is_empty()
376            {
377                self.validate_logical_table_partition_rule(create_table, partitions, &query_ctx)
378                    .await?;
379            }
380            // Create logical tables
381            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
382                .await?
383                .into_iter()
384                .next()
385                .context(error::UnexpectedSnafu {
386                    violated: "expected to create logical tables",
387                })
388        } else {
389            // Create other normal table
390            self.create_non_logic_table(create_table, partitions, query_ctx)
391                .await
392        }
393    }
394
395    #[tracing::instrument(skip_all)]
396    pub async fn create_non_logic_table(
397        &self,
398        create_table: &mut CreateTableExpr,
399        partitions: Option<Partitions>,
400        query_ctx: QueryContextRef,
401    ) -> Result<TableRef> {
402        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
403
404        // Check if schema exists
405        let schema = self
406            .table_metadata_manager
407            .schema_manager()
408            .get(SchemaNameKey::new(
409                &create_table.catalog_name,
410                &create_table.schema_name,
411            ))
412            .await
413            .context(TableMetadataManagerSnafu)?;
414        ensure!(
415            schema.is_some(),
416            SchemaNotFoundSnafu {
417                schema_info: &create_table.schema_name,
418            }
419        );
420
421        // if table exists.
422        if let Some(table) = self
423            .catalog_manager
424            .table(
425                &create_table.catalog_name,
426                &create_table.schema_name,
427                &create_table.table_name,
428                Some(&query_ctx),
429            )
430            .await
431            .context(CatalogSnafu)?
432        {
433            return if create_table.create_if_not_exists {
434                Ok(table)
435            } else {
436                TableAlreadyExistsSnafu {
437                    table: format_full_table_name(
438                        &create_table.catalog_name,
439                        &create_table.schema_name,
440                        &create_table.table_name,
441                    ),
442                }
443                .fail()
444            };
445        }
446
447        ensure!(
448            NAME_PATTERN_REG.is_match(&create_table.table_name),
449            InvalidTableNameSnafu {
450                table_name: &create_table.table_name,
451            }
452        );
453
454        let table_name = TableName::new(
455            &create_table.catalog_name,
456            &create_table.schema_name,
457            &create_table.table_name,
458        );
459
460        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
461        let mut table_info = create_table_info(create_table, partition_cols)?;
462
463        let resp = self
464            .create_table_procedure(
465                create_table.clone(),
466                partitions,
467                table_info.clone(),
468                query_ctx,
469            )
470            .await?;
471
472        let table_id = resp
473            .table_ids
474            .into_iter()
475            .next()
476            .context(error::UnexpectedSnafu {
477                violated: "expected table_id",
478            })?;
479        info!("Successfully created table '{table_name}' with table id {table_id}");
480
481        table_info.ident.table_id = table_id;
482
483        let table_info = Arc::new(table_info);
484        create_table.table_id = Some(api::v1::TableId { id: table_id });
485
486        let table = DistTable::table(table_info);
487
488        Ok(table)
489    }
490
491    #[tracing::instrument(skip_all)]
492    pub async fn create_logical_tables(
493        &self,
494        create_table_exprs: &[CreateTableExpr],
495        query_context: QueryContextRef,
496    ) -> Result<Vec<TableRef>> {
497        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
498        ensure!(
499            !create_table_exprs.is_empty(),
500            EmptyDdlExprSnafu {
501                name: "create logic tables"
502            }
503        );
504
505        // Check table names
506        for create_table in create_table_exprs {
507            ensure!(
508                NAME_PATTERN_REG.is_match(&create_table.table_name),
509                InvalidTableNameSnafu {
510                    table_name: &create_table.table_name,
511                }
512            );
513        }
514
515        let raw_tables_info = create_table_exprs
516            .iter()
517            .map(|create| create_table_info(create, vec![]))
518            .collect::<Result<Vec<_>>>()?;
519        let tables_data = create_table_exprs
520            .iter()
521            .cloned()
522            .zip(raw_tables_info.iter().cloned())
523            .collect::<Vec<_>>();
524
525        let resp = self
526            .create_logical_tables_procedure(tables_data, query_context.clone())
527            .await?;
528
529        let table_ids = resp.table_ids;
530        ensure!(
531            table_ids.len() == raw_tables_info.len(),
532            CreateLogicalTablesSnafu {
533                reason: format!(
534                    "The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}",
535                    raw_tables_info.len(),
536                    table_ids.len()
537                )
538            }
539        );
540        info!("Successfully created logical tables: {:?}", table_ids);
541
542        // Reacquire table infos from catalog so logical tables inherit the latest partition
543        // metadata (e.g. partition_key_indices) from their physical tables.
544        // And the returned table info also included extra partition columns that are in physical table but not in logical table's create table expr
545        let mut tables_info = Vec::with_capacity(table_ids.len());
546        for (table_id, create_table) in table_ids.iter().zip(create_table_exprs.iter()) {
547            let table = self
548                .catalog_manager
549                .table(
550                    &create_table.catalog_name,
551                    &create_table.schema_name,
552                    &create_table.table_name,
553                    Some(&query_context),
554                )
555                .await
556                .context(CatalogSnafu)?
557                .with_context(|| TableNotFoundSnafu {
558                    table_name: format_full_table_name(
559                        &create_table.catalog_name,
560                        &create_table.schema_name,
561                        &create_table.table_name,
562                    ),
563                })?;
564
565            let table_info = table.table_info();
566            // Safety check: ensure we are returning the table info that matches the newly created table id.
567            ensure!(
568                table_info.table_id() == *table_id,
569                CreateLogicalTablesSnafu {
570                    reason: format!(
571                        "Table id mismatch after creation, expected {}, got {} for table {}",
572                        table_id,
573                        table_info.table_id(),
574                        format_full_table_name(
575                            &create_table.catalog_name,
576                            &create_table.schema_name,
577                            &create_table.table_name
578                        )
579                    )
580                }
581            );
582
583            tables_info.push(table_info);
584        }
585
586        Ok(tables_info.into_iter().map(DistTable::table).collect())
587    }
588
589    async fn validate_logical_table_partition_rule(
590        &self,
591        create_table: &CreateTableExpr,
592        partitions: &Partitions,
593        query_ctx: &QueryContextRef,
594    ) -> Result<()> {
595        let (_, mut logical_partition_exprs) =
596            parse_partitions_for_logical_validation(create_table, partitions, query_ctx)?;
597
598        let physical_table_name = create_table
599            .table_options
600            .get(LOGICAL_TABLE_METADATA_KEY)
601            .with_context(|| CreateLogicalTablesSnafu {
602                reason: format!(
603                    "expect `{LOGICAL_TABLE_METADATA_KEY}` option on creating logical table"
604                ),
605            })?;
606
607        let physical_table = self
608            .catalog_manager
609            .table(
610                &create_table.catalog_name,
611                &create_table.schema_name,
612                physical_table_name,
613                Some(query_ctx),
614            )
615            .await
616            .context(CatalogSnafu)?
617            .context(TableNotFoundSnafu {
618                table_name: physical_table_name.clone(),
619            })?;
620
621        let physical_table_info = physical_table.table_info();
622        let (partition_rule, _) = self
623            .partition_manager
624            .find_table_partition_rule(&physical_table_info)
625            .await
626            .context(error::FindTablePartitionRuleSnafu {
627                table_name: physical_table_name.clone(),
628            })?;
629
630        let multi_dim_rule = partition_rule
631            .as_ref()
632            .as_any()
633            .downcast_ref::<MultiDimPartitionRule>()
634            .context(InvalidPartitionRuleSnafu {
635                reason: "physical table partition rule is not range-based",
636            })?;
637
638        // TODO(ruihang): project physical partition exprs to logical partition column
639        let mut physical_partition_exprs = multi_dim_rule.exprs().to_vec();
640        logical_partition_exprs.sort_unstable();
641        physical_partition_exprs.sort_unstable();
642
643        ensure!(
644            physical_partition_exprs == logical_partition_exprs,
645            InvalidPartitionRuleSnafu {
646                reason: format!(
647                    "logical table partition rule must match the corresponding physical table's\n logical table partition exprs:\t\t {:?}\n physical table partition exprs:\t {:?}",
648                    logical_partition_exprs, physical_partition_exprs
649                ),
650            }
651        );
652
653        Ok(())
654    }
655
656    #[cfg(feature = "enterprise")]
657    #[tracing::instrument(skip_all)]
658    pub async fn create_trigger(
659        &self,
660        stmt: CreateTrigger,
661        query_context: QueryContextRef,
662    ) -> Result<Output> {
663        let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
664        self.create_trigger_inner(expr, query_context).await
665    }
666
667    #[cfg(feature = "enterprise")]
668    pub async fn create_trigger_inner(
669        &self,
670        expr: PbCreateTriggerExpr,
671        query_context: QueryContextRef,
672    ) -> Result<Output> {
673        self.create_trigger_procedure(expr, query_context).await?;
674        Ok(Output::new_with_affected_rows(0))
675    }
676
677    #[cfg(feature = "enterprise")]
678    async fn create_trigger_procedure(
679        &self,
680        expr: PbCreateTriggerExpr,
681        query_context: QueryContextRef,
682    ) -> Result<SubmitDdlTaskResponse> {
683        let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
684            create_trigger: Some(expr),
685        })
686        .context(error::InvalidExprSnafu)?;
687
688        let request = SubmitDdlTaskRequest::new(
689            to_meta_query_context(query_context),
690            DdlTask::new_create_trigger(task),
691        );
692
693        self.procedure_executor
694            .submit_ddl_task(&ExecutorContext::default(), request)
695            .await
696            .context(error::ExecuteDdlSnafu)
697    }
698
699    #[tracing::instrument(skip_all)]
700    pub async fn create_flow(
701        &self,
702        stmt: CreateFlow,
703        query_context: QueryContextRef,
704    ) -> Result<Output> {
705        // TODO(ruihang): do some verification
706        let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;
707
708        self.create_flow_inner(expr, query_context).await
709    }
710
711    pub async fn create_flow_inner(
712        &self,
713        expr: CreateFlowExpr,
714        query_context: QueryContextRef,
715    ) -> Result<Output> {
716        self.create_flow_procedure(expr, query_context).await?;
717        Ok(Output::new_with_affected_rows(0))
718    }
719
720    async fn create_flow_procedure(
721        &self,
722        mut expr: CreateFlowExpr,
723        query_context: QueryContextRef,
724    ) -> Result<SubmitDdlTaskResponse> {
725        expr.flow_options = validate_and_normalize_flow_options(expr.flow_options)?;
726
727        let flow_type = self
728            .determine_flow_type(&expr, query_context.clone())
729            .await?;
730        info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
731
732        expr.flow_options
733            .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
734
735        let task = CreateFlowTask::try_from(PbCreateFlowTask {
736            create_flow: Some(expr),
737        })
738        .context(error::InvalidExprSnafu)?;
739        let request = SubmitDdlTaskRequest::new(
740            to_meta_query_context(query_context),
741            DdlTask::new_create_flow(task),
742        );
743
744        self.procedure_executor
745            .submit_ddl_task(&ExecutorContext::default(), request)
746            .await
747            .context(error::ExecuteDdlSnafu)
748    }
749
750    /// Determine the flow type based on the SQL query
751    ///
752    /// If it contains aggregation or distinct, then it is a batch flow, otherwise it is a streaming flow
753    async fn determine_flow_type(
754        &self,
755        expr: &CreateFlowExpr,
756        query_ctx: QueryContextRef,
757    ) -> Result<FlowType> {
758        let mut has_missing_source_table = false;
759        let mut has_instant_ttl_source_table = false;
760
761        for src_table_name in &expr.source_table_names {
762            let table = self
763                .catalog_manager()
764                .table(
765                    &src_table_name.catalog_name,
766                    &src_table_name.schema_name,
767                    &src_table_name.table_name,
768                    Some(&query_ctx),
769                )
770                .await
771                .map_err(BoxedError::new)
772                .context(ExternalSnafu)?;
773
774            let Some(table) = table else {
775                has_missing_source_table = true;
776                continue;
777            };
778
779            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
780                warn!(
781                    "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
782                    format_full_table_name(
783                        &src_table_name.catalog_name,
784                        &src_table_name.schema_name,
785                        &src_table_name.table_name
786                    ),
787                    expr.flow_name
788                );
789                has_instant_ttl_source_table = true;
790            }
791        }
792
793        if let Some(flow_type) = determine_flow_type_for_source_state(
794            &expr.flow_name,
795            &expr.flow_options,
796            has_missing_source_table,
797            has_instant_ttl_source_table,
798        )? {
799            return Ok(flow_type);
800        }
801
802        let engine = &self.query_engine;
803        let stmts = ParserContext::create_with_dialect(
804            &expr.sql,
805            query_ctx.sql_dialect(),
806            ParseOptions::default(),
807        )
808        .map_err(BoxedError::new)
809        .context(ExternalSnafu)?;
810
811        ensure!(
812            stmts.len() == 1,
813            InvalidSqlSnafu {
814                err_msg: format!("Expect only one statement, found {}", stmts.len())
815            }
816        );
817        let stmt = &stmts[0];
818
819        if is_tql(query_ctx.sql_dialect(), &expr.sql)
820            .map_err(BoxedError::new)
821            .context(ExternalSnafu)?
822        {
823            return Ok(FlowType::Batching);
824        }
825
826        // support tql parse too
827        let plan = match stmt {
828            // prom ql is only supported in batching mode
829            Statement::Tql(_) => return Ok(FlowType::Batching),
830            _ => engine
831                .planner()
832                .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
833                .await
834                .map_err(BoxedError::new)
835                .context(ExternalSnafu)?,
836        };
837
838        /// Visitor to find aggregation or distinct
839        struct FindAggr {
840            is_aggr: bool,
841        }
842
843        impl TreeNodeVisitor<'_> for FindAggr {
844            type Node = LogicalPlan;
845            fn f_down(
846                &mut self,
847                node: &Self::Node,
848            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
849            {
850                match node {
851                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
852                        self.is_aggr = true;
853                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
854                    }
855                    _ => (),
856                }
857                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
858            }
859        }
860
861        let mut find_aggr = FindAggr { is_aggr: false };
862
863        plan.visit_with_subqueries(&mut find_aggr)
864            .context(BuildDfLogicalPlanSnafu)?;
865        if find_aggr.is_aggr {
866            Ok(FlowType::Batching)
867        } else {
868            Ok(FlowType::Streaming)
869        }
870    }
871
872    #[tracing::instrument(skip_all)]
873    pub async fn create_view(
874        &self,
875        create_view: CreateView,
876        ctx: QueryContextRef,
877    ) -> Result<TableRef> {
878        // convert input into logical plan
879        let logical_plan = match &*create_view.query {
880            Statement::Query(query) => {
881                self.plan(
882                    &QueryStatement::Sql(Statement::Query(query.clone())),
883                    ctx.clone(),
884                )
885                .await?
886            }
887            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
888            _ => {
889                return InvalidViewStmtSnafu {}.fail();
890            }
891        };
892        // Save the definition for `show create view`.
893        let definition = create_view.to_string();
894
895        // Save the columns in plan, it may changed when the schemas of tables in plan
896        // are altered.
897        let schema: Schema = logical_plan
898            .schema()
899            .clone()
900            .try_into()
901            .context(ConvertSchemaSnafu)?;
902        let plan_columns: Vec<_> = schema
903            .column_schemas()
904            .iter()
905            .map(|c| c.name.clone())
906            .collect();
907
908        let columns: Vec<_> = create_view
909            .columns
910            .iter()
911            .map(|ident| ident.to_string())
912            .collect();
913
914        // Validate columns
915        if !columns.is_empty() {
916            ensure!(
917                columns.len() == plan_columns.len(),
918                error::ViewColumnsMismatchSnafu {
919                    view_name: create_view.name.to_string(),
920                    expected: plan_columns.len(),
921                    actual: columns.len(),
922                }
923            );
924        }
925
926        // Extract the table names from the original plan
927        // and rewrite them as fully qualified names.
928        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
929            .context(ExtractTableNamesSnafu)?;
930
931        let table_names = table_names.into_iter().map(|t| t.into()).collect();
932
933        // TODO(dennis): we don't save the optimized plan yet,
934        // because there are some serialization issue with our own defined plan node (such as `MergeScanLogicalPlan`).
935        // When the issues are fixed, we can use the `optimized_plan` instead.
936        // let optimized_plan = self.optimize_logical_plan(logical_plan)?.unwrap_df_plan();
937
938        // encode logical plan
939        let encoded_plan = DFLogicalSubstraitConvertor
940            .encode(&plan, DefaultSerializer)
941            .context(SubstraitCodecSnafu)?;
942
943        let expr = expr_helper::to_create_view_expr(
944            create_view,
945            encoded_plan.to_vec(),
946            table_names,
947            columns,
948            plan_columns,
949            definition,
950            ctx.clone(),
951        )?;
952
953        // TODO(dennis): validate the logical plan
954        self.create_view_by_expr(expr, ctx).await
955    }
956
957    pub async fn create_view_by_expr(
958        &self,
959        expr: CreateViewExpr,
960        ctx: QueryContextRef,
961    ) -> Result<TableRef> {
962        ensure! {
963            !(expr.create_if_not_exists & expr.or_replace),
964            InvalidSqlSnafu {
965                err_msg: "syntax error Create Or Replace and If Not Exist cannot be used together",
966            }
967        };
968        let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();
969
970        let schema_exists = self
971            .table_metadata_manager
972            .schema_manager()
973            .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
974            .await
975            .context(TableMetadataManagerSnafu)?;
976
977        ensure!(
978            schema_exists,
979            SchemaNotFoundSnafu {
980                schema_info: &expr.schema_name,
981            }
982        );
983
984        // if view or table exists.
985        if let Some(table) = self
986            .catalog_manager
987            .table(
988                &expr.catalog_name,
989                &expr.schema_name,
990                &expr.view_name,
991                Some(&ctx),
992            )
993            .await
994            .context(CatalogSnafu)?
995        {
996            let table_type = table.table_info().table_type;
997
998            match (table_type, expr.create_if_not_exists, expr.or_replace) {
999                (TableType::View, true, false) => {
1000                    return Ok(table);
1001                }
1002                (TableType::View, false, false) => {
1003                    return ViewAlreadyExistsSnafu {
1004                        name: format_full_table_name(
1005                            &expr.catalog_name,
1006                            &expr.schema_name,
1007                            &expr.view_name,
1008                        ),
1009                    }
1010                    .fail();
1011                }
1012                (TableType::View, _, true) => {
1013                    // Try to replace an exists view
1014                }
1015                _ => {
1016                    return TableAlreadyExistsSnafu {
1017                        table: format_full_table_name(
1018                            &expr.catalog_name,
1019                            &expr.schema_name,
1020                            &expr.view_name,
1021                        ),
1022                    }
1023                    .fail();
1024                }
1025            }
1026        }
1027
1028        ensure!(
1029            NAME_PATTERN_REG.is_match(&expr.view_name),
1030            InvalidViewNameSnafu {
1031                name: expr.view_name.clone(),
1032            }
1033        );
1034
1035        let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);
1036
1037        let mut view_info = TableInfo {
1038            ident: metadata::TableIdent {
1039                // The view id of distributed table is assigned by Meta, set "0" here as a placeholder.
1040                table_id: 0,
1041                version: 0,
1042            },
1043            name: expr.view_name.clone(),
1044            desc: None,
1045            catalog_name: expr.catalog_name.clone(),
1046            schema_name: expr.schema_name.clone(),
1047            // The meta doesn't make sense for views, so using a default one.
1048            meta: TableMeta::empty(),
1049            table_type: TableType::View,
1050        };
1051
1052        let request = SubmitDdlTaskRequest::new(
1053            to_meta_query_context(ctx),
1054            DdlTask::new_create_view(expr, view_info.clone()),
1055        );
1056
1057        let resp = self
1058            .procedure_executor
1059            .submit_ddl_task(&ExecutorContext::default(), request)
1060            .await
1061            .context(error::ExecuteDdlSnafu)?;
1062
1063        debug!(
1064            "Submit creating view '{view_name}' task response: {:?}",
1065            resp
1066        );
1067
1068        let view_id = resp
1069            .table_ids
1070            .into_iter()
1071            .next()
1072            .context(error::UnexpectedSnafu {
1073                violated: "expected table_id",
1074            })?;
1075        info!("Successfully created view '{view_name}' with view id {view_id}");
1076
1077        view_info.ident.table_id = view_id;
1078
1079        let view_info = Arc::new(view_info);
1080
1081        let table = DistTable::table(view_info);
1082
1083        // Invalidates local cache ASAP.
1084        self.cache_invalidator
1085            .invalidate(
1086                &Context::default(),
1087                &[
1088                    CacheIdent::TableId(view_id),
1089                    CacheIdent::TableName(view_name.clone()),
1090                ],
1091            )
1092            .await
1093            .context(error::InvalidateTableCacheSnafu)?;
1094
1095        Ok(table)
1096    }
1097
1098    #[tracing::instrument(skip_all)]
1099    pub async fn drop_flow(
1100        &self,
1101        catalog_name: String,
1102        flow_name: String,
1103        drop_if_exists: bool,
1104        query_context: QueryContextRef,
1105    ) -> Result<Output> {
1106        if let Some(flow) = self
1107            .flow_metadata_manager
1108            .flow_name_manager()
1109            .get(&catalog_name, &flow_name)
1110            .await
1111            .context(error::TableMetadataManagerSnafu)?
1112        {
1113            let flow_id = flow.flow_id();
1114            let task = DropFlowTask {
1115                catalog_name,
1116                flow_name,
1117                flow_id,
1118                drop_if_exists,
1119            };
1120            self.drop_flow_procedure(task, query_context).await?;
1121
1122            Ok(Output::new_with_affected_rows(0))
1123        } else if drop_if_exists {
1124            Ok(Output::new_with_affected_rows(0))
1125        } else {
1126            FlowNotFoundSnafu {
1127                flow_name: format_full_flow_name(&catalog_name, &flow_name),
1128            }
1129            .fail()
1130        }
1131    }
1132
1133    async fn drop_flow_procedure(
1134        &self,
1135        expr: DropFlowTask,
1136        query_context: QueryContextRef,
1137    ) -> Result<SubmitDdlTaskResponse> {
1138        let request = SubmitDdlTaskRequest::new(
1139            to_meta_query_context(query_context),
1140            DdlTask::new_drop_flow(expr),
1141        );
1142
1143        self.procedure_executor
1144            .submit_ddl_task(&ExecutorContext::default(), request)
1145            .await
1146            .context(error::ExecuteDdlSnafu)
1147    }
1148
1149    #[cfg(feature = "enterprise")]
1150    #[tracing::instrument(skip_all)]
1151    pub(super) async fn drop_trigger(
1152        &self,
1153        catalog_name: String,
1154        trigger_name: String,
1155        drop_if_exists: bool,
1156        query_context: QueryContextRef,
1157    ) -> Result<Output> {
1158        let task = DropTriggerTask {
1159            catalog_name,
1160            trigger_name,
1161            drop_if_exists,
1162        };
1163        self.drop_trigger_procedure(task, query_context).await?;
1164        Ok(Output::new_with_affected_rows(0))
1165    }
1166
1167    #[cfg(feature = "enterprise")]
1168    async fn drop_trigger_procedure(
1169        &self,
1170        expr: DropTriggerTask,
1171        query_context: QueryContextRef,
1172    ) -> Result<SubmitDdlTaskResponse> {
1173        let request = SubmitDdlTaskRequest::new(
1174            to_meta_query_context(query_context),
1175            DdlTask::new_drop_trigger(expr),
1176        );
1177
1178        self.procedure_executor
1179            .submit_ddl_task(&ExecutorContext::default(), request)
1180            .await
1181            .context(error::ExecuteDdlSnafu)
1182    }
1183
1184    /// Drop a view
1185    #[tracing::instrument(skip_all)]
1186    pub(crate) async fn drop_view(
1187        &self,
1188        catalog: String,
1189        schema: String,
1190        view: String,
1191        drop_if_exists: bool,
1192        query_context: QueryContextRef,
1193    ) -> Result<Output> {
1194        let view_info = if let Some(view) = self
1195            .catalog_manager
1196            .table(&catalog, &schema, &view, None)
1197            .await
1198            .context(CatalogSnafu)?
1199        {
1200            view.table_info()
1201        } else if drop_if_exists {
1202            // DROP VIEW IF EXISTS meets view not found - ignored
1203            return Ok(Output::new_with_affected_rows(0));
1204        } else {
1205            return TableNotFoundSnafu {
1206                table_name: format_full_table_name(&catalog, &schema, &view),
1207            }
1208            .fail();
1209        };
1210
1211        // Ensure the exists one is view, we can't drop other table types
1212        ensure!(
1213            view_info.table_type == TableType::View,
1214            error::InvalidViewSnafu {
1215                msg: "not a view",
1216                view_name: format_full_table_name(&catalog, &schema, &view),
1217            }
1218        );
1219
1220        let view_id = view_info.table_id();
1221
1222        let task = DropViewTask {
1223            catalog,
1224            schema,
1225            view,
1226            view_id,
1227            drop_if_exists,
1228        };
1229
1230        self.drop_view_procedure(task, query_context).await?;
1231
1232        Ok(Output::new_with_affected_rows(0))
1233    }
1234
1235    /// Submit [DropViewTask] to procedure executor.
1236    async fn drop_view_procedure(
1237        &self,
1238        expr: DropViewTask,
1239        query_context: QueryContextRef,
1240    ) -> Result<SubmitDdlTaskResponse> {
1241        let request = SubmitDdlTaskRequest::new(
1242            to_meta_query_context(query_context),
1243            DdlTask::new_drop_view(expr),
1244        );
1245
1246        self.procedure_executor
1247            .submit_ddl_task(&ExecutorContext::default(), request)
1248            .await
1249            .context(error::ExecuteDdlSnafu)
1250    }
1251
1252    #[tracing::instrument(skip_all)]
1253    pub async fn alter_logical_tables(
1254        &self,
1255        alter_table_exprs: Vec<AlterTableExpr>,
1256        query_context: QueryContextRef,
1257    ) -> Result<Output> {
1258        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
1259        ensure!(
1260            !alter_table_exprs.is_empty(),
1261            EmptyDdlExprSnafu {
1262                name: "alter logical tables"
1263            }
1264        );
1265
1266        // group by physical table id
1267        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
1268        for expr in alter_table_exprs {
1269            // Get table_id from catalog_manager
1270            let catalog = if expr.catalog_name.is_empty() {
1271                query_context.current_catalog()
1272            } else {
1273                &expr.catalog_name
1274            };
1275            let schema = if expr.schema_name.is_empty() {
1276                query_context.current_schema()
1277            } else {
1278                expr.schema_name.clone()
1279            };
1280            let table_name = &expr.table_name;
1281            let table = self
1282                .catalog_manager
1283                .table(catalog, &schema, table_name, Some(&query_context))
1284                .await
1285                .context(CatalogSnafu)?
1286                .with_context(|| TableNotFoundSnafu {
1287                    table_name: format_full_table_name(catalog, &schema, table_name),
1288                })?;
1289            let table_id = table.table_info().ident.table_id;
1290            let physical_table_id = self
1291                .table_metadata_manager
1292                .table_route_manager()
1293                .get_physical_table_id(table_id)
1294                .await
1295                .context(TableMetadataManagerSnafu)?;
1296            groups.entry(physical_table_id).or_default().push(expr);
1297        }
1298
1299        // Submit procedure for each physical table
1300        let mut handles = Vec::with_capacity(groups.len());
1301        for (_physical_table_id, exprs) in groups {
1302            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
1303            handles.push(fut);
1304        }
1305        let _results = futures::future::try_join_all(handles).await?;
1306
1307        Ok(Output::new_with_affected_rows(0))
1308    }
1309
1310    #[tracing::instrument(skip_all)]
1311    pub async fn drop_table(
1312        &self,
1313        table_name: TableName,
1314        drop_if_exists: bool,
1315        query_context: QueryContextRef,
1316    ) -> Result<Output> {
1317        // Reserved for grpc call
1318        self.drop_tables(&[table_name], drop_if_exists, query_context)
1319            .await
1320    }
1321
1322    #[tracing::instrument(skip_all)]
1323    pub async fn drop_tables(
1324        &self,
1325        table_names: &[TableName],
1326        drop_if_exists: bool,
1327        query_context: QueryContextRef,
1328    ) -> Result<Output> {
1329        let mut tables = Vec::with_capacity(table_names.len());
1330        for table_name in table_names {
1331            ensure!(
1332                !is_readonly_schema(&table_name.schema_name),
1333                SchemaReadOnlySnafu {
1334                    name: table_name.schema_name.clone()
1335                }
1336            );
1337
1338            if let Some(table) = self
1339                .catalog_manager
1340                .table(
1341                    &table_name.catalog_name,
1342                    &table_name.schema_name,
1343                    &table_name.table_name,
1344                    Some(&query_context),
1345                )
1346                .await
1347                .context(CatalogSnafu)?
1348            {
1349                tables.push(table.table_info().table_id());
1350            } else if drop_if_exists {
1351                // DROP TABLE IF EXISTS meets table not found - ignored
1352                continue;
1353            } else {
1354                return TableNotFoundSnafu {
1355                    table_name: table_name.to_string(),
1356                }
1357                .fail();
1358            }
1359        }
1360
1361        for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
1362            self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
1363                .await?;
1364
1365            // Invalidates local cache ASAP.
1366            self.cache_invalidator
1367                .invalidate(
1368                    &Context::default(),
1369                    &[
1370                        CacheIdent::TableId(table_id),
1371                        CacheIdent::TableName(table_name.clone()),
1372                    ],
1373                )
1374                .await
1375                .context(error::InvalidateTableCacheSnafu)?;
1376        }
1377        Ok(Output::new_with_affected_rows(0))
1378    }
1379
1380    #[tracing::instrument(skip_all)]
1381    pub async fn drop_database(
1382        &self,
1383        catalog: String,
1384        schema: String,
1385        drop_if_exists: bool,
1386        query_context: QueryContextRef,
1387    ) -> Result<Output> {
1388        ensure!(
1389            !is_readonly_schema(&schema),
1390            SchemaReadOnlySnafu { name: schema }
1391        );
1392
1393        if self
1394            .catalog_manager
1395            .schema_exists(&catalog, &schema, None)
1396            .await
1397            .context(CatalogSnafu)?
1398        {
1399            if schema == query_context.current_schema() {
1400                SchemaInUseSnafu { name: schema }.fail()
1401            } else {
1402                self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
1403                    .await?;
1404
1405                Ok(Output::new_with_affected_rows(0))
1406            }
1407        } else if drop_if_exists {
1408            // DROP TABLE IF EXISTS meets table not found - ignored
1409            Ok(Output::new_with_affected_rows(0))
1410        } else {
1411            SchemaNotFoundSnafu {
1412                schema_info: schema,
1413            }
1414            .fail()
1415        }
1416    }
1417
1418    #[tracing::instrument(skip_all)]
1419    pub async fn truncate_table(
1420        &self,
1421        table_name: TableName,
1422        time_ranges: Vec<(Timestamp, Timestamp)>,
1423        query_context: QueryContextRef,
1424    ) -> Result<Output> {
1425        ensure!(
1426            !is_readonly_schema(&table_name.schema_name),
1427            SchemaReadOnlySnafu {
1428                name: table_name.schema_name.clone()
1429            }
1430        );
1431
1432        let table = self
1433            .catalog_manager
1434            .table(
1435                &table_name.catalog_name,
1436                &table_name.schema_name,
1437                &table_name.table_name,
1438                Some(&query_context),
1439            )
1440            .await
1441            .context(CatalogSnafu)?
1442            .with_context(|| TableNotFoundSnafu {
1443                table_name: table_name.to_string(),
1444            })?;
1445        let table_id = table.table_info().table_id();
1446        self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
1447            .await?;
1448
1449        Ok(Output::new_with_affected_rows(0))
1450    }
1451
1452    #[tracing::instrument(skip_all)]
1453    pub async fn alter_table(
1454        &self,
1455        alter_table: AlterTable,
1456        query_context: QueryContextRef,
1457    ) -> Result<Output> {
1458        if matches!(
1459            alter_table.alter_operation(),
1460            AlterTableOperation::Repartition { .. } | AlterTableOperation::Partition { .. }
1461        ) {
1462            let request = expr_helper::to_repartition_request(alter_table, &query_context)?;
1463            return self.repartition_table(request, &query_context).await;
1464        }
1465
1466        let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
1467        self.alter_table_inner(expr, query_context).await
1468    }
1469
1470    #[tracing::instrument(skip_all)]
1471    pub async fn repartition_table(
1472        &self,
1473        request: RepartitionRequest,
1474        query_context: &QueryContextRef,
1475    ) -> Result<Output> {
1476        // Check if the schema is read-only.
1477        ensure!(
1478            !is_readonly_schema(&request.schema_name),
1479            SchemaReadOnlySnafu {
1480                name: request.schema_name.clone()
1481            }
1482        );
1483
1484        let table_ref = TableReference::full(
1485            &request.catalog_name,
1486            &request.schema_name,
1487            &request.table_name,
1488        );
1489        // Get the table from the catalog.
1490        let table = self
1491            .catalog_manager
1492            .table(
1493                &request.catalog_name,
1494                &request.schema_name,
1495                &request.table_name,
1496                Some(query_context),
1497            )
1498            .await
1499            .context(CatalogSnafu)?
1500            .with_context(|| TableNotFoundSnafu {
1501                table_name: table_ref.to_string(),
1502            })?;
1503        let table_id = table.table_info().ident.table_id;
1504        // Get existing partition expressions from the table route.
1505        let (physical_table_id, physical_table_route) = self
1506            .table_metadata_manager
1507            .table_route_manager()
1508            .get_physical_table_route(table_id)
1509            .await
1510            .context(TableMetadataManagerSnafu)?;
1511
1512        ensure!(
1513            physical_table_id == table_id,
1514            NotSupportedSnafu {
1515                feat: "REPARTITION on logical tables"
1516            }
1517        );
1518
1519        let table_info = table.table_info();
1520        let existing_partition_columns = table_info.meta.partition_columns().collect::<Vec<_>>();
1521        let column_schemas = table_info.meta.schema.column_schemas();
1522        // `REPARTITION ... ON COLUMNS` uses overwrite semantics: the provided
1523        // columns are the full target partition columns, not an extension of the
1524        // current ones. Therefore source expressions are converted with the
1525        // existing partition columns, while target expressions and the final
1526        // partition rule are validated against this effective target column set.
1527        let target_partition_columns = match &request.source {
1528            RepartitionSource::Partitions {
1529                target_partition_columns,
1530                ..
1531            } => {
1532                ensure!(
1533                    !existing_partition_columns.is_empty(),
1534                    InvalidPartitionRuleSnafu {
1535                        reason: format!(
1536                            "table {} does not have partition columns, cannot repartition",
1537                            table_ref
1538                        )
1539                    }
1540                );
1541
1542                if let Some(target_partition_columns) = target_partition_columns {
1543                    ensure!(
1544                        !target_partition_columns.is_empty(),
1545                        InvalidPartitionRuleSnafu {
1546                            reason: "ON COLUMNS requires at least one partition column"
1547                        }
1548                    );
1549                    validate_and_collect_partition_columns(
1550                        target_partition_columns,
1551                        column_schemas,
1552                    )?
1553                } else {
1554                    existing_partition_columns.clone()
1555                }
1556            }
1557            RepartitionSource::Unpartitioned { partition_columns } => {
1558                ensure!(
1559                    !partition_columns.is_empty(),
1560                    InvalidPartitionRuleSnafu {
1561                        reason: "PARTITION ON COLUMNS requires at least one partition column"
1562                    }
1563                );
1564                ensure!(
1565                    existing_partition_columns.is_empty(),
1566                    InvalidPartitionRuleSnafu {
1567                        reason: format!("table {} already has partition columns", table_ref)
1568                    }
1569                );
1570                partition_columns
1571                    .iter()
1572                    .map(|column_name| {
1573                        column_schemas
1574                            .iter()
1575                            .find(|column| &column.name == column_name)
1576                            .with_context(|| ColumnNotFoundSnafu { msg: column_name })
1577                    })
1578                    .collect::<Result<Vec<_>>>()?
1579            }
1580        };
1581
1582        let from_column_name_and_type = column_name_and_type(&existing_partition_columns);
1583        let target_column_name_and_type = column_name_and_type(&target_partition_columns);
1584        let target_partition_column_names = target_partition_columns
1585            .iter()
1586            .map(|column| column.name.clone())
1587            .collect::<Vec<_>>();
1588        let timezone = query_context.timezone();
1589        // Convert SQL Exprs to PartitionExprs.
1590        let from_partition_exprs = match &request.source {
1591            RepartitionSource::Partitions { from_exprs, .. } => from_exprs
1592                .iter()
1593                .map(|expr| convert_one_expr(expr, &from_column_name_and_type, &timezone))
1594                .collect::<Result<Vec<_>>>()?,
1595            RepartitionSource::Unpartitioned { .. } => vec![],
1596        };
1597
1598        let mut into_partition_exprs = request
1599            .into_exprs
1600            .iter()
1601            .map(|expr| convert_one_expr(expr, &target_column_name_and_type, &timezone))
1602            .collect::<Result<Vec<_>>>()?;
1603
1604        // `MERGE PARTITION` (and some `REPARTITION`) generates a single `OR` expression from
1605        // multiple source partitions; try to simplify it for better readability and stability.
1606        if matches!(&request.source, RepartitionSource::Partitions { .. })
1607            && from_partition_exprs.len() > 1
1608            && into_partition_exprs.len() == 1
1609            && let Some(expr) = into_partition_exprs.pop()
1610        {
1611            into_partition_exprs.push(partition::simplify::simplify_merged_partition_expr(expr));
1612        }
1613
1614        // Parse existing partition expressions from region routes.
1615        let mut existing_partition_exprs =
1616            Vec::with_capacity(physical_table_route.region_routes.len());
1617        for route in &physical_table_route.region_routes {
1618            let expr_json = route.region.partition_expr();
1619            if !expr_json.is_empty() {
1620                match PartitionExpr::from_json_str(&expr_json) {
1621                    Ok(Some(expr)) => existing_partition_exprs.push(expr),
1622                    Ok(None) => {
1623                        // Empty
1624                    }
1625                    Err(e) => {
1626                        return Err(e).context(DeserializePartitionExprSnafu);
1627                    }
1628                }
1629            }
1630        }
1631
1632        // Validate that from_partition_exprs are a subset of existing partition exprs.
1633        // We compare PartitionExpr directly since it implements Eq.
1634        if matches!(&request.source, RepartitionSource::Partitions { .. }) {
1635            for from_expr in &from_partition_exprs {
1636                ensure!(
1637                    existing_partition_exprs.contains(from_expr),
1638                    InvalidPartitionRuleSnafu {
1639                        reason: format!(
1640                            "partition expression '{}' does not exist in table {}",
1641                            from_expr, table_ref
1642                        )
1643                    }
1644                );
1645            }
1646        }
1647
1648        // Build the new partition expressions:
1649        // new_exprs = existing_exprs - from_exprs + into_exprs
1650        let new_partition_exprs: Vec<PartitionExpr> = match &request.source {
1651            RepartitionSource::Partitions { .. } => existing_partition_exprs
1652                .into_iter()
1653                .filter(|expr| !from_partition_exprs.contains(expr))
1654                .chain(into_partition_exprs.clone().into_iter())
1655                .collect(),
1656            RepartitionSource::Unpartitioned { .. } => into_partition_exprs.clone(),
1657        };
1658        ensure_partition_expr_columns_in_target(
1659            &new_partition_exprs,
1660            &target_partition_column_names.iter().collect(),
1661        )?;
1662        let new_partition_exprs_len = new_partition_exprs.len();
1663        let from_partition_exprs_len = from_partition_exprs.len();
1664
1665        // Validate the new partition expressions using MultiDimPartitionRule and PartitionChecker.
1666        let _ = MultiDimPartitionRule::try_new(
1667            target_partition_column_names,
1668            vec![],
1669            new_partition_exprs,
1670            true,
1671        )
1672        .context(InvalidPartitionSnafu)?;
1673
1674        let ddl_options = parse_ddl_options(&request.options)?;
1675        let serialize_exprs = |exprs: Vec<PartitionExpr>| -> Result<Vec<String>> {
1676            let mut json_exprs = Vec::with_capacity(exprs.len());
1677            for expr in exprs {
1678                json_exprs.push(expr.as_json_str().context(SerializePartitionExprSnafu)?);
1679            }
1680            Ok(json_exprs)
1681        };
1682        let from_partition_exprs_json = serialize_exprs(from_partition_exprs)?;
1683        let into_partition_exprs_json = serialize_exprs(into_partition_exprs)?;
1684        let source = match &request.source {
1685            RepartitionSource::Partitions {
1686                target_partition_columns,
1687                ..
1688            } => Source::PartitionExprs(PartitionedSource {
1689                exprs: from_partition_exprs_json,
1690                target_partition_columns: target_partition_columns
1691                    .clone()
1692                    .map(|columns| TargetPartitionColumns { columns }),
1693            }),
1694            RepartitionSource::Unpartitioned { partition_columns } => {
1695                Source::Unpartitioned(UnpartitionedSource {
1696                    partition_columns: partition_columns.clone(),
1697                })
1698            }
1699        };
1700        let repartition = Repartition {
1701            into_partition_exprs: into_partition_exprs_json,
1702            source: Some(source),
1703            ..Default::default()
1704        };
1705        let mut req = SubmitDdlTaskRequest::new(
1706            to_meta_query_context(query_context.clone()),
1707            DdlTask::new_alter_table(AlterTableExpr {
1708                catalog_name: request.catalog_name.clone(),
1709                schema_name: request.schema_name.clone(),
1710                table_name: request.table_name.clone(),
1711                kind: Some(Kind::Repartition(repartition)),
1712            }),
1713        );
1714        req.wait = ddl_options.wait;
1715        req.timeout = ddl_options.timeout;
1716
1717        info!(
1718            "Submitting repartition task for table {} (table_id={}), from {} to {} partitions, timeout: {:?}, wait: {}",
1719            table_ref,
1720            table_id,
1721            from_partition_exprs_len,
1722            new_partition_exprs_len,
1723            ddl_options.timeout,
1724            ddl_options.wait
1725        );
1726
1727        let response = self
1728            .procedure_executor
1729            .submit_ddl_task(&ExecutorContext::default(), req)
1730            .await
1731            .context(error::ExecuteDdlSnafu)?;
1732
1733        if !ddl_options.wait {
1734            return build_procedure_id_output(response.key);
1735        }
1736
1737        // Only invalidate cache if wait is true.
1738        let invalidate_keys = vec![
1739            CacheIdent::TableId(table_id),
1740            CacheIdent::TableName(TableName::new(
1741                request.catalog_name,
1742                request.schema_name,
1743                request.table_name,
1744            )),
1745        ];
1746
1747        // Invalidates local cache ASAP.
1748        self.cache_invalidator
1749            .invalidate(&Context::default(), &invalidate_keys)
1750            .await
1751            .context(error::InvalidateTableCacheSnafu)?;
1752
1753        Ok(Output::new_with_affected_rows(0))
1754    }
1755
1756    #[tracing::instrument(skip_all)]
1757    pub async fn alter_table_inner(
1758        &self,
1759        expr: AlterTableExpr,
1760        query_context: QueryContextRef,
1761    ) -> Result<Output> {
1762        ensure!(
1763            !is_readonly_schema(&expr.schema_name),
1764            SchemaReadOnlySnafu {
1765                name: expr.schema_name.clone()
1766            }
1767        );
1768
1769        let catalog_name = if expr.catalog_name.is_empty() {
1770            DEFAULT_CATALOG_NAME.to_string()
1771        } else {
1772            expr.catalog_name.clone()
1773        };
1774
1775        let schema_name = if expr.schema_name.is_empty() {
1776            DEFAULT_SCHEMA_NAME.to_string()
1777        } else {
1778            expr.schema_name.clone()
1779        };
1780
1781        let table_name = expr.table_name.clone();
1782
1783        let table = self
1784            .catalog_manager
1785            .table(
1786                &catalog_name,
1787                &schema_name,
1788                &table_name,
1789                Some(&query_context),
1790            )
1791            .await
1792            .context(CatalogSnafu)?
1793            .with_context(|| TableNotFoundSnafu {
1794                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
1795            })?;
1796
1797        let table_id = table.table_info().ident.table_id;
1798        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
1799        if !need_alter {
1800            return Ok(Output::new_with_affected_rows(0));
1801        }
1802        info!(
1803            "Table info before alter is {:?}, expr: {:?}",
1804            table.table_info(),
1805            expr
1806        );
1807
1808        let physical_table_id = self
1809            .table_metadata_manager
1810            .table_route_manager()
1811            .get_physical_table_id(table_id)
1812            .await
1813            .context(TableMetadataManagerSnafu)?;
1814
1815        let (req, invalidate_keys) = if physical_table_id == table_id {
1816            // This is physical table
1817            let req = SubmitDdlTaskRequest::new(
1818                to_meta_query_context(query_context),
1819                DdlTask::new_alter_table(expr),
1820            );
1821
1822            let invalidate_keys = vec![
1823                CacheIdent::TableId(table_id),
1824                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1825            ];
1826
1827            (req, invalidate_keys)
1828        } else {
1829            // This is logical table
1830            let req = SubmitDdlTaskRequest::new(
1831                to_meta_query_context(query_context),
1832                DdlTask::new_alter_logical_tables(vec![expr]),
1833            );
1834
1835            let mut invalidate_keys = vec![
1836                CacheIdent::TableId(physical_table_id),
1837                CacheIdent::TableId(table_id),
1838                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1839            ];
1840
1841            let physical_table = self
1842                .table_metadata_manager
1843                .table_info_manager()
1844                .get(physical_table_id)
1845                .await
1846                .context(TableMetadataManagerSnafu)?
1847                .map(|x| x.into_inner());
1848            if let Some(physical_table) = physical_table {
1849                let physical_table_name = TableName::new(
1850                    physical_table.table_info.catalog_name,
1851                    physical_table.table_info.schema_name,
1852                    physical_table.table_info.name,
1853                );
1854                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
1855            }
1856
1857            (req, invalidate_keys)
1858        };
1859
1860        self.procedure_executor
1861            .submit_ddl_task(&ExecutorContext::default(), req)
1862            .await
1863            .context(error::ExecuteDdlSnafu)?;
1864
1865        // Invalidates local cache ASAP.
1866        self.cache_invalidator
1867            .invalidate(&Context::default(), &invalidate_keys)
1868            .await
1869            .context(error::InvalidateTableCacheSnafu)?;
1870
1871        Ok(Output::new_with_affected_rows(0))
1872    }
1873
1874    #[cfg(feature = "enterprise")]
1875    #[tracing::instrument(skip_all)]
1876    pub async fn alter_trigger(
1877        &self,
1878        _alter_expr: AlterTrigger,
1879        _query_context: QueryContextRef,
1880    ) -> Result<Output> {
1881        crate::error::NotSupportedSnafu {
1882            feat: "alter trigger",
1883        }
1884        .fail()
1885    }
1886
1887    #[tracing::instrument(skip_all)]
1888    pub async fn alter_database(
1889        &self,
1890        alter_expr: AlterDatabase,
1891        query_context: QueryContextRef,
1892    ) -> Result<Output> {
1893        let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
1894        self.alter_database_inner(alter_expr, query_context).await
1895    }
1896
1897    #[tracing::instrument(skip_all)]
1898    pub async fn alter_database_inner(
1899        &self,
1900        alter_expr: AlterDatabaseExpr,
1901        query_context: QueryContextRef,
1902    ) -> Result<Output> {
1903        ensure!(
1904            !is_readonly_schema(&alter_expr.schema_name),
1905            SchemaReadOnlySnafu {
1906                name: query_context.current_schema().clone()
1907            }
1908        );
1909
1910        let exists = self
1911            .catalog_manager
1912            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
1913            .await
1914            .context(CatalogSnafu)?;
1915        ensure!(
1916            exists,
1917            SchemaNotFoundSnafu {
1918                schema_info: alter_expr.schema_name,
1919            }
1920        );
1921
1922        let cache_ident = [CacheIdent::SchemaName(SchemaName {
1923            catalog_name: alter_expr.catalog_name.clone(),
1924            schema_name: alter_expr.schema_name.clone(),
1925        })];
1926
1927        self.alter_database_procedure(alter_expr, query_context)
1928            .await?;
1929
1930        // Invalidates local cache ASAP.
1931        self.cache_invalidator
1932            .invalidate(&Context::default(), &cache_ident)
1933            .await
1934            .context(error::InvalidateTableCacheSnafu)?;
1935
1936        Ok(Output::new_with_affected_rows(0))
1937    }
1938
1939    async fn create_table_procedure(
1940        &self,
1941        create_table: CreateTableExpr,
1942        partitions: Vec<PartitionExpr>,
1943        table_info: TableInfo,
1944        query_context: QueryContextRef,
1945    ) -> Result<SubmitDdlTaskResponse> {
1946        let partitions = partitions
1947            .into_iter()
1948            .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
1949            .collect::<Result<Vec<_>>>()?;
1950
1951        let request = SubmitDdlTaskRequest::new(
1952            to_meta_query_context_with_origin_frontend(query_context, &self.origin_frontend_addr),
1953            DdlTask::new_create_table(create_table, partitions, table_info),
1954        );
1955
1956        self.procedure_executor
1957            .submit_ddl_task(&ExecutorContext::default(), request)
1958            .await
1959            .context(error::ExecuteDdlSnafu)
1960    }
1961
1962    async fn create_logical_tables_procedure(
1963        &self,
1964        tables_data: Vec<(CreateTableExpr, TableInfo)>,
1965        query_context: QueryContextRef,
1966    ) -> Result<SubmitDdlTaskResponse> {
1967        let request = SubmitDdlTaskRequest::new(
1968            to_meta_query_context_with_origin_frontend(query_context, &self.origin_frontend_addr),
1969            DdlTask::new_create_logical_tables(tables_data),
1970        );
1971
1972        self.procedure_executor
1973            .submit_ddl_task(&ExecutorContext::default(), request)
1974            .await
1975            .context(error::ExecuteDdlSnafu)
1976    }
1977
1978    async fn alter_logical_tables_procedure(
1979        &self,
1980        tables_data: Vec<AlterTableExpr>,
1981        query_context: QueryContextRef,
1982    ) -> Result<SubmitDdlTaskResponse> {
1983        let request = SubmitDdlTaskRequest::new(
1984            to_meta_query_context(query_context),
1985            DdlTask::new_alter_logical_tables(tables_data),
1986        );
1987
1988        self.procedure_executor
1989            .submit_ddl_task(&ExecutorContext::default(), request)
1990            .await
1991            .context(error::ExecuteDdlSnafu)
1992    }
1993
1994    async fn drop_table_procedure(
1995        &self,
1996        table_name: &TableName,
1997        table_id: TableId,
1998        drop_if_exists: bool,
1999        query_context: QueryContextRef,
2000    ) -> Result<SubmitDdlTaskResponse> {
2001        let request = SubmitDdlTaskRequest::new(
2002            to_meta_query_context(query_context),
2003            DdlTask::new_drop_table(
2004                table_name.catalog_name.clone(),
2005                table_name.schema_name.clone(),
2006                table_name.table_name.clone(),
2007                table_id,
2008                drop_if_exists,
2009            ),
2010        );
2011
2012        self.procedure_executor
2013            .submit_ddl_task(&ExecutorContext::default(), request)
2014            .await
2015            .context(error::ExecuteDdlSnafu)
2016    }
2017
2018    async fn drop_database_procedure(
2019        &self,
2020        catalog: String,
2021        schema: String,
2022        drop_if_exists: bool,
2023        query_context: QueryContextRef,
2024    ) -> Result<SubmitDdlTaskResponse> {
2025        let request = SubmitDdlTaskRequest::new(
2026            to_meta_query_context(query_context),
2027            DdlTask::new_drop_database(catalog, schema, drop_if_exists),
2028        );
2029
2030        self.procedure_executor
2031            .submit_ddl_task(&ExecutorContext::default(), request)
2032            .await
2033            .context(error::ExecuteDdlSnafu)
2034    }
2035
2036    async fn alter_database_procedure(
2037        &self,
2038        alter_expr: AlterDatabaseExpr,
2039        query_context: QueryContextRef,
2040    ) -> Result<SubmitDdlTaskResponse> {
2041        let request = SubmitDdlTaskRequest::new(
2042            to_meta_query_context(query_context),
2043            DdlTask::new_alter_database(alter_expr),
2044        );
2045
2046        self.procedure_executor
2047            .submit_ddl_task(&ExecutorContext::default(), request)
2048            .await
2049            .context(error::ExecuteDdlSnafu)
2050    }
2051
2052    async fn truncate_table_procedure(
2053        &self,
2054        table_name: &TableName,
2055        table_id: TableId,
2056        time_ranges: Vec<(Timestamp, Timestamp)>,
2057        query_context: QueryContextRef,
2058    ) -> Result<SubmitDdlTaskResponse> {
2059        let request = SubmitDdlTaskRequest::new(
2060            to_meta_query_context(query_context),
2061            DdlTask::new_truncate_table(
2062                table_name.catalog_name.clone(),
2063                table_name.schema_name.clone(),
2064                table_name.table_name.clone(),
2065                table_id,
2066                time_ranges,
2067            ),
2068        );
2069
2070        self.procedure_executor
2071            .submit_ddl_task(&ExecutorContext::default(), request)
2072            .await
2073            .context(error::ExecuteDdlSnafu)
2074    }
2075
2076    #[tracing::instrument(skip_all)]
2077    pub async fn create_database(
2078        &self,
2079        database: &str,
2080        create_if_not_exists: bool,
2081        options: HashMap<String, String>,
2082        query_context: QueryContextRef,
2083    ) -> Result<Output> {
2084        let catalog = query_context.current_catalog();
2085        ensure!(
2086            NAME_PATTERN_REG.is_match(catalog),
2087            error::UnexpectedSnafu {
2088                violated: format!("Invalid catalog name: {}", catalog)
2089            }
2090        );
2091
2092        ensure!(
2093            NAME_PATTERN_REG.is_match(database),
2094            error::UnexpectedSnafu {
2095                violated: format!("Invalid database name: {}", database)
2096            }
2097        );
2098
2099        if !self
2100            .catalog_manager
2101            .schema_exists(catalog, database, None)
2102            .await
2103            .context(CatalogSnafu)?
2104            && !self.catalog_manager.is_reserved_schema_name(database)
2105        {
2106            self.create_database_procedure(
2107                catalog.to_string(),
2108                database.to_string(),
2109                create_if_not_exists,
2110                options,
2111                query_context,
2112            )
2113            .await?;
2114
2115            Ok(Output::new_with_affected_rows(1))
2116        } else if create_if_not_exists {
2117            Ok(Output::new_with_affected_rows(1))
2118        } else {
2119            error::SchemaExistsSnafu { name: database }.fail()
2120        }
2121    }
2122
2123    async fn create_database_procedure(
2124        &self,
2125        catalog: String,
2126        database: String,
2127        create_if_not_exists: bool,
2128        options: HashMap<String, String>,
2129        query_context: QueryContextRef,
2130    ) -> Result<SubmitDdlTaskResponse> {
2131        let request = SubmitDdlTaskRequest::new(
2132            to_meta_query_context(query_context),
2133            DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
2134        );
2135
2136        self.procedure_executor
2137            .submit_ddl_task(&ExecutorContext::default(), request)
2138            .await
2139            .context(error::ExecuteDdlSnafu)
2140    }
2141}
2142
2143/// Parse partition statement [Partitions] into [PartitionExpr] and partition columns.
2144pub fn parse_partitions(
2145    create_table: &CreateTableExpr,
2146    partitions: Option<Partitions>,
2147    query_ctx: &QueryContextRef,
2148) -> Result<(Vec<PartitionExpr>, Vec<String>)> {
2149    // If partitions are not defined by user, use the timestamp column (which has to be existed) as
2150    // the partition column, and create only one partition.
2151    let partition_columns = find_partition_columns(&partitions)?;
2152    let partition_exprs =
2153        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
2154
2155    // Validates partition
2156    let exprs = partition_exprs.clone();
2157    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
2158        .context(InvalidPartitionSnafu)?;
2159
2160    Ok((partition_exprs, partition_columns))
2161}
2162
2163fn parse_partitions_for_logical_validation(
2164    create_table: &CreateTableExpr,
2165    partitions: &Partitions,
2166    query_ctx: &QueryContextRef,
2167) -> Result<(Vec<String>, Vec<PartitionExpr>)> {
2168    let partition_columns = partitions
2169        .column_list
2170        .iter()
2171        .map(|ident| ident.value.clone())
2172        .collect::<Vec<_>>();
2173
2174    let column_name_and_type = partition_columns
2175        .iter()
2176        .map(|pc| {
2177            let column = create_table
2178                .column_defs
2179                .iter()
2180                .find(|c| &c.name == pc)
2181                .context(ColumnNotFoundSnafu { msg: pc.clone() })?;
2182            let column_name = &column.name;
2183            let data_type = ConcreteDataType::from(
2184                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
2185                    .context(ColumnDataTypeSnafu)?,
2186            );
2187            Ok((column_name, data_type))
2188        })
2189        .collect::<Result<HashMap<_, _>>>()?;
2190
2191    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
2192    for expr in &partitions.exprs {
2193        let partition_expr = convert_one_expr(expr, &column_name_and_type, &query_ctx.timezone())?;
2194        partition_exprs.push(partition_expr);
2195    }
2196
2197    MultiDimPartitionRule::try_new(
2198        partition_columns.clone(),
2199        vec![],
2200        partition_exprs.clone(),
2201        true,
2202    )
2203    .context(InvalidPartitionSnafu)?;
2204
2205    Ok((partition_columns, partition_exprs))
2206}
2207
2208/// Verifies an alter and returns whether it is necessary to perform the alter.
2209///
2210/// # Returns
2211///
2212/// Returns true if the alter need to be porformed; otherwise, it returns false.
2213pub fn verify_alter(
2214    table_id: TableId,
2215    table_info: Arc<TableInfo>,
2216    expr: AlterTableExpr,
2217) -> Result<bool> {
2218    let request: AlterTableRequest =
2219        common_grpc_expr::alter_expr_to_request(table_id, expr, Some(&table_info.meta))
2220            .context(AlterExprToRequestSnafu)?;
2221
2222    let AlterTableRequest {
2223        table_name,
2224        alter_kind,
2225        ..
2226    } = &request;
2227
2228    if let AlterKind::RenameTable { new_table_name } = alter_kind {
2229        ensure!(
2230            NAME_PATTERN_REG.is_match(new_table_name),
2231            error::UnexpectedSnafu {
2232                violated: format!("Invalid table name: {}", new_table_name)
2233            }
2234        );
2235    } else if let AlterKind::AddColumns { columns } = alter_kind {
2236        // If all the columns are marked as add_if_not_exists and they already exist in the table,
2237        // there is no need to perform the alter.
2238        let column_names: HashSet<_> = table_info
2239            .meta
2240            .schema
2241            .column_schemas()
2242            .iter()
2243            .map(|schema| &schema.name)
2244            .collect();
2245        if columns.iter().all(|column| {
2246            column_names.contains(&column.column_schema.name) && column.add_if_not_exists
2247        }) {
2248            return Ok(false);
2249        }
2250    }
2251
2252    let _ = table_info
2253        .meta
2254        .builder_with_alter_kind(table_name, &request.alter_kind)
2255        .context(error::TableSnafu)?
2256        .build()
2257        .context(error::BuildTableMetaSnafu { table_name })?;
2258
2259    Ok(true)
2260}
2261
2262pub fn create_table_info(
2263    create_table: &CreateTableExpr,
2264    partition_columns: Vec<String>,
2265) -> Result<TableInfo> {
2266    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
2267    let mut column_name_to_index_map = HashMap::new();
2268
2269    for (idx, column) in create_table.column_defs.iter().enumerate() {
2270        let schema =
2271            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
2272                column: &column.name,
2273            })?;
2274        let schema = schema.with_time_index(column.name == create_table.time_index);
2275
2276        column_schemas.push(schema);
2277        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
2278    }
2279
2280    let next_column_id = column_schemas.len() as u32;
2281    let schema = Arc::new(Schema::new(column_schemas));
2282
2283    let primary_key_indices = create_table
2284        .primary_keys
2285        .iter()
2286        .map(|name| {
2287            column_name_to_index_map
2288                .get(name)
2289                .cloned()
2290                .context(ColumnNotFoundSnafu { msg: name })
2291        })
2292        .collect::<Result<Vec<_>>>()?;
2293
2294    let partition_key_indices = partition_columns
2295        .into_iter()
2296        .map(|col_name| {
2297            column_name_to_index_map
2298                .get(&col_name)
2299                .cloned()
2300                .context(ColumnNotFoundSnafu { msg: col_name })
2301        })
2302        .collect::<Result<Vec<_>>>()?;
2303
2304    let mut table_options = TableOptions::try_from_iter(&create_table.table_options)
2305        .context(UnrecognizedTableOptionSnafu)?;
2306
2307    validate_repartition_column_hint(
2308        &mut table_options,
2309        &column_name_to_index_map,
2310        &partition_key_indices,
2311        &create_table.time_index,
2312    )?;
2313
2314    let meta = TableMeta {
2315        schema,
2316        primary_key_indices,
2317        value_indices: vec![],
2318        engine: create_table.engine.clone(),
2319        next_column_id,
2320        options: table_options,
2321        created_on: Utc::now(),
2322        updated_on: Utc::now(),
2323        partition_key_indices,
2324        column_ids: vec![],
2325    };
2326
2327    let desc = if create_table.desc.is_empty() {
2328        create_table.table_options.get(COMMENT_KEY).cloned()
2329    } else {
2330        Some(create_table.desc.clone())
2331    };
2332
2333    let table_info = TableInfo {
2334        ident: metadata::TableIdent {
2335            // The table id of distributed table is assigned by Meta, set "0" here as a placeholder.
2336            table_id: 0,
2337            version: 0,
2338        },
2339        name: create_table.table_name.clone(),
2340        desc,
2341        catalog_name: create_table.catalog_name.clone(),
2342        schema_name: create_table.schema_name.clone(),
2343        meta,
2344        table_type: TableType::Base,
2345    };
2346    Ok(table_info)
2347}
2348
2349fn validate_repartition_column_hint(
2350    table_options: &mut TableOptions,
2351    column_name_to_index_map: &HashMap<String, usize>,
2352    partition_key_indices: &[usize],
2353    time_index: &str,
2354) -> Result<()> {
2355    let Some(column_name) = table_options
2356        .extra_options
2357        .get(REPARTITION_COLUMN_HINT_KEY)
2358        .map(|value| value.trim().to_string())
2359    else {
2360        return Ok(());
2361    };
2362
2363    ensure!(
2364        !column_name.is_empty(),
2365        InvalidPartitionRuleSnafu {
2366            reason: format!("{REPARTITION_COLUMN_HINT_KEY} expects exactly one column name"),
2367        }
2368    );
2369
2370    ensure!(
2371        !column_name.contains(','),
2372        InvalidPartitionRuleSnafu {
2373            reason: format!("{REPARTITION_COLUMN_HINT_KEY} expects exactly one column name"),
2374        }
2375    );
2376
2377    ensure!(
2378        partition_key_indices.is_empty(),
2379        InvalidPartitionRuleSnafu {
2380            reason: format!(
2381                "cannot set {REPARTITION_COLUMN_HINT_KEY} on a table with partition metadata"
2382            ),
2383        }
2384    );
2385
2386    column_name_to_index_map
2387        .get(&column_name)
2388        .context(ColumnNotFoundSnafu { msg: &column_name })?;
2389
2390    ensure!(
2391        column_name != time_index,
2392        InvalidPartitionRuleSnafu {
2393            reason: format!("cannot set {REPARTITION_COLUMN_HINT_KEY} to the time index column"),
2394        }
2395    );
2396
2397    table_options
2398        .extra_options
2399        .insert(REPARTITION_COLUMN_HINT_KEY.to_string(), column_name);
2400
2401    Ok(())
2402}
2403
2404fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
2405    let columns = if let Some(partitions) = partitions {
2406        partitions
2407            .column_list
2408            .iter()
2409            .map(|x| x.value.clone())
2410            .collect::<Vec<_>>()
2411    } else {
2412        vec![]
2413    };
2414    Ok(columns)
2415}
2416
2417/// Parse [Partitions] into a group of partition entries.
2418///
2419/// Returns a list of [PartitionExpr], each of which defines a partition.
2420fn find_partition_entries(
2421    create_table: &CreateTableExpr,
2422    partitions: &Option<Partitions>,
2423    partition_columns: &[String],
2424    query_ctx: &QueryContextRef,
2425) -> Result<Vec<PartitionExpr>> {
2426    let Some(partitions) = partitions else {
2427        return Ok(vec![]);
2428    };
2429
2430    // extract concrete data type of partition columns
2431    let column_name_and_type = partition_columns
2432        .iter()
2433        .map(|pc| {
2434            let column = create_table
2435                .column_defs
2436                .iter()
2437                .find(|c| &c.name == pc)
2438                // unwrap is safe here because we have checked that partition columns are defined
2439                .unwrap();
2440            let column_name = &column.name;
2441            let data_type = ConcreteDataType::from(
2442                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
2443                    .context(ColumnDataTypeSnafu)?,
2444            );
2445            Ok((column_name, data_type))
2446        })
2447        .collect::<Result<HashMap<_, _>>>()?;
2448
2449    // Transform parser expr to partition expr
2450    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
2451    for partition in &partitions.exprs {
2452        let partition_expr =
2453            convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
2454        partition_exprs.push(partition_expr);
2455    }
2456
2457    Ok(partition_exprs)
2458}
2459
2460fn column_name_and_type<'a>(
2461    partition_columns: &'a [&'a ColumnSchema],
2462) -> HashMap<&'a String, ConcreteDataType> {
2463    partition_columns
2464        .iter()
2465        .map(|column| (&column.name, column.data_type.clone()))
2466        .collect()
2467}
2468
2469fn validate_and_collect_partition_columns<'a>(
2470    column_names: &[String],
2471    column_schemas: &'a [ColumnSchema],
2472) -> Result<Vec<&'a ColumnSchema>> {
2473    let mut seen = HashSet::with_capacity(column_names.len());
2474    column_names
2475        .iter()
2476        .map(|column_name| {
2477            ensure!(
2478                seen.insert(column_name),
2479                InvalidPartitionRuleSnafu {
2480                    reason: format!("duplicate partition column '{}'", column_name)
2481                }
2482            );
2483            column_schemas
2484                .iter()
2485                .find(|column| &column.name == column_name)
2486                .with_context(|| ColumnNotFoundSnafu { msg: column_name })
2487        })
2488        .collect()
2489}
2490
2491fn ensure_partition_expr_columns_in_target(
2492    partition_exprs: &[PartitionExpr],
2493    target_partition_columns: &HashSet<&String>,
2494) -> Result<()> {
2495    for expr in partition_exprs {
2496        ensure_partition_operand_columns_in_target(&expr.lhs, target_partition_columns)?;
2497        ensure_partition_operand_columns_in_target(&expr.rhs, target_partition_columns)?;
2498    }
2499
2500    Ok(())
2501}
2502
2503fn ensure_partition_operand_columns_in_target(
2504    operand: &Operand,
2505    target_partition_columns: &HashSet<&String>,
2506) -> Result<()> {
2507    match operand {
2508        Operand::Column(column) => ensure!(
2509            target_partition_columns.contains(column),
2510            InvalidPartitionRuleSnafu {
2511                reason: format!(
2512                    "partition expression references column '{}' that is not in target partition columns",
2513                    column
2514                )
2515            }
2516        ),
2517        Operand::Expr(expr) => {
2518            ensure_partition_operand_columns_in_target(&expr.lhs, target_partition_columns)?;
2519            ensure_partition_operand_columns_in_target(&expr.rhs, target_partition_columns)?;
2520        }
2521        Operand::Value(_) => {}
2522    }
2523
2524    Ok(())
2525}
2526
2527fn convert_one_expr(
2528    expr: &Expr,
2529    column_name_and_type: &HashMap<&String, ConcreteDataType>,
2530    timezone: &Timezone,
2531) -> Result<PartitionExpr> {
2532    let Expr::BinaryOp { left, op, right } = expr else {
2533        return InvalidPartitionRuleSnafu {
2534            reason: "partition rule must be a binary expression",
2535        }
2536        .fail();
2537    };
2538
2539    let op =
2540        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
2541            reason: format!("unsupported operator in partition expr {op}"),
2542        })?;
2543
2544    // convert leaf node.
2545    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
2546        // col, val
2547        (Expr::Identifier(ident), Expr::Value(value)) => {
2548            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2549            let value = convert_value(&value.value, data_type, timezone, None)?;
2550            (Operand::Column(column_name), op, Operand::Value(value))
2551        }
2552        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
2553            if let Expr::Value(v) = &**expr =>
2554        {
2555            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2556            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
2557            (Operand::Column(column_name), op, Operand::Value(value))
2558        }
2559        // val, col
2560        (Expr::Value(value), Expr::Identifier(ident)) => {
2561            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2562            let value = convert_value(&value.value, data_type, timezone, None)?;
2563            (Operand::Value(value), op, Operand::Column(column_name))
2564        }
2565        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
2566            if let Expr::Value(v) = &**expr =>
2567        {
2568            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2569            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
2570            (Operand::Value(value), op, Operand::Column(column_name))
2571        }
2572        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
2573            // sub-expr must against another sub-expr
2574            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
2575            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
2576            (Operand::Expr(lhs), op, Operand::Expr(rhs))
2577        }
2578        _ => {
2579            return InvalidPartitionRuleSnafu {
2580                reason: format!("invalid partition expr {expr}"),
2581            }
2582            .fail();
2583        }
2584    };
2585
2586    Ok(PartitionExpr::new(lhs, op, rhs))
2587}
2588
2589fn convert_identifier(
2590    ident: &Ident,
2591    column_name_and_type: &HashMap<&String, ConcreteDataType>,
2592) -> Result<(String, ConcreteDataType)> {
2593    let column_name = ident.value.clone();
2594    let data_type = column_name_and_type
2595        .get(&column_name)
2596        .cloned()
2597        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
2598    Ok((column_name, data_type))
2599}
2600
2601fn convert_value(
2602    value: &ParserValue,
2603    data_type: ConcreteDataType,
2604    timezone: &Timezone,
2605    unary_op: Option<UnaryOperator>,
2606) -> Result<Value> {
2607    sql_value_to_value(
2608        &ColumnSchema::new("<NONAME>", data_type, true),
2609        value,
2610        Some(timezone),
2611        unary_op,
2612        false,
2613    )
2614    .context(error::SqlCommonSnafu)
2615}
2616
#[cfg(test)]
mod test {
    use std::time::Duration;

    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;
    use sqlparser::parser::Parser;

    use super::*;
    use crate::expr_helper;

    /// Parses `sql` as a single standalone SQL expression with the GreptimeDB dialect.
    ///
    /// Panics if the text cannot be tokenized or parsed.
    fn parse_sql_expr(sql: &str) -> Expr {
        let dialect = GreptimeDbDialect {};
        let mut parser = Parser::new(&dialect).try_with_sql(sql).unwrap();
        parser.parse_expr().unwrap()
    }

    /// `timeout` and `wait` are both read from the option map.
    #[test]
    fn test_parse_ddl_options() {
        let options = OptionMap::from([
            ("timeout".to_string(), "5m".to_string()),
            ("wait".to_string(), "false".to_string()),
        ]);
        let ddl_options = parse_ddl_options(&options).unwrap();
        assert!(!ddl_options.wait);
        assert_eq!(Duration::from_secs(300), ddl_options.timeout);
    }

    /// An empty option map normalizes to an empty option map.
    #[test]
    fn test_validate_and_normalize_flow_options_empty() {
        let normalized = validate_and_normalize_flow_options(HashMap::new()).unwrap();
        assert!(normalized.is_empty());
    }

    /// Upper-case boolean values are accepted and normalized to lower case.
    #[test]
    fn test_validate_and_normalize_flow_options_valid() {
        let options = HashMap::from([
            (DEFER_ON_MISSING_SOURCE_KEY.to_string(), "TRUE".to_string()),
            (
                FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY.to_string(),
                "FALSE".to_string(),
            ),
        ]);

        let expected = HashMap::from([
            (DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string()),
            (
                FLOW_EXPERIMENTAL_ENABLE_INCREMENTAL_READ_KEY.to_string(),
                "false".to_string(),
            ),
        ]);

        assert_eq!(
            validate_and_normalize_flow_options(options).unwrap(),
            expected
        );
    }

    /// An unrecognized option is rejected with a message listing the supported options.
    #[test]
    fn test_validate_and_normalize_flow_options_unknown_option() {
        let err = validate_and_normalize_flow_options(HashMap::from([(
            "foo".to_string(),
            "bar".to_string(),
        )]))
        .unwrap_err();

        assert!(
            err.to_string()
                .contains("unknown flow option 'foo', supported options: defer_on_missing_source, experimental_enable_incremental_read")
        );
    }

    /// The `flow_type` option cannot be supplied by the user.
    #[test]
    fn test_validate_and_normalize_flow_options_reserved_option() {
        let err = validate_and_normalize_flow_options(HashMap::from([(
            FlowType::FLOW_TYPE_KEY.to_string(),
            FlowType::BATCHING.to_string(),
        )]))
        .unwrap_err();

        assert!(
            err.to_string()
                .contains("flow option 'flow_type' is reserved for internal use")
        );
    }

    /// A non-boolean value for a boolean option is rejected.
    #[test]
    fn test_validate_and_normalize_flow_options_invalid_bool() {
        let err = validate_and_normalize_flow_options(HashMap::from([(
            DEFER_ON_MISSING_SOURCE_KEY.to_string(),
            "not-a-bool".to_string(),
        )]))
        .unwrap_err();

        assert!(
            err.to_string()
                .contains("invalid flow option 'defer_on_missing_source': 'not-a-bool'")
        );
    }

    /// An unknown option parsed from a full `CREATE FLOW` statement is rejected
    /// by name, going through the parser and `to_create_flow_task_expr` first.
    #[test]
    fn test_validate_and_normalize_flow_options_rejects_redacted_invalid_input() {
        let sql = r"
CREATE FLOW task_6
SINK TO schema_1.table_1
WITH (access_key_id = [true])
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();

        let Statement::CreateFlow(create_flow) = stmt else {
            unreachable!()
        };
        let expr =
            expr_helper::to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
        let err = validate_and_normalize_flow_options(expr.flow_options).unwrap_err();

        assert!(err.to_string().contains(
            "unknown flow option 'access_key_id', supported options: defer_on_missing_source"
        ));
    }

    /// Without `defer_on_missing_source`, missing source tables produce an error
    /// that tells the user how to opt in.
    #[test]
    fn test_determine_flow_type_for_source_state_missing_sources_require_opt_in() {
        let err = determine_flow_type_for_source_state("my_flow", &HashMap::new(), true, false)
            .unwrap_err();

        assert!(err.to_string().contains(
            "missing source tables for flow 'my_flow'; use WITH (defer_on_missing_source = true) to create a pending flow"
        ));
    }

    /// With `defer_on_missing_source = true`, the call with both flags set
    /// resolves to a batching flow.
    #[test]
    fn test_determine_flow_type_for_source_state_missing_sources_prefer_batching() {
        let flow_options =
            HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string())]);

        assert_eq!(
            determine_flow_type_for_source_state("my_flow", &flow_options, true, true).unwrap(),
            Some(FlowType::Batching)
        );
    }

    /// With no options, the call with arguments `(false, true)` resolves to a
    /// streaming flow.
    #[test]
    fn test_determine_flow_type_for_source_state_instant_ttl_without_missing_sources() {
        assert_eq!(
            determine_flow_type_for_source_state("my_flow", &HashMap::new(), false, true).unwrap(),
            Some(FlowType::Streaming)
        );
    }

    /// Spot checks of which names `NAME_PATTERN_REG` accepts and rejects.
    #[test]
    fn test_name_is_match() {
        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
        assert!(!NAME_PATTERN_REG.is_match("🈲"));
        assert!(NAME_PATTERN_REG.is_match("hello"));
        assert!(NAME_PATTERN_REG.is_match("test@"));
        assert!(!NAME_PATTERN_REG.is_match("@test"));
        assert!(NAME_PATTERN_REG.is_match("test#"));
        assert!(!NAME_PATTERN_REG.is_match("#test"));
        assert!(!NAME_PATTERN_REG.is_match("@"));
        assert!(!NAME_PATTERN_REG.is_match("#"));
    }

    /// `device_id < 100` and `100 > device_id` convert to partition expressions
    /// that compare equal.
    #[test]
    fn test_partition_expr_equivalence_with_swapped_operands() {
        let column_name = "device_id".to_string();
        let column_name_and_type =
            HashMap::from([(&column_name, ConcreteDataType::int32_datatype())]);
        let timezone = Timezone::from_tz_string("UTC").unwrap();

        let expr_left = parse_sql_expr("device_id < 100");
        let expr_right = parse_sql_expr("100 > device_id");

        let partition_left =
            convert_one_expr(&expr_left, &column_name_and_type, &timezone).unwrap();
        let partition_right =
            convert_one_expr(&expr_right, &column_name_and_type, &timezone).unwrap();

        assert_eq!(partition_left, partition_right);
        assert!([partition_left.clone()].contains(&partition_right));

        let mut physical_partition_exprs = vec![partition_left];
        let mut logical_partition_exprs = vec![partition_right];
        physical_partition_exprs.sort_unstable();
        logical_partition_exprs.sort_unstable();
        assert_eq!(physical_partition_exprs, logical_partition_exprs);
    }

    /// An expression that references a column outside the existing partition
    /// columns fails to convert in that context, but converts once the target
    /// partition columns are used as the context.
    #[test]
    fn test_repartition_target_partition_columns_are_overwrite_context() {
        let device_id = ColumnSchema::new("device_id", ConcreteDataType::int32_datatype(), true);
        let area = ColumnSchema::new("area", ConcreteDataType::string_datatype(), true);
        let existing_partition_columns = vec![&device_id];
        let target_partition_columns = vec![&device_id, &area];
        let existing_column_name_and_type = column_name_and_type(&existing_partition_columns);
        let target_column_name_and_type = column_name_and_type(&target_partition_columns);
        let timezone = Timezone::from_tz_string("UTC").unwrap();

        let expr = parse_sql_expr("device_id < 100 AND area < 'South'");

        let err = convert_one_expr(&expr, &existing_column_name_and_type, &timezone).unwrap_err();
        assert!(err.to_string().contains("area"));

        let partition_expr = convert_one_expr(&expr, &target_column_name_and_type, &timezone)
            .expect("target columns should overwrite the conversion context");
        let partition_expr = partition_expr.to_string();
        assert!(partition_expr.contains("device_id"));
        assert!(partition_expr.contains("area"));
        assert!(partition_expr.contains("South"));
    }

    /// An expression on a column that is not among the target partition columns
    /// is rejected by `ensure_partition_expr_columns_in_target`.
    #[test]
    fn test_repartition_rejects_remaining_expr_outside_target_columns() {
        let device_id = "device_id".to_string();
        let area = "area".to_string();
        let timezone = Timezone::from_tz_string("UTC").unwrap();
        let column_name_and_type = HashMap::from([
            (&device_id, ConcreteDataType::int32_datatype()),
            (&area, ConcreteDataType::string_datatype()),
        ]);
        let remaining_old_expr = convert_one_expr(
            &parse_sql_expr("device_id >= 100"),
            &column_name_and_type,
            &timezone,
        )
        .unwrap();
        let target_partition_columns = HashSet::from([&area]);

        let err = ensure_partition_expr_columns_in_target(
            &[remaining_old_expr],
            &target_partition_columns,
        )
        .unwrap_err();

        assert!(err.to_string().contains("device_id"));
        assert!(err.to_string().contains("target partition columns"));
    }

    /// Listing the same target partition column twice is rejected.
    #[test]
    fn test_repartition_rejects_duplicate_target_partition_columns() {
        let device_id = ColumnSchema::new("device_id", ConcreteDataType::int32_datatype(), true);
        let column_schemas = vec![device_id];
        let target_partition_columns = vec!["device_id".to_string(), "device_id".to_string()];

        let err =
            validate_and_collect_partition_columns(&target_partition_columns, &column_schemas)
                .unwrap_err();

        assert!(err.to_string().contains("duplicate partition column"));
        assert!(err.to_string().contains("device_id"));
    }

    /// Parses `sql`, which must start with a `CREATE TABLE` statement, and
    /// converts that statement into a [`CreateTableExpr`].
    fn create_expr_from_sql(sql: &str) -> CreateTableExpr {
        let result =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap();

        match &result[0] {
            Statement::CreateTable(create) => {
                expr_helper::create_to_expr(create, &QueryContext::arc()).unwrap()
            }
            _ => unreachable!(),
        }
    }

    /// The hint value is stored in the table's extra options with surrounding
    /// whitespace trimmed.
    #[test]
    fn test_create_table_with_repartition_column_hint() {
        let expr = create_expr_from_sql(
            r"
CREATE TABLE metrics (
  host STRING,
  ts TIMESTAMP TIME INDEX,
  cpu DOUBLE,
  PRIMARY KEY(host)
)
WITH ('repartition.column.hint' = ' host ')",
        );

        let table_info = create_table_info(&expr, vec![]).unwrap();
        assert_eq!(
            table_info
                .meta
                .options
                .extra_options
                .get(REPARTITION_COLUMN_HINT_KEY),
            Some(&"host".to_string())
        );
    }

    /// An empty hint is rejected.
    #[test]
    fn test_create_table_with_empty_repartition_column_hint() {
        let expr = create_expr_from_sql(
            r"
CREATE TABLE metrics (
  host STRING,
  ts TIMESTAMP TIME INDEX,
  cpu DOUBLE,
  PRIMARY KEY(host)
)
WITH ('repartition.column.hint' = '')",
        );

        let err = create_table_info(&expr, vec![]).unwrap_err();
        assert!(
            err.to_string()
                .contains("repartition.column.hint expects exactly one column name")
        );
    }

    /// A comma-separated list of columns is rejected.
    #[test]
    fn test_create_table_with_multiple_repartition_column_hints() {
        let expr = create_expr_from_sql(
            r"
CREATE TABLE metrics (
  host STRING,
  region_id STRING,
  ts TIMESTAMP TIME INDEX,
  cpu DOUBLE,
  PRIMARY KEY(host)
)
WITH ('repartition.column.hint' = 'host,region_id')",
        );

        let err = create_table_info(&expr, vec![]).unwrap_err();
        assert!(
            err.to_string()
                .contains("repartition.column.hint expects exactly one column name")
        );
    }

    /// A hint naming a column that the table does not define is rejected, and
    /// the error reports the full column name.
    #[test]
    fn test_create_table_with_missing_repartition_column_hint() {
        let expr = create_expr_from_sql(
            r"
CREATE TABLE metrics (
  host STRING,
  ts TIMESTAMP TIME INDEX,
  cpu DOUBLE,
  PRIMARY KEY(host)
)
WITH ('repartition.column.hint' = 'region_id')",
        );

        let err = create_table_info(&expr, vec![]).unwrap_err();
        // Match the whole column name so a truncated name in the message would fail.
        assert!(
            err.to_string()
                .contains("Cannot find column by name: region_id")
        );
    }

    /// The time index column cannot be used as the hint.
    #[test]
    fn test_create_table_with_time_index_repartition_column_hint() {
        let expr = create_expr_from_sql(
            r"
CREATE TABLE metrics (
  host STRING,
  ts TIMESTAMP TIME INDEX,
  cpu DOUBLE,
  PRIMARY KEY(host)
)
WITH ('repartition.column.hint' = 'ts')",
        );

        let err = create_table_info(&expr, vec![]).unwrap_err();
        assert!(
            err.to_string()
                .contains("cannot set repartition.column.hint to the time index column")
        );
    }

    /// The hint is rejected when partition columns are passed to `create_table_info`.
    #[test]
    fn test_create_partitioned_table_with_repartition_column_hint() {
        let expr = create_expr_from_sql(
            r"
CREATE TABLE metrics (
  host STRING,
  ts TIMESTAMP TIME INDEX,
  cpu DOUBLE,
  PRIMARY KEY(host)
)
WITH ('repartition.column.hint' = 'host')",
        );

        let err = create_table_info(&expr, vec!["host".to_string()]).unwrap_err();
        assert!(
            err.to_string()
                .contains("cannot set repartition.column.hint on a table with partition metadata")
        );
    }

    /// Compares the JSON serialization of parsed partitions with expected
    /// strings. Currently ignored (see the `ignore` reason).
    #[tokio::test]
    #[ignore = "TODO(ruihang): WIP new partition rule"]
    async fn test_parse_partitions() {
        common_telemetry::init_default_ut_logging();
        let cases = [
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION ON COLUMNS (b) (
  b < 'hz',
  b >= 'hz' AND b < 'sh',
  b >= 'sh'
)
ENGINE=mito",
                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
            ),
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
  PARTITION r0 VALUES LESS THAN ('hz', 10),
  b < 'hz' AND a < 10,
  b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
  b >= 'sh' AND a >= 20
)
ENGINE=mito",
                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
            ),
        ];
        let ctx = QueryContextBuilder::default().build().into();
        for (sql, expected) in cases {
            let result = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            match &result[0] {
                Statement::CreateTable(c) => {
                    let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
                    let (partitions, _) =
                        parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
                    let json = serde_json::to_string(&partitions).unwrap();
                    assert_eq!(json, expected);
                }
                _ => unreachable!(),
            }
        }
    }
}