operator/statement/
ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use api::helper::ColumnDataTypeWrapper;
19use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
20use api::v1::{
21    column_def, AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
22};
23use catalog::CatalogManagerRef;
24use chrono::Utc;
25use common_catalog::consts::{is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
26use common_catalog::{format_full_flow_name, format_full_table_name};
27use common_error::ext::BoxedError;
28use common_meta::cache_invalidator::Context;
29use common_meta::ddl::create_flow::FlowType;
30use common_meta::ddl::ExecutorContext;
31use common_meta::instruction::CacheIdent;
32use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
33use common_meta::key::NAME_PATTERN;
34use common_meta::rpc::ddl::{
35    CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
36    SubmitDdlTaskResponse,
37};
38use common_meta::rpc::router::{Partition, Partition as MetaPartition};
39use common_query::Output;
40use common_telemetry::{debug, info, tracing, warn};
41use common_time::Timezone;
42use datafusion_common::tree_node::TreeNodeVisitor;
43use datafusion_expr::LogicalPlan;
44use datatypes::prelude::ConcreteDataType;
45use datatypes::schema::{RawSchema, Schema};
46use datatypes::value::Value;
47use lazy_static::lazy_static;
48use partition::expr::{Operand, PartitionExpr, RestrictedOp};
49use partition::multi_dim::MultiDimPartitionRule;
50use partition::partition::{PartitionBound, PartitionDef};
51use query::parser::{QueryLanguageParser, QueryStatement};
52use query::plan::extract_and_rewrite_full_table_names;
53use query::query_engine::DefaultSerializer;
54use query::sql::create_table_stmt;
55use regex::Regex;
56use session::context::QueryContextRef;
57use session::table_name::table_idents_to_full_name;
58use snafu::{ensure, OptionExt, ResultExt};
59use sql::statements::alter::{AlterDatabase, AlterTable};
60use sql::statements::create::{
61    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
62};
63use sql::statements::sql_value_to_value;
64use sql::statements::statement::Statement;
65use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
66use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
67use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
68use table::dist_table::DistTable;
69use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
70use table::requests::{AlterKind, AlterTableRequest, TableOptions, COMMENT_KEY};
71use table::table_name::TableName;
72use table::TableRef;
73
74use crate::error::{
75    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
76    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
77    DeserializePartitionSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
78    FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
79    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
80    SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
81    TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
82    UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
83};
84use crate::expr_helper;
85use crate::statement::show::create_partitions_stmt;
86use crate::statement::StatementExecutor;
87
// Compiled once on first use: `NAME_PATTERN` wrapped in `^...$`, so a name is accepted only
// when the whole string matches the pattern. Used below to validate table and view names.
lazy_static! {
    static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
}
91
92impl StatementExecutor {
    /// Returns a clone of the catalog manager handle held by this executor.
    pub fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }
96
97    #[tracing::instrument(skip_all)]
98    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
99        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
100        self.create_table_inner(create_expr, stmt.partitions, ctx)
101            .await
102    }
103
104    #[tracing::instrument(skip_all)]
105    pub async fn create_table_like(
106        &self,
107        stmt: CreateTableLike,
108        ctx: QueryContextRef,
109    ) -> Result<TableRef> {
110        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
111            .map_err(BoxedError::new)
112            .context(error::ExternalSnafu)?;
113        let table_ref = self
114            .catalog_manager
115            .table(&catalog, &schema, &table, Some(&ctx))
116            .await
117            .context(CatalogSnafu)?
118            .context(TableNotFoundSnafu { table_name: &table })?;
119        let partitions = self
120            .partition_manager
121            .find_table_partitions(table_ref.table_info().table_id())
122            .await
123            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;
124
125        // CREATE TABLE LIKE also inherits database level options.
126        let schema_options = self
127            .table_metadata_manager
128            .schema_manager()
129            .get(SchemaNameKey {
130                catalog: &catalog,
131                schema: &schema,
132            })
133            .await
134            .context(TableMetadataManagerSnafu)?
135            .map(|v| v.into_inner());
136
137        let quote_style = ctx.quote_style();
138        let mut create_stmt =
139            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
140                .context(error::ParseQuerySnafu)?;
141        create_stmt.name = stmt.table_name;
142        create_stmt.if_not_exists = false;
143
144        let partitions = create_partitions_stmt(partitions)?.and_then(|mut partitions| {
145            if !partitions.column_list.is_empty() {
146                partitions.set_quote(quote_style);
147                Some(partitions)
148            } else {
149                None
150            }
151        });
152
153        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
154        self.create_table_inner(create_expr, partitions, ctx).await
155    }
156
157    #[tracing::instrument(skip_all)]
158    pub async fn create_external_table(
159        &self,
160        create_expr: CreateExternalTable,
161        ctx: QueryContextRef,
162    ) -> Result<TableRef> {
163        let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
164        self.create_table_inner(create_expr, None, ctx).await
165    }
166
    /// Creates the table described by `create_table` and returns a handle to it.
    ///
    /// The checks run in this order:
    /// 1. The target schema must not be read-only.
    /// 2. A metric-engine expression carrying the logical-table metadata key is redirected to
    ///    [`Self::create_logical_tables`] and the first created table is returned.
    /// 3. The target schema must exist.
    /// 4. If a table with that name already exists, it is returned when
    ///    `create_if_not_exists` is set; otherwise a table-already-exists error is returned.
    /// 5. The table name must match `NAME_PATTERN_REG`.
    ///
    /// After that the partitions are parsed, the create-table procedure is submitted, and the
    /// table id from the response is written both into the table info and into
    /// `create_table.table_id`.
    #[tracing::instrument(skip_all)]
    pub async fn create_table_inner(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !is_readonly_schema(&create_table.schema_name),
            SchemaReadOnlySnafu {
                name: create_table.schema_name.clone()
            }
        );

        // A metric-engine table whose options contain the logical-table metadata key is a
        // logical table: create it through the batch path with a single-element slice.
        if create_table.engine == METRIC_ENGINE_NAME
            && create_table
                .table_options
                .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            return self
                .create_logical_tables(&[create_table.clone()], query_ctx)
                .await?
                .into_iter()
                .next()
                .context(error::UnexpectedSnafu {
                    violated: "expected to create a logical table",
                });
        }

        // The timer guard stays bound until the function returns.
        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();

        let schema = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey::new(
                &create_table.catalog_name,
                &create_table.schema_name,
            ))
            .await
            .context(TableMetadataManagerSnafu)?;

        ensure!(
            schema.is_some(),
            SchemaNotFoundSnafu {
                schema_info: &create_table.schema_name,
            }
        );

        // An existing table short-circuits the creation: it is either returned as-is
        // (`IF NOT EXISTS`) or reported as a conflict.
        if let Some(table) = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                &create_table.table_name,
                Some(&query_ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            return if create_table.create_if_not_exists {
                Ok(table)
            } else {
                TableAlreadyExistsSnafu {
                    table: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                }
                .fail()
            };
        }

        // The name is validated only after the existence check above.
        ensure!(
            NAME_PATTERN_REG.is_match(&create_table.table_name),
            InvalidTableNameSnafu {
                table_name: &create_table.table_name,
            }
        );

        let table_name = TableName::new(
            &create_table.catalog_name,
            &create_table.schema_name,
            &create_table.table_name,
        );

        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
        let mut table_info = create_table_info(create_table, partition_cols)?;

        let resp = self
            .create_table_procedure(
                create_table.clone(),
                partitions,
                table_info.clone(),
                query_ctx,
            )
            .await?;

        // Exactly one table was requested, so only the first returned id is used.
        let table_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created table '{table_name}' with table id {table_id}");

        table_info.ident.table_id = table_id;

        let table_info: Arc<TableInfo> =
            Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
        // Report the assigned id back to the caller through the mutable expression.
        create_table.table_id = Some(api::v1::TableId { id: table_id });

        let table = DistTable::table(table_info);

        Ok(table)
    }
