operator/statement/
ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::helper::ColumnDataTypeWrapper;
20use api::v1::alter_table_expr::Kind;
21use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
22use api::v1::{
23    AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
24    Repartition, column_def,
25};
26#[cfg(feature = "enterprise")]
27use api::v1::{
28    CreateTriggerExpr as PbCreateTriggerExpr, meta::CreateTriggerTask as PbCreateTriggerTask,
29};
30use catalog::CatalogManagerRef;
31use chrono::Utc;
32use common_base::regex_pattern::NAME_PATTERN_REG;
33use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_readonly_schema};
34use common_catalog::{format_full_flow_name, format_full_table_name};
35use common_error::ext::BoxedError;
36use common_meta::cache_invalidator::Context;
37use common_meta::ddl::create_flow::FlowType;
38use common_meta::instruction::CacheIdent;
39use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
40use common_meta::procedure_executor::ExecutorContext;
41#[cfg(feature = "enterprise")]
42use common_meta::rpc::ddl::trigger::CreateTriggerTask;
43#[cfg(feature = "enterprise")]
44use common_meta::rpc::ddl::trigger::DropTriggerTask;
45use common_meta::rpc::ddl::{
46    CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
47    SubmitDdlTaskResponse,
48};
49use common_query::Output;
50use common_recordbatch::{RecordBatch, RecordBatches};
51use common_sql::convert::sql_value_to_value;
52use common_telemetry::{debug, info, tracing, warn};
53use common_time::{Timestamp, Timezone};
54use datafusion_common::tree_node::TreeNodeVisitor;
55use datafusion_expr::LogicalPlan;
56use datatypes::prelude::ConcreteDataType;
57use datatypes::schema::{ColumnSchema, Schema};
58use datatypes::value::Value;
59use datatypes::vectors::{StringVector, VectorRef};
60use humantime::parse_duration;
61use partition::expr::{Operand, PartitionExpr, RestrictedOp};
62use partition::multi_dim::MultiDimPartitionRule;
63use query::parser::QueryStatement;
64use query::plan::extract_and_rewrite_full_table_names;
65use query::query_engine::DefaultSerializer;
66use query::sql::create_table_stmt;
67use session::context::QueryContextRef;
68use session::table_name::table_idents_to_full_name;
69use snafu::{OptionExt, ResultExt, ensure};
70use sql::parser::{ParseOptions, ParserContext};
71use sql::parsers::utils::is_tql;
72use sql::statements::OptionMap;
73#[cfg(feature = "enterprise")]
74use sql::statements::alter::trigger::AlterTrigger;
75use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation};
76#[cfg(feature = "enterprise")]
77use sql::statements::create::trigger::CreateTrigger;
78use sql::statements::create::{
79    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
80};
81use sql::statements::statement::Statement;
82use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
83use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
84use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
85use table::TableRef;
86use table::dist_table::DistTable;
87use table::metadata::{self, TableId, TableInfo, TableMeta, TableType};
88use table::requests::{
89    AlterKind, AlterTableRequest, COMMENT_KEY, DDL_TIMEOUT, DDL_WAIT, TableOptions,
90};
91use table::table_name::TableName;
92use table::table_reference::TableReference;
93
94use crate::error::{
95    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
96    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu,
97    DeserializePartitionExprSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
98    FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
99    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu,
100    PartitionExprToPbSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
101    SerializePartitionExprSnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
102    TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
103    ViewAlreadyExistsSnafu,
104};
105use crate::expr_helper::{self, RepartitionRequest};
106use crate::statement::StatementExecutor;
107use crate::statement::show::create_partitions_stmt;
108use crate::utils::to_meta_query_context;
109
/// Options controlling how a DDL task is submitted to the procedure executor.
///
/// Parsed from statement options by [`parse_ddl_options`]; defaults come from
/// `SubmitDdlTaskRequest` when an option is absent.
#[derive(Debug, Clone, Copy)]
struct DdlSubmitOptions {
    // Whether the submission waits for the DDL procedure to finish
    // (parsed from the `DDL_WAIT` option as a bool).
    wait: bool,
    // How long to wait for the procedure (parsed from `DDL_TIMEOUT`
    // using humantime syntax, e.g. "30s").
    timeout: Duration,
}
115
116fn build_procedure_id_output(procedure_id: Vec<u8>) -> Result<Output> {
117    let procedure_id = String::from_utf8_lossy(&procedure_id).to_string();
118    let vector: VectorRef = Arc::new(StringVector::from(vec![procedure_id]));
119    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
120        "Procedure ID",
121        vector.data_type(),
122        false,
123    )]));
124    let batch =
125        RecordBatch::new(schema.clone(), vec![vector]).context(error::BuildRecordBatchSnafu)?;
126    let batches =
127        RecordBatches::try_new(schema, vec![batch]).context(error::BuildRecordBatchSnafu)?;
128    Ok(Output::new_with_record_batches(batches))
129}
130
131fn parse_ddl_options(options: &OptionMap) -> Result<DdlSubmitOptions> {
132    let wait = match options.get(DDL_WAIT) {
133        Some(value) => value.parse::<bool>().map_err(|_| {
134            InvalidSqlSnafu {
135                err_msg: format!("invalid DDL option '{DDL_WAIT}': '{value}'"),
136            }
137            .build()
138        })?,
139        None => SubmitDdlTaskRequest::default_wait(),
140    };
141
142    let timeout = match options.get(DDL_TIMEOUT) {
143        Some(value) => parse_duration(value).map_err(|err| {
144            InvalidSqlSnafu {
145                err_msg: format!("invalid DDL option '{DDL_TIMEOUT}': '{value}': {err}"),
146            }
147            .build()
148        })?,
149        None => SubmitDdlTaskRequest::default_timeout(),
150    };
151
152    Ok(DdlSubmitOptions { wait, timeout })
153}
154
155impl StatementExecutor {
    /// Returns a cloned handle to the catalog manager.
    pub fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }
159
    /// Creates a table from a parsed `CREATE TABLE` statement.
    ///
    /// Database-level (schema) options are inherited into the table's options,
    /// except `compaction.*` keys; options the user set explicitly on the
    /// table always take precedence over inherited ones.
    #[tracing::instrument(skip_all)]
    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
        // Resolve the (possibly partial) table identifier against the
        // session's current catalog/schema.
        let (catalog, schema, _table) = table_idents_to_full_name(&stmt.name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;

        // Load database-level options so they can be inherited below.
        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
        // Don't inherit schema-level TTL/compaction options into table options:
        // TTL is applied during compaction, and `compaction.*` is handled separately.
        if let Some(schema_options) = schema_options {
            for (key, value) in schema_options.extra_options.iter() {
                if key.starts_with("compaction.") {
                    continue;
                }
                // `or_insert` keeps any option the user set explicitly.
                create_expr
                    .table_options
                    .entry(key.clone())
                    .or_insert(value.clone());
            }
        }

        self.create_table_inner(create_expr, stmt.partitions, ctx)
            .await
    }
195
    /// Creates a new table mirroring an existing one (`CREATE TABLE ... LIKE`).
    ///
    /// Rebuilds a `CREATE TABLE` statement from the source table's metadata
    /// (including inherited database-level options and the source's partition
    /// rule), retargets it at the new name, and runs the shared creation path.
    #[tracing::instrument(skip_all)]
    pub async fn create_table_like(
        &self,
        stmt: CreateTableLike,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        // Resolve and fetch the source table.
        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;
        let table_ref = self
            .catalog_manager
            .table(&catalog, &schema, &table, Some(&ctx))
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu { table_name: &table })?;
        // Fetch the source table's physical partition info so the new table
        // can reproduce the same partitioning.
        let partition_info = self
            .partition_manager
            .find_physical_partition_info(table_ref.table_info().table_id())
            .await
            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;

        // CREATE TABLE LIKE also inherits database level options.
        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        // Rebuild a CREATE TABLE statement from the source table's metadata,
        // then point it at the new table name (LIKE never uses IF NOT EXISTS).
        let quote_style = ctx.quote_style();
        let mut create_stmt =
            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
                .context(error::ParseQuerySnafu)?;
        create_stmt.name = stmt.table_name;
        create_stmt.if_not_exists = false;

        let table_info = table_ref.table_info();
        // Keep a partition clause only when the source has partition columns.
        let partitions = create_partitions_stmt(&table_info, &partition_info.partitions)?.and_then(
            |mut partitions| {
                if !partitions.column_list.is_empty() {
                    partitions.set_quote(quote_style);
                    Some(partitions)
                } else {
                    None
                }
            },
        );

        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
        self.create_table_inner(create_expr, partitions, ctx).await
    }
251
252    #[tracing::instrument(skip_all)]
253    pub async fn create_external_table(
254        &self,
255        create_expr: CreateExternalTable,
256        ctx: QueryContextRef,
257    ) -> Result<TableRef> {
258        let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
259        self.create_table_inner(create_expr, None, ctx).await
260    }
261
    /// Shared creation path for every `CREATE TABLE` variant.
    ///
    /// Rejects read-only schemas, then dispatches: a metric-engine table that
    /// carries the logical-table metadata option is created as a logical
    /// table; everything else is created as a regular table.
    #[tracing::instrument(skip_all)]
    pub async fn create_table_inner(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !is_readonly_schema(&create_table.schema_name),
            SchemaReadOnlySnafu {
                name: create_table.schema_name.clone()
            }
        );

        if create_table.engine == METRIC_ENGINE_NAME
            && create_table
                .table_options
                .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            // A logical table may declare its own partition rule, but it must
            // match the backing physical table's rule — verify before creating.
            if let Some(partitions) = partitions.as_ref()
                && !partitions.exprs.is_empty()
            {
                self.validate_logical_table_partition_rule(create_table, partitions, &query_ctx)
                    .await?;
            }
            // Create logical tables
            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
                .await?
                .into_iter()
                .next()
                .context(error::UnexpectedSnafu {
                    violated: "expected to create logical tables",
                })
        } else {
            // Create other normal table
            self.create_non_logic_table(create_table, partitions, query_ctx)
                .await
        }
    }
