operator/statement/
ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use api::helper::ColumnDataTypeWrapper;
19use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
20use api::v1::{
21    column_def, AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
22};
23#[cfg(feature = "enterprise")]
24use api::v1::{
25    meta::CreateTriggerTask as PbCreateTriggerTask, CreateTriggerExpr as PbCreateTriggerExpr,
26};
27use catalog::CatalogManagerRef;
28use chrono::Utc;
29use common_catalog::consts::{is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
30use common_catalog::{format_full_flow_name, format_full_table_name};
31use common_error::ext::BoxedError;
32use common_meta::cache_invalidator::Context;
33use common_meta::ddl::create_flow::FlowType;
34use common_meta::instruction::CacheIdent;
35use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
36use common_meta::key::NAME_PATTERN;
37use common_meta::procedure_executor::ExecutorContext;
38#[cfg(feature = "enterprise")]
39use common_meta::rpc::ddl::trigger::CreateTriggerTask;
40#[cfg(feature = "enterprise")]
41use common_meta::rpc::ddl::trigger::DropTriggerTask;
42use common_meta::rpc::ddl::{
43    CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
44    SubmitDdlTaskResponse,
45};
46use common_query::Output;
47use common_sql::convert::sql_value_to_value;
48use common_telemetry::{debug, info, tracing, warn};
49use common_time::{Timestamp, Timezone};
50use datafusion_common::tree_node::TreeNodeVisitor;
51use datafusion_expr::LogicalPlan;
52use datatypes::prelude::ConcreteDataType;
53use datatypes::schema::{RawSchema, Schema};
54use datatypes::value::Value;
55use lazy_static::lazy_static;
56use partition::expr::{Operand, PartitionExpr, RestrictedOp};
57use partition::multi_dim::MultiDimPartitionRule;
58use query::parser::QueryStatement;
59use query::plan::extract_and_rewrite_full_table_names;
60use query::query_engine::DefaultSerializer;
61use query::sql::create_table_stmt;
62use regex::Regex;
63use session::context::QueryContextRef;
64use session::table_name::table_idents_to_full_name;
65use snafu::{ensure, OptionExt, ResultExt};
66use sql::parser::{ParseOptions, ParserContext};
67#[cfg(feature = "enterprise")]
68use sql::statements::alter::trigger::AlterTrigger;
69use sql::statements::alter::{AlterDatabase, AlterTable};
70#[cfg(feature = "enterprise")]
71use sql::statements::create::trigger::CreateTrigger;
72use sql::statements::create::{
73    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
74};
75use sql::statements::statement::Statement;
76use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
77use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
78use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
79use table::dist_table::DistTable;
80use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
81use table::requests::{AlterKind, AlterTableRequest, TableOptions, COMMENT_KEY};
82use table::table_name::TableName;
83use table::TableRef;
84
85use crate::error::{
86    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
87    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
88    EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu,
89    InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu,
90    InvalidViewNameSnafu, InvalidViewStmtSnafu, PartitionExprToPbSnafu, Result, SchemaInUseSnafu,
91    SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
92    TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
93    ViewAlreadyExistsSnafu,
94};
95use crate::expr_helper;
96use crate::statement::show::create_partitions_stmt;
97use crate::statement::StatementExecutor;
98
99lazy_static! {
100    pub static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
101}
102
103impl StatementExecutor {
104    pub fn catalog_manager(&self) -> CatalogManagerRef {
105        self.catalog_manager.clone()
106    }
107
108    #[tracing::instrument(skip_all)]
109    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
110        let (catalog, schema, _table) = table_idents_to_full_name(&stmt.name, &ctx)
111            .map_err(BoxedError::new)
112            .context(error::ExternalSnafu)?;
113
114        let schema_options = self
115            .table_metadata_manager
116            .schema_manager()
117            .get(SchemaNameKey {
118                catalog: &catalog,
119                schema: &schema,
120            })
121            .await
122            .context(TableMetadataManagerSnafu)?
123            .map(|v| v.into_inner());
124
125        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
126        // We don't put ttl into the table options
127        // Because it will be used directly while compaction.
128        if let Some(schema_options) = schema_options {
129            for (key, value) in schema_options.extra_options.iter() {
130                create_expr
131                    .table_options
132                    .entry(key.clone())
133                    .or_insert(value.clone());
134            }
135        }
136
137        self.create_table_inner(create_expr, stmt.partitions, ctx)
138            .await
139    }
140
141    #[tracing::instrument(skip_all)]
142    pub async fn create_table_like(
143        &self,
144        stmt: CreateTableLike,
145        ctx: QueryContextRef,
146    ) -> Result<TableRef> {
147        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
148            .map_err(BoxedError::new)
149            .context(error::ExternalSnafu)?;
150        let table_ref = self
151            .catalog_manager
152            .table(&catalog, &schema, &table, Some(&ctx))
153            .await
154            .context(CatalogSnafu)?
155            .context(TableNotFoundSnafu { table_name: &table })?;
156        let partitions = self
157            .partition_manager
158            .find_table_partitions(table_ref.table_info().table_id())
159            .await
160            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;
161
162        // CREATE TABLE LIKE also inherits database level options.
163        let schema_options = self
164            .table_metadata_manager
165            .schema_manager()
166            .get(SchemaNameKey {
167                catalog: &catalog,
168                schema: &schema,
169            })
170            .await
171            .context(TableMetadataManagerSnafu)?
172            .map(|v| v.into_inner());
173
174        let quote_style = ctx.quote_style();
175        let mut create_stmt =
176            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
177                .context(error::ParseQuerySnafu)?;
178        create_stmt.name = stmt.table_name;
179        create_stmt.if_not_exists = false;
180
181        let table_info = table_ref.table_info();
182        let partitions =
183            create_partitions_stmt(&table_info, partitions)?.and_then(|mut partitions| {
184                if !partitions.column_list.is_empty() {
185                    partitions.set_quote(quote_style);
186                    Some(partitions)
187                } else {
188                    None
189                }
190            });
191
192        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
193        self.create_table_inner(create_expr, partitions, ctx).await
194    }
195
196    #[tracing::instrument(skip_all)]
197    pub async fn create_external_table(
198        &self,
199        create_expr: CreateExternalTable,
200        ctx: QueryContextRef,
201    ) -> Result<TableRef> {
202        let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
203        self.create_table_inner(create_expr, None, ctx).await
204    }
205
206    #[tracing::instrument(skip_all)]
207    pub async fn create_table_inner(
208        &self,
209        create_table: &mut CreateTableExpr,
210        partitions: Option<Partitions>,
211        query_ctx: QueryContextRef,
212    ) -> Result<TableRef> {
213        ensure!(
214            !is_readonly_schema(&create_table.schema_name),
215            SchemaReadOnlySnafu {
216                name: create_table.schema_name.clone()
217            }
218        );
219
220        if create_table.engine == METRIC_ENGINE_NAME
221            && create_table
222                .table_options
223                .contains_key(LOGICAL_TABLE_METADATA_KEY)
224        {
225            // Create logical tables
226            ensure!(
227                partitions.is_none(),
228                InvalidPartitionRuleSnafu {
229                    reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table",
230                }
231            );
232            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
233                .await?
234                .into_iter()
235                .next()
236                .context(error::UnexpectedSnafu {
237                    violated: "expected to create logical tables",
238                })
239        } else {
240            // Create other normal table
241            self.create_non_logic_table(create_table, partitions, query_ctx)
242                .await
243        }
244    }
245
246    #[tracing::instrument(skip_all)]
247    pub async fn create_non_logic_table(
248        &self,
249        create_table: &mut CreateTableExpr,
250        partitions: Option<Partitions>,
251        query_ctx: QueryContextRef,
252    ) -> Result<TableRef> {
253        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
254
255        // Check if schema exists
256        let schema = self
257            .table_metadata_manager
258            .schema_manager()
259            .get(SchemaNameKey::new(
260                &create_table.catalog_name,
261                &create_table.schema_name,
262            ))
263            .await
264            .context(TableMetadataManagerSnafu)?;
265        ensure!(
266            schema.is_some(),
267            SchemaNotFoundSnafu {
268                schema_info: &create_table.schema_name,
269            }
270        );
271
272        // if table exists.
273        if let Some(table) = self
274            .catalog_manager
275            .table(
276                &create_table.catalog_name,
277                &create_table.schema_name,
278                &create_table.table_name,
279                Some(&query_ctx),
280            )
281            .await
282            .context(CatalogSnafu)?
283        {
284            return if create_table.create_if_not_exists {
285                Ok(table)
286            } else {
287                TableAlreadyExistsSnafu {
288                    table: format_full_table_name(
289                        &create_table.catalog_name,
290                        &create_table.schema_name,
291                        &create_table.table_name,
292                    ),
293                }
294                .fail()
295            };
296        }
297
298        ensure!(
299            NAME_PATTERN_REG.is_match(&create_table.table_name),
300            InvalidTableNameSnafu {
301                table_name: &create_table.table_name,
302            }
303        );
304
305        let table_name = TableName::new(
306            &create_table.catalog_name,
307            &create_table.schema_name,
308            &create_table.table_name,
309        );
310
311        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
312        let mut table_info = create_table_info(create_table, partition_cols)?;
313
314        let resp = self
315            .create_table_procedure(
316                create_table.clone(),
317                partitions,
318                table_info.clone(),
319                query_ctx,
320            )
321            .await?;
322
323        let table_id = resp
324            .table_ids
325            .into_iter()
326            .next()
327            .context(error::UnexpectedSnafu {
328                violated: "expected table_id",
329            })?;
330        info!("Successfully created table '{table_name}' with table id {table_id}");
331
332        table_info.ident.table_id = table_id;
333
334        let table_info: Arc<TableInfo> =
335            Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
336        create_table.table_id = Some(api::v1::TableId { id: table_id });
337
338        let table = DistTable::table(table_info);
339
340        Ok(table)
341    }
342
343    #[tracing::instrument(skip_all)]
344    pub async fn create_logical_tables(
345        &self,
346        create_table_exprs: &[CreateTableExpr],
347        query_context: QueryContextRef,
348    ) -> Result<Vec<TableRef>> {
349        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
350        ensure!(
351            !create_table_exprs.is_empty(),
352            EmptyDdlExprSnafu {
353                name: "create logic tables"
354            }
355        );
356
357        // Check table names
358        for create_table in create_table_exprs {
359            ensure!(
360                NAME_PATTERN_REG.is_match(&create_table.table_name),
361                InvalidTableNameSnafu {
362                    table_name: &create_table.table_name,
363                }
364            );
365        }
366
367        let mut raw_tables_info = create_table_exprs
368            .iter()
369            .map(|create| create_table_info(create, vec![]))
370            .collect::<Result<Vec<_>>>()?;
371        let tables_data = create_table_exprs
372            .iter()
373            .cloned()
374            .zip(raw_tables_info.iter().cloned())
375            .collect::<Vec<_>>();
376
377        let resp = self
378            .create_logical_tables_procedure(tables_data, query_context)
379            .await?;
380
381        let table_ids = resp.table_ids;
382        ensure!(table_ids.len() == raw_tables_info.len(), CreateLogicalTablesSnafu {
383            reason: format!("The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}", raw_tables_info.len(), table_ids.len())
384        });
385        info!("Successfully created logical tables: {:?}", table_ids);
386
387        for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
388            table_info.ident.table_id = table_ids[i];
389        }
390        let tables_info = raw_tables_info
391            .into_iter()
392            .map(|x| x.try_into().context(CreateTableInfoSnafu))
393            .collect::<Result<Vec<_>>>()?;
394
395        Ok(tables_info
396            .into_iter()
397            .map(|x| DistTable::table(Arc::new(x)))
398            .collect())
399    }
400
401    #[cfg(feature = "enterprise")]
402    #[tracing::instrument(skip_all)]
403    pub async fn create_trigger(
404        &self,
405        stmt: CreateTrigger,
406        query_context: QueryContextRef,
407    ) -> Result<Output> {
408        let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
409        self.create_trigger_inner(expr, query_context).await
410    }
411
412    #[cfg(feature = "enterprise")]
413    pub async fn create_trigger_inner(
414        &self,
415        expr: PbCreateTriggerExpr,
416        query_context: QueryContextRef,
417    ) -> Result<Output> {
418        self.create_trigger_procedure(expr, query_context).await?;
419        Ok(Output::new_with_affected_rows(0))
420    }
421
422    #[cfg(feature = "enterprise")]
423    async fn create_trigger_procedure(
424        &self,
425        expr: PbCreateTriggerExpr,
426        query_context: QueryContextRef,
427    ) -> Result<SubmitDdlTaskResponse> {
428        let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
429            create_trigger: Some(expr),
430        })
431        .context(error::InvalidExprSnafu)?;
432
433        let request = SubmitDdlTaskRequest {
434            query_context,
435            task: DdlTask::new_create_trigger(task),
436        };
437
438        self.procedure_executor
439            .submit_ddl_task(&ExecutorContext::default(), request)
440            .await
441            .context(error::ExecuteDdlSnafu)
442    }
443
444    #[tracing::instrument(skip_all)]
445    pub async fn create_flow(
446        &self,
447        stmt: CreateFlow,
448        query_context: QueryContextRef,
449    ) -> Result<Output> {
450        // TODO(ruihang): do some verification
451        let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;
452
453        self.create_flow_inner(expr, query_context).await
454    }
455
456    pub async fn create_flow_inner(
457        &self,
458        expr: CreateFlowExpr,
459        query_context: QueryContextRef,
460    ) -> Result<Output> {
461        self.create_flow_procedure(expr, query_context).await?;
462        Ok(Output::new_with_affected_rows(0))
463    }
464
465    async fn create_flow_procedure(
466        &self,
467        expr: CreateFlowExpr,
468        query_context: QueryContextRef,
469    ) -> Result<SubmitDdlTaskResponse> {
470        let flow_type = self
471            .determine_flow_type(&expr, query_context.clone())
472            .await?;
473        info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
474
475        let expr = {
476            let mut expr = expr;
477            expr.flow_options
478                .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
479            expr
480        };
481
482        let task = CreateFlowTask::try_from(PbCreateFlowTask {
483            create_flow: Some(expr),
484        })
485        .context(error::InvalidExprSnafu)?;
486        let request = SubmitDdlTaskRequest {
487            query_context,
488            task: DdlTask::new_create_flow(task),
489        };
490
491        self.procedure_executor
492            .submit_ddl_task(&ExecutorContext::default(), request)
493            .await
494            .context(error::ExecuteDdlSnafu)
495    }
496
497    /// Determine the flow type based on the SQL query
498    ///
499    /// If it contains aggregation or distinct, then it is a batch flow, otherwise it is a streaming flow
500    async fn determine_flow_type(
501        &self,
502        expr: &CreateFlowExpr,
503        query_ctx: QueryContextRef,
504    ) -> Result<FlowType> {
505        // first check if source table's ttl is instant, if it is, force streaming mode
506        for src_table_name in &expr.source_table_names {
507            let table = self
508                .catalog_manager()
509                .table(
510                    &src_table_name.catalog_name,
511                    &src_table_name.schema_name,
512                    &src_table_name.table_name,
513                    Some(&query_ctx),
514                )
515                .await
516                .map_err(BoxedError::new)
517                .context(ExternalSnafu)?
518                .with_context(|| TableNotFoundSnafu {
519                    table_name: format_full_table_name(
520                        &src_table_name.catalog_name,
521                        &src_table_name.schema_name,
522                        &src_table_name.table_name,
523                    ),
524                })?;
525
526            // instant source table can only be handled by streaming mode
527            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
528                warn!(
529                    "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
530                    format_full_table_name(
531                        &src_table_name.catalog_name,
532                        &src_table_name.schema_name,
533                        &src_table_name.table_name
534                    ),
535                    expr.flow_name
536                );
537                return Ok(FlowType::Streaming);
538            }
539        }
540
541        let engine = &self.query_engine;
542        let stmts = ParserContext::create_with_dialect(
543            &expr.sql,
544            query_ctx.sql_dialect(),
545            ParseOptions::default(),
546        )
547        .map_err(BoxedError::new)
548        .context(ExternalSnafu)?;
549
550        ensure!(
551            stmts.len() == 1,
552            InvalidSqlSnafu {
553                err_msg: format!("Expect only one statement, found {}", stmts.len())
554            }
555        );
556        let stmt = &stmts[0];
557
558        // support tql parse too
559        let plan = match stmt {
560            // prom ql is only supported in batching mode
561            Statement::Tql(_) => return Ok(FlowType::Batching),
562            _ => engine
563                .planner()
564                .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
565                .await
566                .map_err(BoxedError::new)
567                .context(ExternalSnafu)?,
568        };
569
570        /// Visitor to find aggregation or distinct
571        struct FindAggr {
572            is_aggr: bool,
573        }
574
575        impl TreeNodeVisitor<'_> for FindAggr {
576            type Node = LogicalPlan;
577            fn f_down(
578                &mut self,
579                node: &Self::Node,
580            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
581            {
582                match node {
583                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
584                        self.is_aggr = true;
585                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
586                    }
587                    _ => (),
588                }
589                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
590            }
591        }
592
593        let mut find_aggr = FindAggr { is_aggr: false };
594
595        plan.visit_with_subqueries(&mut find_aggr)
596            .context(BuildDfLogicalPlanSnafu)?;
597        if find_aggr.is_aggr {
598            Ok(FlowType::Batching)
599        } else {
600            Ok(FlowType::Streaming)
601        }
602    }
603
604    #[tracing::instrument(skip_all)]
605    pub async fn create_view(
606        &self,
607        create_view: CreateView,
608        ctx: QueryContextRef,
609    ) -> Result<TableRef> {
610        // convert input into logical plan
611        let logical_plan = match &*create_view.query {
612            Statement::Query(query) => {
613                self.plan(
614                    &QueryStatement::Sql(Statement::Query(query.clone())),
615                    ctx.clone(),
616                )
617                .await?
618            }
619            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
620            _ => {
621                return InvalidViewStmtSnafu {}.fail();
622            }
623        };
624        // Save the definition for `show create view`.
625        let definition = create_view.to_string();
626
627        // Save the columns in plan, it may changed when the schemas of tables in plan
628        // are altered.
629        let schema: Schema = logical_plan
630            .schema()
631            .clone()
632            .try_into()
633            .context(ConvertSchemaSnafu)?;
634        let plan_columns: Vec<_> = schema
635            .column_schemas()
636            .iter()
637            .map(|c| c.name.clone())
638            .collect();
639
640        let columns: Vec<_> = create_view
641            .columns
642            .iter()
643            .map(|ident| ident.to_string())
644            .collect();
645
646        // Validate columns
647        if !columns.is_empty() {
648            ensure!(
649                columns.len() == plan_columns.len(),
650                error::ViewColumnsMismatchSnafu {
651                    view_name: create_view.name.to_string(),
652                    expected: plan_columns.len(),
653                    actual: columns.len(),
654                }
655            );
656        }
657
658        // Extract the table names from the original plan
659        // and rewrite them as fully qualified names.
660        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
661            .context(ExtractTableNamesSnafu)?;
662
663        let table_names = table_names.into_iter().map(|t| t.into()).collect();
664
665        // TODO(dennis): we don't save the optimized plan yet,
666        // because there are some serialization issue with our own defined plan node (such as `MergeScanLogicalPlan`).
667        // When the issues are fixed, we can use the `optimized_plan` instead.
668        // let optimized_plan = self.optimize_logical_plan(logical_plan)?.unwrap_df_plan();
669
670        // encode logical plan
671        let encoded_plan = DFLogicalSubstraitConvertor
672            .encode(&plan, DefaultSerializer)
673            .context(SubstraitCodecSnafu)?;
674
675        let expr = expr_helper::to_create_view_expr(
676            create_view,
677            encoded_plan.to_vec(),
678            table_names,
679            columns,
680            plan_columns,
681            definition,
682            ctx.clone(),
683        )?;
684
685        // TODO(dennis): validate the logical plan
686        self.create_view_by_expr(expr, ctx).await
687    }
688
689    pub async fn create_view_by_expr(
690        &self,
691        expr: CreateViewExpr,
692        ctx: QueryContextRef,
693    ) -> Result<TableRef> {
694        ensure! {
695            !(expr.create_if_not_exists & expr.or_replace),
696            InvalidSqlSnafu {
697                err_msg: "syntax error Create Or Replace and If Not Exist cannot be used together",
698            }
699        };
700        let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();
701
702        let schema_exists = self
703            .table_metadata_manager
704            .schema_manager()
705            .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
706            .await
707            .context(TableMetadataManagerSnafu)?;
708
709        ensure!(
710            schema_exists,
711            SchemaNotFoundSnafu {
712                schema_info: &expr.schema_name,
713            }
714        );
715
716        // if view or table exists.
717        if let Some(table) = self
718            .catalog_manager
719            .table(
720                &expr.catalog_name,
721                &expr.schema_name,
722                &expr.view_name,
723                Some(&ctx),
724            )
725            .await
726            .context(CatalogSnafu)?
727        {
728            let table_type = table.table_info().table_type;
729
730            match (table_type, expr.create_if_not_exists, expr.or_replace) {
731                (TableType::View, true, false) => {
732                    return Ok(table);
733                }
734                (TableType::View, false, false) => {
735                    return ViewAlreadyExistsSnafu {
736                        name: format_full_table_name(
737                            &expr.catalog_name,
738                            &expr.schema_name,
739                            &expr.view_name,
740                        ),
741                    }
742                    .fail();
743                }
744                (TableType::View, _, true) => {
745                    // Try to replace an exists view
746                }
747                _ => {
748                    return TableAlreadyExistsSnafu {
749                        table: format_full_table_name(
750                            &expr.catalog_name,
751                            &expr.schema_name,
752                            &expr.view_name,
753                        ),
754                    }
755                    .fail();
756                }
757            }
758        }
759
760        ensure!(
761            NAME_PATTERN_REG.is_match(&expr.view_name),
762            InvalidViewNameSnafu {
763                name: expr.view_name.clone(),
764            }
765        );
766
767        let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);
768
769        let mut view_info = RawTableInfo {
770            ident: metadata::TableIdent {
771                // The view id of distributed table is assigned by Meta, set "0" here as a placeholder.
772                table_id: 0,
773                version: 0,
774            },
775            name: expr.view_name.clone(),
776            desc: None,
777            catalog_name: expr.catalog_name.clone(),
778            schema_name: expr.schema_name.clone(),
779            // The meta doesn't make sense for views, so using a default one.
780            meta: RawTableMeta::default(),
781            table_type: TableType::View,
782        };
783
784        let request = SubmitDdlTaskRequest {
785            query_context: ctx,
786            task: DdlTask::new_create_view(expr, view_info.clone()),
787        };
788
789        let resp = self
790            .procedure_executor
791            .submit_ddl_task(&ExecutorContext::default(), request)
792            .await
793            .context(error::ExecuteDdlSnafu)?;
794
795        debug!(
796            "Submit creating view '{view_name}' task response: {:?}",
797            resp
798        );
799
800        let view_id = resp
801            .table_ids
802            .into_iter()
803            .next()
804            .context(error::UnexpectedSnafu {
805                violated: "expected table_id",
806            })?;
807        info!("Successfully created view '{view_name}' with view id {view_id}");
808
809        view_info.ident.table_id = view_id;
810
811        let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);
812
813        let table = DistTable::table(view_info);
814
815        // Invalidates local cache ASAP.
816        self.cache_invalidator
817            .invalidate(
818                &Context::default(),
819                &[
820                    CacheIdent::TableId(view_id),
821                    CacheIdent::TableName(view_name.clone()),
822                ],
823            )
824            .await
825            .context(error::InvalidateTableCacheSnafu)?;
826
827        Ok(table)
828    }
829
830    #[tracing::instrument(skip_all)]
831    pub async fn drop_flow(
832        &self,
833        catalog_name: String,
834        flow_name: String,
835        drop_if_exists: bool,
836        query_context: QueryContextRef,
837    ) -> Result<Output> {
838        if let Some(flow) = self
839            .flow_metadata_manager
840            .flow_name_manager()
841            .get(&catalog_name, &flow_name)
842            .await
843            .context(error::TableMetadataManagerSnafu)?
844        {
845            let flow_id = flow.flow_id();
846            let task = DropFlowTask {
847                catalog_name,
848                flow_name,
849                flow_id,
850                drop_if_exists,
851            };
852            self.drop_flow_procedure(task, query_context).await?;
853
854            Ok(Output::new_with_affected_rows(0))
855        } else if drop_if_exists {
856            Ok(Output::new_with_affected_rows(0))
857        } else {
858            FlowNotFoundSnafu {
859                flow_name: format_full_flow_name(&catalog_name, &flow_name),
860            }
861            .fail()
862        }
863    }
864
865    async fn drop_flow_procedure(
866        &self,
867        expr: DropFlowTask,
868        query_context: QueryContextRef,
869    ) -> Result<SubmitDdlTaskResponse> {
870        let request = SubmitDdlTaskRequest {
871            query_context,
872            task: DdlTask::new_drop_flow(expr),
873        };
874
875        self.procedure_executor
876            .submit_ddl_task(&ExecutorContext::default(), request)
877            .await
878            .context(error::ExecuteDdlSnafu)
879    }
880
881    #[cfg(feature = "enterprise")]
882    #[tracing::instrument(skip_all)]
883    pub(super) async fn drop_trigger(
884        &self,
885        catalog_name: String,
886        trigger_name: String,
887        drop_if_exists: bool,
888        query_context: QueryContextRef,
889    ) -> Result<Output> {
890        let task = DropTriggerTask {
891            catalog_name,
892            trigger_name,
893            drop_if_exists,
894        };
895        self.drop_trigger_procedure(task, query_context).await?;
896        Ok(Output::new_with_affected_rows(0))
897    }
898
899    #[cfg(feature = "enterprise")]
900    async fn drop_trigger_procedure(
901        &self,
902        expr: DropTriggerTask,
903        query_context: QueryContextRef,
904    ) -> Result<SubmitDdlTaskResponse> {
905        let request = SubmitDdlTaskRequest {
906            query_context,
907            task: DdlTask::new_drop_trigger(expr),
908        };
909
910        self.procedure_executor
911            .submit_ddl_task(&ExecutorContext::default(), request)
912            .await
913            .context(error::ExecuteDdlSnafu)
914    }
915
916    /// Drop a view
917    #[tracing::instrument(skip_all)]
918    pub(crate) async fn drop_view(
919        &self,
920        catalog: String,
921        schema: String,
922        view: String,
923        drop_if_exists: bool,
924        query_context: QueryContextRef,
925    ) -> Result<Output> {
926        let view_info = if let Some(view) = self
927            .catalog_manager
928            .table(&catalog, &schema, &view, None)
929            .await
930            .context(CatalogSnafu)?
931        {
932            view.table_info()
933        } else if drop_if_exists {
934            // DROP VIEW IF EXISTS meets view not found - ignored
935            return Ok(Output::new_with_affected_rows(0));
936        } else {
937            return TableNotFoundSnafu {
938                table_name: format_full_table_name(&catalog, &schema, &view),
939            }
940            .fail();
941        };
942
943        // Ensure the exists one is view, we can't drop other table types
944        ensure!(
945            view_info.table_type == TableType::View,
946            error::InvalidViewSnafu {
947                msg: "not a view",
948                view_name: format_full_table_name(&catalog, &schema, &view),
949            }
950        );
951
952        let view_id = view_info.table_id();
953
954        let task = DropViewTask {
955            catalog,
956            schema,
957            view,
958            view_id,
959            drop_if_exists,
960        };
961
962        self.drop_view_procedure(task, query_context).await?;
963
964        Ok(Output::new_with_affected_rows(0))
965    }
966
967    /// Submit [DropViewTask] to procedure executor.
968    async fn drop_view_procedure(
969        &self,
970        expr: DropViewTask,
971        query_context: QueryContextRef,
972    ) -> Result<SubmitDdlTaskResponse> {
973        let request = SubmitDdlTaskRequest {
974            query_context,
975            task: DdlTask::new_drop_view(expr),
976        };
977
978        self.procedure_executor
979            .submit_ddl_task(&ExecutorContext::default(), request)
980            .await
981            .context(error::ExecuteDdlSnafu)
982    }
983
984    #[tracing::instrument(skip_all)]
985    pub async fn alter_logical_tables(
986        &self,
987        alter_table_exprs: Vec<AlterTableExpr>,
988        query_context: QueryContextRef,
989    ) -> Result<Output> {
990        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
991        ensure!(
992            !alter_table_exprs.is_empty(),
993            EmptyDdlExprSnafu {
994                name: "alter logical tables"
995            }
996        );
997
998        // group by physical table id
999        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
1000        for expr in alter_table_exprs {
1001            // Get table_id from catalog_manager
1002            let catalog = if expr.catalog_name.is_empty() {
1003                query_context.current_catalog()
1004            } else {
1005                &expr.catalog_name
1006            };
1007            let schema = if expr.schema_name.is_empty() {
1008                query_context.current_schema()
1009            } else {
1010                expr.schema_name.to_string()
1011            };
1012            let table_name = &expr.table_name;
1013            let table = self
1014                .catalog_manager
1015                .table(catalog, &schema, table_name, Some(&query_context))
1016                .await
1017                .context(CatalogSnafu)?
1018                .with_context(|| TableNotFoundSnafu {
1019                    table_name: format_full_table_name(catalog, &schema, table_name),
1020                })?;
1021            let table_id = table.table_info().ident.table_id;
1022            let physical_table_id = self
1023                .table_metadata_manager
1024                .table_route_manager()
1025                .get_physical_table_id(table_id)
1026                .await
1027                .context(TableMetadataManagerSnafu)?;
1028            groups.entry(physical_table_id).or_default().push(expr);
1029        }
1030
1031        // Submit procedure for each physical table
1032        let mut handles = Vec::with_capacity(groups.len());
1033        for (_physical_table_id, exprs) in groups {
1034            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
1035            handles.push(fut);
1036        }
1037        let _results = futures::future::try_join_all(handles).await?;
1038
1039        Ok(Output::new_with_affected_rows(0))
1040    }
1041
1042    #[tracing::instrument(skip_all)]
1043    pub async fn drop_table(
1044        &self,
1045        table_name: TableName,
1046        drop_if_exists: bool,
1047        query_context: QueryContextRef,
1048    ) -> Result<Output> {
1049        // Reserved for grpc call
1050        self.drop_tables(&[table_name], drop_if_exists, query_context)
1051            .await
1052    }
1053
1054    #[tracing::instrument(skip_all)]
1055    pub async fn drop_tables(
1056        &self,
1057        table_names: &[TableName],
1058        drop_if_exists: bool,
1059        query_context: QueryContextRef,
1060    ) -> Result<Output> {
1061        let mut tables = Vec::with_capacity(table_names.len());
1062        for table_name in table_names {
1063            ensure!(
1064                !is_readonly_schema(&table_name.schema_name),
1065                SchemaReadOnlySnafu {
1066                    name: table_name.schema_name.clone()
1067                }
1068            );
1069
1070            if let Some(table) = self
1071                .catalog_manager
1072                .table(
1073                    &table_name.catalog_name,
1074                    &table_name.schema_name,
1075                    &table_name.table_name,
1076                    Some(&query_context),
1077                )
1078                .await
1079                .context(CatalogSnafu)?
1080            {
1081                tables.push(table.table_info().table_id());
1082            } else if drop_if_exists {
1083                // DROP TABLE IF EXISTS meets table not found - ignored
1084                continue;
1085            } else {
1086                return TableNotFoundSnafu {
1087                    table_name: table_name.to_string(),
1088                }
1089                .fail();
1090            }
1091        }
1092
1093        for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
1094            self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
1095                .await?;
1096
1097            // Invalidates local cache ASAP.
1098            self.cache_invalidator
1099                .invalidate(
1100                    &Context::default(),
1101                    &[
1102                        CacheIdent::TableId(table_id),
1103                        CacheIdent::TableName(table_name.clone()),
1104                    ],
1105                )
1106                .await
1107                .context(error::InvalidateTableCacheSnafu)?;
1108        }
1109        Ok(Output::new_with_affected_rows(0))
1110    }
1111
1112    #[tracing::instrument(skip_all)]
1113    pub async fn drop_database(
1114        &self,
1115        catalog: String,
1116        schema: String,
1117        drop_if_exists: bool,
1118        query_context: QueryContextRef,
1119    ) -> Result<Output> {
1120        ensure!(
1121            !is_readonly_schema(&schema),
1122            SchemaReadOnlySnafu { name: schema }
1123        );
1124
1125        if self
1126            .catalog_manager
1127            .schema_exists(&catalog, &schema, None)
1128            .await
1129            .context(CatalogSnafu)?
1130        {
1131            if schema == query_context.current_schema() {
1132                SchemaInUseSnafu { name: schema }.fail()
1133            } else {
1134                self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
1135                    .await?;
1136
1137                Ok(Output::new_with_affected_rows(0))
1138            }
1139        } else if drop_if_exists {
1140            // DROP TABLE IF EXISTS meets table not found - ignored
1141            Ok(Output::new_with_affected_rows(0))
1142        } else {
1143            SchemaNotFoundSnafu {
1144                schema_info: schema,
1145            }
1146            .fail()
1147        }
1148    }
1149
1150    #[tracing::instrument(skip_all)]
1151    pub async fn truncate_table(
1152        &self,
1153        table_name: TableName,
1154        time_ranges: Vec<(Timestamp, Timestamp)>,
1155        query_context: QueryContextRef,
1156    ) -> Result<Output> {
1157        ensure!(
1158            !is_readonly_schema(&table_name.schema_name),
1159            SchemaReadOnlySnafu {
1160                name: table_name.schema_name.clone()
1161            }
1162        );
1163
1164        let table = self
1165            .catalog_manager
1166            .table(
1167                &table_name.catalog_name,
1168                &table_name.schema_name,
1169                &table_name.table_name,
1170                Some(&query_context),
1171            )
1172            .await
1173            .context(CatalogSnafu)?
1174            .with_context(|| TableNotFoundSnafu {
1175                table_name: table_name.to_string(),
1176            })?;
1177        let table_id = table.table_info().table_id();
1178        self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
1179            .await?;
1180
1181        Ok(Output::new_with_affected_rows(0))
1182    }
1183
1184    #[tracing::instrument(skip_all)]
1185    pub async fn alter_table(
1186        &self,
1187        alter_table: AlterTable,
1188        query_context: QueryContextRef,
1189    ) -> Result<Output> {
1190        let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
1191        self.alter_table_inner(expr, query_context).await
1192    }
1193
1194    #[tracing::instrument(skip_all)]
1195    pub async fn alter_table_inner(
1196        &self,
1197        expr: AlterTableExpr,
1198        query_context: QueryContextRef,
1199    ) -> Result<Output> {
1200        ensure!(
1201            !is_readonly_schema(&expr.schema_name),
1202            SchemaReadOnlySnafu {
1203                name: expr.schema_name.clone()
1204            }
1205        );
1206
1207        let catalog_name = if expr.catalog_name.is_empty() {
1208            DEFAULT_CATALOG_NAME.to_string()
1209        } else {
1210            expr.catalog_name.clone()
1211        };
1212
1213        let schema_name = if expr.schema_name.is_empty() {
1214            DEFAULT_SCHEMA_NAME.to_string()
1215        } else {
1216            expr.schema_name.clone()
1217        };
1218
1219        let table_name = expr.table_name.clone();
1220
1221        let table = self
1222            .catalog_manager
1223            .table(
1224                &catalog_name,
1225                &schema_name,
1226                &table_name,
1227                Some(&query_context),
1228            )
1229            .await
1230            .context(CatalogSnafu)?
1231            .with_context(|| TableNotFoundSnafu {
1232                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
1233            })?;
1234
1235        let table_id = table.table_info().ident.table_id;
1236        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
1237        if !need_alter {
1238            return Ok(Output::new_with_affected_rows(0));
1239        }
1240        info!(
1241            "Table info before alter is {:?}, expr: {:?}",
1242            table.table_info(),
1243            expr
1244        );
1245
1246        let physical_table_id = self
1247            .table_metadata_manager
1248            .table_route_manager()
1249            .get_physical_table_id(table_id)
1250            .await
1251            .context(TableMetadataManagerSnafu)?;
1252
1253        let (req, invalidate_keys) = if physical_table_id == table_id {
1254            // This is physical table
1255            let req = SubmitDdlTaskRequest {
1256                query_context,
1257                task: DdlTask::new_alter_table(expr),
1258            };
1259
1260            let invalidate_keys = vec![
1261                CacheIdent::TableId(table_id),
1262                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1263            ];
1264
1265            (req, invalidate_keys)
1266        } else {
1267            // This is logical table
1268            let req = SubmitDdlTaskRequest {
1269                query_context,
1270                task: DdlTask::new_alter_logical_tables(vec![expr]),
1271            };
1272
1273            let mut invalidate_keys = vec![
1274                CacheIdent::TableId(physical_table_id),
1275                CacheIdent::TableId(table_id),
1276                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1277            ];
1278
1279            let physical_table = self
1280                .table_metadata_manager
1281                .table_info_manager()
1282                .get(physical_table_id)
1283                .await
1284                .context(TableMetadataManagerSnafu)?
1285                .map(|x| x.into_inner());
1286            if let Some(physical_table) = physical_table {
1287                let physical_table_name = TableName::new(
1288                    physical_table.table_info.catalog_name,
1289                    physical_table.table_info.schema_name,
1290                    physical_table.table_info.name,
1291                );
1292                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
1293            }
1294
1295            (req, invalidate_keys)
1296        };
1297
1298        self.procedure_executor
1299            .submit_ddl_task(&ExecutorContext::default(), req)
1300            .await
1301            .context(error::ExecuteDdlSnafu)?;
1302
1303        // Invalidates local cache ASAP.
1304        self.cache_invalidator
1305            .invalidate(&Context::default(), &invalidate_keys)
1306            .await
1307            .context(error::InvalidateTableCacheSnafu)?;
1308
1309        Ok(Output::new_with_affected_rows(0))
1310    }
1311
1312    #[cfg(feature = "enterprise")]
1313    #[tracing::instrument(skip_all)]
1314    pub async fn alter_trigger(
1315        &self,
1316        _alter_expr: AlterTrigger,
1317        _query_context: QueryContextRef,
1318    ) -> Result<Output> {
1319        crate::error::NotSupportedSnafu {
1320            feat: "alter trigger",
1321        }
1322        .fail()
1323    }
1324
1325    #[tracing::instrument(skip_all)]
1326    pub async fn alter_database(
1327        &self,
1328        alter_expr: AlterDatabase,
1329        query_context: QueryContextRef,
1330    ) -> Result<Output> {
1331        let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
1332        self.alter_database_inner(alter_expr, query_context).await
1333    }
1334
1335    #[tracing::instrument(skip_all)]
1336    pub async fn alter_database_inner(
1337        &self,
1338        alter_expr: AlterDatabaseExpr,
1339        query_context: QueryContextRef,
1340    ) -> Result<Output> {
1341        ensure!(
1342            !is_readonly_schema(&alter_expr.schema_name),
1343            SchemaReadOnlySnafu {
1344                name: query_context.current_schema().clone()
1345            }
1346        );
1347
1348        let exists = self
1349            .catalog_manager
1350            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
1351            .await
1352            .context(CatalogSnafu)?;
1353        ensure!(
1354            exists,
1355            SchemaNotFoundSnafu {
1356                schema_info: alter_expr.schema_name,
1357            }
1358        );
1359
1360        let cache_ident = [CacheIdent::SchemaName(SchemaName {
1361            catalog_name: alter_expr.catalog_name.clone(),
1362            schema_name: alter_expr.schema_name.clone(),
1363        })];
1364
1365        self.alter_database_procedure(alter_expr, query_context)
1366            .await?;
1367
1368        // Invalidates local cache ASAP.
1369        self.cache_invalidator
1370            .invalidate(&Context::default(), &cache_ident)
1371            .await
1372            .context(error::InvalidateTableCacheSnafu)?;
1373
1374        Ok(Output::new_with_affected_rows(0))
1375    }
1376
1377    async fn create_table_procedure(
1378        &self,
1379        create_table: CreateTableExpr,
1380        partitions: Vec<PartitionExpr>,
1381        table_info: RawTableInfo,
1382        query_context: QueryContextRef,
1383    ) -> Result<SubmitDdlTaskResponse> {
1384        let partitions = partitions
1385            .into_iter()
1386            .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
1387            .collect::<Result<Vec<_>>>()?;
1388
1389        let request = SubmitDdlTaskRequest {
1390            query_context,
1391            task: DdlTask::new_create_table(create_table, partitions, table_info),
1392        };
1393
1394        self.procedure_executor
1395            .submit_ddl_task(&ExecutorContext::default(), request)
1396            .await
1397            .context(error::ExecuteDdlSnafu)
1398    }
1399
1400    async fn create_logical_tables_procedure(
1401        &self,
1402        tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
1403        query_context: QueryContextRef,
1404    ) -> Result<SubmitDdlTaskResponse> {
1405        let request = SubmitDdlTaskRequest {
1406            query_context,
1407            task: DdlTask::new_create_logical_tables(tables_data),
1408        };
1409
1410        self.procedure_executor
1411            .submit_ddl_task(&ExecutorContext::default(), request)
1412            .await
1413            .context(error::ExecuteDdlSnafu)
1414    }
1415
1416    async fn alter_logical_tables_procedure(
1417        &self,
1418        tables_data: Vec<AlterTableExpr>,
1419        query_context: QueryContextRef,
1420    ) -> Result<SubmitDdlTaskResponse> {
1421        let request = SubmitDdlTaskRequest {
1422            query_context,
1423            task: DdlTask::new_alter_logical_tables(tables_data),
1424        };
1425
1426        self.procedure_executor
1427            .submit_ddl_task(&ExecutorContext::default(), request)
1428            .await
1429            .context(error::ExecuteDdlSnafu)
1430    }
1431
1432    async fn drop_table_procedure(
1433        &self,
1434        table_name: &TableName,
1435        table_id: TableId,
1436        drop_if_exists: bool,
1437        query_context: QueryContextRef,
1438    ) -> Result<SubmitDdlTaskResponse> {
1439        let request = SubmitDdlTaskRequest {
1440            query_context,
1441            task: DdlTask::new_drop_table(
1442                table_name.catalog_name.to_string(),
1443                table_name.schema_name.to_string(),
1444                table_name.table_name.to_string(),
1445                table_id,
1446                drop_if_exists,
1447            ),
1448        };
1449
1450        self.procedure_executor
1451            .submit_ddl_task(&ExecutorContext::default(), request)
1452            .await
1453            .context(error::ExecuteDdlSnafu)
1454    }
1455
1456    async fn drop_database_procedure(
1457        &self,
1458        catalog: String,
1459        schema: String,
1460        drop_if_exists: bool,
1461        query_context: QueryContextRef,
1462    ) -> Result<SubmitDdlTaskResponse> {
1463        let request = SubmitDdlTaskRequest {
1464            query_context,
1465            task: DdlTask::new_drop_database(catalog, schema, drop_if_exists),
1466        };
1467
1468        self.procedure_executor
1469            .submit_ddl_task(&ExecutorContext::default(), request)
1470            .await
1471            .context(error::ExecuteDdlSnafu)
1472    }
1473
1474    async fn alter_database_procedure(
1475        &self,
1476        alter_expr: AlterDatabaseExpr,
1477        query_context: QueryContextRef,
1478    ) -> Result<SubmitDdlTaskResponse> {
1479        let request = SubmitDdlTaskRequest {
1480            query_context,
1481            task: DdlTask::new_alter_database(alter_expr),
1482        };
1483
1484        self.procedure_executor
1485            .submit_ddl_task(&ExecutorContext::default(), request)
1486            .await
1487            .context(error::ExecuteDdlSnafu)
1488    }
1489
1490    async fn truncate_table_procedure(
1491        &self,
1492        table_name: &TableName,
1493        table_id: TableId,
1494        time_ranges: Vec<(Timestamp, Timestamp)>,
1495        query_context: QueryContextRef,
1496    ) -> Result<SubmitDdlTaskResponse> {
1497        let request = SubmitDdlTaskRequest {
1498            query_context,
1499            task: DdlTask::new_truncate_table(
1500                table_name.catalog_name.to_string(),
1501                table_name.schema_name.to_string(),
1502                table_name.table_name.to_string(),
1503                table_id,
1504                time_ranges,
1505            ),
1506        };
1507
1508        self.procedure_executor
1509            .submit_ddl_task(&ExecutorContext::default(), request)
1510            .await
1511            .context(error::ExecuteDdlSnafu)
1512    }
1513
1514    #[tracing::instrument(skip_all)]
1515    pub async fn create_database(
1516        &self,
1517        database: &str,
1518        create_if_not_exists: bool,
1519        options: HashMap<String, String>,
1520        query_context: QueryContextRef,
1521    ) -> Result<Output> {
1522        let catalog = query_context.current_catalog();
1523        ensure!(
1524            NAME_PATTERN_REG.is_match(catalog),
1525            error::UnexpectedSnafu {
1526                violated: format!("Invalid catalog name: {}", catalog)
1527            }
1528        );
1529
1530        ensure!(
1531            NAME_PATTERN_REG.is_match(database),
1532            error::UnexpectedSnafu {
1533                violated: format!("Invalid database name: {}", database)
1534            }
1535        );
1536
1537        if !self
1538            .catalog_manager
1539            .schema_exists(catalog, database, None)
1540            .await
1541            .context(CatalogSnafu)?
1542            && !self.catalog_manager.is_reserved_schema_name(database)
1543        {
1544            self.create_database_procedure(
1545                catalog.to_string(),
1546                database.to_string(),
1547                create_if_not_exists,
1548                options,
1549                query_context,
1550            )
1551            .await?;
1552
1553            Ok(Output::new_with_affected_rows(1))
1554        } else if create_if_not_exists {
1555            Ok(Output::new_with_affected_rows(1))
1556        } else {
1557            error::SchemaExistsSnafu { name: database }.fail()
1558        }
1559    }
1560
1561    async fn create_database_procedure(
1562        &self,
1563        catalog: String,
1564        database: String,
1565        create_if_not_exists: bool,
1566        options: HashMap<String, String>,
1567        query_context: QueryContextRef,
1568    ) -> Result<SubmitDdlTaskResponse> {
1569        let request = SubmitDdlTaskRequest {
1570            query_context,
1571            task: DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
1572        };
1573
1574        self.procedure_executor
1575            .submit_ddl_task(&ExecutorContext::default(), request)
1576            .await
1577            .context(error::ExecuteDdlSnafu)
1578    }
1579}
1580
1581/// Parse partition statement [Partitions] into [PartitionExpr] and partition columns.
1582pub fn parse_partitions(
1583    create_table: &CreateTableExpr,
1584    partitions: Option<Partitions>,
1585    query_ctx: &QueryContextRef,
1586) -> Result<(Vec<PartitionExpr>, Vec<String>)> {
1587    // If partitions are not defined by user, use the timestamp column (which has to be existed) as
1588    // the partition column, and create only one partition.
1589    let partition_columns = find_partition_columns(&partitions)?;
1590    let partition_exprs =
1591        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
1592
1593    // Validates partition
1594    let exprs = partition_exprs.clone();
1595    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
1596        .context(InvalidPartitionSnafu)?;
1597
1598    Ok((partition_exprs, partition_columns))
1599}
1600
1601/// Verifies an alter and returns whether it is necessary to perform the alter.
1602///
1603/// # Returns
1604///
1605/// Returns true if the alter need to be porformed; otherwise, it returns false.
1606pub fn verify_alter(
1607    table_id: TableId,
1608    table_info: Arc<TableInfo>,
1609    expr: AlterTableExpr,
1610) -> Result<bool> {
1611    let request: AlterTableRequest =
1612        common_grpc_expr::alter_expr_to_request(table_id, expr, Some(&table_info.meta))
1613            .context(AlterExprToRequestSnafu)?;
1614
1615    let AlterTableRequest {
1616        table_name,
1617        alter_kind,
1618        ..
1619    } = &request;
1620
1621    if let AlterKind::RenameTable { new_table_name } = alter_kind {
1622        ensure!(
1623            NAME_PATTERN_REG.is_match(new_table_name),
1624            error::UnexpectedSnafu {
1625                violated: format!("Invalid table name: {}", new_table_name)
1626            }
1627        );
1628    } else if let AlterKind::AddColumns { columns } = alter_kind {
1629        // If all the columns are marked as add_if_not_exists and they already exist in the table,
1630        // there is no need to perform the alter.
1631        let column_names: HashSet<_> = table_info
1632            .meta
1633            .schema
1634            .column_schemas()
1635            .iter()
1636            .map(|schema| &schema.name)
1637            .collect();
1638        if columns.iter().all(|column| {
1639            column_names.contains(&column.column_schema.name) && column.add_if_not_exists
1640        }) {
1641            return Ok(false);
1642        }
1643    }
1644
1645    let _ = table_info
1646        .meta
1647        .builder_with_alter_kind(table_name, &request.alter_kind)
1648        .context(error::TableSnafu)?
1649        .build()
1650        .context(error::BuildTableMetaSnafu { table_name })?;
1651
1652    Ok(true)
1653}
1654
1655pub fn create_table_info(
1656    create_table: &CreateTableExpr,
1657    partition_columns: Vec<String>,
1658) -> Result<RawTableInfo> {
1659    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
1660    let mut column_name_to_index_map = HashMap::new();
1661
1662    for (idx, column) in create_table.column_defs.iter().enumerate() {
1663        let schema =
1664            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
1665                column: &column.name,
1666            })?;
1667        let schema = schema.with_time_index(column.name == create_table.time_index);
1668
1669        column_schemas.push(schema);
1670        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
1671    }
1672
1673    let timestamp_index = column_name_to_index_map
1674        .get(&create_table.time_index)
1675        .cloned();
1676
1677    let raw_schema = RawSchema {
1678        column_schemas: column_schemas.clone(),
1679        timestamp_index,
1680        version: 0,
1681    };
1682
1683    let primary_key_indices = create_table
1684        .primary_keys
1685        .iter()
1686        .map(|name| {
1687            column_name_to_index_map
1688                .get(name)
1689                .cloned()
1690                .context(ColumnNotFoundSnafu { msg: name })
1691        })
1692        .collect::<Result<Vec<_>>>()?;
1693
1694    let partition_key_indices = partition_columns
1695        .into_iter()
1696        .map(|col_name| {
1697            column_name_to_index_map
1698                .get(&col_name)
1699                .cloned()
1700                .context(ColumnNotFoundSnafu { msg: col_name })
1701        })
1702        .collect::<Result<Vec<_>>>()?;
1703
1704    let table_options = TableOptions::try_from_iter(&create_table.table_options)
1705        .context(UnrecognizedTableOptionSnafu)?;
1706
1707    let meta = RawTableMeta {
1708        schema: raw_schema,
1709        primary_key_indices,
1710        value_indices: vec![],
1711        engine: create_table.engine.clone(),
1712        next_column_id: column_schemas.len() as u32,
1713        region_numbers: vec![],
1714        options: table_options,
1715        created_on: Utc::now(),
1716        partition_key_indices,
1717        column_ids: vec![],
1718    };
1719
1720    let desc = if create_table.desc.is_empty() {
1721        create_table.table_options.get(COMMENT_KEY).cloned()
1722    } else {
1723        Some(create_table.desc.clone())
1724    };
1725
1726    let table_info = RawTableInfo {
1727        ident: metadata::TableIdent {
1728            // The table id of distributed table is assigned by Meta, set "0" here as a placeholder.
1729            table_id: 0,
1730            version: 0,
1731        },
1732        name: create_table.table_name.clone(),
1733        desc,
1734        catalog_name: create_table.catalog_name.clone(),
1735        schema_name: create_table.schema_name.clone(),
1736        meta,
1737        table_type: TableType::Base,
1738    };
1739    Ok(table_info)
1740}
1741
1742fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
1743    let columns = if let Some(partitions) = partitions {
1744        partitions
1745            .column_list
1746            .iter()
1747            .map(|x| x.value.clone())
1748            .collect::<Vec<_>>()
1749    } else {
1750        vec![]
1751    };
1752    Ok(columns)
1753}
1754
1755/// Parse [Partitions] into a group of partition entries.
1756///
1757/// Returns a list of [PartitionExpr], each of which defines a partition.
1758fn find_partition_entries(
1759    create_table: &CreateTableExpr,
1760    partitions: &Option<Partitions>,
1761    partition_columns: &[String],
1762    query_ctx: &QueryContextRef,
1763) -> Result<Vec<PartitionExpr>> {
1764    let Some(partitions) = partitions else {
1765        return Ok(vec![]);
1766    };
1767
1768    // extract concrete data type of partition columns
1769    let column_name_and_type = partition_columns
1770        .iter()
1771        .map(|pc| {
1772            let column = create_table
1773                .column_defs
1774                .iter()
1775                .find(|c| &c.name == pc)
1776                // unwrap is safe here because we have checked that partition columns are defined
1777                .unwrap();
1778            let column_name = &column.name;
1779            let data_type = ConcreteDataType::from(
1780                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension)
1781                    .context(ColumnDataTypeSnafu)?,
1782            );
1783            Ok((column_name, data_type))
1784        })
1785        .collect::<Result<HashMap<_, _>>>()?;
1786
1787    // Transform parser expr to partition expr
1788    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
1789    for partition in &partitions.exprs {
1790        let partition_expr =
1791            convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
1792        partition_exprs.push(partition_expr);
1793    }
1794
1795    Ok(partition_exprs)
1796}
1797
1798fn convert_one_expr(
1799    expr: &Expr,
1800    column_name_and_type: &HashMap<&String, ConcreteDataType>,
1801    timezone: &Timezone,
1802) -> Result<PartitionExpr> {
1803    let Expr::BinaryOp { left, op, right } = expr else {
1804        return InvalidPartitionRuleSnafu {
1805            reason: "partition rule must be a binary expression",
1806        }
1807        .fail();
1808    };
1809
1810    let op =
1811        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
1812            reason: format!("unsupported operator in partition expr {op}"),
1813        })?;
1814
1815    // convert leaf node.
1816    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
1817        // col, val
1818        (Expr::Identifier(ident), Expr::Value(value)) => {
1819            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1820            let value = convert_value(&value.value, data_type, timezone, None)?;
1821            (Operand::Column(column_name), op, Operand::Value(value))
1822        }
1823        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
1824            if let Expr::Value(v) = &**expr =>
1825        {
1826            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1827            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
1828            (Operand::Column(column_name), op, Operand::Value(value))
1829        }
1830        // val, col
1831        (Expr::Value(value), Expr::Identifier(ident)) => {
1832            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1833            let value = convert_value(&value.value, data_type, timezone, None)?;
1834            (Operand::Value(value), op, Operand::Column(column_name))
1835        }
1836        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
1837            if let Expr::Value(v) = &**expr =>
1838        {
1839            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1840            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
1841            (Operand::Value(value), op, Operand::Column(column_name))
1842        }
1843        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
1844            // sub-expr must against another sub-expr
1845            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
1846            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
1847            (Operand::Expr(lhs), op, Operand::Expr(rhs))
1848        }
1849        _ => {
1850            return InvalidPartitionRuleSnafu {
1851                reason: format!("invalid partition expr {expr}"),
1852            }
1853            .fail();
1854        }
1855    };
1856
1857    Ok(PartitionExpr::new(lhs, op, rhs))
1858}
1859
1860fn convert_identifier(
1861    ident: &Ident,
1862    column_name_and_type: &HashMap<&String, ConcreteDataType>,
1863) -> Result<(String, ConcreteDataType)> {
1864    let column_name = ident.value.clone();
1865    let data_type = column_name_and_type
1866        .get(&column_name)
1867        .cloned()
1868        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
1869    Ok((column_name, data_type))
1870}
1871
1872fn convert_value(
1873    value: &ParserValue,
1874    data_type: ConcreteDataType,
1875    timezone: &Timezone,
1876    unary_op: Option<UnaryOperator>,
1877) -> Result<Value> {
1878    sql_value_to_value(
1879        "<NONAME>",
1880        &data_type,
1881        value,
1882        Some(timezone),
1883        unary_op,
1884        false,
1885    )
1886    .context(error::SqlCommonSnafu)
1887}
1888
1889#[cfg(test)]
1890mod test {
1891    use session::context::{QueryContext, QueryContextBuilder};
1892    use sql::dialect::GreptimeDbDialect;
1893    use sql::parser::{ParseOptions, ParserContext};
1894    use sql::statements::statement::Statement;
1895
1896    use super::*;
1897    use crate::expr_helper;
1898
1899    #[test]
1900    fn test_name_is_match() {
1901        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
1902        assert!(!NAME_PATTERN_REG.is_match("🈲"));
1903        assert!(NAME_PATTERN_REG.is_match("hello"));
1904        assert!(NAME_PATTERN_REG.is_match("test@"));
1905        assert!(!NAME_PATTERN_REG.is_match("@test"));
1906        assert!(NAME_PATTERN_REG.is_match("test#"));
1907        assert!(!NAME_PATTERN_REG.is_match("#test"));
1908        assert!(!NAME_PATTERN_REG.is_match("@"));
1909        assert!(!NAME_PATTERN_REG.is_match("#"));
1910    }
1911
1912    #[tokio::test]
1913    #[ignore = "TODO(ruihang): WIP new partition rule"]
1914    async fn test_parse_partitions() {
1915        common_telemetry::init_default_ut_logging();
1916        let cases = [
1917            (
1918                r"
1919CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
1920PARTITION ON COLUMNS (b) (
1921  b < 'hz',
1922  b >= 'hz' AND b < 'sh',
1923  b >= 'sh'
1924)
1925ENGINE=mito",
1926                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
1927            ),
1928            (
1929                r"
1930CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
1931PARTITION BY RANGE COLUMNS (b, a) (
1932  PARTITION r0 VALUES LESS THAN ('hz', 10),
1933  b < 'hz' AND a < 10,
1934  b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
1935  b >= 'sh' AND a >= 20
1936)
1937ENGINE=mito",
1938                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
1939            ),
1940        ];
1941        let ctx = QueryContextBuilder::default().build().into();
1942        for (sql, expected) in cases {
1943            let result = ParserContext::create_with_dialect(
1944                sql,
1945                &GreptimeDbDialect {},
1946                ParseOptions::default(),
1947            )
1948            .unwrap();
1949            match &result[0] {
1950                Statement::CreateTable(c) => {
1951                    let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
1952                    let (partitions, _) =
1953                        parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
1954                    let json = serde_json::to_string(&partitions).unwrap();
1955                    assert_eq!(json, expected);
1956                }
1957                _ => unreachable!(),
1958            }
1959        }
1960    }
1961}