286
287    #[tracing::instrument(skip_all)]
288    pub async fn create_logical_tables(
289        &self,
290        create_table_exprs: &[CreateTableExpr],
291        query_context: QueryContextRef,
292    ) -> Result<Vec<TableRef>> {
293        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
294        ensure!(
295            !create_table_exprs.is_empty(),
296            EmptyDdlExprSnafu {
297                name: "create logic tables"
298            }
299        );
300
301        // Check table names
302        for create_table in create_table_exprs {
303            ensure!(
304                NAME_PATTERN_REG.is_match(&create_table.table_name),
305                InvalidTableNameSnafu {
306                    table_name: &create_table.table_name,
307                }
308            );
309        }
310
311        let mut raw_tables_info = create_table_exprs
312            .iter()
313            .map(|create| create_table_info(create, vec![]))
314            .collect::<Result<Vec<_>>>()?;
315        let tables_data = create_table_exprs
316            .iter()
317            .cloned()
318            .zip(raw_tables_info.iter().cloned())
319            .collect::<Vec<_>>();
320
321        let resp = self
322            .create_logical_tables_procedure(tables_data, query_context)
323            .await?;
324
325        let table_ids = resp.table_ids;
326        ensure!(table_ids.len() == raw_tables_info.len(), CreateLogicalTablesSnafu {
327            reason: format!("The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}", raw_tables_info.len(), table_ids.len())
328        });
329        info!("Successfully created logical tables: {:?}", table_ids);
330
331        for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
332            table_info.ident.table_id = table_ids[i];
333        }
334        let tables_info = raw_tables_info
335            .into_iter()
336            .map(|x| x.try_into().context(CreateTableInfoSnafu))
337            .collect::<Result<Vec<_>>>()?;
338
339        Ok(tables_info
340            .into_iter()
341            .map(|x| DistTable::table(Arc::new(x)))
342            .collect())
343    }
344
345    #[tracing::instrument(skip_all)]
346    pub async fn create_flow(
347        &self,
348        stmt: CreateFlow,
349        query_context: QueryContextRef,
350    ) -> Result<Output> {
351        // TODO(ruihang): do some verification
352        let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;
353
354        self.create_flow_inner(expr, query_context).await
355    }
356
357    pub async fn create_flow_inner(
358        &self,
359        expr: CreateFlowExpr,
360        query_context: QueryContextRef,
361    ) -> Result<Output> {
362        self.create_flow_procedure(expr, query_context).await?;
363        Ok(Output::new_with_affected_rows(0))
364    }
365
366    async fn create_flow_procedure(
367        &self,
368        expr: CreateFlowExpr,
369        query_context: QueryContextRef,
370    ) -> Result<SubmitDdlTaskResponse> {
371        let flow_type = self
372            .determine_flow_type(&expr, query_context.clone())
373            .await?;
374        info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
375
376        let expr = {
377            let mut expr = expr;
378            expr.flow_options
379                .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
380            expr
381        };
382
383        let task = CreateFlowTask::try_from(PbCreateFlowTask {
384            create_flow: Some(expr),
385        })
386        .context(error::InvalidExprSnafu)?;
387        let request = SubmitDdlTaskRequest {
388            query_context,
389            task: DdlTask::new_create_flow(task),
390        };
391
392        self.procedure_executor
393            .submit_ddl_task(&ExecutorContext::default(), request)
394            .await
395            .context(error::ExecuteDdlSnafu)
396    }
397
398    /// Determine the flow type based on the SQL query
399    ///
400    /// If it contains aggregation or distinct, then it is a batch flow, otherwise it is a streaming flow
401    async fn determine_flow_type(
402        &self,
403        expr: &CreateFlowExpr,
404        query_ctx: QueryContextRef,
405    ) -> Result<FlowType> {
406        // first check if source table's ttl is instant, if it is, force streaming mode
407        for src_table_name in &expr.source_table_names {
408            let table = self
409                .catalog_manager()
410                .table(
411                    &src_table_name.catalog_name,
412                    &src_table_name.schema_name,
413                    &src_table_name.table_name,
414                    Some(&query_ctx),
415                )
416                .await
417                .map_err(BoxedError::new)
418                .context(ExternalSnafu)?
419                .with_context(|| TableNotFoundSnafu {
420                    table_name: format_full_table_name(
421                        &src_table_name.catalog_name,
422                        &src_table_name.schema_name,
423                        &src_table_name.table_name,
424                    ),
425                })?;
426
427            // instant source table can only be handled by streaming mode
428            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
429                warn!(
430                    "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
431                    format_full_table_name(
432                        &src_table_name.catalog_name,
433                        &src_table_name.schema_name,
434                        &src_table_name.table_name
435                    ),
436                    expr.flow_name
437                );
438                return Ok(FlowType::Streaming);
439            }
440        }
441
442        let engine = &self.query_engine;
443        let stmt = QueryLanguageParser::parse_sql(&expr.sql, &query_ctx)
444            .map_err(BoxedError::new)
445            .context(ExternalSnafu)?;
446        let plan = engine
447            .planner()
448            .plan(&stmt, query_ctx)
449            .await
450            .map_err(BoxedError::new)
451            .context(ExternalSnafu)?;
452
453        /// Visitor to find aggregation or distinct
454        struct FindAggr {
455            is_aggr: bool,
456        }
457
458        impl TreeNodeVisitor<'_> for FindAggr {
459            type Node = LogicalPlan;
460            fn f_down(
461                &mut self,
462                node: &Self::Node,
463            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
464            {
465                match node {
466                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
467                        self.is_aggr = true;
468                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
469                    }
470                    _ => (),
471                }
472                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
473            }
474        }
475
476        let mut find_aggr = FindAggr { is_aggr: false };
477
478        plan.visit_with_subqueries(&mut find_aggr)
479            .context(BuildDfLogicalPlanSnafu)?;
480        if find_aggr.is_aggr {
481            Ok(FlowType::Batching)
482        } else {
483            Ok(FlowType::Streaming)
484        }
485    }
486
487    #[tracing::instrument(skip_all)]
488    pub async fn create_view(
489        &self,
490        create_view: CreateView,
491        ctx: QueryContextRef,
492    ) -> Result<TableRef> {
493        // convert input into logical plan
494        let logical_plan = match &*create_view.query {
495            Statement::Query(query) => {
496                self.plan(
497                    &QueryStatement::Sql(Statement::Query(query.clone())),
498                    ctx.clone(),
499                )
500                .await?
501            }
502            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
503            _ => {
504                return InvalidViewStmtSnafu {}.fail();
505            }
506        };
507        // Save the definition for `show create view`.
508        let definition = create_view.to_string();
509
510        // Save the columns in plan, it may changed when the schemas of tables in plan
511        // are altered.
512        let schema: Schema = logical_plan
513            .schema()
514            .clone()
515            .try_into()
516            .context(ConvertSchemaSnafu)?;
517        let plan_columns: Vec<_> = schema
518            .column_schemas()
519            .iter()
520            .map(|c| c.name.clone())
521            .collect();
522
523        let columns: Vec<_> = create_view
524            .columns
525            .iter()
526            .map(|ident| ident.to_string())
527            .collect();
528
529        // Validate columns
530        if !columns.is_empty() {
531            ensure!(
532                columns.len() == plan_columns.len(),
533                error::ViewColumnsMismatchSnafu {
534                    view_name: create_view.name.to_string(),
535                    expected: plan_columns.len(),
536                    actual: columns.len(),
537                }
538            );
539        }
540
541        // Extract the table names from the original plan
542        // and rewrite them as fully qualified names.
543        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
544            .context(ExtractTableNamesSnafu)?;
545
546        let table_names = table_names.into_iter().map(|t| t.into()).collect();
547
548        // TODO(dennis): we don't save the optimized plan yet,
549        // because there are some serialization issue with our own defined plan node (such as `MergeScanLogicalPlan`).
550        // When the issues are fixed, we can use the `optimized_plan` instead.
551        // let optimized_plan = self.optimize_logical_plan(logical_plan)?.unwrap_df_plan();
552
553        // encode logical plan
554        let encoded_plan = DFLogicalSubstraitConvertor
555            .encode(&plan, DefaultSerializer)
556            .context(SubstraitCodecSnafu)?;
557
558        let expr = expr_helper::to_create_view_expr(
559            create_view,
560            encoded_plan.to_vec(),
561            table_names,
562            columns,
563            plan_columns,
564            definition,
565            ctx.clone(),
566        )?;
567
568        //TODO(dennis): validate the logical plan
569        self.create_view_by_expr(expr, ctx).await
570    }
571
572    pub async fn create_view_by_expr(
573        &self,
574        expr: CreateViewExpr,
575        ctx: QueryContextRef,
576    ) -> Result<TableRef> {
577        ensure! {
578            !(expr.create_if_not_exists & expr.or_replace),
579            InvalidSqlSnafu {
580                err_msg: "syntax error Create Or Replace and If Not Exist cannot be used together",
581            }
582        };
583        let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();
584
585        let schema_exists = self
586            .table_metadata_manager
587            .schema_manager()
588            .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
589            .await
590            .context(TableMetadataManagerSnafu)?;
591
592        ensure!(
593            schema_exists,
594            SchemaNotFoundSnafu {
595                schema_info: &expr.schema_name,
596            }
597        );
598
599        // if view or table exists.
600        if let Some(table) = self
601            .catalog_manager
602            .table(
603                &expr.catalog_name,
604                &expr.schema_name,
605                &expr.view_name,
606                Some(&ctx),
607            )
608            .await
609            .context(CatalogSnafu)?
610        {
611            let table_type = table.table_info().table_type;
612
613            match (table_type, expr.create_if_not_exists, expr.or_replace) {
614                (TableType::View, true, false) => {
615                    return Ok(table);
616                }
617                (TableType::View, false, false) => {
618                    return ViewAlreadyExistsSnafu {
619                        name: format_full_table_name(
620                            &expr.catalog_name,
621                            &expr.schema_name,
622                            &expr.view_name,
623                        ),
624                    }
625                    .fail();
626                }
627                (TableType::View, _, true) => {
628                    // Try to replace an exists view
629                }
630                _ => {
631                    return TableAlreadyExistsSnafu {
632                        table: format_full_table_name(
633                            &expr.catalog_name,
634                            &expr.schema_name,
635                            &expr.view_name,
636                        ),
637                    }
638                    .fail();
639                }
640            }
641        }
642
643        ensure!(
644            NAME_PATTERN_REG.is_match(&expr.view_name),
645            InvalidViewNameSnafu {
646                name: expr.view_name.clone(),
647            }
648        );
649
650        let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);
651
652        let mut view_info = RawTableInfo {
653            ident: metadata::TableIdent {
654                // The view id of distributed table is assigned by Meta, set "0" here as a placeholder.
655                table_id: 0,
656                version: 0,
657            },
658            name: expr.view_name.clone(),
659            desc: None,
660            catalog_name: expr.catalog_name.clone(),
661            schema_name: expr.schema_name.clone(),
662            // The meta doesn't make sense for views, so using a default one.
663            meta: RawTableMeta::default(),
664            table_type: TableType::View,
665        };
666
667        let request = SubmitDdlTaskRequest {
668            query_context: ctx,
669            task: DdlTask::new_create_view(expr, view_info.clone()),
670        };
671
672        let resp = self
673            .procedure_executor
674            .submit_ddl_task(&ExecutorContext::default(), request)
675            .await
676            .context(error::ExecuteDdlSnafu)?;
677
678        debug!(
679            "Submit creating view '{view_name}' task response: {:?}",
680            resp
681        );
682
683        let view_id = resp
684            .table_ids
685            .into_iter()
686            .next()
687            .context(error::UnexpectedSnafu {
688                violated: "expected table_id",
689            })?;
690        info!("Successfully created view '{view_name}' with view id {view_id}");
691
692        view_info.ident.table_id = view_id;
693
694        let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);
695
696        let table = DistTable::table(view_info);
697
698        // Invalidates local cache ASAP.
699        self.cache_invalidator
700            .invalidate(
701                &Context::default(),
702                &[
703                    CacheIdent::TableId(view_id),
704                    CacheIdent::TableName(view_name.clone()),
705                ],
706            )
707            .await
708            .context(error::InvalidateTableCacheSnafu)?;
709
710        Ok(table)
711    }
712
713    #[tracing::instrument(skip_all)]
714    pub async fn drop_flow(
715        &self,
716        catalog_name: String,
717        flow_name: String,
718        drop_if_exists: bool,
719        query_context: QueryContextRef,
720    ) -> Result<Output> {
721        if let Some(flow) = self
722            .flow_metadata_manager
723            .flow_name_manager()
724            .get(&catalog_name, &flow_name)
725            .await
726            .context(error::TableMetadataManagerSnafu)?
727        {
728            let flow_id = flow.flow_id();
729            let task = DropFlowTask {
730                catalog_name,
731                flow_name,
732                flow_id,
733                drop_if_exists,
734            };
735            self.drop_flow_procedure(task, query_context).await?;
736
737            Ok(Output::new_with_affected_rows(0))
738        } else if drop_if_exists {
739            Ok(Output::new_with_affected_rows(0))
740        } else {
741            FlowNotFoundSnafu {
742                flow_name: format_full_flow_name(&catalog_name, &flow_name),
743            }
744            .fail()
745        }
746    }
747
748    async fn drop_flow_procedure(
749        &self,
750        expr: DropFlowTask,
751        query_context: QueryContextRef,
752    ) -> Result<SubmitDdlTaskResponse> {
753        let request = SubmitDdlTaskRequest {
754            query_context,
755            task: DdlTask::new_drop_flow(expr),
756        };
757
758        self.procedure_executor
759            .submit_ddl_task(&ExecutorContext::default(), request)
760            .await
761            .context(error::ExecuteDdlSnafu)
762    }
763
764    /// Drop a view
765    #[tracing::instrument(skip_all)]
766    pub(crate) async fn drop_view(
767        &self,
768        catalog: String,
769        schema: String,
770        view: String,
771        drop_if_exists: bool,
772        query_context: QueryContextRef,
773    ) -> Result<Output> {
774        let view_info = if let Some(view) = self
775            .catalog_manager
776            .table(&catalog, &schema, &view, None)
777            .await
778            .context(CatalogSnafu)?
779        {
780            view.table_info()
781        } else if drop_if_exists {
782            // DROP VIEW IF EXISTS meets view not found - ignored
783            return Ok(Output::new_with_affected_rows(0));
784        } else {
785            return TableNotFoundSnafu {
786                table_name: format_full_table_name(&catalog, &schema, &view),
787            }
788            .fail();
789        };
790
791        // Ensure the exists one is view, we can't drop other table types
792        ensure!(
793            view_info.table_type == TableType::View,
794            error::InvalidViewSnafu {
795                msg: "not a view",
796                view_name: format_full_table_name(&catalog, &schema, &view),
797            }
798        );
799
800        let view_id = view_info.table_id();
801
802        let task = DropViewTask {
803            catalog,
804            schema,
805            view,
806            view_id,
807            drop_if_exists,
808        };
809
810        self.drop_view_procedure(task, query_context).await?;
811
812        Ok(Output::new_with_affected_rows(0))
813    }
814
815    /// Submit [DropViewTask] to procedure executor.
816    async fn drop_view_procedure(
817        &self,
818        expr: DropViewTask,
819        query_context: QueryContextRef,
820    ) -> Result<SubmitDdlTaskResponse> {
821        let request = SubmitDdlTaskRequest {
822            query_context,
823            task: DdlTask::new_drop_view(expr),
824        };
825
826        self.procedure_executor
827            .submit_ddl_task(&ExecutorContext::default(), request)
828            .await
829            .context(error::ExecuteDdlSnafu)
830    }
831
832    #[tracing::instrument(skip_all)]
833    pub async fn alter_logical_tables(
834        &self,
835        alter_table_exprs: Vec<AlterTableExpr>,
836        query_context: QueryContextRef,
837    ) -> Result<Output> {
838        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
839        ensure!(
840            !alter_table_exprs.is_empty(),
841            EmptyDdlExprSnafu {
842                name: "alter logical tables"
843            }
844        );
845
846        self.alter_logical_tables_procedure(alter_table_exprs, query_context)
847            .await?;
848
849        Ok(Output::new_with_affected_rows(0))
850    }
851
852    #[tracing::instrument(skip_all)]
853    pub async fn drop_table(
854        &self,
855        table_name: TableName,
856        drop_if_exists: bool,
857        query_context: QueryContextRef,
858    ) -> Result<Output> {
859        // Reserved for grpc call
860        self.drop_tables(&[table_name], drop_if_exists, query_context)
861            .await
862    }
863
864    #[tracing::instrument(skip_all)]
865    pub async fn drop_tables(
866        &self,
867        table_names: &[TableName],
868        drop_if_exists: bool,
869        query_context: QueryContextRef,
870    ) -> Result<Output> {
871        let mut tables = Vec::with_capacity(table_names.len());
872        for table_name in table_names {
873            ensure!(
874                !is_readonly_schema(&table_name.schema_name),
875                SchemaReadOnlySnafu {
876                    name: table_name.schema_name.clone()
877                }
878            );
879
880            if let Some(table) = self
881                .catalog_manager
882                .table(
883                    &table_name.catalog_name,
884                    &table_name.schema_name,
885                    &table_name.table_name,
886                    Some(&query_context),
887                )
888                .await
889                .context(CatalogSnafu)?
890            {
891                tables.push(table.table_info().table_id());
892            } else if drop_if_exists {
893                // DROP TABLE IF EXISTS meets table not found - ignored
894                continue;
895            } else {
896                return TableNotFoundSnafu {
897                    table_name: table_name.to_string(),
898                }
899                .fail();
900            }
901        }
902
903        for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
904            self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
905                .await?;
906
907            // Invalidates local cache ASAP.
908            self.cache_invalidator
909                .invalidate(
910                    &Context::default(),
911                    &[
912                        CacheIdent::TableId(table_id),
913                        CacheIdent::TableName(table_name.clone()),
914                    ],
915                )
916                .await
917                .context(error::InvalidateTableCacheSnafu)?;
918        }
919        Ok(Output::new_with_affected_rows(0))
920    }
921
922    #[tracing::instrument(skip_all)]
923    pub async fn drop_database(
924        &self,
925        catalog: String,
926        schema: String,
927        drop_if_exists: bool,
928        query_context: QueryContextRef,
929    ) -> Result<Output> {
930        ensure!(
931            !is_readonly_schema(&schema),
932            SchemaReadOnlySnafu { name: schema }
933        );
934
935        if self
936            .catalog_manager
937            .schema_exists(&catalog, &schema, None)
938            .await
939            .context(CatalogSnafu)?
940        {
941            if schema == query_context.current_schema() {
942                SchemaInUseSnafu { name: schema }.fail()
943            } else {
944                self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
945                    .await?;
946
947                Ok(Output::new_with_affected_rows(0))
948            }
949        } else if drop_if_exists {
950            // DROP TABLE IF EXISTS meets table not found - ignored
951            Ok(Output::new_with_affected_rows(0))
952        } else {
953            SchemaNotFoundSnafu {
954                schema_info: schema,
955            }
956            .fail()
957        }
958    }
959
960    #[tracing::instrument(skip_all)]
961    pub async fn truncate_table(
962        &self,
963        table_name: TableName,
964        query_context: QueryContextRef,
965    ) -> Result<Output> {
966        ensure!(
967            !is_readonly_schema(&table_name.schema_name),
968            SchemaReadOnlySnafu {
969                name: table_name.schema_name.clone()
970            }
971        );
972
973        let table = self
974            .catalog_manager
975            .table(
976                &table_name.catalog_name,
977                &table_name.schema_name,
978                &table_name.table_name,
979                Some(&query_context),
980            )
981            .await
982            .context(CatalogSnafu)?
983            .with_context(|| TableNotFoundSnafu {
984                table_name: table_name.to_string(),
985            })?;
986        let table_id = table.table_info().table_id();
987        self.truncate_table_procedure(&table_name, table_id, query_context)
988            .await?;
989
990        Ok(Output::new_with_affected_rows(0))
991    }
992
993    /// Verifies an alter and returns whether it is necessary to perform the alter.
994    ///
995    /// # Returns
996    ///
997    /// Returns true if the alter need to be porformed; otherwise, it returns false.
998    fn verify_alter(
999        &self,
1000        table_id: TableId,
1001        table_info: Arc<TableInfo>,
1002        expr: AlterTableExpr,
1003    ) -> Result<bool> {
1004        let request: AlterTableRequest = common_grpc_expr::alter_expr_to_request(table_id, expr)
1005            .context(AlterExprToRequestSnafu)?;
1006
1007        let AlterTableRequest {
1008            table_name,
1009            alter_kind,
1010            ..
1011        } = &request;
1012
1013        if let AlterKind::RenameTable { new_table_name } = alter_kind {
1014            ensure!(
1015                NAME_PATTERN_REG.is_match(new_table_name),
1016                error::UnexpectedSnafu {
1017                    violated: format!("Invalid table name: {}", new_table_name)
1018                }
1019            );
1020        } else if let AlterKind::AddColumns { columns } = alter_kind {
1021            // If all the columns are marked as add_if_not_exists and they already exist in the table,
1022            // there is no need to perform the alter.
1023            let column_names: HashSet<_> = table_info
1024                .meta
1025                .schema
1026                .column_schemas()
1027                .iter()
1028                .map(|schema| &schema.name)
1029                .collect();
1030            if columns.iter().all(|column| {
1031                column_names.contains(&column.column_schema.name) && column.add_if_not_exists
1032            }) {
1033                return Ok(false);
1034            }
1035        }
1036
1037        let _ = table_info
1038            .meta
1039            .builder_with_alter_kind(table_name, &request.alter_kind)
1040            .context(error::TableSnafu)?
1041            .build()
1042            .context(error::BuildTableMetaSnafu { table_name })?;
1043
1044        Ok(true)
1045    }
1046
1047    #[tracing::instrument(skip_all)]
1048    pub async fn alter_table(
1049        &self,
1050        alter_table: AlterTable,
1051        query_context: QueryContextRef,
1052    ) -> Result<Output> {
1053        let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
1054        self.alter_table_inner(expr, query_context).await
1055    }
1056
1057    #[tracing::instrument(skip_all)]
1058    pub async fn alter_table_inner(
1059        &self,
1060        expr: AlterTableExpr,
1061        query_context: QueryContextRef,
1062    ) -> Result<Output> {
1063        ensure!(
1064            !is_readonly_schema(&expr.schema_name),
1065            SchemaReadOnlySnafu {
1066                name: expr.schema_name.clone()
1067            }
1068        );
1069
1070        let catalog_name = if expr.catalog_name.is_empty() {
1071            DEFAULT_CATALOG_NAME.to_string()
1072        } else {
1073            expr.catalog_name.clone()
1074        };
1075
1076        let schema_name = if expr.schema_name.is_empty() {
1077            DEFAULT_SCHEMA_NAME.to_string()
1078        } else {
1079            expr.schema_name.clone()
1080        };
1081
1082        let table_name = expr.table_name.clone();
1083
1084        let table = self
1085            .catalog_manager
1086            .table(
1087                &catalog_name,
1088                &schema_name,
1089                &table_name,
1090                Some(&query_context),
1091            )
1092            .await
1093            .context(CatalogSnafu)?
1094            .with_context(|| TableNotFoundSnafu {
1095                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
1096            })?;
1097
1098        let table_id = table.table_info().ident.table_id;
1099        let need_alter = self.verify_alter(table_id, table.table_info(), expr.clone())?;
1100        if !need_alter {
1101            return Ok(Output::new_with_affected_rows(0));
1102        }
1103        info!(
1104            "Table info before alter is {:?}, expr: {:?}",
1105            table.table_info(),
1106            expr
1107        );
1108
1109        let physical_table_id = self
1110            .table_metadata_manager
1111            .table_route_manager()
1112            .get_physical_table_id(table_id)
1113            .await
1114            .context(TableMetadataManagerSnafu)?;
1115
1116        let (req, invalidate_keys) = if physical_table_id == table_id {
1117            // This is physical table
1118            let req = SubmitDdlTaskRequest {
1119                query_context,
1120                task: DdlTask::new_alter_table(expr),
1121            };
1122
1123            let invalidate_keys = vec![
1124                CacheIdent::TableId(table_id),
1125                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1126            ];
1127
1128            (req, invalidate_keys)
1129        } else {
1130            // This is logical table
1131            let req = SubmitDdlTaskRequest {
1132                query_context,
1133                task: DdlTask::new_alter_logical_tables(vec![expr]),
1134            };
1135
1136            let mut invalidate_keys = vec![
1137                CacheIdent::TableId(physical_table_id),
1138                CacheIdent::TableId(table_id),
1139                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1140            ];
1141
1142            let physical_table = self
1143                .table_metadata_manager
1144                .table_info_manager()
1145                .get(physical_table_id)
1146                .await
1147                .context(TableMetadataManagerSnafu)?
1148                .map(|x| x.into_inner());
1149            if let Some(physical_table) = physical_table {
1150                let physical_table_name = TableName::new(
1151                    physical_table.table_info.catalog_name,
1152                    physical_table.table_info.schema_name,
1153                    physical_table.table_info.name,
1154                );
1155                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
1156            }
1157
1158            (req, invalidate_keys)
1159        };
1160
1161        self.procedure_executor
1162            .submit_ddl_task(&ExecutorContext::default(), req)
1163            .await
1164            .context(error::ExecuteDdlSnafu)?;
1165
1166        // Invalidates local cache ASAP.
1167        self.cache_invalidator
1168            .invalidate(&Context::default(), &invalidate_keys)
1169            .await
1170            .context(error::InvalidateTableCacheSnafu)?;
1171
1172        Ok(Output::new_with_affected_rows(0))
1173    }
1174
1175    #[tracing::instrument(skip_all)]
1176    pub async fn alter_database(
1177        &self,
1178        alter_expr: AlterDatabase,
1179        query_context: QueryContextRef,
1180    ) -> Result<Output> {
1181        let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
1182        self.alter_database_inner(alter_expr, query_context).await
1183    }
1184
1185    #[tracing::instrument(skip_all)]
1186    pub async fn alter_database_inner(
1187        &self,
1188        alter_expr: AlterDatabaseExpr,
1189        query_context: QueryContextRef,
1190    ) -> Result<Output> {
1191        ensure!(
1192            !is_readonly_schema(&alter_expr.schema_name),
1193            SchemaReadOnlySnafu {
1194                name: query_context.current_schema().clone()
1195            }
1196        );
1197
1198        let exists = self
1199            .catalog_manager
1200            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
1201            .await
1202            .context(CatalogSnafu)?;
1203        ensure!(
1204            exists,
1205            SchemaNotFoundSnafu {
1206                schema_info: alter_expr.schema_name,
1207            }
1208        );
1209
1210        let cache_ident = [CacheIdent::SchemaName(SchemaName {
1211            catalog_name: alter_expr.catalog_name.clone(),
1212            schema_name: alter_expr.schema_name.clone(),
1213        })];
1214
1215        self.alter_database_procedure(alter_expr, query_context)
1216            .await?;
1217
1218        // Invalidates local cache ASAP.
1219        self.cache_invalidator
1220            .invalidate(&Context::default(), &cache_ident)
1221            .await
1222            .context(error::InvalidateTableCacheSnafu)?;
1223
1224        Ok(Output::new_with_affected_rows(0))
1225    }
1226
1227    async fn create_table_procedure(
1228        &self,
1229        create_table: CreateTableExpr,
1230        partitions: Vec<Partition>,
1231        table_info: RawTableInfo,
1232        query_context: QueryContextRef,
1233    ) -> Result<SubmitDdlTaskResponse> {
1234        let partitions = partitions.into_iter().map(Into::into).collect();
1235
1236        let request = SubmitDdlTaskRequest {
1237            query_context,
1238            task: DdlTask::new_create_table(create_table, partitions, table_info),
1239        };
1240
1241        self.procedure_executor
1242            .submit_ddl_task(&ExecutorContext::default(), request)
1243            .await
1244            .context(error::ExecuteDdlSnafu)
1245    }
1246
1247    async fn create_logical_tables_procedure(
1248        &self,
1249        tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
1250        query_context: QueryContextRef,
1251    ) -> Result<SubmitDdlTaskResponse> {
1252        let request = SubmitDdlTaskRequest {
1253            query_context,
1254            task: DdlTask::new_create_logical_tables(tables_data),
1255        };
1256
1257        self.procedure_executor
1258            .submit_ddl_task(&ExecutorContext::default(), request)
1259            .await
1260            .context(error::ExecuteDdlSnafu)
1261    }
1262
1263    async fn alter_logical_tables_procedure(
1264        &self,
1265        tables_data: Vec<AlterTableExpr>,
1266        query_context: QueryContextRef,
1267    ) -> Result<SubmitDdlTaskResponse> {
1268        let request = SubmitDdlTaskRequest {
1269            query_context,
1270            task: DdlTask::new_alter_logical_tables(tables_data),
1271        };
1272
1273        self.procedure_executor
1274            .submit_ddl_task(&ExecutorContext::default(), request)
1275            .await
1276            .context(error::ExecuteDdlSnafu)
1277    }
1278
1279    async fn drop_table_procedure(
1280        &self,
1281        table_name: &TableName,
1282        table_id: TableId,
1283        drop_if_exists: bool,
1284        query_context: QueryContextRef,
1285    ) -> Result<SubmitDdlTaskResponse> {
1286        let request = SubmitDdlTaskRequest {
1287            query_context,
1288            task: DdlTask::new_drop_table(
1289                table_name.catalog_name.to_string(),
1290                table_name.schema_name.to_string(),
1291                table_name.table_name.to_string(),
1292                table_id,
1293                drop_if_exists,
1294            ),
1295        };
1296
1297        self.procedure_executor
1298            .submit_ddl_task(&ExecutorContext::default(), request)
1299            .await
1300            .context(error::ExecuteDdlSnafu)
1301    }
1302
1303    async fn drop_database_procedure(
1304        &self,
1305        catalog: String,
1306        schema: String,
1307        drop_if_exists: bool,
1308        query_context: QueryContextRef,
1309    ) -> Result<SubmitDdlTaskResponse> {
1310        let request = SubmitDdlTaskRequest {
1311            query_context,
1312            task: DdlTask::new_drop_database(catalog, schema, drop_if_exists),
1313        };
1314
1315        self.procedure_executor
1316            .submit_ddl_task(&ExecutorContext::default(), request)
1317            .await
1318            .context(error::ExecuteDdlSnafu)
1319    }
1320
1321    async fn alter_database_procedure(
1322        &self,
1323        alter_expr: AlterDatabaseExpr,
1324        query_context: QueryContextRef,
1325    ) -> Result<SubmitDdlTaskResponse> {
1326        let request = SubmitDdlTaskRequest {
1327            query_context,
1328            task: DdlTask::new_alter_database(alter_expr),
1329        };
1330
1331        self.procedure_executor
1332            .submit_ddl_task(&ExecutorContext::default(), request)
1333            .await
1334            .context(error::ExecuteDdlSnafu)
1335    }
1336
1337    async fn truncate_table_procedure(
1338        &self,
1339        table_name: &TableName,
1340        table_id: TableId,
1341        query_context: QueryContextRef,
1342    ) -> Result<SubmitDdlTaskResponse> {
1343        let request = SubmitDdlTaskRequest {
1344            query_context,
1345            task: DdlTask::new_truncate_table(
1346                table_name.catalog_name.to_string(),
1347                table_name.schema_name.to_string(),
1348                table_name.table_name.to_string(),
1349                table_id,
1350            ),
1351        };
1352
1353        self.procedure_executor
1354            .submit_ddl_task(&ExecutorContext::default(), request)
1355            .await
1356            .context(error::ExecuteDdlSnafu)
1357    }
1358
1359    #[tracing::instrument(skip_all)]
1360    pub async fn create_database(
1361        &self,
1362        database: &str,
1363        create_if_not_exists: bool,
1364        options: HashMap<String, String>,
1365        query_context: QueryContextRef,
1366    ) -> Result<Output> {
1367        let catalog = query_context.current_catalog();
1368        ensure!(
1369            NAME_PATTERN_REG.is_match(catalog),
1370            error::UnexpectedSnafu {
1371                violated: format!("Invalid catalog name: {}", catalog)
1372            }
1373        );
1374
1375        ensure!(
1376            NAME_PATTERN_REG.is_match(database),
1377            error::UnexpectedSnafu {
1378                violated: format!("Invalid database name: {}", database)
1379            }
1380        );
1381
1382        if !self
1383            .catalog_manager
1384            .schema_exists(catalog, database, None)
1385            .await
1386            .context(CatalogSnafu)?
1387            && !self.catalog_manager.is_reserved_schema_name(database)
1388        {
1389            self.create_database_procedure(
1390                catalog.to_string(),
1391                database.to_string(),
1392                create_if_not_exists,
1393                options,
1394                query_context,
1395            )
1396            .await?;
1397
1398            Ok(Output::new_with_affected_rows(1))
1399        } else if create_if_not_exists {
1400            Ok(Output::new_with_affected_rows(1))
1401        } else {
1402            error::SchemaExistsSnafu { name: database }.fail()
1403        }
1404    }
1405
1406    async fn create_database_procedure(
1407        &self,
1408        catalog: String,
1409        database: String,
1410        create_if_not_exists: bool,
1411        options: HashMap<String, String>,
1412        query_context: QueryContextRef,
1413    ) -> Result<SubmitDdlTaskResponse> {
1414        let request = SubmitDdlTaskRequest {
1415            query_context,
1416            task: DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
1417        };
1418
1419        self.procedure_executor
1420            .submit_ddl_task(&ExecutorContext::default(), request)
1421            .await
1422            .context(error::ExecuteDdlSnafu)
1423    }
1424}
1425
1426/// Parse partition statement [Partitions] into [MetaPartition] and partition columns.
1427fn parse_partitions(
1428    create_table: &CreateTableExpr,
1429    partitions: Option<Partitions>,
1430    query_ctx: &QueryContextRef,
1431) -> Result<(Vec<MetaPartition>, Vec<String>)> {
1432    // If partitions are not defined by user, use the timestamp column (which has to be existed) as
1433    // the partition column, and create only one partition.
1434    let partition_columns = find_partition_columns(&partitions)?;
1435    let partition_entries =
1436        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
1437
1438    // Validates partition
1439    let mut exprs = vec![];
1440    for partition in &partition_entries {
1441        for bound in partition {
1442            if let PartitionBound::Expr(expr) = bound {
1443                exprs.push(expr.clone());
1444            }
1445        }
1446    }
1447    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs)
1448        .context(InvalidPartitionSnafu)?;
1449
1450    Ok((
1451        partition_entries
1452            .into_iter()
1453            .map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
1454            .collect::<std::result::Result<_, _>>()
1455            .context(DeserializePartitionSnafu)?,
1456        partition_columns,
1457    ))
1458}
1459
1460fn create_table_info(
1461    create_table: &CreateTableExpr,
1462    partition_columns: Vec<String>,
1463) -> Result<RawTableInfo> {
1464    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
1465    let mut column_name_to_index_map = HashMap::new();
1466
1467    for (idx, column) in create_table.column_defs.iter().enumerate() {
1468        let schema =
1469            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
1470                column: &column.name,
1471            })?;
1472        let schema = schema.with_time_index(column.name == create_table.time_index);
1473
1474        column_schemas.push(schema);
1475        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
1476    }
1477
1478    let timestamp_index = column_name_to_index_map
1479        .get(&create_table.time_index)
1480        .cloned();
1481
1482    let raw_schema = RawSchema {
1483        column_schemas: column_schemas.clone(),
1484        timestamp_index,
1485        version: 0,
1486    };
1487
1488    let primary_key_indices = create_table
1489        .primary_keys
1490        .iter()
1491        .map(|name| {
1492            column_name_to_index_map
1493                .get(name)
1494                .cloned()
1495                .context(ColumnNotFoundSnafu { msg: name })
1496        })
1497        .collect::<Result<Vec<_>>>()?;
1498
1499    let partition_key_indices = partition_columns
1500        .into_iter()
1501        .map(|col_name| {
1502            column_name_to_index_map
1503                .get(&col_name)
1504                .cloned()
1505                .context(ColumnNotFoundSnafu { msg: col_name })
1506        })
1507        .collect::<Result<Vec<_>>>()?;
1508
1509    let table_options = TableOptions::try_from_iter(&create_table.table_options)
1510        .context(UnrecognizedTableOptionSnafu)?;
1511
1512    let meta = RawTableMeta {
1513        schema: raw_schema,
1514        primary_key_indices,
1515        value_indices: vec![],
1516        engine: create_table.engine.clone(),
1517        next_column_id: column_schemas.len() as u32,
1518        region_numbers: vec![],
1519        options: table_options,
1520        created_on: Utc::now(),
1521        partition_key_indices,
1522    };
1523
1524    let desc = if create_table.desc.is_empty() {
1525        create_table.table_options.get(COMMENT_KEY).cloned()
1526    } else {
1527        Some(create_table.desc.clone())
1528    };
1529
1530    let table_info = RawTableInfo {
1531        ident: metadata::TableIdent {
1532            // The table id of distributed table is assigned by Meta, set "0" here as a placeholder.
1533            table_id: 0,
1534            version: 0,
1535        },
1536        name: create_table.table_name.clone(),
1537        desc,
1538        catalog_name: create_table.catalog_name.clone(),
1539        schema_name: create_table.schema_name.clone(),
1540        meta,
1541        table_type: TableType::Base,
1542    };
1543    Ok(table_info)
1544}
1545
1546fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
1547    let columns = if let Some(partitions) = partitions {
1548        partitions
1549            .column_list
1550            .iter()
1551            .map(|x| x.value.clone())
1552            .collect::<Vec<_>>()
1553    } else {
1554        vec![]
1555    };
1556    Ok(columns)
1557}
1558
1559/// Parse [Partitions] into a group of partition entries.
1560///
1561/// Returns a list of [PartitionBound], each of which defines a partition.
1562fn find_partition_entries(
1563    create_table: &CreateTableExpr,
1564    partitions: &Option<Partitions>,
1565    partition_columns: &[String],
1566    query_ctx: &QueryContextRef,
1567) -> Result<Vec<Vec<PartitionBound>>> {
1568    let entries = if let Some(partitions) = partitions {
1569        // extract concrete data type of partition columns
1570        let column_defs = partition_columns
1571            .iter()
1572            .map(|pc| {
1573                create_table
1574                    .column_defs
1575                    .iter()
1576                    .find(|c| &c.name == pc)
1577                    // unwrap is safe here because we have checked that partition columns are defined
1578                    .unwrap()
1579            })
1580            .collect::<Vec<_>>();
1581        let mut column_name_and_type = HashMap::with_capacity(column_defs.len());
1582        for column in column_defs {
1583            let column_name = &column.name;
1584            let data_type = ConcreteDataType::from(
1585                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension)
1586                    .context(ColumnDataTypeSnafu)?,
1587            );
1588            column_name_and_type.insert(column_name, data_type);
1589        }
1590
1591        // Transform parser expr to partition expr
1592        let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
1593        for partition in &partitions.exprs {
1594            let partition_expr =
1595                convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
1596            partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]);
1597        }
1598
1599        // fallback for no expr
1600        if partition_exprs.is_empty() {
1601            partition_exprs.push(vec![PartitionBound::MaxValue]);
1602        }
1603
1604        partition_exprs
1605    } else {
1606        vec![vec![PartitionBound::MaxValue]]
1607    };
1608    Ok(entries)
1609}
1610
1611fn convert_one_expr(
1612    expr: &Expr,
1613    column_name_and_type: &HashMap<&String, ConcreteDataType>,
1614    timezone: &Timezone,
1615) -> Result<PartitionExpr> {
1616    let Expr::BinaryOp { left, op, right } = expr else {
1617        return InvalidPartitionRuleSnafu {
1618            reason: "partition rule must be a binary expression",
1619        }
1620        .fail();
1621    };
1622
1623    let op =
1624        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
1625            reason: format!("unsupported operator in partition expr {op}"),
1626        })?;
1627
1628    // convert leaf node.
1629    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
1630        // col, val
1631        (Expr::Identifier(ident), Expr::Value(value)) => {
1632            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1633            let value = convert_value(value, data_type, timezone, None)?;
1634            (Operand::Column(column_name), op, Operand::Value(value))
1635        }
1636        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
1637            if let Expr::Value(v) = &**expr =>
1638        {
1639            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1640            let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
1641            (Operand::Column(column_name), op, Operand::Value(value))
1642        }
1643        // val, col
1644        (Expr::Value(value), Expr::Identifier(ident)) => {
1645            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1646            let value = convert_value(value, data_type, timezone, None)?;
1647            (Operand::Value(value), op, Operand::Column(column_name))
1648        }
1649        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
1650            if let Expr::Value(v) = &**expr =>
1651        {
1652            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1653            let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
1654            (Operand::Value(value), op, Operand::Column(column_name))
1655        }
1656        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
1657            // sub-expr must against another sub-expr
1658            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
1659            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
1660            (Operand::Expr(lhs), op, Operand::Expr(rhs))
1661        }
1662        _ => {
1663            return InvalidPartitionRuleSnafu {
1664                reason: format!("invalid partition expr {expr}"),
1665            }
1666            .fail();
1667        }
1668    };
1669
1670    Ok(PartitionExpr::new(lhs, op, rhs))
1671}
1672
1673fn convert_identifier(
1674    ident: &Ident,
1675    column_name_and_type: &HashMap<&String, ConcreteDataType>,
1676) -> Result<(String, ConcreteDataType)> {
1677    let column_name = ident.value.clone();
1678    let data_type = column_name_and_type
1679        .get(&column_name)
1680        .cloned()
1681        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
1682    Ok((column_name, data_type))
1683}
1684
1685fn convert_value(
1686    value: &ParserValue,
1687    data_type: ConcreteDataType,
1688    timezone: &Timezone,
1689    unary_op: Option<UnaryOperator>,
1690) -> Result<Value> {
1691    sql_value_to_value(
1692        "<NONAME>",
1693        &data_type,
1694        value,
1695        Some(timezone),
1696        unary_op,
1697        false,
1698    )
1699    .context(ParseSqlValueSnafu)
1700}
1701
// Unit tests for the name pattern and for parsing partition statements.
#[cfg(test)]
mod test {
    use session::context::{QueryContext, QueryContextBuilder};
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;

