operator/statement/ddl.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use api::helper::ColumnDataTypeWrapper;
use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
use api::v1::{
    AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, column_def,
};
#[cfg(feature = "enterprise")]
use api::v1::{
    CreateTriggerExpr as PbCreateTriggerExpr, meta::CreateTriggerTask as PbCreateTriggerTask,
};
use catalog::CatalogManagerRef;
use chrono::Utc;
use common_base::regex_pattern::NAME_PATTERN_REG;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_readonly_schema};
use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::create_flow::FlowType;
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
use common_meta::procedure_executor::ExecutorContext;
#[cfg(feature = "enterprise")]
use common_meta::rpc::ddl::trigger::CreateTriggerTask;
#[cfg(feature = "enterprise")]
use common_meta::rpc::ddl::trigger::DropTriggerTask;
use common_meta::rpc::ddl::{
    CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
    SubmitDdlTaskResponse,
};
use common_query::Output;
use common_sql::convert::sql_value_to_value;
use common_telemetry::{debug, info, tracing, warn};
use common_time::{Timestamp, Timezone};
use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{RawSchema, Schema};
use datatypes::value::Value;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
use query::parser::QueryStatement;
use query::plan::extract_and_rewrite_full_table_names;
use query::query_engine::DefaultSerializer;
use query::sql::create_table_stmt;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parser::{ParseOptions, ParserContext};
#[cfg(feature = "enterprise")]
use sql::statements::alter::trigger::AlterTrigger;
use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation};
#[cfg(feature = "enterprise")]
use sql::statements::create::trigger::CreateTrigger;
use sql::statements::create::{
    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
};
use sql::statements::statement::Statement;
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::TableRef;
use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{AlterKind, AlterTableRequest, COMMENT_KEY, TableOptions};
use table::table_name::TableName;

use crate::error::{
    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
    EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu,
    InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu,
    InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu, PartitionExprToPbSnafu, Result,
    SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
    TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
    UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
};
use crate::expr_helper;
use crate::statement::StatementExecutor;
use crate::statement::show::create_partitions_stmt;

impl StatementExecutor {
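    /// Returns a cloned handle to the catalog manager.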
    pub fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }

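    /// Creates a table from a parsed `CREATE TABLE` statement, merging
    /// database-level options into the table options before submitting the DDL task.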
    #[tracing::instrument(skip_all)]
    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
        let (catalog, schema, _table) = table_idents_to_full_name(&stmt.name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;

        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
        // We don't put the ttl into the table options,
        // because it will be used directly during compaction.
        if let Some(schema_options) = schema_options {
            for (key, value) in schema_options.extra_options.iter() {
                create_expr
                    .table_options
                    .entry(key.clone())
                    .or_insert(value.clone());
            }
        }

        self.create_table_inner(create_expr, stmt.partitions, ctx)
            .await
    }

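    /// Creates a table from a `CREATE TABLE ... LIKE ...` statement, copying the
    /// source table's schema, options, and partition rule to the new table.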
    #[tracing::instrument(skip_all)]
    pub async fn create_table_like(
        &self,
        stmt: CreateTableLike,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;
        let table_ref = self
            .catalog_manager
            .table(&catalog, &schema, &table, Some(&ctx))
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu { table_name: &table })?;
        let partitions = self
            .partition_manager
            .find_table_partitions(table_ref.table_info().table_id())
            .await
            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;

        // CREATE TABLE LIKE also inherits database-level options.
        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let quote_style = ctx.quote_style();
        let mut create_stmt =
            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
                .context(error::ParseQuerySnafu)?;
        create_stmt.name = stmt.table_name;
        create_stmt.if_not_exists = false;

        let table_info = table_ref.table_info();
        let partitions =
            create_partitions_stmt(&table_info, partitions)?.and_then(|mut partitions| {
                if !partitions.column_list.is_empty() {
                    partitions.set_quote(quote_style);
                    Some(partitions)
                } else {
                    None
                }
            });

        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
        self.create_table_inner(create_expr, partitions, ctx).await
    }

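    /// Creates an external table backed by data files outside the database.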
    #[tracing::instrument(skip_all)]
    pub async fn create_external_table(
        &self,
        create_expr: CreateExternalTable,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
        self.create_table_inner(create_expr, None, ctx).await
    }

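    /// Dispatches a [CreateTableExpr] to the proper creation path: logical
    /// tables on the metric engine are validated against their physical table's
    /// partition rule and created in batch; all other tables go through the
    /// normal creation procedure.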
    #[tracing::instrument(skip_all)]
    pub async fn create_table_inner(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !is_readonly_schema(&create_table.schema_name),
            SchemaReadOnlySnafu {
                name: create_table.schema_name.clone()
            }
        );

        if create_table.engine == METRIC_ENGINE_NAME
            && create_table
                .table_options
                .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            if let Some(partitions) = partitions.as_ref()
                && !partitions.exprs.is_empty()
            {
                self.validate_logical_table_partition_rule(create_table, partitions, &query_ctx)
                    .await?;
            }
            // Create logical tables
            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
                .await?
                .into_iter()
                .next()
                .context(error::UnexpectedSnafu {
                    violated: "expected to create logical tables",
                })
        } else {
            // Create a normal (non-logical) table
            self.create_non_logic_table(create_table, partitions, query_ctx)
                .await
        }
    }

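    /// Creates a non-logical (ordinary or physical) table: verifies that the
    /// target schema exists and the table name is valid, then submits a
    /// create-table procedure and returns the resulting [TableRef].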
    #[tracing::instrument(skip_all)]
    pub async fn create_non_logic_table(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();

        // Check if the schema exists.
        let schema = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey::new(
                &create_table.catalog_name,
                &create_table.schema_name,
            ))
            .await
            .context(TableMetadataManagerSnafu)?;
        ensure!(
            schema.is_some(),
            SchemaNotFoundSnafu {
                schema_info: &create_table.schema_name,
            }
        );

        // Return early if the table already exists.
        if let Some(table) = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                &create_table.table_name,
                Some(&query_ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            return if create_table.create_if_not_exists {
                Ok(table)
            } else {
                TableAlreadyExistsSnafu {
                    table: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                }
                .fail()
            };
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&create_table.table_name),
            InvalidTableNameSnafu {
                table_name: &create_table.table_name,
            }
        );

        let table_name = TableName::new(
            &create_table.catalog_name,
            &create_table.schema_name,
            &create_table.table_name,
        );

        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
        let mut table_info = create_table_info(create_table, partition_cols)?;

        let resp = self
            .create_table_procedure(
                create_table.clone(),
                partitions,
                table_info.clone(),
                query_ctx,
            )
            .await?;

        let table_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created table '{table_name}' with table id {table_id}");

        table_info.ident.table_id = table_id;

        let table_info: Arc<TableInfo> =
            Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
        create_table.table_id = Some(api::v1::TableId { id: table_id });

        let table = DistTable::table(table_info);

        Ok(table)
    }

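    /// Creates a batch of logical tables on the metric engine in a single DDL
    /// procedure, returning the created tables in the same order as the input
    /// expressions.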
    #[tracing::instrument(skip_all)]
    pub async fn create_logical_tables(
        &self,
        create_table_exprs: &[CreateTableExpr],
        query_context: QueryContextRef,
    ) -> Result<Vec<TableRef>> {
        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
        ensure!(
            !create_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "create logic tables"
            }
        );

        // Check the table names.
        for create_table in create_table_exprs {
            ensure!(
                NAME_PATTERN_REG.is_match(&create_table.table_name),
                InvalidTableNameSnafu {
                    table_name: &create_table.table_name,
                }
            );
        }

        let mut raw_tables_info = create_table_exprs
            .iter()
            .map(|create| create_table_info(create, vec![]))
            .collect::<Result<Vec<_>>>()?;
        let tables_data = create_table_exprs
            .iter()
            .cloned()
            .zip(raw_tables_info.iter().cloned())
            .collect::<Vec<_>>();

        let resp = self
            .create_logical_tables_procedure(tables_data, query_context)
            .await?;

        let table_ids = resp.table_ids;
        ensure!(
            table_ids.len() == raw_tables_info.len(),
            CreateLogicalTablesSnafu {
                reason: format!(
                    "The number of created tables is inconsistent with the expected number, expected: {}, actual: {}",
                    raw_tables_info.len(),
                    table_ids.len()
                )
            }
        );
        info!("Successfully created logical tables: {:?}", table_ids);

        for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
            table_info.ident.table_id = table_ids[i];
        }
        let tables_info = raw_tables_info
            .into_iter()
            .map(|x| x.try_into().context(CreateTableInfoSnafu))
            .collect::<Result<Vec<_>>>()?;

        Ok(tables_info
            .into_iter()
            .map(|x| DistTable::table(Arc::new(x)))
            .collect())
    }

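    /// Validates that a logical table's partition rule matches the partition
    /// rule of the physical table it is attached to; creating a logical table
    /// with a divergent rule is rejected.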
    async fn validate_logical_table_partition_rule(
        &self,
        create_table: &CreateTableExpr,
        partitions: &Partitions,
        query_ctx: &QueryContextRef,
    ) -> Result<()> {
        let (_, mut logical_partition_exprs) =
            parse_partitions_for_logical_validation(create_table, partitions, query_ctx)?;

        let physical_table_name = create_table
            .table_options
            .get(LOGICAL_TABLE_METADATA_KEY)
            .with_context(|| CreateLogicalTablesSnafu {
                reason: format!(
                    "expect `{LOGICAL_TABLE_METADATA_KEY}` option on creating logical table"
                ),
            })?;

        let physical_table = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                physical_table_name,
                Some(query_ctx),
            )
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu {
                table_name: physical_table_name.clone(),
            })?;

        let physical_table_info = physical_table.table_info();
        let partition_rule = self
            .partition_manager
            .find_table_partition_rule(&physical_table_info)
            .await
            .context(error::FindTablePartitionRuleSnafu {
                table_name: physical_table_name.clone(),
            })?;

        let multi_dim_rule = partition_rule
            .as_ref()
            .as_any()
            .downcast_ref::<MultiDimPartitionRule>()
            .context(InvalidPartitionRuleSnafu {
                reason: "physical table partition rule is not range-based",
            })?;

        // TODO(ruihang): project physical partition exprs to logical partition column
        let mut physical_partition_exprs = multi_dim_rule.exprs().to_vec();
        logical_partition_exprs.sort_unstable();
        physical_partition_exprs.sort_unstable();

        ensure!(
            physical_partition_exprs == logical_partition_exprs,
            InvalidPartitionRuleSnafu {
                reason: format!(
                    "logical table partition rule must match the corresponding physical table's\n logical table partition exprs:\t\t {:?}\n physical table partition exprs:\t {:?}",
                    logical_partition_exprs, physical_partition_exprs
                ),
            }
        );

        Ok(())
    }

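    /// Creates a trigger (enterprise feature) from a parsed `CREATE TRIGGER`
    /// statement.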
    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub async fn create_trigger(
        &self,
        stmt: CreateTrigger,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
        self.create_trigger_inner(expr, query_context).await
    }

    #[cfg(feature = "enterprise")]
    pub async fn create_trigger_inner(
        &self,
        expr: PbCreateTriggerExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_trigger_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    async fn create_trigger_procedure(
        &self,
        expr: PbCreateTriggerExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
            create_trigger: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;

        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_trigger(task),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

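    /// Creates a flow from a parsed `CREATE FLOW` statement. The flow type
    /// (batching or streaming) is determined automatically from the flow's
    /// query; see `determine_flow_type`.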
    #[tracing::instrument(skip_all)]
    pub async fn create_flow(
        &self,
        stmt: CreateFlow,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        // TODO(ruihang): do some verification
        let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;

        self.create_flow_inner(expr, query_context).await
    }

    pub async fn create_flow_inner(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_flow_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    async fn create_flow_procedure(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let flow_type = self
            .determine_flow_type(&expr, query_context.clone())
            .await?;
        info!("Determined flow={} type: {:#?}", expr.flow_name, flow_type);

        let expr = {
            let mut expr = expr;
            expr.flow_options
                .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
            expr
        };

        let task = CreateFlowTask::try_from(PbCreateFlowTask {
            create_flow: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_flow(task),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    /// Determines the flow type based on the flow's SQL query.
    ///
    /// If the query contains aggregation or `DISTINCT`, it is a batching flow;
    /// otherwise it is a streaming flow.
    async fn determine_flow_type(
        &self,
        expr: &CreateFlowExpr,
        query_ctx: QueryContextRef,
    ) -> Result<FlowType> {
        // First check whether any source table's ttl is instant; if so, force streaming mode.
        for src_table_name in &expr.source_table_names {
            let table = self
                .catalog_manager()
                .table(
                    &src_table_name.catalog_name,
                    &src_table_name.schema_name,
                    &src_table_name.table_name,
                    Some(&query_ctx),
                )
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name,
                    ),
                })?;

            // An instant source table can only be handled by streaming mode.
            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
                warn!(
                    "Source table `{}` for flow `{}` has ttl=instant, falling back to streaming mode",
                    format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name
                    ),
                    expr.flow_name
                );
                return Ok(FlowType::Streaming);
            }
        }

        let engine = &self.query_engine;
        let stmts = ParserContext::create_with_dialect(
            &expr.sql,
            query_ctx.sql_dialect(),
            ParseOptions::default(),
        )
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

        ensure!(
            stmts.len() == 1,
            InvalidSqlSnafu {
                err_msg: format!("Expected only one statement, found {}", stmts.len())
            }
        );
        let stmt = &stmts[0];

        // TQL statements are supported as well.
        let plan = match stmt {
            // PromQL is only supported in batching mode.
            Statement::Tql(_) => return Ok(FlowType::Batching),
            _ => engine
                .planner()
                .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?,
        };

        /// Visitor to find aggregation or distinct nodes in the plan.
        struct FindAggr {
            is_aggr: bool,
        }

        impl TreeNodeVisitor<'_> for FindAggr {
            type Node = LogicalPlan;
            fn f_down(
                &mut self,
                node: &Self::Node,
            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
            {
                match node {
                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
                        self.is_aggr = true;
                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
                    }
                    _ => (),
                }
                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
            }
        }

        let mut find_aggr = FindAggr { is_aggr: false };

        plan.visit_with_subqueries(&mut find_aggr)
            .context(BuildDfLogicalPlanSnafu)?;
        if find_aggr.is_aggr {
            Ok(FlowType::Batching)
        } else {
            Ok(FlowType::Streaming)
        }
    }

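    /// Creates (or replaces) a view from a `CREATE VIEW` statement: plans the
    /// view's query, validates the column list, and stores the encoded logical
    /// plan together with the original definition.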
    #[tracing::instrument(skip_all)]
    pub async fn create_view(
        &self,
        create_view: CreateView,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        // Convert the input into a logical plan.
        let logical_plan = match &*create_view.query {
            Statement::Query(query) => {
                self.plan(
                    &QueryStatement::Sql(Statement::Query(query.clone())),
                    ctx.clone(),
                )
                .await?
            }
            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
            _ => {
                return InvalidViewStmtSnafu {}.fail();
            }
        };
        // Save the definition for `SHOW CREATE VIEW`.
        let definition = create_view.to_string();

        // Save the columns in the plan; they may change when the schemas of the
        // tables in the plan are altered.
        let schema: Schema = logical_plan
            .schema()
            .clone()
            .try_into()
            .context(ConvertSchemaSnafu)?;
        let plan_columns: Vec<_> = schema
            .column_schemas()
            .iter()
            .map(|c| c.name.clone())
            .collect();

        let columns: Vec<_> = create_view
            .columns
            .iter()
            .map(|ident| ident.to_string())
            .collect();

        // Validate the columns.
        if !columns.is_empty() {
            ensure!(
                columns.len() == plan_columns.len(),
                error::ViewColumnsMismatchSnafu {
                    view_name: create_view.name.to_string(),
                    expected: plan_columns.len(),
                    actual: columns.len(),
                }
            );
        }

        // Extract the table names from the original plan
        // and rewrite them as fully qualified names.
        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
            .context(ExtractTableNamesSnafu)?;

        let table_names = table_names.into_iter().map(|t| t.into()).collect();

        // TODO(dennis): we don't save the optimized plan yet,
        // because there are some serialization issues with our own defined plan nodes (such as `MergeScanLogicalPlan`).
        // When the issues are fixed, we can use the `optimized_plan` instead.
        // let optimized_plan = self.optimize_logical_plan(logical_plan)?.unwrap_df_plan();

        // Encode the logical plan.
        let encoded_plan = DFLogicalSubstraitConvertor
            .encode(&plan, DefaultSerializer)
            .context(SubstraitCodecSnafu)?;

        let expr = expr_helper::to_create_view_expr(
            create_view,
            encoded_plan.to_vec(),
            table_names,
            columns,
            plan_columns,
            definition,
            ctx.clone(),
        )?;

        // TODO(dennis): validate the logical plan
        self.create_view_by_expr(expr, ctx).await
    }

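    /// Creates a view from an already-built [CreateViewExpr], handling the
    /// `IF NOT EXISTS` / `OR REPLACE` semantics and invalidating stale cache
    /// entries after the view is created.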
    pub async fn create_view_by_expr(
        &self,
        expr: CreateViewExpr,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !(expr.create_if_not_exists && expr.or_replace),
            InvalidSqlSnafu {
                err_msg: "syntax error: `OR REPLACE` and `IF NOT EXISTS` cannot be used together",
            }
        );
        let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();

        let schema_exists = self
            .table_metadata_manager
            .schema_manager()
            .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
            .await
            .context(TableMetadataManagerSnafu)?;

        ensure!(
            schema_exists,
            SchemaNotFoundSnafu {
                schema_info: &expr.schema_name,
            }
        );

        // Check whether a view or table with the same name already exists.
        if let Some(table) = self
            .catalog_manager
            .table(
                &expr.catalog_name,
                &expr.schema_name,
                &expr.view_name,
                Some(&ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            let table_type = table.table_info().table_type;

            match (table_type, expr.create_if_not_exists, expr.or_replace) {
                (TableType::View, true, false) => {
                    return Ok(table);
                }
                (TableType::View, false, false) => {
                    return ViewAlreadyExistsSnafu {
                        name: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
                (TableType::View, _, true) => {
                    // Try to replace the existing view.
                }
                _ => {
                    return TableAlreadyExistsSnafu {
                        table: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
            }
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&expr.view_name),
            InvalidViewNameSnafu {
                name: expr.view_name.clone(),
            }
        );

        let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);

        let mut view_info = RawTableInfo {
            ident: metadata::TableIdent {
                // The view id of a distributed table is assigned by Meta; 0 is a placeholder here.
                table_id: 0,
                version: 0,
            },
            name: expr.view_name.clone(),
            desc: None,
            catalog_name: expr.catalog_name.clone(),
            schema_name: expr.schema_name.clone(),
            // Table meta doesn't make sense for views, so use a default one.
            meta: RawTableMeta::default(),
            table_type: TableType::View,
        };

        let request = SubmitDdlTaskRequest {
            query_context: ctx,
            task: DdlTask::new_create_view(expr, view_info.clone()),
        };

        let resp = self
            .procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)?;

        debug!(
            "Submit creating view '{view_name}' task response: {:?}",
            resp
        );

        let view_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created view '{view_name}' with view id {view_id}");

        view_info.ident.table_id = view_id;

        let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);

        let table = DistTable::table(view_info);

        // Invalidate the local cache ASAP.
        self.cache_invalidator
            .invalidate(
                &Context::default(),
                &[
                    CacheIdent::TableId(view_id),
                    CacheIdent::TableName(view_name.clone()),
                ],
            )
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(table)
    }

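    /// Drops a flow by name. With `drop_if_exists`, a missing flow is ignored;
    /// otherwise it is an error.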
    #[tracing::instrument(skip_all)]
    pub async fn drop_flow(
        &self,
        catalog_name: String,
        flow_name: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        if let Some(flow) = self
            .flow_metadata_manager
            .flow_name_manager()
            .get(&catalog_name, &flow_name)
            .await
            .context(error::TableMetadataManagerSnafu)?
        {
            let flow_id = flow.flow_id();
            let task = DropFlowTask {
                catalog_name,
                flow_name,
                flow_id,
                drop_if_exists,
            };
            self.drop_flow_procedure(task, query_context).await?;

            Ok(Output::new_with_affected_rows(0))
        } else if drop_if_exists {
            Ok(Output::new_with_affected_rows(0))
        } else {
            FlowNotFoundSnafu {
                flow_name: format_full_flow_name(&catalog_name, &flow_name),
            }
            .fail()
        }
    }

    async fn drop_flow_procedure(
        &self,
        expr: DropFlowTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_flow(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub(super) async fn drop_trigger(
        &self,
        catalog_name: String,
        trigger_name: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let task = DropTriggerTask {
            catalog_name,
            trigger_name,
            drop_if_exists,
        };
        self.drop_trigger_procedure(task, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    async fn drop_trigger_procedure(
        &self,
        expr: DropTriggerTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_trigger(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    /// Drops a view.
    #[tracing::instrument(skip_all)]
    pub(crate) async fn drop_view(
        &self,
        catalog: String,
        schema: String,
        view: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let view_info = if let Some(view) = self
            .catalog_manager
            .table(&catalog, &schema, &view, None)
            .await
            .context(CatalogSnafu)?
        {
            view.table_info()
        } else if drop_if_exists {
            // DROP VIEW IF EXISTS with a missing view is ignored.
            return Ok(Output::new_with_affected_rows(0));
        } else {
            return TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog, &schema, &view),
            }
            .fail();
        };

        // Ensure the existing object is a view; we can't drop other table types.
        ensure!(
            view_info.table_type == TableType::View,
            error::InvalidViewSnafu {
                msg: "not a view",
                view_name: format_full_table_name(&catalog, &schema, &view),
            }
        );

        let view_id = view_info.table_id();

        let task = DropViewTask {
            catalog,
            schema,
            view,
            view_id,
            drop_if_exists,
        };

        self.drop_view_procedure(task, query_context).await?;

        Ok(Output::new_with_affected_rows(0))
    }

    /// Submits a [DropViewTask] to the procedure executor.
    async fn drop_view_procedure(
        &self,
        expr: DropViewTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_view(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

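    /// Alters a batch of logical tables. The expressions are grouped by their
    /// physical table id, and one alter procedure is submitted per physical
    /// table.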
    #[tracing::instrument(skip_all)]
    pub async fn alter_logical_tables(
        &self,
        alter_table_exprs: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
        ensure!(
            !alter_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "alter logical tables"
            }
        );

        // Group the expressions by physical table id.
        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
        for expr in alter_table_exprs {
            // Get the table id from the catalog manager.
            let catalog = if expr.catalog_name.is_empty() {
                query_context.current_catalog()
            } else {
                &expr.catalog_name
            };
            let schema = if expr.schema_name.is_empty() {
                query_context.current_schema()
            } else {
                expr.schema_name.clone()
            };
            let table_name = &expr.table_name;
            let table = self
                .catalog_manager
                .table(catalog, &schema, table_name, Some(&query_context))
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(catalog, &schema, table_name),
                })?;
            let table_id = table.table_info().ident.table_id;
            let physical_table_id = self
                .table_metadata_manager
                .table_route_manager()
                .get_physical_table_id(table_id)
                .await
                .context(TableMetadataManagerSnafu)?;
            groups.entry(physical_table_id).or_default().push(expr);
        }

        // Submit a procedure for each physical table.
        let mut handles = Vec::with_capacity(groups.len());
        for (_physical_table_id, exprs) in groups {
            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
            handles.push(fut);
        }
        let _results = futures::future::try_join_all(handles).await?;

        Ok(Output::new_with_affected_rows(0))
    }

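    /// Drops a single table; a thin wrapper around [Self::drop_tables] kept for
    /// the gRPC call path.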
    #[tracing::instrument(skip_all)]
    pub async fn drop_table(
        &self,
        table_name: TableName,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        // Reserved for the gRPC call path.
        self.drop_tables(&[table_name], drop_if_exists, query_context)
            .await
    }

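    /// Drops the given tables: resolves each table id first, then submits one
    /// drop procedure per table and invalidates the local cache as each
    /// procedure completes.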
    #[tracing::instrument(skip_all)]
    pub async fn drop_tables(
        &self,
        table_names: &[TableName],
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let mut tables = Vec::with_capacity(table_names.len());
        for table_name in table_names {
            ensure!(
                !is_readonly_schema(&table_name.schema_name),
                SchemaReadOnlySnafu {
                    name: table_name.schema_name.clone()
                }
            );

            if let Some(table) = self
                .catalog_manager
                .table(
                    &table_name.catalog_name,
                    &table_name.schema_name,
                    &table_name.table_name,
                    Some(&query_context),
                )
                .await
                .context(CatalogSnafu)?
            {
                // Keep each name paired with its id so skipped tables can't
                // misalign names and ids.
                tables.push((table_name, table.table_info().table_id()));
            } else if drop_if_exists {
                // DROP TABLE IF EXISTS with a missing table is ignored.
                continue;
            } else {
                return TableNotFoundSnafu {
                    table_name: table_name.to_string(),
                }
                .fail();
            }
        }

        for (table_name, table_id) in tables {
            self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
                .await?;

            // Invalidate the local cache ASAP.
            self.cache_invalidator
                .invalidate(
                    &Context::default(),
                    &[
                        CacheIdent::TableId(table_id),
                        CacheIdent::TableName(table_name.clone()),
                    ],
                )
                .await
                .context(error::InvalidateTableCacheSnafu)?;
        }
        Ok(Output::new_with_affected_rows(0))
    }

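    /// Drops a database (schema). Dropping the schema currently in use is
    /// rejected; with `drop_if_exists`, a missing schema is ignored.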
    #[tracing::instrument(skip_all)]
    pub async fn drop_database(
        &self,
        catalog: String,
        schema: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&schema),
            SchemaReadOnlySnafu { name: schema }
        );

        if self
            .catalog_manager
            .schema_exists(&catalog, &schema, None)
            .await
            .context(CatalogSnafu)?
        {
            if schema == query_context.current_schema() {
                SchemaInUseSnafu { name: schema }.fail()
            } else {
                self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
                    .await?;

                Ok(Output::new_with_affected_rows(0))
            }
        } else if drop_if_exists {
            // DROP DATABASE IF EXISTS with a missing schema is ignored.
            Ok(Output::new_with_affected_rows(0))
        } else {
            SchemaNotFoundSnafu {
                schema_info: schema,
            }
            .fail()
        }
    }

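    /// Truncates a table, optionally restricted to the given time ranges.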
    #[tracing::instrument(skip_all)]
    pub async fn truncate_table(
        &self,
        table_name: TableName,
        time_ranges: Vec<(Timestamp, Timestamp)>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&table_name.schema_name),
            SchemaReadOnlySnafu {
                name: table_name.schema_name.clone()
            }
        );

        let table = self
            .catalog_manager
            .table(
                &table_name.catalog_name,
                &table_name.schema_name,
                &table_name.table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_name.to_string(),
            })?;
        let table_id = table.table_info().table_id();
        self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
            .await?;

        Ok(Output::new_with_affected_rows(0))
    }

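    /// Alters a table from a parsed `ALTER TABLE` statement. `ALTER TABLE ...
    /// REPARTITION` is recognized but not supported yet.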
    #[tracing::instrument(skip_all)]
    pub async fn alter_table(
        &self,
        alter_table: AlterTable,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        if matches!(
            alter_table.alter_operation(),
            AlterTableOperation::Repartition { .. }
        ) {
            let _request = expr_helper::to_repartition_request(alter_table, &query_context)?;
            return NotSupportedSnafu {
                feat: "ALTER TABLE REPARTITION",
            }
            .fail();
        }

        let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
        self.alter_table_inner(expr, query_context).await
    }

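    /// Alters a table from an [AlterTableExpr]. Physical and logical tables are
    /// routed to different DDL tasks, and all affected cache entries (including
    /// the physical table's, for logical tables) are invalidated afterwards.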
    #[tracing::instrument(skip_all)]
    pub async fn alter_table_inner(
        &self,
        expr: AlterTableExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&expr.schema_name),
            SchemaReadOnlySnafu {
                name: expr.schema_name.clone()
            }
        );

        let catalog_name = if expr.catalog_name.is_empty() {
            DEFAULT_CATALOG_NAME.to_string()
        } else {
            expr.catalog_name.clone()
        };

        let schema_name = if expr.schema_name.is_empty() {
            DEFAULT_SCHEMA_NAME.to_string()
        } else {
            expr.schema_name.clone()
        };

        let table_name = expr.table_name.clone();

        let table = self
            .catalog_manager
            .table(
                &catalog_name,
                &schema_name,
                &table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
            })?;

        let table_id = table.table_info().ident.table_id;
        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
        if !need_alter {
            return Ok(Output::new_with_affected_rows(0));
        }
        info!(
            "Table info before alter is {:?}, expr: {:?}",
            table.table_info(),
            expr
        );

        let physical_table_id = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_id(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        let (req, invalidate_keys) = if physical_table_id == table_id {
            // This is a physical table.
            let req = SubmitDdlTaskRequest {
                query_context,
                task: DdlTask::new_alter_table(expr),
            };

            let invalidate_keys = vec![
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            (req, invalidate_keys)
        } else {
            // This is a logical table.
            let req = SubmitDdlTaskRequest {
                query_context,
                task: DdlTask::new_alter_logical_tables(vec![expr]),
            };

            let mut invalidate_keys = vec![
                CacheIdent::TableId(physical_table_id),
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            let physical_table = self
                .table_metadata_manager
                .table_info_manager()
                .get(physical_table_id)
                .await
                .context(TableMetadataManagerSnafu)?
                .map(|x| x.into_inner());
            if let Some(physical_table) = physical_table {
                let physical_table_name = TableName::new(
                    physical_table.table_info.catalog_name,
                    physical_table.table_info.schema_name,
                    physical_table.table_info.name,
                );
                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
            }

            (req, invalidate_keys)
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        // Invalidate the local cache ASAP.
        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub async fn alter_trigger(
        &self,
        _alter_expr: AlterTrigger,
        _query_context: QueryContextRef,
    ) -> Result<Output> {
        crate::error::NotSupportedSnafu {
            feat: "alter trigger",
        }
        .fail()
    }

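    /// Alters a database (schema) from a parsed `ALTER DATABASE` statement.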
    #[tracing::instrument(skip_all)]
    pub async fn alter_database(
        &self,
        alter_expr: AlterDatabase,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
        self.alter_database_inner(alter_expr, query_context).await
    }

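    /// Alters a database from an [AlterDatabaseExpr], then invalidates the
    /// schema's cache entry.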
    #[tracing::instrument(skip_all)]
    pub async fn alter_database_inner(
        &self,
        alter_expr: AlterDatabaseExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&alter_expr.schema_name),
            SchemaReadOnlySnafu {
                name: alter_expr.schema_name.clone()
            }
        );

        let exists = self
            .catalog_manager
            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
            .await
            .context(CatalogSnafu)?;
        ensure!(
            exists,
            SchemaNotFoundSnafu {
                schema_info: alter_expr.schema_name,
            }
        );

        let cache_ident = [CacheIdent::SchemaName(SchemaName {
            catalog_name: alter_expr.catalog_name.clone(),
            schema_name: alter_expr.schema_name.clone(),
        })];

        self.alter_database_procedure(alter_expr, query_context)
            .await?;

        // Invalidate the local cache ASAP.
        self.cache_invalidator
            .invalidate(&Context::default(), &cache_ident)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }

1456    async fn create_table_procedure(
1457        &self,
1458        create_table: CreateTableExpr,
1459        partitions: Vec<PartitionExpr>,
1460        table_info: RawTableInfo,
1461        query_context: QueryContextRef,
1462    ) -> Result<SubmitDdlTaskResponse> {
1463        let partitions = partitions
1464            .into_iter()
1465            .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
1466            .collect::<Result<Vec<_>>>()?;
1467
1468        let request = SubmitDdlTaskRequest {
1469            query_context,
1470            task: DdlTask::new_create_table(create_table, partitions, table_info),
1471        };
1472
1473        self.procedure_executor
1474            .submit_ddl_task(&ExecutorContext::default(), request)
1475            .await
1476            .context(error::ExecuteDdlSnafu)
1477    }
1478
1479    async fn create_logical_tables_procedure(
1480        &self,
1481        tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
1482        query_context: QueryContextRef,
1483    ) -> Result<SubmitDdlTaskResponse> {
1484        let request = SubmitDdlTaskRequest {
1485            query_context,
1486            task: DdlTask::new_create_logical_tables(tables_data),
1487        };
1488
1489        self.procedure_executor
1490            .submit_ddl_task(&ExecutorContext::default(), request)
1491            .await
1492            .context(error::ExecuteDdlSnafu)
1493    }
1494
1495    async fn alter_logical_tables_procedure(
1496        &self,
1497        tables_data: Vec<AlterTableExpr>,
1498        query_context: QueryContextRef,
1499    ) -> Result<SubmitDdlTaskResponse> {
1500        let request = SubmitDdlTaskRequest {
1501            query_context,
1502            task: DdlTask::new_alter_logical_tables(tables_data),
1503        };
1504
1505        self.procedure_executor
1506            .submit_ddl_task(&ExecutorContext::default(), request)
1507            .await
1508            .context(error::ExecuteDdlSnafu)
1509    }
1510
1511    async fn drop_table_procedure(
1512        &self,
1513        table_name: &TableName,
1514        table_id: TableId,
1515        drop_if_exists: bool,
1516        query_context: QueryContextRef,
1517    ) -> Result<SubmitDdlTaskResponse> {
1518        let request = SubmitDdlTaskRequest {
1519            query_context,
1520            task: DdlTask::new_drop_table(
1521                table_name.catalog_name.clone(),
1522                table_name.schema_name.clone(),
1523                table_name.table_name.clone(),
1524                table_id,
1525                drop_if_exists,
1526            ),
1527        };
1528
1529        self.procedure_executor
1530            .submit_ddl_task(&ExecutorContext::default(), request)
1531            .await
1532            .context(error::ExecuteDdlSnafu)
1533    }
1534
    async fn drop_database_procedure(
        &self,
        catalog: String,
        schema: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_database(catalog, schema, drop_if_exists),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

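    /// Submits an `ALTER DATABASE` DDL task to the procedure executor.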
    async fn alter_database_procedure(
        &self,
        alter_expr: AlterDatabaseExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_alter_database(alter_expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

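    /// Submits a `TRUNCATE TABLE` DDL task to the procedure executor.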
    async fn truncate_table_procedure(
        &self,
        table_name: &TableName,
        table_id: TableId,
        time_ranges: Vec<(Timestamp, Timestamp)>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_truncate_table(
                table_name.catalog_name.clone(),
                table_name.schema_name.clone(),
                table_name.table_name.clone(),
                table_id,
                time_ranges,
            ),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

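    /// Creates a database after validating the catalog and database names.
    ///
    /// A hedged sketch of the SQL shape this serves (the database name and the
    /// `ttl` option here are illustrative, not taken from this file):
    ///
    /// ```sql
    /// CREATE DATABASE IF NOT EXISTS my_db WITH (ttl = '7d');
    /// ```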
    #[tracing::instrument(skip_all)]
    pub async fn create_database(
        &self,
        database: &str,
        create_if_not_exists: bool,
        options: HashMap<String, String>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let catalog = query_context.current_catalog();
        ensure!(
            NAME_PATTERN_REG.is_match(catalog),
            error::UnexpectedSnafu {
                violated: format!("Invalid catalog name: {}", catalog)
            }
        );

        ensure!(
            NAME_PATTERN_REG.is_match(database),
            error::UnexpectedSnafu {
                violated: format!("Invalid database name: {}", database)
            }
        );

        if !self
            .catalog_manager
            .schema_exists(catalog, database, None)
            .await
            .context(CatalogSnafu)?
            && !self.catalog_manager.is_reserved_schema_name(database)
        {
            self.create_database_procedure(
                catalog.to_string(),
                database.to_string(),
                create_if_not_exists,
                options,
                query_context,
            )
            .await?;

            Ok(Output::new_with_affected_rows(1))
        } else if create_if_not_exists {
            Ok(Output::new_with_affected_rows(1))
        } else {
            error::SchemaExistsSnafu { name: database }.fail()
        }
    }

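    /// Submits a `CREATE DATABASE` DDL task to the procedure executor.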
    async fn create_database_procedure(
        &self,
        catalog: String,
        database: String,
        create_if_not_exists: bool,
        options: HashMap<String, String>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }
}

/// Parses the partition statement [Partitions] into [PartitionExpr]s and partition columns.
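///
/// A hedged example of the clause this parses (column names are illustrative;
/// the tests at the bottom of this file use the same shape):
///
/// ```sql
/// CREATE TABLE t (region STRING, ts TIMESTAMP, TIME INDEX (ts))
/// PARTITION ON COLUMNS (region) (
///     region < 'm',
///     region >= 'm'
/// )
/// ENGINE=mito;
/// ```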
pub fn parse_partitions(
    create_table: &CreateTableExpr,
    partitions: Option<Partitions>,
    query_ctx: &QueryContextRef,
) -> Result<(Vec<PartitionExpr>, Vec<String>)> {
    // If the user does not define partitions, use the timestamp column (which must
    // exist) as the partition column and create a single partition.
    let partition_columns = find_partition_columns(&partitions)?;
    let partition_exprs =
        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;

    // Validate the partition rule.
    let exprs = partition_exprs.clone();
    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
        .context(InvalidPartitionSnafu)?;

    Ok((partition_exprs, partition_columns))
}

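/// Parses the partition columns and expressions of a logical table's
/// `PARTITION` clause and validates them with [MultiDimPartitionRule].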
fn parse_partitions_for_logical_validation(
    create_table: &CreateTableExpr,
    partitions: &Partitions,
    query_ctx: &QueryContextRef,
) -> Result<(Vec<String>, Vec<PartitionExpr>)> {
    let partition_columns = partitions
        .column_list
        .iter()
        .map(|ident| ident.value.clone())
        .collect::<Vec<_>>();

    let column_name_and_type = partition_columns
        .iter()
        .map(|pc| {
            let column = create_table
                .column_defs
                .iter()
                .find(|c| &c.name == pc)
                .context(ColumnNotFoundSnafu { msg: pc.clone() })?;
            let column_name = &column.name;
            let data_type = ConcreteDataType::from(
                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
                    .context(ColumnDataTypeSnafu)?,
            );
            Ok((column_name, data_type))
        })
        .collect::<Result<HashMap<_, _>>>()?;

    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
    for expr in &partitions.exprs {
        let partition_expr = convert_one_expr(expr, &column_name_and_type, &query_ctx.timezone())?;
        partition_exprs.push(partition_expr);
    }

    MultiDimPartitionRule::try_new(
        partition_columns.clone(),
        vec![],
        partition_exprs.clone(),
        true,
    )
    .context(InvalidPartitionSnafu)?;

    Ok((partition_columns, partition_exprs))
}

/// Verifies an alter operation and returns whether it is necessary to perform it.
///
/// # Returns
///
/// Returns `true` if the alter needs to be performed; otherwise, returns `false`.
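///
/// For example, an `ADD COLUMN` alter in which every column is marked
/// `add_if_not_exists` and already exists in the table needs no work, so it
/// returns `false`.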
pub fn verify_alter(
    table_id: TableId,
    table_info: Arc<TableInfo>,
    expr: AlterTableExpr,
) -> Result<bool> {
    let request: AlterTableRequest =
        common_grpc_expr::alter_expr_to_request(table_id, expr, Some(&table_info.meta))
            .context(AlterExprToRequestSnafu)?;

    let AlterTableRequest {
        table_name,
        alter_kind,
        ..
    } = &request;

    if let AlterKind::RenameTable { new_table_name } = alter_kind {
        ensure!(
            NAME_PATTERN_REG.is_match(new_table_name),
            error::UnexpectedSnafu {
                violated: format!("Invalid table name: {}", new_table_name)
            }
        );
    } else if let AlterKind::AddColumns { columns } = alter_kind {
        // If all the columns are marked as `add_if_not_exists` and they already exist
        // in the table, there is no need to perform the alter.
        let column_names: HashSet<_> = table_info
            .meta
            .schema
            .column_schemas()
            .iter()
            .map(|schema| &schema.name)
            .collect();
        if columns.iter().all(|column| {
            column_names.contains(&column.column_schema.name) && column.add_if_not_exists
        }) {
            return Ok(false);
        }
    }

    let _ = table_info
        .meta
        .builder_with_alter_kind(table_name, &request.alter_kind)
        .context(error::TableSnafu)?
        .build()
        .context(error::BuildTableMetaSnafu { table_name })?;

    Ok(true)
}

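/// Builds the [RawTableInfo] of a distributed table from its [CreateTableExpr]
/// and partition columns; the table id is left as a placeholder for the meta
/// service to assign.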
pub fn create_table_info(
    create_table: &CreateTableExpr,
    partition_columns: Vec<String>,
) -> Result<RawTableInfo> {
    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
    let mut column_name_to_index_map = HashMap::new();

    for (idx, column) in create_table.column_defs.iter().enumerate() {
        let schema =
            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
                column: &column.name,
            })?;
        let schema = schema.with_time_index(column.name == create_table.time_index);

        column_schemas.push(schema);
        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
    }

    let timestamp_index = column_name_to_index_map
        .get(&create_table.time_index)
        .cloned();

    let raw_schema = RawSchema {
        column_schemas: column_schemas.clone(),
        timestamp_index,
        version: 0,
    };

    let primary_key_indices = create_table
        .primary_keys
        .iter()
        .map(|name| {
            column_name_to_index_map
                .get(name)
                .cloned()
                .context(ColumnNotFoundSnafu { msg: name })
        })
        .collect::<Result<Vec<_>>>()?;

    let partition_key_indices = partition_columns
        .into_iter()
        .map(|col_name| {
            column_name_to_index_map
                .get(&col_name)
                .cloned()
                .context(ColumnNotFoundSnafu { msg: col_name })
        })
        .collect::<Result<Vec<_>>>()?;

    let table_options = TableOptions::try_from_iter(&create_table.table_options)
        .context(UnrecognizedTableOptionSnafu)?;

    let meta = RawTableMeta {
        schema: raw_schema,
        primary_key_indices,
        value_indices: vec![],
        engine: create_table.engine.clone(),
        next_column_id: column_schemas.len() as u32,
        region_numbers: vec![],
        options: table_options,
        created_on: Utc::now(),
        updated_on: Utc::now(),
        partition_key_indices,
        column_ids: vec![],
    };

    let desc = if create_table.desc.is_empty() {
        create_table.table_options.get(COMMENT_KEY).cloned()
    } else {
        Some(create_table.desc.clone())
    };

    let table_info = RawTableInfo {
        ident: metadata::TableIdent {
            // The table id of a distributed table is assigned by the meta service;
            // set 0 here as a placeholder.
            table_id: 0,
            version: 0,
        },
        name: create_table.table_name.clone(),
        desc,
        catalog_name: create_table.catalog_name.clone(),
        schema_name: create_table.schema_name.clone(),
        meta,
        table_type: TableType::Base,
    };
    Ok(table_info)
}

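/// Extracts the partition column names from the optional `PARTITION` clause.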
fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
    let columns = if let Some(partitions) = partitions {
        partitions
            .column_list
            .iter()
            .map(|x| x.value.clone())
            .collect::<Vec<_>>()
    } else {
        vec![]
    };
    Ok(columns)
}

/// Parses [Partitions] into a group of partition entries.
///
/// Returns a list of [PartitionExpr], each of which defines a partition.
fn find_partition_entries(
    create_table: &CreateTableExpr,
    partitions: &Option<Partitions>,
    partition_columns: &[String],
    query_ctx: &QueryContextRef,
) -> Result<Vec<PartitionExpr>> {
    let Some(partitions) = partitions else {
        return Ok(vec![]);
    };

    // Extract the concrete data types of the partition columns.
    let column_name_and_type = partition_columns
        .iter()
        .map(|pc| {
            let column = create_table
                .column_defs
                .iter()
                .find(|c| &c.name == pc)
                // unwrap is safe here because we have checked that partition columns are defined
                .unwrap();
            let column_name = &column.name;
            let data_type = ConcreteDataType::from(
                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
                    .context(ColumnDataTypeSnafu)?,
            );
            Ok((column_name, data_type))
        })
        .collect::<Result<HashMap<_, _>>>()?;

    // Transform each parser expression into a partition expression.
    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
    for partition in &partitions.exprs {
        let partition_expr =
            convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
        partition_exprs.push(partition_expr);
    }

    Ok(partition_exprs)
}

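/// Converts one parser [Expr] into a [PartitionExpr].
///
/// Per the match below, the supported shapes are a column compared against a
/// literal on either side (optionally under a unary operator such as `-`),
/// e.g. `region < 'm'`, and a binary expression on both sides, e.g.
/// `a >= 10 AND a < 20`, which recurses into the sub-expressions.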
fn convert_one_expr(
    expr: &Expr,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
    timezone: &Timezone,
) -> Result<PartitionExpr> {
    let Expr::BinaryOp { left, op, right } = expr else {
        return InvalidPartitionRuleSnafu {
            reason: "partition rule must be a binary expression",
        }
        .fail();
    };

    let op =
        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
            reason: format!("unsupported operator in partition expr {op}"),
        })?;

    // Convert the leaf nodes.
    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
        // col, val
        (Expr::Identifier(ident), Expr::Value(value)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&value.value, data_type, timezone, None)?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        // val, col
        (Expr::Value(value), Expr::Identifier(ident)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&value.value, data_type, timezone, None)?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
            // A sub-expression must be compared against another sub-expression.
            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
            (Operand::Expr(lhs), op, Operand::Expr(rhs))
        }
        _ => {
            return InvalidPartitionRuleSnafu {
                reason: format!("invalid partition expr {expr}"),
            }
            .fail();
        }
    };

    Ok(PartitionExpr::new(lhs, op, rhs))
}

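/// Resolves an identifier to its column name and [ConcreteDataType], failing
/// with a column-not-found error if it is not a partition column.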
fn convert_identifier(
    ident: &Ident,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
) -> Result<(String, ConcreteDataType)> {
    let column_name = ident.value.clone();
    let data_type = column_name_and_type
        .get(&column_name)
        .cloned()
        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
    Ok((column_name, data_type))
}

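/// Converts a SQL literal into a [Value] of `data_type`, honoring the session
/// timezone and an optional unary operator (e.g. a leading minus).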
fn convert_value(
    value: &ParserValue,
    data_type: ConcreteDataType,
    timezone: &Timezone,
    unary_op: Option<UnaryOperator>,
) -> Result<Value> {
    sql_value_to_value(
        "<NONAME>",
        &data_type,
        value,
        Some(timezone),
        unary_op,
        false,
    )
    .context(error::SqlCommonSnafu)
}

#[cfg(test)]
mod test {
    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;

    use super::*;
    use crate::expr_helper;

    #[test]
    fn test_name_is_match() {
        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
        assert!(!NAME_PATTERN_REG.is_match("🈲"));
        assert!(NAME_PATTERN_REG.is_match("hello"));
        assert!(NAME_PATTERN_REG.is_match("test@"));
        assert!(!NAME_PATTERN_REG.is_match("@test"));
        assert!(NAME_PATTERN_REG.is_match("test#"));
        assert!(!NAME_PATTERN_REG.is_match("#test"));
        assert!(!NAME_PATTERN_REG.is_match("@"));
        assert!(!NAME_PATTERN_REG.is_match("#"));
    }

    #[tokio::test]
    #[ignore = "TODO(ruihang): WIP new partition rule"]
    async fn test_parse_partitions() {
        common_telemetry::init_default_ut_logging();
        let cases = [
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION ON COLUMNS (b) (
  b < 'hz',
  b >= 'hz' AND b < 'sh',
  b >= 'sh'
)
ENGINE=mito",
                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
            ),
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
  PARTITION r0 VALUES LESS THAN ('hz', 10),
  b < 'hz' AND a < 10,
  b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
  b >= 'sh' AND a >= 20
)
ENGINE=mito",
                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
            ),
        ];
        let ctx = QueryContextBuilder::default().build().into();
        for (sql, expected) in cases {
            let result = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            match &result[0] {
                Statement::CreateTable(c) => {
                    let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
                    let (partitions, _) =
                        parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
                    let json = serde_json::to_string(&partitions).unwrap();
                    assert_eq!(json, expected);
                }
                _ => unreachable!(),
            }
        }
    }
}