301
    /// Creates a regular (non-logical) table via the DDL procedure.
    ///
    /// Verifies the target schema exists and the table name is valid, honors
    /// `CREATE TABLE IF NOT EXISTS` when the table already exists, submits a
    /// create-table procedure, and returns a distributed table handle carrying
    /// the procedure-allocated table id.
    #[tracing::instrument(skip_all)]
    pub async fn create_non_logic_table(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();

        // Check if schema exists
        let schema = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey::new(
                &create_table.catalog_name,
                &create_table.schema_name,
            ))
            .await
            .context(TableMetadataManagerSnafu)?;
        ensure!(
            schema.is_some(),
            SchemaNotFoundSnafu {
                schema_info: &create_table.schema_name,
            }
        );

        // if table exists.
        if let Some(table) = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                &create_table.table_name,
                Some(&query_ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            // IF NOT EXISTS returns the existing table; otherwise it is an error.
            return if create_table.create_if_not_exists {
                Ok(table)
            } else {
                TableAlreadyExistsSnafu {
                    table: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                }
                .fail()
            };
        }

        // Validate the table name against the allowed-name pattern.
        ensure!(
            NAME_PATTERN_REG.is_match(&create_table.table_name),
            InvalidTableNameSnafu {
                table_name: &create_table.table_name,
            }
        );

        let table_name = TableName::new(
            &create_table.catalog_name,
            &create_table.schema_name,
            &create_table.table_name,
        );

        // Derive the partition rule and build the initial table metadata.
        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
        let mut table_info = create_table_info(create_table, partition_cols)?;

        let resp = self
            .create_table_procedure(
                create_table.clone(),
                partitions,
                table_info.clone(),
                query_ctx,
            )
            .await?;

        // The procedure allocates the table id; propagate it into the
        // returned metadata and back into the create expression.
        let table_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created table '{table_name}' with table id {table_id}");

        table_info.ident.table_id = table_id;

        let table_info = Arc::new(table_info);
        create_table.table_id = Some(api::v1::TableId { id: table_id });

        let table = DistTable::table(table_info);

        Ok(table)
    }
397
    /// Creates a batch of logical (metric-engine) tables in one DDL procedure.
    ///
    /// Validates every table name, submits a single create-logical-tables
    /// procedure, then re-reads each table from the catalog so the returned
    /// handles carry the latest metadata (e.g. partition columns inherited
    /// from the physical table).
    #[tracing::instrument(skip_all)]
    pub async fn create_logical_tables(
        &self,
        create_table_exprs: &[CreateTableExpr],
        query_context: QueryContextRef,
    ) -> Result<Vec<TableRef>> {
        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
        ensure!(
            !create_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "create logic tables"
            }
        );

        // Check table names
        for create_table in create_table_exprs {
            ensure!(
                NAME_PATTERN_REG.is_match(&create_table.table_name),
                InvalidTableNameSnafu {
                    table_name: &create_table.table_name,
                }
            );
        }

        // Build initial table infos (no partition columns yet) and pair each
        // with its create expression for the procedure request.
        let raw_tables_info = create_table_exprs
            .iter()
            .map(|create| create_table_info(create, vec![]))
            .collect::<Result<Vec<_>>>()?;
        let tables_data = create_table_exprs
            .iter()
            .cloned()
            .zip(raw_tables_info.iter().cloned())
            .collect::<Vec<_>>();

        let resp = self
            .create_logical_tables_procedure(tables_data, query_context.clone())
            .await?;

        let table_ids = resp.table_ids;
        ensure!(
            table_ids.len() == raw_tables_info.len(),
            CreateLogicalTablesSnafu {
                reason: format!(
                    "The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}",
                    raw_tables_info.len(),
                    table_ids.len()
                )
            }
        );
        info!("Successfully created logical tables: {:?}", table_ids);

        // Reacquire table infos from catalog so logical tables inherit the latest partition
        // metadata (e.g. partition_key_indices) from their physical tables.
        // And the returned table info also included extra partition columns that are in physical table but not in logical table's create table expr
        let mut tables_info = Vec::with_capacity(table_ids.len());
        for (table_id, create_table) in table_ids.iter().zip(create_table_exprs.iter()) {
            let table = self
                .catalog_manager
                .table(
                    &create_table.catalog_name,
                    &create_table.schema_name,
                    &create_table.table_name,
                    Some(&query_context),
                )
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                })?;

            let table_info = table.table_info();
            // Safety check: ensure we are returning the table info that matches the newly created table id.
            ensure!(
                table_info.table_id() == *table_id,
                CreateLogicalTablesSnafu {
                    reason: format!(
                        "Table id mismatch after creation, expected {}, got {} for table {}",
                        table_id,
                        table_info.table_id(),
                        format_full_table_name(
                            &create_table.catalog_name,
                            &create_table.schema_name,
                            &create_table.table_name
                        )
                    )
                }
            );

            tables_info.push(table_info);
        }

        Ok(tables_info.into_iter().map(DistTable::table).collect())
    }
495
    /// Ensures a logical table's declared partition rule matches the rule of
    /// its backing physical table.
    ///
    /// The physical table is located via the `LOGICAL_TABLE_METADATA_KEY`
    /// table option. Its rule must be a `MultiDimPartitionRule`, and the two
    /// expression sets must be identical (compared order-insensitively by
    /// sorting both sides).
    async fn validate_logical_table_partition_rule(
        &self,
        create_table: &CreateTableExpr,
        partitions: &Partitions,
        query_ctx: &QueryContextRef,
    ) -> Result<()> {
        // Parse the logical table's declared partition exprs.
        let (_, mut logical_partition_exprs) =
            parse_partitions_for_logical_validation(create_table, partitions, query_ctx)?;

        let physical_table_name = create_table
            .table_options
            .get(LOGICAL_TABLE_METADATA_KEY)
            .with_context(|| CreateLogicalTablesSnafu {
                reason: format!(
                    "expect `{LOGICAL_TABLE_METADATA_KEY}` option on creating logical table"
                ),
            })?;

        let physical_table = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                physical_table_name,
                Some(query_ctx),
            )
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu {
                table_name: physical_table_name.clone(),
            })?;

        let physical_table_info = physical_table.table_info();
        let (partition_rule, _) = self
            .partition_manager
            .find_table_partition_rule(&physical_table_info)
            .await
            .context(error::FindTablePartitionRuleSnafu {
                table_name: physical_table_name.clone(),
            })?;

        // Only multi-dimensional (range-expression) rules can be compared here.
        let multi_dim_rule = partition_rule
            .as_ref()
            .as_any()
            .downcast_ref::<MultiDimPartitionRule>()
            .context(InvalidPartitionRuleSnafu {
                reason: "physical table partition rule is not range-based",
            })?;

        // TODO(ruihang): project physical partition exprs to logical partition column
        // Sort both sides so the comparison ignores declaration order.
        let mut physical_partition_exprs = multi_dim_rule.exprs().to_vec();
        logical_partition_exprs.sort_unstable();
        physical_partition_exprs.sort_unstable();

        ensure!(
            physical_partition_exprs == logical_partition_exprs,
            InvalidPartitionRuleSnafu {
                reason: format!(
                    "logical table partition rule must match the corresponding physical table's\n logical table partition exprs:\t\t {:?}\n physical table partition exprs:\t {:?}",
                    logical_partition_exprs, physical_partition_exprs
                ),
            }
        );

        Ok(())
    }
