operator/statement/
ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use api::helper::ColumnDataTypeWrapper;
19use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
20use api::v1::{
21    column_def, AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
22};
23#[cfg(feature = "enterprise")]
24use api::v1::{
25    meta::CreateTriggerTask as PbCreateTriggerTask, CreateTriggerExpr as PbCreateTriggerExpr,
26};
27use catalog::CatalogManagerRef;
28use chrono::Utc;
29use common_catalog::consts::{is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
30use common_catalog::{format_full_flow_name, format_full_table_name};
31use common_error::ext::BoxedError;
32use common_meta::cache_invalidator::Context;
33use common_meta::ddl::create_flow::FlowType;
34use common_meta::ddl::ExecutorContext;
35use common_meta::instruction::CacheIdent;
36use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
37use common_meta::key::NAME_PATTERN;
38#[cfg(feature = "enterprise")]
39use common_meta::rpc::ddl::trigger::CreateTriggerTask;
40use common_meta::rpc::ddl::{
41    CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
42    SubmitDdlTaskResponse,
43};
44use common_meta::rpc::router::{Partition, Partition as MetaPartition};
45use common_query::Output;
46use common_telemetry::{debug, info, tracing, warn};
47use common_time::Timezone;
48use datafusion_common::tree_node::TreeNodeVisitor;
49use datafusion_expr::LogicalPlan;
50use datatypes::prelude::ConcreteDataType;
51use datatypes::schema::{RawSchema, Schema};
52use datatypes::value::Value;
53use lazy_static::lazy_static;
54use partition::expr::{Operand, PartitionExpr, RestrictedOp};
55use partition::multi_dim::MultiDimPartitionRule;
56use partition::partition::{PartitionBound, PartitionDef};
57use query::parser::QueryStatement;
58use query::plan::extract_and_rewrite_full_table_names;
59use query::query_engine::DefaultSerializer;
60use query::sql::create_table_stmt;
61use regex::Regex;
62use session::context::QueryContextRef;
63use session::table_name::table_idents_to_full_name;
64use snafu::{ensure, OptionExt, ResultExt};
65use sql::parser::{ParseOptions, ParserContext};
66use sql::statements::alter::{AlterDatabase, AlterTable};
67#[cfg(feature = "enterprise")]
68use sql::statements::create::trigger::CreateTrigger;
69use sql::statements::create::{
70    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
71};
72use sql::statements::sql_value_to_value;
73use sql::statements::statement::Statement;
74use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
75use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
76use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
77use table::dist_table::DistTable;
78use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
79use table::requests::{AlterKind, AlterTableRequest, TableOptions, COMMENT_KEY};
80use table::table_name::TableName;
81use table::TableRef;
82
83use crate::error::{
84    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
85    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
86    DeserializePartitionSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
87    FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
88    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
89    SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
90    TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
91    UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
92};
93use crate::expr_helper;
94use crate::statement::show::create_partitions_stmt;
95use crate::statement::StatementExecutor;
96
97lazy_static! {
98    static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
99}
100
101impl StatementExecutor {
102    pub fn catalog_manager(&self) -> CatalogManagerRef {
103        self.catalog_manager.clone()
104    }
105
106    #[tracing::instrument(skip_all)]
107    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
108        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
109        self.create_table_inner(create_expr, stmt.partitions, ctx)
110            .await
111    }
112
113    #[tracing::instrument(skip_all)]
114    pub async fn create_table_like(
115        &self,
116        stmt: CreateTableLike,
117        ctx: QueryContextRef,
118    ) -> Result<TableRef> {
119        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
120            .map_err(BoxedError::new)
121            .context(error::ExternalSnafu)?;
122        let table_ref = self
123            .catalog_manager
124            .table(&catalog, &schema, &table, Some(&ctx))
125            .await
126            .context(CatalogSnafu)?
127            .context(TableNotFoundSnafu { table_name: &table })?;
128        let partitions = self
129            .partition_manager
130            .find_table_partitions(table_ref.table_info().table_id())
131            .await
132            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;
133
134        // CREATE TABLE LIKE also inherits database level options.
135        let schema_options = self
136            .table_metadata_manager
137            .schema_manager()
138            .get(SchemaNameKey {
139                catalog: &catalog,
140                schema: &schema,
141            })
142            .await
143            .context(TableMetadataManagerSnafu)?
144            .map(|v| v.into_inner());
145
146        let quote_style = ctx.quote_style();
147        let mut create_stmt =
148            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
149                .context(error::ParseQuerySnafu)?;
150        create_stmt.name = stmt.table_name;
151        create_stmt.if_not_exists = false;
152
153        let partitions = create_partitions_stmt(partitions)?.and_then(|mut partitions| {
154            if !partitions.column_list.is_empty() {
155                partitions.set_quote(quote_style);
156                Some(partitions)
157            } else {
158                None
159            }
160        });
161
162        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
163        self.create_table_inner(create_expr, partitions, ctx).await
164    }
165
166    #[tracing::instrument(skip_all)]
167    pub async fn create_external_table(
168        &self,
169        create_expr: CreateExternalTable,
170        ctx: QueryContextRef,
171    ) -> Result<TableRef> {
172        let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
173        self.create_table_inner(create_expr, None, ctx).await
174    }
175
176    #[tracing::instrument(skip_all)]
177    pub async fn create_table_inner(
178        &self,
179        create_table: &mut CreateTableExpr,
180        partitions: Option<Partitions>,
181        query_ctx: QueryContextRef,
182    ) -> Result<TableRef> {
183        ensure!(
184            !is_readonly_schema(&create_table.schema_name),
185            SchemaReadOnlySnafu {
186                name: create_table.schema_name.clone()
187            }
188        );
189
190        if create_table.engine == METRIC_ENGINE_NAME
191            && create_table
192                .table_options
193                .contains_key(LOGICAL_TABLE_METADATA_KEY)
194        {
195            // Create logical tables
196            ensure!(
197                partitions.is_none(),
198                InvalidPartitionRuleSnafu {
199                    reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table",
200                }
201            );
202            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
203                .await?
204                .into_iter()
205                .next()
206                .context(error::UnexpectedSnafu {
207                    violated: "expected to create logical tables",
208                })
209        } else {
210            // Create other normal table
211            self.create_non_logic_table(create_table, partitions, query_ctx)
212                .await
213        }
214    }
215
216    #[tracing::instrument(skip_all)]
217    pub async fn create_non_logic_table(
218        &self,
219        create_table: &mut CreateTableExpr,
220        partitions: Option<Partitions>,
221        query_ctx: QueryContextRef,
222    ) -> Result<TableRef> {
223        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
224
225        // Check if schema exists
226        let schema = self
227            .table_metadata_manager
228            .schema_manager()
229            .get(SchemaNameKey::new(
230                &create_table.catalog_name,
231                &create_table.schema_name,
232            ))
233            .await
234            .context(TableMetadataManagerSnafu)?;
235        ensure!(
236            schema.is_some(),
237            SchemaNotFoundSnafu {
238                schema_info: &create_table.schema_name,
239            }
240        );
241
242        // if table exists.
243        if let Some(table) = self
244            .catalog_manager
245            .table(
246                &create_table.catalog_name,
247                &create_table.schema_name,
248                &create_table.table_name,
249                Some(&query_ctx),
250            )
251            .await
252            .context(CatalogSnafu)?
253        {
254            return if create_table.create_if_not_exists {
255                Ok(table)
256            } else {
257                TableAlreadyExistsSnafu {
258                    table: format_full_table_name(
259                        &create_table.catalog_name,
260                        &create_table.schema_name,
261                        &create_table.table_name,
262                    ),
263                }
264                .fail()
265            };
266        }
267
268        ensure!(
269            NAME_PATTERN_REG.is_match(&create_table.table_name),
270            InvalidTableNameSnafu {
271                table_name: &create_table.table_name,
272            }
273        );
274
275        let table_name = TableName::new(
276            &create_table.catalog_name,
277            &create_table.schema_name,
278            &create_table.table_name,
279        );
280
281        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
282        let mut table_info = create_table_info(create_table, partition_cols)?;
283
284        let resp = self
285            .create_table_procedure(
286                create_table.clone(),
287                partitions,
288                table_info.clone(),
289                query_ctx,
290            )
291            .await?;
292
293        let table_id = resp
294            .table_ids
295            .into_iter()
296            .next()
297            .context(error::UnexpectedSnafu {
298                violated: "expected table_id",
299            })?;
300        info!("Successfully created table '{table_name}' with table id {table_id}");
301
302        table_info.ident.table_id = table_id;
303
304        let table_info: Arc<TableInfo> =
305            Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
306        create_table.table_id = Some(api::v1::TableId { id: table_id });
307
308        let table = DistTable::table(table_info);
309
310        Ok(table)
311    }
312
313    #[tracing::instrument(skip_all)]
314    pub async fn create_logical_tables(
315        &self,
316        create_table_exprs: &[CreateTableExpr],
317        query_context: QueryContextRef,
318    ) -> Result<Vec<TableRef>> {
319        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
320        ensure!(
321            !create_table_exprs.is_empty(),
322            EmptyDdlExprSnafu {
323                name: "create logic tables"
324            }
325        );
326
327        // Check table names
328        for create_table in create_table_exprs {
329            ensure!(
330                NAME_PATTERN_REG.is_match(&create_table.table_name),
331                InvalidTableNameSnafu {
332                    table_name: &create_table.table_name,
333                }
334            );
335        }
336
337        let mut raw_tables_info = create_table_exprs
338            .iter()
339            .map(|create| create_table_info(create, vec![]))
340            .collect::<Result<Vec<_>>>()?;
341        let tables_data = create_table_exprs
342            .iter()
343            .cloned()
344            .zip(raw_tables_info.iter().cloned())
345            .collect::<Vec<_>>();
346
347        let resp = self
348            .create_logical_tables_procedure(tables_data, query_context)
349            .await?;
350
351        let table_ids = resp.table_ids;
352        ensure!(table_ids.len() == raw_tables_info.len(), CreateLogicalTablesSnafu {
353            reason: format!("The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}", raw_tables_info.len(), table_ids.len())
354        });
355        info!("Successfully created logical tables: {:?}", table_ids);
356
357        for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
358            table_info.ident.table_id = table_ids[i];
359        }
360        let tables_info = raw_tables_info
361            .into_iter()
362            .map(|x| x.try_into().context(CreateTableInfoSnafu))
363            .collect::<Result<Vec<_>>>()?;
364
365        Ok(tables_info
366            .into_iter()
367            .map(|x| DistTable::table(Arc::new(x)))
368            .collect())
369    }
370
371    #[cfg(feature = "enterprise")]
372    #[tracing::instrument(skip_all)]
373    pub async fn create_trigger(
374        &self,
375        stmt: CreateTrigger,
376        query_context: QueryContextRef,
377    ) -> Result<Output> {
378        let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
379        self.create_trigger_inner(expr, query_context).await
380    }
381
382    #[cfg(feature = "enterprise")]
383    pub async fn create_trigger_inner(
384        &self,
385        expr: PbCreateTriggerExpr,
386        query_context: QueryContextRef,
387    ) -> Result<Output> {
388        self.create_trigger_procedure(expr, query_context).await?;
389        Ok(Output::new_with_affected_rows(0))
390    }
391
392    #[cfg(feature = "enterprise")]
393    async fn create_trigger_procedure(
394        &self,
395        expr: PbCreateTriggerExpr,
396        query_context: QueryContextRef,
397    ) -> Result<SubmitDdlTaskResponse> {
398        let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
399            create_trigger: Some(expr),
400        })
401        .context(error::InvalidExprSnafu)?;
402
403        let request = SubmitDdlTaskRequest {
404            query_context,
405            task: DdlTask::new_create_trigger(task),
406        };
407
408        self.procedure_executor
409            .submit_ddl_task(&ExecutorContext::default(), request)
410            .await
411            .context(error::ExecuteDdlSnafu)
412    }
413
414    #[tracing::instrument(skip_all)]
415    pub async fn create_flow(
416        &self,
417        stmt: CreateFlow,
418        query_context: QueryContextRef,
419    ) -> Result<Output> {
420        // TODO(ruihang): do some verification
421        let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;
422
423        self.create_flow_inner(expr, query_context).await
424    }
425
426    pub async fn create_flow_inner(
427        &self,
428        expr: CreateFlowExpr,
429        query_context: QueryContextRef,
430    ) -> Result<Output> {
431        self.create_flow_procedure(expr, query_context).await?;
432        Ok(Output::new_with_affected_rows(0))
433    }
434
435    async fn create_flow_procedure(
436        &self,
437        expr: CreateFlowExpr,
438        query_context: QueryContextRef,
439    ) -> Result<SubmitDdlTaskResponse> {
440        let flow_type = self
441            .determine_flow_type(&expr, query_context.clone())
442            .await?;
443        info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
444
445        let expr = {
446            let mut expr = expr;
447            expr.flow_options
448                .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
449            expr
450        };
451
452        let task = CreateFlowTask::try_from(PbCreateFlowTask {
453            create_flow: Some(expr),
454        })
455        .context(error::InvalidExprSnafu)?;
456        let request = SubmitDdlTaskRequest {
457            query_context,
458            task: DdlTask::new_create_flow(task),
459        };
460
461        self.procedure_executor
462            .submit_ddl_task(&ExecutorContext::default(), request)
463            .await
464            .context(error::ExecuteDdlSnafu)
465    }
466
467    /// Determine the flow type based on the SQL query
468    ///
469    /// If it contains aggregation or distinct, then it is a batch flow, otherwise it is a streaming flow
470    async fn determine_flow_type(
471        &self,
472        expr: &CreateFlowExpr,
473        query_ctx: QueryContextRef,
474    ) -> Result<FlowType> {
475        // first check if source table's ttl is instant, if it is, force streaming mode
476        for src_table_name in &expr.source_table_names {
477            let table = self
478                .catalog_manager()
479                .table(
480                    &src_table_name.catalog_name,
481                    &src_table_name.schema_name,
482                    &src_table_name.table_name,
483                    Some(&query_ctx),
484                )
485                .await
486                .map_err(BoxedError::new)
487                .context(ExternalSnafu)?
488                .with_context(|| TableNotFoundSnafu {
489                    table_name: format_full_table_name(
490                        &src_table_name.catalog_name,
491                        &src_table_name.schema_name,
492                        &src_table_name.table_name,
493                    ),
494                })?;
495
496            // instant source table can only be handled by streaming mode
497            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
498                warn!(
499                    "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
500                    format_full_table_name(
501                        &src_table_name.catalog_name,
502                        &src_table_name.schema_name,
503                        &src_table_name.table_name
504                    ),
505                    expr.flow_name
506                );
507                return Ok(FlowType::Streaming);
508            }
509        }
510
511        let engine = &self.query_engine;
512        let stmts = ParserContext::create_with_dialect(
513            &expr.sql,
514            query_ctx.sql_dialect(),
515            ParseOptions::default(),
516        )
517        .map_err(BoxedError::new)
518        .context(ExternalSnafu)?;
519
520        ensure!(
521            stmts.len() == 1,
522            InvalidSqlSnafu {
523                err_msg: format!("Expect only one statement, found {}", stmts.len())
524            }
525        );
526        let stmt = &stmts[0];
527
528        // support tql parse too
529        let plan = match stmt {
530            // prom ql is only supported in batching mode
531            Statement::Tql(_) => return Ok(FlowType::Batching),
532            _ => engine
533                .planner()
534                .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
535                .await
536                .map_err(BoxedError::new)
537                .context(ExternalSnafu)?,
538        };
539
540        /// Visitor to find aggregation or distinct
541        struct FindAggr {
542            is_aggr: bool,
543        }
544
545        impl TreeNodeVisitor<'_> for FindAggr {
546            type Node = LogicalPlan;
547            fn f_down(
548                &mut self,
549                node: &Self::Node,
550            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
551            {
552                match node {
553                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
554                        self.is_aggr = true;
555                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
556                    }
557                    _ => (),
558                }
559                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
560            }
561        }
562
563        let mut find_aggr = FindAggr { is_aggr: false };
564
565        plan.visit_with_subqueries(&mut find_aggr)
566            .context(BuildDfLogicalPlanSnafu)?;
567        if find_aggr.is_aggr {
568            Ok(FlowType::Batching)
569        } else {
570            Ok(FlowType::Streaming)
571        }
572    }
573
574    #[tracing::instrument(skip_all)]
575    pub async fn create_view(
576        &self,
577        create_view: CreateView,
578        ctx: QueryContextRef,
579    ) -> Result<TableRef> {
580        // convert input into logical plan
581        let logical_plan = match &*create_view.query {
582            Statement::Query(query) => {
583                self.plan(
584                    &QueryStatement::Sql(Statement::Query(query.clone())),
585                    ctx.clone(),
586                )
587                .await?
588            }
589            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
590            _ => {
591                return InvalidViewStmtSnafu {}.fail();
592            }
593        };
594        // Save the definition for `show create view`.
595        let definition = create_view.to_string();
596
597        // Save the columns in plan, it may changed when the schemas of tables in plan
598        // are altered.
599        let schema: Schema = logical_plan
600            .schema()
601            .clone()
602            .try_into()
603            .context(ConvertSchemaSnafu)?;
604        let plan_columns: Vec<_> = schema
605            .column_schemas()
606            .iter()
607            .map(|c| c.name.clone())
608            .collect();
609
610        let columns: Vec<_> = create_view
611            .columns
612            .iter()
613            .map(|ident| ident.to_string())
614            .collect();
615
616        // Validate columns
617        if !columns.is_empty() {
618            ensure!(
619                columns.len() == plan_columns.len(),
620                error::ViewColumnsMismatchSnafu {
621                    view_name: create_view.name.to_string(),
622                    expected: plan_columns.len(),
623                    actual: columns.len(),
624                }
625            );
626        }
627
628        // Extract the table names from the original plan
629        // and rewrite them as fully qualified names.
630        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
631            .context(ExtractTableNamesSnafu)?;
632
633        let table_names = table_names.into_iter().map(|t| t.into()).collect();
634
635        // TODO(dennis): we don't save the optimized plan yet,
636        // because there are some serialization issue with our own defined plan node (such as `MergeScanLogicalPlan`).
637        // When the issues are fixed, we can use the `optimized_plan` instead.
638        // let optimized_plan = self.optimize_logical_plan(logical_plan)?.unwrap_df_plan();
639
640        // encode logical plan
641        let encoded_plan = DFLogicalSubstraitConvertor
642            .encode(&plan, DefaultSerializer)
643            .context(SubstraitCodecSnafu)?;
644
645        let expr = expr_helper::to_create_view_expr(
646            create_view,
647            encoded_plan.to_vec(),
648            table_names,
649            columns,
650            plan_columns,
651            definition,
652            ctx.clone(),
653        )?;
654
655        //TODO(dennis): validate the logical plan
656        self.create_view_by_expr(expr, ctx).await
657    }
658
659    pub async fn create_view_by_expr(
660        &self,
661        expr: CreateViewExpr,
662        ctx: QueryContextRef,
663    ) -> Result<TableRef> {
664        ensure! {
665            !(expr.create_if_not_exists & expr.or_replace),
666            InvalidSqlSnafu {
667                err_msg: "syntax error Create Or Replace and If Not Exist cannot be used together",
668            }
669        };
670        let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();
671
672        let schema_exists = self
673            .table_metadata_manager
674            .schema_manager()
675            .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
676            .await
677            .context(TableMetadataManagerSnafu)?;
678
679        ensure!(
680            schema_exists,
681            SchemaNotFoundSnafu {
682                schema_info: &expr.schema_name,
683            }
684        );
685
686        // if view or table exists.
687        if let Some(table) = self
688            .catalog_manager
689            .table(
690                &expr.catalog_name,
691                &expr.schema_name,
692                &expr.view_name,
693                Some(&ctx),
694            )
695            .await
696            .context(CatalogSnafu)?
697        {
698            let table_type = table.table_info().table_type;
699
700            match (table_type, expr.create_if_not_exists, expr.or_replace) {
701                (TableType::View, true, false) => {
702                    return Ok(table);
703                }
704                (TableType::View, false, false) => {
705                    return ViewAlreadyExistsSnafu {
706                        name: format_full_table_name(
707                            &expr.catalog_name,
708                            &expr.schema_name,
709                            &expr.view_name,
710                        ),
711                    }
712                    .fail();
713                }
714                (TableType::View, _, true) => {
715                    // Try to replace an exists view
716                }
717                _ => {
718                    return TableAlreadyExistsSnafu {
719                        table: format_full_table_name(
720                            &expr.catalog_name,
721                            &expr.schema_name,
722                            &expr.view_name,
723                        ),
724                    }
725                    .fail();
726                }
727            }
728        }
729
730        ensure!(
731            NAME_PATTERN_REG.is_match(&expr.view_name),
732            InvalidViewNameSnafu {
733                name: expr.view_name.clone(),
734            }
735        );
736
737        let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);
738
739        let mut view_info = RawTableInfo {
740            ident: metadata::TableIdent {
741                // The view id of distributed table is assigned by Meta, set "0" here as a placeholder.
742                table_id: 0,
743                version: 0,
744            },
745            name: expr.view_name.clone(),
746            desc: None,
747            catalog_name: expr.catalog_name.clone(),
748            schema_name: expr.schema_name.clone(),
749            // The meta doesn't make sense for views, so using a default one.
750            meta: RawTableMeta::default(),
751            table_type: TableType::View,
752        };
753
754        let request = SubmitDdlTaskRequest {
755            query_context: ctx,
756            task: DdlTask::new_create_view(expr, view_info.clone()),
757        };
758
759        let resp = self
760            .procedure_executor
761            .submit_ddl_task(&ExecutorContext::default(), request)
762            .await
763            .context(error::ExecuteDdlSnafu)?;
764
765        debug!(
766            "Submit creating view '{view_name}' task response: {:?}",
767            resp
768        );
769
770        let view_id = resp
771            .table_ids
772            .into_iter()
773            .next()
774            .context(error::UnexpectedSnafu {
775                violated: "expected table_id",
776            })?;
777        info!("Successfully created view '{view_name}' with view id {view_id}");
778
779        view_info.ident.table_id = view_id;
780
781        let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);
782
783        let table = DistTable::table(view_info);
784
785        // Invalidates local cache ASAP.
786        self.cache_invalidator
787            .invalidate(
788                &Context::default(),
789                &[
790                    CacheIdent::TableId(view_id),
791                    CacheIdent::TableName(view_name.clone()),
792                ],
793            )
794            .await
795            .context(error::InvalidateTableCacheSnafu)?;
796
797        Ok(table)
798    }
799
800    #[tracing::instrument(skip_all)]
801    pub async fn drop_flow(
802        &self,
803        catalog_name: String,
804        flow_name: String,
805        drop_if_exists: bool,
806        query_context: QueryContextRef,
807    ) -> Result<Output> {
808        if let Some(flow) = self
809            .flow_metadata_manager
810            .flow_name_manager()
811            .get(&catalog_name, &flow_name)
812            .await
813            .context(error::TableMetadataManagerSnafu)?
814        {
815            let flow_id = flow.flow_id();
816            let task = DropFlowTask {
817                catalog_name,
818                flow_name,
819                flow_id,
820                drop_if_exists,
821            };
822            self.drop_flow_procedure(task, query_context).await?;
823
824            Ok(Output::new_with_affected_rows(0))
825        } else if drop_if_exists {
826            Ok(Output::new_with_affected_rows(0))
827        } else {
828            FlowNotFoundSnafu {
829                flow_name: format_full_flow_name(&catalog_name, &flow_name),
830            }
831            .fail()
832        }
833    }
834
835    async fn drop_flow_procedure(
836        &self,
837        expr: DropFlowTask,
838        query_context: QueryContextRef,
839    ) -> Result<SubmitDdlTaskResponse> {
840        let request = SubmitDdlTaskRequest {
841            query_context,
842            task: DdlTask::new_drop_flow(expr),
843        };
844
845        self.procedure_executor
846            .submit_ddl_task(&ExecutorContext::default(), request)
847            .await
848            .context(error::ExecuteDdlSnafu)
849    }
850
851    /// Drop a view
852    #[tracing::instrument(skip_all)]
853    pub(crate) async fn drop_view(
854        &self,
855        catalog: String,
856        schema: String,
857        view: String,
858        drop_if_exists: bool,
859        query_context: QueryContextRef,
860    ) -> Result<Output> {
861        let view_info = if let Some(view) = self
862            .catalog_manager
863            .table(&catalog, &schema, &view, None)
864            .await
865            .context(CatalogSnafu)?
866        {
867            view.table_info()
868        } else if drop_if_exists {
869            // DROP VIEW IF EXISTS meets view not found - ignored
870            return Ok(Output::new_with_affected_rows(0));
871        } else {
872            return TableNotFoundSnafu {
873                table_name: format_full_table_name(&catalog, &schema, &view),
874            }
875            .fail();
876        };
877
878        // Ensure the exists one is view, we can't drop other table types
879        ensure!(
880            view_info.table_type == TableType::View,
881            error::InvalidViewSnafu {
882                msg: "not a view",
883                view_name: format_full_table_name(&catalog, &schema, &view),
884            }
885        );
886
887        let view_id = view_info.table_id();
888
889        let task = DropViewTask {
890            catalog,
891            schema,
892            view,
893            view_id,
894            drop_if_exists,
895        };
896
897        self.drop_view_procedure(task, query_context).await?;
898
899        Ok(Output::new_with_affected_rows(0))
900    }
901
902    /// Submit [DropViewTask] to procedure executor.
903    async fn drop_view_procedure(
904        &self,
905        expr: DropViewTask,
906        query_context: QueryContextRef,
907    ) -> Result<SubmitDdlTaskResponse> {
908        let request = SubmitDdlTaskRequest {
909            query_context,
910            task: DdlTask::new_drop_view(expr),
911        };
912
913        self.procedure_executor
914            .submit_ddl_task(&ExecutorContext::default(), request)
915            .await
916            .context(error::ExecuteDdlSnafu)
917    }
918
919    #[tracing::instrument(skip_all)]
920    pub async fn alter_logical_tables(
921        &self,
922        alter_table_exprs: Vec<AlterTableExpr>,
923        query_context: QueryContextRef,
924    ) -> Result<Output> {
925        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
926        ensure!(
927            !alter_table_exprs.is_empty(),
928            EmptyDdlExprSnafu {
929                name: "alter logical tables"
930            }
931        );
932
933        // group by physical table id
934        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
935        for expr in alter_table_exprs {
936            // Get table_id from catalog_manager
937            let catalog = if expr.catalog_name.is_empty() {
938                query_context.current_catalog()
939            } else {
940                &expr.catalog_name
941            };
942            let schema = if expr.schema_name.is_empty() {
943                query_context.current_schema()
944            } else {
945                expr.schema_name.to_string()
946            };
947            let table_name = &expr.table_name;
948            let table = self
949                .catalog_manager
950                .table(catalog, &schema, table_name, Some(&query_context))
951                .await
952                .context(CatalogSnafu)?
953                .with_context(|| TableNotFoundSnafu {
954                    table_name: format_full_table_name(catalog, &schema, table_name),
955                })?;
956            let table_id = table.table_info().ident.table_id;
957            let physical_table_id = self
958                .table_metadata_manager
959                .table_route_manager()
960                .get_physical_table_id(table_id)
961                .await
962                .context(TableMetadataManagerSnafu)?;
963            groups.entry(physical_table_id).or_default().push(expr);
964        }
965
966        // Submit procedure for each physical table
967        let mut handles = Vec::with_capacity(groups.len());
968        for (_physical_table_id, exprs) in groups {
969            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
970            handles.push(fut);
971        }
972        let _results = futures::future::try_join_all(handles).await?;
973
974        Ok(Output::new_with_affected_rows(0))
975    }
976
977    #[tracing::instrument(skip_all)]
978    pub async fn drop_table(
979        &self,
980        table_name: TableName,
981        drop_if_exists: bool,
982        query_context: QueryContextRef,
983    ) -> Result<Output> {
984        // Reserved for grpc call
985        self.drop_tables(&[table_name], drop_if_exists, query_context)
986            .await
987    }
988
989    #[tracing::instrument(skip_all)]
990    pub async fn drop_tables(
991        &self,
992        table_names: &[TableName],
993        drop_if_exists: bool,
994        query_context: QueryContextRef,
995    ) -> Result<Output> {
996        let mut tables = Vec::with_capacity(table_names.len());
997        for table_name in table_names {
998            ensure!(
999                !is_readonly_schema(&table_name.schema_name),
1000                SchemaReadOnlySnafu {
1001                    name: table_name.schema_name.clone()
1002                }
1003            );
1004
1005            if let Some(table) = self
1006                .catalog_manager
1007                .table(
1008                    &table_name.catalog_name,
1009                    &table_name.schema_name,
1010                    &table_name.table_name,
1011                    Some(&query_context),
1012                )
1013                .await
1014                .context(CatalogSnafu)?
1015            {
1016                tables.push(table.table_info().table_id());
1017            } else if drop_if_exists {
1018                // DROP TABLE IF EXISTS meets table not found - ignored
1019                continue;
1020            } else {
1021                return TableNotFoundSnafu {
1022                    table_name: table_name.to_string(),
1023                }
1024                .fail();
1025            }
1026        }
1027
1028        for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
1029            self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
1030                .await?;
1031
1032            // Invalidates local cache ASAP.
1033            self.cache_invalidator
1034                .invalidate(
1035                    &Context::default(),
1036                    &[
1037                        CacheIdent::TableId(table_id),
1038                        CacheIdent::TableName(table_name.clone()),
1039                    ],
1040                )
1041                .await
1042                .context(error::InvalidateTableCacheSnafu)?;
1043        }
1044        Ok(Output::new_with_affected_rows(0))
1045    }
1046
1047    #[tracing::instrument(skip_all)]
1048    pub async fn drop_database(
1049        &self,
1050        catalog: String,
1051        schema: String,
1052        drop_if_exists: bool,
1053        query_context: QueryContextRef,
1054    ) -> Result<Output> {
1055        ensure!(
1056            !is_readonly_schema(&schema),
1057            SchemaReadOnlySnafu { name: schema }
1058        );
1059
1060        if self
1061            .catalog_manager
1062            .schema_exists(&catalog, &schema, None)
1063            .await
1064            .context(CatalogSnafu)?
1065        {
1066            if schema == query_context.current_schema() {
1067                SchemaInUseSnafu { name: schema }.fail()
1068            } else {
1069                self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
1070                    .await?;
1071
1072                Ok(Output::new_with_affected_rows(0))
1073            }
1074        } else if drop_if_exists {
1075            // DROP TABLE IF EXISTS meets table not found - ignored
1076            Ok(Output::new_with_affected_rows(0))
1077        } else {
1078            SchemaNotFoundSnafu {
1079                schema_info: schema,
1080            }
1081            .fail()
1082        }
1083    }
1084
1085    #[tracing::instrument(skip_all)]
1086    pub async fn truncate_table(
1087        &self,
1088        table_name: TableName,
1089        query_context: QueryContextRef,
1090    ) -> Result<Output> {
1091        ensure!(
1092            !is_readonly_schema(&table_name.schema_name),
1093            SchemaReadOnlySnafu {
1094                name: table_name.schema_name.clone()
1095            }
1096        );
1097
1098        let table = self
1099            .catalog_manager
1100            .table(
1101                &table_name.catalog_name,
1102                &table_name.schema_name,
1103                &table_name.table_name,
1104                Some(&query_context),
1105            )
1106            .await
1107            .context(CatalogSnafu)?
1108            .with_context(|| TableNotFoundSnafu {
1109                table_name: table_name.to_string(),
1110            })?;
1111        let table_id = table.table_info().table_id();
1112        self.truncate_table_procedure(&table_name, table_id, query_context)
1113            .await?;
1114
1115        Ok(Output::new_with_affected_rows(0))
1116    }
1117
1118    /// Verifies an alter and returns whether it is necessary to perform the alter.
1119    ///
1120    /// # Returns
1121    ///
1122    /// Returns true if the alter need to be porformed; otherwise, it returns false.
1123    fn verify_alter(
1124        &self,
1125        table_id: TableId,
1126        table_info: Arc<TableInfo>,
1127        expr: AlterTableExpr,
1128    ) -> Result<bool> {
1129        let request: AlterTableRequest = common_grpc_expr::alter_expr_to_request(table_id, expr)
1130            .context(AlterExprToRequestSnafu)?;
1131
1132        let AlterTableRequest {
1133            table_name,
1134            alter_kind,
1135            ..
1136        } = &request;
1137
1138        if let AlterKind::RenameTable { new_table_name } = alter_kind {
1139            ensure!(
1140                NAME_PATTERN_REG.is_match(new_table_name),
1141                error::UnexpectedSnafu {
1142                    violated: format!("Invalid table name: {}", new_table_name)
1143                }
1144            );
1145        } else if let AlterKind::AddColumns { columns } = alter_kind {
1146            // If all the columns are marked as add_if_not_exists and they already exist in the table,
1147            // there is no need to perform the alter.
1148            let column_names: HashSet<_> = table_info
1149                .meta
1150                .schema
1151                .column_schemas()
1152                .iter()
1153                .map(|schema| &schema.name)
1154                .collect();
1155            if columns.iter().all(|column| {
1156                column_names.contains(&column.column_schema.name) && column.add_if_not_exists
1157            }) {
1158                return Ok(false);
1159            }
1160        }
1161
1162        let _ = table_info
1163            .meta
1164            .builder_with_alter_kind(table_name, &request.alter_kind)
1165            .context(error::TableSnafu)?
1166            .build()
1167            .context(error::BuildTableMetaSnafu { table_name })?;
1168
1169        Ok(true)
1170    }
1171
1172    #[tracing::instrument(skip_all)]
1173    pub async fn alter_table(
1174        &self,
1175        alter_table: AlterTable,
1176        query_context: QueryContextRef,
1177    ) -> Result<Output> {
1178        let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
1179        self.alter_table_inner(expr, query_context).await
1180    }
1181
1182    #[tracing::instrument(skip_all)]
1183    pub async fn alter_table_inner(
1184        &self,
1185        expr: AlterTableExpr,
1186        query_context: QueryContextRef,
1187    ) -> Result<Output> {
1188        ensure!(
1189            !is_readonly_schema(&expr.schema_name),
1190            SchemaReadOnlySnafu {
1191                name: expr.schema_name.clone()
1192            }
1193        );
1194
1195        let catalog_name = if expr.catalog_name.is_empty() {
1196            DEFAULT_CATALOG_NAME.to_string()
1197        } else {
1198            expr.catalog_name.clone()
1199        };
1200
1201        let schema_name = if expr.schema_name.is_empty() {
1202            DEFAULT_SCHEMA_NAME.to_string()
1203        } else {
1204            expr.schema_name.clone()
1205        };
1206
1207        let table_name = expr.table_name.clone();
1208
1209        let table = self
1210            .catalog_manager
1211            .table(
1212                &catalog_name,
1213                &schema_name,
1214                &table_name,
1215                Some(&query_context),
1216            )
1217            .await
1218            .context(CatalogSnafu)?
1219            .with_context(|| TableNotFoundSnafu {
1220                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
1221            })?;
1222
1223        let table_id = table.table_info().ident.table_id;
1224        let need_alter = self.verify_alter(table_id, table.table_info(), expr.clone())?;
1225        if !need_alter {
1226            return Ok(Output::new_with_affected_rows(0));
1227        }
1228        info!(
1229            "Table info before alter is {:?}, expr: {:?}",
1230            table.table_info(),
1231            expr
1232        );
1233
1234        let physical_table_id = self
1235            .table_metadata_manager
1236            .table_route_manager()
1237            .get_physical_table_id(table_id)
1238            .await
1239            .context(TableMetadataManagerSnafu)?;
1240
1241        let (req, invalidate_keys) = if physical_table_id == table_id {
1242            // This is physical table
1243            let req = SubmitDdlTaskRequest {
1244                query_context,
1245                task: DdlTask::new_alter_table(expr),
1246            };
1247
1248            let invalidate_keys = vec![
1249                CacheIdent::TableId(table_id),
1250                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1251            ];
1252
1253            (req, invalidate_keys)
1254        } else {
1255            // This is logical table
1256            let req = SubmitDdlTaskRequest {
1257                query_context,
1258                task: DdlTask::new_alter_logical_tables(vec![expr]),
1259            };
1260
1261            let mut invalidate_keys = vec![
1262                CacheIdent::TableId(physical_table_id),
1263                CacheIdent::TableId(table_id),
1264                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1265            ];
1266
1267            let physical_table = self
1268                .table_metadata_manager
1269                .table_info_manager()
1270                .get(physical_table_id)
1271                .await
1272                .context(TableMetadataManagerSnafu)?
1273                .map(|x| x.into_inner());
1274            if let Some(physical_table) = physical_table {
1275                let physical_table_name = TableName::new(
1276                    physical_table.table_info.catalog_name,
1277                    physical_table.table_info.schema_name,
1278                    physical_table.table_info.name,
1279                );
1280                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
1281            }
1282
1283            (req, invalidate_keys)
1284        };
1285
1286        self.procedure_executor
1287            .submit_ddl_task(&ExecutorContext::default(), req)
1288            .await
1289            .context(error::ExecuteDdlSnafu)?;
1290
1291        // Invalidates local cache ASAP.
1292        self.cache_invalidator
1293            .invalidate(&Context::default(), &invalidate_keys)
1294            .await
1295            .context(error::InvalidateTableCacheSnafu)?;
1296
1297        Ok(Output::new_with_affected_rows(0))
1298    }
1299
1300    #[tracing::instrument(skip_all)]
1301    pub async fn alter_database(
1302        &self,
1303        alter_expr: AlterDatabase,
1304        query_context: QueryContextRef,
1305    ) -> Result<Output> {
1306        let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
1307        self.alter_database_inner(alter_expr, query_context).await
1308    }
1309
1310    #[tracing::instrument(skip_all)]
1311    pub async fn alter_database_inner(
1312        &self,
1313        alter_expr: AlterDatabaseExpr,
1314        query_context: QueryContextRef,
1315    ) -> Result<Output> {
1316        ensure!(
1317            !is_readonly_schema(&alter_expr.schema_name),
1318            SchemaReadOnlySnafu {
1319                name: query_context.current_schema().clone()
1320            }
1321        );
1322
1323        let exists = self
1324            .catalog_manager
1325            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
1326            .await
1327            .context(CatalogSnafu)?;
1328        ensure!(
1329            exists,
1330            SchemaNotFoundSnafu {
1331                schema_info: alter_expr.schema_name,
1332            }
1333        );
1334
1335        let cache_ident = [CacheIdent::SchemaName(SchemaName {
1336            catalog_name: alter_expr.catalog_name.clone(),
1337            schema_name: alter_expr.schema_name.clone(),
1338        })];
1339
1340        self.alter_database_procedure(alter_expr, query_context)
1341            .await?;
1342
1343        // Invalidates local cache ASAP.
1344        self.cache_invalidator
1345            .invalidate(&Context::default(), &cache_ident)
1346            .await
1347            .context(error::InvalidateTableCacheSnafu)?;
1348
1349        Ok(Output::new_with_affected_rows(0))
1350    }
1351
1352    async fn create_table_procedure(
1353        &self,
1354        create_table: CreateTableExpr,
1355        partitions: Vec<Partition>,
1356        table_info: RawTableInfo,
1357        query_context: QueryContextRef,
1358    ) -> Result<SubmitDdlTaskResponse> {
1359        let partitions = partitions.into_iter().map(Into::into).collect();
1360
1361        let request = SubmitDdlTaskRequest {
1362            query_context,
1363            task: DdlTask::new_create_table(create_table, partitions, table_info),
1364        };
1365
1366        self.procedure_executor
1367            .submit_ddl_task(&ExecutorContext::default(), request)
1368            .await
1369            .context(error::ExecuteDdlSnafu)
1370    }
1371
1372    async fn create_logical_tables_procedure(
1373        &self,
1374        tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
1375        query_context: QueryContextRef,
1376    ) -> Result<SubmitDdlTaskResponse> {
1377        let request = SubmitDdlTaskRequest {
1378            query_context,
1379            task: DdlTask::new_create_logical_tables(tables_data),
1380        };
1381
1382        self.procedure_executor
1383            .submit_ddl_task(&ExecutorContext::default(), request)
1384            .await
1385            .context(error::ExecuteDdlSnafu)
1386    }
1387
1388    async fn alter_logical_tables_procedure(
1389        &self,
1390        tables_data: Vec<AlterTableExpr>,
1391        query_context: QueryContextRef,
1392    ) -> Result<SubmitDdlTaskResponse> {
1393        let request = SubmitDdlTaskRequest {
1394            query_context,
1395            task: DdlTask::new_alter_logical_tables(tables_data),
1396        };
1397
1398        self.procedure_executor
1399            .submit_ddl_task(&ExecutorContext::default(), request)
1400            .await
1401            .context(error::ExecuteDdlSnafu)
1402    }
1403
1404    async fn drop_table_procedure(
1405        &self,
1406        table_name: &TableName,
1407        table_id: TableId,
1408        drop_if_exists: bool,
1409        query_context: QueryContextRef,
1410    ) -> Result<SubmitDdlTaskResponse> {
1411        let request = SubmitDdlTaskRequest {
1412            query_context,
1413            task: DdlTask::new_drop_table(
1414                table_name.catalog_name.to_string(),
1415                table_name.schema_name.to_string(),
1416                table_name.table_name.to_string(),
1417                table_id,
1418                drop_if_exists,
1419            ),
1420        };
1421
1422        self.procedure_executor
1423            .submit_ddl_task(&ExecutorContext::default(), request)
1424            .await
1425            .context(error::ExecuteDdlSnafu)
1426    }
1427
1428    async fn drop_database_procedure(
1429        &self,
1430        catalog: String,
1431        schema: String,
1432        drop_if_exists: bool,
1433        query_context: QueryContextRef,
1434    ) -> Result<SubmitDdlTaskResponse> {
1435        let request = SubmitDdlTaskRequest {
1436            query_context,
1437            task: DdlTask::new_drop_database(catalog, schema, drop_if_exists),
1438        };
1439
1440        self.procedure_executor
1441            .submit_ddl_task(&ExecutorContext::default(), request)
1442            .await
1443            .context(error::ExecuteDdlSnafu)
1444    }
1445
1446    async fn alter_database_procedure(
1447        &self,
1448        alter_expr: AlterDatabaseExpr,
1449        query_context: QueryContextRef,
1450    ) -> Result<SubmitDdlTaskResponse> {
1451        let request = SubmitDdlTaskRequest {
1452            query_context,
1453            task: DdlTask::new_alter_database(alter_expr),
1454        };
1455
1456        self.procedure_executor
1457            .submit_ddl_task(&ExecutorContext::default(), request)
1458            .await
1459            .context(error::ExecuteDdlSnafu)
1460    }
1461
1462    async fn truncate_table_procedure(
1463        &self,
1464        table_name: &TableName,
1465        table_id: TableId,
1466        query_context: QueryContextRef,
1467    ) -> Result<SubmitDdlTaskResponse> {
1468        let request = SubmitDdlTaskRequest {
1469            query_context,
1470            task: DdlTask::new_truncate_table(
1471                table_name.catalog_name.to_string(),
1472                table_name.schema_name.to_string(),
1473                table_name.table_name.to_string(),
1474                table_id,
1475            ),
1476        };
1477
1478        self.procedure_executor
1479            .submit_ddl_task(&ExecutorContext::default(), request)
1480            .await
1481            .context(error::ExecuteDdlSnafu)
1482    }
1483
1484    #[tracing::instrument(skip_all)]
1485    pub async fn create_database(
1486        &self,
1487        database: &str,
1488        create_if_not_exists: bool,
1489        options: HashMap<String, String>,
1490        query_context: QueryContextRef,
1491    ) -> Result<Output> {
1492        let catalog = query_context.current_catalog();
1493        ensure!(
1494            NAME_PATTERN_REG.is_match(catalog),
1495            error::UnexpectedSnafu {
1496                violated: format!("Invalid catalog name: {}", catalog)
1497            }
1498        );
1499
1500        ensure!(
1501            NAME_PATTERN_REG.is_match(database),
1502            error::UnexpectedSnafu {
1503                violated: format!("Invalid database name: {}", database)
1504            }
1505        );
1506
1507        if !self
1508            .catalog_manager
1509            .schema_exists(catalog, database, None)
1510            .await
1511            .context(CatalogSnafu)?
1512            && !self.catalog_manager.is_reserved_schema_name(database)
1513        {
1514            self.create_database_procedure(
1515                catalog.to_string(),
1516                database.to_string(),
1517                create_if_not_exists,
1518                options,
1519                query_context,
1520            )
1521            .await?;
1522
1523            Ok(Output::new_with_affected_rows(1))
1524        } else if create_if_not_exists {
1525            Ok(Output::new_with_affected_rows(1))
1526        } else {
1527            error::SchemaExistsSnafu { name: database }.fail()
1528        }
1529    }
1530
1531    async fn create_database_procedure(
1532        &self,
1533        catalog: String,
1534        database: String,
1535        create_if_not_exists: bool,
1536        options: HashMap<String, String>,
1537        query_context: QueryContextRef,
1538    ) -> Result<SubmitDdlTaskResponse> {
1539        let request = SubmitDdlTaskRequest {
1540            query_context,
1541            task: DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
1542        };
1543
1544        self.procedure_executor
1545            .submit_ddl_task(&ExecutorContext::default(), request)
1546            .await
1547            .context(error::ExecuteDdlSnafu)
1548    }
1549}
1550
1551/// Parse partition statement [Partitions] into [MetaPartition] and partition columns.
1552fn parse_partitions(
1553    create_table: &CreateTableExpr,
1554    partitions: Option<Partitions>,
1555    query_ctx: &QueryContextRef,
1556) -> Result<(Vec<MetaPartition>, Vec<String>)> {
1557    // If partitions are not defined by user, use the timestamp column (which has to be existed) as
1558    // the partition column, and create only one partition.
1559    let partition_columns = find_partition_columns(&partitions)?;
1560    let partition_entries =
1561        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
1562
1563    // Validates partition
1564    let mut exprs = vec![];
1565    for partition in &partition_entries {
1566        for bound in partition {
1567            if let PartitionBound::Expr(expr) = bound {
1568                exprs.push(expr.clone());
1569            }
1570        }
1571    }
1572    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs)
1573        .context(InvalidPartitionSnafu)?;
1574
1575    Ok((
1576        partition_entries
1577            .into_iter()
1578            .map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
1579            .collect::<std::result::Result<_, _>>()
1580            .context(DeserializePartitionSnafu)?,
1581        partition_columns,
1582    ))
1583}
1584
1585fn create_table_info(
1586    create_table: &CreateTableExpr,
1587    partition_columns: Vec<String>,
1588) -> Result<RawTableInfo> {
1589    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
1590    let mut column_name_to_index_map = HashMap::new();
1591
1592    for (idx, column) in create_table.column_defs.iter().enumerate() {
1593        let schema =
1594            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
1595                column: &column.name,
1596            })?;
1597        let schema = schema.with_time_index(column.name == create_table.time_index);
1598
1599        column_schemas.push(schema);
1600        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
1601    }
1602
1603    let timestamp_index = column_name_to_index_map
1604        .get(&create_table.time_index)
1605        .cloned();
1606
1607    let raw_schema = RawSchema {
1608        column_schemas: column_schemas.clone(),
1609        timestamp_index,
1610        version: 0,
1611    };
1612
1613    let primary_key_indices = create_table
1614        .primary_keys
1615        .iter()
1616        .map(|name| {
1617            column_name_to_index_map
1618                .get(name)
1619                .cloned()
1620                .context(ColumnNotFoundSnafu { msg: name })
1621        })
1622        .collect::<Result<Vec<_>>>()?;
1623
1624    let partition_key_indices = partition_columns
1625        .into_iter()
1626        .map(|col_name| {
1627            column_name_to_index_map
1628                .get(&col_name)
1629                .cloned()
1630                .context(ColumnNotFoundSnafu { msg: col_name })
1631        })
1632        .collect::<Result<Vec<_>>>()?;
1633
1634    let table_options = TableOptions::try_from_iter(&create_table.table_options)
1635        .context(UnrecognizedTableOptionSnafu)?;
1636
1637    let meta = RawTableMeta {
1638        schema: raw_schema,
1639        primary_key_indices,
1640        value_indices: vec![],
1641        engine: create_table.engine.clone(),
1642        next_column_id: column_schemas.len() as u32,
1643        region_numbers: vec![],
1644        options: table_options,
1645        created_on: Utc::now(),
1646        partition_key_indices,
1647    };
1648
1649    let desc = if create_table.desc.is_empty() {
1650        create_table.table_options.get(COMMENT_KEY).cloned()
1651    } else {
1652        Some(create_table.desc.clone())
1653    };
1654
1655    let table_info = RawTableInfo {
1656        ident: metadata::TableIdent {
1657            // The table id of distributed table is assigned by Meta, set "0" here as a placeholder.
1658            table_id: 0,
1659            version: 0,
1660        },
1661        name: create_table.table_name.clone(),
1662        desc,
1663        catalog_name: create_table.catalog_name.clone(),
1664        schema_name: create_table.schema_name.clone(),
1665        meta,
1666        table_type: TableType::Base,
1667    };
1668    Ok(table_info)
1669}
1670
1671fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
1672    let columns = if let Some(partitions) = partitions {
1673        partitions
1674            .column_list
1675            .iter()
1676            .map(|x| x.value.clone())
1677            .collect::<Vec<_>>()
1678    } else {
1679        vec![]
1680    };
1681    Ok(columns)
1682}
1683
1684/// Parse [Partitions] into a group of partition entries.
1685///
1686/// Returns a list of [PartitionBound], each of which defines a partition.
1687fn find_partition_entries(
1688    create_table: &CreateTableExpr,
1689    partitions: &Option<Partitions>,
1690    partition_columns: &[String],
1691    query_ctx: &QueryContextRef,
1692) -> Result<Vec<Vec<PartitionBound>>> {
1693    let entries = if let Some(partitions) = partitions {
1694        // extract concrete data type of partition columns
1695        let column_defs = partition_columns
1696            .iter()
1697            .map(|pc| {
1698                create_table
1699                    .column_defs
1700                    .iter()
1701                    .find(|c| &c.name == pc)
1702                    // unwrap is safe here because we have checked that partition columns are defined
1703                    .unwrap()
1704            })
1705            .collect::<Vec<_>>();
1706        let mut column_name_and_type = HashMap::with_capacity(column_defs.len());
1707        for column in column_defs {
1708            let column_name = &column.name;
1709            let data_type = ConcreteDataType::from(
1710                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension)
1711                    .context(ColumnDataTypeSnafu)?,
1712            );
1713            column_name_and_type.insert(column_name, data_type);
1714        }
1715
1716        // Transform parser expr to partition expr
1717        let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
1718        for partition in &partitions.exprs {
1719            let partition_expr =
1720                convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
1721            partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]);
1722        }
1723
1724        // fallback for no expr
1725        if partition_exprs.is_empty() {
1726            partition_exprs.push(vec![PartitionBound::MaxValue]);
1727        }
1728
1729        partition_exprs
1730    } else {
1731        vec![vec![PartitionBound::MaxValue]]
1732    };
1733    Ok(entries)
1734}
1735
1736fn convert_one_expr(
1737    expr: &Expr,
1738    column_name_and_type: &HashMap<&String, ConcreteDataType>,
1739    timezone: &Timezone,
1740) -> Result<PartitionExpr> {
1741    let Expr::BinaryOp { left, op, right } = expr else {
1742        return InvalidPartitionRuleSnafu {
1743            reason: "partition rule must be a binary expression",
1744        }
1745        .fail();
1746    };
1747
1748    let op =
1749        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
1750            reason: format!("unsupported operator in partition expr {op}"),
1751        })?;
1752
1753    // convert leaf node.
1754    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
1755        // col, val
1756        (Expr::Identifier(ident), Expr::Value(value)) => {
1757            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1758            let value = convert_value(value, data_type, timezone, None)?;
1759            (Operand::Column(column_name), op, Operand::Value(value))
1760        }
1761        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
1762            if let Expr::Value(v) = &**expr =>
1763        {
1764            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1765            let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
1766            (Operand::Column(column_name), op, Operand::Value(value))
1767        }
1768        // val, col
1769        (Expr::Value(value), Expr::Identifier(ident)) => {
1770            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1771            let value = convert_value(value, data_type, timezone, None)?;
1772            (Operand::Value(value), op, Operand::Column(column_name))
1773        }
1774        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
1775            if let Expr::Value(v) = &**expr =>
1776        {
1777            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1778            let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
1779            (Operand::Value(value), op, Operand::Column(column_name))
1780        }
1781        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
1782            // sub-expr must against another sub-expr
1783            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
1784            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
1785            (Operand::Expr(lhs), op, Operand::Expr(rhs))
1786        }
1787        _ => {
1788            return InvalidPartitionRuleSnafu {
1789                reason: format!("invalid partition expr {expr}"),
1790            }
1791            .fail();
1792        }
1793    };
1794
1795    Ok(PartitionExpr::new(lhs, op, rhs))
1796}
1797
1798fn convert_identifier(
1799    ident: &Ident,
1800    column_name_and_type: &HashMap<&String, ConcreteDataType>,
1801) -> Result<(String, ConcreteDataType)> {
1802    let column_name = ident.value.clone();
1803    let data_type = column_name_and_type
1804        .get(&column_name)
1805        .cloned()
1806        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
1807    Ok((column_name, data_type))
1808}
1809
1810fn convert_value(
1811    value: &ParserValue,
1812    data_type: ConcreteDataType,
1813    timezone: &Timezone,
1814    unary_op: Option<UnaryOperator>,
1815) -> Result<Value> {
1816    sql_value_to_value(
1817        "<NONAME>",
1818        &data_type,
1819        value,
1820        Some(timezone),
1821        unary_op,
1822        false,
1823    )
1824    .context(ParseSqlValueSnafu)
1825}
1826
#[cfg(test)]
mod test {
    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;

