operator/statement/ddl.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_table_expr::Kind;
use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
use api::v1::{
    AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
    Repartition, column_def,
};
#[cfg(feature = "enterprise")]
use api::v1::{
    CreateTriggerExpr as PbCreateTriggerExpr, meta::CreateTriggerTask as PbCreateTriggerTask,
};
use catalog::CatalogManagerRef;
use chrono::Utc;
use common_base::regex_pattern::NAME_PATTERN_REG;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_readonly_schema};
use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::create_flow::FlowType;
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
use common_meta::procedure_executor::ExecutorContext;
#[cfg(feature = "enterprise")]
use common_meta::rpc::ddl::trigger::CreateTriggerTask;
#[cfg(feature = "enterprise")]
use common_meta::rpc::ddl::trigger::DropTriggerTask;
use common_meta::rpc::ddl::{
    CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
    SubmitDdlTaskResponse,
};
use common_query::Output;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_sql::convert::sql_value_to_value;
use common_telemetry::{debug, info, tracing, warn};
use common_time::{Timestamp, Timezone};
use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
use datatypes::value::Value;
use datatypes::vectors::{StringVector, VectorRef};
use humantime::parse_duration;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
use query::parser::QueryStatement;
use query::plan::extract_and_rewrite_full_table_names;
use query::query_engine::DefaultSerializer;
use query::sql::create_table_stmt;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::OptionMap;
#[cfg(feature = "enterprise")]
use sql::statements::alter::trigger::AlterTrigger;
use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation};
#[cfg(feature = "enterprise")]
use sql::statements::create::trigger::CreateTrigger;
use sql::statements::create::{
    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
};
use sql::statements::statement::Statement;
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::TableRef;
use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{
    AlterKind, AlterTableRequest, COMMENT_KEY, DDL_TIMEOUT, DDL_WAIT, TableOptions,
};
use table::table_name::TableName;
use table::table_reference::TableReference;

use crate::error::{
    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
    DeserializePartitionExprSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
    FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu,
    PartitionExprToPbSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
    SerializePartitionExprSnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
    TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
    ViewAlreadyExistsSnafu,
};
use crate::expr_helper::{self, RepartitionRequest};
use crate::statement::StatementExecutor;
use crate::statement::show::create_partitions_stmt;

#[derive(Debug, Clone, Copy)]
struct DdlSubmitOptions {
    wait: bool,
    timeout: Duration,
}

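/// Builds a single-column record batch output that carries the submitted
/// procedure id, returned when a DDL task is submitted with `wait = false`.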
fn build_procedure_id_output(procedure_id: Vec<u8>) -> Result<Output> {
    let procedure_id = String::from_utf8_lossy(&procedure_id).to_string();
    let vector: VectorRef = Arc::new(StringVector::from(vec![procedure_id]));
    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
        "Procedure ID",
        vector.data_type(),
        false,
    )]));
    let batch =
        RecordBatch::new(schema.clone(), vec![vector]).context(error::BuildRecordBatchSnafu)?;
    let batches =
        RecordBatches::try_new(schema, vec![batch]).context(error::BuildRecordBatchSnafu)?;
    Ok(Output::new_with_record_batches(batches))
}

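/// Parses the DDL submission options (`DDL_WAIT`, `DDL_TIMEOUT`) from a
/// statement's `OptionMap`, falling back to the `SubmitDdlTaskRequest`
/// defaults when a key is absent.
///
/// A rough sketch of the accepted value formats (illustrative, not exhaustive):
///
/// ```text
/// DDL_WAIT    -> parsed via `str::parse::<bool>()`, e.g. "true" / "false"
/// DDL_TIMEOUT -> parsed via `humantime::parse_duration`, e.g. "30s", "5m"
/// ```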
fn parse_ddl_options(options: &OptionMap) -> Result<DdlSubmitOptions> {
    let wait = match options.get(DDL_WAIT) {
        Some(value) => value.parse::<bool>().map_err(|_| {
            InvalidSqlSnafu {
                err_msg: format!("invalid DDL option '{DDL_WAIT}': '{value}'"),
            }
            .build()
        })?,
        None => SubmitDdlTaskRequest::default_wait(),
    };

    let timeout = match options.get(DDL_TIMEOUT) {
        Some(value) => parse_duration(value).map_err(|err| {
            InvalidSqlSnafu {
                err_msg: format!("invalid DDL option '{DDL_TIMEOUT}': '{value}': {err}"),
            }
            .build()
        })?,
        None => SubmitDdlTaskRequest::default_timeout(),
    };

    Ok(DdlSubmitOptions { wait, timeout })
}

impl StatementExecutor {
    pub fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }

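    /// Creates a table from a parsed `CREATE TABLE` statement, inheriting
    /// applicable database-level options before submitting the DDL task.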
    #[tracing::instrument(skip_all)]
    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
        let (catalog, schema, _table) = table_idents_to_full_name(&stmt.name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;

        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
        // Don't inherit the schema-level TTL or compaction options into the table
        // options: TTL is applied during compaction, and `compaction.*` keys are
        // skipped below and handled separately.
        if let Some(schema_options) = schema_options {
            for (key, value) in schema_options.extra_options.iter() {
                if key.starts_with("compaction.") {
                    continue;
                }
                create_expr
                    .table_options
                    .entry(key.clone())
                    .or_insert(value.clone());
            }
        }

        self.create_table_inner(create_expr, stmt.partitions, ctx)
            .await
    }

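    /// Handles `CREATE TABLE ... LIKE ...`: copies the source table's schema,
    /// partition rule, and inherited database-level options onto the new table.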
    #[tracing::instrument(skip_all)]
    pub async fn create_table_like(
        &self,
        stmt: CreateTableLike,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;
        let table_ref = self
            .catalog_manager
            .table(&catalog, &schema, &table, Some(&ctx))
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu { table_name: &table })?;
        let partition_info = self
            .partition_manager
            .find_physical_partition_info(table_ref.table_info().table_id())
            .await
            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;

        // CREATE TABLE LIKE also inherits the database-level options.
        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let quote_style = ctx.quote_style();
        let mut create_stmt =
            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
                .context(error::ParseQuerySnafu)?;
        create_stmt.name = stmt.table_name;
        create_stmt.if_not_exists = false;

        let table_info = table_ref.table_info();
        let partitions = create_partitions_stmt(&table_info, &partition_info.partitions)?.and_then(
            |mut partitions| {
                if !partitions.column_list.is_empty() {
                    partitions.set_quote(quote_style);
                    Some(partitions)
                } else {
                    None
                }
            },
        );

        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
        self.create_table_inner(create_expr, partitions, ctx).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_external_table(
        &self,
        create_expr: CreateExternalTable,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
        self.create_table_inner(create_expr, None, ctx).await
    }

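    /// Dispatches table creation: a metric-engine table carrying the
    /// `LOGICAL_TABLE_METADATA_KEY` option is created as a logical table,
    /// anything else goes through the normal (physical) path.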
    #[tracing::instrument(skip_all)]
    pub async fn create_table_inner(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !is_readonly_schema(&create_table.schema_name),
            SchemaReadOnlySnafu {
                name: create_table.schema_name.clone()
            }
        );

        if create_table.engine == METRIC_ENGINE_NAME
            && create_table
                .table_options
                .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            if let Some(partitions) = partitions.as_ref()
                && !partitions.exprs.is_empty()
            {
                self.validate_logical_table_partition_rule(create_table, partitions, &query_ctx)
                    .await?;
            }
            // Create a logical table.
            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
                .await?
                .into_iter()
                .next()
                .context(error::UnexpectedSnafu {
                    violated: "expected to create logical tables",
                })
        } else {
            // Create a normal (physical) table.
            self.create_non_logic_table(create_table, partitions, query_ctx)
                .await
        }
    }

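    /// Creates a physical (non-logical) table via the create-table procedure,
    /// after validating the schema, the table name, and the partition rule.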
    #[tracing::instrument(skip_all)]
    pub async fn create_non_logic_table(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();

        // Check if schema exists
        let schema = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey::new(
                &create_table.catalog_name,
                &create_table.schema_name,
            ))
            .await
            .context(TableMetadataManagerSnafu)?;
        ensure!(
            schema.is_some(),
            SchemaNotFoundSnafu {
                schema_info: &create_table.schema_name,
            }
        );

        // Return early if the table already exists.
        if let Some(table) = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                &create_table.table_name,
                Some(&query_ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            return if create_table.create_if_not_exists {
                Ok(table)
            } else {
                TableAlreadyExistsSnafu {
                    table: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                }
                .fail()
            };
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&create_table.table_name),
            InvalidTableNameSnafu {
                table_name: &create_table.table_name,
            }
        );

        let table_name = TableName::new(
            &create_table.catalog_name,
            &create_table.schema_name,
            &create_table.table_name,
        );

        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
        let mut table_info = create_table_info(create_table, partition_cols)?;

        let resp = self
            .create_table_procedure(
                create_table.clone(),
                partitions,
                table_info.clone(),
                query_ctx,
            )
            .await?;

        let table_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created table '{table_name}' with table id {table_id}");

        table_info.ident.table_id = table_id;

        let table_info: Arc<TableInfo> =
            Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
        create_table.table_id = Some(api::v1::TableId { id: table_id });

        let table = DistTable::table(table_info);

        Ok(table)
    }

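    /// Creates a batch of logical tables on their physical table in a single
    /// procedure, then re-reads the resulting table infos from the catalog.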
    #[tracing::instrument(skip_all)]
    pub async fn create_logical_tables(
        &self,
        create_table_exprs: &[CreateTableExpr],
        query_context: QueryContextRef,
    ) -> Result<Vec<TableRef>> {
        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
        ensure!(
            !create_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "create logical tables"
            }
        );

        // Check table names
        for create_table in create_table_exprs {
            ensure!(
                NAME_PATTERN_REG.is_match(&create_table.table_name),
                InvalidTableNameSnafu {
                    table_name: &create_table.table_name,
                }
            );
        }

        let raw_tables_info = create_table_exprs
            .iter()
            .map(|create| create_table_info(create, vec![]))
            .collect::<Result<Vec<_>>>()?;
        let tables_data = create_table_exprs
            .iter()
            .cloned()
            .zip(raw_tables_info.iter().cloned())
            .collect::<Vec<_>>();

        let resp = self
            .create_logical_tables_procedure(tables_data, query_context.clone())
            .await?;

        let table_ids = resp.table_ids;
        ensure!(
            table_ids.len() == raw_tables_info.len(),
            CreateLogicalTablesSnafu {
                reason: format!(
                    "The number of created tables is inconsistent with the expected number, expected: {}, actual: {}",
                    raw_tables_info.len(),
                    table_ids.len()
                )
            }
        );
        info!("Successfully created logical tables: {:?}", table_ids);

        // Reacquire the table infos from the catalog so logical tables inherit the latest
        // partition metadata (e.g. partition_key_indices) from their physical tables.
        // The returned table info also includes extra partition columns that exist in the
        // physical table but not in the logical table's create expression.
        let mut tables_info = Vec::with_capacity(table_ids.len());
        for (table_id, create_table) in table_ids.iter().zip(create_table_exprs.iter()) {
            let table = self
                .catalog_manager
                .table(
                    &create_table.catalog_name,
                    &create_table.schema_name,
                    &create_table.table_name,
                    Some(&query_context),
                )
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                })?;

            let table_info = table.table_info();
            // Safety check: ensure we are returning the table info that matches the newly created table id.
            ensure!(
                table_info.table_id() == *table_id,
                CreateLogicalTablesSnafu {
                    reason: format!(
                        "Table id mismatch after creation, expected {}, got {} for table {}",
                        table_id,
                        table_info.table_id(),
                        format_full_table_name(
                            &create_table.catalog_name,
                            &create_table.schema_name,
                            &create_table.table_name
                        )
                    )
                }
            );

            tables_info.push(table_info);
        }

        Ok(tables_info.into_iter().map(DistTable::table).collect())
    }

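    /// Validates that a logical table's partition rule matches the partition
    /// rule of its physical table (compared as sorted expression lists).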
    async fn validate_logical_table_partition_rule(
        &self,
        create_table: &CreateTableExpr,
        partitions: &Partitions,
        query_ctx: &QueryContextRef,
    ) -> Result<()> {
        let (_, mut logical_partition_exprs) =
            parse_partitions_for_logical_validation(create_table, partitions, query_ctx)?;

        let physical_table_name = create_table
            .table_options
            .get(LOGICAL_TABLE_METADATA_KEY)
            .with_context(|| CreateLogicalTablesSnafu {
                reason: format!(
                    "expect `{LOGICAL_TABLE_METADATA_KEY}` option on creating logical table"
                ),
            })?;

        let physical_table = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                physical_table_name,
                Some(query_ctx),
            )
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu {
                table_name: physical_table_name.clone(),
            })?;

        let physical_table_info = physical_table.table_info();
        let (partition_rule, _) = self
            .partition_manager
            .find_table_partition_rule(&physical_table_info)
            .await
            .context(error::FindTablePartitionRuleSnafu {
                table_name: physical_table_name.clone(),
            })?;

        let multi_dim_rule = partition_rule
            .as_ref()
            .as_any()
            .downcast_ref::<MultiDimPartitionRule>()
            .context(InvalidPartitionRuleSnafu {
                reason: "physical table partition rule is not range-based",
            })?;

        // TODO(ruihang): project physical partition exprs to logical partition column
        let mut physical_partition_exprs = multi_dim_rule.exprs().to_vec();
        logical_partition_exprs.sort_unstable();
        physical_partition_exprs.sort_unstable();

        ensure!(
            physical_partition_exprs == logical_partition_exprs,
            InvalidPartitionRuleSnafu {
                reason: format!(
                    "logical table partition rule must match the corresponding physical table's\n logical table partition exprs:\t\t {:?}\n physical table partition exprs:\t {:?}",
                    logical_partition_exprs, physical_partition_exprs
                ),
            }
        );

        Ok(())
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub async fn create_trigger(
        &self,
        stmt: CreateTrigger,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
        self.create_trigger_inner(expr, query_context).await
    }

    #[cfg(feature = "enterprise")]
    pub async fn create_trigger_inner(
        &self,
        expr: PbCreateTriggerExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_trigger_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    async fn create_trigger_procedure(
        &self,
        expr: PbCreateTriggerExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
            create_trigger: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;

        let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_create_trigger(task));

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_flow(
        &self,
        stmt: CreateFlow,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        // TODO(ruihang): do some verification
        let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;

        self.create_flow_inner(expr, query_context).await
    }

    pub async fn create_flow_inner(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_flow_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    async fn create_flow_procedure(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let flow_type = self
            .determine_flow_type(&expr, query_context.clone())
            .await?;
        info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);

        let expr = {
            let mut expr = expr;
            expr.flow_options
                .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
            expr
        };

        let task = CreateFlowTask::try_from(PbCreateFlowTask {
            create_flow: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;
        let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_create_flow(task));

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    /// Determines the flow type from the flow's SQL query.
    ///
    /// If the query contains an aggregation or DISTINCT, it is a batching flow;
    /// otherwise it is a streaming flow.
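    ///
    /// A rough sketch of the classification (illustrative queries only):
    ///
    /// ```text
    /// SELECT count(*) FROM t GROUP BY host;   -- aggregation  -> Batching
    /// SELECT DISTINCT host FROM t;            -- distinct     -> Batching
    /// SELECT host, cpu FROM t;                -- plain select -> Streaming
    /// TQL EVAL (0, 100, '10s') rate(cpu[5m]); -- TQL/PromQL   -> Batching
    /// ```
    ///
    /// A source table with an instant TTL forces streaming mode regardless of
    /// the query shape.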
    async fn determine_flow_type(
        &self,
        expr: &CreateFlowExpr,
        query_ctx: QueryContextRef,
    ) -> Result<FlowType> {
        // First, check whether any source table's TTL is instant; if so, force streaming mode.
        for src_table_name in &expr.source_table_names {
            let table = self
                .catalog_manager()
                .table(
                    &src_table_name.catalog_name,
                    &src_table_name.schema_name,
                    &src_table_name.table_name,
                    Some(&query_ctx),
                )
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name,
                    ),
                })?;

            // An instant source table can only be handled in streaming mode.
            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
                warn!(
                    "Source table `{}` of flow `{}` has ttl=instant, falling back to streaming mode",
                    format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name
                    ),
                    expr.flow_name
                );
                return Ok(FlowType::Streaming);
            }
        }

        let engine = &self.query_engine;
        let stmts = ParserContext::create_with_dialect(
            &expr.sql,
            query_ctx.sql_dialect(),
            ParseOptions::default(),
        )
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

        ensure!(
            stmts.len() == 1,
            InvalidSqlSnafu {
                err_msg: format!("Expect only one statement, found {}", stmts.len())
            }
        );
        let stmt = &stmts[0];

        // TQL statements are supported too.
        let plan = match stmt {
            // PromQL (TQL) is only supported in batching mode.
            Statement::Tql(_) => return Ok(FlowType::Batching),
            _ => engine
                .planner()
                .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?,
        };

        /// Visitor that looks for aggregation or DISTINCT nodes in the plan.
        struct FindAggr {
            is_aggr: bool,
        }

        impl TreeNodeVisitor<'_> for FindAggr {
            type Node = LogicalPlan;
            fn f_down(
                &mut self,
                node: &Self::Node,
            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
            {
                match node {
                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
                        self.is_aggr = true;
                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
                    }
                    _ => (),
                }
                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
            }
        }

        let mut find_aggr = FindAggr { is_aggr: false };

        plan.visit_with_subqueries(&mut find_aggr)
            .context(BuildDfLogicalPlanSnafu)?;
        if find_aggr.is_aggr {
            Ok(FlowType::Batching)
        } else {
            Ok(FlowType::Streaming)
        }
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_view(
        &self,
        create_view: CreateView,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        // Convert the input into a logical plan.
        let logical_plan = match &*create_view.query {
            Statement::Query(query) => {
                self.plan(
                    &QueryStatement::Sql(Statement::Query(query.clone())),
                    ctx.clone(),
                )
                .await?
            }
            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
            _ => {
                return InvalidViewStmtSnafu {}.fail();
            }
        };
        // Save the definition for `SHOW CREATE VIEW`.
        let definition = create_view.to_string();

        // Save the plan's columns; they may change when the schemas of the tables
        // in the plan are altered.
        let schema: Schema = logical_plan
            .schema()
            .clone()
            .try_into()
            .context(ConvertSchemaSnafu)?;
        let plan_columns: Vec<_> = schema
            .column_schemas()
            .iter()
            .map(|c| c.name.clone())
            .collect();

        let columns: Vec<_> = create_view
            .columns
            .iter()
            .map(|ident| ident.to_string())
            .collect();

        // Validate the columns.
        if !columns.is_empty() {
            ensure!(
                columns.len() == plan_columns.len(),
                error::ViewColumnsMismatchSnafu {
                    view_name: create_view.name.to_string(),
                    expected: plan_columns.len(),
                    actual: columns.len(),
                }
            );
        }

        // Extract the table names from the original plan
        // and rewrite them as fully qualified names.
        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
            .context(ExtractTableNamesSnafu)?;

        let table_names = table_names.into_iter().map(|t| t.into()).collect();

        // TODO(dennis): we don't save the optimized plan yet, because there are
        // serialization issues with our own plan nodes (such as `MergeScanLogicalPlan`).
        // When those issues are fixed, we can use the `optimized_plan` instead.
        // let optimized_plan = self.optimize_logical_plan(logical_plan)?.unwrap_df_plan();

        // Encode the logical plan.
        let encoded_plan = DFLogicalSubstraitConvertor
            .encode(&plan, DefaultSerializer)
            .context(SubstraitCodecSnafu)?;

        let expr = expr_helper::to_create_view_expr(
            create_view,
            encoded_plan.to_vec(),
            table_names,
            columns,
            plan_columns,
            definition,
            ctx.clone(),
        )?;

        // TODO(dennis): validate the logical plan
        self.create_view_by_expr(expr, ctx).await
    }

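    /// Creates a view from a prepared `CreateViewExpr`, enforcing the
    /// `IF NOT EXISTS` / `OR REPLACE` semantics against any existing view or
    /// table of the same name, then invalidates the local cache.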
    pub async fn create_view_by_expr(
        &self,
        expr: CreateViewExpr,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !(expr.create_if_not_exists && expr.or_replace),
            InvalidSqlSnafu {
                err_msg: "syntax error: `OR REPLACE` and `IF NOT EXISTS` cannot be used together",
            }
        );
        let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();

        let schema_exists = self
            .table_metadata_manager
            .schema_manager()
            .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
            .await
            .context(TableMetadataManagerSnafu)?;

        ensure!(
            schema_exists,
            SchemaNotFoundSnafu {
                schema_info: &expr.schema_name,
            }
        );

        // Check whether a view or table with the same name already exists.
        if let Some(table) = self
            .catalog_manager
            .table(
                &expr.catalog_name,
                &expr.schema_name,
                &expr.view_name,
                Some(&ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            let table_type = table.table_info().table_type;

            match (table_type, expr.create_if_not_exists, expr.or_replace) {
                (TableType::View, true, false) => {
                    return Ok(table);
                }
                (TableType::View, false, false) => {
                    return ViewAlreadyExistsSnafu {
                        name: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
                (TableType::View, _, true) => {
                    // Try to replace the existing view.
                }
                _ => {
                    return TableAlreadyExistsSnafu {
                        table: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
            }
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&expr.view_name),
            InvalidViewNameSnafu {
                name: expr.view_name.clone(),
            }
        );

        let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);

        let mut view_info = RawTableInfo {
            ident: metadata::TableIdent {
                // The view id of a distributed table is assigned by the metasrv; 0 is a placeholder.
                table_id: 0,
                version: 0,
            },
            name: expr.view_name.clone(),
            desc: None,
            catalog_name: expr.catalog_name.clone(),
            schema_name: expr.schema_name.clone(),
            // Table meta doesn't make sense for views, so use a default one.
            meta: RawTableMeta::default(),
            table_type: TableType::View,
        };

        let request =
            SubmitDdlTaskRequest::new(ctx, DdlTask::new_create_view(expr, view_info.clone()));

        let resp = self
            .procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)?;

        debug!(
            "Submit creating view '{view_name}' task response: {:?}",
            resp
        );

        let view_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created view '{view_name}' with view id {view_id}");

        view_info.ident.table_id = view_id;

        let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);

        let table = DistTable::table(view_info);

        // Invalidates local cache ASAP.
        self.cache_invalidator
            .invalidate(
                &Context::default(),
                &[
                    CacheIdent::TableId(view_id),
                    CacheIdent::TableName(view_name.clone()),
                ],
            )
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(table)
    }

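    /// Drops a flow by name; with `drop_if_exists`, a missing flow is a no-op.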
    #[tracing::instrument(skip_all)]
    pub async fn drop_flow(
        &self,
        catalog_name: String,
        flow_name: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        if let Some(flow) = self
            .flow_metadata_manager
            .flow_name_manager()
            .get(&catalog_name, &flow_name)
            .await
            .context(error::TableMetadataManagerSnafu)?
        {
            let flow_id = flow.flow_id();
            let task = DropFlowTask {
                catalog_name,
                flow_name,
                flow_id,
                drop_if_exists,
            };
            self.drop_flow_procedure(task, query_context).await?;

            Ok(Output::new_with_affected_rows(0))
        } else if drop_if_exists {
            Ok(Output::new_with_affected_rows(0))
        } else {
            FlowNotFoundSnafu {
                flow_name: format_full_flow_name(&catalog_name, &flow_name),
            }
            .fail()
        }
    }

    async fn drop_flow_procedure(
        &self,
        expr: DropFlowTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_drop_flow(expr));

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub(super) async fn drop_trigger(
        &self,
        catalog_name: String,
        trigger_name: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let task = DropTriggerTask {
            catalog_name,
            trigger_name,
            drop_if_exists,
        };
        self.drop_trigger_procedure(task, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    async fn drop_trigger_procedure(
        &self,
        expr: DropTriggerTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_drop_trigger(expr));

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    /// Drops a view; with `drop_if_exists`, a missing view is a no-op.
    #[tracing::instrument(skip_all)]
    pub(crate) async fn drop_view(
        &self,
        catalog: String,
        schema: String,
        view: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let view_info = if let Some(view) = self
            .catalog_manager
            .table(&catalog, &schema, &view, None)
            .await
            .context(CatalogSnafu)?
        {
            view.table_info()
        } else if drop_if_exists {
            // DROP VIEW IF EXISTS on a missing view is a no-op.
            return Ok(Output::new_with_affected_rows(0));
        } else {
            return TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog, &schema, &view),
            }
            .fail();
        };

        // Ensure the existing object is a view; other table types can't be dropped here.
        ensure!(
            view_info.table_type == TableType::View,
            error::InvalidViewSnafu {
                msg: "not a view",
                view_name: format_full_table_name(&catalog, &schema, &view),
            }
        );

        let view_id = view_info.table_id();

        let task = DropViewTask {
            catalog,
            schema,
            view,
            view_id,
            drop_if_exists,
        };

        self.drop_view_procedure(task, query_context).await?;

        Ok(Output::new_with_affected_rows(0))
    }

    /// Submits a [DropViewTask] to the procedure executor.
    async fn drop_view_procedure(
        &self,
        expr: DropViewTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_drop_view(expr));

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

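    /// Alters a batch of logical tables, grouping the alter expressions by
    /// their physical table id so each physical table gets exactly one
    /// procedure; the procedures are then awaited concurrently.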
    #[tracing::instrument(skip_all)]
    pub async fn alter_logical_tables(
        &self,
        alter_table_exprs: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
        ensure!(
            !alter_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "alter logical tables"
            }
        );

        // Group the expressions by physical table id.
        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
        for expr in alter_table_exprs {
            // Get the table_id from the catalog manager.
            let catalog = if expr.catalog_name.is_empty() {
                query_context.current_catalog()
            } else {
                &expr.catalog_name
            };
            let schema = if expr.schema_name.is_empty() {
                query_context.current_schema()
            } else {
                expr.schema_name.clone()
            };
            let table_name = &expr.table_name;
            let table = self
                .catalog_manager
                .table(catalog, &schema, table_name, Some(&query_context))
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(catalog, &schema, table_name),
                })?;
            let table_id = table.table_info().ident.table_id;
            let physical_table_id = self
                .table_metadata_manager
                .table_route_manager()
                .get_physical_table_id(table_id)
                .await
                .context(TableMetadataManagerSnafu)?;
            groups.entry(physical_table_id).or_default().push(expr);
        }

        // Submit one procedure per physical table.
        let mut handles = Vec::with_capacity(groups.len());
        for (_physical_table_id, exprs) in groups {
            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
            handles.push(fut);
        }
        let _results = futures::future::try_join_all(handles).await?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_table(
        &self,
        table_name: TableName,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        // Reserved for the gRPC call path.
        self.drop_tables(&[table_name], drop_if_exists, query_context)
            .await
    }

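    /// Drops a set of tables, invalidating the local cache for each dropped
    /// table; with `drop_if_exists`, missing tables are skipped.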
    #[tracing::instrument(skip_all)]
    pub async fn drop_tables(
        &self,
        table_names: &[TableName],
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let mut tables = Vec::with_capacity(table_names.len());
        for table_name in table_names {
            ensure!(
                !is_readonly_schema(&table_name.schema_name),
                SchemaReadOnlySnafu {
                    name: table_name.schema_name.clone()
                }
            );

            if let Some(table) = self
                .catalog_manager
                .table(
                    &table_name.catalog_name,
                    &table_name.schema_name,
                    &table_name.table_name,
                    Some(&query_context),
                )
                .await
                .context(CatalogSnafu)?
            {
                tables.push(table.table_info().table_id());
            } else if drop_if_exists {
                // DROP TABLE IF EXISTS on a missing table is a no-op.
                continue;
            } else {
                return TableNotFoundSnafu {
                    table_name: table_name.to_string(),
                }
                .fail();
            }
        }

        for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
            self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
                .await?;

            // Invalidates local cache ASAP.
            self.cache_invalidator
                .invalidate(
                    &Context::default(),
                    &[
                        CacheIdent::TableId(table_id),
                        CacheIdent::TableName(table_name.clone()),
                    ],
                )
                .await
                .context(error::InvalidateTableCacheSnafu)?;
        }
        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_database(
        &self,
        catalog: String,
        schema: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&schema),
            SchemaReadOnlySnafu { name: schema }
        );

        if self
            .catalog_manager
            .schema_exists(&catalog, &schema, None)
            .await
            .context(CatalogSnafu)?
        {
            if schema == query_context.current_schema() {
                SchemaInUseSnafu { name: schema }.fail()
            } else {
                self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
                    .await?;

                Ok(Output::new_with_affected_rows(0))
            }
        } else if drop_if_exists {
            // DROP DATABASE IF EXISTS on a missing schema is a no-op.
            Ok(Output::new_with_affected_rows(0))
        } else {
            SchemaNotFoundSnafu {
                schema_info: schema,
            }
            .fail()
        }
    }

    #[tracing::instrument(skip_all)]
    pub async fn truncate_table(
        &self,
        table_name: TableName,
        time_ranges: Vec<(Timestamp, Timestamp)>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&table_name.schema_name),
            SchemaReadOnlySnafu {
                name: table_name.schema_name.clone()
            }
        );

        let table = self
            .catalog_manager
            .table(
                &table_name.catalog_name,
                &table_name.schema_name,
                &table_name.table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_name.to_string(),
            })?;
        let table_id = table.table_info().table_id();
        self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
            .await?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_table(
        &self,
        alter_table: AlterTable,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        if matches!(
            alter_table.alter_operation(),
            AlterTableOperation::Repartition { .. }
        ) {
            let request = expr_helper::to_repartition_request(alter_table, &query_context)?;
            return self.repartition_table(request, &query_context).await;
        }

        let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
        self.alter_table_inner(expr, query_context).await
    }

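    /// Handles `ALTER TABLE ... REPARTITION`: validates the requested partition
    /// expressions against the table's existing rule and submits an alter-table
    /// DDL task. The new rule is computed as
    ///
    /// ```text
    /// new_exprs = existing_exprs - from_exprs + into_exprs
    /// ```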
1346    #[tracing::instrument(skip_all)]
1347    pub async fn repartition_table(
1348        &self,
1349        request: RepartitionRequest,
1350        query_context: &QueryContextRef,
1351    ) -> Result<Output> {
1352        // Check if the schema is read-only.
1353        ensure!(
1354            !is_readonly_schema(&request.schema_name),
1355            SchemaReadOnlySnafu {
1356                name: request.schema_name.clone()
1357            }
1358        );
1359
1360        let table_ref = TableReference::full(
1361            &request.catalog_name,
1362            &request.schema_name,
1363            &request.table_name,
1364        );
1365        // Get the table from the catalog.
1366        let table = self
1367            .catalog_manager
1368            .table(
1369                &request.catalog_name,
1370                &request.schema_name,
1371                &request.table_name,
1372                Some(query_context),
1373            )
1374            .await
1375            .context(CatalogSnafu)?
1376            .with_context(|| TableNotFoundSnafu {
1377                table_name: table_ref.to_string(),
1378            })?;
1379        let table_id = table.table_info().ident.table_id;
1380        // Get existing partition expressions from the table route.
1381        let (physical_table_id, physical_table_route) = self
1382            .table_metadata_manager
1383            .table_route_manager()
1384            .get_physical_table_route(table_id)
1385            .await
1386            .context(TableMetadataManagerSnafu)?;
1387
1388        ensure!(
1389            physical_table_id == table_id,
1390            NotSupportedSnafu {
1391                feat: "REPARTITION on logical tables"
1392            }
1393        );
1394
1395        let table_info = table.table_info();
1396        // Get partition column names from the table metadata.
1397        let existing_partition_columns = table_info.meta.partition_columns().collect::<Vec<_>>();
1398        // Repartition requires the table to have partition columns.
1399        ensure!(
1400            !existing_partition_columns.is_empty(),
1401            InvalidPartitionRuleSnafu {
1402                reason: format!(
1403                    "table {} does not have partition columns, cannot repartition",
1404                    table_ref
1405                )
1406            }
1407        );
1408
1409        // Repartition operations involving columns outside the existing partition columns are not supported.
1410        // This restriction ensures repartition only applies to current partition columns.
1411        let column_name_and_type = existing_partition_columns
1412            .iter()
1413            .map(|column| (&column.name, column.data_type.clone()))
1414            .collect();
1415        let timezone = query_context.timezone();
1416        // Convert SQL Exprs to PartitionExprs.
1417        let from_partition_exprs = request
1418            .from_exprs
1419            .iter()
1420            .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
1421            .collect::<Result<Vec<_>>>()?;
1422
1423        let mut into_partition_exprs = request
1424            .into_exprs
1425            .iter()
1426            .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
1427            .collect::<Result<Vec<_>>>()?;
1428
1429        // `MERGE PARTITION` (and some `REPARTITION`) generates a single `OR` expression from
1430        // multiple source partitions; try to simplify it for better readability and stability.
1431        if from_partition_exprs.len() > 1
1432            && into_partition_exprs.len() == 1
1433            && let Some(expr) = into_partition_exprs.pop()
1434        {
1435            into_partition_exprs.push(partition::simplify::simplify_merged_partition_expr(expr));
1436        }
1437
1438        // Parse existing partition expressions from region routes.
1439        let mut existing_partition_exprs =
1440            Vec::with_capacity(physical_table_route.region_routes.len());
1441        for route in &physical_table_route.region_routes {
1442            let expr_json = route.region.partition_expr();
1443            if !expr_json.is_empty() {
1444                match PartitionExpr::from_json_str(&expr_json) {
1445                    Ok(Some(expr)) => existing_partition_exprs.push(expr),
1446                    Ok(None) => {
1447                        // Empty
1448                    }
1449                    Err(e) => {
1450                        return Err(e).context(DeserializePartitionExprSnafu);
1451                    }
1452                }
1453            }
1454        }
1455
1456        // Validate that from_partition_exprs are a subset of existing partition exprs.
1457        // We compare PartitionExpr directly since it implements Eq.
1458        for from_expr in &from_partition_exprs {
1459            ensure!(
1460                existing_partition_exprs.contains(from_expr),
1461                InvalidPartitionRuleSnafu {
1462                    reason: format!(
1463                        "partition expression '{}' does not exist in table {}",
1464                        from_expr, table_ref
1465                    )
1466                }
1467            );
1468        }
1469
1470        // Build the new partition expressions:
1471        // new_exprs = existing_exprs - from_exprs + into_exprs
1472        let new_partition_exprs: Vec<PartitionExpr> = existing_partition_exprs
1473            .into_iter()
1474            .filter(|expr| !from_partition_exprs.contains(expr))
1475            .chain(into_partition_exprs.clone().into_iter())
1476            .collect();
1477        let new_partition_exprs_len = new_partition_exprs.len();
1478        let from_partition_exprs_len = from_partition_exprs.len();
1479
1480        // Validate the new partition expressions using MultiDimPartitionRule and PartitionChecker.
1481        let _ = MultiDimPartitionRule::try_new(
1482            existing_partition_columns
1483                .iter()
1484                .map(|c| c.name.clone())
1485                .collect(),
1486            vec![],
1487            new_partition_exprs,
1488            true,
1489        )
1490        .context(InvalidPartitionSnafu)?;
1491
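        // Options come from the request's option map; for example (as exercised by the
        // unit test at the bottom of this file), { "wait": "false", "timeout": "5m" }
        // yields wait = false and a timeout of 300 seconds.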
1492        let ddl_options = parse_ddl_options(&request.options)?;
1493        let serialize_exprs = |exprs: Vec<PartitionExpr>| -> Result<Vec<String>> {
1494            let mut json_exprs = Vec::with_capacity(exprs.len());
1495            for expr in exprs {
1496                json_exprs.push(expr.as_json_str().context(SerializePartitionExprSnafu)?);
1497            }
1498            Ok(json_exprs)
1499        };
1500        let from_partition_exprs_json = serialize_exprs(from_partition_exprs)?;
1501        let into_partition_exprs_json = serialize_exprs(into_partition_exprs)?;
1502        let mut req = SubmitDdlTaskRequest::new(
1503            query_context.clone(),
1504            DdlTask::new_alter_table(AlterTableExpr {
1505                catalog_name: request.catalog_name.clone(),
1506                schema_name: request.schema_name.clone(),
1507                table_name: request.table_name.clone(),
1508                kind: Some(Kind::Repartition(Repartition {
1509                    from_partition_exprs: from_partition_exprs_json,
1510                    into_partition_exprs: into_partition_exprs_json,
1511                })),
1512            }),
1513        );
1514        req.wait = ddl_options.wait;
1515        req.timeout = ddl_options.timeout;
1516
1517        info!(
1518            "Submitting repartition task for table {} (table_id={}), repartitioning {} source partitions into {} total partitions, timeout: {:?}, wait: {}",
1519            table_ref,
1520            table_id,
1521            from_partition_exprs_len,
1522            new_partition_exprs_len,
1523            ddl_options.timeout,
1524            ddl_options.wait
1525        );
1526
1527        let response = self
1528            .procedure_executor
1529            .submit_ddl_task(&ExecutorContext::default(), req)
1530            .await
1531            .context(error::ExecuteDdlSnafu)?;
1532
1533        if !ddl_options.wait {
1534            return build_procedure_id_output(response.key);
1535        }
1536
1537        // Only invalidate cache if wait is true.
1538        let invalidate_keys = vec![
1539            CacheIdent::TableId(table_id),
1540            CacheIdent::TableName(TableName::new(
1541                request.catalog_name,
1542                request.schema_name,
1543                request.table_name,
1544            )),
1545        ];
1546
1547        // Invalidates local cache ASAP.
1548        self.cache_invalidator
1549            .invalidate(&Context::default(), &invalidate_keys)
1550            .await
1551            .context(error::InvalidateTableCacheSnafu)?;
1552
1553        Ok(Output::new_with_affected_rows(0))
1554    }
1555
1556    #[tracing::instrument(skip_all)]
1557    pub async fn alter_table_inner(
1558        &self,
1559        expr: AlterTableExpr,
1560        query_context: QueryContextRef,
1561    ) -> Result<Output> {
1562        ensure!(
1563            !is_readonly_schema(&expr.schema_name),
1564            SchemaReadOnlySnafu {
1565                name: expr.schema_name.clone()
1566            }
1567        );
1568
1569        let catalog_name = if expr.catalog_name.is_empty() {
1570            DEFAULT_CATALOG_NAME.to_string()
1571        } else {
1572            expr.catalog_name.clone()
1573        };
1574
1575        let schema_name = if expr.schema_name.is_empty() {
1576            DEFAULT_SCHEMA_NAME.to_string()
1577        } else {
1578            expr.schema_name.clone()
1579        };
1580
1581        let table_name = expr.table_name.clone();
1582
1583        let table = self
1584            .catalog_manager
1585            .table(
1586                &catalog_name,
1587                &schema_name,
1588                &table_name,
1589                Some(&query_context),
1590            )
1591            .await
1592            .context(CatalogSnafu)?
1593            .with_context(|| TableNotFoundSnafu {
1594                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
1595            })?;
1596
1597        let table_id = table.table_info().ident.table_id;
1598        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
1599        if !need_alter {
1600            return Ok(Output::new_with_affected_rows(0));
1601        }
1602        info!(
1603            "Table info before alter is {:?}, expr: {:?}",
1604            table.table_info(),
1605            expr
1606        );
1607
1608        let physical_table_id = self
1609            .table_metadata_manager
1610            .table_route_manager()
1611            .get_physical_table_id(table_id)
1612            .await
1613            .context(TableMetadataManagerSnafu)?;
1614
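        // A table is physical when `get_physical_table_id` resolves to the table itself;
        // otherwise it is a logical table backed by that physical table, and the caches
        // of both tables must be invalidated.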
1615        let (req, invalidate_keys) = if physical_table_id == table_id {
1616            // This is a physical table.
1617            let req = SubmitDdlTaskRequest::new(query_context, DdlTask::new_alter_table(expr));
1618
1619            let invalidate_keys = vec![
1620                CacheIdent::TableId(table_id),
1621                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1622            ];
1623
1624            (req, invalidate_keys)
1625        } else {
1626            // This is a logical table.
1627            let req = SubmitDdlTaskRequest::new(
1628                query_context,
1629                DdlTask::new_alter_logical_tables(vec![expr]),
1630            );
1631
1632            let mut invalidate_keys = vec![
1633                CacheIdent::TableId(physical_table_id),
1634                CacheIdent::TableId(table_id),
1635                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1636            ];
1637
1638            let physical_table = self
1639                .table_metadata_manager
1640                .table_info_manager()
1641                .get(physical_table_id)
1642                .await
1643                .context(TableMetadataManagerSnafu)?
1644                .map(|x| x.into_inner());
1645            if let Some(physical_table) = physical_table {
1646                let physical_table_name = TableName::new(
1647                    physical_table.table_info.catalog_name,
1648                    physical_table.table_info.schema_name,
1649                    physical_table.table_info.name,
1650                );
1651                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
1652            }
1653
1654            (req, invalidate_keys)
1655        };
1656
1657        self.procedure_executor
1658            .submit_ddl_task(&ExecutorContext::default(), req)
1659            .await
1660            .context(error::ExecuteDdlSnafu)?;
1661
1662        // Invalidates local cache ASAP.
1663        self.cache_invalidator
1664            .invalidate(&Context::default(), &invalidate_keys)
1665            .await
1666            .context(error::InvalidateTableCacheSnafu)?;
1667
1668        Ok(Output::new_with_affected_rows(0))
1669    }
1670
1671    #[cfg(feature = "enterprise")]
1672    #[tracing::instrument(skip_all)]
1673    pub async fn alter_trigger(
1674        &self,
1675        _alter_expr: AlterTrigger,
1676        _query_context: QueryContextRef,
1677    ) -> Result<Output> {
1678        crate::error::NotSupportedSnafu {
1679            feat: "alter trigger",
1680        }
1681        .fail()
1682    }
1683
1684    #[tracing::instrument(skip_all)]
1685    pub async fn alter_database(
1686        &self,
1687        alter_expr: AlterDatabase,
1688        query_context: QueryContextRef,
1689    ) -> Result<Output> {
1690        let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
1691        self.alter_database_inner(alter_expr, query_context).await
1692    }
1693
1694    #[tracing::instrument(skip_all)]
1695    pub async fn alter_database_inner(
1696        &self,
1697        alter_expr: AlterDatabaseExpr,
1698        query_context: QueryContextRef,
1699    ) -> Result<Output> {
1700        ensure!(
1701            !is_readonly_schema(&alter_expr.schema_name),
1702            SchemaReadOnlySnafu {
1703                name: query_context.current_schema().clone()
1704            }
1705        );
1706
1707        let exists = self
1708            .catalog_manager
1709            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
1710            .await
1711            .context(CatalogSnafu)?;
1712        ensure!(
1713            exists,
1714            SchemaNotFoundSnafu {
1715                schema_info: alter_expr.schema_name,
1716            }
1717        );
1718
1719        let cache_ident = [CacheIdent::SchemaName(SchemaName {
1720            catalog_name: alter_expr.catalog_name.clone(),
1721            schema_name: alter_expr.schema_name.clone(),
1722        })];
1723
1724        self.alter_database_procedure(alter_expr, query_context)
1725            .await?;
1726
1727        // Invalidates local cache ASAP.
1728        self.cache_invalidator
1729            .invalidate(&Context::default(), &cache_ident)
1730            .await
1731            .context(error::InvalidateTableCacheSnafu)?;
1732
1733        Ok(Output::new_with_affected_rows(0))
1734    }
1735
1736    async fn create_table_procedure(
1737        &self,
1738        create_table: CreateTableExpr,
1739        partitions: Vec<PartitionExpr>,
1740        table_info: RawTableInfo,
1741        query_context: QueryContextRef,
1742    ) -> Result<SubmitDdlTaskResponse> {
1743        let partitions = partitions
1744            .into_iter()
1745            .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
1746            .collect::<Result<Vec<_>>>()?;
1747
1748        let request = SubmitDdlTaskRequest::new(
1749            query_context,
1750            DdlTask::new_create_table(create_table, partitions, table_info),
1751        );
1752
1753        self.procedure_executor
1754            .submit_ddl_task(&ExecutorContext::default(), request)
1755            .await
1756            .context(error::ExecuteDdlSnafu)
1757    }
1758
1759    async fn create_logical_tables_procedure(
1760        &self,
1761        tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
1762        query_context: QueryContextRef,
1763    ) -> Result<SubmitDdlTaskResponse> {
1764        let request = SubmitDdlTaskRequest::new(
1765            query_context,
1766            DdlTask::new_create_logical_tables(tables_data),
1767        );
1768
1769        self.procedure_executor
1770            .submit_ddl_task(&ExecutorContext::default(), request)
1771            .await
1772            .context(error::ExecuteDdlSnafu)
1773    }
1774
1775    async fn alter_logical_tables_procedure(
1776        &self,
1777        tables_data: Vec<AlterTableExpr>,
1778        query_context: QueryContextRef,
1779    ) -> Result<SubmitDdlTaskResponse> {
1780        let request = SubmitDdlTaskRequest::new(
1781            query_context,
1782            DdlTask::new_alter_logical_tables(tables_data),
1783        );
1784
1785        self.procedure_executor
1786            .submit_ddl_task(&ExecutorContext::default(), request)
1787            .await
1788            .context(error::ExecuteDdlSnafu)
1789    }
1790
1791    async fn drop_table_procedure(
1792        &self,
1793        table_name: &TableName,
1794        table_id: TableId,
1795        drop_if_exists: bool,
1796        query_context: QueryContextRef,
1797    ) -> Result<SubmitDdlTaskResponse> {
1798        let request = SubmitDdlTaskRequest::new(
1799            query_context,
1800            DdlTask::new_drop_table(
1801                table_name.catalog_name.clone(),
1802                table_name.schema_name.clone(),
1803                table_name.table_name.clone(),
1804                table_id,
1805                drop_if_exists,
1806            ),
1807        );
1808
1809        self.procedure_executor
1810            .submit_ddl_task(&ExecutorContext::default(), request)
1811            .await
1812            .context(error::ExecuteDdlSnafu)
1813    }
1814
1815    async fn drop_database_procedure(
1816        &self,
1817        catalog: String,
1818        schema: String,
1819        drop_if_exists: bool,
1820        query_context: QueryContextRef,
1821    ) -> Result<SubmitDdlTaskResponse> {
1822        let request = SubmitDdlTaskRequest::new(
1823            query_context,
1824            DdlTask::new_drop_database(catalog, schema, drop_if_exists),
1825        );
1826
1827        self.procedure_executor
1828            .submit_ddl_task(&ExecutorContext::default(), request)
1829            .await
1830            .context(error::ExecuteDdlSnafu)
1831    }
1832
1833    async fn alter_database_procedure(
1834        &self,
1835        alter_expr: AlterDatabaseExpr,
1836        query_context: QueryContextRef,
1837    ) -> Result<SubmitDdlTaskResponse> {
1838        let request =
1839            SubmitDdlTaskRequest::new(query_context, DdlTask::new_alter_database(alter_expr));
1840
1841        self.procedure_executor
1842            .submit_ddl_task(&ExecutorContext::default(), request)
1843            .await
1844            .context(error::ExecuteDdlSnafu)
1845    }
1846
1847    async fn truncate_table_procedure(
1848        &self,
1849        table_name: &TableName,
1850        table_id: TableId,
1851        time_ranges: Vec<(Timestamp, Timestamp)>,
1852        query_context: QueryContextRef,
1853    ) -> Result<SubmitDdlTaskResponse> {
1854        let request = SubmitDdlTaskRequest::new(
1855            query_context,
1856            DdlTask::new_truncate_table(
1857                table_name.catalog_name.clone(),
1858                table_name.schema_name.clone(),
1859                table_name.table_name.clone(),
1860                table_id,
1861                time_ranges,
1862            ),
1863        );
1864
1865        self.procedure_executor
1866            .submit_ddl_task(&ExecutorContext::default(), request)
1867            .await
1868            .context(error::ExecuteDdlSnafu)
1869    }
1870
1871    #[tracing::instrument(skip_all)]
1872    pub async fn create_database(
1873        &self,
1874        database: &str,
1875        create_if_not_exists: bool,
1876        options: HashMap<String, String>,
1877        query_context: QueryContextRef,
1878    ) -> Result<Output> {
1879        let catalog = query_context.current_catalog();
1880        ensure!(
1881            NAME_PATTERN_REG.is_match(catalog),
1882            error::UnexpectedSnafu {
1883                violated: format!("Invalid catalog name: {}", catalog)
1884            }
1885        );
1886
1887        ensure!(
1888            NAME_PATTERN_REG.is_match(database),
1889            error::UnexpectedSnafu {
1890                violated: format!("Invalid database name: {}", database)
1891            }
1892        );
1893
1894        if !self
1895            .catalog_manager
1896            .schema_exists(catalog, database, None)
1897            .await
1898            .context(CatalogSnafu)?
1899            && !self.catalog_manager.is_reserved_schema_name(database)
1900        {
1901            self.create_database_procedure(
1902                catalog.to_string(),
1903                database.to_string(),
1904                create_if_not_exists,
1905                options,
1906                query_context,
1907            )
1908            .await?;
1909
1910            Ok(Output::new_with_affected_rows(1))
1911        } else if create_if_not_exists {
1912            Ok(Output::new_with_affected_rows(1))
1913        } else {
1914            error::SchemaExistsSnafu { name: database }.fail()
1915        }
1916    }
1917
1918    async fn create_database_procedure(
1919        &self,
1920        catalog: String,
1921        database: String,
1922        create_if_not_exists: bool,
1923        options: HashMap<String, String>,
1924        query_context: QueryContextRef,
1925    ) -> Result<SubmitDdlTaskResponse> {
1926        let request = SubmitDdlTaskRequest::new(
1927            query_context,
1928            DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
1929        );
1930
1931        self.procedure_executor
1932            .submit_ddl_task(&ExecutorContext::default(), request)
1933            .await
1934            .context(error::ExecuteDdlSnafu)
1935    }
1936}
1937
1938/// Parses the partition statement [Partitions] into [PartitionExpr]s and partition column names.
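///
/// A minimal usage sketch (the create-table expression and statement here are
/// illustrative; callers in this module build them from the SQL layer):
///
/// ```ignore
/// let (partition_exprs, partition_columns) =
///     parse_partitions(&create_table_expr, stmt.partitions.clone(), &query_ctx)?;
/// ```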
1939pub fn parse_partitions(
1940    create_table: &CreateTableExpr,
1941    partitions: Option<Partitions>,
1942    query_ctx: &QueryContextRef,
1943) -> Result<(Vec<PartitionExpr>, Vec<String>)> {
1944    // If the user does not define partitions, use the timestamp column (which must exist)
1945    // as the partition column, and create only one partition.
1946    let partition_columns = find_partition_columns(&partitions)?;
1947    let partition_exprs =
1948        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
1949
1950    // Validates the partition expressions.
1951    let exprs = partition_exprs.clone();
1952    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
1953        .context(InvalidPartitionSnafu)?;
1954
1955    Ok((partition_exprs, partition_columns))
1956}
1957
1958fn parse_partitions_for_logical_validation(
1959    create_table: &CreateTableExpr,
1960    partitions: &Partitions,
1961    query_ctx: &QueryContextRef,
1962) -> Result<(Vec<String>, Vec<PartitionExpr>)> {
1963    let partition_columns = partitions
1964        .column_list
1965        .iter()
1966        .map(|ident| ident.value.clone())
1967        .collect::<Vec<_>>();
1968
1969    let column_name_and_type = partition_columns
1970        .iter()
1971        .map(|pc| {
1972            let column = create_table
1973                .column_defs
1974                .iter()
1975                .find(|c| &c.name == pc)
1976                .context(ColumnNotFoundSnafu { msg: pc.clone() })?;
1977            let column_name = &column.name;
1978            let data_type = ConcreteDataType::from(
1979                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
1980                    .context(ColumnDataTypeSnafu)?,
1981            );
1982            Ok((column_name, data_type))
1983        })
1984        .collect::<Result<HashMap<_, _>>>()?;
1985
1986    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
1987    for expr in &partitions.exprs {
1988        let partition_expr = convert_one_expr(expr, &column_name_and_type, &query_ctx.timezone())?;
1989        partition_exprs.push(partition_expr);
1990    }
1991
1992    MultiDimPartitionRule::try_new(
1993        partition_columns.clone(),
1994        vec![],
1995        partition_exprs.clone(),
1996        true,
1997    )
1998    .context(InvalidPartitionSnafu)?;
1999
2000    Ok((partition_columns, partition_exprs))
2001}
2002
2003/// Verifies an alter and returns whether it is necessary to perform the alter.
2004///
2005/// # Returns
2006///
2007/// Returns `true` if the alter needs to be performed; otherwise, returns `false`.
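///
/// For example, an `ADD COLUMN ... IF NOT EXISTS` alter whose columns all already exist
/// in the table requires no work, so it returns `Ok(false)`.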
2008pub fn verify_alter(
2009    table_id: TableId,
2010    table_info: Arc<TableInfo>,
2011    expr: AlterTableExpr,
2012) -> Result<bool> {
2013    let request: AlterTableRequest =
2014        common_grpc_expr::alter_expr_to_request(table_id, expr, Some(&table_info.meta))
2015            .context(AlterExprToRequestSnafu)?;
2016
2017    let AlterTableRequest {
2018        table_name,
2019        alter_kind,
2020        ..
2021    } = &request;
2022
2023    if let AlterKind::RenameTable { new_table_name } = alter_kind {
2024        ensure!(
2025            NAME_PATTERN_REG.is_match(new_table_name),
2026            error::UnexpectedSnafu {
2027                violated: format!("Invalid table name: {}", new_table_name)
2028            }
2029        );
2030    } else if let AlterKind::AddColumns { columns } = alter_kind {
2031        // If all the columns are marked as add_if_not_exists and they already exist in the table,
2032        // there is no need to perform the alter.
2033        let column_names: HashSet<_> = table_info
2034            .meta
2035            .schema
2036            .column_schemas()
2037            .iter()
2038            .map(|schema| &schema.name)
2039            .collect();
2040        if columns.iter().all(|column| {
2041            column_names.contains(&column.column_schema.name) && column.add_if_not_exists
2042        }) {
2043            return Ok(false);
2044        }
2045    }
2046
2047    let _ = table_info
2048        .meta
2049        .builder_with_alter_kind(table_name, &request.alter_kind)
2050        .context(error::TableSnafu)?
2051        .build()
2052        .context(error::BuildTableMetaSnafu { table_name })?;
2053
2054    Ok(true)
2055}
2056
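/// Builds the [RawTableInfo] for a `CREATE TABLE` expression.
///
/// As an illustrative sketch of the index bookkeeping: for column defs `[host, ts]` with
/// `ts` as the time index, `host` as the primary key, and `host` as the partition column,
/// the resulting meta has `timestamp_index = Some(1)`, `primary_key_indices = [0]`, and
/// `partition_key_indices = [0]`.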
2057pub fn create_table_info(
2058    create_table: &CreateTableExpr,
2059    partition_columns: Vec<String>,
2060) -> Result<RawTableInfo> {
2061    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
2062    let mut column_name_to_index_map = HashMap::new();
2063
2064    for (idx, column) in create_table.column_defs.iter().enumerate() {
2065        let schema =
2066            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
2067                column: &column.name,
2068            })?;
2069        let schema = schema.with_time_index(column.name == create_table.time_index);
2070
2071        column_schemas.push(schema);
2072        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
2073    }
2074
2075    let timestamp_index = column_name_to_index_map
2076        .get(&create_table.time_index)
2077        .cloned();
2078
2079    let raw_schema = RawSchema {
2080        column_schemas: column_schemas.clone(),
2081        timestamp_index,
2082        version: 0,
2083    };
2084
2085    let primary_key_indices = create_table
2086        .primary_keys
2087        .iter()
2088        .map(|name| {
2089            column_name_to_index_map
2090                .get(name)
2091                .cloned()
2092                .context(ColumnNotFoundSnafu { msg: name })
2093        })
2094        .collect::<Result<Vec<_>>>()?;
2095
2096    let partition_key_indices = partition_columns
2097        .into_iter()
2098        .map(|col_name| {
2099            column_name_to_index_map
2100                .get(&col_name)
2101                .cloned()
2102                .context(ColumnNotFoundSnafu { msg: col_name })
2103        })
2104        .collect::<Result<Vec<_>>>()?;
2105
2106    let table_options = TableOptions::try_from_iter(&create_table.table_options)
2107        .context(UnrecognizedTableOptionSnafu)?;
2108
2109    let meta = RawTableMeta {
2110        schema: raw_schema,
2111        primary_key_indices,
2112        value_indices: vec![],
2113        engine: create_table.engine.clone(),
2114        next_column_id: column_schemas.len() as u32,
2115        options: table_options,
2116        created_on: Utc::now(),
2117        updated_on: Utc::now(),
2118        partition_key_indices,
2119        column_ids: vec![],
2120    };
2121
2122    let desc = if create_table.desc.is_empty() {
2123        create_table.table_options.get(COMMENT_KEY).cloned()
2124    } else {
2125        Some(create_table.desc.clone())
2126    };
2127
2128    let table_info = RawTableInfo {
2129        ident: metadata::TableIdent {
2130            // The table id of a distributed table is assigned by Meta; set 0 here as a placeholder.
2131            table_id: 0,
2132            version: 0,
2133        },
2134        name: create_table.table_name.clone(),
2135        desc,
2136        catalog_name: create_table.catalog_name.clone(),
2137        schema_name: create_table.schema_name.clone(),
2138        meta,
2139        table_type: TableType::Base,
2140    };
2141    Ok(table_info)
2142}
2143
2144fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
2145    let columns = if let Some(partitions) = partitions {
2146        partitions
2147            .column_list
2148            .iter()
2149            .map(|x| x.value.clone())
2150            .collect::<Vec<_>>()
2151    } else {
2152        vec![]
2153    };
2154    Ok(columns)
2155}
2156
2157/// Parse [Partitions] into a group of partition entries.
2158///
2159/// Returns a list of [PartitionExpr], each of which defines a partition.
2160fn find_partition_entries(
2161    create_table: &CreateTableExpr,
2162    partitions: &Option<Partitions>,
2163    partition_columns: &[String],
2164    query_ctx: &QueryContextRef,
2165) -> Result<Vec<PartitionExpr>> {
2166    let Some(partitions) = partitions else {
2167        return Ok(vec![]);
2168    };
2169
2170    // Extract the concrete data types of the partition columns.
2171    let column_name_and_type = partition_columns
2172        .iter()
2173        .map(|pc| {
2174            let column = create_table
2175                .column_defs
2176                .iter()
2177                .find(|c| &c.name == pc)
2178                // unwrap is safe here because we have checked that partition columns are defined
2179                .unwrap();
2180            let column_name = &column.name;
2181            let data_type = ConcreteDataType::from(
2182                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
2183                    .context(ColumnDataTypeSnafu)?,
2184            );
2185            Ok((column_name, data_type))
2186        })
2187        .collect::<Result<HashMap<_, _>>>()?;
2188
2189    // Transform each parser expr into a partition expr.
2190    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
2191    for partition in &partitions.exprs {
2192        let partition_expr =
2193            convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
2194        partition_exprs.push(partition_expr);
2195    }
2196
2197    Ok(partition_exprs)
2198}
2199
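/// Converts one SQL binary expression into a [PartitionExpr].
///
/// For example (illustrative), `device_id < 100` becomes the partition expr
/// `(Operand::Column("device_id"), RestrictedOp::Lt, Operand::Value(100))`, while a
/// binary op whose operands are both binary ops recurses into `Operand::Expr` on each
/// side.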
2200fn convert_one_expr(
2201    expr: &Expr,
2202    column_name_and_type: &HashMap<&String, ConcreteDataType>,
2203    timezone: &Timezone,
2204) -> Result<PartitionExpr> {
2205    let Expr::BinaryOp { left, op, right } = expr else {
2206        return InvalidPartitionRuleSnafu {
2207            reason: "partition rule must be a binary expression",
2208        }
2209        .fail();
2210    };
2211
2212    let op =
2213        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
2214            reason: format!("unsupported operator in partition expr {op}"),
2215        })?;
2216
2217    // Convert the operands: leaf nodes pair a column with a literal value.
2218    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
2219        // col, val
2220        (Expr::Identifier(ident), Expr::Value(value)) => {
2221            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2222            let value = convert_value(&value.value, data_type, timezone, None)?;
2223            (Operand::Column(column_name), op, Operand::Value(value))
2224        }
2225        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
2226            if let Expr::Value(v) = &**expr =>
2227        {
2228            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2229            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
2230            (Operand::Column(column_name), op, Operand::Value(value))
2231        }
2232        // val, col
2233        (Expr::Value(value), Expr::Identifier(ident)) => {
2234            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2235            let value = convert_value(&value.value, data_type, timezone, None)?;
2236            (Operand::Value(value), op, Operand::Column(column_name))
2237        }
2238        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
2239            if let Expr::Value(v) = &**expr =>
2240        {
2241            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2242            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
2243            (Operand::Value(value), op, Operand::Column(column_name))
2244        }
2245        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
2246            // A sub-expression must be compared against another sub-expression.
2247            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
2248            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
2249            (Operand::Expr(lhs), op, Operand::Expr(rhs))
2250        }
2251        _ => {
2252            return InvalidPartitionRuleSnafu {
2253                reason: format!("invalid partition expr {expr}"),
2254            }
2255            .fail();
2256        }
2257    };
2258
2259    Ok(PartitionExpr::new(lhs, op, rhs))
2260}
2261
2262fn convert_identifier(
2263    ident: &Ident,
2264    column_name_and_type: &HashMap<&String, ConcreteDataType>,
2265) -> Result<(String, ConcreteDataType)> {
2266    let column_name = ident.value.clone();
2267    let data_type = column_name_and_type
2268        .get(&column_name)
2269        .cloned()
2270        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
2271    Ok((column_name, data_type))
2272}
2273
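/// Converts a SQL literal into a typed [Value], using the session timezone when parsing
/// temporal literals. The optional unary operator covers signed literals: for example,
/// `-100` reaches this function as `UnaryOperator::Minus` applied to the literal `100`.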
2274fn convert_value(
2275    value: &ParserValue,
2276    data_type: ConcreteDataType,
2277    timezone: &Timezone,
2278    unary_op: Option<UnaryOperator>,
2279) -> Result<Value> {
2280    sql_value_to_value(
2281        &ColumnSchema::new("<NONAME>", data_type, true),
2282        value,
2283        Some(timezone),
2284        unary_op,
2285        false,
2286    )
2287    .context(error::SqlCommonSnafu)
2288}
2289
2290#[cfg(test)]
2291mod test {
2292    use std::time::Duration;
2293
2294    use session::context::{QueryContext, QueryContextBuilder};
2295    use sql::dialect::GreptimeDbDialect;
2296    use sql::parser::{ParseOptions, ParserContext};
2297    use sql::statements::statement::Statement;
2298    use sqlparser::parser::Parser;
2299
2300    use super::*;
2301    use crate::expr_helper;
2302
2303    #[test]
2304    fn test_parse_ddl_options() {
2305        let options = OptionMap::from([
2306            ("timeout".to_string(), "5m".to_string()),
2307            ("wait".to_string(), "false".to_string()),
2308        ]);
2309        let ddl_options = parse_ddl_options(&options).unwrap();
2310        assert!(!ddl_options.wait);
2311        assert_eq!(Duration::from_secs(300), ddl_options.timeout);
2312    }
2313
2314    #[test]
2315    fn test_name_is_match() {
2316        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
2317        assert!(!NAME_PATTERN_REG.is_match("🈲"));
2318        assert!(NAME_PATTERN_REG.is_match("hello"));
2319        assert!(NAME_PATTERN_REG.is_match("test@"));
2320        assert!(!NAME_PATTERN_REG.is_match("@test"));
2321        assert!(NAME_PATTERN_REG.is_match("test#"));
2322        assert!(!NAME_PATTERN_REG.is_match("#test"));
2323        assert!(!NAME_PATTERN_REG.is_match("@"));
2324        assert!(!NAME_PATTERN_REG.is_match("#"));
2325    }
2326
2327    #[test]
2328    fn test_partition_expr_equivalence_with_swapped_operands() {
2329        let column_name = "device_id".to_string();
2330        let column_name_and_type =
2331            HashMap::from([(&column_name, ConcreteDataType::int32_datatype())]);
2332        let timezone = Timezone::from_tz_string("UTC").unwrap();
2333        let dialect = GreptimeDbDialect {};
2334
2335        let mut parser = Parser::new(&dialect)
2336            .try_with_sql("device_id < 100")
2337            .unwrap();
2338        let expr_left = parser.parse_expr().unwrap();
2339
2340        let mut parser = Parser::new(&dialect)
2341            .try_with_sql("100 > device_id")
2342            .unwrap();
2343        let expr_right = parser.parse_expr().unwrap();
2344
2345        let partition_left =
2346            convert_one_expr(&expr_left, &column_name_and_type, &timezone).unwrap();
2347        let partition_right =
2348            convert_one_expr(&expr_right, &column_name_and_type, &timezone).unwrap();
2349
2350        assert_eq!(partition_left, partition_right);
2351        assert!([partition_left.clone()].contains(&partition_right));
2352
2353        let mut physical_partition_exprs = vec![partition_left];
2354        let mut logical_partition_exprs = vec![partition_right];
2355        physical_partition_exprs.sort_unstable();
2356        logical_partition_exprs.sort_unstable();
2357        assert_eq!(physical_partition_exprs, logical_partition_exprs);
2358    }
2359
2360    #[tokio::test]
2361    #[ignore = "TODO(ruihang): WIP new partition rule"]
2362    async fn test_parse_partitions() {
2363        common_telemetry::init_default_ut_logging();
2364        let cases = [
2365            (
2366                r"
2367CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
2368PARTITION ON COLUMNS (b) (
2369  b < 'hz',
2370  b >= 'hz' AND b < 'sh',
2371  b >= 'sh'
2372)
2373ENGINE=mito",
2374                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
2375            ),
2376            (
2377                r"
2378CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
2379PARTITION BY RANGE COLUMNS (b, a) (
2380  PARTITION r0 VALUES LESS THAN ('hz', 10),
2381  b < 'hz' AND a < 10,
2382  b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
2383  b >= 'sh' AND a >= 20
2384)
2385ENGINE=mito",
2386                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
2387            ),
2388        ];
2389        let ctx = QueryContextBuilder::default().build().into();
2390        for (sql, expected) in cases {
2391            let result = ParserContext::create_with_dialect(
2392                sql,
2393                &GreptimeDbDialect {},
2394                ParseOptions::default(),
2395            )
2396            .unwrap();
2397            match &result[0] {
2398                Statement::CreateTable(c) => {
2399                    let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
2400                    let (partitions, _) =
2401                        parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
2402                    let json = serde_json::to_string(&partitions).unwrap();
2403                    assert_eq!(json, expected);
2404                }
2405                _ => unreachable!(),
2406            }
2407        }
2408    }
2409}