operator/statement/
ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use api::helper::ColumnDataTypeWrapper;
19use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
20use api::v1::{
21    AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, column_def,
22};
23#[cfg(feature = "enterprise")]
24use api::v1::{
25    CreateTriggerExpr as PbCreateTriggerExpr, meta::CreateTriggerTask as PbCreateTriggerTask,
26};
27use catalog::CatalogManagerRef;
28use chrono::Utc;
29use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_readonly_schema};
30use common_catalog::{format_full_flow_name, format_full_table_name};
31use common_error::ext::BoxedError;
32use common_meta::cache_invalidator::Context;
33use common_meta::ddl::create_flow::FlowType;
34use common_meta::instruction::CacheIdent;
35use common_meta::key::NAME_PATTERN;
36use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
37use common_meta::procedure_executor::ExecutorContext;
38#[cfg(feature = "enterprise")]
39use common_meta::rpc::ddl::trigger::CreateTriggerTask;
40#[cfg(feature = "enterprise")]
41use common_meta::rpc::ddl::trigger::DropTriggerTask;
42use common_meta::rpc::ddl::{
43    CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
44    SubmitDdlTaskResponse,
45};
46use common_query::Output;
47use common_sql::convert::sql_value_to_value;
48use common_telemetry::{debug, info, tracing, warn};
49use common_time::{Timestamp, Timezone};
50use datafusion_common::tree_node::TreeNodeVisitor;
51use datafusion_expr::LogicalPlan;
52use datatypes::prelude::ConcreteDataType;
53use datatypes::schema::{RawSchema, Schema};
54use datatypes::value::Value;
55use lazy_static::lazy_static;
56use partition::expr::{Operand, PartitionExpr, RestrictedOp};
57use partition::multi_dim::MultiDimPartitionRule;
58use query::parser::QueryStatement;
59use query::plan::extract_and_rewrite_full_table_names;
60use query::query_engine::DefaultSerializer;
61use query::sql::create_table_stmt;
62use regex::Regex;
63use session::context::QueryContextRef;
64use session::table_name::table_idents_to_full_name;
65use snafu::{OptionExt, ResultExt, ensure};
66use sql::parser::{ParseOptions, ParserContext};
67#[cfg(feature = "enterprise")]
68use sql::statements::alter::trigger::AlterTrigger;
69use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation};
70#[cfg(feature = "enterprise")]
71use sql::statements::create::trigger::CreateTrigger;
72use sql::statements::create::{
73    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
74};
75use sql::statements::statement::Statement;
76use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
77use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
78use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
79use table::TableRef;
80use table::dist_table::DistTable;
81use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
82use table::requests::{AlterKind, AlterTableRequest, COMMENT_KEY, TableOptions};
83use table::table_name::TableName;
84
85use crate::error::{
86    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
87    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
88    EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu,
89    InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu,
90    InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu, PartitionExprToPbSnafu, Result,
91    SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
92    TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
93    UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
94};
95use crate::expr_helper;
96use crate::statement::StatementExecutor;
97use crate::statement::show::create_partitions_stmt;
98
99lazy_static! {
100    pub static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
101}
102
103impl StatementExecutor {
104    pub fn catalog_manager(&self) -> CatalogManagerRef {
105        self.catalog_manager.clone()
106    }
107
108    #[tracing::instrument(skip_all)]
109    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
110        let (catalog, schema, _table) = table_idents_to_full_name(&stmt.name, &ctx)
111            .map_err(BoxedError::new)
112            .context(error::ExternalSnafu)?;
113
114        let schema_options = self
115            .table_metadata_manager
116            .schema_manager()
117            .get(SchemaNameKey {
118                catalog: &catalog,
119                schema: &schema,
120            })
121            .await
122            .context(TableMetadataManagerSnafu)?
123            .map(|v| v.into_inner());
124
125        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
126        // We don't put ttl into the table options
127        // Because it will be used directly while compaction.
128        if let Some(schema_options) = schema_options {
129            for (key, value) in schema_options.extra_options.iter() {
130                create_expr
131                    .table_options
132                    .entry(key.clone())
133                    .or_insert(value.clone());
134            }
135        }
136
137        self.create_table_inner(create_expr, stmt.partitions, ctx)
138            .await
139    }
140
141    #[tracing::instrument(skip_all)]
142    pub async fn create_table_like(
143        &self,
144        stmt: CreateTableLike,
145        ctx: QueryContextRef,
146    ) -> Result<TableRef> {
147        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
148            .map_err(BoxedError::new)
149            .context(error::ExternalSnafu)?;
150        let table_ref = self
151            .catalog_manager
152            .table(&catalog, &schema, &table, Some(&ctx))
153            .await
154            .context(CatalogSnafu)?
155            .context(TableNotFoundSnafu { table_name: &table })?;
156        let partitions = self
157            .partition_manager
158            .find_table_partitions(table_ref.table_info().table_id())
159            .await
160            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;
161
162        // CREATE TABLE LIKE also inherits database level options.
163        let schema_options = self
164            .table_metadata_manager
165            .schema_manager()
166            .get(SchemaNameKey {
167                catalog: &catalog,
168                schema: &schema,
169            })
170            .await
171            .context(TableMetadataManagerSnafu)?
172            .map(|v| v.into_inner());
173
174        let quote_style = ctx.quote_style();
175        let mut create_stmt =
176            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
177                .context(error::ParseQuerySnafu)?;
178        create_stmt.name = stmt.table_name;
179        create_stmt.if_not_exists = false;
180
181        let table_info = table_ref.table_info();
182        let partitions =
183            create_partitions_stmt(&table_info, partitions)?.and_then(|mut partitions| {
184                if !partitions.column_list.is_empty() {
185                    partitions.set_quote(quote_style);
186                    Some(partitions)
187                } else {
188                    None
189                }
190            });
191
192        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
193        self.create_table_inner(create_expr, partitions, ctx).await
194    }
195
196    #[tracing::instrument(skip_all)]
197    pub async fn create_external_table(
198        &self,
199        create_expr: CreateExternalTable,
200        ctx: QueryContextRef,
201    ) -> Result<TableRef> {
202        let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
203        self.create_table_inner(create_expr, None, ctx).await
204    }
205
206    #[tracing::instrument(skip_all)]
207    pub async fn create_table_inner(
208        &self,
209        create_table: &mut CreateTableExpr,
210        partitions: Option<Partitions>,
211        query_ctx: QueryContextRef,
212    ) -> Result<TableRef> {
213        ensure!(
214            !is_readonly_schema(&create_table.schema_name),
215            SchemaReadOnlySnafu {
216                name: create_table.schema_name.clone()
217            }
218        );
219
220        if create_table.engine == METRIC_ENGINE_NAME
221            && create_table
222                .table_options
223                .contains_key(LOGICAL_TABLE_METADATA_KEY)
224        {
225            // Create logical tables
226            ensure!(
227                partitions.is_none(),
228                InvalidPartitionRuleSnafu {
229                    reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table",
230                }
231            );
232            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
233                .await?
234                .into_iter()
235                .next()
236                .context(error::UnexpectedSnafu {
237                    violated: "expected to create logical tables",
238                })
239        } else {
240            // Create other normal table
241            self.create_non_logic_table(create_table, partitions, query_ctx)
242                .await
243        }
244    }
245
246    #[tracing::instrument(skip_all)]
247    pub async fn create_non_logic_table(
248        &self,
249        create_table: &mut CreateTableExpr,
250        partitions: Option<Partitions>,
251        query_ctx: QueryContextRef,
252    ) -> Result<TableRef> {
253        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
254
255        // Check if schema exists
256        let schema = self
257            .table_metadata_manager
258            .schema_manager()
259            .get(SchemaNameKey::new(
260                &create_table.catalog_name,
261                &create_table.schema_name,
262            ))
263            .await
264            .context(TableMetadataManagerSnafu)?;
265        ensure!(
266            schema.is_some(),
267            SchemaNotFoundSnafu {
268                schema_info: &create_table.schema_name,
269            }
270        );
271
272        // if table exists.
273        if let Some(table) = self
274            .catalog_manager
275            .table(
276                &create_table.catalog_name,
277                &create_table.schema_name,
278                &create_table.table_name,
279                Some(&query_ctx),
280            )
281            .await
282            .context(CatalogSnafu)?
283        {
284            return if create_table.create_if_not_exists {
285                Ok(table)
286            } else {
287                TableAlreadyExistsSnafu {
288                    table: format_full_table_name(
289                        &create_table.catalog_name,
290                        &create_table.schema_name,
291                        &create_table.table_name,
292                    ),
293                }
294                .fail()
295            };
296        }
297
298        ensure!(
299            NAME_PATTERN_REG.is_match(&create_table.table_name),
300            InvalidTableNameSnafu {
301                table_name: &create_table.table_name,
302            }
303        );
304
305        let table_name = TableName::new(
306            &create_table.catalog_name,
307            &create_table.schema_name,
308            &create_table.table_name,
309        );
310
311        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
312        let mut table_info = create_table_info(create_table, partition_cols)?;
313
314        let resp = self
315            .create_table_procedure(
316                create_table.clone(),
317                partitions,
318                table_info.clone(),
319                query_ctx,
320            )
321            .await?;
322
323        let table_id = resp
324            .table_ids
325            .into_iter()
326            .next()
327            .context(error::UnexpectedSnafu {
328                violated: "expected table_id",
329            })?;
330        info!("Successfully created table '{table_name}' with table id {table_id}");
331
332        table_info.ident.table_id = table_id;
333
334        let table_info: Arc<TableInfo> =
335            Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
336        create_table.table_id = Some(api::v1::TableId { id: table_id });
337
338        let table = DistTable::table(table_info);
339
340        Ok(table)
341    }
342
343    #[tracing::instrument(skip_all)]
344    pub async fn create_logical_tables(
345        &self,
346        create_table_exprs: &[CreateTableExpr],
347        query_context: QueryContextRef,
348    ) -> Result<Vec<TableRef>> {
349        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
350        ensure!(
351            !create_table_exprs.is_empty(),
352            EmptyDdlExprSnafu {
353                name: "create logic tables"
354            }
355        );
356
357        // Check table names
358        for create_table in create_table_exprs {
359            ensure!(
360                NAME_PATTERN_REG.is_match(&create_table.table_name),
361                InvalidTableNameSnafu {
362                    table_name: &create_table.table_name,
363                }
364            );
365        }
366
367        let mut raw_tables_info = create_table_exprs
368            .iter()
369            .map(|create| create_table_info(create, vec![]))
370            .collect::<Result<Vec<_>>>()?;
371        let tables_data = create_table_exprs
372            .iter()
373            .cloned()
374            .zip(raw_tables_info.iter().cloned())
375            .collect::<Vec<_>>();
376
377        let resp = self
378            .create_logical_tables_procedure(tables_data, query_context)
379            .await?;
380
381        let table_ids = resp.table_ids;
382        ensure!(
383            table_ids.len() == raw_tables_info.len(),
384            CreateLogicalTablesSnafu {
385                reason: format!(
386                    "The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}",
387                    raw_tables_info.len(),
388                    table_ids.len()
389                )
390            }
391        );
392        info!("Successfully created logical tables: {:?}", table_ids);
393
394        for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
395            table_info.ident.table_id = table_ids[i];
396        }
397        let tables_info = raw_tables_info
398            .into_iter()
399            .map(|x| x.try_into().context(CreateTableInfoSnafu))
400            .collect::<Result<Vec<_>>>()?;
401
402        Ok(tables_info
403            .into_iter()
404            .map(|x| DistTable::table(Arc::new(x)))
405            .collect())
406    }
407
408    #[cfg(feature = "enterprise")]
409    #[tracing::instrument(skip_all)]
410    pub async fn create_trigger(
411        &self,
412        stmt: CreateTrigger,
413        query_context: QueryContextRef,
414    ) -> Result<Output> {
415        let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
416        self.create_trigger_inner(expr, query_context).await
417    }
418
419    #[cfg(feature = "enterprise")]
420    pub async fn create_trigger_inner(
421        &self,
422        expr: PbCreateTriggerExpr,
423        query_context: QueryContextRef,
424    ) -> Result<Output> {
425        self.create_trigger_procedure(expr, query_context).await?;
426        Ok(Output::new_with_affected_rows(0))
427    }
428
429    #[cfg(feature = "enterprise")]
430    async fn create_trigger_procedure(
431        &self,
432        expr: PbCreateTriggerExpr,
433        query_context: QueryContextRef,
434    ) -> Result<SubmitDdlTaskResponse> {
435        let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
436            create_trigger: Some(expr),
437        })
438        .context(error::InvalidExprSnafu)?;
439
440        let request = SubmitDdlTaskRequest {
441            query_context,
442            task: DdlTask::new_create_trigger(task),
443        };
444
445        self.procedure_executor
446            .submit_ddl_task(&ExecutorContext::default(), request)
447            .await
448            .context(error::ExecuteDdlSnafu)
449    }
450
451    #[tracing::instrument(skip_all)]
452    pub async fn create_flow(
453        &self,
454        stmt: CreateFlow,
455        query_context: QueryContextRef,
456    ) -> Result<Output> {
457        // TODO(ruihang): do some verification
458        let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;
459
460        self.create_flow_inner(expr, query_context).await
461    }
462
463    pub async fn create_flow_inner(
464        &self,
465        expr: CreateFlowExpr,
466        query_context: QueryContextRef,
467    ) -> Result<Output> {
468        self.create_flow_procedure(expr, query_context).await?;
469        Ok(Output::new_with_affected_rows(0))
470    }
471
472    async fn create_flow_procedure(
473        &self,
474        expr: CreateFlowExpr,
475        query_context: QueryContextRef,
476    ) -> Result<SubmitDdlTaskResponse> {
477        let flow_type = self
478            .determine_flow_type(&expr, query_context.clone())
479            .await?;
480        info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
481
482        let expr = {
483            let mut expr = expr;
484            expr.flow_options
485                .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
486            expr
487        };
488
489        let task = CreateFlowTask::try_from(PbCreateFlowTask {
490            create_flow: Some(expr),
491        })
492        .context(error::InvalidExprSnafu)?;
493        let request = SubmitDdlTaskRequest {
494            query_context,
495            task: DdlTask::new_create_flow(task),
496        };
497
498        self.procedure_executor
499            .submit_ddl_task(&ExecutorContext::default(), request)
500            .await
501            .context(error::ExecuteDdlSnafu)
502    }
503
504    /// Determine the flow type based on the SQL query
505    ///
506    /// If it contains aggregation or distinct, then it is a batch flow, otherwise it is a streaming flow
507    async fn determine_flow_type(
508        &self,
509        expr: &CreateFlowExpr,
510        query_ctx: QueryContextRef,
511    ) -> Result<FlowType> {
512        // first check if source table's ttl is instant, if it is, force streaming mode
513        for src_table_name in &expr.source_table_names {
514            let table = self
515                .catalog_manager()
516                .table(
517                    &src_table_name.catalog_name,
518                    &src_table_name.schema_name,
519                    &src_table_name.table_name,
520                    Some(&query_ctx),
521                )
522                .await
523                .map_err(BoxedError::new)
524                .context(ExternalSnafu)?
525                .with_context(|| TableNotFoundSnafu {
526                    table_name: format_full_table_name(
527                        &src_table_name.catalog_name,
528                        &src_table_name.schema_name,
529                        &src_table_name.table_name,
530                    ),
531                })?;
532
533            // instant source table can only be handled by streaming mode
534            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
535                warn!(
536                    "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
537                    format_full_table_name(
538                        &src_table_name.catalog_name,
539                        &src_table_name.schema_name,
540                        &src_table_name.table_name
541                    ),
542                    expr.flow_name
543                );
544                return Ok(FlowType::Streaming);
545            }
546        }
547
548        let engine = &self.query_engine;
549        let stmts = ParserContext::create_with_dialect(
550            &expr.sql,
551            query_ctx.sql_dialect(),
552            ParseOptions::default(),
553        )
554        .map_err(BoxedError::new)
555        .context(ExternalSnafu)?;
556
557        ensure!(
558            stmts.len() == 1,
559            InvalidSqlSnafu {
560                err_msg: format!("Expect only one statement, found {}", stmts.len())
561            }
562        );
563        let stmt = &stmts[0];
564
565        // support tql parse too
566        let plan = match stmt {
567            // prom ql is only supported in batching mode
568            Statement::Tql(_) => return Ok(FlowType::Batching),
569            _ => engine
570                .planner()
571                .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
572                .await
573                .map_err(BoxedError::new)
574                .context(ExternalSnafu)?,
575        };
576
577        /// Visitor to find aggregation or distinct
578        struct FindAggr {
579            is_aggr: bool,
580        }
581
582        impl TreeNodeVisitor<'_> for FindAggr {
583            type Node = LogicalPlan;
584            fn f_down(
585                &mut self,
586                node: &Self::Node,
587            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
588            {
589                match node {
590                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
591                        self.is_aggr = true;
592                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
593                    }
594                    _ => (),
595                }
596                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
597            }
598        }
599
600        let mut find_aggr = FindAggr { is_aggr: false };
601
602        plan.visit_with_subqueries(&mut find_aggr)
603            .context(BuildDfLogicalPlanSnafu)?;
604        if find_aggr.is_aggr {
605            Ok(FlowType::Batching)
606        } else {
607            Ok(FlowType::Streaming)
608        }
609    }
610
611    #[tracing::instrument(skip_all)]
612    pub async fn create_view(
613        &self,
614        create_view: CreateView,
615        ctx: QueryContextRef,
616    ) -> Result<TableRef> {
617        // convert input into logical plan
618        let logical_plan = match &*create_view.query {
619            Statement::Query(query) => {
620                self.plan(
621                    &QueryStatement::Sql(Statement::Query(query.clone())),
622                    ctx.clone(),
623                )
624                .await?
625            }
626            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
627            _ => {
628                return InvalidViewStmtSnafu {}.fail();
629            }
630        };
631        // Save the definition for `show create view`.
632        let definition = create_view.to_string();
633
634        // Save the columns in plan, it may changed when the schemas of tables in plan
635        // are altered.
636        let schema: Schema = logical_plan
637            .schema()
638            .clone()
639            .try_into()
640            .context(ConvertSchemaSnafu)?;
641        let plan_columns: Vec<_> = schema
642            .column_schemas()
643            .iter()
644            .map(|c| c.name.clone())
645            .collect();
646
647        let columns: Vec<_> = create_view
648            .columns
649            .iter()
650            .map(|ident| ident.to_string())
651            .collect();
652
653        // Validate columns
654        if !columns.is_empty() {
655            ensure!(
656                columns.len() == plan_columns.len(),
657                error::ViewColumnsMismatchSnafu {
658                    view_name: create_view.name.to_string(),
659                    expected: plan_columns.len(),
660                    actual: columns.len(),
661                }
662            );
663        }
664
665        // Extract the table names from the original plan
666        // and rewrite them as fully qualified names.
667        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
668            .context(ExtractTableNamesSnafu)?;
669
670        let table_names = table_names.into_iter().map(|t| t.into()).collect();
671
672        // TODO(dennis): we don't save the optimized plan yet,
673        // because there are some serialization issue with our own defined plan node (such as `MergeScanLogicalPlan`).
674        // When the issues are fixed, we can use the `optimized_plan` instead.
675        // let optimized_plan = self.optimize_logical_plan(logical_plan)?.unwrap_df_plan();
676
677        // encode logical plan
678        let encoded_plan = DFLogicalSubstraitConvertor
679            .encode(&plan, DefaultSerializer)
680            .context(SubstraitCodecSnafu)?;
681
682        let expr = expr_helper::to_create_view_expr(
683            create_view,
684            encoded_plan.to_vec(),
685            table_names,
686            columns,
687            plan_columns,
688            definition,
689            ctx.clone(),
690        )?;
691
692        // TODO(dennis): validate the logical plan
693        self.create_view_by_expr(expr, ctx).await
694    }
695
696    pub async fn create_view_by_expr(
697        &self,
698        expr: CreateViewExpr,
699        ctx: QueryContextRef,
700    ) -> Result<TableRef> {
701        ensure! {
702            !(expr.create_if_not_exists & expr.or_replace),
703            InvalidSqlSnafu {
704                err_msg: "syntax error Create Or Replace and If Not Exist cannot be used together",
705            }
706        };
707        let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();
708
709        let schema_exists = self
710            .table_metadata_manager
711            .schema_manager()
712            .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
713            .await
714            .context(TableMetadataManagerSnafu)?;
715
716        ensure!(
717            schema_exists,
718            SchemaNotFoundSnafu {
719                schema_info: &expr.schema_name,
720            }
721        );
722
723        // if view or table exists.
724        if let Some(table) = self
725            .catalog_manager
726            .table(
727                &expr.catalog_name,
728                &expr.schema_name,
729                &expr.view_name,
730                Some(&ctx),
731            )
732            .await
733            .context(CatalogSnafu)?
734        {
735            let table_type = table.table_info().table_type;
736
737            match (table_type, expr.create_if_not_exists, expr.or_replace) {
738                (TableType::View, true, false) => {
739                    return Ok(table);
740                }
741                (TableType::View, false, false) => {
742                    return ViewAlreadyExistsSnafu {
743                        name: format_full_table_name(
744                            &expr.catalog_name,
745                            &expr.schema_name,
746                            &expr.view_name,
747                        ),
748                    }
749                    .fail();
750                }
751                (TableType::View, _, true) => {
752                    // Try to replace an exists view
753                }
754                _ => {
755                    return TableAlreadyExistsSnafu {
756                        table: format_full_table_name(
757                            &expr.catalog_name,
758                            &expr.schema_name,
759                            &expr.view_name,
760                        ),
761                    }
762                    .fail();
763                }
764            }
765        }
766
767        ensure!(
768            NAME_PATTERN_REG.is_match(&expr.view_name),
769            InvalidViewNameSnafu {
770                name: expr.view_name.clone(),
771            }
772        );
773
774        let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);
775
776        let mut view_info = RawTableInfo {
777            ident: metadata::TableIdent {
778                // The view id of distributed table is assigned by Meta, set "0" here as a placeholder.
779                table_id: 0,
780                version: 0,
781            },
782            name: expr.view_name.clone(),
783            desc: None,
784            catalog_name: expr.catalog_name.clone(),
785            schema_name: expr.schema_name.clone(),
786            // The meta doesn't make sense for views, so using a default one.
787            meta: RawTableMeta::default(),
788            table_type: TableType::View,
789        };
790
791        let request = SubmitDdlTaskRequest {
792            query_context: ctx,
793            task: DdlTask::new_create_view(expr, view_info.clone()),
794        };
795
796        let resp = self
797            .procedure_executor
798            .submit_ddl_task(&ExecutorContext::default(), request)
799            .await
800            .context(error::ExecuteDdlSnafu)?;
801
802        debug!(
803            "Submit creating view '{view_name}' task response: {:?}",
804            resp
805        );
806
807        let view_id = resp
808            .table_ids
809            .into_iter()
810            .next()
811            .context(error::UnexpectedSnafu {
812                violated: "expected table_id",
813            })?;
814        info!("Successfully created view '{view_name}' with view id {view_id}");
815
816        view_info.ident.table_id = view_id;
817
818        let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);
819
820        let table = DistTable::table(view_info);
821
822        // Invalidates local cache ASAP.
823        self.cache_invalidator
824            .invalidate(
825                &Context::default(),
826                &[
827                    CacheIdent::TableId(view_id),
828                    CacheIdent::TableName(view_name.clone()),
829                ],
830            )
831            .await
832            .context(error::InvalidateTableCacheSnafu)?;
833
834        Ok(table)
835    }
836
837    #[tracing::instrument(skip_all)]
838    pub async fn drop_flow(
839        &self,
840        catalog_name: String,
841        flow_name: String,
842        drop_if_exists: bool,
843        query_context: QueryContextRef,
844    ) -> Result<Output> {
845        if let Some(flow) = self
846            .flow_metadata_manager
847            .flow_name_manager()
848            .get(&catalog_name, &flow_name)
849            .await
850            .context(error::TableMetadataManagerSnafu)?
851        {
852            let flow_id = flow.flow_id();
853            let task = DropFlowTask {
854                catalog_name,
855                flow_name,
856                flow_id,
857                drop_if_exists,
858            };
859            self.drop_flow_procedure(task, query_context).await?;
860
861            Ok(Output::new_with_affected_rows(0))
862        } else if drop_if_exists {
863            Ok(Output::new_with_affected_rows(0))
864        } else {
865            FlowNotFoundSnafu {
866                flow_name: format_full_flow_name(&catalog_name, &flow_name),
867            }
868            .fail()
869        }
870    }
871
872    async fn drop_flow_procedure(
873        &self,
874        expr: DropFlowTask,
875        query_context: QueryContextRef,
876    ) -> Result<SubmitDdlTaskResponse> {
877        let request = SubmitDdlTaskRequest {
878            query_context,
879            task: DdlTask::new_drop_flow(expr),
880        };
881
882        self.procedure_executor
883            .submit_ddl_task(&ExecutorContext::default(), request)
884            .await
885            .context(error::ExecuteDdlSnafu)
886    }
887
888    #[cfg(feature = "enterprise")]
889    #[tracing::instrument(skip_all)]
890    pub(super) async fn drop_trigger(
891        &self,
892        catalog_name: String,
893        trigger_name: String,
894        drop_if_exists: bool,
895        query_context: QueryContextRef,
896    ) -> Result<Output> {
897        let task = DropTriggerTask {
898            catalog_name,
899            trigger_name,
900            drop_if_exists,
901        };
902        self.drop_trigger_procedure(task, query_context).await?;
903        Ok(Output::new_with_affected_rows(0))
904    }
905
906    #[cfg(feature = "enterprise")]
907    async fn drop_trigger_procedure(
908        &self,
909        expr: DropTriggerTask,
910        query_context: QueryContextRef,
911    ) -> Result<SubmitDdlTaskResponse> {
912        let request = SubmitDdlTaskRequest {
913            query_context,
914            task: DdlTask::new_drop_trigger(expr),
915        };
916
917        self.procedure_executor
918            .submit_ddl_task(&ExecutorContext::default(), request)
919            .await
920            .context(error::ExecuteDdlSnafu)
921    }
922
923    /// Drop a view
924    #[tracing::instrument(skip_all)]
925    pub(crate) async fn drop_view(
926        &self,
927        catalog: String,
928        schema: String,
929        view: String,
930        drop_if_exists: bool,
931        query_context: QueryContextRef,
932    ) -> Result<Output> {
933        let view_info = if let Some(view) = self
934            .catalog_manager
935            .table(&catalog, &schema, &view, None)
936            .await
937            .context(CatalogSnafu)?
938        {
939            view.table_info()
940        } else if drop_if_exists {
941            // DROP VIEW IF EXISTS meets view not found - ignored
942            return Ok(Output::new_with_affected_rows(0));
943        } else {
944            return TableNotFoundSnafu {
945                table_name: format_full_table_name(&catalog, &schema, &view),
946            }
947            .fail();
948        };
949
950        // Ensure the exists one is view, we can't drop other table types
951        ensure!(
952            view_info.table_type == TableType::View,
953            error::InvalidViewSnafu {
954                msg: "not a view",
955                view_name: format_full_table_name(&catalog, &schema, &view),
956            }
957        );
958
959        let view_id = view_info.table_id();
960
961        let task = DropViewTask {
962            catalog,
963            schema,
964            view,
965            view_id,
966            drop_if_exists,
967        };
968
969        self.drop_view_procedure(task, query_context).await?;
970
971        Ok(Output::new_with_affected_rows(0))
972    }
973
974    /// Submit [DropViewTask] to procedure executor.
975    async fn drop_view_procedure(
976        &self,
977        expr: DropViewTask,
978        query_context: QueryContextRef,
979    ) -> Result<SubmitDdlTaskResponse> {
980        let request = SubmitDdlTaskRequest {
981            query_context,
982            task: DdlTask::new_drop_view(expr),
983        };
984
985        self.procedure_executor
986            .submit_ddl_task(&ExecutorContext::default(), request)
987            .await
988            .context(error::ExecuteDdlSnafu)
989    }
990
991    #[tracing::instrument(skip_all)]
992    pub async fn alter_logical_tables(
993        &self,
994        alter_table_exprs: Vec<AlterTableExpr>,
995        query_context: QueryContextRef,
996    ) -> Result<Output> {
997        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
998        ensure!(
999            !alter_table_exprs.is_empty(),
1000            EmptyDdlExprSnafu {
1001                name: "alter logical tables"
1002            }
1003        );
1004
1005        // group by physical table id
1006        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
1007        for expr in alter_table_exprs {
1008            // Get table_id from catalog_manager
1009            let catalog = if expr.catalog_name.is_empty() {
1010                query_context.current_catalog()
1011            } else {
1012                &expr.catalog_name
1013            };
1014            let schema = if expr.schema_name.is_empty() {
1015                query_context.current_schema()
1016            } else {
1017                expr.schema_name.clone()
1018            };
1019            let table_name = &expr.table_name;
1020            let table = self
1021                .catalog_manager
1022                .table(catalog, &schema, table_name, Some(&query_context))
1023                .await
1024                .context(CatalogSnafu)?
1025                .with_context(|| TableNotFoundSnafu {
1026                    table_name: format_full_table_name(catalog, &schema, table_name),
1027                })?;
1028            let table_id = table.table_info().ident.table_id;
1029            let physical_table_id = self
1030                .table_metadata_manager
1031                .table_route_manager()
1032                .get_physical_table_id(table_id)
1033                .await
1034                .context(TableMetadataManagerSnafu)?;
1035            groups.entry(physical_table_id).or_default().push(expr);
1036        }
1037
1038        // Submit procedure for each physical table
1039        let mut handles = Vec::with_capacity(groups.len());
1040        for (_physical_table_id, exprs) in groups {
1041            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
1042            handles.push(fut);
1043        }
1044        let _results = futures::future::try_join_all(handles).await?;
1045
1046        Ok(Output::new_with_affected_rows(0))
1047    }
1048
1049    #[tracing::instrument(skip_all)]
1050    pub async fn drop_table(
1051        &self,
1052        table_name: TableName,
1053        drop_if_exists: bool,
1054        query_context: QueryContextRef,
1055    ) -> Result<Output> {
1056        // Reserved for grpc call
1057        self.drop_tables(&[table_name], drop_if_exists, query_context)
1058            .await
1059    }
1060
1061    #[tracing::instrument(skip_all)]
1062    pub async fn drop_tables(
1063        &self,
1064        table_names: &[TableName],
1065        drop_if_exists: bool,
1066        query_context: QueryContextRef,
1067    ) -> Result<Output> {
1068        let mut tables = Vec::with_capacity(table_names.len());
1069        for table_name in table_names {
1070            ensure!(
1071                !is_readonly_schema(&table_name.schema_name),
1072                SchemaReadOnlySnafu {
1073                    name: table_name.schema_name.clone()
1074                }
1075            );
1076
1077            if let Some(table) = self
1078                .catalog_manager
1079                .table(
1080                    &table_name.catalog_name,
1081                    &table_name.schema_name,
1082                    &table_name.table_name,
1083                    Some(&query_context),
1084                )
1085                .await
1086                .context(CatalogSnafu)?
1087            {
1088                tables.push(table.table_info().table_id());
1089            } else if drop_if_exists {
1090                // DROP TABLE IF EXISTS meets table not found - ignored
1091                continue;
1092            } else {
1093                return TableNotFoundSnafu {
1094                    table_name: table_name.to_string(),
1095                }
1096                .fail();
1097            }
1098        }
1099
1100        for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
1101            self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
1102                .await?;
1103
1104            // Invalidates local cache ASAP.
1105            self.cache_invalidator
1106                .invalidate(
1107                    &Context::default(),
1108                    &[
1109                        CacheIdent::TableId(table_id),
1110                        CacheIdent::TableName(table_name.clone()),
1111                    ],
1112                )
1113                .await
1114                .context(error::InvalidateTableCacheSnafu)?;
1115        }
1116        Ok(Output::new_with_affected_rows(0))
1117    }
1118
1119    #[tracing::instrument(skip_all)]
1120    pub async fn drop_database(
1121        &self,
1122        catalog: String,
1123        schema: String,
1124        drop_if_exists: bool,
1125        query_context: QueryContextRef,
1126    ) -> Result<Output> {
1127        ensure!(
1128            !is_readonly_schema(&schema),
1129            SchemaReadOnlySnafu { name: schema }
1130        );
1131
1132        if self
1133            .catalog_manager
1134            .schema_exists(&catalog, &schema, None)
1135            .await
1136            .context(CatalogSnafu)?
1137        {
1138            if schema == query_context.current_schema() {
1139                SchemaInUseSnafu { name: schema }.fail()
1140            } else {
1141                self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
1142                    .await?;
1143
1144                Ok(Output::new_with_affected_rows(0))
1145            }
1146        } else if drop_if_exists {
1147            // DROP TABLE IF EXISTS meets table not found - ignored
1148            Ok(Output::new_with_affected_rows(0))
1149        } else {
1150            SchemaNotFoundSnafu {
1151                schema_info: schema,
1152            }
1153            .fail()
1154        }
1155    }
1156
1157    #[tracing::instrument(skip_all)]
1158    pub async fn truncate_table(
1159        &self,
1160        table_name: TableName,
1161        time_ranges: Vec<(Timestamp, Timestamp)>,
1162        query_context: QueryContextRef,
1163    ) -> Result<Output> {
1164        ensure!(
1165            !is_readonly_schema(&table_name.schema_name),
1166            SchemaReadOnlySnafu {
1167                name: table_name.schema_name.clone()
1168            }
1169        );
1170
1171        let table = self
1172            .catalog_manager
1173            .table(
1174                &table_name.catalog_name,
1175                &table_name.schema_name,
1176                &table_name.table_name,
1177                Some(&query_context),
1178            )
1179            .await
1180            .context(CatalogSnafu)?
1181            .with_context(|| TableNotFoundSnafu {
1182                table_name: table_name.to_string(),
1183            })?;
1184        let table_id = table.table_info().table_id();
1185        self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
1186            .await?;
1187
1188        Ok(Output::new_with_affected_rows(0))
1189    }
1190
1191    #[tracing::instrument(skip_all)]
1192    pub async fn alter_table(
1193        &self,
1194        alter_table: AlterTable,
1195        query_context: QueryContextRef,
1196    ) -> Result<Output> {
1197        if matches!(
1198            alter_table.alter_operation(),
1199            AlterTableOperation::Repartition { .. }
1200        ) {
1201            let _request = expr_helper::to_repartition_request(alter_table, &query_context)?;
1202            return NotSupportedSnafu {
1203                feat: "ALTER TABLE REPARTITION",
1204            }
1205            .fail();
1206        }
1207
1208        let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
1209        self.alter_table_inner(expr, query_context).await
1210    }
1211
1212    #[tracing::instrument(skip_all)]
1213    pub async fn alter_table_inner(
1214        &self,
1215        expr: AlterTableExpr,
1216        query_context: QueryContextRef,
1217    ) -> Result<Output> {
1218        ensure!(
1219            !is_readonly_schema(&expr.schema_name),
1220            SchemaReadOnlySnafu {
1221                name: expr.schema_name.clone()
1222            }
1223        );
1224
1225        let catalog_name = if expr.catalog_name.is_empty() {
1226            DEFAULT_CATALOG_NAME.to_string()
1227        } else {
1228            expr.catalog_name.clone()
1229        };
1230
1231        let schema_name = if expr.schema_name.is_empty() {
1232            DEFAULT_SCHEMA_NAME.to_string()
1233        } else {
1234            expr.schema_name.clone()
1235        };
1236
1237        let table_name = expr.table_name.clone();
1238
1239        let table = self
1240            .catalog_manager
1241            .table(
1242                &catalog_name,
1243                &schema_name,
1244                &table_name,
1245                Some(&query_context),
1246            )
1247            .await
1248            .context(CatalogSnafu)?
1249            .with_context(|| TableNotFoundSnafu {
1250                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
1251            })?;
1252
1253        let table_id = table.table_info().ident.table_id;
1254        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
1255        if !need_alter {
1256            return Ok(Output::new_with_affected_rows(0));
1257        }
1258        info!(
1259            "Table info before alter is {:?}, expr: {:?}",
1260            table.table_info(),
1261            expr
1262        );
1263
1264        let physical_table_id = self
1265            .table_metadata_manager
1266            .table_route_manager()
1267            .get_physical_table_id(table_id)
1268            .await
1269            .context(TableMetadataManagerSnafu)?;
1270
1271        let (req, invalidate_keys) = if physical_table_id == table_id {
1272            // This is physical table
1273            let req = SubmitDdlTaskRequest {
1274                query_context,
1275                task: DdlTask::new_alter_table(expr),
1276            };
1277
1278            let invalidate_keys = vec![
1279                CacheIdent::TableId(table_id),
1280                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1281            ];
1282
1283            (req, invalidate_keys)
1284        } else {
1285            // This is logical table
1286            let req = SubmitDdlTaskRequest {
1287                query_context,
1288                task: DdlTask::new_alter_logical_tables(vec![expr]),
1289            };
1290
1291            let mut invalidate_keys = vec![
1292                CacheIdent::TableId(physical_table_id),
1293                CacheIdent::TableId(table_id),
1294                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1295            ];
1296
1297            let physical_table = self
1298                .table_metadata_manager
1299                .table_info_manager()
1300                .get(physical_table_id)
1301                .await
1302                .context(TableMetadataManagerSnafu)?
1303                .map(|x| x.into_inner());
1304            if let Some(physical_table) = physical_table {
1305                let physical_table_name = TableName::new(
1306                    physical_table.table_info.catalog_name,
1307                    physical_table.table_info.schema_name,
1308                    physical_table.table_info.name,
1309                );
1310                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
1311            }
1312
1313            (req, invalidate_keys)
1314        };
1315
1316        self.procedure_executor
1317            .submit_ddl_task(&ExecutorContext::default(), req)
1318            .await
1319            .context(error::ExecuteDdlSnafu)?;
1320
1321        // Invalidates local cache ASAP.
1322        self.cache_invalidator
1323            .invalidate(&Context::default(), &invalidate_keys)
1324            .await
1325            .context(error::InvalidateTableCacheSnafu)?;
1326
1327        Ok(Output::new_with_affected_rows(0))
1328    }
1329
1330    #[cfg(feature = "enterprise")]
1331    #[tracing::instrument(skip_all)]
1332    pub async fn alter_trigger(
1333        &self,
1334        _alter_expr: AlterTrigger,
1335        _query_context: QueryContextRef,
1336    ) -> Result<Output> {
1337        crate::error::NotSupportedSnafu {
1338            feat: "alter trigger",
1339        }
1340        .fail()
1341    }
1342
1343    #[tracing::instrument(skip_all)]
1344    pub async fn alter_database(
1345        &self,
1346        alter_expr: AlterDatabase,
1347        query_context: QueryContextRef,
1348    ) -> Result<Output> {
1349        let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
1350        self.alter_database_inner(alter_expr, query_context).await
1351    }
1352
1353    #[tracing::instrument(skip_all)]
1354    pub async fn alter_database_inner(
1355        &self,
1356        alter_expr: AlterDatabaseExpr,
1357        query_context: QueryContextRef,
1358    ) -> Result<Output> {
1359        ensure!(
1360            !is_readonly_schema(&alter_expr.schema_name),
1361            SchemaReadOnlySnafu {
1362                name: query_context.current_schema().clone()
1363            }
1364        );
1365
1366        let exists = self
1367            .catalog_manager
1368            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
1369            .await
1370            .context(CatalogSnafu)?;
1371        ensure!(
1372            exists,
1373            SchemaNotFoundSnafu {
1374                schema_info: alter_expr.schema_name,
1375            }
1376        );
1377
1378        let cache_ident = [CacheIdent::SchemaName(SchemaName {
1379            catalog_name: alter_expr.catalog_name.clone(),
1380            schema_name: alter_expr.schema_name.clone(),
1381        })];
1382
1383        self.alter_database_procedure(alter_expr, query_context)
1384            .await?;
1385
1386        // Invalidates local cache ASAP.
1387        self.cache_invalidator
1388            .invalidate(&Context::default(), &cache_ident)
1389            .await
1390            .context(error::InvalidateTableCacheSnafu)?;
1391
1392        Ok(Output::new_with_affected_rows(0))
1393    }
1394
1395    async fn create_table_procedure(
1396        &self,
1397        create_table: CreateTableExpr,
1398        partitions: Vec<PartitionExpr>,
1399        table_info: RawTableInfo,
1400        query_context: QueryContextRef,
1401    ) -> Result<SubmitDdlTaskResponse> {
1402        let partitions = partitions
1403            .into_iter()
1404            .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
1405            .collect::<Result<Vec<_>>>()?;
1406
1407        let request = SubmitDdlTaskRequest {
1408            query_context,
1409            task: DdlTask::new_create_table(create_table, partitions, table_info),
1410        };
1411
1412        self.procedure_executor
1413            .submit_ddl_task(&ExecutorContext::default(), request)
1414            .await
1415            .context(error::ExecuteDdlSnafu)
1416    }
1417
1418    async fn create_logical_tables_procedure(
1419        &self,
1420        tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
1421        query_context: QueryContextRef,
1422    ) -> Result<SubmitDdlTaskResponse> {
1423        let request = SubmitDdlTaskRequest {
1424            query_context,
1425            task: DdlTask::new_create_logical_tables(tables_data),
1426        };
1427
1428        self.procedure_executor
1429            .submit_ddl_task(&ExecutorContext::default(), request)
1430            .await
1431            .context(error::ExecuteDdlSnafu)
1432    }
1433
1434    async fn alter_logical_tables_procedure(
1435        &self,
1436        tables_data: Vec<AlterTableExpr>,
1437        query_context: QueryContextRef,
1438    ) -> Result<SubmitDdlTaskResponse> {
1439        let request = SubmitDdlTaskRequest {
1440            query_context,
1441            task: DdlTask::new_alter_logical_tables(tables_data),
1442        };
1443
1444        self.procedure_executor
1445            .submit_ddl_task(&ExecutorContext::default(), request)
1446            .await
1447            .context(error::ExecuteDdlSnafu)
1448    }
1449
1450    async fn drop_table_procedure(
1451        &self,
1452        table_name: &TableName,
1453        table_id: TableId,
1454        drop_if_exists: bool,
1455        query_context: QueryContextRef,
1456    ) -> Result<SubmitDdlTaskResponse> {
1457        let request = SubmitDdlTaskRequest {
1458            query_context,
1459            task: DdlTask::new_drop_table(
1460                table_name.catalog_name.clone(),
1461                table_name.schema_name.clone(),
1462                table_name.table_name.clone(),
1463                table_id,
1464                drop_if_exists,
1465            ),
1466        };
1467
1468        self.procedure_executor
1469            .submit_ddl_task(&ExecutorContext::default(), request)
1470            .await
1471            .context(error::ExecuteDdlSnafu)
1472    }
1473
1474    async fn drop_database_procedure(
1475        &self,
1476        catalog: String,
1477        schema: String,
1478        drop_if_exists: bool,
1479        query_context: QueryContextRef,
1480    ) -> Result<SubmitDdlTaskResponse> {
1481        let request = SubmitDdlTaskRequest {
1482            query_context,
1483            task: DdlTask::new_drop_database(catalog, schema, drop_if_exists),
1484        };
1485
1486        self.procedure_executor
1487            .submit_ddl_task(&ExecutorContext::default(), request)
1488            .await
1489            .context(error::ExecuteDdlSnafu)
1490    }
1491
1492    async fn alter_database_procedure(
1493        &self,
1494        alter_expr: AlterDatabaseExpr,
1495        query_context: QueryContextRef,
1496    ) -> Result<SubmitDdlTaskResponse> {
1497        let request = SubmitDdlTaskRequest {
1498            query_context,
1499            task: DdlTask::new_alter_database(alter_expr),
1500        };
1501
1502        self.procedure_executor
1503            .submit_ddl_task(&ExecutorContext::default(), request)
1504            .await
1505            .context(error::ExecuteDdlSnafu)
1506    }
1507
1508    async fn truncate_table_procedure(
1509        &self,
1510        table_name: &TableName,
1511        table_id: TableId,
1512        time_ranges: Vec<(Timestamp, Timestamp)>,
1513        query_context: QueryContextRef,
1514    ) -> Result<SubmitDdlTaskResponse> {
1515        let request = SubmitDdlTaskRequest {
1516            query_context,
1517            task: DdlTask::new_truncate_table(
1518                table_name.catalog_name.clone(),
1519                table_name.schema_name.clone(),
1520                table_name.table_name.clone(),
1521                table_id,
1522                time_ranges,
1523            ),
1524        };
1525
1526        self.procedure_executor
1527            .submit_ddl_task(&ExecutorContext::default(), request)
1528            .await
1529            .context(error::ExecuteDdlSnafu)
1530    }
1531
1532    #[tracing::instrument(skip_all)]
1533    pub async fn create_database(
1534        &self,
1535        database: &str,
1536        create_if_not_exists: bool,
1537        options: HashMap<String, String>,
1538        query_context: QueryContextRef,
1539    ) -> Result<Output> {
1540        let catalog = query_context.current_catalog();
1541        ensure!(
1542            NAME_PATTERN_REG.is_match(catalog),
1543            error::UnexpectedSnafu {
1544                violated: format!("Invalid catalog name: {}", catalog)
1545            }
1546        );
1547
1548        ensure!(
1549            NAME_PATTERN_REG.is_match(database),
1550            error::UnexpectedSnafu {
1551                violated: format!("Invalid database name: {}", database)
1552            }
1553        );
1554
1555        if !self
1556            .catalog_manager
1557            .schema_exists(catalog, database, None)
1558            .await
1559            .context(CatalogSnafu)?
1560            && !self.catalog_manager.is_reserved_schema_name(database)
1561        {
1562            self.create_database_procedure(
1563                catalog.to_string(),
1564                database.to_string(),
1565                create_if_not_exists,
1566                options,
1567                query_context,
1568            )
1569            .await?;
1570
1571            Ok(Output::new_with_affected_rows(1))
1572        } else if create_if_not_exists {
1573            Ok(Output::new_with_affected_rows(1))
1574        } else {
1575            error::SchemaExistsSnafu { name: database }.fail()
1576        }
1577    }
1578
1579    async fn create_database_procedure(
1580        &self,
1581        catalog: String,
1582        database: String,
1583        create_if_not_exists: bool,
1584        options: HashMap<String, String>,
1585        query_context: QueryContextRef,
1586    ) -> Result<SubmitDdlTaskResponse> {
1587        let request = SubmitDdlTaskRequest {
1588            query_context,
1589            task: DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
1590        };
1591
1592        self.procedure_executor
1593            .submit_ddl_task(&ExecutorContext::default(), request)
1594            .await
1595            .context(error::ExecuteDdlSnafu)
1596    }
1597}
1598
1599/// Parse partition statement [Partitions] into [PartitionExpr] and partition columns.
1600pub fn parse_partitions(
1601    create_table: &CreateTableExpr,
1602    partitions: Option<Partitions>,
1603    query_ctx: &QueryContextRef,
1604) -> Result<(Vec<PartitionExpr>, Vec<String>)> {
1605    // If partitions are not defined by user, use the timestamp column (which has to be existed) as
1606    // the partition column, and create only one partition.
1607    let partition_columns = find_partition_columns(&partitions)?;
1608    let partition_exprs =
1609        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
1610
1611    // Validates partition
1612    let exprs = partition_exprs.clone();
1613    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
1614        .context(InvalidPartitionSnafu)?;
1615
1616    Ok((partition_exprs, partition_columns))
1617}
1618
1619/// Verifies an alter and returns whether it is necessary to perform the alter.
1620///
1621/// # Returns
1622///
1623/// Returns true if the alter need to be porformed; otherwise, it returns false.
1624pub fn verify_alter(
1625    table_id: TableId,
1626    table_info: Arc<TableInfo>,
1627    expr: AlterTableExpr,
1628) -> Result<bool> {
1629    let request: AlterTableRequest =
1630        common_grpc_expr::alter_expr_to_request(table_id, expr, Some(&table_info.meta))
1631            .context(AlterExprToRequestSnafu)?;
1632
1633    let AlterTableRequest {
1634        table_name,
1635        alter_kind,
1636        ..
1637    } = &request;
1638
1639    if let AlterKind::RenameTable { new_table_name } = alter_kind {
1640        ensure!(
1641            NAME_PATTERN_REG.is_match(new_table_name),
1642            error::UnexpectedSnafu {
1643                violated: format!("Invalid table name: {}", new_table_name)
1644            }
1645        );
1646    } else if let AlterKind::AddColumns { columns } = alter_kind {
1647        // If all the columns are marked as add_if_not_exists and they already exist in the table,
1648        // there is no need to perform the alter.
1649        let column_names: HashSet<_> = table_info
1650            .meta
1651            .schema
1652            .column_schemas()
1653            .iter()
1654            .map(|schema| &schema.name)
1655            .collect();
1656        if columns.iter().all(|column| {
1657            column_names.contains(&column.column_schema.name) && column.add_if_not_exists
1658        }) {
1659            return Ok(false);
1660        }
1661    }
1662
1663    let _ = table_info
1664        .meta
1665        .builder_with_alter_kind(table_name, &request.alter_kind)
1666        .context(error::TableSnafu)?
1667        .build()
1668        .context(error::BuildTableMetaSnafu { table_name })?;
1669
1670    Ok(true)
1671}
1672
1673pub fn create_table_info(
1674    create_table: &CreateTableExpr,
1675    partition_columns: Vec<String>,
1676) -> Result<RawTableInfo> {
1677    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
1678    let mut column_name_to_index_map = HashMap::new();
1679
1680    for (idx, column) in create_table.column_defs.iter().enumerate() {
1681        let schema =
1682            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
1683                column: &column.name,
1684            })?;
1685        let schema = schema.with_time_index(column.name == create_table.time_index);
1686
1687        column_schemas.push(schema);
1688        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
1689    }
1690
1691    let timestamp_index = column_name_to_index_map
1692        .get(&create_table.time_index)
1693        .cloned();
1694
1695    let raw_schema = RawSchema {
1696        column_schemas: column_schemas.clone(),
1697        timestamp_index,
1698        version: 0,
1699    };
1700
1701    let primary_key_indices = create_table
1702        .primary_keys
1703        .iter()
1704        .map(|name| {
1705            column_name_to_index_map
1706                .get(name)
1707                .cloned()
1708                .context(ColumnNotFoundSnafu { msg: name })
1709        })
1710        .collect::<Result<Vec<_>>>()?;
1711
1712    let partition_key_indices = partition_columns
1713        .into_iter()
1714        .map(|col_name| {
1715            column_name_to_index_map
1716                .get(&col_name)
1717                .cloned()
1718                .context(ColumnNotFoundSnafu { msg: col_name })
1719        })
1720        .collect::<Result<Vec<_>>>()?;
1721
1722    let table_options = TableOptions::try_from_iter(&create_table.table_options)
1723        .context(UnrecognizedTableOptionSnafu)?;
1724
1725    let meta = RawTableMeta {
1726        schema: raw_schema,
1727        primary_key_indices,
1728        value_indices: vec![],
1729        engine: create_table.engine.clone(),
1730        next_column_id: column_schemas.len() as u32,
1731        region_numbers: vec![],
1732        options: table_options,
1733        created_on: Utc::now(),
1734        partition_key_indices,
1735        column_ids: vec![],
1736    };
1737
1738    let desc = if create_table.desc.is_empty() {
1739        create_table.table_options.get(COMMENT_KEY).cloned()
1740    } else {
1741        Some(create_table.desc.clone())
1742    };
1743
1744    let table_info = RawTableInfo {
1745        ident: metadata::TableIdent {
1746            // The table id of distributed table is assigned by Meta, set "0" here as a placeholder.
1747            table_id: 0,
1748            version: 0,
1749        },
1750        name: create_table.table_name.clone(),
1751        desc,
1752        catalog_name: create_table.catalog_name.clone(),
1753        schema_name: create_table.schema_name.clone(),
1754        meta,
1755        table_type: TableType::Base,
1756    };
1757    Ok(table_info)
1758}
1759
1760fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
1761    let columns = if let Some(partitions) = partitions {
1762        partitions
1763            .column_list
1764            .iter()
1765            .map(|x| x.value.clone())
1766            .collect::<Vec<_>>()
1767    } else {
1768        vec![]
1769    };
1770    Ok(columns)
1771}
1772
1773/// Parse [Partitions] into a group of partition entries.
1774///
1775/// Returns a list of [PartitionExpr], each of which defines a partition.
1776fn find_partition_entries(
1777    create_table: &CreateTableExpr,
1778    partitions: &Option<Partitions>,
1779    partition_columns: &[String],
1780    query_ctx: &QueryContextRef,
1781) -> Result<Vec<PartitionExpr>> {
1782    let Some(partitions) = partitions else {
1783        return Ok(vec![]);
1784    };
1785
1786    // extract concrete data type of partition columns
1787    let column_name_and_type = partition_columns
1788        .iter()
1789        .map(|pc| {
1790            let column = create_table
1791                .column_defs
1792                .iter()
1793                .find(|c| &c.name == pc)
1794                // unwrap is safe here because we have checked that partition columns are defined
1795                .unwrap();
1796            let column_name = &column.name;
1797            let data_type = ConcreteDataType::from(
1798                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
1799                    .context(ColumnDataTypeSnafu)?,
1800            );
1801            Ok((column_name, data_type))
1802        })
1803        .collect::<Result<HashMap<_, _>>>()?;
1804
1805    // Transform parser expr to partition expr
1806    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
1807    for partition in &partitions.exprs {
1808        let partition_expr =
1809            convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
1810        partition_exprs.push(partition_expr);
1811    }
1812
1813    Ok(partition_exprs)
1814}
1815
1816fn convert_one_expr(
1817    expr: &Expr,
1818    column_name_and_type: &HashMap<&String, ConcreteDataType>,
1819    timezone: &Timezone,
1820) -> Result<PartitionExpr> {
1821    let Expr::BinaryOp { left, op, right } = expr else {
1822        return InvalidPartitionRuleSnafu {
1823            reason: "partition rule must be a binary expression",
1824        }
1825        .fail();
1826    };
1827
1828    let op =
1829        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
1830            reason: format!("unsupported operator in partition expr {op}"),
1831        })?;
1832
1833    // convert leaf node.
1834    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
1835        // col, val
1836        (Expr::Identifier(ident), Expr::Value(value)) => {
1837            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1838            let value = convert_value(&value.value, data_type, timezone, None)?;
1839            (Operand::Column(column_name), op, Operand::Value(value))
1840        }
1841        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
1842            if let Expr::Value(v) = &**expr =>
1843        {
1844            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1845            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
1846            (Operand::Column(column_name), op, Operand::Value(value))
1847        }
1848        // val, col
1849        (Expr::Value(value), Expr::Identifier(ident)) => {
1850            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1851            let value = convert_value(&value.value, data_type, timezone, None)?;
1852            (Operand::Value(value), op, Operand::Column(column_name))
1853        }
1854        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
1855            if let Expr::Value(v) = &**expr =>
1856        {
1857            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1858            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
1859            (Operand::Value(value), op, Operand::Column(column_name))
1860        }
1861        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
1862            // sub-expr must against another sub-expr
1863            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
1864            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
1865            (Operand::Expr(lhs), op, Operand::Expr(rhs))
1866        }
1867        _ => {
1868            return InvalidPartitionRuleSnafu {
1869                reason: format!("invalid partition expr {expr}"),
1870            }
1871            .fail();
1872        }
1873    };
1874
1875    Ok(PartitionExpr::new(lhs, op, rhs))
1876}
1877
1878fn convert_identifier(
1879    ident: &Ident,
1880    column_name_and_type: &HashMap<&String, ConcreteDataType>,
1881) -> Result<(String, ConcreteDataType)> {
1882    let column_name = ident.value.clone();
1883    let data_type = column_name_and_type
1884        .get(&column_name)
1885        .cloned()
1886        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
1887    Ok((column_name, data_type))
1888}
1889
1890fn convert_value(
1891    value: &ParserValue,
1892    data_type: ConcreteDataType,
1893    timezone: &Timezone,
1894    unary_op: Option<UnaryOperator>,
1895) -> Result<Value> {
1896    sql_value_to_value(
1897        "<NONAME>",
1898        &data_type,
1899        value,
1900        Some(timezone),
1901        unary_op,
1902        false,
1903    )
1904    .context(error::SqlCommonSnafu)
1905}
1906
#[cfg(test)]
mod test {
    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;