562
563    #[cfg(feature = "enterprise")]
564    #[tracing::instrument(skip_all)]
565    pub async fn create_trigger(
566        &self,
567        stmt: CreateTrigger,
568        query_context: QueryContextRef,
569    ) -> Result<Output> {
570        let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
571        self.create_trigger_inner(expr, query_context).await
572    }
573
574    #[cfg(feature = "enterprise")]
575    pub async fn create_trigger_inner(
576        &self,
577        expr: PbCreateTriggerExpr,
578        query_context: QueryContextRef,
579    ) -> Result<Output> {
580        self.create_trigger_procedure(expr, query_context).await?;
581        Ok(Output::new_with_affected_rows(0))
582    }
583
584    #[cfg(feature = "enterprise")]
585    async fn create_trigger_procedure(
586        &self,
587        expr: PbCreateTriggerExpr,
588        query_context: QueryContextRef,
589    ) -> Result<SubmitDdlTaskResponse> {
590        let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
591            create_trigger: Some(expr),
592        })
593        .context(error::InvalidExprSnafu)?;
594
595        let request = SubmitDdlTaskRequest::new(
596            to_meta_query_context(query_context),
597            DdlTask::new_create_trigger(task),
598        );
599
600        self.procedure_executor
601            .submit_ddl_task(&ExecutorContext::default(), request)
602            .await
603            .context(error::ExecuteDdlSnafu)
604    }
605
606    #[tracing::instrument(skip_all)]
607    pub async fn create_flow(
608        &self,
609        stmt: CreateFlow,
610        query_context: QueryContextRef,
611    ) -> Result<Output> {
612        // TODO(ruihang): do some verification
613        let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;
614
615        self.create_flow_inner(expr, query_context).await
616    }
617
618    pub async fn create_flow_inner(
619        &self,
620        expr: CreateFlowExpr,
621        query_context: QueryContextRef,
622    ) -> Result<Output> {
623        self.create_flow_procedure(expr, query_context).await?;
624        Ok(Output::new_with_affected_rows(0))
625    }
626
627    async fn create_flow_procedure(
628        &self,
629        expr: CreateFlowExpr,
630        query_context: QueryContextRef,
631    ) -> Result<SubmitDdlTaskResponse> {
632        let flow_type = self
633            .determine_flow_type(&expr, query_context.clone())
634            .await?;
635        info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
636
637        let expr = {
638            let mut expr = expr;
639            expr.flow_options
640                .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
641            expr
642        };
643
644        let task = CreateFlowTask::try_from(PbCreateFlowTask {
645            create_flow: Some(expr),
646        })
647        .context(error::InvalidExprSnafu)?;
648        let request = SubmitDdlTaskRequest::new(
649            to_meta_query_context(query_context),
650            DdlTask::new_create_flow(task),
651        );
652
653        self.procedure_executor
654            .submit_ddl_task(&ExecutorContext::default(), request)
655            .await
656            .context(error::ExecuteDdlSnafu)
657    }
658
    /// Determine the flow type based on the SQL query
    ///
    /// If it contains aggregation or distinct, then it is a batch flow, otherwise it is a streaming flow
    ///
    /// Exceptions to the plan-based rule, in order of precedence:
    /// - any source table with `ttl = instant` forces streaming mode;
    /// - TQL/PromQL queries always run in batching mode.
    async fn determine_flow_type(
        &self,
        expr: &CreateFlowExpr,
        query_ctx: QueryContextRef,
    ) -> Result<FlowType> {
        // first check if source table's ttl is instant, if it is, force streaming mode
        for src_table_name in &expr.source_table_names {
            let table = self
                .catalog_manager()
                .table(
                    &src_table_name.catalog_name,
                    &src_table_name.schema_name,
                    &src_table_name.table_name,
                    Some(&query_ctx),
                )
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name,
                    ),
                })?;

            // instant source table can only be handled by streaming mode
            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
                warn!(
                    "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
                    format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name
                    ),
                    expr.flow_name
                );
                return Ok(FlowType::Streaming);
            }
        }

        // Parse the flow's query; exactly one statement is expected.
        let engine = &self.query_engine;
        let stmts = ParserContext::create_with_dialect(
            &expr.sql,
            query_ctx.sql_dialect(),
            ParseOptions::default(),
        )
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

        ensure!(
            stmts.len() == 1,
            InvalidSqlSnafu {
                err_msg: format!("Expect only one statement, found {}", stmts.len())
            }
        );
        let stmt = &stmts[0];

        if is_tql(query_ctx.sql_dialect(), &expr.sql)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?
        {
            return Ok(FlowType::Batching);
        }

        // support tql parse too
        let plan = match stmt {
            // prom ql is only supported in batching mode
            Statement::Tql(_) => return Ok(FlowType::Batching),
            _ => engine
                .planner()
                .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?,
        };

        /// Visitor to find aggregation or distinct
        struct FindAggr {
            // Set to true once an Aggregate or Distinct node is seen.
            is_aggr: bool,
        }

        impl TreeNodeVisitor<'_> for FindAggr {
            type Node = LogicalPlan;
            fn f_down(
                &mut self,
                node: &Self::Node,
            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
            {
                match node {
                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
                        self.is_aggr = true;
                        // Found one — no need to walk deeper.
                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
                    }
                    _ => (),
                }
                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
            }
        }

        let mut find_aggr = FindAggr { is_aggr: false };

        // Walk the plan (including subqueries) looking for aggregation/distinct.
        plan.visit_with_subqueries(&mut find_aggr)
            .context(BuildDfLogicalPlanSnafu)?;
        if find_aggr.is_aggr {
            Ok(FlowType::Batching)
        } else {
            Ok(FlowType::Streaming)
        }
    }
772
    /// Creates a view from a `CREATE VIEW` statement.
    ///
    /// Plans the view's query (SQL or TQL), validates the optional column
    /// list against the plan's output columns, rewrites table references to
    /// fully qualified names, and submits the substrait-encoded plan together
    /// with the original statement text (for `SHOW CREATE VIEW`).
    #[tracing::instrument(skip_all)]
    pub async fn create_view(
        &self,
        create_view: CreateView,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        // convert input into logical plan
        let logical_plan = match &*create_view.query {
            Statement::Query(query) => {
                self.plan(
                    &QueryStatement::Sql(Statement::Query(query.clone())),
                    ctx.clone(),
                )
                .await?
            }
            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
            _ => {
                // Only SELECT queries and TQL are valid view bodies.
                return InvalidViewStmtSnafu {}.fail();
            }
        };
        // Save the definition for `show create view`.
        let definition = create_view.to_string();

        // Save the columns in plan, it may changed when the schemas of tables in plan
        // are altered.
        let schema: Schema = logical_plan
            .schema()
            .clone()
            .try_into()
            .context(ConvertSchemaSnafu)?;
        let plan_columns: Vec<_> = schema
            .column_schemas()
            .iter()
            .map(|c| c.name.clone())
            .collect();

        // Explicit column aliases from `CREATE VIEW v (a, b, ...)`, if any.
        let columns: Vec<_> = create_view
            .columns
            .iter()
            .map(|ident| ident.to_string())
            .collect();

        // Validate columns
        if !columns.is_empty() {
            ensure!(
                columns.len() == plan_columns.len(),
                error::ViewColumnsMismatchSnafu {
                    view_name: create_view.name.to_string(),
                    expected: plan_columns.len(),
                    actual: columns.len(),
                }
            );
        }

        // Extract the table names from the original plan
        // and rewrite them as fully qualified names.
        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
            .context(ExtractTableNamesSnafu)?;

        let table_names = table_names.into_iter().map(|t| t.into()).collect();

        // TODO(dennis): we don't save the optimized plan yet,
        // because there are some serialization issue with our own defined plan node (such as `MergeScanLogicalPlan`).
        // When the issues are fixed, we can use the `optimized_plan` instead.
        // let optimized_plan = self.optimize_logical_plan(logical_plan)?.unwrap_df_plan();

        // encode logical plan
        let encoded_plan = DFLogicalSubstraitConvertor
            .encode(&plan, DefaultSerializer)
            .context(SubstraitCodecSnafu)?;

        let expr = expr_helper::to_create_view_expr(
            create_view,
            encoded_plan.to_vec(),
            table_names,
            columns,
            plan_columns,
            definition,
            ctx.clone(),
        )?;

        // TODO(dennis): validate the logical plan
        self.create_view_by_expr(expr, ctx).await
    }
