Skip to main content

common_meta/
ddl_manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::Repartition;
19use api::v1::alter_table_expr::Kind;
20use api::v1::repartition::Source as PbRepartitionSource;
21use common_error::ext::BoxedError;
22use common_procedure::{
23    BoxedProcedure, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef,
24    ProcedureWithId, watcher,
25};
26use common_telemetry::tracing_context::{FutureExt, TracingContext};
27use common_telemetry::{debug, info, tracing};
28use derive_builder::Builder;
29use snafu::{OptionExt, ResultExt, ensure};
30use store_api::storage::TableId;
31use table::table_name::TableName;
32
33use crate::ddl::alter_database::AlterDatabaseProcedure;
34use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
35use crate::ddl::alter_table::AlterTableProcedure;
36use crate::ddl::comment_on::CommentOnProcedure;
37use crate::ddl::create_database::CreateDatabaseProcedure;
38use crate::ddl::create_flow::CreateFlowProcedure;
39use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
40use crate::ddl::create_table::CreateTableProcedure;
41use crate::ddl::create_view::CreateViewProcedure;
42use crate::ddl::drop_database::DropDatabaseProcedure;
43use crate::ddl::drop_flow::DropFlowProcedure;
44use crate::ddl::drop_table::DropTableProcedure;
45use crate::ddl::drop_view::DropViewProcedure;
46use crate::ddl::truncate_table::TruncateTableProcedure;
47use crate::ddl::{DdlContext, utils};
48use crate::error::{
49    self, CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu,
50    RegisterProcedureLoaderSnafu, RegisterRepartitionProcedureLoaderSnafu, Result,
51    SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
52    UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
53};
54use crate::key::table_info::TableInfoValue;
55use crate::key::table_name::TableNameKey;
56use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
57use crate::procedure_executor::ExecutorContext;
58#[cfg(feature = "enterprise")]
59use crate::rpc::ddl::DdlTask::CreateTrigger;
60#[cfg(feature = "enterprise")]
61use crate::rpc::ddl::DdlTask::DropTrigger;
62use crate::rpc::ddl::DdlTask::{
63    AlterDatabase, AlterLogicalTables, AlterTable, CommentOn, CreateDatabase, CreateFlow,
64    CreateLogicalTables, CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables,
65    DropTable, DropView, TruncateTable,
66};
67#[cfg(feature = "enterprise")]
68use crate::rpc::ddl::trigger::CreateTriggerTask;
69#[cfg(feature = "enterprise")]
70use crate::rpc::ddl::trigger::DropTriggerTask;
71use crate::rpc::ddl::{
72    AlterDatabaseTask, AlterTableTask, CommentOnTask, CreateDatabaseTask, CreateFlowTask,
73    CreateTableTask, CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask,
74    QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
75};
76
/// A configurator that customizes or enhances a [`DdlManager`].
///
/// The type parameter `C` is the caller-chosen context type that is handed to
/// [`DdlManagerConfigurator::configure`].
#[async_trait::async_trait]
pub trait DdlManagerConfigurator<C>: Send + Sync {
    /// Configures the given [`DdlManager`] using the provided context `ctx`.
    ///
    /// Takes ownership of `ddl_manager` and returns the configured manager,
    /// or a [`BoxedError`] if configuration fails.
    async fn configure(
        &self,
        ddl_manager: DdlManager,
        ctx: C,
    ) -> std::result::Result<DdlManager, BoxedError>;
}
87
/// A shared, reference-counted [`DdlManagerConfigurator`] trait object.
pub type DdlManagerConfiguratorRef<C> = Arc<dyn DdlManagerConfigurator<C>>;

/// A shared, reference-counted [`DdlManager`].
pub type DdlManagerRef = Arc<DdlManager>;