    use super::*;
    use crate::expr_helper;

    /// Checks which names `NAME_PATTERN_REG` accepts and which it rejects.
    #[test]
    fn test_name_is_match() {
        let accepted = ["hello", "test@", "test#"];
        let rejected = ["/adaf", "🈲", "@test", "#test", "@", "#"];

        for name in accepted {
            assert!(NAME_PATTERN_REG.is_match(name));
        }
        for name in rejected {
            assert!(!NAME_PATTERN_REG.is_match(name));
        }
    }

    /// Parses `CREATE TABLE` statements with partition clauses and compares the JSON form
    /// of the parsed partitions with the expected output.
    ///
    /// Currently ignored, see the reason on the `#[ignore]` attribute.
    #[tokio::test]
    #[ignore = "TODO(ruihang): WIP new partition rule"]
    async fn test_parse_partitions() {
        common_telemetry::init_default_ut_logging();
        // Each case is `(sql, expected JSON of the parsed partitions)`.
        let cases = [
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION ON COLUMNS (b) (
  b < 'hz',
  b >= 'hz' AND b < 'sh',
  b >= 'sh'
)
ENGINE=mito",
                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
            ),
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
  PARTITION r0 VALUES LESS THAN ('hz', 10),
  b < 'hz' AND a < 10,
  b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
  b >= 'sh' AND a >= 20
)
ENGINE=mito",
                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
            ),
        ];
        let query_ctx = QueryContextBuilder::default().build().into();
        for (sql, expected_json) in cases {
            let statements = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            // Every case is a single `CREATE TABLE` statement.
            let Statement::CreateTable(create) = &statements[0] else {
                unreachable!()
            };
            let expr = expr_helper::create_to_expr(create, &QueryContext::arc()).unwrap();
            let (partitions, _) =
                parse_partitions(&expr, create.partitions.clone(), &query_ctx).unwrap();
            let actual_json = serde_json::to_string(&partitions).unwrap();
            assert_eq!(actual_json, expected_json);
        }
    }
}