857
858    pub async fn create_view_by_expr(
859        &self,
860        expr: CreateViewExpr,
861        ctx: QueryContextRef,
862    ) -> Result<TableRef> {
863        ensure! {
864            !(expr.create_if_not_exists & expr.or_replace),
865            InvalidSqlSnafu {
866                err_msg: "syntax error Create Or Replace and If Not Exist cannot be used together",
867            }
868        };
869        let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();
870
871        let schema_exists = self
872            .table_metadata_manager
873            .schema_manager()
874            .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
875            .await
876            .context(TableMetadataManagerSnafu)?;
877
878        ensure!(
879            schema_exists,
880            SchemaNotFoundSnafu {
881                schema_info: &expr.schema_name,
882            }
883        );
884
885        // if view or table exists.
886        if let Some(table) = self
887            .catalog_manager
888            .table(
889                &expr.catalog_name,
890                &expr.schema_name,
891                &expr.view_name,
892                Some(&ctx),
893            )
894            .await
895            .context(CatalogSnafu)?
896        {
897            let table_type = table.table_info().table_type;
898
899            match (table_type, expr.create_if_not_exists, expr.or_replace) {
900                (TableType::View, true, false) => {
901                    return Ok(table);
902                }
903                (TableType::View, false, false) => {
904                    return ViewAlreadyExistsSnafu {
905                        name: format_full_table_name(
906                            &expr.catalog_name,
907                            &expr.schema_name,
908                            &expr.view_name,
909                        ),
910                    }
911                    .fail();
912                }
913                (TableType::View, _, true) => {
914                    // Try to replace an exists view
915                }
916                _ => {
917                    return TableAlreadyExistsSnafu {
918                        table: format_full_table_name(
919                            &expr.catalog_name,
920                            &expr.schema_name,
921                            &expr.view_name,
922                        ),
923                    }
924                    .fail();
925                }
926            }
927        }
928
929        ensure!(
930            NAME_PATTERN_REG.is_match(&expr.view_name),
931            InvalidViewNameSnafu {
932                name: expr.view_name.clone(),
933            }
934        );
935
936        let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);
937
938        let mut view_info = TableInfo {
939            ident: metadata::TableIdent {
940                // The view id of distributed table is assigned by Meta, set "0" here as a placeholder.
941                table_id: 0,
942                version: 0,
943            },
944            name: expr.view_name.clone(),
945            desc: None,
946            catalog_name: expr.catalog_name.clone(),
947            schema_name: expr.schema_name.clone(),
948            // The meta doesn't make sense for views, so using a default one.
949            meta: TableMeta::empty(),
950            table_type: TableType::View,
951        };
952
953        let request = SubmitDdlTaskRequest::new(
954            to_meta_query_context(ctx),
955            DdlTask::new_create_view(expr, view_info.clone()),
956        );
957
958        let resp = self
959            .procedure_executor
960            .submit_ddl_task(&ExecutorContext::default(), request)
961            .await
962            .context(error::ExecuteDdlSnafu)?;
963
964        debug!(
965            "Submit creating view '{view_name}' task response: {:?}",
966            resp
967        );
968
969        let view_id = resp
970            .table_ids
971            .into_iter()
972            .next()
973            .context(error::UnexpectedSnafu {
974                violated: "expected table_id",
975            })?;
976        info!("Successfully created view '{view_name}' with view id {view_id}");
977
978        view_info.ident.table_id = view_id;
979
980        let view_info = Arc::new(view_info);
981
982        let table = DistTable::table(view_info);
983
984        // Invalidates local cache ASAP.
985        self.cache_invalidator
986            .invalidate(
987                &Context::default(),
988                &[
989                    CacheIdent::TableId(view_id),
990                    CacheIdent::TableName(view_name.clone()),
991                ],
992            )
993            .await
994            .context(error::InvalidateTableCacheSnafu)?;
995
996        Ok(table)
997    }
998
999    #[tracing::instrument(skip_all)]
1000    pub async fn drop_flow(
1001        &self,
1002        catalog_name: String,
1003        flow_name: String,
1004        drop_if_exists: bool,
1005        query_context: QueryContextRef,
1006    ) -> Result<Output> {
1007        if let Some(flow) = self
1008            .flow_metadata_manager
1009            .flow_name_manager()
1010            .get(&catalog_name, &flow_name)
1011            .await
1012            .context(error::TableMetadataManagerSnafu)?
1013        {
1014            let flow_id = flow.flow_id();
1015            let task = DropFlowTask {
1016                catalog_name,
1017                flow_name,
1018                flow_id,
1019                drop_if_exists,
1020            };
1021            self.drop_flow_procedure(task, query_context).await?;
1022
1023            Ok(Output::new_with_affected_rows(0))
1024        } else if drop_if_exists {
1025            Ok(Output::new_with_affected_rows(0))
1026        } else {
1027            FlowNotFoundSnafu {
1028                flow_name: format_full_flow_name(&catalog_name, &flow_name),
1029            }
1030            .fail()
1031        }
1032    }
1033
1034    async fn drop_flow_procedure(
1035        &self,
1036        expr: DropFlowTask,
1037        query_context: QueryContextRef,
1038    ) -> Result<SubmitDdlTaskResponse> {
1039        let request = SubmitDdlTaskRequest::new(
1040            to_meta_query_context(query_context),
1041            DdlTask::new_drop_flow(expr),
1042        );
1043
1044        self.procedure_executor
1045            .submit_ddl_task(&ExecutorContext::default(), request)
1046            .await
1047            .context(error::ExecuteDdlSnafu)
1048    }
1049
1050    #[cfg(feature = "enterprise")]
1051    #[tracing::instrument(skip_all)]
1052    pub(super) async fn drop_trigger(
1053        &self,
1054        catalog_name: String,
1055        trigger_name: String,
1056        drop_if_exists: bool,
1057        query_context: QueryContextRef,
1058    ) -> Result<Output> {
1059        let task = DropTriggerTask {
1060            catalog_name,
1061            trigger_name,
1062            drop_if_exists,
1063        };
1064        self.drop_trigger_procedure(task, query_context).await?;
1065        Ok(Output::new_with_affected_rows(0))
1066    }
1067
1068    #[cfg(feature = "enterprise")]
1069    async fn drop_trigger_procedure(
1070        &self,
1071        expr: DropTriggerTask,
1072        query_context: QueryContextRef,
1073    ) -> Result<SubmitDdlTaskResponse> {
1074        let request = SubmitDdlTaskRequest::new(
1075            to_meta_query_context(query_context),
1076            DdlTask::new_drop_trigger(expr),
1077        );
1078
1079        self.procedure_executor
1080            .submit_ddl_task(&ExecutorContext::default(), request)
1081            .await
1082            .context(error::ExecuteDdlSnafu)
1083    }
1084
1085    /// Drop a view
1086    #[tracing::instrument(skip_all)]
1087    pub(crate) async fn drop_view(
1088        &self,
1089        catalog: String,
1090        schema: String,
1091        view: String,
1092        drop_if_exists: bool,
1093        query_context: QueryContextRef,
1094    ) -> Result<Output> {
1095        let view_info = if let Some(view) = self
1096            .catalog_manager
1097            .table(&catalog, &schema, &view, None)
1098            .await
1099            .context(CatalogSnafu)?
1100        {
1101            view.table_info()
1102        } else if drop_if_exists {
1103            // DROP VIEW IF EXISTS meets view not found - ignored
1104            return Ok(Output::new_with_affected_rows(0));
1105        } else {
1106            return TableNotFoundSnafu {
1107                table_name: format_full_table_name(&catalog, &schema, &view),
1108            }
1109            .fail();
1110        };
1111
1112        // Ensure the exists one is view, we can't drop other table types
1113        ensure!(
1114            view_info.table_type == TableType::View,
1115            error::InvalidViewSnafu {
1116                msg: "not a view",
1117                view_name: format_full_table_name(&catalog, &schema, &view),
1118            }
1119        );
1120
1121        let view_id = view_info.table_id();
1122
1123        let task = DropViewTask {
1124            catalog,
1125            schema,
1126            view,
1127            view_id,
1128            drop_if_exists,
1129        };
1130
1131        self.drop_view_procedure(task, query_context).await?;
1132
1133        Ok(Output::new_with_affected_rows(0))
1134    }
1135
1136    /// Submit [DropViewTask] to procedure executor.
1137    async fn drop_view_procedure(
1138        &self,
1139        expr: DropViewTask,
1140        query_context: QueryContextRef,
1141    ) -> Result<SubmitDdlTaskResponse> {
1142        let request = SubmitDdlTaskRequest::new(
1143            to_meta_query_context(query_context),
1144            DdlTask::new_drop_view(expr),
1145        );
1146
1147        self.procedure_executor
1148            .submit_ddl_task(&ExecutorContext::default(), request)
1149            .await
1150            .context(error::ExecuteDdlSnafu)
1151    }
1152
1153    #[tracing::instrument(skip_all)]
1154    pub async fn alter_logical_tables(
1155        &self,
1156        alter_table_exprs: Vec<AlterTableExpr>,
1157        query_context: QueryContextRef,
1158    ) -> Result<Output> {
1159        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
1160        ensure!(
1161            !alter_table_exprs.is_empty(),
1162            EmptyDdlExprSnafu {
1163                name: "alter logical tables"
1164            }
1165        );
1166
1167        // group by physical table id
1168        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
1169        for expr in alter_table_exprs {
1170            // Get table_id from catalog_manager
1171            let catalog = if expr.catalog_name.is_empty() {
1172                query_context.current_catalog()
1173            } else {
1174                &expr.catalog_name
1175            };
1176            let schema = if expr.schema_name.is_empty() {
1177                query_context.current_schema()
1178            } else {
1179                expr.schema_name.clone()
1180            };
1181            let table_name = &expr.table_name;
1182            let table = self
1183                .catalog_manager
1184                .table(catalog, &schema, table_name, Some(&query_context))
1185                .await
1186                .context(CatalogSnafu)?
1187                .with_context(|| TableNotFoundSnafu {
1188                    table_name: format_full_table_name(catalog, &schema, table_name),
1189                })?;
1190            let table_id = table.table_info().ident.table_id;
1191            let physical_table_id = self
1192                .table_metadata_manager
1193                .table_route_manager()
1194                .get_physical_table_id(table_id)
1195                .await
1196                .context(TableMetadataManagerSnafu)?;
1197            groups.entry(physical_table_id).or_default().push(expr);
1198        }
1199
1200        // Submit procedure for each physical table
1201        let mut handles = Vec::with_capacity(groups.len());
1202        for (_physical_table_id, exprs) in groups {
1203            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
1204            handles.push(fut);
1205        }
1206        let _results = futures::future::try_join_all(handles).await?;
1207
1208        Ok(Output::new_with_affected_rows(0))
1209    }
1210
1211    #[tracing::instrument(skip_all)]
1212    pub async fn drop_table(
1213        &self,
1214        table_name: TableName,
1215        drop_if_exists: bool,
1216        query_context: QueryContextRef,
1217    ) -> Result<Output> {
1218        // Reserved for grpc call
1219        self.drop_tables(&[table_name], drop_if_exists, query_context)
1220            .await
1221    }
1222
1223    #[tracing::instrument(skip_all)]
1224    pub async fn drop_tables(
1225        &self,
1226        table_names: &[TableName],
1227        drop_if_exists: bool,
1228        query_context: QueryContextRef,
1229    ) -> Result<Output> {
1230        let mut tables = Vec::with_capacity(table_names.len());
1231        for table_name in table_names {
1232            ensure!(
1233                !is_readonly_schema(&table_name.schema_name),
1234                SchemaReadOnlySnafu {
1235                    name: table_name.schema_name.clone()
1236                }
1237            );
1238
1239            if let Some(table) = self
1240                .catalog_manager
1241                .table(
1242                    &table_name.catalog_name,
1243                    &table_name.schema_name,
1244                    &table_name.table_name,
1245                    Some(&query_context),
1246                )
1247                .await
1248                .context(CatalogSnafu)?
1249            {
1250                tables.push(table.table_info().table_id());
1251            } else if drop_if_exists {
1252                // DROP TABLE IF EXISTS meets table not found - ignored
1253                continue;
1254            } else {
1255                return TableNotFoundSnafu {
1256                    table_name: table_name.to_string(),
1257                }
1258                .fail();
1259            }
1260        }
1261
1262        for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
1263            self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
1264                .await?;
1265
1266            // Invalidates local cache ASAP.
1267            self.cache_invalidator
1268                .invalidate(
1269                    &Context::default(),
1270                    &[
1271                        CacheIdent::TableId(table_id),
1272                        CacheIdent::TableName(table_name.clone()),
1273                    ],
1274                )
1275                .await
1276                .context(error::InvalidateTableCacheSnafu)?;
1277        }
1278        Ok(Output::new_with_affected_rows(0))
1279    }
1280
1281    #[tracing::instrument(skip_all)]
1282    pub async fn drop_database(
1283        &self,
1284        catalog: String,
1285        schema: String,
1286        drop_if_exists: bool,
1287        query_context: QueryContextRef,
1288    ) -> Result<Output> {
1289        ensure!(
1290            !is_readonly_schema(&schema),
1291            SchemaReadOnlySnafu { name: schema }
1292        );
1293
1294        if self
1295            .catalog_manager
1296            .schema_exists(&catalog, &schema, None)
1297            .await
1298            .context(CatalogSnafu)?
1299        {
1300            if schema == query_context.current_schema() {
1301                SchemaInUseSnafu { name: schema }.fail()
1302            } else {
1303                self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
1304                    .await?;
1305
1306                Ok(Output::new_with_affected_rows(0))
1307            }
1308        } else if drop_if_exists {
1309            // DROP TABLE IF EXISTS meets table not found - ignored
1310            Ok(Output::new_with_affected_rows(0))
1311        } else {
1312            SchemaNotFoundSnafu {
1313                schema_info: schema,
1314            }
1315            .fail()
1316        }
1317    }
1318
1319    #[tracing::instrument(skip_all)]
1320    pub async fn truncate_table(
1321        &self,
1322        table_name: TableName,
1323        time_ranges: Vec<(Timestamp, Timestamp)>,
1324        query_context: QueryContextRef,
1325    ) -> Result<Output> {
1326        ensure!(
1327            !is_readonly_schema(&table_name.schema_name),
1328            SchemaReadOnlySnafu {
1329                name: table_name.schema_name.clone()
1330            }
1331        );
1332
1333        let table = self
1334            .catalog_manager
1335            .table(
1336                &table_name.catalog_name,
1337                &table_name.schema_name,
1338                &table_name.table_name,
1339                Some(&query_context),
1340            )
1341            .await
1342            .context(CatalogSnafu)?
1343            .with_context(|| TableNotFoundSnafu {
1344                table_name: table_name.to_string(),
1345            })?;
1346        let table_id = table.table_info().table_id();
1347        self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
1348            .await?;
1349
1350        Ok(Output::new_with_affected_rows(0))
1351    }
1352
1353    #[tracing::instrument(skip_all)]
1354    pub async fn alter_table(
1355        &self,
1356        alter_table: AlterTable,
1357        query_context: QueryContextRef,
1358    ) -> Result<Output> {
1359        if matches!(
1360            alter_table.alter_operation(),
1361            AlterTableOperation::Repartition { .. }
1362        ) {
1363            let request = expr_helper::to_repartition_request(alter_table, &query_context)?;
1364            return self.repartition_table(request, &query_context).await;
1365        }
1366
1367        let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
1368        self.alter_table_inner(expr, query_context).await
1369    }
1370
    /// Handles `ALTER TABLE ... REPARTITION`: replaces a subset of a table's
    /// partition expressions (`from_exprs`) with new ones (`into_exprs`).
    ///
    /// Steps: resolve the table and its physical route, validate that it is a
    /// physical table with partition columns, convert and validate the new
    /// partition layout (`existing - from + into`), then submit an alter-table
    /// DDL task carrying the serialized expressions. When the request asks not
    /// to wait, only the procedure id is returned and caches are left intact.
    #[tracing::instrument(skip_all)]
    pub async fn repartition_table(
        &self,
        request: RepartitionRequest,
        query_context: &QueryContextRef,
    ) -> Result<Output> {
        // Check if the schema is read-only.
        ensure!(
            !is_readonly_schema(&request.schema_name),
            SchemaReadOnlySnafu {
                name: request.schema_name.clone()
            }
        );

        let table_ref = TableReference::full(
            &request.catalog_name,
            &request.schema_name,
            &request.table_name,
        );
        // Get the table from the catalog.
        let table = self
            .catalog_manager
            .table(
                &request.catalog_name,
                &request.schema_name,
                &request.table_name,
                Some(query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_ref.to_string(),
            })?;
        let table_id = table.table_info().ident.table_id;
        // Get existing partition expressions from the table route.
        let (physical_table_id, physical_table_route) = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        // A logical table maps to a different physical table id; repartition
        // is only supported directly on physical tables.
        ensure!(
            physical_table_id == table_id,
            NotSupportedSnafu {
                feat: "REPARTITION on logical tables"
            }
        );

        let table_info = table.table_info();
        // Get partition column names from the table metadata.
        let existing_partition_columns = table_info.meta.partition_columns().collect::<Vec<_>>();
        // Repartition requires the table to have partition columns.
        ensure!(
            !existing_partition_columns.is_empty(),
            InvalidPartitionRuleSnafu {
                reason: format!(
                    "table {} does not have partition columns, cannot repartition",
                    table_ref
                )
            }
        );

        // Repartition operations involving columns outside the existing partition columns are not supported.
        // This restriction ensures repartition only applies to current partition columns.
        let column_name_and_type = existing_partition_columns
            .iter()
            .map(|column| (&column.name, column.data_type.clone()))
            .collect();
        // The session timezone is used when converting literals in the exprs.
        let timezone = query_context.timezone();
        // Convert SQL Exprs to PartitionExprs.
        let from_partition_exprs = request
            .from_exprs
            .iter()
            .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
            .collect::<Result<Vec<_>>>()?;

        let mut into_partition_exprs = request
            .into_exprs
            .iter()
            .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
            .collect::<Result<Vec<_>>>()?;

        // `MERGE PARTITION` (and some `REPARTITION`) generates a single `OR` expression from
        // multiple source partitions; try to simplify it for better readability and stability.
        if from_partition_exprs.len() > 1
            && into_partition_exprs.len() == 1
            && let Some(expr) = into_partition_exprs.pop()
        {
            into_partition_exprs.push(partition::simplify::simplify_merged_partition_expr(expr));
        }

        // Parse existing partition expressions from region routes.
        let mut existing_partition_exprs =
            Vec::with_capacity(physical_table_route.region_routes.len());
        for route in &physical_table_route.region_routes {
            let expr_json = route.region.partition_expr();
            if !expr_json.is_empty() {
                match PartitionExpr::from_json_str(&expr_json) {
                    Ok(Some(expr)) => existing_partition_exprs.push(expr),
                    Ok(None) => {
                        // Empty
                    }
                    Err(e) => {
                        return Err(e).context(DeserializePartitionExprSnafu);
                    }
                }
            }
        }

        // Validate that from_partition_exprs are a subset of existing partition exprs.
        // We compare PartitionExpr directly since it implements Eq.
        for from_expr in &from_partition_exprs {
            ensure!(
                existing_partition_exprs.contains(from_expr),
                InvalidPartitionRuleSnafu {
                    reason: format!(
                        "partition expression '{}' does not exist in table {}",
                        from_expr, table_ref
                    )
                }
            );
        }

        // Build the new partition expressions:
        // new_exprs = existing_exprs - from_exprs + into_exprs
        let new_partition_exprs: Vec<PartitionExpr> = existing_partition_exprs
            .into_iter()
            .filter(|expr| !from_partition_exprs.contains(expr))
            .chain(into_partition_exprs.clone().into_iter())
            .collect();
        // Captured before `new_partition_exprs` is moved into the validation below.
        let new_partition_exprs_len = new_partition_exprs.len();
        let from_partition_exprs_len = from_partition_exprs.len();

        // Validate the new partition expressions using MultiDimPartitionRule and PartitionChecker.
        let _ = MultiDimPartitionRule::try_new(
            existing_partition_columns
                .iter()
                .map(|c| c.name.clone())
                .collect(),
            vec![],
            new_partition_exprs,
            true,
        )
        .context(InvalidPartitionSnafu)?;

        let ddl_options = parse_ddl_options(&request.options)?;
        // Serialize each partition expression to its JSON form for the DDL task.
        let serialize_exprs = |exprs: Vec<PartitionExpr>| -> Result<Vec<String>> {
            let mut json_exprs = Vec::with_capacity(exprs.len());
            for expr in exprs {
                json_exprs.push(expr.as_json_str().context(SerializePartitionExprSnafu)?);
            }
            Ok(json_exprs)
        };
        let from_partition_exprs_json = serialize_exprs(from_partition_exprs)?;
        let into_partition_exprs_json = serialize_exprs(into_partition_exprs)?;
        let mut req = SubmitDdlTaskRequest::new(
            to_meta_query_context(query_context.clone()),
            DdlTask::new_alter_table(AlterTableExpr {
                catalog_name: request.catalog_name.clone(),
                schema_name: request.schema_name.clone(),
                table_name: request.table_name.clone(),
                kind: Some(Kind::Repartition(Repartition {
                    from_partition_exprs: from_partition_exprs_json,
                    into_partition_exprs: into_partition_exprs_json,
                })),
            }),
        );
        // Propagate user-supplied WAIT/TIMEOUT options to the DDL request.
        req.wait = ddl_options.wait;
        req.timeout = ddl_options.timeout;

        info!(
            "Submitting repartition task for table {} (table_id={}), from {} to {} partitions, timeout: {:?}, wait: {}",
            table_ref,
            table_id,
            from_partition_exprs_len,
            new_partition_exprs_len,
            ddl_options.timeout,
            ddl_options.wait
        );

        let response = self
            .procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        // Fire-and-forget mode: hand back the procedure id to the client.
        if !ddl_options.wait {
            return build_procedure_id_output(response.key);
        }

        // Only invalidate cache if wait is true.
        let invalidate_keys = vec![
            CacheIdent::TableId(table_id),
            CacheIdent::TableName(TableName::new(
                request.catalog_name,
                request.schema_name,
                request.table_name,
            )),
        ];

        // Invalidates local cache ASAP.
        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }
1580
    /// Applies an [`AlterTableExpr`]: verifies the alteration is needed,
    /// submits the appropriate DDL task (plain alter for a physical table,
    /// logical-tables alter otherwise), and invalidates the affected local
    /// cache entries.
    #[tracing::instrument(skip_all)]
    pub async fn alter_table_inner(
        &self,
        expr: AlterTableExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&expr.schema_name),
            SchemaReadOnlySnafu {
                name: expr.schema_name.clone()
            }
        );

        // Empty catalog/schema fall back to the defaults rather than the
        // session's current ones.
        let catalog_name = if expr.catalog_name.is_empty() {
            DEFAULT_CATALOG_NAME.to_string()
        } else {
            expr.catalog_name.clone()
        };

        let schema_name = if expr.schema_name.is_empty() {
            DEFAULT_SCHEMA_NAME.to_string()
        } else {
            expr.schema_name.clone()
        };

        let table_name = expr.table_name.clone();

        let table = self
            .catalog_manager
            .table(
                &catalog_name,
                &schema_name,
                &table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
            })?;

        let table_id = table.table_info().ident.table_id;
        // `verify_alter` reports whether the expression actually changes
        // anything; a no-op alter returns early without a DDL round-trip.
        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
        if !need_alter {
            return Ok(Output::new_with_affected_rows(0));
        }
        info!(
            "Table info before alter is {:?}, expr: {:?}",
            table.table_info(),
            expr
        );

        // A table whose physical id equals its own id is a physical table;
        // otherwise it is a logical table backed by that physical table.
        let physical_table_id = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_id(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        let (req, invalidate_keys) = if physical_table_id == table_id {
            // This is physical table
            let req = SubmitDdlTaskRequest::new(
                to_meta_query_context(query_context),
                DdlTask::new_alter_table(expr),
            );

            let invalidate_keys = vec![
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            (req, invalidate_keys)
        } else {
            // This is logical table
            let req = SubmitDdlTaskRequest::new(
                to_meta_query_context(query_context),
                DdlTask::new_alter_logical_tables(vec![expr]),
            );

            // Altering a logical table also affects the backing physical
            // table, so both ids are invalidated.
            let mut invalidate_keys = vec![
                CacheIdent::TableId(physical_table_id),
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            // Also invalidate the physical table's name entry when its info
            // is available.
            let physical_table = self
                .table_metadata_manager
                .table_info_manager()
                .get(physical_table_id)
                .await
                .context(TableMetadataManagerSnafu)?
                .map(|x| x.into_inner());
            if let Some(physical_table) = physical_table {
                let physical_table_name = TableName::new(
                    physical_table.table_info.catalog_name,
                    physical_table.table_info.schema_name,
                    physical_table.table_info.name,
                );
                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
            }

            (req, invalidate_keys)
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        // Invalidates local cache ASAP.
        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }
1698
1699    #[cfg(feature = "enterprise")]
1700    #[tracing::instrument(skip_all)]
1701    pub async fn alter_trigger(
1702        &self,
1703        _alter_expr: AlterTrigger,
1704        _query_context: QueryContextRef,
1705    ) -> Result<Output> {
1706        crate::error::NotSupportedSnafu {
1707            feat: "alter trigger",
1708        }
1709        .fail()
1710    }
1711
1712    #[tracing::instrument(skip_all)]
1713    pub async fn alter_database(
1714        &self,
1715        alter_expr: AlterDatabase,
1716        query_context: QueryContextRef,
1717    ) -> Result<Output> {
1718        let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
1719        self.alter_database_inner(alter_expr, query_context).await
1720    }
1721
1722    #[tracing::instrument(skip_all)]
1723    pub async fn alter_database_inner(
1724        &self,
1725        alter_expr: AlterDatabaseExpr,
1726        query_context: QueryContextRef,
1727    ) -> Result<Output> {
1728        ensure!(
1729            !is_readonly_schema(&alter_expr.schema_name),
1730            SchemaReadOnlySnafu {
1731                name: query_context.current_schema().clone()
1732            }
1733        );
1734
1735        let exists = self
1736            .catalog_manager
1737            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
1738            .await
1739            .context(CatalogSnafu)?;
1740        ensure!(
1741            exists,
1742            SchemaNotFoundSnafu {
1743                schema_info: alter_expr.schema_name,
1744            }
1745        );
1746
1747        let cache_ident = [CacheIdent::SchemaName(SchemaName {
1748            catalog_name: alter_expr.catalog_name.clone(),
1749            schema_name: alter_expr.schema_name.clone(),
1750        })];
1751
1752        self.alter_database_procedure(alter_expr, query_context)
1753            .await?;
1754
1755        // Invalidates local cache ASAP.
1756        self.cache_invalidator
1757            .invalidate(&Context::default(), &cache_ident)
1758            .await
1759            .context(error::InvalidateTableCacheSnafu)?;
1760
1761        Ok(Output::new_with_affected_rows(0))
1762    }
1763
1764    async fn create_table_procedure(
1765        &self,
1766        create_table: CreateTableExpr,
1767        partitions: Vec<PartitionExpr>,
1768        table_info: TableInfo,
1769        query_context: QueryContextRef,
1770    ) -> Result<SubmitDdlTaskResponse> {
1771        let partitions = partitions
1772            .into_iter()
1773            .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
1774            .collect::<Result<Vec<_>>>()?;
1775
1776        let request = SubmitDdlTaskRequest::new(
1777            to_meta_query_context(query_context),
1778            DdlTask::new_create_table(create_table, partitions, table_info),
1779        );
1780
1781        self.procedure_executor
1782            .submit_ddl_task(&ExecutorContext::default(), request)
1783            .await
1784            .context(error::ExecuteDdlSnafu)
1785    }
1786
1787    async fn create_logical_tables_procedure(
1788        &self,
1789        tables_data: Vec<(CreateTableExpr, TableInfo)>,
1790        query_context: QueryContextRef,
1791    ) -> Result<SubmitDdlTaskResponse> {
1792        let request = SubmitDdlTaskRequest::new(
1793            to_meta_query_context(query_context),
1794            DdlTask::new_create_logical_tables(tables_data),
1795        );
1796
1797        self.procedure_executor
1798            .submit_ddl_task(&ExecutorContext::default(), request)
1799            .await
1800            .context(error::ExecuteDdlSnafu)
1801    }
1802
1803    async fn alter_logical_tables_procedure(
1804        &self,
1805        tables_data: Vec<AlterTableExpr>,
1806        query_context: QueryContextRef,
1807    ) -> Result<SubmitDdlTaskResponse> {
1808        let request = SubmitDdlTaskRequest::new(
1809            to_meta_query_context(query_context),
1810            DdlTask::new_alter_logical_tables(tables_data),
1811        );
1812
1813        self.procedure_executor
1814            .submit_ddl_task(&ExecutorContext::default(), request)
1815            .await
1816            .context(error::ExecuteDdlSnafu)
1817    }
1818
1819    async fn drop_table_procedure(
1820        &self,
1821        table_name: &TableName,
1822        table_id: TableId,
1823        drop_if_exists: bool,
1824        query_context: QueryContextRef,
1825    ) -> Result<SubmitDdlTaskResponse> {
1826        let request = SubmitDdlTaskRequest::new(
1827            to_meta_query_context(query_context),
1828            DdlTask::new_drop_table(
1829                table_name.catalog_name.clone(),
1830                table_name.schema_name.clone(),
1831                table_name.table_name.clone(),
1832                table_id,
1833                drop_if_exists,
1834            ),
1835        );
1836
1837        self.procedure_executor
1838            .submit_ddl_task(&ExecutorContext::default(), request)
1839            .await
1840            .context(error::ExecuteDdlSnafu)
1841    }
1842
1843    async fn drop_database_procedure(
1844        &self,
1845        catalog: String,
1846        schema: String,
1847        drop_if_exists: bool,
1848        query_context: QueryContextRef,
1849    ) -> Result<SubmitDdlTaskResponse> {
1850        let request = SubmitDdlTaskRequest::new(
1851            to_meta_query_context(query_context),
1852            DdlTask::new_drop_database(catalog, schema, drop_if_exists),
1853        );
1854
1855        self.procedure_executor
1856            .submit_ddl_task(&ExecutorContext::default(), request)
1857            .await
1858            .context(error::ExecuteDdlSnafu)
1859    }
1860
1861    async fn alter_database_procedure(
1862        &self,
1863        alter_expr: AlterDatabaseExpr,
1864        query_context: QueryContextRef,
1865    ) -> Result<SubmitDdlTaskResponse> {
1866        let request = SubmitDdlTaskRequest::new(
1867            to_meta_query_context(query_context),
1868            DdlTask::new_alter_database(alter_expr),
1869        );
1870
1871        self.procedure_executor
1872            .submit_ddl_task(&ExecutorContext::default(), request)
1873            .await
1874            .context(error::ExecuteDdlSnafu)
1875    }
1876
1877    async fn truncate_table_procedure(
1878        &self,
1879        table_name: &TableName,
1880        table_id: TableId,
1881        time_ranges: Vec<(Timestamp, Timestamp)>,
1882        query_context: QueryContextRef,
1883    ) -> Result<SubmitDdlTaskResponse> {
1884        let request = SubmitDdlTaskRequest::new(
1885            to_meta_query_context(query_context),
1886            DdlTask::new_truncate_table(
1887                table_name.catalog_name.clone(),
1888                table_name.schema_name.clone(),
1889                table_name.table_name.clone(),
1890                table_id,
1891                time_ranges,
1892            ),
1893        );
1894
1895        self.procedure_executor
1896            .submit_ddl_task(&ExecutorContext::default(), request)
1897            .await
1898            .context(error::ExecuteDdlSnafu)
1899    }
1900
1901    #[tracing::instrument(skip_all)]
1902    pub async fn create_database(
1903        &self,
1904        database: &str,
1905        create_if_not_exists: bool,
1906        options: HashMap<String, String>,
1907        query_context: QueryContextRef,
1908    ) -> Result<Output> {
1909        let catalog = query_context.current_catalog();
1910        ensure!(
1911            NAME_PATTERN_REG.is_match(catalog),
1912            error::UnexpectedSnafu {
1913                violated: format!("Invalid catalog name: {}", catalog)
1914            }
1915        );
1916
1917        ensure!(
1918            NAME_PATTERN_REG.is_match(database),
1919            error::UnexpectedSnafu {
1920                violated: format!("Invalid database name: {}", database)
1921            }
1922        );
1923
1924        if !self
1925            .catalog_manager
1926            .schema_exists(catalog, database, None)
1927            .await
1928            .context(CatalogSnafu)?
1929            && !self.catalog_manager.is_reserved_schema_name(database)
1930        {
1931            self.create_database_procedure(
1932                catalog.to_string(),
1933                database.to_string(),
1934                create_if_not_exists,
1935                options,
1936                query_context,
1937            )
1938            .await?;
1939
1940            Ok(Output::new_with_affected_rows(1))
1941        } else if create_if_not_exists {
1942            Ok(Output::new_with_affected_rows(1))
1943        } else {
1944            error::SchemaExistsSnafu { name: database }.fail()
1945        }
1946    }
1947
1948    async fn create_database_procedure(
1949        &self,
1950        catalog: String,
1951        database: String,
1952        create_if_not_exists: bool,
1953        options: HashMap<String, String>,
1954        query_context: QueryContextRef,
1955    ) -> Result<SubmitDdlTaskResponse> {
1956        let request = SubmitDdlTaskRequest::new(
1957            to_meta_query_context(query_context),
1958            DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
1959        );
1960
1961        self.procedure_executor
1962            .submit_ddl_task(&ExecutorContext::default(), request)
1963            .await
1964            .context(error::ExecuteDdlSnafu)
1965    }
1966}
1967
1968/// Parse partition statement [Partitions] into [PartitionExpr] and partition columns.
1969pub fn parse_partitions(
1970    create_table: &CreateTableExpr,
1971    partitions: Option<Partitions>,
1972    query_ctx: &QueryContextRef,
1973) -> Result<(Vec<PartitionExpr>, Vec<String>)> {
1974    // If partitions are not defined by user, use the timestamp column (which has to be existed) as
1975    // the partition column, and create only one partition.
1976    let partition_columns = find_partition_columns(&partitions)?;
1977    let partition_exprs =
1978        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
1979
1980    // Validates partition
1981    let exprs = partition_exprs.clone();
1982    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
1983        .context(InvalidPartitionSnafu)?;
1984
1985    Ok((partition_exprs, partition_columns))
1986}
1987
1988fn parse_partitions_for_logical_validation(
1989    create_table: &CreateTableExpr,
1990    partitions: &Partitions,
1991    query_ctx: &QueryContextRef,
1992) -> Result<(Vec<String>, Vec<PartitionExpr>)> {
1993    let partition_columns = partitions
1994        .column_list
1995        .iter()
1996        .map(|ident| ident.value.clone())
1997        .collect::<Vec<_>>();
1998
1999    let column_name_and_type = partition_columns
2000        .iter()
2001        .map(|pc| {
2002            let column = create_table
2003                .column_defs
2004                .iter()
2005                .find(|c| &c.name == pc)
2006                .context(ColumnNotFoundSnafu { msg: pc.clone() })?;
2007            let column_name = &column.name;
2008            let data_type = ConcreteDataType::from(
2009                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
2010                    .context(ColumnDataTypeSnafu)?,
2011            );
2012            Ok((column_name, data_type))
2013        })
2014        .collect::<Result<HashMap<_, _>>>()?;
2015
2016    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
2017    for expr in &partitions.exprs {
2018        let partition_expr = convert_one_expr(expr, &column_name_and_type, &query_ctx.timezone())?;
2019        partition_exprs.push(partition_expr);
2020    }
2021
2022    MultiDimPartitionRule::try_new(
2023        partition_columns.clone(),
2024        vec![],
2025        partition_exprs.clone(),
2026        true,
2027    )
2028    .context(InvalidPartitionSnafu)?;
2029
2030    Ok((partition_columns, partition_exprs))
2031}
2032
2033/// Verifies an alter and returns whether it is necessary to perform the alter.
2034///
2035/// # Returns
2036///
2037/// Returns true if the alter need to be porformed; otherwise, it returns false.
2038pub fn verify_alter(
2039    table_id: TableId,
2040    table_info: Arc<TableInfo>,
2041    expr: AlterTableExpr,
2042) -> Result<bool> {
2043    let request: AlterTableRequest =
2044        common_grpc_expr::alter_expr_to_request(table_id, expr, Some(&table_info.meta))
2045            .context(AlterExprToRequestSnafu)?;
2046
2047    let AlterTableRequest {
2048        table_name,
2049        alter_kind,
2050        ..
2051    } = &request;
2052
2053    if let AlterKind::RenameTable { new_table_name } = alter_kind {
2054        ensure!(
2055            NAME_PATTERN_REG.is_match(new_table_name),
2056            error::UnexpectedSnafu {
2057                violated: format!("Invalid table name: {}", new_table_name)
2058            }
2059        );
2060    } else if let AlterKind::AddColumns { columns } = alter_kind {
2061        // If all the columns are marked as add_if_not_exists and they already exist in the table,
2062        // there is no need to perform the alter.
2063        let column_names: HashSet<_> = table_info
2064            .meta
2065            .schema
2066            .column_schemas()
2067            .iter()
2068            .map(|schema| &schema.name)
2069            .collect();
2070        if columns.iter().all(|column| {
2071            column_names.contains(&column.column_schema.name) && column.add_if_not_exists
2072        }) {
2073            return Ok(false);
2074        }
2075    }
2076
2077    let _ = table_info
2078        .meta
2079        .builder_with_alter_kind(table_name, &request.alter_kind)
2080        .context(error::TableSnafu)?
2081        .build()
2082        .context(error::BuildTableMetaSnafu { table_name })?;
2083
2084    Ok(true)
2085}
2086
2087pub fn create_table_info(
2088    create_table: &CreateTableExpr,
2089    partition_columns: Vec<String>,
2090) -> Result<TableInfo> {
2091    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
2092    let mut column_name_to_index_map = HashMap::new();
2093
2094    for (idx, column) in create_table.column_defs.iter().enumerate() {
2095        let schema =
2096            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
2097                column: &column.name,
2098            })?;
2099        let schema = schema.with_time_index(column.name == create_table.time_index);
2100
2101        column_schemas.push(schema);
2102        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
2103    }
2104
2105    let next_column_id = column_schemas.len() as u32;
2106    let schema = Arc::new(Schema::new(column_schemas));
2107
2108    let primary_key_indices = create_table
2109        .primary_keys
2110        .iter()
2111        .map(|name| {
2112            column_name_to_index_map
2113                .get(name)
2114                .cloned()
2115                .context(ColumnNotFoundSnafu { msg: name })
2116        })
2117        .collect::<Result<Vec<_>>>()?;
2118
2119    let partition_key_indices = partition_columns
2120        .into_iter()
2121        .map(|col_name| {
2122            column_name_to_index_map
2123                .get(&col_name)
2124                .cloned()
2125                .context(ColumnNotFoundSnafu { msg: col_name })
2126        })
2127        .collect::<Result<Vec<_>>>()?;
2128
2129    let table_options = TableOptions::try_from_iter(&create_table.table_options)
2130        .context(UnrecognizedTableOptionSnafu)?;
2131
2132    let meta = TableMeta {
2133        schema,
2134        primary_key_indices,
2135        value_indices: vec![],
2136        engine: create_table.engine.clone(),
2137        next_column_id,
2138        options: table_options,
2139        created_on: Utc::now(),
2140        updated_on: Utc::now(),
2141        partition_key_indices,
2142        column_ids: vec![],
2143    };
2144
2145    let desc = if create_table.desc.is_empty() {
2146        create_table.table_options.get(COMMENT_KEY).cloned()
2147    } else {
2148        Some(create_table.desc.clone())
2149    };
2150
2151    let table_info = TableInfo {
2152        ident: metadata::TableIdent {
2153            // The table id of distributed table is assigned by Meta, set "0" here as a placeholder.
2154            table_id: 0,
2155            version: 0,
2156        },
2157        name: create_table.table_name.clone(),
2158        desc,
2159        catalog_name: create_table.catalog_name.clone(),
2160        schema_name: create_table.schema_name.clone(),
2161        meta,
2162        table_type: TableType::Base,
2163    };
2164    Ok(table_info)
2165}
2166
2167fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
2168    let columns = if let Some(partitions) = partitions {
2169        partitions
2170            .column_list
2171            .iter()
2172            .map(|x| x.value.clone())
2173            .collect::<Vec<_>>()
2174    } else {
2175        vec![]
2176    };
2177    Ok(columns)
2178}
2179
2180/// Parse [Partitions] into a group of partition entries.
2181///
2182/// Returns a list of [PartitionExpr], each of which defines a partition.
2183fn find_partition_entries(
2184    create_table: &CreateTableExpr,
2185    partitions: &Option<Partitions>,
2186    partition_columns: &[String],
2187    query_ctx: &QueryContextRef,
2188) -> Result<Vec<PartitionExpr>> {
2189    let Some(partitions) = partitions else {
2190        return Ok(vec![]);
2191    };
2192
2193    // extract concrete data type of partition columns
2194    let column_name_and_type = partition_columns
2195        .iter()
2196        .map(|pc| {
2197            let column = create_table
2198                .column_defs
2199                .iter()
2200                .find(|c| &c.name == pc)
2201                // unwrap is safe here because we have checked that partition columns are defined
2202                .unwrap();
2203            let column_name = &column.name;
2204            let data_type = ConcreteDataType::from(
2205                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
2206                    .context(ColumnDataTypeSnafu)?,
2207            );
2208            Ok((column_name, data_type))
2209        })
2210        .collect::<Result<HashMap<_, _>>>()?;
2211
2212    // Transform parser expr to partition expr
2213    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
2214    for partition in &partitions.exprs {
2215        let partition_expr =
2216            convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
2217        partition_exprs.push(partition_expr);
2218    }
2219
2220    Ok(partition_exprs)
2221}
2222
/// Converts a parser [Expr] into a [PartitionExpr].
///
/// Only binary expressions are accepted. Each side must be a column
/// identifier, a literal value (optionally prefixed by a unary sign), or
/// another binary sub-expression (handled recursively). Any other shape is
/// rejected as an invalid partition rule.
fn convert_one_expr(
    expr: &Expr,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
    timezone: &Timezone,
) -> Result<PartitionExpr> {
    // Top level must be a binary operation.
    let Expr::BinaryOp { left, op, right } = expr else {
        return InvalidPartitionRuleSnafu {
            reason: "partition rule must be a binary expression",
        }
        .fail();
    };

    // Restrict the operator to the subset supported by partition rules.
    let op =
        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
            reason: format!("unsupported operator in partition expr {op}"),
        })?;

    // convert leaf node.
    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
        // col, val
        (Expr::Identifier(ident), Expr::Value(value)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&value.value, data_type, timezone, None)?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        // col, ±val — the unary sign is folded into the literal conversion.
        // If the guard fails (operand of the unary op is not a literal),
        // the pair falls through to the catch-all error arm below.
        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        // val, col
        (Expr::Value(value), Expr::Identifier(ident)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&value.value, data_type, timezone, None)?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        // ±val, col — mirror of the case above.
        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
            // sub-expr must against another sub-expr
            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
            (Operand::Expr(lhs), op, Operand::Expr(rhs))
        }
        _ => {
            return InvalidPartitionRuleSnafu {
                reason: format!("invalid partition expr {expr}"),
            }
            .fail();
        }
    };

    Ok(PartitionExpr::new(lhs, op, rhs))
}
2284
2285fn convert_identifier(
2286    ident: &Ident,
2287    column_name_and_type: &HashMap<&String, ConcreteDataType>,
2288) -> Result<(String, ConcreteDataType)> {
2289    let column_name = ident.value.clone();
2290    let data_type = column_name_and_type
2291        .get(&column_name)
2292        .cloned()
2293        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
2294    Ok((column_name, data_type))
2295}
2296
2297fn convert_value(
2298    value: &ParserValue,
2299    data_type: ConcreteDataType,
2300    timezone: &Timezone,
2301    unary_op: Option<UnaryOperator>,
2302) -> Result<Value> {
2303    sql_value_to_value(
2304        &ColumnSchema::new("<NONAME>", data_type, true),
2305        value,
2306        Some(timezone),
2307        unary_op,
2308        false,
2309    )
2310    .context(error::SqlCommonSnafu)
2311}
2312
#[cfg(test)]
mod test {
    use std::time::Duration;

    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;
    use sqlparser::parser::Parser;