/// A function that builds a [`BoxedProcedureLoader`] from a [`DdlContext`].
pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
93
/// The [DdlManager] provides the ability to execute Ddl.
///
/// It builds a procedure for each DDL task and submits it to the procedure
/// manager it holds.
#[derive(Builder)]
pub struct DdlManager {
    /// Context that is cloned and handed to every procedure created here.
    ddl_context: DdlContext,
    /// Procedure manager used to register loaders and to submit procedures.
    procedure_manager: ProcedureManagerRef,
    /// Factory that creates repartition procedures and registers their loaders.
    repartition_procedure_factory: RepartitionProcedureFactoryRef,
    /// Optional handler for trigger DDL tasks; `try_new` initializes it to `None`.
    #[cfg(feature = "enterprise")]
    trigger_ddl_manager: Option<TriggerDdlManagerRef>,
}
103
/// This trait is responsible for handling DDL tasks about triggers. e.g.,
/// create trigger, drop trigger, etc.
///
/// Only compiled when the `enterprise` feature is enabled.
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
pub trait TriggerDdlManager: Send + Sync {
    /// Handles a create-trigger task.
    ///
    /// The implementation receives the procedure manager, the DDL context and
    /// the query context of the request, and returns the response that is
    /// sent back to the submitter.
    async fn create_trigger(
        &self,
        create_trigger_task: CreateTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Handles a drop-trigger task.
    ///
    /// Takes the same supporting arguments as [`TriggerDdlManager::create_trigger`].
    async fn drop_trigger(
        &self,
        drop_trigger_task: DropTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Exposes the implementation as [`std::any::Any`] so callers can
    /// downcast to the concrete type.
    fn as_any(&self) -> &dyn std::any::Any;
}
127
/// A shared, reference-counted [`TriggerDdlManager`] trait object.
#[cfg(feature = "enterprise")]
pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
130
// Expands to a `(type name, loader factory)` pair for the procedure type
// `$procedure`.
//
// The factory takes a `DdlContext` and returns a `BoxedProcedureLoader` that
// rebuilds the procedure from its JSON form through `$procedure::from_json`.
macro_rules! procedure_loader_entry {
    ($procedure:ident) => {
        (
            $procedure::TYPE_NAME,
            &|context: DdlContext| -> BoxedProcedureLoader {
                Box::new(move |json: &str| {
                    // Clone per call so the captured context stays available
                    // for later invocations of the loader.
                    let context = context.clone();
                    $procedure::from_json(json, context).map(|p| Box::new(p) as _)
                })
            },
        )
    };
}
144
// Builds a `Vec` with one `procedure_loader_entry!` pair for every procedure
// type in the comma-separated list.
macro_rules! procedure_loader {
    ($($procedure:ident),*) => {
        vec![
            $(procedure_loader_entry!($procedure)),*
        ]
    };
}
152
/// A shared, reference-counted [`RepartitionProcedureFactory`] trait object.
pub type RepartitionProcedureFactoryRef = Arc<dyn RepartitionProcedureFactory>;
154
/// The source side of a repartition request, as handed to
/// [`RepartitionProcedureFactory::create`].
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so values can be logged,
/// duplicated and compared in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RepartitionSource {
    /// The source is described by a list of partition expressions.
    Partitioned {
        /// The source partition expressions.
        exprs: Vec<String>,
        /// Full target partition columns to overwrite table metadata.
        ///
        /// `None` means the repartition keeps using the current table
        /// partition columns, so the procedure won't update
        /// `partition_key_indices`.
        target_partition_columns: Option<Vec<String>>,
    },
    /// The source is described only by a list of partition columns.
    // NOTE(review): presumably used when the table has no partition rules
    // yet — confirm against the factory implementation.
    Unpartitioned {
        /// The partition columns carried by the request.
        partition_columns: Vec<String>,
    },
}
169
/// Creates repartition procedures and registers their loaders.
///
/// [`DdlManager`] holds an implementation behind a
/// [`RepartitionProcedureFactoryRef`] and calls it when registering loaders
/// and when submitting a repartition task.
pub trait RepartitionProcedureFactory: Send + Sync {
    /// Builds a boxed repartition procedure for the given table.
    ///
    /// - `ddl_ctx`: the DDL context the procedure can use.
    /// - `table_name` / `table_id`: the table to repartition.
    /// - `source`: the source side of the repartition.
    /// - `to_exprs`: the target partition expressions.
    /// - `timeout`: optional timeout passed through to the procedure.
    ///
    /// Returns a [`BoxedError`] if the procedure cannot be created.
    fn create(
        &self,
        ddl_ctx: &DdlContext,
        table_name: TableName,
        table_id: TableId,
        source: RepartitionSource,
        to_exprs: Vec<String>,
        timeout: Option<Duration>,
    ) -> std::result::Result<BoxedProcedure, BoxedError>;

