operator/statement/
ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::helper::ColumnDataTypeWrapper;
20use api::v1::alter_table_expr::Kind;
21use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
22use api::v1::{
23    AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
24    Repartition, column_def,
25};
26#[cfg(feature = "enterprise")]
27use api::v1::{
28    CreateTriggerExpr as PbCreateTriggerExpr, meta::CreateTriggerTask as PbCreateTriggerTask,
29};
30use catalog::CatalogManagerRef;
31use chrono::Utc;
32use common_base::regex_pattern::NAME_PATTERN_REG;
33use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_readonly_schema};
34use common_catalog::{format_full_flow_name, format_full_table_name};
35use common_error::ext::BoxedError;
36use common_meta::cache_invalidator::Context;
37use common_meta::ddl::create_flow::FlowType;
38use common_meta::instruction::CacheIdent;
39use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
40use common_meta::procedure_executor::ExecutorContext;
41#[cfg(feature = "enterprise")]
42use common_meta::rpc::ddl::trigger::CreateTriggerTask;
43#[cfg(feature = "enterprise")]
44use common_meta::rpc::ddl::trigger::DropTriggerTask;
45use common_meta::rpc::ddl::{
46    CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
47    SubmitDdlTaskResponse,
48};
49use common_query::Output;
50use common_recordbatch::{RecordBatch, RecordBatches};
51use common_sql::convert::sql_value_to_value;
52use common_telemetry::{debug, info, tracing, warn};
53use common_time::{Timestamp, Timezone};
54use datafusion_common::tree_node::TreeNodeVisitor;
55use datafusion_expr::LogicalPlan;
56use datatypes::prelude::ConcreteDataType;
57use datatypes::schema::{ColumnSchema, Schema};
58use datatypes::value::Value;
59use datatypes::vectors::{StringVector, VectorRef};
60use humantime::parse_duration;
61use partition::expr::{Operand, PartitionExpr, RestrictedOp};
62use partition::multi_dim::MultiDimPartitionRule;
63use query::parser::QueryStatement;
64use query::plan::extract_and_rewrite_full_table_names;
65use query::query_engine::DefaultSerializer;
66use query::sql::create_table_stmt;
67use session::context::QueryContextRef;
68use session::table_name::table_idents_to_full_name;
69use snafu::{OptionExt, ResultExt, ensure};
70use sql::parser::{ParseOptions, ParserContext};
71use sql::statements::OptionMap;
72#[cfg(feature = "enterprise")]
73use sql::statements::alter::trigger::AlterTrigger;
74use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation};
75#[cfg(feature = "enterprise")]
76use sql::statements::create::trigger::CreateTrigger;
77use sql::statements::create::{
78    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
79};
80use sql::statements::statement::Statement;
81use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
82use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
83use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
84use table::TableRef;
85use table::dist_table::DistTable;
86use table::metadata::{self, TableId, TableInfo, TableMeta, TableType};
87use table::requests::{
88    AlterKind, AlterTableRequest, COMMENT_KEY, DDL_TIMEOUT, DDL_WAIT, TableOptions,
89};
90use table::table_name::TableName;
91use table::table_reference::TableReference;
92
93use crate::error::{
94    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
95    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu,
96    DeserializePartitionExprSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
97    FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
98    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu,
99    PartitionExprToPbSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
100    SerializePartitionExprSnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
101    TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
102    ViewAlreadyExistsSnafu,
103};
104use crate::expr_helper::{self, RepartitionRequest};
105use crate::statement::StatementExecutor;
106use crate::statement::show::create_partitions_stmt;
107use crate::utils::to_meta_query_context;
108
/// Options parsed from SQL statement options that control how a DDL task is
/// submitted (see [`parse_ddl_options`]).
#[derive(Debug, Clone, Copy)]
struct DdlSubmitOptions {
    /// Parsed from the `DDL_WAIT` option; falls back to
    /// `SubmitDdlTaskRequest::default_wait()` when absent.
    wait: bool,
    /// Parsed from the `DDL_TIMEOUT` option (humantime duration syntax);
    /// falls back to `SubmitDdlTaskRequest::default_timeout()` when absent.
    timeout: Duration,
}
114
115fn build_procedure_id_output(procedure_id: Vec<u8>) -> Result<Output> {
116    let procedure_id = String::from_utf8_lossy(&procedure_id).to_string();
117    let vector: VectorRef = Arc::new(StringVector::from(vec![procedure_id]));
118    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
119        "Procedure ID",
120        vector.data_type(),
121        false,
122    )]));
123    let batch =
124        RecordBatch::new(schema.clone(), vec![vector]).context(error::BuildRecordBatchSnafu)?;
125    let batches =
126        RecordBatches::try_new(schema, vec![batch]).context(error::BuildRecordBatchSnafu)?;
127    Ok(Output::new_with_record_batches(batches))
128}
129
130fn parse_ddl_options(options: &OptionMap) -> Result<DdlSubmitOptions> {
131    let wait = match options.get(DDL_WAIT) {
132        Some(value) => value.parse::<bool>().map_err(|_| {
133            InvalidSqlSnafu {
134                err_msg: format!("invalid DDL option '{DDL_WAIT}': '{value}'"),
135            }
136            .build()
137        })?,
138        None => SubmitDdlTaskRequest::default_wait(),
139    };
140
141    let timeout = match options.get(DDL_TIMEOUT) {
142        Some(value) => parse_duration(value).map_err(|err| {
143            InvalidSqlSnafu {
144                err_msg: format!("invalid DDL option '{DDL_TIMEOUT}': '{value}': {err}"),
145            }
146            .build()
147        })?,
148        None => SubmitDdlTaskRequest::default_timeout(),
149    };
150
151    Ok(DdlSubmitOptions { wait, timeout })
152}
153
154impl StatementExecutor {
    /// Returns a clone of the catalog manager handle used by this executor.
    pub fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }
158
    /// Creates a table from a parsed `CREATE TABLE` statement.
    ///
    /// Resolves the statement's name into (catalog, schema), merges
    /// database-level extra options into the table options (table-level
    /// settings win; `compaction.*` keys are skipped), then delegates to
    /// [`Self::create_table_inner`] with the statement's partition clause.
    #[tracing::instrument(skip_all)]
    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
        let (catalog, schema, _table) = table_idents_to_full_name(&stmt.name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;

        // Load database-level options so the new table can inherit them.
        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
        // Don't inherit schema-level TTL/compaction options into table options:
        // TTL is applied during compaction, and `compaction.*` is handled separately.
        if let Some(schema_options) = schema_options {
            for (key, value) in schema_options.extra_options.iter() {
                if key.starts_with("compaction.") {
                    continue;
                }
                // `entry(...).or_insert(...)` keeps any value the statement
                // set explicitly; only missing keys are inherited.
                create_expr
                    .table_options
                    .entry(key.clone())
                    .or_insert(value.clone());
            }
        }

        self.create_table_inner(create_expr, stmt.partitions, ctx)
            .await
    }
194
    /// Creates a table mirroring an existing table's definition
    /// (`CREATE TABLE ... LIKE ...`).
    ///
    /// Rebuilds a `CREATE TABLE` statement from the source table's metadata
    /// (including database-level options and the source's partition layout),
    /// retargets it at the new table name, and delegates to
    /// [`Self::create_table_inner`].
    #[tracing::instrument(skip_all)]
    pub async fn create_table_like(
        &self,
        stmt: CreateTableLike,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        // Resolve and look up the source table.
        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;
        let table_ref = self
            .catalog_manager
            .table(&catalog, &schema, &table, Some(&ctx))
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu { table_name: &table })?;
        // Partition layout of the source's physical table, used below to
        // reconstruct the PARTITION clause for the new table.
        let partition_info = self
            .partition_manager
            .find_physical_partition_info(table_ref.table_info().table_id())
            .await
            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;

        // CREATE TABLE LIKE also inherits database level options.
        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        // Rebuild a CREATE TABLE statement from the source's metadata, then
        // point it at the new name; `if_not_exists` is deliberately reset.
        let quote_style = ctx.quote_style();
        let mut create_stmt =
            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
                .context(error::ParseQuerySnafu)?;
        create_stmt.name = stmt.table_name;
        create_stmt.if_not_exists = false;

        let table_info = table_ref.table_info();
        // Only keep a partition clause when the source actually has
        // partition columns.
        let partitions = create_partitions_stmt(&table_info, &partition_info.partitions)?.and_then(
            |mut partitions| {
                if !partitions.column_list.is_empty() {
                    partitions.set_quote(quote_style);
                    Some(partitions)
                } else {
                    None
                }
            },
        );

        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
        self.create_table_inner(create_expr, partitions, ctx).await
    }