    use super::*;
    use crate::expr_helper;

    /// Checks which names the name pattern accepts: `@` and `#` are accepted at the
    /// end of a name but rejected at the start or on their own.
    #[test]
    fn test_name_is_match() {
        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
        assert!(!NAME_PATTERN_REG.is_match("🈲"));
        assert!(NAME_PATTERN_REG.is_match("hello"));
        assert!(NAME_PATTERN_REG.is_match("test@"));
        assert!(!NAME_PATTERN_REG.is_match("@test"));
        assert!(NAME_PATTERN_REG.is_match("test#"));
        assert!(!NAME_PATTERN_REG.is_match("#test"));
        assert!(!NAME_PATTERN_REG.is_match("@"));
        assert!(!NAME_PATTERN_REG.is_match("#"));
    }

    /// Parses `CREATE TABLE` statements with partition clauses and compares the JSON
    /// serialization of the resulting partitions with the expected string.
    ///
    /// Currently ignored (see the `ignore` reason below).
    #[tokio::test]
    #[ignore = "TODO(ruihang): WIP new partition rule"]
    async fn test_parse_partitions() {
        common_telemetry::init_default_ut_logging();
        // Each case is a pair of (SQL statement, expected JSON of the parsed partitions).
        let cases = [
            (
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION ON COLUMNS (b) (
  b < 'hz',
  b >= 'hz' AND b < 'sh',
  b >= 'sh'
)
ENGINE=mito",
                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
            ),
            (
                // NOTE(review): this statement mixes a `PARTITION r0 VALUES LESS THAN`
                // entry with plain expressions - presumably a leftover of the older
                // syntax; revisit when the test is re-enabled.
                r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
  PARTITION r0 VALUES LESS THAN ('hz', 10),
  b < 'hz' AND a < 10,
  b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
  b >= 'sh' AND a >= 20
)
ENGINE=mito",
                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
            ),
        ];
        let ctx = QueryContextBuilder::default().build().into();
        for (sql, expected) in cases {
            let result = ParserContext::create_with_dialect(
                sql,
                &GreptimeDbDialect {},
                ParseOptions::default(),
            )
            .unwrap();
            // Every case must parse into a `CREATE TABLE` statement.
            match &result[0] {
                Statement::CreateTable(c) => {
                    let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
                    let (partitions, _) =
                        parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
                    let json = serde_json::to_string(&partitions).unwrap();
                    assert_eq!(json, expected);
                }
                _ => unreachable!(),
            }
        }
    }
}