    /// Registers the loaders of the repartition procedure(s) on
    /// `procedure_manager`.
    ///
    /// Returns a [`BoxedError`] if registration fails.
    fn register_loaders(
        &self,
        ddl_ctx: &DdlContext,
        procedure_manager: &ProcedureManagerRef,
    ) -> std::result::Result<(), BoxedError>;
}
187
/// The options for DDL tasks.
///
/// Note: These options may not be utilized by all procedures.
/// At present, they are specifically applied in `RepartitionProcedure`.
#[derive(Debug, Clone, Copy)]
pub struct DdlOptions {
    /// The timeout will be passed to the procedure.
    ///
    /// Note: Each procedure may implement its own timeout handling mechanism.
    pub timeout: Duration,
    /// The flag that controls whether to wait for the procedure to complete.
    ///
    /// If wait is `true`, the caller waits for completion (success or failure) and the result is returned.
    /// Otherwise, the procedure is submitted and the [ProcedureId](common_procedure::ProcedureId) is returned immediately.
    ///
    /// Note: The value of `wait` is independent of the `timeout` option. If a procedure ignores the `timeout` and `wait` is set to true, the operation does not return until the procedure completes.
    pub wait: bool,
}
206
207impl DdlManager {
208    /// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
209    pub fn try_new(
210        ddl_context: DdlContext,
211        procedure_manager: ProcedureManagerRef,
212        repartition_procedure_factory: RepartitionProcedureFactoryRef,
213        register_loaders: bool,
214    ) -> Result<Self> {
215        let manager = Self {
216            ddl_context,
217            procedure_manager,
218            repartition_procedure_factory,
219            #[cfg(feature = "enterprise")]
220            trigger_ddl_manager: None,
221        };
222        if register_loaders {
223            manager.register_loaders()?;
224        }
225        Ok(manager)
226    }
227
228    #[cfg(feature = "enterprise")]
229    pub fn with_trigger_ddl_manager(mut self, trigger_ddl_manager: TriggerDdlManagerRef) -> Self {
230        self.trigger_ddl_manager = Some(trigger_ddl_manager);
231        self
232    }
233
234    /// Returns the [TableMetadataManagerRef].
235    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
236        &self.ddl_context.table_metadata_manager
237    }
238
239    /// Returns the [DdlContext]
240    pub fn create_context(&self) -> DdlContext {
241        self.ddl_context.clone()
242    }
243
244    /// Registers all Ddl loaders.
245    pub fn register_loaders(&self) -> Result<()> {
246        let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
247            CreateTableProcedure,
248            CreateLogicalTablesProcedure,
249            CreateViewProcedure,
250            CreateFlowProcedure,
251            AlterTableProcedure,
252            AlterLogicalTablesProcedure,
253            AlterDatabaseProcedure,
254            DropTableProcedure,
255            DropFlowProcedure,
256            TruncateTableProcedure,
257            CreateDatabaseProcedure,
258            DropDatabaseProcedure,
259            DropViewProcedure,
260            CommentOnProcedure
261        );
262
263        for (type_name, loader_factory) in loaders {
264            let context = self.create_context();
265            self.procedure_manager
266                .register_loader(type_name, loader_factory(context))
267                .context(RegisterProcedureLoaderSnafu { type_name })?;
268        }
269
270        self.repartition_procedure_factory
271            .register_loaders(&self.ddl_context, &self.procedure_manager)
272            .context(RegisterRepartitionProcedureLoaderSnafu)?;
273
274        Ok(())
275    }
276
277    /// Submits a repartition procedure for the specified table.
278    ///
279    /// This creates a repartition procedure using the provided `table_id`,
280    /// `table_name`, and `Repartition` configuration, and then either executes it
281    /// to completion or just submits it for asynchronous execution.
282    ///
283    /// The `Repartition` argument contains the original (`from_partition_exprs`)
284    /// and target (`into_partition_exprs`) partition expressions that define how
285    /// the table should be repartitioned.
286    ///
287    /// The `wait` flag controls whether this method waits for the repartition
288    /// procedure to finish:
289    /// - If `wait` is `true`, the procedure is executed and this method awaits
290    ///   its completion, returning both the generated `ProcedureId` and the
291    ///   final `Output` of the procedure.
292    /// - If `wait` is `false`, the procedure is only submitted to the procedure
293    ///   manager for asynchronous execution, and this method returns the
294    ///   `ProcedureId` along with `None` as the output.
295    async fn submit_repartition_task(
296        &self,
297        table_id: TableId,
298        table_name: TableName,
299        repartition: Repartition,
300        wait: bool,
301        timeout: Duration,
302    ) -> Result<(ProcedureId, Option<Output>)> {
303        let context = self.create_context();
304
305        let into_partition_exprs = repartition.into_partition_exprs;
306        let source = repartition.source;
307
308        let source = match source {
309            Some(PbRepartitionSource::PartitionExprs(source)) => RepartitionSource::Partitioned {
310                exprs: source.exprs,
311                target_partition_columns: source
312                    .target_partition_columns
313                    .map(|columns| columns.columns),
314            },
315            Some(PbRepartitionSource::Unpartitioned(source)) => RepartitionSource::Unpartitioned {
316                partition_columns: source.partition_columns,
317            },
318            None => {
319                // Reads the deprecated field for backward compatibility with old persisted DDL tasks.
320                #[allow(deprecated)]
321                RepartitionSource::Partitioned {
322                    exprs: repartition.from_partition_exprs,
323                    target_partition_columns: None,
324                }
325            }
326        };
327
328        let procedure = self
329            .repartition_procedure_factory
330            .create(
331                &context,
332                table_name,
333                table_id,
334                source,
335                into_partition_exprs,
336                Some(timeout),
337            )
338            .context(CreateRepartitionProcedureSnafu)?;
339        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
340        if wait {
341            self.execute_procedure_and_wait(procedure_with_id).await
342        } else {
343            self.submit_procedure(procedure_with_id)
344                .await
345                .map(|p| (p, None))
346        }
347    }
348
349    /// Submits and executes an alter table task.
350    #[tracing::instrument(skip_all)]
351    pub async fn submit_alter_table_task(
352        &self,
353        table_id: TableId,
354        alter_table_task: AlterTableTask,
355        ddl_options: DdlOptions,
356    ) -> Result<(ProcedureId, Option<Output>)> {
357        // make alter_table_task mutable so we can call .take() on its field
358        let mut alter_table_task = alter_table_task;
359        if let Some(Kind::Repartition(_)) = alter_table_task.alter_table.kind.as_ref()
360            && let Kind::Repartition(repartition) =
361                alter_table_task.alter_table.kind.take().unwrap()
362        {
363            let table_name = TableName::new(
364                alter_table_task.alter_table.catalog_name,
365                alter_table_task.alter_table.schema_name,
366                alter_table_task.alter_table.table_name,
367            );
368            return self
369                .submit_repartition_task(
370                    table_id,
371                    table_name,
372                    repartition,
373                    ddl_options.wait,
374                    ddl_options.timeout,
375                )
376                .await;
377        }
378
379        let context = self.create_context();
380        let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
381
382        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
383
384        self.execute_procedure_and_wait(procedure_with_id).await
385    }
386
387    /// Submits and executes a create table task.
388    #[tracing::instrument(skip_all)]
389    pub async fn submit_create_table_task(
390        &self,
391        create_table_task: CreateTableTask,
392        query_context: QueryContext,
393    ) -> Result<(ProcedureId, Option<Output>)> {
394        let context = self.create_context();
395
396        let procedure = CreateTableProcedure::new_with_query_context(
397            create_table_task,
398            query_context,
399            context,
400        )?;
401
402        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
403
404        self.execute_procedure_and_wait(procedure_with_id).await
405    }
406
407    /// Submits and executes a `[CreateViewTask]`.
408    #[tracing::instrument(skip_all)]
409    pub async fn submit_create_view_task(
410        &self,
411        create_view_task: CreateViewTask,
412    ) -> Result<(ProcedureId, Option<Output>)> {
413        let context = self.create_context();
414
415        let procedure = CreateViewProcedure::new(create_view_task, context);
416
417        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
418
419        self.execute_procedure_and_wait(procedure_with_id).await
420    }
421
422    /// Submits and executes a create multiple logical table tasks.
423    #[tracing::instrument(skip_all)]
424    pub async fn submit_create_logical_table_tasks(
425        &self,
426        create_table_tasks: Vec<CreateTableTask>,
427        physical_table_id: TableId,
428    ) -> Result<(ProcedureId, Option<Output>)> {
429        let context = self.create_context();
430
431        let procedure =
432            CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
433
434        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
435
436        self.execute_procedure_and_wait(procedure_with_id).await
437    }
438
439    /// Submits and executes alter multiple table tasks.
440    #[tracing::instrument(skip_all)]
441    pub async fn submit_alter_logical_table_tasks(
442        &self,
443        alter_table_tasks: Vec<AlterTableTask>,
444        physical_table_id: TableId,
445    ) -> Result<(ProcedureId, Option<Output>)> {
446        let context = self.create_context();
447
448        let procedure =
449            AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
450
451        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
452
453        self.execute_procedure_and_wait(procedure_with_id).await
454    }
455
456    /// Submits and executes a drop table task.
457    #[tracing::instrument(skip_all)]
458    pub async fn submit_drop_table_task(
459        &self,
460        drop_table_task: DropTableTask,
461    ) -> Result<(ProcedureId, Option<Output>)> {
462        let context = self.create_context();
463
464        let procedure = DropTableProcedure::new(drop_table_task, context);
465
466        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
467
468        self.execute_procedure_and_wait(procedure_with_id).await
469    }
470
471    /// Submits and executes a create database task.
472    #[tracing::instrument(skip_all)]
473    pub async fn submit_create_database(
474        &self,
475        CreateDatabaseTask {
476            catalog,
477            schema,
478            create_if_not_exists,
479            options,
480        }: CreateDatabaseTask,
481    ) -> Result<(ProcedureId, Option<Output>)> {
482        let context = self.create_context();
483        let procedure =
484            CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
485        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
486
487        self.execute_procedure_and_wait(procedure_with_id).await
488    }
489
490    /// Submits and executes a drop table task.
491    #[tracing::instrument(skip_all)]
492    pub async fn submit_drop_database(
493        &self,
494        DropDatabaseTask {
495            catalog,
496            schema,
497            drop_if_exists,
498        }: DropDatabaseTask,
499    ) -> Result<(ProcedureId, Option<Output>)> {
500        let context = self.create_context();
501        let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
502        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
503
504        self.execute_procedure_and_wait(procedure_with_id).await
505    }
506
507    pub async fn submit_alter_database(
508        &self,
509        alter_database_task: AlterDatabaseTask,
510    ) -> Result<(ProcedureId, Option<Output>)> {
511        let context = self.create_context();
512        let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
513        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
514
515        self.execute_procedure_and_wait(procedure_with_id).await
516    }
517
518    /// Submits and executes a create flow task.
519    #[tracing::instrument(skip_all)]
520    pub async fn submit_create_flow_task(
521        &self,
522        create_flow: CreateFlowTask,
523        query_context: QueryContext,
524    ) -> Result<(ProcedureId, Option<Output>)> {
525        let context = self.create_context();
526        let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
527        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
528
529        self.execute_procedure_and_wait(procedure_with_id).await
530    }
531
532    /// Submits and executes a drop flow task.
533    #[tracing::instrument(skip_all)]
534    pub async fn submit_drop_flow_task(
535        &self,
536        drop_flow: DropFlowTask,
537    ) -> Result<(ProcedureId, Option<Output>)> {
538        let context = self.create_context();
539        let procedure = DropFlowProcedure::new(drop_flow, context);
540        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
541
542        self.execute_procedure_and_wait(procedure_with_id).await
543    }
544
545    /// Submits and executes a drop view task.
546    #[tracing::instrument(skip_all)]
547    pub async fn submit_drop_view_task(
548        &self,
549        drop_view: DropViewTask,
550    ) -> Result<(ProcedureId, Option<Output>)> {
551        let context = self.create_context();
552        let procedure = DropViewProcedure::new(drop_view, context);
553        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
554
555        self.execute_procedure_and_wait(procedure_with_id).await
556    }
557
558    /// Submits and executes a truncate table task.
559    #[tracing::instrument(skip_all)]
560    pub async fn submit_truncate_table_task(
561        &self,
562        truncate_table_task: TruncateTableTask,
563        table_info_value: DeserializedValueWithBytes<TableInfoValue>,
564    ) -> Result<(ProcedureId, Option<Output>)> {
565        let context = self.create_context();
566        let procedure = TruncateTableProcedure::new(truncate_table_task, table_info_value, context);
567
568        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
569
570        self.execute_procedure_and_wait(procedure_with_id).await
571    }
572
573    /// Submits and executes a comment on task.
574    #[tracing::instrument(skip_all)]
575    pub async fn submit_comment_on_task(
576        &self,
577        comment_on_task: CommentOnTask,
578    ) -> Result<(ProcedureId, Option<Output>)> {
579        let context = self.create_context();
580        let procedure = CommentOnProcedure::new(comment_on_task, context);
581        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
582
583        self.execute_procedure_and_wait(procedure_with_id).await
584    }
585
586    /// Executes a procedure and waits for the result.
587    async fn execute_procedure_and_wait(
588        &self,
589        procedure_with_id: ProcedureWithId,
590    ) -> Result<(ProcedureId, Option<Output>)> {
591        let procedure_id = procedure_with_id.id;
592
593        let mut watcher = self
594            .procedure_manager
595            .submit(procedure_with_id)
596            .await
597            .context(SubmitProcedureSnafu)?;
598
599        let output = watcher::wait(&mut watcher)
600            .await
601            .context(WaitProcedureSnafu)?;
602
603        Ok((procedure_id, output))
604    }
605
606    /// Submits a procedure and returns the procedure id.
607    async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
608        let procedure_id = procedure_with_id.id;
609        let _ = self
610            .procedure_manager
611            .submit(procedure_with_id)
612            .await
613            .context(SubmitProcedureSnafu)?;
614
615        Ok(procedure_id)
616    }
617
618    pub async fn submit_ddl_task(
619        &self,
620        ctx: &ExecutorContext,
621        request: SubmitDdlTaskRequest,
622    ) -> Result<SubmitDdlTaskResponse> {
623        let span = ctx
624            .tracing_context
625            .as_ref()
626            .map(TracingContext::from_w3c)
627            .unwrap_or_else(TracingContext::from_current_span)
628            .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
629        let ddl_options = DdlOptions {
630            wait: request.wait,
631            timeout: request.timeout,
632        };
633        async move {
634            debug!("Submitting Ddl task: {:?}", request.task);
635            match request.task {
636                CreateTable(create_table_task) => {
637                    handle_create_table_task(self, create_table_task, request.query_context).await
638                }
639                DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
640                AlterTable(alter_table_task) => {
641                    handle_alter_table_task(self, alter_table_task, ddl_options).await
642                }
643                TruncateTable(truncate_table_task) => {
644                    handle_truncate_table_task(self, truncate_table_task).await
645                }
646                CreateLogicalTables(create_table_tasks) => {
647                    handle_create_logical_table_tasks(self, create_table_tasks).await
648                }
649                AlterLogicalTables(alter_table_tasks) => {
650                    handle_alter_logical_table_tasks(self, alter_table_tasks).await
651                }
652                DropLogicalTables(_) => todo!(),
653                CreateDatabase(create_database_task) => {
654                    handle_create_database_task(self, create_database_task).await
655                }
656                DropDatabase(drop_database_task) => {
657                    handle_drop_database_task(self, drop_database_task).await
658                }
659                AlterDatabase(alter_database_task) => {
660                    handle_alter_database_task(self, alter_database_task).await
661                }
662                CreateFlow(create_flow_task) => {
663                    handle_create_flow_task(self, create_flow_task, request.query_context).await
664                }
665                DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
666                CreateView(create_view_task) => {
667                    handle_create_view_task(self, create_view_task).await
668                }
669                DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
670                CommentOn(comment_on_task) => handle_comment_on_task(self, comment_on_task).await,
671                #[cfg(feature = "enterprise")]
672                CreateTrigger(create_trigger_task) => {
673                    handle_create_trigger_task(self, create_trigger_task, request.query_context)
674                        .await
675                }
676                #[cfg(feature = "enterprise")]
677                DropTrigger(drop_trigger_task) => {
678                    handle_drop_trigger_task(self, drop_trigger_task, request.query_context).await
679                }
680            }
681        }
682        .trace(span)
683        .await
684    }
685}
686
687async fn handle_truncate_table_task(
688    ddl_manager: &DdlManager,
689    truncate_table_task: TruncateTableTask,
690) -> Result<SubmitDdlTaskResponse> {
691    let table_id = truncate_table_task.table_id;
692    let table_metadata_manager = &ddl_manager.table_metadata_manager();
693    let table_ref = truncate_table_task.table_ref();
694
695    let table_info_value = table_metadata_manager
696        .table_info_manager()
697        .get(table_id)
698        .await?
699        .with_context(|| TableInfoNotFoundSnafu {
700            table: table_ref.to_string(),
701        })?;
702    let physical_table_id = table_metadata_manager
703        .table_route_manager()
704        .get_physical_table_id(table_id)
705        .await?;
706    ensure!(
707        physical_table_id == table_id,
708        error::UnexpectedSnafu {
709            err_msg: "Truncate table is only supported for physical tables."
710        }
711    );
712
713    let (id, _) = ddl_manager
714        .submit_truncate_table_task(truncate_table_task, table_info_value)
715        .await?;
716
717    info!("Table: {table_id} is truncated via procedure_id {id:?}");
718
719    Ok(SubmitDdlTaskResponse {
720        key: id.to_string().into(),
721        ..Default::default()
722    })
723}
724
725async fn handle_alter_table_task(
726    ddl_manager: &DdlManager,
727    alter_table_task: AlterTableTask,
728    ddl_options: DdlOptions,
729) -> Result<SubmitDdlTaskResponse> {
730    let table_ref = alter_table_task.table_ref();
731
732    let table_id = ddl_manager
733        .table_metadata_manager()
734        .table_name_manager()
735        .get(TableNameKey::new(
736            table_ref.catalog,
737            table_ref.schema,
738            table_ref.table,
739        ))
740        .await?
741        .with_context(|| TableNotFoundSnafu {
742            table_name: table_ref.to_string(),
743        })?
744        .table_id();
745
746    let table_route_value = ddl_manager
747        .table_metadata_manager()
748        .table_route_manager()
749        .table_route_storage()
750        .get(table_id)
751        .await?
752        .context(TableRouteNotFoundSnafu { table_id })?;
753    ensure!(
754        table_route_value.is_physical(),
755        UnexpectedLogicalRouteTableSnafu {
756            err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
757        }
758    );
759
760    let (id, _) = ddl_manager
761        .submit_alter_table_task(table_id, alter_table_task, ddl_options)
762        .await?;
763
764    info!("Table: {table_id} is altered via procedure_id {id:?}");
765
766    Ok(SubmitDdlTaskResponse {
767        key: id.to_string().into(),
768        ..Default::default()
769    })
770}
771
772async fn handle_drop_table_task(
773    ddl_manager: &DdlManager,
774    drop_table_task: DropTableTask,
775) -> Result<SubmitDdlTaskResponse> {
776    let table_id = drop_table_task.table_id;
777    let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
778
779    info!("Table: {table_id} is dropped via procedure_id {id:?}");
780
781    Ok(SubmitDdlTaskResponse {
782        key: id.to_string().into(),
783        ..Default::default()
784    })
785}
786
787async fn handle_create_table_task(
788    ddl_manager: &DdlManager,
789    create_table_task: CreateTableTask,
790    query_context: QueryContext,
791) -> Result<SubmitDdlTaskResponse> {
792    let (id, output) = ddl_manager
793        .submit_create_table_task(create_table_task, query_context)
794        .await?;
795
796    let procedure_id = id.to_string();
797    let output = output.context(ProcedureOutputSnafu {
798        procedure_id: &procedure_id,
799        err_msg: "empty output",
800    })?;
801    let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
802        procedure_id: &procedure_id,
803        err_msg: "downcast to `u32`",
804    })?);
805    info!("Table: {table_id} is created via procedure_id {id:?}");
806
807    Ok(SubmitDdlTaskResponse {
808        key: procedure_id.into(),
809        table_ids: vec![table_id],
810    })
811}
812
813async fn handle_create_logical_table_tasks(
814    ddl_manager: &DdlManager,
815    create_table_tasks: Vec<CreateTableTask>,
816) -> Result<SubmitDdlTaskResponse> {
817    ensure!(
818        !create_table_tasks.is_empty(),
819        EmptyDdlTasksSnafu {
820            name: "create logical tables"
821        }
822    );
823    let physical_table_id = utils::check_and_get_physical_table_id(
824        ddl_manager.table_metadata_manager(),
825        &create_table_tasks,
826    )
827    .await?;
828    let num_logical_tables = create_table_tasks.len();
829
830    let (id, output) = ddl_manager
831        .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
832        .await?;
833
834    info!(
835        "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}"
836    );
837
838    let procedure_id = id.to_string();
839    let output = output.context(ProcedureOutputSnafu {
840        procedure_id: &procedure_id,
841        err_msg: "empty output",
842    })?;
843    let table_ids = output
844        .downcast_ref::<Vec<TableId>>()
845        .context(ProcedureOutputSnafu {
846            procedure_id: &procedure_id,
847            err_msg: "downcast to `Vec<TableId>`",
848        })?
849        .clone();
850
851    Ok(SubmitDdlTaskResponse {
852        key: procedure_id.into(),
853        table_ids,
854    })
855}
856
857async fn handle_create_database_task(
858    ddl_manager: &DdlManager,
859    create_database_task: CreateDatabaseTask,
860) -> Result<SubmitDdlTaskResponse> {
861    let (id, _) = ddl_manager
862        .submit_create_database(create_database_task.clone())
863        .await?;
864
865    let procedure_id = id.to_string();
866    info!(
867        "Database {}.{} is created via procedure_id {id:?}",
868        create_database_task.catalog, create_database_task.schema
869    );
870
871    Ok(SubmitDdlTaskResponse {
872        key: procedure_id.into(),
873        ..Default::default()
874    })
875}
876
877async fn handle_drop_database_task(
878    ddl_manager: &DdlManager,
879    drop_database_task: DropDatabaseTask,
880) -> Result<SubmitDdlTaskResponse> {
881    let (id, _) = ddl_manager
882        .submit_drop_database(drop_database_task.clone())
883        .await?;
884
885    let procedure_id = id.to_string();
886    info!(
887        "Database {}.{} is dropped via procedure_id {id:?}",
888        drop_database_task.catalog, drop_database_task.schema
889    );
890
891    Ok(SubmitDdlTaskResponse {
892        key: procedure_id.into(),
893        ..Default::default()
894    })
895}
896
897async fn handle_alter_database_task(
898    ddl_manager: &DdlManager,
899    alter_database_task: AlterDatabaseTask,
900) -> Result<SubmitDdlTaskResponse> {
901    let (id, _) = ddl_manager
902        .submit_alter_database(alter_database_task.clone())
903        .await?;
904
905    let procedure_id = id.to_string();
906    info!(
907        "Database {}.{} is altered via procedure_id {id:?}",
908        alter_database_task.catalog(),
909        alter_database_task.schema()
910    );
911
912    Ok(SubmitDdlTaskResponse {
913        key: procedure_id.into(),
914        ..Default::default()
915    })
916}
917
918async fn handle_drop_flow_task(
919    ddl_manager: &DdlManager,
920    drop_flow_task: DropFlowTask,
921) -> Result<SubmitDdlTaskResponse> {
922    let (id, _) = ddl_manager
923        .submit_drop_flow_task(drop_flow_task.clone())
924        .await?;
925
926    let procedure_id = id.to_string();
927    info!(
928        "Flow {}.{}({}) is dropped via procedure_id {id:?}",
929        drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
930    );
931
932    Ok(SubmitDdlTaskResponse {
933        key: procedure_id.into(),
934        ..Default::default()
935    })
936}
937
/// Forwards a drop-trigger task to the trigger DDL manager.
///
/// Fails with an "unsupported" error for the operation `drop trigger` when no trigger DDL
/// manager is configured on `ddl_manager`.
#[cfg(feature = "enterprise")]
async fn handle_drop_trigger_task(
    ddl_manager: &DdlManager,
    drop_trigger_task: DropTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_ddl_manager) => {
            trigger_ddl_manager
                .drop_trigger(
                    drop_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => {
            use crate::error::UnsupportedSnafu;

            UnsupportedSnafu {
                operation: "drop trigger",
            }
            .fail()
        }
    }
}
961
962async fn handle_drop_view_task(
963    ddl_manager: &DdlManager,
964    drop_view_task: DropViewTask,
965) -> Result<SubmitDdlTaskResponse> {
966    let (id, _) = ddl_manager
967        .submit_drop_view_task(drop_view_task.clone())
968        .await?;
969
970    let procedure_id = id.to_string();
971    info!(
972        "View {}({}) is dropped via procedure_id {id:?}",
973        drop_view_task.table_ref(),
974        drop_view_task.view_id,
975    );
976
977    Ok(SubmitDdlTaskResponse {
978        key: procedure_id.into(),
979        ..Default::default()
980    })
981}
982
983async fn handle_create_flow_task(
984    ddl_manager: &DdlManager,
985    create_flow_task: CreateFlowTask,
986    query_context: QueryContext,
987) -> Result<SubmitDdlTaskResponse> {
988    let (id, output) = ddl_manager
989        .submit_create_flow_task(create_flow_task.clone(), query_context)
990        .await?;
991
992    let procedure_id = id.to_string();
993    let output = output.context(ProcedureOutputSnafu {
994        procedure_id: &procedure_id,
995        err_msg: "empty output",
996    })?;
997    let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
998        procedure_id: &procedure_id,
999        err_msg: "downcast to `u32`",
1000    })?);
1001    if !create_flow_task.or_replace {
1002        info!(
1003            "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
1004            create_flow_task.catalog_name, create_flow_task.flow_name,
1005        );
1006    } else {
1007        info!(
1008            "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
1009            create_flow_task.catalog_name, create_flow_task.flow_name,
1010        );
1011    }
1012
1013    Ok(SubmitDdlTaskResponse {
1014        key: procedure_id.into(),
1015        ..Default::default()
1016    })
1017}
1018
/// Forwards a create-trigger task to the trigger DDL manager.
///
/// Fails with an "unsupported" error for the operation `create trigger` when no trigger
/// DDL manager is configured on `ddl_manager`.
#[cfg(feature = "enterprise")]
async fn handle_create_trigger_task(
    ddl_manager: &DdlManager,
    create_trigger_task: CreateTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_ddl_manager) => {
            trigger_ddl_manager
                .create_trigger(
                    create_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => {
            use crate::error::UnsupportedSnafu;

            UnsupportedSnafu {
                operation: "create trigger",
            }
            .fail()
        }
    }
}
1042
1043async fn handle_alter_logical_table_tasks(
1044    ddl_manager: &DdlManager,
1045    alter_table_tasks: Vec<AlterTableTask>,
1046) -> Result<SubmitDdlTaskResponse> {
1047    ensure!(
1048        !alter_table_tasks.is_empty(),
1049        EmptyDdlTasksSnafu {
1050            name: "alter logical tables"
1051        }
1052    );
1053
1054    // Use the physical table id in the first logical table, then it will be checked in the procedure.
1055    let first_table = TableNameKey {
1056        catalog: &alter_table_tasks[0].alter_table.catalog_name,
1057        schema: &alter_table_tasks[0].alter_table.schema_name,
1058        table: &alter_table_tasks[0].alter_table.table_name,
1059    };
1060    let physical_table_id =
1061        utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
1062    let num_logical_tables = alter_table_tasks.len();
1063
1064    let (id, _) = ddl_manager
1065        .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
1066        .await?;
1067
1068    info!(
1069        "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}"
1070    );
1071
1072    let procedure_id = id.to_string();
1073
1074    Ok(SubmitDdlTaskResponse {
1075        key: procedure_id.into(),
1076        ..Default::default()
1077    })
1078}
1079
1080/// Handle the `[CreateViewTask]` and returns the DDL response when success.
1081async fn handle_create_view_task(
1082    ddl_manager: &DdlManager,
1083    create_view_task: CreateViewTask,
1084) -> Result<SubmitDdlTaskResponse> {
1085    let (id, output) = ddl_manager
1086        .submit_create_view_task(create_view_task)
1087        .await?;
1088
1089    let procedure_id = id.to_string();
1090    let output = output.context(ProcedureOutputSnafu {
1091        procedure_id: &procedure_id,
1092        err_msg: "empty output",
1093    })?;
1094    let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
1095        procedure_id: &procedure_id,
1096        err_msg: "downcast to `u32`",
1097    })?);
1098    info!("View: {view_id} is created via procedure_id {id:?}");
1099
1100    Ok(SubmitDdlTaskResponse {
1101        key: procedure_id.into(),
1102        table_ids: vec![view_id],
1103    })
1104}
1105
1106async fn handle_comment_on_task(
1107    ddl_manager: &DdlManager,
1108    comment_on_task: CommentOnTask,
1109) -> Result<SubmitDdlTaskResponse> {
1110    let (id, _) = ddl_manager
1111        .submit_comment_on_task(comment_on_task.clone())
1112        .await?;
1113
1114    let procedure_id = id.to_string();
1115    info!(
1116        "Comment on {}.{}.{} is updated via procedure_id {id:?}",
1117        comment_on_task.catalog_name, comment_on_task.schema_name, comment_on_task.object_name
1118    );
1119
1120    Ok(SubmitDdlTaskResponse {
1121        key: procedure_id.into(),
1122        ..Default::default()
1123    })
1124}
1125
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use common_error::ext::BoxedError;
    use common_procedure::local::LocalManager;
    use common_procedure::test_util::InMemoryPoisonStore;
    use common_procedure::{BoxedProcedure, ProcedureManagerRef};
    use store_api::storage::TableId;
    use table::table_name::TableName;

    use super::DdlManager;
    use crate::cache_invalidator::DummyCacheInvalidator;
    use crate::ddl::alter_table::AlterTableProcedure;
    use crate::ddl::create_table::CreateTableProcedure;
    use crate::ddl::drop_table::DropTableProcedure;
    use crate::ddl::flow_meta::FlowMetadataAllocator;
    use crate::ddl::table_meta::TableMetadataAllocator;
    use crate::ddl::truncate_table::TruncateTableProcedure;
    use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use crate::ddl_manager::{RepartitionProcedureFactory, RepartitionSource};
    use crate::key::TableMetadataManager;
    use crate::key::flow::FlowMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef};
    use crate::peer::Peer;
    use crate::region_keeper::MemoryRegionKeeper;
    use crate::region_registry::LeaderRegionRegistry;
    use crate::sequence::SequenceBuilder;
    use crate::state_store::KvStateStore;
    use crate::wal_provider::WalProvider;

    /// A dummy node manager implementing both [`DatanodeManager`] and [`FlownodeManager`].
    ///
    /// Both lookups panic via `unimplemented!()`, so it is only suitable for tests that
    /// never resolve a node.
    pub struct DummyDatanodeManager;

    #[async_trait::async_trait]
    impl DatanodeManager for DummyDatanodeManager {
        async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
            unimplemented!()
        }
    }

    #[async_trait::async_trait]
    impl FlownodeManager for DummyDatanodeManager {
        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
            unimplemented!()
        }
    }

    /// A [`RepartitionProcedureFactory`] stub: creating a procedure is unimplemented, while
    /// loader registration succeeds without registering anything.
    struct DummyRepartitionProcedureFactory;

    #[async_trait::async_trait]
    impl RepartitionProcedureFactory for DummyRepartitionProcedureFactory {
        fn create(
            &self,
            _ddl_ctx: &DdlContext,
            _table_name: TableName,
            _table_id: TableId,
            _source: RepartitionSource,
            _to_exprs: Vec<String>,
            _timeout: Option<Duration>,
        ) -> std::result::Result<BoxedProcedure, BoxedError> {
            unimplemented!()
        }

        fn register_loaders(
            &self,
            _ddl_ctx: &DdlContext,
            _procedure_manager: &ProcedureManagerRef,
        ) -> std::result::Result<(), BoxedError> {
            Ok(())
        }
    }

    /// Building a [`DdlManager`] must leave the table DDL procedure loaders registered in
    /// the procedure manager it was given.
    #[test]
    fn test_try_new() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
            Arc::new(WalProvider::default()),
        ));
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
        ));

        let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let procedure_manager = Arc::new(LocalManager::new(
            Default::default(),
            state_store,
            poison_manager,
            None,
            None,
        ));

        // Surface a construction failure directly instead of discarding it and failing
        // later on an unrelated-looking loader assertion.
        // NOTE(review): the trailing `true` is presumably the "register loaders" flag —
        // confirm against `DdlManager::try_new`.
        let _ddl_manager = DdlManager::try_new(
            DdlContext {
                node_manager: Arc::new(DummyDatanodeManager),
                cache_invalidator: Arc::new(DummyCacheInvalidator),
                table_metadata_manager,
                table_metadata_allocator,
                flow_metadata_manager,
                flow_metadata_allocator,
                memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
                leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
                region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
            },
            procedure_manager.clone(),
            Arc::new(DummyRepartitionProcedureFactory),
            true,
        )
        .expect("failed to construct DdlManager");

        let expected_loaders = [
            CreateTableProcedure::TYPE_NAME,
            AlterTableProcedure::TYPE_NAME,
            DropTableProcedure::TYPE_NAME,
            TruncateTableProcedure::TYPE_NAME,
        ];

        for loader in expected_loaders {
            assert!(
                procedure_manager.contains_loader(loader),
                "missing procedure loader: {loader}"
            );
        }
    }
}