250
251    #[tracing::instrument(skip_all)]
252    pub async fn create_external_table(
253        &self,
254        create_expr: CreateExternalTable,
255        ctx: QueryContextRef,
256    ) -> Result<TableRef> {
257        let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
258        self.create_table_inner(create_expr, None, ctx).await
259    }
260
    /// Dispatches table creation to the appropriate path.
    ///
    /// Rejects read-only schemas, then: metric-engine tables carrying the
    /// logical-table metadata key go through the logical-table path (with an
    /// optional partition-rule validation against the physical table); all
    /// other tables go through [`Self::create_non_logic_table`].
    #[tracing::instrument(skip_all)]
    pub async fn create_table_inner(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        // DDL is not allowed on read-only schemas.
        ensure!(
            !is_readonly_schema(&create_table.schema_name),
            SchemaReadOnlySnafu {
                name: create_table.schema_name.clone()
            }
        );

        // A metric-engine table with the logical-table metadata key is a
        // logical table layered on a physical table.
        if create_table.engine == METRIC_ENGINE_NAME
            && create_table
                .table_options
                .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            // If the logical table declares its own partition rule, it must
            // match the physical table's rule.
            if let Some(partitions) = partitions.as_ref()
                && !partitions.exprs.is_empty()
            {
                self.validate_logical_table_partition_rule(create_table, partitions, &query_ctx)
                    .await?;
            }
            // Create logical tables
            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
                .await?
                .into_iter()
                .next()
                .context(error::UnexpectedSnafu {
                    violated: "expected to create logical tables",
                })
        } else {
            // Create other normal table
            self.create_non_logic_table(create_table, partitions, query_ctx)
                .await
        }
    }
300
    /// Creates a regular (non-logical) table via the DDL procedure executor.
    ///
    /// Validates that the schema exists, handles `IF NOT EXISTS` when the
    /// table already exists, checks the table name pattern, then submits a
    /// create-table procedure and returns a [`DistTable`] handle built from
    /// the resulting table info. Also writes the assigned table id back into
    /// `create_table.table_id` as a side effect.
    #[tracing::instrument(skip_all)]
    pub async fn create_non_logic_table(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();

        // Check if schema exists
        let schema = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey::new(
                &create_table.catalog_name,
                &create_table.schema_name,
            ))
            .await
            .context(TableMetadataManagerSnafu)?;
        ensure!(
            schema.is_some(),
            SchemaNotFoundSnafu {
                schema_info: &create_table.schema_name,
            }
        );

        // if table exists.
        if let Some(table) = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                &create_table.table_name,
                Some(&query_ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            // `IF NOT EXISTS` makes an existing table a success; otherwise
            // it's an error.
            return if create_table.create_if_not_exists {
                Ok(table)
            } else {
                TableAlreadyExistsSnafu {
                    table: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                }
                .fail()
            };
        }

        // Reject table names that don't match the allowed name pattern.
        ensure!(
            NAME_PATTERN_REG.is_match(&create_table.table_name),
            InvalidTableNameSnafu {
                table_name: &create_table.table_name,
            }
        );

        let table_name = TableName::new(
            &create_table.catalog_name,
            &create_table.schema_name,
            &create_table.table_name,
        );

        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
        let mut table_info = create_table_info(create_table, partition_cols)?;

        // Submit the create-table procedure; the response carries the
        // allocated table id(s).
        let resp = self
            .create_table_procedure(
                create_table.clone(),
                partitions,
                table_info.clone(),
                query_ctx,
            )
            .await?;

        let table_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created table '{table_name}' with table id {table_id}");

        table_info.ident.table_id = table_id;

        let table_info = Arc::new(table_info);
        // Record the allocated id back on the expression for the caller.
        create_table.table_id = Some(api::v1::TableId { id: table_id });

        let table = DistTable::table(table_info);

        Ok(table)
    }
396
    /// Creates a batch of logical tables through a single DDL procedure.
    ///
    /// Validates table names, submits one create-logical-tables procedure for
    /// all expressions, then re-reads each table from the catalog so the
    /// returned handles carry up-to-date metadata (partition info inherited
    /// from the physical table). Errors if the number of created table ids
    /// doesn't match the request, or if a re-read table's id doesn't match
    /// the one the procedure allocated.
    #[tracing::instrument(skip_all)]
    pub async fn create_logical_tables(
        &self,
        create_table_exprs: &[CreateTableExpr],
        query_context: QueryContextRef,
    ) -> Result<Vec<TableRef>> {
        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
        ensure!(
            !create_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "create logic tables"
            }
        );

        // Check table names
        for create_table in create_table_exprs {
            ensure!(
                NAME_PATTERN_REG.is_match(&create_table.table_name),
                InvalidTableNameSnafu {
                    table_name: &create_table.table_name,
                }
            );
        }

        // Build preliminary table infos (no partition columns yet) and pair
        // each with its expression for the procedure request.
        let raw_tables_info = create_table_exprs
            .iter()
            .map(|create| create_table_info(create, vec![]))
            .collect::<Result<Vec<_>>>()?;
        let tables_data = create_table_exprs
            .iter()
            .cloned()
            .zip(raw_tables_info.iter().cloned())
            .collect::<Vec<_>>();

        let resp = self
            .create_logical_tables_procedure(tables_data, query_context.clone())
            .await?;

        let table_ids = resp.table_ids;
        ensure!(
            table_ids.len() == raw_tables_info.len(),
            CreateLogicalTablesSnafu {
                reason: format!(
                    "The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}",
                    raw_tables_info.len(),
                    table_ids.len()
                )
            }
        );
        info!("Successfully created logical tables: {:?}", table_ids);

        // Reacquire table infos from catalog so logical tables inherit the latest partition
        // metadata (e.g. partition_key_indices) from their physical tables.
        // And the returned table info also included extra partition columns that are in physical table but not in logical table's create table expr
        let mut tables_info = Vec::with_capacity(table_ids.len());
        for (table_id, create_table) in table_ids.iter().zip(create_table_exprs.iter()) {
            let table = self
                .catalog_manager
                .table(
                    &create_table.catalog_name,
                    &create_table.schema_name,
                    &create_table.table_name,
                    Some(&query_context),
                )
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                })?;

            let table_info = table.table_info();
            // Safety check: ensure we are returning the table info that matches the newly created table id.
            ensure!(
                table_info.table_id() == *table_id,
                CreateLogicalTablesSnafu {
                    reason: format!(
                        "Table id mismatch after creation, expected {}, got {} for table {}",
                        table_id,
                        table_info.table_id(),
                        format_full_table_name(
                            &create_table.catalog_name,
                            &create_table.schema_name,
                            &create_table.table_name
                        )
                    )
                }
            );

            tables_info.push(table_info);
        }

        Ok(tables_info.into_iter().map(DistTable::table).collect())
    }
494
    /// Validates that a logical table's declared partition rule matches its
    /// physical table's rule.
    ///
    /// Parses the logical table's partition expressions, looks up the
    /// physical table named by the `LOGICAL_TABLE_METADATA_KEY` option,
    /// downcasts its partition rule to [`MultiDimPartitionRule`], and
    /// requires the two (sorted) expression sets to be equal.
    async fn validate_logical_table_partition_rule(
        &self,
        create_table: &CreateTableExpr,
        partitions: &Partitions,
        query_ctx: &QueryContextRef,
    ) -> Result<()> {
        let (_, mut logical_partition_exprs) =
            parse_partitions_for_logical_validation(create_table, partitions, query_ctx)?;

        // The physical table's name must be present in the logical table's
        // options; it was checked as a metric-engine option by the caller.
        let physical_table_name = create_table
            .table_options
            .get(LOGICAL_TABLE_METADATA_KEY)
            .with_context(|| CreateLogicalTablesSnafu {
                reason: format!(
                    "expect `{LOGICAL_TABLE_METADATA_KEY}` option on creating logical table"
                ),
            })?;

        let physical_table = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                physical_table_name,
                Some(query_ctx),
            )
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu {
                table_name: physical_table_name.clone(),
            })?;

        let physical_table_info = physical_table.table_info();
        let (partition_rule, _) = self
            .partition_manager
            .find_table_partition_rule(&physical_table_info)
            .await
            .context(error::FindTablePartitionRuleSnafu {
                table_name: physical_table_name.clone(),
            })?;

        // Only range-based (multi-dimensional) rules can be compared here.
        let multi_dim_rule = partition_rule
            .as_ref()
            .as_any()
            .downcast_ref::<MultiDimPartitionRule>()
            .context(InvalidPartitionRuleSnafu {
                reason: "physical table partition rule is not range-based",
            })?;

        // TODO(ruihang): project physical partition exprs to logical partition column
        // Sort both sides so the comparison is order-insensitive.
        let mut physical_partition_exprs = multi_dim_rule.exprs().to_vec();
        logical_partition_exprs.sort_unstable();
        physical_partition_exprs.sort_unstable();

        ensure!(
            physical_partition_exprs == logical_partition_exprs,
            InvalidPartitionRuleSnafu {
                reason: format!(
                    "logical table partition rule must match the corresponding physical table's\n logical table partition exprs:\t\t {:?}\n physical table partition exprs:\t {:?}",
                    logical_partition_exprs, physical_partition_exprs
                ),
            }
        );

        Ok(())
    }