    use super::*;
    use crate::expr_helper;

    // "5m" should parse to a 300-second timeout and "false" should disable
    // synchronous waiting.
    #[test]
    fn test_parse_ddl_options() {
        let options = OptionMap::from([
            ("timeout".to_string(), "5m".to_string()),
            ("wait".to_string(), "false".to_string()),
        ]);
        let ddl_options = parse_ddl_options(&options).unwrap();
        assert!(!ddl_options.wait);
        assert_eq!(Duration::from_secs(300), ddl_options.timeout);
    }

    // Identifier pattern: names may contain `@`/`#` but must not start with
    // them, and punctuation/emoji-only names are rejected.
    #[test]
    fn test_name_is_match() {
        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
        assert!(!NAME_PATTERN_REG.is_match("🈲"));
        assert!(NAME_PATTERN_REG.is_match("hello"));
        assert!(NAME_PATTERN_REG.is_match("test@"));
        assert!(!NAME_PATTERN_REG.is_match("@test"));
        assert!(NAME_PATTERN_REG.is_match("test#"));
        assert!(!NAME_PATTERN_REG.is_match("#test"));
        assert!(!NAME_PATTERN_REG.is_match("@"));
        assert!(!NAME_PATTERN_REG.is_match("#"));
    }

    // `col < 100` and `100 > col` should normalize to the same PartitionExpr,
    // so they compare equal and sort identically.
    #[test]
    fn test_partition_expr_equivalence_with_swapped_operands() {
        let column_name = "device_id".to_string();
        let column_name_and_type =
            HashMap::from([(&column_name, ConcreteDataType::int32_datatype())]);
        let timezone = Timezone::from_tz_string("UTC").unwrap();
        let dialect = GreptimeDbDialect {};

        let mut parser = Parser::new(&dialect)
            .try_with_sql("device_id < 100")
            .unwrap();
        let expr_left = parser.parse_expr().unwrap();

        let mut parser = Parser::new(&dialect)
            .try_with_sql("100 > device_id")
            .unwrap();
        let expr_right = parser.parse_expr().unwrap();

        let partition_left =
            convert_one_expr(&expr_left, &column_name_and_type, &timezone).unwrap();
        let partition_right =
            convert_one_expr(&expr_right, &column_name_and_type, &timezone).unwrap();

        assert_eq!(partition_left, partition_right);
        assert!([partition_left.clone()].contains(&partition_right));

        // Sorting both forms should yield identical orderings.
        let mut physical_partition_exprs = vec![partition_left];
        let mut logical_partition_exprs = vec![partition_right];
        physical_partition_exprs.sort_unstable();
        logical_partition_exprs.sort_unstable();
        assert_eq!(physical_partition_exprs, logical_partition_exprs);
    }

    // Parses CREATE TABLE statements with partition clauses and compares the
    // serialized partition entries against expected JSON.
    #[tokio::test]
    #[ignore = "TODO(ruihang): WIP new partition rule"]
    async fn test_parse_partitions() {
        common_telemetry::init_default_ut_logging();
        let cases = [
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION ON COLUMNS (b) (
  b < 'hz',
  b >= 'hz' AND b < 'sh',
  b >= 'sh'
)
ENGINE=mito",
                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
            ),
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
  PARTITION r0 VALUES LESS THAN ('hz', 10),
  b < 'hz' AND a < 10,
  b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
  b >= 'sh' AND a >= 20
)
ENGINE=mito",
                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
            ),
        ];
        let ctx = QueryContextBuilder::default().build().into();
        for (sql, expected) in cases {
            let result = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            match &result[0] {
                Statement::CreateTable(c) => {
                    let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
                    let (partitions, _) =
                        parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
                    let json = serde_json::to_string(&partitions).unwrap();
                    assert_eq!(json, expected);
                }
                _ => unreachable!(),
            }
        }
    }
}