operator/statement/ddl.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use api::helper::ColumnDataTypeWrapper;
use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
use api::v1::{
    column_def, AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
};
#[cfg(feature = "enterprise")]
use api::v1::{
    meta::CreateTriggerTask as PbCreateTriggerTask, CreateTriggerExpr as PbCreateTriggerExpr,
};
use catalog::CatalogManagerRef;
use chrono::Utc;
use common_catalog::consts::{is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::create_flow::FlowType;
use common_meta::ddl::ExecutorContext;
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
use common_meta::key::NAME_PATTERN;
#[cfg(feature = "enterprise")]
use common_meta::rpc::ddl::trigger::CreateTriggerTask;
#[cfg(feature = "enterprise")]
use common_meta::rpc::ddl::trigger::DropTriggerTask;
use common_meta::rpc::ddl::{
    CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
    SubmitDdlTaskResponse,
};
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_query::Output;
use common_sql::convert::sql_value_to_value;
use common_telemetry::{debug, info, tracing, warn};
use common_time::Timezone;
use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{RawSchema, Schema};
use datatypes::value::Value;
use lazy_static::lazy_static;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
use partition::partition::{PartitionBound, PartitionDef};
use query::parser::QueryStatement;
use query::plan::extract_and_rewrite_full_table_names;
use query::query_engine::DefaultSerializer;
use query::sql::create_table_stmt;
use regex::Regex;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{ensure, OptionExt, ResultExt};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::alter::{AlterDatabase, AlterTable};
#[cfg(feature = "enterprise")]
use sql::statements::create::trigger::CreateTrigger;
use sql::statements::create::{
    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
};
use sql::statements::statement::Statement;
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{AlterKind, AlterTableRequest, TableOptions, COMMENT_KEY};
use table::table_name::TableName;
use table::TableRef;

use crate::error::{
    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
    DeserializePartitionSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
    FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, Result, SchemaInUseSnafu,
    SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
    TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
    ViewAlreadyExistsSnafu,
};
use crate::expr_helper;
use crate::statement::show::create_partitions_stmt;
use crate::statement::StatementExecutor;

lazy_static! {
    pub static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
}
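
// A hedged illustration of NAME_PATTERN_REG (the exact pattern comes from
// common_meta's NAME_PATTERN): identifier-like names match, while names
// containing whitespace do not, e.g.:
//
// assert!(NAME_PATTERN_REG.is_match("my_table"));
// assert!(!NAME_PATTERN_REG.is_match("my table"));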

impl StatementExecutor {
    pub fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
        self.create_table_inner(create_expr, stmt.partitions, ctx)
            .await
    }

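    /// Creates a table that copies the schema of an existing source table,
    /// e.g. (hedged illustration) `CREATE TABLE t2 LIKE t1;`. The new table
    /// inherits the source table's columns, engine, table options, and
    /// partition rule, as well as database-level options.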
115    #[tracing::instrument(skip_all)]
116    pub async fn create_table_like(
117        &self,
118        stmt: CreateTableLike,
119        ctx: QueryContextRef,
120    ) -> Result<TableRef> {
121        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
122            .map_err(BoxedError::new)
123            .context(error::ExternalSnafu)?;
124        let table_ref = self
125            .catalog_manager
126            .table(&catalog, &schema, &table, Some(&ctx))
127            .await
128            .context(CatalogSnafu)?
129            .context(TableNotFoundSnafu { table_name: &table })?;
130        let partitions = self
131            .partition_manager
132            .find_table_partitions(table_ref.table_info().table_id())
133            .await
134            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;
135
        // CREATE TABLE LIKE also inherits database-level options.
        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let quote_style = ctx.quote_style();
        let mut create_stmt =
            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
                .context(error::ParseQuerySnafu)?;
        create_stmt.name = stmt.table_name;
        create_stmt.if_not_exists = false;

        let partitions = create_partitions_stmt(partitions)?.and_then(|mut partitions| {
            if !partitions.column_list.is_empty() {
                partitions.set_quote(quote_style);
                Some(partitions)
            } else {
                None
            }
        });

        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
        self.create_table_inner(create_expr, partitions, ctx).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_external_table(
        &self,
        create_expr: CreateExternalTable,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
        self.create_table_inner(create_expr, None, ctx).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_table_inner(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !is_readonly_schema(&create_table.schema_name),
            SchemaReadOnlySnafu {
                name: create_table.schema_name.clone()
            }
        );

        if create_table.engine == METRIC_ENGINE_NAME
            && create_table
                .table_options
                .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            // Create logical tables
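            // For illustration (hedged sketch): a logical table is typically
            // created with the metric engine plus a table option keyed by
            // LOGICAL_TABLE_METADATA_KEY that names its physical table; its
            // partition rule is inherited from that physical table.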
            ensure!(
                partitions.is_none(),
                InvalidPartitionRuleSnafu {
                    reason: "a logical table in the metric engine should not have a partition rule; it is inherited from the physical table",
                }
            );
            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
                .await?
                .into_iter()
                .next()
                .context(error::UnexpectedSnafu {
                    violated: "expected to create logical tables",
                })
        } else {
            // Create a normal (non-logical) table
            self.create_non_logic_table(create_table, partitions, query_ctx)
                .await
        }
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_non_logic_table(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();

        // Check if schema exists
        let schema = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey::new(
                &create_table.catalog_name,
                &create_table.schema_name,
            ))
            .await
            .context(TableMetadataManagerSnafu)?;
        ensure!(
            schema.is_some(),
            SchemaNotFoundSnafu {
                schema_info: &create_table.schema_name,
            }
        );

        // Check whether the table already exists.
        if let Some(table) = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                &create_table.table_name,
                Some(&query_ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            return if create_table.create_if_not_exists {
                Ok(table)
            } else {
                TableAlreadyExistsSnafu {
                    table: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                }
                .fail()
            };
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&create_table.table_name),
            InvalidTableNameSnafu {
                table_name: &create_table.table_name,
            }
        );

        let table_name = TableName::new(
            &create_table.catalog_name,
            &create_table.schema_name,
            &create_table.table_name,
        );

        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
        let mut table_info = create_table_info(create_table, partition_cols)?;

        let resp = self
            .create_table_procedure(
                create_table.clone(),
                partitions,
                table_info.clone(),
                query_ctx,
            )
            .await?;

        let table_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created table '{table_name}' with table id {table_id}");

        table_info.ident.table_id = table_id;

        let table_info: Arc<TableInfo> =
            Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
        create_table.table_id = Some(api::v1::TableId { id: table_id });

        let table = DistTable::table(table_info);

        Ok(table)
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_logical_tables(
        &self,
        create_table_exprs: &[CreateTableExpr],
        query_context: QueryContextRef,
    ) -> Result<Vec<TableRef>> {
        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
        ensure!(
            !create_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "create logic tables"
            }
        );

        // Check table names
        for create_table in create_table_exprs {
            ensure!(
                NAME_PATTERN_REG.is_match(&create_table.table_name),
                InvalidTableNameSnafu {
                    table_name: &create_table.table_name,
                }
            );
        }

        let mut raw_tables_info = create_table_exprs
            .iter()
            .map(|create| create_table_info(create, vec![]))
            .collect::<Result<Vec<_>>>()?;
        let tables_data = create_table_exprs
            .iter()
            .cloned()
            .zip(raw_tables_info.iter().cloned())
            .collect::<Vec<_>>();

        let resp = self
            .create_logical_tables_procedure(tables_data, query_context)
            .await?;

        let table_ids = resp.table_ids;
        ensure!(
            table_ids.len() == raw_tables_info.len(),
            CreateLogicalTablesSnafu {
                reason: format!(
                    "The number of created tables is inconsistent with the expected number, expected: {}, actual: {}",
                    raw_tables_info.len(),
                    table_ids.len()
                )
            }
        );
        info!("Successfully created logical tables: {:?}", table_ids);

        for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
            table_info.ident.table_id = table_ids[i];
        }
        let tables_info = raw_tables_info
            .into_iter()
            .map(|x| x.try_into().context(CreateTableInfoSnafu))
            .collect::<Result<Vec<_>>>()?;

        Ok(tables_info
            .into_iter()
            .map(|x| DistTable::table(Arc::new(x)))
            .collect())
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub async fn create_trigger(
        &self,
        stmt: CreateTrigger,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
        self.create_trigger_inner(expr, query_context).await
    }

    #[cfg(feature = "enterprise")]
    pub async fn create_trigger_inner(
        &self,
        expr: PbCreateTriggerExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_trigger_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    async fn create_trigger_procedure(
        &self,
        expr: PbCreateTriggerExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
            create_trigger: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;

        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_trigger(task),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

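    /// Creates a flow from a `CREATE FLOW` statement, e.g. (hedged
    /// illustration of the syntax):
    /// `CREATE FLOW calc_avg SINK TO avg_table AS SELECT avg(cpu) FROM metrics;`.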
    #[tracing::instrument(skip_all)]
    pub async fn create_flow(
        &self,
        stmt: CreateFlow,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        // TODO(ruihang): do some verification
        let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;

        self.create_flow_inner(expr, query_context).await
    }

    pub async fn create_flow_inner(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_flow_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    async fn create_flow_procedure(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let flow_type = self
            .determine_flow_type(&expr, query_context.clone())
            .await?;
        info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);

        let expr = {
            let mut expr = expr;
            expr.flow_options
                .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
            expr
        };

        let task = CreateFlowTask::try_from(PbCreateFlowTask {
            create_flow: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_flow(task),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    /// Determines the flow type based on the SQL query.
    ///
    /// If the query contains an aggregation or DISTINCT, it is a batching
    /// flow; otherwise it is a streaming flow.
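    ///
    /// A hedged illustration:
    /// - `SELECT count(*) FROM t GROUP BY host` aggregates, so it yields
    ///   [FlowType::Batching].
    /// - `SELECT host, cpu FROM t WHERE cpu > 0.9` has neither aggregation
    ///   nor DISTINCT, so it yields [FlowType::Streaming].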
    async fn determine_flow_type(
        &self,
        expr: &CreateFlowExpr,
        query_ctx: QueryContextRef,
    ) -> Result<FlowType> {
        // First check whether any source table's TTL is instant; if so, force streaming mode.
        for src_table_name in &expr.source_table_names {
            let table = self
                .catalog_manager()
                .table(
                    &src_table_name.catalog_name,
                    &src_table_name.schema_name,
                    &src_table_name.table_name,
                    Some(&query_ctx),
                )
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name,
                    ),
                })?;

            // An instant source table can only be handled in streaming mode.
            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
                warn!(
                    "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
                    format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name
                    ),
                    expr.flow_name
                );
                return Ok(FlowType::Streaming);
            }
        }

        let engine = &self.query_engine;
        let stmts = ParserContext::create_with_dialect(
            &expr.sql,
            query_ctx.sql_dialect(),
            ParseOptions::default(),
        )
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

        ensure!(
            stmts.len() == 1,
            InvalidSqlSnafu {
                err_msg: format!("Expect only one statement, found {}", stmts.len())
            }
        );
        let stmt = &stmts[0];

        // Support parsing TQL as well.
        let plan = match stmt {
            // PromQL (TQL) is only supported in batching mode.
            Statement::Tql(_) => return Ok(FlowType::Batching),
            _ => engine
                .planner()
                .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?,
        };

        /// Visitor to find aggregation or distinct
        struct FindAggr {
            is_aggr: bool,
        }

        impl TreeNodeVisitor<'_> for FindAggr {
            type Node = LogicalPlan;
            fn f_down(
                &mut self,
                node: &Self::Node,
            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
            {
                match node {
                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
                        self.is_aggr = true;
                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
                    }
                    _ => (),
                }
                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
            }
        }

        let mut find_aggr = FindAggr { is_aggr: false };

        plan.visit_with_subqueries(&mut find_aggr)
            .context(BuildDfLogicalPlanSnafu)?;
        if find_aggr.is_aggr {
            Ok(FlowType::Batching)
        } else {
            Ok(FlowType::Streaming)
        }
    }

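    /// Creates a view from a `CREATE VIEW` statement, e.g. (hedged
    /// illustration) `CREATE OR REPLACE VIEW v AS SELECT host, cpu FROM metrics;`.
    /// The view's logical plan is encoded via substrait and stored together
    /// with its definition so it can be shown by `SHOW CREATE VIEW`.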
    #[tracing::instrument(skip_all)]
    pub async fn create_view(
        &self,
        create_view: CreateView,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        // convert input into logical plan
        let logical_plan = match &*create_view.query {
            Statement::Query(query) => {
                self.plan(
                    &QueryStatement::Sql(Statement::Query(query.clone())),
                    ctx.clone(),
                )
                .await?
            }
            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
            _ => {
                return InvalidViewStmtSnafu {}.fail();
            }
        };
        // Save the definition for `show create view`.
        let definition = create_view.to_string();

        // Save the plan columns; they may change when the schemas of the
        // tables in the plan are altered.
        let schema: Schema = logical_plan
            .schema()
            .clone()
            .try_into()
            .context(ConvertSchemaSnafu)?;
        let plan_columns: Vec<_> = schema
            .column_schemas()
            .iter()
            .map(|c| c.name.clone())
            .collect();

        let columns: Vec<_> = create_view
            .columns
            .iter()
            .map(|ident| ident.to_string())
            .collect();

        // Validate columns
        if !columns.is_empty() {
            ensure!(
                columns.len() == plan_columns.len(),
                error::ViewColumnsMismatchSnafu {
                    view_name: create_view.name.to_string(),
                    expected: plan_columns.len(),
                    actual: columns.len(),
                }
            );
        }

        // Extract the table names from the original plan
        // and rewrite them as fully qualified names.
        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
            .context(ExtractTableNamesSnafu)?;

        let table_names = table_names.into_iter().map(|t| t.into()).collect();

        // TODO(dennis): we don't save the optimized plan yet, because there are
        // serialization issues with our custom plan nodes (such as `MergeScanLogicalPlan`).
        // When the issues are fixed, we can use the `optimized_plan` instead.
        // let optimized_plan = self.optimize_logical_plan(logical_plan)?.unwrap_df_plan();

        // encode logical plan
        let encoded_plan = DFLogicalSubstraitConvertor
            .encode(&plan, DefaultSerializer)
            .context(SubstraitCodecSnafu)?;

        let expr = expr_helper::to_create_view_expr(
            create_view,
            encoded_plan.to_vec(),
            table_names,
            columns,
            plan_columns,
            definition,
            ctx.clone(),
        )?;

        // TODO(dennis): validate the logical plan
        self.create_view_by_expr(expr, ctx).await
    }

    pub async fn create_view_by_expr(
        &self,
        expr: CreateViewExpr,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !(expr.create_if_not_exists && expr.or_replace),
            InvalidSqlSnafu {
                err_msg: "syntax error: CREATE OR REPLACE and IF NOT EXISTS cannot be used together",
            }
        );
        let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();

        let schema_exists = self
            .table_metadata_manager
            .schema_manager()
            .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
            .await
            .context(TableMetadataManagerSnafu)?;

        ensure!(
            schema_exists,
            SchemaNotFoundSnafu {
                schema_info: &expr.schema_name,
            }
        );

        // Check whether a view or table with the same name already exists.
        if let Some(table) = self
            .catalog_manager
            .table(
                &expr.catalog_name,
                &expr.schema_name,
                &expr.view_name,
                Some(&ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            let table_type = table.table_info().table_type;

            match (table_type, expr.create_if_not_exists, expr.or_replace) {
                (TableType::View, true, false) => {
                    return Ok(table);
                }
                (TableType::View, false, false) => {
                    return ViewAlreadyExistsSnafu {
                        name: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
                (TableType::View, _, true) => {
                    // Try to replace the existing view.
                }
                _ => {
                    return TableAlreadyExistsSnafu {
                        table: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
            }
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&expr.view_name),
            InvalidViewNameSnafu {
                name: expr.view_name.clone(),
            }
        );

        let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);

        let mut view_info = RawTableInfo {
            ident: metadata::TableIdent {
                // The view id of a distributed table is assigned by Meta; set 0 here as a placeholder.
                table_id: 0,
                version: 0,
            },
            name: expr.view_name.clone(),
            desc: None,
            catalog_name: expr.catalog_name.clone(),
            schema_name: expr.schema_name.clone(),
            // Table meta doesn't make sense for views, so use a default one.
            meta: RawTableMeta::default(),
            table_type: TableType::View,
        };

        let request = SubmitDdlTaskRequest {
            query_context: ctx,
            task: DdlTask::new_create_view(expr, view_info.clone()),
        };

        let resp = self
            .procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)?;

        debug!(
            "Submit creating view '{view_name}' task response: {:?}",
            resp
        );

        let view_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created view '{view_name}' with view id {view_id}");

        view_info.ident.table_id = view_id;

        let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);

        let table = DistTable::table(view_info);

        // Invalidates local cache ASAP.
        self.cache_invalidator
            .invalidate(
                &Context::default(),
                &[
                    CacheIdent::TableId(view_id),
                    CacheIdent::TableName(view_name.clone()),
                ],
            )
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(table)
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_flow(
        &self,
        catalog_name: String,
        flow_name: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        if let Some(flow) = self
            .flow_metadata_manager
            .flow_name_manager()
            .get(&catalog_name, &flow_name)
            .await
            .context(error::TableMetadataManagerSnafu)?
        {
            let flow_id = flow.flow_id();
            let task = DropFlowTask {
                catalog_name,
                flow_name,
                flow_id,
                drop_if_exists,
            };
            self.drop_flow_procedure(task, query_context).await?;

            Ok(Output::new_with_affected_rows(0))
        } else if drop_if_exists {
            Ok(Output::new_with_affected_rows(0))
        } else {
            FlowNotFoundSnafu {
                flow_name: format_full_flow_name(&catalog_name, &flow_name),
            }
            .fail()
        }
    }

    async fn drop_flow_procedure(
        &self,
        expr: DropFlowTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_flow(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub(super) async fn drop_trigger(
        &self,
        catalog_name: String,
        trigger_name: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let task = DropTriggerTask {
            catalog_name,
            trigger_name,
            drop_if_exists,
        };
        self.drop_trigger_procedure(task, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    async fn drop_trigger_procedure(
        &self,
        expr: DropTriggerTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_trigger(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    /// Drop a view
    #[tracing::instrument(skip_all)]
    pub(crate) async fn drop_view(
        &self,
        catalog: String,
        schema: String,
        view: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let view_info = if let Some(view) = self
            .catalog_manager
            .table(&catalog, &schema, &view, None)
            .await
            .context(CatalogSnafu)?
        {
            view.table_info()
        } else if drop_if_exists {
            // DROP VIEW IF EXISTS on a nonexistent view is a no-op.
            return Ok(Output::new_with_affected_rows(0));
        } else {
            return TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog, &schema, &view),
            }
            .fail();
        };

        // Ensure the existing object is a view; we can't drop other table types.
        ensure!(
            view_info.table_type == TableType::View,
            error::InvalidViewSnafu {
                msg: "not a view",
                view_name: format_full_table_name(&catalog, &schema, &view),
            }
        );

        let view_id = view_info.table_id();

        let task = DropViewTask {
            catalog,
            schema,
            view,
            view_id,
            drop_if_exists,
        };

        self.drop_view_procedure(task, query_context).await?;

        Ok(Output::new_with_affected_rows(0))
    }

    /// Submit [DropViewTask] to procedure executor.
    async fn drop_view_procedure(
        &self,
        expr: DropViewTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_view(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_logical_tables(
        &self,
        alter_table_exprs: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
        ensure!(
            !alter_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "alter logical tables"
            }
        );

        // Group the alter exprs by physical table id.
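        // For illustration (hedged): if logical tables t1 and t2 sit on the
        // same physical table, their alter exprs land in one group and are
        // submitted as a single procedure below.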
        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
        for expr in alter_table_exprs {
            // Get table_id from catalog_manager
            let catalog = if expr.catalog_name.is_empty() {
                query_context.current_catalog()
            } else {
                &expr.catalog_name
            };
            let schema = if expr.schema_name.is_empty() {
                query_context.current_schema()
            } else {
                expr.schema_name.to_string()
            };
            let table_name = &expr.table_name;
            let table = self
                .catalog_manager
                .table(catalog, &schema, table_name, Some(&query_context))
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(catalog, &schema, table_name),
                })?;
            let table_id = table.table_info().ident.table_id;
            let physical_table_id = self
                .table_metadata_manager
                .table_route_manager()
                .get_physical_table_id(table_id)
                .await
                .context(TableMetadataManagerSnafu)?;
            groups.entry(physical_table_id).or_default().push(expr);
        }

        // Submit procedure for each physical table
        let mut handles = Vec::with_capacity(groups.len());
        for (_physical_table_id, exprs) in groups {
            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
            handles.push(fut);
        }
        let _results = futures::future::try_join_all(handles).await?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_table(
        &self,
        table_name: TableName,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        // Reserved for grpc call
        self.drop_tables(&[table_name], drop_if_exists, query_context)
            .await
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_tables(
        &self,
        table_names: &[TableName],
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let mut tables = Vec::with_capacity(table_names.len());
        for table_name in table_names {
            ensure!(
                !is_readonly_schema(&table_name.schema_name),
                SchemaReadOnlySnafu {
                    name: table_name.schema_name.clone()
                }
            );

            if let Some(table) = self
                .catalog_manager
                .table(
                    &table_name.catalog_name,
                    &table_name.schema_name,
                    &table_name.table_name,
                    Some(&query_context),
                )
                .await
                .context(CatalogSnafu)?
            {
                tables.push(table.table_info().table_id());
            } else if drop_if_exists {
                // DROP TABLE IF EXISTS on a nonexistent table is a no-op.
                continue;
            } else {
                return TableNotFoundSnafu {
                    table_name: table_name.to_string(),
                }
                .fail();
            }
        }

        for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
            self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
                .await?;

            // Invalidates local cache ASAP.
            self.cache_invalidator
                .invalidate(
                    &Context::default(),
                    &[
                        CacheIdent::TableId(table_id),
                        CacheIdent::TableName(table_name.clone()),
                    ],
                )
                .await
                .context(error::InvalidateTableCacheSnafu)?;
        }
        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_database(
        &self,
        catalog: String,
        schema: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&schema),
            SchemaReadOnlySnafu { name: schema }
        );

        if self
            .catalog_manager
            .schema_exists(&catalog, &schema, None)
            .await
            .context(CatalogSnafu)?
        {
            if schema == query_context.current_schema() {
                SchemaInUseSnafu { name: schema }.fail()
            } else {
                self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
                    .await?;

                Ok(Output::new_with_affected_rows(0))
            }
        } else if drop_if_exists {
            // DROP DATABASE IF EXISTS on a nonexistent schema is a no-op.
            Ok(Output::new_with_affected_rows(0))
        } else {
            SchemaNotFoundSnafu {
                schema_info: schema,
            }
            .fail()
        }
    }

    #[tracing::instrument(skip_all)]
    pub async fn truncate_table(
        &self,
        table_name: TableName,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&table_name.schema_name),
            SchemaReadOnlySnafu {
                name: table_name.schema_name.clone()
            }
        );

        let table = self
            .catalog_manager
            .table(
                &table_name.catalog_name,
                &table_name.schema_name,
                &table_name.table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_name.to_string(),
            })?;
        let table_id = table.table_info().table_id();
        self.truncate_table_procedure(&table_name, table_id, query_context)
            .await?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_table(
        &self,
        alter_table: AlterTable,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
        self.alter_table_inner(expr, query_context).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_table_inner(
        &self,
        expr: AlterTableExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&expr.schema_name),
            SchemaReadOnlySnafu {
                name: expr.schema_name.clone()
            }
        );

        let catalog_name = if expr.catalog_name.is_empty() {
            DEFAULT_CATALOG_NAME.to_string()
        } else {
            expr.catalog_name.clone()
        };

        let schema_name = if expr.schema_name.is_empty() {
            DEFAULT_SCHEMA_NAME.to_string()
        } else {
            expr.schema_name.clone()
        };

        let table_name = expr.table_name.clone();

        let table = self
            .catalog_manager
            .table(
                &catalog_name,
                &schema_name,
                &table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
            })?;

        let table_id = table.table_info().ident.table_id;
        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
        if !need_alter {
            return Ok(Output::new_with_affected_rows(0));
        }
        info!(
            "Table info before alter is {:?}, expr: {:?}",
            table.table_info(),
            expr
        );

        let physical_table_id = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_id(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        let (req, invalidate_keys) = if physical_table_id == table_id {
            // This is a physical table.
            let req = SubmitDdlTaskRequest {
                query_context,
                task: DdlTask::new_alter_table(expr),
            };

            let invalidate_keys = vec![
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            (req, invalidate_keys)
        } else {
            // This is a logical table.
            let req = SubmitDdlTaskRequest {
                query_context,
                task: DdlTask::new_alter_logical_tables(vec![expr]),
            };

            let mut invalidate_keys = vec![
                CacheIdent::TableId(physical_table_id),
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            let physical_table = self
                .table_metadata_manager
                .table_info_manager()
                .get(physical_table_id)
                .await
                .context(TableMetadataManagerSnafu)?
                .map(|x| x.into_inner());
            if let Some(physical_table) = physical_table {
                let physical_table_name = TableName::new(
                    physical_table.table_info.catalog_name,
                    physical_table.table_info.schema_name,
                    physical_table.table_info.name,
                );
                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
            }

            (req, invalidate_keys)
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        // Invalidates local cache ASAP.
        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_database(
        &self,
        alter_expr: AlterDatabase,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
        self.alter_database_inner(alter_expr, query_context).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_database_inner(
        &self,
        alter_expr: AlterDatabaseExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&alter_expr.schema_name),
            SchemaReadOnlySnafu {
                name: query_context.current_schema().clone()
            }
        );

        let exists = self
            .catalog_manager
            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
            .await
            .context(CatalogSnafu)?;
        ensure!(
            exists,
            SchemaNotFoundSnafu {
                schema_info: alter_expr.schema_name,
            }
        );

        let cache_ident = [CacheIdent::SchemaName(SchemaName {
            catalog_name: alter_expr.catalog_name.clone(),
            schema_name: alter_expr.schema_name.clone(),
        })];

        self.alter_database_procedure(alter_expr, query_context)
            .await?;

        // Invalidates local cache ASAP.
        self.cache_invalidator
            .invalidate(&Context::default(), &cache_ident)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }

    async fn create_table_procedure(
        &self,
        create_table: CreateTableExpr,
        partitions: Vec<Partition>,
        table_info: RawTableInfo,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let partitions = partitions.into_iter().map(Into::into).collect();

        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_table(create_table, partitions, table_info),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn create_logical_tables_procedure(
        &self,
        tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_logical_tables(tables_data),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn alter_logical_tables_procedure(
        &self,
        tables_data: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_alter_logical_tables(tables_data),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn drop_table_procedure(
        &self,
        table_name: &TableName,
        table_id: TableId,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_table(
                table_name.catalog_name.to_string(),
                table_name.schema_name.to_string(),
                table_name.table_name.to_string(),
                table_id,
                drop_if_exists,
            ),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn drop_database_procedure(
        &self,
        catalog: String,
        schema: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_database(catalog, schema, drop_if_exists),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn alter_database_procedure(
        &self,
        alter_expr: AlterDatabaseExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_alter_database(alter_expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn truncate_table_procedure(
        &self,
        table_name: &TableName,
        table_id: TableId,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_truncate_table(
                table_name.catalog_name.to_string(),
                table_name.schema_name.to_string(),
                table_name.table_name.to_string(),
                table_id,
            ),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

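    /// Creates a database, e.g. (hedged illustration of the syntax)
    /// `CREATE DATABASE IF NOT EXISTS my_db WITH (ttl = '7d');`, where the
    /// `WITH` options are passed through as `options`.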
    #[tracing::instrument(skip_all)]
    pub async fn create_database(
        &self,
        database: &str,
        create_if_not_exists: bool,
        options: HashMap<String, String>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let catalog = query_context.current_catalog();
        ensure!(
            NAME_PATTERN_REG.is_match(catalog),
            error::UnexpectedSnafu {
                violated: format!("Invalid catalog name: {}", catalog)
            }
        );

        ensure!(
            NAME_PATTERN_REG.is_match(database),
            error::UnexpectedSnafu {
                violated: format!("Invalid database name: {}", database)
            }
        );

        if !self
            .catalog_manager
            .schema_exists(catalog, database, None)
            .await
            .context(CatalogSnafu)?
            && !self.catalog_manager.is_reserved_schema_name(database)
        {
            self.create_database_procedure(
                catalog.to_string(),
                database.to_string(),
                create_if_not_exists,
                options,
                query_context,
            )
            .await?;

            Ok(Output::new_with_affected_rows(1))
        } else if create_if_not_exists {
            Ok(Output::new_with_affected_rows(1))
        } else {
            error::SchemaExistsSnafu { name: database }.fail()
        }
    }

    async fn create_database_procedure(
        &self,
        catalog: String,
        database: String,
        create_if_not_exists: bool,
        options: HashMap<String, String>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }
}

/// Parses the partition statement [Partitions] into [MetaPartition]s and partition columns.
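///
/// For illustration (hedged): a clause like
/// `PARTITION ON COLUMNS (host) (host < 'b', host >= 'b')` yields the
/// partition columns `["host"]` and one partition entry per expression.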
pub fn parse_partitions(
    create_table: &CreateTableExpr,
    partitions: Option<Partitions>,
    query_ctx: &QueryContextRef,
) -> Result<(Vec<MetaPartition>, Vec<String>)> {
    // If partitions are not defined by the user, use the timestamp column (which must exist)
    // as the partition column, and create only one partition.
    let partition_columns = find_partition_columns(&partitions)?;
    let partition_entries =
        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;

    // Validate the partition expressions.
    let mut exprs = vec![];
    for partition in &partition_entries {
        for bound in partition {
            if let PartitionBound::Expr(expr) = bound {
                exprs.push(expr.clone());
            }
        }
    }
    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
        .context(InvalidPartitionSnafu)?;

    Ok((
        partition_entries
            .into_iter()
            .map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
            .collect::<std::result::Result<_, _>>()
            .context(DeserializePartitionSnafu)?,
        partition_columns,
    ))
}

/// Verifies an alter expression and returns whether the alter needs to be performed.
///
/// # Returns
///
/// Returns `true` if the alter needs to be performed; otherwise, returns `false`.
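///
/// # Example
///
/// An illustrative sketch (marked `ignore`; building `table_info` and the
/// alter expression is elided):
///
/// ```ignore
/// // ALTER TABLE t ADD COLUMN IF NOT EXISTS b STRING;
/// // When column `b` already exists in `table_info`, the alter is a no-op:
/// let needed = verify_alter(table_id, table_info, alter_expr)?;
/// assert!(!needed);
/// ```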
pub fn verify_alter(
    table_id: TableId,
    table_info: Arc<TableInfo>,
    expr: AlterTableExpr,
) -> Result<bool> {
    let request: AlterTableRequest =
        common_grpc_expr::alter_expr_to_request(table_id, expr).context(AlterExprToRequestSnafu)?;

    let AlterTableRequest {
        table_name,
        alter_kind,
        ..
    } = &request;

    if let AlterKind::RenameTable { new_table_name } = alter_kind {
        ensure!(
            NAME_PATTERN_REG.is_match(new_table_name),
            error::UnexpectedSnafu {
                violated: format!("Invalid table name: {}", new_table_name)
            }
        );
    } else if let AlterKind::AddColumns { columns } = alter_kind {
        // If all the columns are marked as add_if_not_exists and they already exist in the table,
        // there is no need to perform the alter.
        let column_names: HashSet<_> = table_info
            .meta
            .schema
            .column_schemas()
            .iter()
            .map(|schema| &schema.name)
            .collect();
        if columns.iter().all(|column| {
            column_names.contains(&column.column_schema.name) && column.add_if_not_exists
        }) {
            return Ok(false);
        }
    }

    let _ = table_info
        .meta
        .builder_with_alter_kind(table_name, &request.alter_kind)
        .context(error::TableSnafu)?
        .build()
        .context(error::BuildTableMetaSnafu { table_name })?;

    Ok(true)
}

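/// Builds a [RawTableInfo] from a [CreateTableExpr] and the resolved partition
/// columns.
///
/// The table id is set to 0 as a placeholder; the real id is assigned by the
/// meta server when the create-table procedure executes.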
pub fn create_table_info(
    create_table: &CreateTableExpr,
    partition_columns: Vec<String>,
) -> Result<RawTableInfo> {
    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
    let mut column_name_to_index_map = HashMap::new();

    for (idx, column) in create_table.column_defs.iter().enumerate() {
        let schema =
            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
                column: &column.name,
            })?;
        let schema = schema.with_time_index(column.name == create_table.time_index);

        column_schemas.push(schema);
        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
    }

    let timestamp_index = column_name_to_index_map
        .get(&create_table.time_index)
        .cloned();

    let raw_schema = RawSchema {
        column_schemas: column_schemas.clone(),
        timestamp_index,
        version: 0,
    };

    let primary_key_indices = create_table
        .primary_keys
        .iter()
        .map(|name| {
            column_name_to_index_map
                .get(name)
                .cloned()
                .context(ColumnNotFoundSnafu { msg: name })
        })
        .collect::<Result<Vec<_>>>()?;

    let partition_key_indices = partition_columns
        .into_iter()
        .map(|col_name| {
            column_name_to_index_map
                .get(&col_name)
                .cloned()
                .context(ColumnNotFoundSnafu { msg: col_name })
        })
        .collect::<Result<Vec<_>>>()?;

    let table_options = TableOptions::try_from_iter(&create_table.table_options)
        .context(UnrecognizedTableOptionSnafu)?;

    let meta = RawTableMeta {
        schema: raw_schema,
        primary_key_indices,
        value_indices: vec![],
        engine: create_table.engine.clone(),
        next_column_id: column_schemas.len() as u32,
        region_numbers: vec![],
        options: table_options,
        created_on: Utc::now(),
        partition_key_indices,
        column_ids: vec![],
    };

    let desc = if create_table.desc.is_empty() {
        create_table.table_options.get(COMMENT_KEY).cloned()
    } else {
        Some(create_table.desc.clone())
    };

    let table_info = RawTableInfo {
        ident: metadata::TableIdent {
            // The table id of a distributed table is assigned by the meta server; set 0 here as a placeholder.
            table_id: 0,
            version: 0,
        },
        name: create_table.table_name.clone(),
        desc,
        catalog_name: create_table.catalog_name.clone(),
        schema_name: create_table.schema_name.clone(),
        meta,
        table_type: TableType::Base,
    };
    Ok(table_info)
}

fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
    let columns = if let Some(partitions) = partitions {
        partitions
            .column_list
            .iter()
            .map(|x| x.value.clone())
            .collect::<Vec<_>>()
    } else {
        vec![]
    };
    Ok(columns)
}

/// Parses [Partitions] into a group of partition entries.
///
/// Returns one entry per partition; each entry is a list of [PartitionBound]s
/// that together define that partition.
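///
/// When no partition expression is given (or there is no `PARTITION` clause
/// at all), a single default partition bounded by [PartitionBound::MaxValue]
/// is returned.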
fn find_partition_entries(
    create_table: &CreateTableExpr,
    partitions: &Option<Partitions>,
    partition_columns: &[String],
    query_ctx: &QueryContextRef,
) -> Result<Vec<Vec<PartitionBound>>> {
    let entries = if let Some(partitions) = partitions {
        // extract concrete data type of partition columns
        let column_defs = partition_columns
            .iter()
            .map(|pc| {
                create_table
                    .column_defs
                    .iter()
                    .find(|c| &c.name == pc)
                    // unwrap is safe here because we have checked that partition columns are defined
                    .unwrap()
            })
            .collect::<Vec<_>>();
        let mut column_name_and_type = HashMap::with_capacity(column_defs.len());
        for column in column_defs {
            let column_name = &column.name;
            let data_type = ConcreteDataType::from(
                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension)
                    .context(ColumnDataTypeSnafu)?,
            );
            column_name_and_type.insert(column_name, data_type);
        }

        // Transform parser expr to partition expr
        let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
        for partition in &partitions.exprs {
            let partition_expr =
                convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
            partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]);
        }

        // Fall back to a single default partition when no expression is given.
        if partition_exprs.is_empty() {
            partition_exprs.push(vec![PartitionBound::MaxValue]);
        }

        partition_exprs
    } else {
        vec![vec![PartitionBound::MaxValue]]
    };
    Ok(entries)
}

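/// Converts one parser [Expr] into a [PartitionExpr].
///
/// Only binary expressions are accepted: a comparison between a partition
/// column and a literal (in either order, optionally with a unary sign on the
/// literal), or a binary operator combining two sub-expressions. For example,
/// `b < 'hz'` becomes, roughly,
/// `PartitionExpr::new(Operand::Column("b".into()), op, Operand::Value(v))`
/// where `v` is `'hz'` converted to the column's data type.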
fn convert_one_expr(
    expr: &Expr,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
    timezone: &Timezone,
) -> Result<PartitionExpr> {
    let Expr::BinaryOp { left, op, right } = expr else {
        return InvalidPartitionRuleSnafu {
            reason: "partition rule must be a binary expression",
        }
        .fail();
    };

    let op =
        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
            reason: format!("unsupported operator in partition expr {op}"),
        })?;

    // convert leaf node.
    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
        // col, val
        (Expr::Identifier(ident), Expr::Value(value)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(value, data_type, timezone, None)?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        // val, col
        (Expr::Value(value), Expr::Identifier(ident)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(value, data_type, timezone, None)?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
            // A sub-expression must be compared against another sub-expression.
            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
            (Operand::Expr(lhs), op, Operand::Expr(rhs))
        }
        _ => {
            return InvalidPartitionRuleSnafu {
                reason: format!("invalid partition expr {expr}"),
            }
            .fail();
        }
    };

    Ok(PartitionExpr::new(lhs, op, rhs))
}

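/// Resolves a parser [Ident] to a `(column name, data type)` pair, returning
/// an error if the identifier does not refer to a partition column.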
fn convert_identifier(
    ident: &Ident,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
) -> Result<(String, ConcreteDataType)> {
    let column_name = ident.value.clone();
    let data_type = column_name_and_type
        .get(&column_name)
        .cloned()
        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
    Ok((column_name, data_type))
}

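/// Converts a parser literal into a typed [Value] using the target column's
/// data type, honoring the given timezone and an optional unary operator.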
fn convert_value(
    value: &ParserValue,
    data_type: ConcreteDataType,
    timezone: &Timezone,
    unary_op: Option<UnaryOperator>,
) -> Result<Value> {
    sql_value_to_value(
        "<NONAME>",
        &data_type,
        value,
        Some(timezone),
        unary_op,
        false,
    )
    .context(error::SqlCommonSnafu)
}

#[cfg(test)]
mod test {
    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;

    use super::*;
    use crate::expr_helper;

    #[test]
    fn test_name_is_match() {
        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
        assert!(!NAME_PATTERN_REG.is_match("🈲"));
        assert!(NAME_PATTERN_REG.is_match("hello"));
        assert!(NAME_PATTERN_REG.is_match("test@"));
        assert!(!NAME_PATTERN_REG.is_match("@test"));
        assert!(NAME_PATTERN_REG.is_match("test#"));
        assert!(!NAME_PATTERN_REG.is_match("#test"));
        assert!(!NAME_PATTERN_REG.is_match("@"));
        assert!(!NAME_PATTERN_REG.is_match("#"));
    }

    #[tokio::test]
    #[ignore = "TODO(ruihang): WIP new partition rule"]
    async fn test_parse_partitions() {
        common_telemetry::init_default_ut_logging();
        let cases = [
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION ON COLUMNS (b) (
  b < 'hz',
  b >= 'hz' AND b < 'sh',
  b >= 'sh'
)
ENGINE=mito",
                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
            ),
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
  PARTITION r0 VALUES LESS THAN ('hz', 10),
  b < 'hz' AND a < 10,
  b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
  b >= 'sh' AND a >= 20
)
ENGINE=mito",
                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
            ),
        ];
        let ctx = QueryContextBuilder::default().build().into();
        for (sql, expected) in cases {
            let result = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            match &result[0] {
                Statement::CreateTable(c) => {
                    let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
                    let (partitions, _) =
                        parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
                    let json = serde_json::to_string(&partitions).unwrap();
                    assert_eq!(json, expected);
                }
                _ => unreachable!(),
            }
        }
    }
}