561
562    #[cfg(feature = "enterprise")]
563    #[tracing::instrument(skip_all)]
564    pub async fn create_trigger(
565        &self,
566        stmt: CreateTrigger,
567        query_context: QueryContextRef,
568    ) -> Result<Output> {
569        let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
570        self.create_trigger_inner(expr, query_context).await
571    }
572
573    #[cfg(feature = "enterprise")]
574    pub async fn create_trigger_inner(
575        &self,
576        expr: PbCreateTriggerExpr,
577        query_context: QueryContextRef,
578    ) -> Result<Output> {
579        self.create_trigger_procedure(expr, query_context).await?;
580        Ok(Output::new_with_affected_rows(0))
581    }
582
583    #[cfg(feature = "enterprise")]
584    async fn create_trigger_procedure(
585        &self,
586        expr: PbCreateTriggerExpr,
587        query_context: QueryContextRef,
588    ) -> Result<SubmitDdlTaskResponse> {
589        let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
590            create_trigger: Some(expr),
591        })
592        .context(error::InvalidExprSnafu)?;
593
594        let request = SubmitDdlTaskRequest::new(
595            to_meta_query_context(query_context),
596            DdlTask::new_create_trigger(task),
597        );
598
599        self.procedure_executor
600            .submit_ddl_task(&ExecutorContext::default(), request)
601            .await
602            .context(error::ExecuteDdlSnafu)
603    }
604
605    #[tracing::instrument(skip_all)]
606    pub async fn create_flow(
607        &self,
608        stmt: CreateFlow,
609        query_context: QueryContextRef,
610    ) -> Result<Output> {
611        // TODO(ruihang): do some verification
612        let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;
613
614        self.create_flow_inner(expr, query_context).await
615    }
616
617    pub async fn create_flow_inner(
618        &self,
619        expr: CreateFlowExpr,
620        query_context: QueryContextRef,
621    ) -> Result<Output> {
622        self.create_flow_procedure(expr, query_context).await?;
623        Ok(Output::new_with_affected_rows(0))
624    }
625
626    async fn create_flow_procedure(
627        &self,
628        expr: CreateFlowExpr,
629        query_context: QueryContextRef,
630    ) -> Result<SubmitDdlTaskResponse> {
631        let flow_type = self
632            .determine_flow_type(&expr, query_context.clone())
633            .await?;
634        info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
635
636        let expr = {
637            let mut expr = expr;
638            expr.flow_options
639                .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
640            expr
641        };
642
643        let task = CreateFlowTask::try_from(PbCreateFlowTask {
644            create_flow: Some(expr),
645        })
646        .context(error::InvalidExprSnafu)?;
647        let request = SubmitDdlTaskRequest::new(
648            to_meta_query_context(query_context),
649            DdlTask::new_create_flow(task),
650        );
651
652        self.procedure_executor
653            .submit_ddl_task(&ExecutorContext::default(), request)
654            .await
655            .context(error::ExecuteDdlSnafu)
656    }
657
    /// Determine the flow type based on the SQL query
    ///
    /// If it contains aggregation or distinct, then it is a batch flow, otherwise it is a streaming flow.
    ///
    /// Special cases:
    /// - any source table with `ttl = instant` forces streaming mode;
    /// - a TQL (PromQL) statement forces batching mode.
    async fn determine_flow_type(
        &self,
        expr: &CreateFlowExpr,
        query_ctx: QueryContextRef,
    ) -> Result<FlowType> {
        // first check if source table's ttl is instant, if it is, force streaming mode
        for src_table_name in &expr.source_table_names {
            let table = self
                .catalog_manager()
                .table(
                    &src_table_name.catalog_name,
                    &src_table_name.schema_name,
                    &src_table_name.table_name,
                    Some(&query_ctx),
                )
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name,
                    ),
                })?;

            // instant source table can only be handled by streaming mode
            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
                warn!(
                    "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
                    format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name
                    ),
                    expr.flow_name
                );
                return Ok(FlowType::Streaming);
            }
        }

        // Parse the flow's SQL; exactly one statement is expected.
        let engine = &self.query_engine;
        let stmts = ParserContext::create_with_dialect(
            &expr.sql,
            query_ctx.sql_dialect(),
            ParseOptions::default(),
        )
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

        ensure!(
            stmts.len() == 1,
            InvalidSqlSnafu {
                err_msg: format!("Expect only one statement, found {}", stmts.len())
            }
        );
        let stmt = &stmts[0];

        // support tql parse too
        let plan = match stmt {
            // prom ql is only supported in batching mode
            Statement::Tql(_) => return Ok(FlowType::Batching),
            _ => engine
                .planner()
                .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?,
        };

        /// Visitor to find aggregation or distinct
        struct FindAggr {
            is_aggr: bool,
        }

        impl TreeNodeVisitor<'_> for FindAggr {
            type Node = LogicalPlan;
            fn f_down(
                &mut self,
                node: &Self::Node,
            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
            {
                match node {
                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
                        self.is_aggr = true;
                        // Stop descending as soon as one aggregate/distinct
                        // node is found.
                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
                    }
                    _ => (),
                }
                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
            }
        }

        let mut find_aggr = FindAggr { is_aggr: false };

        // Walk the plan including subqueries looking for aggregation.
        plan.visit_with_subqueries(&mut find_aggr)
            .context(BuildDfLogicalPlanSnafu)?;
        if find_aggr.is_aggr {
            Ok(FlowType::Batching)
        } else {
            Ok(FlowType::Streaming)
        }
    }
764
    /// Creates a view from a `CREATE VIEW` statement.
    ///
    /// Plans the view's query (SQL or TQL), validates the declared column
    /// list against the plan's output columns, rewrites table references to
    /// fully qualified names, encodes the plan via substrait, and submits
    /// the view-creation expression through [`Self::create_view_by_expr`].
    #[tracing::instrument(skip_all)]
    pub async fn create_view(
        &self,
        create_view: CreateView,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        // convert input into logical plan
        let logical_plan = match &*create_view.query {
            Statement::Query(query) => {
                self.plan(
                    &QueryStatement::Sql(Statement::Query(query.clone())),
                    ctx.clone(),
                )
                .await?
            }
            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
            _ => {
                return InvalidViewStmtSnafu {}.fail();
            }
        };
        // Save the definition for `show create view`.
        let definition = create_view.to_string();

        // Save the columns in plan, it may changed when the schemas of tables in plan
        // are altered.
        let schema: Schema = logical_plan
            .schema()
            .clone()
            .try_into()
            .context(ConvertSchemaSnafu)?;
        let plan_columns: Vec<_> = schema
            .column_schemas()
            .iter()
            .map(|c| c.name.clone())
            .collect();

        // Columns explicitly declared in the CREATE VIEW statement.
        let columns: Vec<_> = create_view
            .columns
            .iter()
            .map(|ident| ident.to_string())
            .collect();

        // Validate columns
        if !columns.is_empty() {
            ensure!(
                columns.len() == plan_columns.len(),
                error::ViewColumnsMismatchSnafu {
                    view_name: create_view.name.to_string(),
                    expected: plan_columns.len(),
                    actual: columns.len(),
                }
            );
        }

        // Extract the table names from the original plan
        // and rewrite them as fully qualified names.
        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
            .context(ExtractTableNamesSnafu)?;

        let table_names = table_names.into_iter().map(|t| t.into()).collect();

        // TODO(dennis): we don't save the optimized plan yet,
        // because there are some serialization issue with our own defined plan node (such as `MergeScanLogicalPlan`).
        // When the issues are fixed, we can use the `optimized_plan` instead.
        // let optimized_plan = self.optimize_logical_plan(logical_plan)?.unwrap_df_plan();

        // encode logical plan
        let encoded_plan = DFLogicalSubstraitConvertor
            .encode(&plan, DefaultSerializer)
            .context(SubstraitCodecSnafu)?;

        let expr = expr_helper::to_create_view_expr(
            create_view,
            encoded_plan.to_vec(),
            table_names,
            columns,
            plan_columns,
            definition,
            ctx.clone(),
        )?;

        // TODO(dennis): validate the logical plan
        self.create_view_by_expr(expr, ctx).await
    }
