operator/statement/ddl.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use api::helper::ColumnDataTypeWrapper;
use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
use api::v1::{
    AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, column_def,
};
#[cfg(feature = "enterprise")]
use api::v1::{
    CreateTriggerExpr as PbCreateTriggerExpr, meta::CreateTriggerTask as PbCreateTriggerTask,
};
use catalog::CatalogManagerRef;
use chrono::Utc;
use common_base::regex_pattern::NAME_PATTERN_REG;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_readonly_schema};
use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::create_flow::FlowType;
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
use common_meta::procedure_executor::ExecutorContext;
#[cfg(feature = "enterprise")]
use common_meta::rpc::ddl::trigger::CreateTriggerTask;
#[cfg(feature = "enterprise")]
use common_meta::rpc::ddl::trigger::DropTriggerTask;
use common_meta::rpc::ddl::{
    CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
    SubmitDdlTaskResponse,
};
use common_query::Output;
use common_sql::convert::sql_value_to_value;
use common_telemetry::{debug, info, tracing, warn};
use common_time::{Timestamp, Timezone};
use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
use datatypes::value::Value;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
use query::parser::QueryStatement;
use query::plan::extract_and_rewrite_full_table_names;
use query::query_engine::DefaultSerializer;
use query::sql::create_table_stmt;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parser::{ParseOptions, ParserContext};
#[cfg(feature = "enterprise")]
use sql::statements::alter::trigger::AlterTrigger;
use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation};
#[cfg(feature = "enterprise")]
use sql::statements::create::trigger::CreateTrigger;
use sql::statements::create::{
    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
};
use sql::statements::statement::Statement;
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::TableRef;
use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{AlterKind, AlterTableRequest, COMMENT_KEY, TableOptions};
use table::table_name::TableName;
use table::table_reference::TableReference;

use crate::error::{
    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
    DeserializePartitionExprSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
    FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu,
    PartitionExprToPbSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
    SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
    UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
};
use crate::expr_helper::{self, RepartitionRequest};
use crate::statement::StatementExecutor;
use crate::statement::show::create_partitions_stmt;

impl StatementExecutor {
    pub fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }

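    /// Creates a table by a `CREATE TABLE` statement, inheriting the
    /// database-level options (except `compaction.*` keys) into the table
    /// options before submitting the create-table procedure.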
    #[tracing::instrument(skip_all)]
    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
        let (catalog, schema, _table) = table_idents_to_full_name(&stmt.name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;

        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
        // Don't inherit schema-level TTL/compaction options into table options:
        // TTL is applied during compaction, and `compaction.*` is handled separately.
        if let Some(schema_options) = schema_options {
            for (key, value) in schema_options.extra_options.iter() {
                if key.starts_with("compaction.") {
                    continue;
                }
                create_expr
                    .table_options
                    .entry(key.clone())
                    .or_insert(value.clone());
            }
        }

        self.create_table_inner(create_expr, stmt.partitions, ctx)
            .await
    }

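    /// Handles `CREATE TABLE ... LIKE`, e.g. `CREATE TABLE t2 LIKE t1;`:
    /// the new table reuses the source table's schema, partition rule and
    /// database-level options, but none of its data.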
    #[tracing::instrument(skip_all)]
    pub async fn create_table_like(
        &self,
        stmt: CreateTableLike,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;
        let table_ref = self
            .catalog_manager
            .table(&catalog, &schema, &table, Some(&ctx))
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu { table_name: &table })?;
        let partitions = self
            .partition_manager
            .find_table_partitions(table_ref.table_info().table_id())
            .await
            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;

        // CREATE TABLE LIKE also inherits database level options.
        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let quote_style = ctx.quote_style();
        let mut create_stmt =
            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
                .context(error::ParseQuerySnafu)?;
        create_stmt.name = stmt.table_name;
        create_stmt.if_not_exists = false;

        let table_info = table_ref.table_info();
        let partitions =
            create_partitions_stmt(&table_info, partitions)?.and_then(|mut partitions| {
                if !partitions.column_list.is_empty() {
                    partitions.set_quote(quote_style);
                    Some(partitions)
                } else {
                    None
                }
            });

        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
        self.create_table_inner(create_expr, partitions, ctx).await
    }

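    /// Creates an external table from a `CREATE EXTERNAL TABLE` statement.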
    #[tracing::instrument(skip_all)]
    pub async fn create_external_table(
        &self,
        create_expr: CreateExternalTable,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
        self.create_table_inner(create_expr, None, ctx).await
    }

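    /// Dispatches table creation: a metric-engine table carrying the
    /// logical-table metadata key is created as a logical table (with its
    /// partition rule validated against the physical table), everything else
    /// as a normal table.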
    #[tracing::instrument(skip_all)]
    pub async fn create_table_inner(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !is_readonly_schema(&create_table.schema_name),
            SchemaReadOnlySnafu {
                name: create_table.schema_name.clone()
            }
        );

        if create_table.engine == METRIC_ENGINE_NAME
            && create_table
                .table_options
                .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            if let Some(partitions) = partitions.as_ref()
                && !partitions.exprs.is_empty()
            {
                self.validate_logical_table_partition_rule(create_table, partitions, &query_ctx)
                    .await?;
            }
            // Create logical tables
            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
                .await?
                .into_iter()
                .next()
                .context(error::UnexpectedSnafu {
                    violated: "expected to create logical tables",
                })
        } else {
            // Create a normal table.
            self.create_non_logic_table(create_table, partitions, query_ctx)
                .await
        }
    }

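    /// Creates a normal (non-logical) table: verifies the schema exists,
    /// honors `IF NOT EXISTS`, validates the table name, then submits the
    /// create-table procedure.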
    #[tracing::instrument(skip_all)]
    pub async fn create_non_logic_table(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();

        // Check if schema exists
        let schema = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey::new(
                &create_table.catalog_name,
                &create_table.schema_name,
            ))
            .await
            .context(TableMetadataManagerSnafu)?;
        ensure!(
            schema.is_some(),
            SchemaNotFoundSnafu {
                schema_info: &create_table.schema_name,
            }
        );

        // Check whether the table already exists.
        if let Some(table) = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                &create_table.table_name,
                Some(&query_ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            return if create_table.create_if_not_exists {
                Ok(table)
            } else {
                TableAlreadyExistsSnafu {
                    table: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                }
                .fail()
            };
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&create_table.table_name),
            InvalidTableNameSnafu {
                table_name: &create_table.table_name,
            }
        );

        let table_name = TableName::new(
            &create_table.catalog_name,
            &create_table.schema_name,
            &create_table.table_name,
        );

        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
        let mut table_info = create_table_info(create_table, partition_cols)?;

        let resp = self
            .create_table_procedure(
                create_table.clone(),
                partitions,
                table_info.clone(),
                query_ctx,
            )
            .await?;

        let table_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created table '{table_name}' with table id {table_id}");

        table_info.ident.table_id = table_id;

        let table_info: Arc<TableInfo> =
            Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
        create_table.table_id = Some(api::v1::TableId { id: table_id });

        let table = DistTable::table(table_info);

        Ok(table)
    }

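    /// Creates a batch of logical tables in a single procedure; the response
    /// must carry exactly one table id per submitted expression.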
    #[tracing::instrument(skip_all)]
    pub async fn create_logical_tables(
        &self,
        create_table_exprs: &[CreateTableExpr],
        query_context: QueryContextRef,
    ) -> Result<Vec<TableRef>> {
        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
        ensure!(
            !create_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "create logical tables"
            }
        );

        // Check table names
        for create_table in create_table_exprs {
            ensure!(
                NAME_PATTERN_REG.is_match(&create_table.table_name),
                InvalidTableNameSnafu {
                    table_name: &create_table.table_name,
                }
            );
        }

        let mut raw_tables_info = create_table_exprs
            .iter()
            .map(|create| create_table_info(create, vec![]))
            .collect::<Result<Vec<_>>>()?;
        let tables_data = create_table_exprs
            .iter()
            .cloned()
            .zip(raw_tables_info.iter().cloned())
            .collect::<Vec<_>>();

        let resp = self
            .create_logical_tables_procedure(tables_data, query_context)
            .await?;

        let table_ids = resp.table_ids;
        ensure!(
            table_ids.len() == raw_tables_info.len(),
            CreateLogicalTablesSnafu {
                reason: format!(
                    "The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}",
                    raw_tables_info.len(),
                    table_ids.len()
                )
            }
        );
        info!("Successfully created logical tables: {:?}", table_ids);

        for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
            table_info.ident.table_id = table_ids[i];
        }
        let tables_info = raw_tables_info
            .into_iter()
            .map(|x| x.try_into().context(CreateTableInfoSnafu))
            .collect::<Result<Vec<_>>>()?;

        Ok(tables_info
            .into_iter()
            .map(|x| DistTable::table(Arc::new(x)))
            .collect())
    }

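    /// Validates that a logical table's partition rule matches its physical
    /// table's rule, comparing the two `PartitionExpr` lists after sorting.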
    async fn validate_logical_table_partition_rule(
        &self,
        create_table: &CreateTableExpr,
        partitions: &Partitions,
        query_ctx: &QueryContextRef,
    ) -> Result<()> {
        let (_, mut logical_partition_exprs) =
            parse_partitions_for_logical_validation(create_table, partitions, query_ctx)?;

        let physical_table_name = create_table
            .table_options
            .get(LOGICAL_TABLE_METADATA_KEY)
            .with_context(|| CreateLogicalTablesSnafu {
                reason: format!(
                    "expect `{LOGICAL_TABLE_METADATA_KEY}` option on creating logical table"
                ),
            })?;

        let physical_table = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                physical_table_name,
                Some(query_ctx),
            )
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu {
                table_name: physical_table_name.clone(),
            })?;

        let physical_table_info = physical_table.table_info();
        let partition_rule = self
            .partition_manager
            .find_table_partition_rule(&physical_table_info)
            .await
            .context(error::FindTablePartitionRuleSnafu {
                table_name: physical_table_name.clone(),
            })?;

        let multi_dim_rule = partition_rule
            .as_ref()
            .as_any()
            .downcast_ref::<MultiDimPartitionRule>()
            .context(InvalidPartitionRuleSnafu {
                reason: "physical table partition rule is not range-based",
            })?;

        // TODO(ruihang): project physical partition exprs to logical partition column
        let mut physical_partition_exprs = multi_dim_rule.exprs().to_vec();
        logical_partition_exprs.sort_unstable();
        physical_partition_exprs.sort_unstable();

        ensure!(
            physical_partition_exprs == logical_partition_exprs,
            InvalidPartitionRuleSnafu {
                reason: format!(
                    "logical table partition rule must match the corresponding physical table's\n logical table partition exprs:\t\t {:?}\n physical table partition exprs:\t {:?}",
                    logical_partition_exprs, physical_partition_exprs
                ),
            }
        );

        Ok(())
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub async fn create_trigger(
        &self,
        stmt: CreateTrigger,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
        self.create_trigger_inner(expr, query_context).await
    }

    #[cfg(feature = "enterprise")]
    pub async fn create_trigger_inner(
        &self,
        expr: PbCreateTriggerExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_trigger_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    async fn create_trigger_procedure(
        &self,
        expr: PbCreateTriggerExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
            create_trigger: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;

        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_trigger(task),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

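    /// Creates a flow from a `CREATE FLOW` statement. A minimal sketch,
    /// assuming the usual `CREATE FLOW ... SINK TO ... AS <query>` form
    /// (table and flow names below are illustrative):
    ///
    /// ```sql
    /// CREATE FLOW my_flow SINK TO my_sink_table AS
    /// SELECT count(*) FROM my_source_table;
    /// ```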
    #[tracing::instrument(skip_all)]
    pub async fn create_flow(
        &self,
        stmt: CreateFlow,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        // TODO(ruihang): do some verification
        let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;

        self.create_flow_inner(expr, query_context).await
    }

    pub async fn create_flow_inner(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_flow_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    async fn create_flow_procedure(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let flow_type = self
            .determine_flow_type(&expr, query_context.clone())
            .await?;
        info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);

        let expr = {
            let mut expr = expr;
            expr.flow_options
                .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
            expr
        };

        let task = CreateFlowTask::try_from(PbCreateFlowTask {
            create_flow: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_flow(task),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    /// Determines the flow type based on the SQL query.
    ///
    /// If the query contains aggregation or DISTINCT, it is a batching flow;
    /// otherwise it is a streaming flow.
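    ///
    /// For example, `SELECT avg(x) FROM t` plans to an `Aggregate` node and
    /// yields a batching flow, while `SELECT x + 1 FROM t` contains no
    /// aggregate or distinct node and yields a streaming flow.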
    async fn determine_flow_type(
        &self,
        expr: &CreateFlowExpr,
        query_ctx: QueryContextRef,
    ) -> Result<FlowType> {
        // First check whether any source table's TTL is instant; if so, force streaming mode.
        for src_table_name in &expr.source_table_names {
            let table = self
                .catalog_manager()
                .table(
                    &src_table_name.catalog_name,
                    &src_table_name.schema_name,
                    &src_table_name.table_name,
                    Some(&query_ctx),
                )
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name,
                    ),
                })?;

            // instant source table can only be handled by streaming mode
            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
                warn!(
                    "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
                    format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name
                    ),
                    expr.flow_name
                );
                return Ok(FlowType::Streaming);
            }
        }

        let engine = &self.query_engine;
        let stmts = ParserContext::create_with_dialect(
            &expr.sql,
            query_ctx.sql_dialect(),
            ParseOptions::default(),
        )
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

        ensure!(
            stmts.len() == 1,
            InvalidSqlSnafu {
                err_msg: format!("Expect only one statement, found {}", stmts.len())
            }
        );
        let stmt = &stmts[0];

        // Support TQL parsing too.
        let plan = match stmt {
            // PromQL is only supported in batching mode.
            Statement::Tql(_) => return Ok(FlowType::Batching),
            _ => engine
                .planner()
                .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?,
        };

        /// Visitor to find aggregation or distinct
        struct FindAggr {
            is_aggr: bool,
        }

        impl TreeNodeVisitor<'_> for FindAggr {
            type Node = LogicalPlan;
            fn f_down(
                &mut self,
                node: &Self::Node,
            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
            {
                match node {
                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
                        self.is_aggr = true;
                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
                    }
                    _ => (),
                }
                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
            }
        }

        let mut find_aggr = FindAggr { is_aggr: false };

        plan.visit_with_subqueries(&mut find_aggr)
            .context(BuildDfLogicalPlanSnafu)?;
        if find_aggr.is_aggr {
            Ok(FlowType::Batching)
        } else {
            Ok(FlowType::Streaming)
        }
    }

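    /// Creates a view: plans the defining query, keeps the original statement
    /// text for `SHOW CREATE VIEW`, records the plan's output columns, and
    /// stores the Substrait-encoded (currently unoptimized) logical plan.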
    #[tracing::instrument(skip_all)]
    pub async fn create_view(
        &self,
        create_view: CreateView,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        // convert input into logical plan
        let logical_plan = match &*create_view.query {
            Statement::Query(query) => {
                self.plan(
                    &QueryStatement::Sql(Statement::Query(query.clone())),
                    ctx.clone(),
                )
                .await?
            }
            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
            _ => {
                return InvalidViewStmtSnafu {}.fail();
            }
        };
        // Save the definition for `show create view`.
        let definition = create_view.to_string();

        // Save the columns in the plan; they may change when the schemas of
        // the tables in the plan are altered.
        let schema: Schema = logical_plan
            .schema()
            .clone()
            .try_into()
            .context(ConvertSchemaSnafu)?;
        let plan_columns: Vec<_> = schema
            .column_schemas()
            .iter()
            .map(|c| c.name.clone())
            .collect();

        let columns: Vec<_> = create_view
            .columns
            .iter()
            .map(|ident| ident.to_string())
            .collect();

        // Validate columns
        if !columns.is_empty() {
            ensure!(
                columns.len() == plan_columns.len(),
                error::ViewColumnsMismatchSnafu {
                    view_name: create_view.name.to_string(),
                    expected: plan_columns.len(),
                    actual: columns.len(),
                }
            );
        }

        // Extract the table names from the original plan
        // and rewrite them as fully qualified names.
        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
            .context(ExtractTableNamesSnafu)?;

        let table_names = table_names.into_iter().map(|t| t.into()).collect();

        // TODO(dennis): we don't save the optimized plan yet, because there are
        // serialization issues with our own plan nodes (such as `MergeScanLogicalPlan`).
        // When the issues are fixed, we can use the `optimized_plan` instead.
        // let optimized_plan = self.optimize_logical_plan(logical_plan)?.unwrap_df_plan();

        // encode logical plan
        let encoded_plan = DFLogicalSubstraitConvertor
            .encode(&plan, DefaultSerializer)
            .context(SubstraitCodecSnafu)?;

        let expr = expr_helper::to_create_view_expr(
            create_view,
            encoded_plan.to_vec(),
            table_names,
            columns,
            plan_columns,
            definition,
            ctx.clone(),
        )?;

        // TODO(dennis): validate the logical plan
        self.create_view_by_expr(expr, ctx).await
    }

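    /// Creates a view from a prebuilt [`CreateViewExpr`], resolving the
    /// `IF NOT EXISTS` / `OR REPLACE` combinations against any existing view
    /// or table of the same name.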
    pub async fn create_view_by_expr(
        &self,
        expr: CreateViewExpr,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !(expr.create_if_not_exists && expr.or_replace),
            InvalidSqlSnafu {
                err_msg: "syntax error: `OR REPLACE` and `IF NOT EXISTS` cannot be used together",
            }
        );
        let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();

        let schema_exists = self
            .table_metadata_manager
            .schema_manager()
            .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
            .await
            .context(TableMetadataManagerSnafu)?;

        ensure!(
            schema_exists,
            SchemaNotFoundSnafu {
                schema_info: &expr.schema_name,
            }
        );

        // Check whether a view or table with the same name already exists.
        if let Some(table) = self
            .catalog_manager
            .table(
                &expr.catalog_name,
                &expr.schema_name,
                &expr.view_name,
                Some(&ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            let table_type = table.table_info().table_type;

            match (table_type, expr.create_if_not_exists, expr.or_replace) {
                (TableType::View, true, false) => {
                    return Ok(table);
                }
                (TableType::View, false, false) => {
                    return ViewAlreadyExistsSnafu {
                        name: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
                (TableType::View, _, true) => {
                    // Try to replace the existing view.
                }
                _ => {
                    return TableAlreadyExistsSnafu {
                        table: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
            }
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&expr.view_name),
            InvalidViewNameSnafu {
                name: expr.view_name.clone(),
            }
        );

        let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);

        let mut view_info = RawTableInfo {
            ident: metadata::TableIdent {
                // The view id of a distributed table is assigned by Meta; set 0 here as a placeholder.
                table_id: 0,
                version: 0,
            },
            name: expr.view_name.clone(),
            desc: None,
            catalog_name: expr.catalog_name.clone(),
            schema_name: expr.schema_name.clone(),
            // Table meta doesn't make sense for views, so use a default one.
            meta: RawTableMeta::default(),
            table_type: TableType::View,
        };

        let request = SubmitDdlTaskRequest {
            query_context: ctx,
            task: DdlTask::new_create_view(expr, view_info.clone()),
        };

        let resp = self
            .procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)?;

        debug!(
            "Submit creating view '{view_name}' task response: {:?}",
            resp
        );

        let view_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created view '{view_name}' with view id {view_id}");

        view_info.ident.table_id = view_id;

        let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);

        let table = DistTable::table(view_info);

        // Invalidates local cache ASAP.
        self.cache_invalidator
            .invalidate(
                &Context::default(),
                &[
                    CacheIdent::TableId(view_id),
                    CacheIdent::TableName(view_name.clone()),
                ],
            )
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(table)
    }

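    /// Drops a flow by name; with `drop_if_exists`, a missing flow is a no-op.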
    #[tracing::instrument(skip_all)]
    pub async fn drop_flow(
        &self,
        catalog_name: String,
        flow_name: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        if let Some(flow) = self
            .flow_metadata_manager
            .flow_name_manager()
            .get(&catalog_name, &flow_name)
            .await
            .context(error::TableMetadataManagerSnafu)?
        {
            let flow_id = flow.flow_id();
            let task = DropFlowTask {
                catalog_name,
                flow_name,
                flow_id,
                drop_if_exists,
            };
            self.drop_flow_procedure(task, query_context).await?;

            Ok(Output::new_with_affected_rows(0))
        } else if drop_if_exists {
            Ok(Output::new_with_affected_rows(0))
        } else {
            FlowNotFoundSnafu {
                flow_name: format_full_flow_name(&catalog_name, &flow_name),
            }
            .fail()
        }
    }

    async fn drop_flow_procedure(
        &self,
        expr: DropFlowTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_flow(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub(super) async fn drop_trigger(
        &self,
        catalog_name: String,
        trigger_name: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let task = DropTriggerTask {
            catalog_name,
            trigger_name,
            drop_if_exists,
        };
        self.drop_trigger_procedure(task, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    async fn drop_trigger_procedure(
        &self,
        expr: DropTriggerTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_trigger(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    /// Drop a view
    #[tracing::instrument(skip_all)]
    pub(crate) async fn drop_view(
        &self,
        catalog: String,
        schema: String,
        view: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let view_info = if let Some(view) = self
            .catalog_manager
            .table(&catalog, &schema, &view, None)
            .await
            .context(CatalogSnafu)?
        {
            view.table_info()
        } else if drop_if_exists {
            // DROP VIEW IF EXISTS meets view not found - ignored
            return Ok(Output::new_with_affected_rows(0));
        } else {
            return TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog, &schema, &view),
            }
            .fail();
        };

        // Ensure the existing object is a view; we can't drop other table types here.
        ensure!(
            view_info.table_type == TableType::View,
            error::InvalidViewSnafu {
                msg: "not a view",
                view_name: format_full_table_name(&catalog, &schema, &view),
            }
        );

        let view_id = view_info.table_id();

        let task = DropViewTask {
            catalog,
            schema,
            view,
            view_id,
            drop_if_exists,
        };

        self.drop_view_procedure(task, query_context).await?;

        Ok(Output::new_with_affected_rows(0))
    }

    /// Submit [DropViewTask] to procedure executor.
    async fn drop_view_procedure(
        &self,
        expr: DropViewTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_view(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

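    /// Alters a batch of logical tables, grouping the expressions by their
    /// physical table id so that each physical table gets a single procedure.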
    #[tracing::instrument(skip_all)]
    pub async fn alter_logical_tables(
        &self,
        alter_table_exprs: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
        ensure!(
            !alter_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "alter logical tables"
            }
        );

        // Group the expressions by physical table id.
        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
        for expr in alter_table_exprs {
            // Get table_id from catalog_manager
            let catalog = if expr.catalog_name.is_empty() {
                query_context.current_catalog()
            } else {
                &expr.catalog_name
            };
            let schema = if expr.schema_name.is_empty() {
                query_context.current_schema()
            } else {
                expr.schema_name.clone()
            };
            let table_name = &expr.table_name;
            let table = self
                .catalog_manager
                .table(catalog, &schema, table_name, Some(&query_context))
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(catalog, &schema, table_name),
                })?;
            let table_id = table.table_info().ident.table_id;
            let physical_table_id = self
                .table_metadata_manager
                .table_route_manager()
                .get_physical_table_id(table_id)
                .await
                .context(TableMetadataManagerSnafu)?;
            groups.entry(physical_table_id).or_default().push(expr);
        }

        // Submit procedure for each physical table
        let mut handles = Vec::with_capacity(groups.len());
        for (_physical_table_id, exprs) in groups {
            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
            handles.push(fut);
        }
        let _results = futures::future::try_join_all(handles).await?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_table(
        &self,
        table_name: TableName,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        // Reserved for grpc call
        self.drop_tables(&[table_name], drop_if_exists, query_context)
            .await
    }

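    /// Drops the given tables: resolves every table id up front (a missing
    /// table fails the whole batch unless `drop_if_exists` is set), then
    /// submits one drop procedure per table and invalidates its cache entries.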
    #[tracing::instrument(skip_all)]
    pub async fn drop_tables(
        &self,
        table_names: &[TableName],
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let mut tables = Vec::with_capacity(table_names.len());
        for table_name in table_names {
            ensure!(
                !is_readonly_schema(&table_name.schema_name),
                SchemaReadOnlySnafu {
                    name: table_name.schema_name.clone()
                }
            );

            if let Some(table) = self
                .catalog_manager
                .table(
                    &table_name.catalog_name,
                    &table_name.schema_name,
                    &table_name.table_name,
                    Some(&query_context),
                )
                .await
                .context(CatalogSnafu)?
            {
                // Keep the name together with the id so skipped tables can't
                // misalign the pairs below.
                tables.push((table_name, table.table_info().table_id()));
            } else if drop_if_exists {
                // DROP TABLE IF EXISTS meets table not found - ignored
                continue;
            } else {
                return TableNotFoundSnafu {
                    table_name: table_name.to_string(),
                }
                .fail();
            }
        }

        for (table_name, table_id) in tables {
            self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
                .await?;

            // Invalidates local cache ASAP.
            self.cache_invalidator
                .invalidate(
                    &Context::default(),
                    &[
                        CacheIdent::TableId(table_id),
                        CacheIdent::TableName(table_name.clone()),
                    ],
                )
                .await
                .context(error::InvalidateTableCacheSnafu)?;
        }
        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_database(
        &self,
        catalog: String,
        schema: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&schema),
            SchemaReadOnlySnafu { name: schema }
        );

        if self
            .catalog_manager
            .schema_exists(&catalog, &schema, None)
            .await
            .context(CatalogSnafu)?
        {
            if schema == query_context.current_schema() {
                SchemaInUseSnafu { name: schema }.fail()
            } else {
                self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
                    .await?;

                Ok(Output::new_with_affected_rows(0))
            }
        } else if drop_if_exists {
            // DROP DATABASE IF EXISTS meets schema not found - ignored
            Ok(Output::new_with_affected_rows(0))
        } else {
            SchemaNotFoundSnafu {
                schema_info: schema,
            }
            .fail()
        }
    }

    #[tracing::instrument(skip_all)]
    pub async fn truncate_table(
        &self,
        table_name: TableName,
        time_ranges: Vec<(Timestamp, Timestamp)>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&table_name.schema_name),
            SchemaReadOnlySnafu {
                name: table_name.schema_name.clone()
            }
        );

        let table = self
            .catalog_manager
            .table(
                &table_name.catalog_name,
                &table_name.schema_name,
                &table_name.table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_name.to_string(),
            })?;
        let table_id = table.table_info().table_id();
        self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
            .await?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_table(
        &self,
        alter_table: AlterTable,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        if matches!(
            alter_table.alter_operation(),
            AlterTableOperation::Repartition { .. }
        ) {
            let request = expr_helper::to_repartition_request(alter_table, &query_context)?;
            return self.repartition_table(request, &query_context).await;
        }

        let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
        self.alter_table_inner(expr, query_context).await
    }

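    /// Handles `ALTER TABLE ... REPARTITION` on a physical table: converts
    /// the `FROM`/`INTO` SQL expressions into `PartitionExpr`s, checks that
    /// every `FROM` expr exists in the current rule, and validates the
    /// resulting rule (`existing - from + into`). Submitting the actual
    /// procedure is not implemented yet.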
    #[tracing::instrument(skip_all)]
    pub async fn repartition_table(
        &self,
        request: RepartitionRequest,
        query_context: &QueryContextRef,
    ) -> Result<Output> {
        // Check if the schema is read-only.
        ensure!(
            !is_readonly_schema(&request.schema_name),
            SchemaReadOnlySnafu {
                name: request.schema_name.clone()
            }
        );

        let table_ref = TableReference::full(
            &request.catalog_name,
            &request.schema_name,
            &request.table_name,
        );
        // Get the table from the catalog.
        let table = self
            .catalog_manager
            .table(
                &request.catalog_name,
                &request.schema_name,
                &request.table_name,
                Some(query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_ref.to_string(),
            })?;
        let table_id = table.table_info().ident.table_id;
        // Get existing partition expressions from the table route.
        let (physical_table_id, physical_table_route) = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        ensure!(
            physical_table_id == table_id,
            NotSupportedSnafu {
                feat: "REPARTITION on logical tables"
            }
        );

        let table_info = table.table_info();
        // Get partition column names from the table metadata.
        let existing_partition_columns = table_info.meta.partition_columns().collect::<Vec<_>>();
        // Repartition requires the table to have partition columns.
        ensure!(
            !existing_partition_columns.is_empty(),
            InvalidPartitionRuleSnafu {
                reason: format!(
                    "table {} does not have partition columns, cannot repartition",
                    table_ref
                )
            }
        );

        // Repartition operations involving columns outside the existing partition columns are not supported.
        // This restriction ensures repartition only applies to current partition columns.
        let column_name_and_type = existing_partition_columns
            .iter()
            .map(|column| (&column.name, column.data_type.clone()))
            .collect();
        let timezone = query_context.timezone();
        // Convert SQL Exprs to PartitionExprs.
        let from_partition_exprs = request
            .from_exprs
            .iter()
            .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
            .collect::<Result<Vec<_>>>()?;

        let into_partition_exprs = request
            .into_exprs
            .iter()
            .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
            .collect::<Result<Vec<_>>>()?;

        // Parse existing partition expressions from region routes.
        let mut existing_partition_exprs =
            Vec::with_capacity(physical_table_route.region_routes.len());
        for route in &physical_table_route.region_routes {
            let expr_json = route.region.partition_expr();
            if !expr_json.is_empty() {
                match PartitionExpr::from_json_str(&expr_json) {
                    Ok(Some(expr)) => existing_partition_exprs.push(expr),
                    Ok(None) => {
                        // Empty
                    }
                    Err(e) => {
                        return Err(e).context(DeserializePartitionExprSnafu);
                    }
                }
            }
        }

        // Validate that from_partition_exprs are a subset of existing partition exprs.
        // We compare PartitionExpr directly since it implements Eq.
        for from_expr in &from_partition_exprs {
            ensure!(
                existing_partition_exprs.contains(from_expr),
                InvalidPartitionRuleSnafu {
                    reason: format!(
                        "partition expression '{}' does not exist in table {}",
                        from_expr, table_ref
                    )
                }
            );
        }

        // Build the new partition expressions:
        // new_exprs = existing_exprs - from_exprs + into_exprs
        let new_partition_exprs: Vec<PartitionExpr> = existing_partition_exprs
            .into_iter()
            .filter(|expr| !from_partition_exprs.contains(expr))
            .chain(into_partition_exprs.clone().into_iter())
            .collect();
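        // E.g. existing = [e1, e2, e3], from = [e2], into = [e2a, e2b]
        // yields new = [e1, e3, e2a, e2b].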
        let new_partition_exprs_len = new_partition_exprs.len();

        // Validate the new partition expressions using MultiDimPartitionRule and PartitionChecker.
        let _ = MultiDimPartitionRule::try_new(
            existing_partition_columns
                .iter()
                .map(|c| c.name.clone())
                .collect(),
            vec![],
            new_partition_exprs,
            true,
        )
        .context(InvalidPartitionSnafu)?;

        info!(
            "Repartition table {} (table_id={}) from {:?} to {:?}, new partition count: {}",
            table_ref,
            table_id,
            from_partition_exprs,
            into_partition_exprs,
            new_partition_exprs_len
        );

        // TODO(weny): Implement the repartition procedure submission.
        // The repartition procedure infrastructure is not yet fully integrated with the DDL task system.
        NotSupportedSnafu {
            feat: "ALTER TABLE REPARTITION",
        }
        .fail()
    }

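    /// Alters a single table. Returns early when `verify_alter` finds nothing
    /// to change; otherwise submits a physical- or logical-table alter task
    /// and invalidates the affected cache entries.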
    #[tracing::instrument(skip_all)]
    pub async fn alter_table_inner(
        &self,
        expr: AlterTableExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&expr.schema_name),
            SchemaReadOnlySnafu {
                name: expr.schema_name.clone()
            }
        );

        let catalog_name = if expr.catalog_name.is_empty() {
            DEFAULT_CATALOG_NAME.to_string()
        } else {
            expr.catalog_name.clone()
        };

        let schema_name = if expr.schema_name.is_empty() {
            DEFAULT_SCHEMA_NAME.to_string()
        } else {
            expr.schema_name.clone()
        };

        let table_name = expr.table_name.clone();

        let table = self
            .catalog_manager
            .table(
                &catalog_name,
                &schema_name,
                &table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
            })?;

        let table_id = table.table_info().ident.table_id;
        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
        if !need_alter {
            return Ok(Output::new_with_affected_rows(0));
        }
        info!(
            "Table info before alter is {:?}, expr: {:?}",
            table.table_info(),
            expr
        );

        let physical_table_id = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_id(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        let (req, invalidate_keys) = if physical_table_id == table_id {
            // This is a physical table.
            let req = SubmitDdlTaskRequest {
                query_context,
                task: DdlTask::new_alter_table(expr),
            };

            let invalidate_keys = vec![
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            (req, invalidate_keys)
        } else {
            // This is a logical table.
1501            let req = SubmitDdlTaskRequest {
1502                query_context,
1503                task: DdlTask::new_alter_logical_tables(vec![expr]),
1504            };
1505
1506            let mut invalidate_keys = vec![
1507                CacheIdent::TableId(physical_table_id),
1508                CacheIdent::TableId(table_id),
1509                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1510            ];
1511
1512            let physical_table = self
1513                .table_metadata_manager
1514                .table_info_manager()
1515                .get(physical_table_id)
1516                .await
1517                .context(TableMetadataManagerSnafu)?
1518                .map(|x| x.into_inner());
1519            if let Some(physical_table) = physical_table {
1520                let physical_table_name = TableName::new(
1521                    physical_table.table_info.catalog_name,
1522                    physical_table.table_info.schema_name,
1523                    physical_table.table_info.name,
1524                );
1525                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
1526            }
1527
1528            (req, invalidate_keys)
1529        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        // Invalidates local cache ASAP.
        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub async fn alter_trigger(
        &self,
        _alter_expr: AlterTrigger,
        _query_context: QueryContextRef,
    ) -> Result<Output> {
        crate::error::NotSupportedSnafu {
            feat: "alter trigger",
        }
        .fail()
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_database(
        &self,
        alter_expr: AlterDatabase,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
        self.alter_database_inner(alter_expr, query_context).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_database_inner(
        &self,
        alter_expr: AlterDatabaseExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&alter_expr.schema_name),
            SchemaReadOnlySnafu {
                name: query_context.current_schema().clone()
            }
        );

        let exists = self
            .catalog_manager
            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
            .await
            .context(CatalogSnafu)?;
        ensure!(
            exists,
            SchemaNotFoundSnafu {
                schema_info: alter_expr.schema_name,
            }
        );

        let cache_ident = [CacheIdent::SchemaName(SchemaName {
            catalog_name: alter_expr.catalog_name.clone(),
            schema_name: alter_expr.schema_name.clone(),
        })];

        self.alter_database_procedure(alter_expr, query_context)
            .await?;

        // Invalidates local cache ASAP.
        self.cache_invalidator
            .invalidate(&Context::default(), &cache_ident)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }

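    /// Converts the parsed [PartitionExpr]s into their protobuf representation and
    /// submits a create-table DDL task to the procedure executor.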
    async fn create_table_procedure(
        &self,
        create_table: CreateTableExpr,
        partitions: Vec<PartitionExpr>,
        table_info: RawTableInfo,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let partitions = partitions
            .into_iter()
            .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
            .collect::<Result<Vec<_>>>()?;

        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_table(create_table, partitions, table_info),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn create_logical_tables_procedure(
        &self,
        tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_logical_tables(tables_data),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn alter_logical_tables_procedure(
        &self,
        tables_data: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_alter_logical_tables(tables_data),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn drop_table_procedure(
        &self,
        table_name: &TableName,
        table_id: TableId,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_table(
                table_name.catalog_name.clone(),
                table_name.schema_name.clone(),
                table_name.table_name.clone(),
                table_id,
                drop_if_exists,
            ),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn drop_database_procedure(
        &self,
        catalog: String,
        schema: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_database(catalog, schema, drop_if_exists),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn alter_database_procedure(
        &self,
        alter_expr: AlterDatabaseExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_alter_database(alter_expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    async fn truncate_table_procedure(
        &self,
        table_name: &TableName,
        table_id: TableId,
        time_ranges: Vec<(Timestamp, Timestamp)>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_truncate_table(
                table_name.catalog_name.clone(),
                table_name.schema_name.clone(),
                table_name.table_name.clone(),
                table_id,
                time_ranges,
            ),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

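    /// Creates a database after validating the catalog and database names. When the
    /// schema already exists (or its name is reserved), this succeeds only if
    /// `create_if_not_exists` is set; otherwise a schema-exists error is returned.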
    #[tracing::instrument(skip_all)]
    pub async fn create_database(
        &self,
        database: &str,
        create_if_not_exists: bool,
        options: HashMap<String, String>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let catalog = query_context.current_catalog();
        ensure!(
            NAME_PATTERN_REG.is_match(catalog),
            error::UnexpectedSnafu {
                violated: format!("Invalid catalog name: {}", catalog)
            }
        );

        ensure!(
            NAME_PATTERN_REG.is_match(database),
            error::UnexpectedSnafu {
                violated: format!("Invalid database name: {}", database)
            }
        );

        if !self
            .catalog_manager
            .schema_exists(catalog, database, None)
            .await
            .context(CatalogSnafu)?
            && !self.catalog_manager.is_reserved_schema_name(database)
        {
            self.create_database_procedure(
                catalog.to_string(),
                database.to_string(),
                create_if_not_exists,
                options,
                query_context,
            )
            .await?;

            Ok(Output::new_with_affected_rows(1))
        } else if create_if_not_exists {
            Ok(Output::new_with_affected_rows(1))
        } else {
            error::SchemaExistsSnafu { name: database }.fail()
        }
    }

    async fn create_database_procedure(
        &self,
        catalog: String,
        database: String,
        create_if_not_exists: bool,
        options: HashMap<String, String>,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }
}

/// Parses a partition statement [Partitions] into [PartitionExpr]s and partition columns.
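///
/// # Example
///
/// A minimal sketch (marked `ignore`, not compiled here); `create_expr` and
/// `query_ctx` are assumed to come from the surrounding CREATE TABLE flow:
///
/// ```ignore
/// // CREATE TABLE t (a INT, ts TIMESTAMP TIME INDEX)
/// // PARTITION ON COLUMNS (a) (a < 10, a >= 10)
/// let (exprs, columns) = parse_partitions(&create_expr, partitions, &query_ctx)?;
/// assert_eq!(columns, vec!["a".to_string()]);
/// assert_eq!(exprs.len(), 2); // one PartitionExpr per partition clause
/// ```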
pub fn parse_partitions(
    create_table: &CreateTableExpr,
    partitions: Option<Partitions>,
    query_ctx: &QueryContextRef,
) -> Result<(Vec<PartitionExpr>, Vec<String>)> {
    // If partitions are not defined by the user, use the timestamp column (which must exist)
    // as the partition column, and create only one partition.
    let partition_columns = find_partition_columns(&partitions)?;
    let partition_exprs =
        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;

    // Validates the partition rule.
    let exprs = partition_exprs.clone();
    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
        .context(InvalidPartitionSnafu)?;

    Ok((partition_exprs, partition_columns))
}

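/// Parses and validates the partition definitions used for logical-table
/// validation, returning the partition columns together with their parsed
/// [PartitionExpr]s. Unlike [parse_partitions], the partition clause is
/// required here.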
fn parse_partitions_for_logical_validation(
    create_table: &CreateTableExpr,
    partitions: &Partitions,
    query_ctx: &QueryContextRef,
) -> Result<(Vec<String>, Vec<PartitionExpr>)> {
    let partition_columns = partitions
        .column_list
        .iter()
        .map(|ident| ident.value.clone())
        .collect::<Vec<_>>();

    let column_name_and_type = partition_columns
        .iter()
        .map(|pc| {
            let column = create_table
                .column_defs
                .iter()
                .find(|c| &c.name == pc)
                .context(ColumnNotFoundSnafu { msg: pc.clone() })?;
            let column_name = &column.name;
            let data_type = ConcreteDataType::from(
                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
                    .context(ColumnDataTypeSnafu)?,
            );
            Ok((column_name, data_type))
        })
        .collect::<Result<HashMap<_, _>>>()?;

    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
    for expr in &partitions.exprs {
        let partition_expr = convert_one_expr(expr, &column_name_and_type, &query_ctx.timezone())?;
        partition_exprs.push(partition_expr);
    }

    MultiDimPartitionRule::try_new(
        partition_columns.clone(),
        vec![],
        partition_exprs.clone(),
        true,
    )
    .context(InvalidPartitionSnafu)?;

    Ok((partition_columns, partition_exprs))
}

/// Verifies an alter expression and returns whether the alter needs to be performed.
///
/// # Returns
///
/// Returns `true` if the alter needs to be performed; otherwise, returns `false`.
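///
/// # Example
///
/// A minimal sketch (marked `ignore`, not compiled here) of the no-op case,
/// assuming the table already has a column named `host`:
///
/// ```ignore
/// // ALTER TABLE t ADD COLUMN IF NOT EXISTS host STRING;
/// let need_alter = verify_alter(table_id, table.table_info(), expr)?;
/// assert!(!need_alter); // every column exists and is add_if_not_exists
/// ```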
pub fn verify_alter(
    table_id: TableId,
    table_info: Arc<TableInfo>,
    expr: AlterTableExpr,
) -> Result<bool> {
    let request: AlterTableRequest =
        common_grpc_expr::alter_expr_to_request(table_id, expr, Some(&table_info.meta))
            .context(AlterExprToRequestSnafu)?;

    let AlterTableRequest {
        table_name,
        alter_kind,
        ..
    } = &request;

    if let AlterKind::RenameTable { new_table_name } = alter_kind {
        ensure!(
            NAME_PATTERN_REG.is_match(new_table_name),
            error::UnexpectedSnafu {
                violated: format!("Invalid table name: {}", new_table_name)
            }
        );
    } else if let AlterKind::AddColumns { columns } = alter_kind {
        // If all the columns are marked as add_if_not_exists and they already exist in the table,
        // there is no need to perform the alter.
        let column_names: HashSet<_> = table_info
            .meta
            .schema
            .column_schemas()
            .iter()
            .map(|schema| &schema.name)
            .collect();
        if columns.iter().all(|column| {
            column_names.contains(&column.column_schema.name) && column.add_if_not_exists
        }) {
            return Ok(false);
        }
    }

    let _ = table_info
        .meta
        .builder_with_alter_kind(table_name, &request.alter_kind)
        .context(error::TableSnafu)?
        .build()
        .context(error::BuildTableMetaSnafu { table_name })?;

    Ok(true)
}

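/// Builds the [RawTableInfo] of a distributed table from a [CreateTableExpr] and
/// its partition columns. The table id is left as a placeholder (0) because it is
/// assigned by the meta server when the create-table procedure runs.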
pub fn create_table_info(
    create_table: &CreateTableExpr,
    partition_columns: Vec<String>,
) -> Result<RawTableInfo> {
    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
    let mut column_name_to_index_map = HashMap::new();

    for (idx, column) in create_table.column_defs.iter().enumerate() {
        let schema =
            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
                column: &column.name,
            })?;
        let schema = schema.with_time_index(column.name == create_table.time_index);

        column_schemas.push(schema);
        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
    }

    let timestamp_index = column_name_to_index_map
        .get(&create_table.time_index)
        .cloned();

    let raw_schema = RawSchema {
        column_schemas: column_schemas.clone(),
        timestamp_index,
        version: 0,
    };

    let primary_key_indices = create_table
        .primary_keys
        .iter()
        .map(|name| {
            column_name_to_index_map
                .get(name)
                .cloned()
                .context(ColumnNotFoundSnafu { msg: name })
        })
        .collect::<Result<Vec<_>>>()?;

    let partition_key_indices = partition_columns
        .into_iter()
        .map(|col_name| {
            column_name_to_index_map
                .get(&col_name)
                .cloned()
                .context(ColumnNotFoundSnafu { msg: col_name })
        })
        .collect::<Result<Vec<_>>>()?;

    let table_options = TableOptions::try_from_iter(&create_table.table_options)
        .context(UnrecognizedTableOptionSnafu)?;

    let meta = RawTableMeta {
        schema: raw_schema,
        primary_key_indices,
        value_indices: vec![],
        engine: create_table.engine.clone(),
        next_column_id: column_schemas.len() as u32,
        options: table_options,
        created_on: Utc::now(),
        updated_on: Utc::now(),
        partition_key_indices,
        column_ids: vec![],
    };

    let desc = if create_table.desc.is_empty() {
        create_table.table_options.get(COMMENT_KEY).cloned()
    } else {
        Some(create_table.desc.clone())
    };

    let table_info = RawTableInfo {
        ident: metadata::TableIdent {
            // The table id of a distributed table is assigned by the meta server;
            // set 0 here as a placeholder.
            table_id: 0,
            version: 0,
        },
        name: create_table.table_name.clone(),
        desc,
        catalog_name: create_table.catalog_name.clone(),
        schema_name: create_table.schema_name.clone(),
        meta,
        table_type: TableType::Base,
    };
    Ok(table_info)
}

fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
    let columns = if let Some(partitions) = partitions {
        partitions
            .column_list
            .iter()
            .map(|x| x.value.clone())
            .collect::<Vec<_>>()
    } else {
        vec![]
    };
    Ok(columns)
}

/// Parses [Partitions] into a group of partition entries.
///
/// Returns a list of [PartitionExpr], each of which defines a partition.
fn find_partition_entries(
    create_table: &CreateTableExpr,
    partitions: &Option<Partitions>,
    partition_columns: &[String],
    query_ctx: &QueryContextRef,
) -> Result<Vec<PartitionExpr>> {
    let Some(partitions) = partitions else {
        return Ok(vec![]);
    };

    // Extract the concrete data types of the partition columns.
    let column_name_and_type = partition_columns
        .iter()
        .map(|pc| {
            let column = create_table
                .column_defs
                .iter()
                .find(|c| &c.name == pc)
                // unwrap is safe here because we have checked that partition columns are defined
                .unwrap();
            let column_name = &column.name;
            let data_type = ConcreteDataType::from(
                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
                    .context(ColumnDataTypeSnafu)?,
            );
            Ok((column_name, data_type))
        })
        .collect::<Result<HashMap<_, _>>>()?;

    // Transform parser exprs into partition exprs.
    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
    for partition in &partitions.exprs {
        let partition_expr =
            convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
        partition_exprs.push(partition_expr);
    }

    Ok(partition_exprs)
}

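/// Converts one sqlparser binary expression into a [PartitionExpr], recursing
/// when both operands are themselves binary expressions.
///
/// A minimal sketch (marked `ignore`, not compiled here) of the accepted shapes:
///
/// ```ignore
/// // leaf:      a >= 10                 => Column("a") >= Value(10)
/// // negated:   a < -1                  => Column("a") <  Value(-1)
/// // composite: (a < 10) AND (b >= 20)  => Expr(lhs) AND Expr(rhs)
/// ```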
fn convert_one_expr(
    expr: &Expr,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
    timezone: &Timezone,
) -> Result<PartitionExpr> {
    let Expr::BinaryOp { left, op, right } = expr else {
        return InvalidPartitionRuleSnafu {
            reason: "partition rule must be a binary expression",
        }
        .fail();
    };

    let op =
        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
            reason: format!("unsupported operator in partition expr {op}"),
        })?;

    // Convert the leaf nodes.
    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
        // col, val
        (Expr::Identifier(ident), Expr::Value(value)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&value.value, data_type, timezone, None)?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        // val, col
        (Expr::Value(value), Expr::Identifier(ident)) => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&value.value, data_type, timezone, None)?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
            if let Expr::Value(v) = &**expr =>
        {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
            // A sub-expr must be compared against another sub-expr.
            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
            (Operand::Expr(lhs), op, Operand::Expr(rhs))
        }
        _ => {
            return InvalidPartitionRuleSnafu {
                reason: format!("invalid partition expr {expr}"),
            }
            .fail();
        }
    };

    Ok(PartitionExpr::new(lhs, op, rhs))
}

fn convert_identifier(
    ident: &Ident,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
) -> Result<(String, ConcreteDataType)> {
    let column_name = ident.value.clone();
    let data_type = column_name_and_type
        .get(&column_name)
        .cloned()
        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
    Ok((column_name, data_type))
}

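/// Converts a SQL literal into a typed [Value] for the given column type,
/// honoring the session timezone and an optional unary operator (e.g. negation).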
fn convert_value(
    value: &ParserValue,
    data_type: ConcreteDataType,
    timezone: &Timezone,
    unary_op: Option<UnaryOperator>,
) -> Result<Value> {
    sql_value_to_value(
        &ColumnSchema::new("<NONAME>", data_type, true),
        value,
        Some(timezone),
        unary_op,
        false,
    )
    .context(error::SqlCommonSnafu)
}

#[cfg(test)]
mod test {
    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;

    use super::*;
    use crate::expr_helper;

    #[test]
    fn test_name_is_match() {
        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
        assert!(!NAME_PATTERN_REG.is_match("🈲"));
        assert!(NAME_PATTERN_REG.is_match("hello"));
        assert!(NAME_PATTERN_REG.is_match("test@"));
        assert!(!NAME_PATTERN_REG.is_match("@test"));
        assert!(NAME_PATTERN_REG.is_match("test#"));
        assert!(!NAME_PATTERN_REG.is_match("#test"));
        assert!(!NAME_PATTERN_REG.is_match("@"));
        assert!(!NAME_PATTERN_REG.is_match("#"));
    }

    #[tokio::test]
    #[ignore = "TODO(ruihang): WIP new partition rule"]
    async fn test_parse_partitions() {
        common_telemetry::init_default_ut_logging();
        let cases = [
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION ON COLUMNS (b) (
  b < 'hz',
  b >= 'hz' AND b < 'sh',
  b >= 'sh'
)
ENGINE=mito",
                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
            ),
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
  PARTITION r0 VALUES LESS THAN ('hz', 10),
  b < 'hz' AND a < 10,
  b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
  b >= 'sh' AND a >= 20
)
ENGINE=mito",
                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
            ),
        ];
        let ctx = QueryContextBuilder::default().build().into();
        for (sql, expected) in cases {
            let result = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            match &result[0] {
                Statement::CreateTable(c) => {
                    let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
                    let (partitions, _) =
                        parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
                    let json = serde_json::to_string(&partitions).unwrap();
                    assert_eq!(json, expected);
                }
                _ => unreachable!(),
            }
        }
    }
}