    use super::*;
    use crate::expr_helper;

    /// Checks which names `NAME_PATTERN_REG` accepts and which it rejects.
    #[test]
    fn test_name_is_match() {
        // (candidate name, whether the pattern is expected to match it)
        let cases = [
            ("/adaf", false),
            ("🈲", false),
            ("hello", true),
            ("test@", true),
            ("@test", false),
            ("test#", true),
            ("#test", false),
            ("@", false),
            ("#", false),
        ];
        for (name, expected) in cases {
            assert_eq!(NAME_PATTERN_REG.is_match(name), expected, "name: {name}");
        }
    }

    /// Parses `CREATE TABLE` statements with partition clauses and compares the JSON form
    /// of the resulting partitions with the expected string. Currently ignored.
    #[tokio::test]
    #[ignore = "TODO(ruihang): WIP new partition rule"]
    async fn test_parse_partitions() {
        common_telemetry::init_default_ut_logging();
        // (SQL statement, expected JSON serialization of the parsed partitions)
        let cases = [
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION ON COLUMNS (b) (
  b < 'hz',
  b >= 'hz' AND b < 'sh',
  b >= 'sh'
)
ENGINE=mito",
                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
            ),
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
  PARTITION r0 VALUES LESS THAN ('hz', 10),
  b < 'hz' AND a < 10,
  b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
  b >= 'sh' AND a >= 20
)
ENGINE=mito",
                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
            ),
        ];
        let ctx = QueryContextBuilder::default().build().into();
        for (sql, expected) in cases {
            let statements = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            // Every case is a single `CREATE TABLE` statement.
            let Statement::CreateTable(create) = &statements[0] else {
                unreachable!()
            };
            let expr = expr_helper::create_to_expr(create, &QueryContext::arc()).unwrap();
            let (partitions, _) =
                parse_partitions(&expr, create.partitions.clone(), &ctx).unwrap();
            let json = serde_json::to_string(&partitions).unwrap();
            assert_eq!(json, expected);
        }
    }
}