849
850    pub async fn create_view_by_expr(
851        &self,
852        expr: CreateViewExpr,
853        ctx: QueryContextRef,
854    ) -> Result<TableRef> {
855        ensure! {
856            !(expr.create_if_not_exists & expr.or_replace),
857            InvalidSqlSnafu {
858                err_msg: "syntax error Create Or Replace and If Not Exist cannot be used together",
859            }
860        };
861        let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();
862
863        let schema_exists = self
864            .table_metadata_manager
865            .schema_manager()
866            .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
867            .await
868            .context(TableMetadataManagerSnafu)?;
869
870        ensure!(
871            schema_exists,
872            SchemaNotFoundSnafu {
873                schema_info: &expr.schema_name,
874            }
875        );
876
877        // if view or table exists.
878        if let Some(table) = self
879            .catalog_manager
880            .table(
881                &expr.catalog_name,
882                &expr.schema_name,
883                &expr.view_name,
884                Some(&ctx),
885            )
886            .await
887            .context(CatalogSnafu)?
888        {
889            let table_type = table.table_info().table_type;
890
891            match (table_type, expr.create_if_not_exists, expr.or_replace) {
892                (TableType::View, true, false) => {
893                    return Ok(table);
894                }
895                (TableType::View, false, false) => {
896                    return ViewAlreadyExistsSnafu {
897                        name: format_full_table_name(
898                            &expr.catalog_name,
899                            &expr.schema_name,
900                            &expr.view_name,
901                        ),
902                    }
903                    .fail();
904                }
905                (TableType::View, _, true) => {
906                    // Try to replace an exists view
907                }
908                _ => {
909                    return TableAlreadyExistsSnafu {
910                        table: format_full_table_name(
911                            &expr.catalog_name,
912                            &expr.schema_name,
913                            &expr.view_name,
914                        ),
915                    }
916                    .fail();
917                }
918            }
919        }
920
921        ensure!(
922            NAME_PATTERN_REG.is_match(&expr.view_name),
923            InvalidViewNameSnafu {
924                name: expr.view_name.clone(),
925            }
926        );
927
928        let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);
929
930        let mut view_info = TableInfo {
931            ident: metadata::TableIdent {
932                // The view id of distributed table is assigned by Meta, set "0" here as a placeholder.
933                table_id: 0,
934                version: 0,
935            },
936            name: expr.view_name.clone(),
937            desc: None,
938            catalog_name: expr.catalog_name.clone(),
939            schema_name: expr.schema_name.clone(),
940            // The meta doesn't make sense for views, so using a default one.
941            meta: TableMeta::empty(),
942            table_type: TableType::View,
943        };
944
945        let request = SubmitDdlTaskRequest::new(
946            to_meta_query_context(ctx),
947            DdlTask::new_create_view(expr, view_info.clone()),
948        );
949
950        let resp = self
951            .procedure_executor
952            .submit_ddl_task(&ExecutorContext::default(), request)
953            .await
954            .context(error::ExecuteDdlSnafu)?;
955
956        debug!(
957            "Submit creating view '{view_name}' task response: {:?}",
958            resp
959        );
960
961        let view_id = resp
962            .table_ids
963            .into_iter()
964            .next()
965            .context(error::UnexpectedSnafu {
966                violated: "expected table_id",
967            })?;
968        info!("Successfully created view '{view_name}' with view id {view_id}");
969
970        view_info.ident.table_id = view_id;
971
972        let view_info = Arc::new(view_info);
973
974        let table = DistTable::table(view_info);
975
976        // Invalidates local cache ASAP.
977        self.cache_invalidator
978            .invalidate(
979                &Context::default(),
980                &[
981                    CacheIdent::TableId(view_id),
982                    CacheIdent::TableName(view_name.clone()),
983                ],
984            )
985            .await
986            .context(error::InvalidateTableCacheSnafu)?;
987
988        Ok(table)
989    }
990
991    #[tracing::instrument(skip_all)]
992    pub async fn drop_flow(
993        &self,
994        catalog_name: String,
995        flow_name: String,
996        drop_if_exists: bool,
997        query_context: QueryContextRef,
998    ) -> Result<Output> {
999        if let Some(flow) = self
1000            .flow_metadata_manager
1001            .flow_name_manager()
1002            .get(&catalog_name, &flow_name)
1003            .await
1004            .context(error::TableMetadataManagerSnafu)?
1005        {
1006            let flow_id = flow.flow_id();
1007            let task = DropFlowTask {
1008                catalog_name,
1009                flow_name,
1010                flow_id,
1011                drop_if_exists,
1012            };
1013            self.drop_flow_procedure(task, query_context).await?;
1014
1015            Ok(Output::new_with_affected_rows(0))
1016        } else if drop_if_exists {
1017            Ok(Output::new_with_affected_rows(0))
1018        } else {
1019            FlowNotFoundSnafu {
1020                flow_name: format_full_flow_name(&catalog_name, &flow_name),
1021            }
1022            .fail()
1023        }
1024    }
1025
1026    async fn drop_flow_procedure(
1027        &self,
1028        expr: DropFlowTask,
1029        query_context: QueryContextRef,
1030    ) -> Result<SubmitDdlTaskResponse> {
1031        let request = SubmitDdlTaskRequest::new(
1032            to_meta_query_context(query_context),
1033            DdlTask::new_drop_flow(expr),
1034        );
1035
1036        self.procedure_executor
1037            .submit_ddl_task(&ExecutorContext::default(), request)
1038            .await
1039            .context(error::ExecuteDdlSnafu)
1040    }
1041
1042    #[cfg(feature = "enterprise")]
1043    #[tracing::instrument(skip_all)]
1044    pub(super) async fn drop_trigger(
1045        &self,
1046        catalog_name: String,
1047        trigger_name: String,
1048        drop_if_exists: bool,
1049        query_context: QueryContextRef,
1050    ) -> Result<Output> {
1051        let task = DropTriggerTask {
1052            catalog_name,
1053            trigger_name,
1054            drop_if_exists,
1055        };
1056        self.drop_trigger_procedure(task, query_context).await?;
1057        Ok(Output::new_with_affected_rows(0))
1058    }
1059
1060    #[cfg(feature = "enterprise")]
1061    async fn drop_trigger_procedure(
1062        &self,
1063        expr: DropTriggerTask,
1064        query_context: QueryContextRef,
1065    ) -> Result<SubmitDdlTaskResponse> {
1066        let request = SubmitDdlTaskRequest::new(
1067            to_meta_query_context(query_context),
1068            DdlTask::new_drop_trigger(expr),
1069        );
1070
1071        self.procedure_executor
1072            .submit_ddl_task(&ExecutorContext::default(), request)
1073            .await
1074            .context(error::ExecuteDdlSnafu)
1075    }
1076
1077    /// Drop a view
1078    #[tracing::instrument(skip_all)]
1079    pub(crate) async fn drop_view(
1080        &self,
1081        catalog: String,
1082        schema: String,
1083        view: String,
1084        drop_if_exists: bool,
1085        query_context: QueryContextRef,
1086    ) -> Result<Output> {
1087        let view_info = if let Some(view) = self
1088            .catalog_manager
1089            .table(&catalog, &schema, &view, None)
1090            .await
1091            .context(CatalogSnafu)?
1092        {
1093            view.table_info()
1094        } else if drop_if_exists {
1095            // DROP VIEW IF EXISTS meets view not found - ignored
1096            return Ok(Output::new_with_affected_rows(0));
1097        } else {
1098            return TableNotFoundSnafu {
1099                table_name: format_full_table_name(&catalog, &schema, &view),
1100            }
1101            .fail();
1102        };
1103
1104        // Ensure the exists one is view, we can't drop other table types
1105        ensure!(
1106            view_info.table_type == TableType::View,
1107            error::InvalidViewSnafu {
1108                msg: "not a view",
1109                view_name: format_full_table_name(&catalog, &schema, &view),
1110            }
1111        );
1112
1113        let view_id = view_info.table_id();
1114
1115        let task = DropViewTask {
1116            catalog,
1117            schema,
1118            view,
1119            view_id,
1120            drop_if_exists,
1121        };
1122
1123        self.drop_view_procedure(task, query_context).await?;
1124
1125        Ok(Output::new_with_affected_rows(0))
1126    }
1127
1128    /// Submit [DropViewTask] to procedure executor.
1129    async fn drop_view_procedure(
1130        &self,
1131        expr: DropViewTask,
1132        query_context: QueryContextRef,
1133    ) -> Result<SubmitDdlTaskResponse> {
1134        let request = SubmitDdlTaskRequest::new(
1135            to_meta_query_context(query_context),
1136            DdlTask::new_drop_view(expr),
1137        );
1138
1139        self.procedure_executor
1140            .submit_ddl_task(&ExecutorContext::default(), request)
1141            .await
1142            .context(error::ExecuteDdlSnafu)
1143    }
1144
1145    #[tracing::instrument(skip_all)]
1146    pub async fn alter_logical_tables(
1147        &self,
1148        alter_table_exprs: Vec<AlterTableExpr>,
1149        query_context: QueryContextRef,
1150    ) -> Result<Output> {
1151        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
1152        ensure!(
1153            !alter_table_exprs.is_empty(),
1154            EmptyDdlExprSnafu {
1155                name: "alter logical tables"
1156            }
1157        );
1158
1159        // group by physical table id
1160        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
1161        for expr in alter_table_exprs {
1162            // Get table_id from catalog_manager
1163            let catalog = if expr.catalog_name.is_empty() {
1164                query_context.current_catalog()
1165            } else {
1166                &expr.catalog_name
1167            };
1168            let schema = if expr.schema_name.is_empty() {
1169                query_context.current_schema()
1170            } else {
1171                expr.schema_name.clone()
1172            };
1173            let table_name = &expr.table_name;
1174            let table = self
1175                .catalog_manager
1176                .table(catalog, &schema, table_name, Some(&query_context))
1177                .await
1178                .context(CatalogSnafu)?
1179                .with_context(|| TableNotFoundSnafu {
1180                    table_name: format_full_table_name(catalog, &schema, table_name),
1181                })?;
1182            let table_id = table.table_info().ident.table_id;
1183            let physical_table_id = self
1184                .table_metadata_manager
1185                .table_route_manager()
1186                .get_physical_table_id(table_id)
1187                .await
1188                .context(TableMetadataManagerSnafu)?;
1189            groups.entry(physical_table_id).or_default().push(expr);
1190        }
1191
1192        // Submit procedure for each physical table
1193        let mut handles = Vec::with_capacity(groups.len());
1194        for (_physical_table_id, exprs) in groups {
1195            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
1196            handles.push(fut);
1197        }
1198        let _results = futures::future::try_join_all(handles).await?;
1199
1200        Ok(Output::new_with_affected_rows(0))
1201    }
1202
    /// Drops a single table: a thin wrapper over the batched drop path,
    /// kept as a dedicated entry point for the gRPC call.
    #[tracing::instrument(skip_all)]
    pub async fn drop_table(
        &self,
        table_name: TableName,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        // Reserved for grpc call
        self.drop_tables(&[table_name], drop_if_exists, query_context)
            .await
    }
1214
1215    #[tracing::instrument(skip_all)]
1216    pub async fn drop_tables(
1217        &self,
1218        table_names: &[TableName],
1219        drop_if_exists: bool,
1220        query_context: QueryContextRef,
1221    ) -> Result<Output> {
1222        let mut tables = Vec::with_capacity(table_names.len());
1223        for table_name in table_names {
1224            ensure!(
1225                !is_readonly_schema(&table_name.schema_name),
1226                SchemaReadOnlySnafu {
1227                    name: table_name.schema_name.clone()
1228                }
1229            );
1230
1231            if let Some(table) = self
1232                .catalog_manager
1233                .table(
1234                    &table_name.catalog_name,
1235                    &table_name.schema_name,
1236                    &table_name.table_name,
1237                    Some(&query_context),
1238                )
1239                .await
1240                .context(CatalogSnafu)?
1241            {
1242                tables.push(table.table_info().table_id());
1243            } else if drop_if_exists {
1244                // DROP TABLE IF EXISTS meets table not found - ignored
1245                continue;
1246            } else {
1247                return TableNotFoundSnafu {
1248                    table_name: table_name.to_string(),
1249                }
1250                .fail();
1251            }
1252        }
1253
1254        for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
1255            self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
1256                .await?;
1257
1258            // Invalidates local cache ASAP.
1259            self.cache_invalidator
1260                .invalidate(
1261                    &Context::default(),
1262                    &[
1263                        CacheIdent::TableId(table_id),
1264                        CacheIdent::TableName(table_name.clone()),
1265                    ],
1266                )
1267                .await
1268                .context(error::InvalidateTableCacheSnafu)?;
1269        }
1270        Ok(Output::new_with_affected_rows(0))
1271    }
1272
1273    #[tracing::instrument(skip_all)]
1274    pub async fn drop_database(
1275        &self,
1276        catalog: String,
1277        schema: String,
1278        drop_if_exists: bool,
1279        query_context: QueryContextRef,
1280    ) -> Result<Output> {
1281        ensure!(
1282            !is_readonly_schema(&schema),
1283            SchemaReadOnlySnafu { name: schema }
1284        );
1285
1286        if self
1287            .catalog_manager
1288            .schema_exists(&catalog, &schema, None)
1289            .await
1290            .context(CatalogSnafu)?
1291        {
1292            if schema == query_context.current_schema() {
1293                SchemaInUseSnafu { name: schema }.fail()
1294            } else {
1295                self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
1296                    .await?;
1297
1298                Ok(Output::new_with_affected_rows(0))
1299            }
1300        } else if drop_if_exists {
1301            // DROP TABLE IF EXISTS meets table not found - ignored
1302            Ok(Output::new_with_affected_rows(0))
1303        } else {
1304            SchemaNotFoundSnafu {
1305                schema_info: schema,
1306            }
1307            .fail()
1308        }
1309    }
1310
1311    #[tracing::instrument(skip_all)]
1312    pub async fn truncate_table(
1313        &self,
1314        table_name: TableName,
1315        time_ranges: Vec<(Timestamp, Timestamp)>,
1316        query_context: QueryContextRef,
1317    ) -> Result<Output> {
1318        ensure!(
1319            !is_readonly_schema(&table_name.schema_name),
1320            SchemaReadOnlySnafu {
1321                name: table_name.schema_name.clone()
1322            }
1323        );
1324
1325        let table = self
1326            .catalog_manager
1327            .table(
1328                &table_name.catalog_name,
1329                &table_name.schema_name,
1330                &table_name.table_name,
1331                Some(&query_context),
1332            )
1333            .await
1334            .context(CatalogSnafu)?
1335            .with_context(|| TableNotFoundSnafu {
1336                table_name: table_name.to_string(),
1337            })?;
1338        let table_id = table.table_info().table_id();
1339        self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
1340            .await?;
1341
1342        Ok(Output::new_with_affected_rows(0))
1343    }
1344
1345    #[tracing::instrument(skip_all)]
1346    pub async fn alter_table(
1347        &self,
1348        alter_table: AlterTable,
1349        query_context: QueryContextRef,
1350    ) -> Result<Output> {
1351        if matches!(
1352            alter_table.alter_operation(),
1353            AlterTableOperation::Repartition { .. }
1354        ) {
1355            let request = expr_helper::to_repartition_request(alter_table, &query_context)?;
1356            return self.repartition_table(request, &query_context).await;
1357        }
1358
1359        let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
1360        self.alter_table_inner(expr, query_context).await
1361    }
1362
    /// Handles `ALTER TABLE ... REPARTITION`: replaces a set of existing
    /// partition expressions (`from_exprs`) with a new set (`into_exprs`) and
    /// submits the change as an alter-table DDL task.
    ///
    /// Fails when the schema is read-only, the table is missing, the table is
    /// logical (its physical table id differs from its own), the table has no
    /// partition columns, any `from` expression is not an existing partition,
    /// or the resulting partition set does not validate.
    #[tracing::instrument(skip_all)]
    pub async fn repartition_table(
        &self,
        request: RepartitionRequest,
        query_context: &QueryContextRef,
    ) -> Result<Output> {
        // Check if the schema is read-only.
        ensure!(
            !is_readonly_schema(&request.schema_name),
            SchemaReadOnlySnafu {
                name: request.schema_name.clone()
            }
        );

        let table_ref = TableReference::full(
            &request.catalog_name,
            &request.schema_name,
            &request.table_name,
        );
        // Get the table from the catalog.
        let table = self
            .catalog_manager
            .table(
                &request.catalog_name,
                &request.schema_name,
                &request.table_name,
                Some(query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_ref.to_string(),
            })?;
        let table_id = table.table_info().ident.table_id;
        // Get existing partition expressions from the table route.
        let (physical_table_id, physical_table_route) = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        // A differing physical table id means this is a logical table;
        // repartition only applies to physical tables.
        ensure!(
            physical_table_id == table_id,
            NotSupportedSnafu {
                feat: "REPARTITION on logical tables"
            }
        );

        let table_info = table.table_info();
        // Get partition column names from the table metadata.
        let existing_partition_columns = table_info.meta.partition_columns().collect::<Vec<_>>();
        // Repartition requires the table to have partition columns.
        ensure!(
            !existing_partition_columns.is_empty(),
            InvalidPartitionRuleSnafu {
                reason: format!(
                    "table {} does not have partition columns, cannot repartition",
                    table_ref
                )
            }
        );

        // Repartition operations involving columns outside the existing partition columns are not supported.
        // This restriction ensures repartition only applies to current partition columns.
        let column_name_and_type = existing_partition_columns
            .iter()
            .map(|column| (&column.name, column.data_type.clone()))
            .collect();
        let timezone = query_context.timezone();
        // Convert SQL Exprs to PartitionExprs.
        let from_partition_exprs = request
            .from_exprs
            .iter()
            .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
            .collect::<Result<Vec<_>>>()?;

        let mut into_partition_exprs = request
            .into_exprs
            .iter()
            .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
            .collect::<Result<Vec<_>>>()?;

        // `MERGE PARTITION` (and some `REPARTITION`) generates a single `OR` expression from
        // multiple source partitions; try to simplify it for better readability and stability.
        if from_partition_exprs.len() > 1
            && into_partition_exprs.len() == 1
            && let Some(expr) = into_partition_exprs.pop()
        {
            into_partition_exprs.push(partition::simplify::simplify_merged_partition_expr(expr));
        }

        // Parse existing partition expressions from region routes.
        let mut existing_partition_exprs =
            Vec::with_capacity(physical_table_route.region_routes.len());
        for route in &physical_table_route.region_routes {
            let expr_json = route.region.partition_expr();
            if !expr_json.is_empty() {
                match PartitionExpr::from_json_str(&expr_json) {
                    Ok(Some(expr)) => existing_partition_exprs.push(expr),
                    Ok(None) => {
                        // Empty
                    }
                    Err(e) => {
                        return Err(e).context(DeserializePartitionExprSnafu);
                    }
                }
            }
        }

        // Validate that from_partition_exprs are a subset of existing partition exprs.
        // We compare PartitionExpr directly since it implements Eq.
        for from_expr in &from_partition_exprs {
            ensure!(
                existing_partition_exprs.contains(from_expr),
                InvalidPartitionRuleSnafu {
                    reason: format!(
                        "partition expression '{}' does not exist in table {}",
                        from_expr, table_ref
                    )
                }
            );
        }

        // Build the new partition expressions:
        // new_exprs = existing_exprs - from_exprs + into_exprs
        let new_partition_exprs: Vec<PartitionExpr> = existing_partition_exprs
            .into_iter()
            .filter(|expr| !from_partition_exprs.contains(expr))
            .chain(into_partition_exprs.clone().into_iter())
            .collect();
        let new_partition_exprs_len = new_partition_exprs.len();
        let from_partition_exprs_len = from_partition_exprs.len();

        // Validate the new partition expressions using MultiDimPartitionRule and PartitionChecker.
        let _ = MultiDimPartitionRule::try_new(
            existing_partition_columns
                .iter()
                .map(|c| c.name.clone())
                .collect(),
            vec![],
            new_partition_exprs,
            true,
        )
        .context(InvalidPartitionSnafu)?;

        // Parse execution options for the DDL task (`wait` / `timeout` are used below).
        let ddl_options = parse_ddl_options(&request.options)?;
        let serialize_exprs = |exprs: Vec<PartitionExpr>| -> Result<Vec<String>> {
            let mut json_exprs = Vec::with_capacity(exprs.len());
            for expr in exprs {
                json_exprs.push(expr.as_json_str().context(SerializePartitionExprSnafu)?);
            }
            Ok(json_exprs)
        };
        let from_partition_exprs_json = serialize_exprs(from_partition_exprs)?;
        let into_partition_exprs_json = serialize_exprs(into_partition_exprs)?;
        let mut req = SubmitDdlTaskRequest::new(
            to_meta_query_context(query_context.clone()),
            DdlTask::new_alter_table(AlterTableExpr {
                catalog_name: request.catalog_name.clone(),
                schema_name: request.schema_name.clone(),
                table_name: request.table_name.clone(),
                kind: Some(Kind::Repartition(Repartition {
                    from_partition_exprs: from_partition_exprs_json,
                    into_partition_exprs: into_partition_exprs_json,
                })),
            }),
        );
        req.wait = ddl_options.wait;
        req.timeout = ddl_options.timeout;

        info!(
            "Submitting repartition task for table {} (table_id={}), from {} to {} partitions, timeout: {:?}, wait: {}",
            table_ref,
            table_id,
            from_partition_exprs_len,
            new_partition_exprs_len,
            ddl_options.timeout,
            ddl_options.wait
        );

        let response = self
            .procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        // When not waiting, return the procedure id immediately without touching caches.
        if !ddl_options.wait {
            return build_procedure_id_output(response.key);
        }

        // Only invalidate cache if wait is true.
        let invalidate_keys = vec![
            CacheIdent::TableId(table_id),
            CacheIdent::TableName(TableName::new(
                request.catalog_name,
                request.schema_name,
                request.table_name,
            )),
        ];

        // Invalidates local cache ASAP.
        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }
1572
    /// Applies a (non-repartition) `ALTER TABLE` expression.
    ///
    /// Resolves default catalog/schema, returns early when the alteration is a
    /// no-op, then submits either an alter-table task (physical table) or an
    /// alter-logical-tables task (logical table, whose route points to a
    /// different physical table id) and invalidates the affected cache entries.
    #[tracing::instrument(skip_all)]
    pub async fn alter_table_inner(
        &self,
        expr: AlterTableExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&expr.schema_name),
            SchemaReadOnlySnafu {
                name: expr.schema_name.clone()
            }
        );

        // Empty catalog/schema in the expression fall back to the defaults.
        let catalog_name = if expr.catalog_name.is_empty() {
            DEFAULT_CATALOG_NAME.to_string()
        } else {
            expr.catalog_name.clone()
        };

        let schema_name = if expr.schema_name.is_empty() {
            DEFAULT_SCHEMA_NAME.to_string()
        } else {
            expr.schema_name.clone()
        };

        let table_name = expr.table_name.clone();

        let table = self
            .catalog_manager
            .table(
                &catalog_name,
                &schema_name,
                &table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
            })?;

        let table_id = table.table_info().ident.table_id;
        // `verify_alter` may fail validation (propagated via `?`); a `false`
        // result means the expression changes nothing, so skip the DDL.
        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
        if !need_alter {
            return Ok(Output::new_with_affected_rows(0));
        }
        info!(
            "Table info before alter is {:?}, expr: {:?}",
            table.table_info(),
            expr
        );

        let physical_table_id = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_id(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        let (req, invalidate_keys) = if physical_table_id == table_id {
            // This is physical table
            let req = SubmitDdlTaskRequest::new(
                to_meta_query_context(query_context),
                DdlTask::new_alter_table(expr),
            );

            let invalidate_keys = vec![
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            (req, invalidate_keys)
        } else {
            // This is logical table
            let req = SubmitDdlTaskRequest::new(
                to_meta_query_context(query_context),
                DdlTask::new_alter_logical_tables(vec![expr]),
            );

            // The physical table's cache entries must be invalidated too.
            let mut invalidate_keys = vec![
                CacheIdent::TableId(physical_table_id),
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            let physical_table = self
                .table_metadata_manager
                .table_info_manager()
                .get(physical_table_id)
                .await
                .context(TableMetadataManagerSnafu)?
                .map(|x| x.into_inner());
            if let Some(physical_table) = physical_table {
                let physical_table_name = TableName::new(
                    physical_table.table_info.catalog_name,
                    physical_table.table_info.schema_name,
                    physical_table.table_info.name,
                );
                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
            }

            (req, invalidate_keys)
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        // Invalidates local cache ASAP.
        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }
1690
1691    #[cfg(feature = "enterprise")]
1692    #[tracing::instrument(skip_all)]
1693    pub async fn alter_trigger(
1694        &self,
1695        _alter_expr: AlterTrigger,
1696        _query_context: QueryContextRef,
1697    ) -> Result<Output> {
1698        crate::error::NotSupportedSnafu {
1699            feat: "alter trigger",
1700        }
1701        .fail()
1702    }
1703
1704    #[tracing::instrument(skip_all)]
1705    pub async fn alter_database(
1706        &self,
1707        alter_expr: AlterDatabase,
1708        query_context: QueryContextRef,
1709    ) -> Result<Output> {
1710        let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
1711        self.alter_database_inner(alter_expr, query_context).await
1712    }
1713
1714    #[tracing::instrument(skip_all)]
1715    pub async fn alter_database_inner(
1716        &self,
1717        alter_expr: AlterDatabaseExpr,
1718        query_context: QueryContextRef,
1719    ) -> Result<Output> {
1720        ensure!(
1721            !is_readonly_schema(&alter_expr.schema_name),
1722            SchemaReadOnlySnafu {
1723                name: query_context.current_schema().clone()
1724            }
1725        );
1726
1727        let exists = self
1728            .catalog_manager
1729            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
1730            .await
1731            .context(CatalogSnafu)?;
1732        ensure!(
1733            exists,
1734            SchemaNotFoundSnafu {
1735                schema_info: alter_expr.schema_name,
1736            }
1737        );
1738
1739        let cache_ident = [CacheIdent::SchemaName(SchemaName {
1740            catalog_name: alter_expr.catalog_name.clone(),
1741            schema_name: alter_expr.schema_name.clone(),
1742        })];
1743
1744        self.alter_database_procedure(alter_expr, query_context)
1745            .await?;
1746
1747        // Invalidates local cache ASAP.
1748        self.cache_invalidator
1749            .invalidate(&Context::default(), &cache_ident)
1750            .await
1751            .context(error::InvalidateTableCacheSnafu)?;
1752
1753        Ok(Output::new_with_affected_rows(0))
1754    }
1755
1756    async fn create_table_procedure(
1757        &self,
1758        create_table: CreateTableExpr,
1759        partitions: Vec<PartitionExpr>,
1760        table_info: TableInfo,
1761        query_context: QueryContextRef,
1762    ) -> Result<SubmitDdlTaskResponse> {
1763        let partitions = partitions
1764            .into_iter()
1765            .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
1766            .collect::<Result<Vec<_>>>()?;
1767
1768        let request = SubmitDdlTaskRequest::new(
1769            to_meta_query_context(query_context),
1770            DdlTask::new_create_table(create_table, partitions, table_info),
1771        );
1772
1773        self.procedure_executor
1774            .submit_ddl_task(&ExecutorContext::default(), request)
1775            .await
1776            .context(error::ExecuteDdlSnafu)
1777    }
1778
1779    async fn create_logical_tables_procedure(
1780        &self,
1781        tables_data: Vec<(CreateTableExpr, TableInfo)>,
1782        query_context: QueryContextRef,
1783    ) -> Result<SubmitDdlTaskResponse> {
1784        let request = SubmitDdlTaskRequest::new(
1785            to_meta_query_context(query_context),
1786            DdlTask::new_create_logical_tables(tables_data),
1787        );
1788
1789        self.procedure_executor
1790            .submit_ddl_task(&ExecutorContext::default(), request)
1791            .await
1792            .context(error::ExecuteDdlSnafu)
1793    }
1794
1795    async fn alter_logical_tables_procedure(
1796        &self,
1797        tables_data: Vec<AlterTableExpr>,
1798        query_context: QueryContextRef,
1799    ) -> Result<SubmitDdlTaskResponse> {
1800        let request = SubmitDdlTaskRequest::new(
1801            to_meta_query_context(query_context),
1802            DdlTask::new_alter_logical_tables(tables_data),
1803        );
1804
1805        self.procedure_executor
1806            .submit_ddl_task(&ExecutorContext::default(), request)
1807            .await
1808            .context(error::ExecuteDdlSnafu)
1809    }
1810
1811    async fn drop_table_procedure(
1812        &self,
1813        table_name: &TableName,
1814        table_id: TableId,
1815        drop_if_exists: bool,
1816        query_context: QueryContextRef,
1817    ) -> Result<SubmitDdlTaskResponse> {
1818        let request = SubmitDdlTaskRequest::new(
1819            to_meta_query_context(query_context),
1820            DdlTask::new_drop_table(
1821                table_name.catalog_name.clone(),
1822                table_name.schema_name.clone(),
1823                table_name.table_name.clone(),
1824                table_id,
1825                drop_if_exists,
1826            ),
1827        );
1828
1829        self.procedure_executor
1830            .submit_ddl_task(&ExecutorContext::default(), request)
1831            .await
1832            .context(error::ExecuteDdlSnafu)
1833    }
1834
1835    async fn drop_database_procedure(
1836        &self,
1837        catalog: String,
1838        schema: String,
1839        drop_if_exists: bool,
1840        query_context: QueryContextRef,
1841    ) -> Result<SubmitDdlTaskResponse> {
1842        let request = SubmitDdlTaskRequest::new(
1843            to_meta_query_context(query_context),
1844            DdlTask::new_drop_database(catalog, schema, drop_if_exists),
1845        );
1846
1847        self.procedure_executor
1848            .submit_ddl_task(&ExecutorContext::default(), request)
1849            .await
1850            .context(error::ExecuteDdlSnafu)
1851    }
1852
1853    async fn alter_database_procedure(
1854        &self,
1855        alter_expr: AlterDatabaseExpr,
1856        query_context: QueryContextRef,
1857    ) -> Result<SubmitDdlTaskResponse> {
1858        let request = SubmitDdlTaskRequest::new(
1859            to_meta_query_context(query_context),
1860            DdlTask::new_alter_database(alter_expr),
1861        );
1862
1863        self.procedure_executor
1864            .submit_ddl_task(&ExecutorContext::default(), request)
1865            .await
1866            .context(error::ExecuteDdlSnafu)
1867    }
1868
1869    async fn truncate_table_procedure(
1870        &self,
1871        table_name: &TableName,
1872        table_id: TableId,
1873        time_ranges: Vec<(Timestamp, Timestamp)>,
1874        query_context: QueryContextRef,
1875    ) -> Result<SubmitDdlTaskResponse> {
1876        let request = SubmitDdlTaskRequest::new(
1877            to_meta_query_context(query_context),
1878            DdlTask::new_truncate_table(
1879                table_name.catalog_name.clone(),
1880                table_name.schema_name.clone(),
1881                table_name.table_name.clone(),
1882                table_id,
1883                time_ranges,
1884            ),
1885        );
1886
1887        self.procedure_executor
1888            .submit_ddl_task(&ExecutorContext::default(), request)
1889            .await
1890            .context(error::ExecuteDdlSnafu)
1891    }
1892
1893    #[tracing::instrument(skip_all)]
1894    pub async fn create_database(
1895        &self,
1896        database: &str,
1897        create_if_not_exists: bool,
1898        options: HashMap<String, String>,
1899        query_context: QueryContextRef,
1900    ) -> Result<Output> {
1901        let catalog = query_context.current_catalog();
1902        ensure!(
1903            NAME_PATTERN_REG.is_match(catalog),
1904            error::UnexpectedSnafu {
1905                violated: format!("Invalid catalog name: {}", catalog)
1906            }
1907        );
1908
1909        ensure!(
1910            NAME_PATTERN_REG.is_match(database),
1911            error::UnexpectedSnafu {
1912                violated: format!("Invalid database name: {}", database)
1913            }
1914        );
1915
1916        if !self
1917            .catalog_manager
1918            .schema_exists(catalog, database, None)
1919            .await
1920            .context(CatalogSnafu)?
1921            && !self.catalog_manager.is_reserved_schema_name(database)
1922        {
1923            self.create_database_procedure(
1924                catalog.to_string(),
1925                database.to_string(),
1926                create_if_not_exists,
1927                options,
1928                query_context,
1929            )
1930            .await?;
1931
1932            Ok(Output::new_with_affected_rows(1))
1933        } else if create_if_not_exists {
1934            Ok(Output::new_with_affected_rows(1))
1935        } else {
1936            error::SchemaExistsSnafu { name: database }.fail()
1937        }
1938    }
1939
1940    async fn create_database_procedure(
1941        &self,
1942        catalog: String,
1943        database: String,
1944        create_if_not_exists: bool,
1945        options: HashMap<String, String>,
1946        query_context: QueryContextRef,
1947    ) -> Result<SubmitDdlTaskResponse> {
1948        let request = SubmitDdlTaskRequest::new(
1949            to_meta_query_context(query_context),
1950            DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
1951        );
1952
1953        self.procedure_executor
1954            .submit_ddl_task(&ExecutorContext::default(), request)
1955            .await
1956            .context(error::ExecuteDdlSnafu)
1957    }
1958}
1959
1960/// Parse partition statement [Partitions] into [PartitionExpr] and partition columns.
1961pub fn parse_partitions(
1962    create_table: &CreateTableExpr,
1963    partitions: Option<Partitions>,
1964    query_ctx: &QueryContextRef,
1965) -> Result<(Vec<PartitionExpr>, Vec<String>)> {
1966    // If partitions are not defined by user, use the timestamp column (which has to be existed) as
1967    // the partition column, and create only one partition.
1968    let partition_columns = find_partition_columns(&partitions)?;
1969    let partition_exprs =
1970        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
1971
1972    // Validates partition
1973    let exprs = partition_exprs.clone();
1974    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
1975        .context(InvalidPartitionSnafu)?;
1976
1977    Ok((partition_exprs, partition_columns))
1978}
1979
1980fn parse_partitions_for_logical_validation(
1981    create_table: &CreateTableExpr,
1982    partitions: &Partitions,
1983    query_ctx: &QueryContextRef,
1984) -> Result<(Vec<String>, Vec<PartitionExpr>)> {
1985    let partition_columns = partitions
1986        .column_list
1987        .iter()
1988        .map(|ident| ident.value.clone())
1989        .collect::<Vec<_>>();
1990
1991    let column_name_and_type = partition_columns
1992        .iter()
1993        .map(|pc| {
1994            let column = create_table
1995                .column_defs
1996                .iter()
1997                .find(|c| &c.name == pc)
1998                .context(ColumnNotFoundSnafu { msg: pc.clone() })?;
1999            let column_name = &column.name;
2000            let data_type = ConcreteDataType::from(
2001                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
2002                    .context(ColumnDataTypeSnafu)?,
2003            );
2004            Ok((column_name, data_type))
2005        })
2006        .collect::<Result<HashMap<_, _>>>()?;
2007
2008    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
2009    for expr in &partitions.exprs {
2010        let partition_expr = convert_one_expr(expr, &column_name_and_type, &query_ctx.timezone())?;
2011        partition_exprs.push(partition_expr);
2012    }
2013
2014    MultiDimPartitionRule::try_new(
2015        partition_columns.clone(),
2016        vec![],
2017        partition_exprs.clone(),
2018        true,
2019    )
2020    .context(InvalidPartitionSnafu)?;
2021
2022    Ok((partition_columns, partition_exprs))
2023}
2024
2025/// Verifies an alter and returns whether it is necessary to perform the alter.
2026///
2027/// # Returns
2028///
2029/// Returns true if the alter need to be porformed; otherwise, it returns false.
2030pub fn verify_alter(
2031    table_id: TableId,
2032    table_info: Arc<TableInfo>,
2033    expr: AlterTableExpr,
2034) -> Result<bool> {
2035    let request: AlterTableRequest =
2036        common_grpc_expr::alter_expr_to_request(table_id, expr, Some(&table_info.meta))
2037            .context(AlterExprToRequestSnafu)?;
2038
2039    let AlterTableRequest {
2040        table_name,
2041        alter_kind,
2042        ..
2043    } = &request;
2044
2045    if let AlterKind::RenameTable { new_table_name } = alter_kind {
2046        ensure!(
2047            NAME_PATTERN_REG.is_match(new_table_name),
2048            error::UnexpectedSnafu {
2049                violated: format!("Invalid table name: {}", new_table_name)
2050            }
2051        );
2052    } else if let AlterKind::AddColumns { columns } = alter_kind {
2053        // If all the columns are marked as add_if_not_exists and they already exist in the table,
2054        // there is no need to perform the alter.
2055        let column_names: HashSet<_> = table_info
2056            .meta
2057            .schema
2058            .column_schemas()
2059            .iter()
2060            .map(|schema| &schema.name)
2061            .collect();
2062        if columns.iter().all(|column| {
2063            column_names.contains(&column.column_schema.name) && column.add_if_not_exists
2064        }) {
2065            return Ok(false);
2066        }
2067    }
2068
2069    let _ = table_info
2070        .meta
2071        .builder_with_alter_kind(table_name, &request.alter_kind)
2072        .context(error::TableSnafu)?
2073        .build()
2074        .context(error::BuildTableMetaSnafu { table_name })?;
2075
2076    Ok(true)
2077}
2078
2079pub fn create_table_info(
2080    create_table: &CreateTableExpr,
2081    partition_columns: Vec<String>,
2082) -> Result<TableInfo> {
2083    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
2084    let mut column_name_to_index_map = HashMap::new();
2085
2086    for (idx, column) in create_table.column_defs.iter().enumerate() {
2087        let schema =
2088            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
2089                column: &column.name,
2090            })?;
2091        let schema = schema.with_time_index(column.name == create_table.time_index);
2092
2093        column_schemas.push(schema);
2094        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
2095    }
2096
2097    let next_column_id = column_schemas.len() as u32;
2098    let schema = Arc::new(Schema::new(column_schemas));
2099
2100    let primary_key_indices = create_table
2101        .primary_keys
2102        .iter()
2103        .map(|name| {
2104            column_name_to_index_map
2105                .get(name)
2106                .cloned()
2107                .context(ColumnNotFoundSnafu { msg: name })
2108        })
2109        .collect::<Result<Vec<_>>>()?;
2110
2111    let partition_key_indices = partition_columns
2112        .into_iter()
2113        .map(|col_name| {
2114            column_name_to_index_map
2115                .get(&col_name)
2116                .cloned()
2117                .context(ColumnNotFoundSnafu { msg: col_name })
2118        })
2119        .collect::<Result<Vec<_>>>()?;
2120
2121    let table_options = TableOptions::try_from_iter(&create_table.table_options)
2122        .context(UnrecognizedTableOptionSnafu)?;
2123
2124    let meta = TableMeta {
2125        schema,
2126        primary_key_indices,
2127        value_indices: vec![],
2128        engine: create_table.engine.clone(),
2129        next_column_id,
2130        options: table_options,
2131        created_on: Utc::now(),
2132        updated_on: Utc::now(),
2133        partition_key_indices,
2134        column_ids: vec![],
2135    };
2136
2137    let desc = if create_table.desc.is_empty() {
2138        create_table.table_options.get(COMMENT_KEY).cloned()
2139    } else {
2140        Some(create_table.desc.clone())
2141    };
2142
2143    let table_info = TableInfo {
2144        ident: metadata::TableIdent {
2145            // The table id of distributed table is assigned by Meta, set "0" here as a placeholder.
2146            table_id: 0,
2147            version: 0,
2148        },
2149        name: create_table.table_name.clone(),
2150        desc,
2151        catalog_name: create_table.catalog_name.clone(),
2152        schema_name: create_table.schema_name.clone(),
2153        meta,
2154        table_type: TableType::Base,
2155    };
2156    Ok(table_info)
2157}
2158
2159fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
2160    let columns = if let Some(partitions) = partitions {
2161        partitions
2162            .column_list
2163            .iter()
2164            .map(|x| x.value.clone())
2165            .collect::<Vec<_>>()
2166    } else {
2167        vec![]
2168    };
2169    Ok(columns)
2170}
2171
2172/// Parse [Partitions] into a group of partition entries.
2173///
2174/// Returns a list of [PartitionExpr], each of which defines a partition.
2175fn find_partition_entries(
2176    create_table: &CreateTableExpr,
2177    partitions: &Option<Partitions>,
2178    partition_columns: &[String],
2179    query_ctx: &QueryContextRef,
2180) -> Result<Vec<PartitionExpr>> {
2181    let Some(partitions) = partitions else {
2182        return Ok(vec![]);
2183    };
2184
2185    // extract concrete data type of partition columns
2186    let column_name_and_type = partition_columns
2187        .iter()
2188        .map(|pc| {
2189            let column = create_table
2190                .column_defs
2191                .iter()
2192                .find(|c| &c.name == pc)
2193                // unwrap is safe here because we have checked that partition columns are defined
2194                .unwrap();
2195            let column_name = &column.name;
2196            let data_type = ConcreteDataType::from(
2197                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
2198                    .context(ColumnDataTypeSnafu)?,
2199            );
2200            Ok((column_name, data_type))
2201        })
2202        .collect::<Result<HashMap<_, _>>>()?;
2203
2204    // Transform parser expr to partition expr
2205    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
2206    for partition in &partitions.exprs {
2207        let partition_expr =
2208            convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
2209        partition_exprs.push(partition_expr);
2210    }
2211
2212    Ok(partition_exprs)
2213}
2214
/// Converts a single parser [`Expr`] into a [`PartitionExpr`].
///
/// Only binary expressions are accepted. Each expression must be either a
/// leaf comparison between a partition column and a literal (in either
/// operand order, optionally with a unary operator applied to the literal),
/// or a binary combination of two sub-expressions, converted recursively.
fn convert_one_expr(
    expr: &Expr,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
    timezone: &Timezone,
) -> Result<PartitionExpr> {
    // Anything that is not a binary operation cannot be a partition rule.
    let Expr::BinaryOp { left, op, right } = expr else {
        return InvalidPartitionRuleSnafu {
            reason: "partition rule must be a binary expression",
        }
        .fail();
    };

    // Only the restricted operator set is allowed in partition expressions.
    let op =
        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
            reason: format!("unsupported operator in partition expr {op}"),
        })?;

    // convert leaf node.
    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
        // col, val
        (Expr::Identifier(ident), Expr::Value(value)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&value.value, data_type, timezone, None)?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        // col, unary-op val (e.g. a negative literal)
        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        // val, col
        (Expr::Value(value), Expr::Identifier(ident)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&value.value, data_type, timezone, None)?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        // unary-op val, col
        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
            // sub-expr must against another sub-expr
            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
            (Operand::Expr(lhs), op, Operand::Expr(rhs))
        }
        _ => {
            return InvalidPartitionRuleSnafu {
                reason: format!("invalid partition expr {expr}"),
            }
            .fail();
        }
    };

    Ok(PartitionExpr::new(lhs, op, rhs))
}
2276
2277fn convert_identifier(
2278    ident: &Ident,
2279    column_name_and_type: &HashMap<&String, ConcreteDataType>,
2280) -> Result<(String, ConcreteDataType)> {
2281    let column_name = ident.value.clone();
2282    let data_type = column_name_and_type
2283        .get(&column_name)
2284        .cloned()
2285        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
2286    Ok((column_name, data_type))
2287}
2288
2289fn convert_value(
2290    value: &ParserValue,
2291    data_type: ConcreteDataType,
2292    timezone: &Timezone,
2293    unary_op: Option<UnaryOperator>,
2294) -> Result<Value> {
2295    sql_value_to_value(
2296        &ColumnSchema::new("<NONAME>", data_type, true),
2297        value,
2298        Some(timezone),
2299        unary_op,
2300        false,
2301    )
2302    .context(error::SqlCommonSnafu)
2303}
2304
#[cfg(test)]
mod test {
    use std::time::Duration;

    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;
    use sqlparser::parser::Parser;

    use super::*;
    use crate::expr_helper;

    // DDL options parsing: "5m" should become a 300s timeout and
    // wait=false should be honored.
    #[test]
    fn test_parse_ddl_options() {
        let options = OptionMap::from([
            ("timeout".to_string(), "5m".to_string()),
            ("wait".to_string(), "false".to_string()),
        ]);
        let ddl_options = parse_ddl_options(&options).unwrap();
        assert!(!ddl_options.wait);
        assert_eq!(Duration::from_secs(300), ddl_options.timeout);
    }

    // Spot-checks the identifier pattern: `@`/`#` are allowed only in
    // non-leading positions; leading symbols and non-ASCII are rejected.
    #[test]
    fn test_name_is_match() {
        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
        assert!(!NAME_PATTERN_REG.is_match("🈲"));
        assert!(NAME_PATTERN_REG.is_match("hello"));
        assert!(NAME_PATTERN_REG.is_match("test@"));
        assert!(!NAME_PATTERN_REG.is_match("@test"));
        assert!(NAME_PATTERN_REG.is_match("test#"));
        assert!(!NAME_PATTERN_REG.is_match("#test"));
        assert!(!NAME_PATTERN_REG.is_match("@"));
        assert!(!NAME_PATTERN_REG.is_match("#"));
    }

    // `col < 100` and `100 > col` must normalize to equal partition
    // expressions, including under equality, `contains`, and sorting.
    #[test]
    fn test_partition_expr_equivalence_with_swapped_operands() {
        let column_name = "device_id".to_string();
        let column_name_and_type =
            HashMap::from([(&column_name, ConcreteDataType::int32_datatype())]);
        let timezone = Timezone::from_tz_string("UTC").unwrap();
        let dialect = GreptimeDbDialect {};

        let mut parser = Parser::new(&dialect)
            .try_with_sql("device_id < 100")
            .unwrap();
        let expr_left = parser.parse_expr().unwrap();

        let mut parser = Parser::new(&dialect)
            .try_with_sql("100 > device_id")
            .unwrap();
        let expr_right = parser.parse_expr().unwrap();

        let partition_left =
            convert_one_expr(&expr_left, &column_name_and_type, &timezone).unwrap();
        let partition_right =
            convert_one_expr(&expr_right, &column_name_and_type, &timezone).unwrap();

        assert_eq!(partition_left, partition_right);
        assert!([partition_left.clone()].contains(&partition_right));

        // Sorting both sides must also yield identical orderings.
        let mut physical_partition_exprs = vec![partition_left];
        let mut logical_partition_exprs = vec![partition_right];
        physical_partition_exprs.sort_unstable();
        logical_partition_exprs.sort_unstable();
        assert_eq!(physical_partition_exprs, logical_partition_exprs);
    }

    // End-to-end: parse CREATE TABLE statements with PARTITION clauses and
    // compare the serialized partition definitions against expected JSON.
    #[tokio::test]
    #[ignore = "TODO(ruihang): WIP new partition rule"]
    async fn test_parse_partitions() {
        common_telemetry::init_default_ut_logging();
        let cases = [
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION ON COLUMNS (b) (
  b < 'hz',
  b >= 'hz' AND b < 'sh',
  b >= 'sh'
)
ENGINE=mito",
                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
            ),
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
  PARTITION r0 VALUES LESS THAN ('hz', 10),
  b < 'hz' AND a < 10,
  b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
  b >= 'sh' AND a >= 20
)
ENGINE=mito",
                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
            ),
        ];
        let ctx = QueryContextBuilder::default().build().into();
        for (sql, expected) in cases {
            let result = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            match &result[0] {
                Statement::CreateTable(c) => {
                    let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
                    let (partitions, _) =
                        parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
                    let json = serde_json::to_string(&partitions).unwrap();
                    assert_eq!(json, expected);
                }
                _ => unreachable!(),
            }
        }
    }
}