Skip to main content

common_meta/
ddl_manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::Repartition;
19use api::v1::alter_table_expr::Kind;
20use common_error::ext::BoxedError;
21use common_procedure::{
22    BoxedProcedure, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef,
23    ProcedureWithId, watcher,
24};
25use common_telemetry::tracing_context::{FutureExt, TracingContext};
26use common_telemetry::{debug, info, tracing};
27use derive_builder::Builder;
28use snafu::{OptionExt, ResultExt, ensure};
29use store_api::storage::TableId;
30use table::table_name::TableName;
31
32use crate::ddl::alter_database::AlterDatabaseProcedure;
33use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
34use crate::ddl::alter_table::AlterTableProcedure;
35use crate::ddl::comment_on::CommentOnProcedure;
36use crate::ddl::create_database::CreateDatabaseProcedure;
37use crate::ddl::create_flow::CreateFlowProcedure;
38use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
39use crate::ddl::create_table::CreateTableProcedure;
40use crate::ddl::create_view::CreateViewProcedure;
41use crate::ddl::drop_database::DropDatabaseProcedure;
42use crate::ddl::drop_flow::DropFlowProcedure;
43use crate::ddl::drop_table::DropTableProcedure;
44use crate::ddl::drop_view::DropViewProcedure;
45use crate::ddl::truncate_table::TruncateTableProcedure;
46use crate::ddl::{DdlContext, utils};
47use crate::error::{
48    self, CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu,
49    RegisterProcedureLoaderSnafu, RegisterRepartitionProcedureLoaderSnafu, Result,
50    SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
51    UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
52};
53use crate::key::table_info::TableInfoValue;
54use crate::key::table_name::TableNameKey;
55use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
56use crate::procedure_executor::ExecutorContext;
57#[cfg(feature = "enterprise")]
58use crate::rpc::ddl::DdlTask::CreateTrigger;
59#[cfg(feature = "enterprise")]
60use crate::rpc::ddl::DdlTask::DropTrigger;
61use crate::rpc::ddl::DdlTask::{
62    AlterDatabase, AlterLogicalTables, AlterTable, CommentOn, CreateDatabase, CreateFlow,
63    CreateLogicalTables, CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables,
64    DropTable, DropView, TruncateTable,
65};
66#[cfg(feature = "enterprise")]
67use crate::rpc::ddl::trigger::CreateTriggerTask;
68#[cfg(feature = "enterprise")]
69use crate::rpc::ddl::trigger::DropTriggerTask;
70use crate::rpc::ddl::{
71    AlterDatabaseTask, AlterTableTask, CommentOnTask, CreateDatabaseTask, CreateFlowTask,
72    CreateTableTask, CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask,
73    QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
74};
75
/// A configurator that customizes or enhances a [`DdlManager`].
///
/// `C` is the caller-chosen context type handed to [`DdlManagerConfigurator::configure`].
#[async_trait::async_trait]
pub trait DdlManagerConfigurator<C>: Send + Sync {
    /// Configures the given [`DdlManager`] using the provided context `ctx`.
    ///
    /// Takes ownership of `ddl_manager` and returns the (possibly modified)
    /// manager on success, or a [`BoxedError`] on failure.
    async fn configure(
        &self,
        ddl_manager: DdlManager,
        ctx: C,
    ) -> std::result::Result<DdlManager, BoxedError>;
}
86
/// A shared, reference-counted [`DdlManagerConfigurator`] trait object.
pub type DdlManagerConfiguratorRef<C> = Arc<dyn DdlManagerConfigurator<C>>;

/// A shared, reference-counted [`DdlManager`].
pub type DdlManagerRef = Arc<DdlManager>;

/// The (unsized) signature of a factory that turns a [`DdlContext`] into a
/// [`BoxedProcedureLoader`].
pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
92
/// The [DdlManager] provides the ability to execute Ddl.
///
/// It builds the procedure for each DDL task and submits it to the
/// procedure manager.
#[derive(Builder)]
pub struct DdlManager {
    // Context cloned into every procedure this manager creates.
    ddl_context: DdlContext,
    // Used to register procedure loaders and to submit procedures.
    procedure_manager: ProcedureManagerRef,
    // Creates repartition procedures and registers their loaders.
    repartition_procedure_factory: RepartitionProcedureFactoryRef,
    // Optional trigger DDL handler; `try_new` leaves it `None` and
    // `with_trigger_ddl_manager` sets it.
    #[cfg(feature = "enterprise")]
    trigger_ddl_manager: Option<TriggerDdlManagerRef>,
}
102
/// This trait is responsible for handling DDL tasks about triggers. e.g.,
/// create trigger, drop trigger, etc.
///
/// Only available with the `enterprise` feature.
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
pub trait TriggerDdlManager: Send + Sync {
    /// Handles a create trigger task.
    ///
    /// The implementation receives the procedure manager, a [`DdlContext`] and
    /// the [`QueryContext`] of the request, and produces the response for the
    /// submitted task.
    async fn create_trigger(
        &self,
        create_trigger_task: CreateTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Handles a drop trigger task.
    ///
    /// Receives the same collaborators as [`TriggerDdlManager::create_trigger`].
    async fn drop_trigger(
        &self,
        drop_trigger_task: DropTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Returns `self` as [`std::any::Any`].
    // NOTE(review): presumably used to downcast to the concrete implementation — confirm with callers.
    fn as_any(&self) -> &dyn std::any::Any;
}
126
/// A shared, reference-counted [`TriggerDdlManager`] trait object.
#[cfg(feature = "enterprise")]
pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
129
// Expands to a `(type name, loader factory)` pair for one procedure type.
//
// The first element is `$procedure::TYPE_NAME`. The second is a reference to
// a closure that takes a `DdlContext` and returns a `BoxedProcedureLoader`;
// that loader rebuilds the procedure from its JSON form via
// `$procedure::from_json` and boxes the result.
macro_rules! procedure_loader_entry {
    ($procedure:ident) => {
        (
            $procedure::TYPE_NAME,
            &|context: DdlContext| -> BoxedProcedureLoader {
                Box::new(move |json: &str| {
                    // Each invocation of the loader hands a fresh clone of the
                    // context to `from_json`.
                    let context = context.clone();
                    $procedure::from_json(json, context).map(|p| Box::new(p) as _)
                })
            },
        )
    };
}
143
// Expands a comma-separated list of procedure types into a `Vec` with one
// `procedure_loader_entry!` pair per type, in the order given.
macro_rules! procedure_loader {
    ($($procedure:ident),*) => {
        vec![
            $(procedure_loader_entry!($procedure)),*
        ]
    };
}
151
/// A shared, reference-counted [`RepartitionProcedureFactory`] trait object.
pub type RepartitionProcedureFactoryRef = Arc<dyn RepartitionProcedureFactory>;
153
/// A factory for repartition procedures.
///
/// [`DdlManager`] uses it to build the procedure for a repartition request and
/// to register the loaders of that procedure with the procedure manager.
pub trait RepartitionProcedureFactory: Send + Sync {
    /// Creates a repartition procedure for the table `table_name` / `table_id`.
    ///
    /// `from_exprs` and `to_exprs` are the original and the target partition
    /// expressions. `timeout` is optional; how it is honored is up to the
    /// implementation.
    fn create(
        &self,
        ddl_ctx: &DdlContext,
        table_name: TableName,
        table_id: TableId,
        from_exprs: Vec<String>,
        to_exprs: Vec<String>,
        timeout: Option<Duration>,
    ) -> std::result::Result<BoxedProcedure, BoxedError>;

    /// Registers the loaders of the repartition procedure(s) with
    /// `procedure_manager`.
    fn register_loaders(
        &self,
        ddl_ctx: &DdlContext,
        procedure_manager: &ProcedureManagerRef,
    ) -> std::result::Result<(), BoxedError>;
}
171
/// The options for DDL tasks.
///
/// Note: These options may not be utilized by all procedures.
/// At present, they are specifically applied in `RepartitionProcedure`.
#[derive(Debug, Clone, Copy)]
pub struct DdlOptions {
    /// The timeout will be passed to the procedure.
    ///
    /// Note: Each procedure may implement its own timeout handling mechanism.
    pub timeout: Duration,
    /// The flag that controls whether to wait for the procedure to complete.
    ///
    /// If wait is `true`, the caller waits for the procedure to complete (success or failure) and the result is returned.
    /// Otherwise, the procedure is submitted and the [ProcedureId](common_procedure::ProcedureId) is returned immediately.
    ///
    /// Note: The value of `wait` is independent of the `timeout` option. If a procedure ignores the `timeout` and `wait` is set to true, the operation does not return until the procedure completes.
    pub wait: bool,
}
190
191impl DdlManager {
192    /// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
193    pub fn try_new(
194        ddl_context: DdlContext,
195        procedure_manager: ProcedureManagerRef,
196        repartition_procedure_factory: RepartitionProcedureFactoryRef,
197        register_loaders: bool,
198    ) -> Result<Self> {
199        let manager = Self {
200            ddl_context,
201            procedure_manager,
202            repartition_procedure_factory,
203            #[cfg(feature = "enterprise")]
204            trigger_ddl_manager: None,
205        };
206        if register_loaders {
207            manager.register_loaders()?;
208        }
209        Ok(manager)
210    }
211
212    #[cfg(feature = "enterprise")]
213    pub fn with_trigger_ddl_manager(mut self, trigger_ddl_manager: TriggerDdlManagerRef) -> Self {
214        self.trigger_ddl_manager = Some(trigger_ddl_manager);
215        self
216    }
217
218    /// Returns the [TableMetadataManagerRef].
219    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
220        &self.ddl_context.table_metadata_manager
221    }
222
223    /// Returns the [DdlContext]
224    pub fn create_context(&self) -> DdlContext {
225        self.ddl_context.clone()
226    }
227
228    /// Registers all Ddl loaders.
229    pub fn register_loaders(&self) -> Result<()> {
230        let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
231            CreateTableProcedure,
232            CreateLogicalTablesProcedure,
233            CreateViewProcedure,
234            CreateFlowProcedure,
235            AlterTableProcedure,
236            AlterLogicalTablesProcedure,
237            AlterDatabaseProcedure,
238            DropTableProcedure,
239            DropFlowProcedure,
240            TruncateTableProcedure,
241            CreateDatabaseProcedure,
242            DropDatabaseProcedure,
243            DropViewProcedure,
244            CommentOnProcedure
245        );
246
247        for (type_name, loader_factory) in loaders {
248            let context = self.create_context();
249            self.procedure_manager
250                .register_loader(type_name, loader_factory(context))
251                .context(RegisterProcedureLoaderSnafu { type_name })?;
252        }
253
254        self.repartition_procedure_factory
255            .register_loaders(&self.ddl_context, &self.procedure_manager)
256            .context(RegisterRepartitionProcedureLoaderSnafu)?;
257
258        Ok(())
259    }
260
261    /// Submits a repartition procedure for the specified table.
262    ///
263    /// This creates a repartition procedure using the provided `table_id`,
264    /// `table_name`, and `Repartition` configuration, and then either executes it
265    /// to completion or just submits it for asynchronous execution.
266    ///
267    /// The `Repartition` argument contains the original (`from_partition_exprs`)
268    /// and target (`into_partition_exprs`) partition expressions that define how
269    /// the table should be repartitioned.
270    ///
271    /// The `wait` flag controls whether this method waits for the repartition
272    /// procedure to finish:
273    /// - If `wait` is `true`, the procedure is executed and this method awaits
274    ///   its completion, returning both the generated `ProcedureId` and the
275    ///   final `Output` of the procedure.
276    /// - If `wait` is `false`, the procedure is only submitted to the procedure
277    ///   manager for asynchronous execution, and this method returns the
278    ///   `ProcedureId` along with `None` as the output.
279    async fn submit_repartition_task(
280        &self,
281        table_id: TableId,
282        table_name: TableName,
283        Repartition {
284            from_partition_exprs,
285            into_partition_exprs,
286        }: Repartition,
287        wait: bool,
288        timeout: Duration,
289    ) -> Result<(ProcedureId, Option<Output>)> {
290        let context = self.create_context();
291
292        let procedure = self
293            .repartition_procedure_factory
294            .create(
295                &context,
296                table_name,
297                table_id,
298                from_partition_exprs,
299                into_partition_exprs,
300                Some(timeout),
301            )
302            .context(CreateRepartitionProcedureSnafu)?;
303        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
304        if wait {
305            self.execute_procedure_and_wait(procedure_with_id).await
306        } else {
307            self.submit_procedure(procedure_with_id)
308                .await
309                .map(|p| (p, None))
310        }
311    }
312
313    /// Submits and executes an alter table task.
314    #[tracing::instrument(skip_all)]
315    pub async fn submit_alter_table_task(
316        &self,
317        table_id: TableId,
318        alter_table_task: AlterTableTask,
319        ddl_options: DdlOptions,
320    ) -> Result<(ProcedureId, Option<Output>)> {
321        // make alter_table_task mutable so we can call .take() on its field
322        let mut alter_table_task = alter_table_task;
323        if let Some(Kind::Repartition(_)) = alter_table_task.alter_table.kind.as_ref()
324            && let Kind::Repartition(repartition) =
325                alter_table_task.alter_table.kind.take().unwrap()
326        {
327            let table_name = TableName::new(
328                alter_table_task.alter_table.catalog_name,
329                alter_table_task.alter_table.schema_name,
330                alter_table_task.alter_table.table_name,
331            );
332            return self
333                .submit_repartition_task(
334                    table_id,
335                    table_name,
336                    repartition,
337                    ddl_options.wait,
338                    ddl_options.timeout,
339                )
340                .await;
341        }
342
343        let context = self.create_context();
344        let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
345
346        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
347
348        self.execute_procedure_and_wait(procedure_with_id).await
349    }
350
351    /// Submits and executes a create table task.
352    #[tracing::instrument(skip_all)]
353    pub async fn submit_create_table_task(
354        &self,
355        create_table_task: CreateTableTask,
356    ) -> Result<(ProcedureId, Option<Output>)> {
357        let context = self.create_context();
358
359        let procedure = CreateTableProcedure::new(create_table_task, context)?;
360
361        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
362
363        self.execute_procedure_and_wait(procedure_with_id).await
364    }
365
366    /// Submits and executes a `[CreateViewTask]`.
367    #[tracing::instrument(skip_all)]
368    pub async fn submit_create_view_task(
369        &self,
370        create_view_task: CreateViewTask,
371    ) -> Result<(ProcedureId, Option<Output>)> {
372        let context = self.create_context();
373
374        let procedure = CreateViewProcedure::new(create_view_task, context);
375
376        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
377
378        self.execute_procedure_and_wait(procedure_with_id).await
379    }
380
381    /// Submits and executes a create multiple logical table tasks.
382    #[tracing::instrument(skip_all)]
383    pub async fn submit_create_logical_table_tasks(
384        &self,
385        create_table_tasks: Vec<CreateTableTask>,
386        physical_table_id: TableId,
387    ) -> Result<(ProcedureId, Option<Output>)> {
388        let context = self.create_context();
389
390        let procedure =
391            CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
392
393        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
394
395        self.execute_procedure_and_wait(procedure_with_id).await
396    }
397
398    /// Submits and executes alter multiple table tasks.
399    #[tracing::instrument(skip_all)]
400    pub async fn submit_alter_logical_table_tasks(
401        &self,
402        alter_table_tasks: Vec<AlterTableTask>,
403        physical_table_id: TableId,
404    ) -> Result<(ProcedureId, Option<Output>)> {
405        let context = self.create_context();
406
407        let procedure =
408            AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
409
410        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
411
412        self.execute_procedure_and_wait(procedure_with_id).await
413    }
414
415    /// Submits and executes a drop table task.
416    #[tracing::instrument(skip_all)]
417    pub async fn submit_drop_table_task(
418        &self,
419        drop_table_task: DropTableTask,
420    ) -> Result<(ProcedureId, Option<Output>)> {
421        let context = self.create_context();
422
423        let procedure = DropTableProcedure::new(drop_table_task, context);
424
425        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
426
427        self.execute_procedure_and_wait(procedure_with_id).await
428    }
429
430    /// Submits and executes a create database task.
431    #[tracing::instrument(skip_all)]
432    pub async fn submit_create_database(
433        &self,
434        CreateDatabaseTask {
435            catalog,
436            schema,
437            create_if_not_exists,
438            options,
439        }: CreateDatabaseTask,
440    ) -> Result<(ProcedureId, Option<Output>)> {
441        let context = self.create_context();
442        let procedure =
443            CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
444        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
445
446        self.execute_procedure_and_wait(procedure_with_id).await
447    }
448
449    /// Submits and executes a drop table task.
450    #[tracing::instrument(skip_all)]
451    pub async fn submit_drop_database(
452        &self,
453        DropDatabaseTask {
454            catalog,
455            schema,
456            drop_if_exists,
457        }: DropDatabaseTask,
458    ) -> Result<(ProcedureId, Option<Output>)> {
459        let context = self.create_context();
460        let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
461        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
462
463        self.execute_procedure_and_wait(procedure_with_id).await
464    }
465
466    pub async fn submit_alter_database(
467        &self,
468        alter_database_task: AlterDatabaseTask,
469    ) -> Result<(ProcedureId, Option<Output>)> {
470        let context = self.create_context();
471        let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
472        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
473
474        self.execute_procedure_and_wait(procedure_with_id).await
475    }
476
477    /// Submits and executes a create flow task.
478    #[tracing::instrument(skip_all)]
479    pub async fn submit_create_flow_task(
480        &self,
481        create_flow: CreateFlowTask,
482        query_context: QueryContext,
483    ) -> Result<(ProcedureId, Option<Output>)> {
484        let context = self.create_context();
485        let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
486        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
487
488        self.execute_procedure_and_wait(procedure_with_id).await
489    }
490
491    /// Submits and executes a drop flow task.
492    #[tracing::instrument(skip_all)]
493    pub async fn submit_drop_flow_task(
494        &self,
495        drop_flow: DropFlowTask,
496    ) -> Result<(ProcedureId, Option<Output>)> {
497        let context = self.create_context();
498        let procedure = DropFlowProcedure::new(drop_flow, context);
499        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
500
501        self.execute_procedure_and_wait(procedure_with_id).await
502    }
503
504    /// Submits and executes a drop view task.
505    #[tracing::instrument(skip_all)]
506    pub async fn submit_drop_view_task(
507        &self,
508        drop_view: DropViewTask,
509    ) -> Result<(ProcedureId, Option<Output>)> {
510        let context = self.create_context();
511        let procedure = DropViewProcedure::new(drop_view, context);
512        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
513
514        self.execute_procedure_and_wait(procedure_with_id).await
515    }
516
517    /// Submits and executes a truncate table task.
518    #[tracing::instrument(skip_all)]
519    pub async fn submit_truncate_table_task(
520        &self,
521        truncate_table_task: TruncateTableTask,
522        table_info_value: DeserializedValueWithBytes<TableInfoValue>,
523    ) -> Result<(ProcedureId, Option<Output>)> {
524        let context = self.create_context();
525        let procedure = TruncateTableProcedure::new(truncate_table_task, table_info_value, context);
526
527        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
528
529        self.execute_procedure_and_wait(procedure_with_id).await
530    }
531
532    /// Submits and executes a comment on task.
533    #[tracing::instrument(skip_all)]
534    pub async fn submit_comment_on_task(
535        &self,
536        comment_on_task: CommentOnTask,
537    ) -> Result<(ProcedureId, Option<Output>)> {
538        let context = self.create_context();
539        let procedure = CommentOnProcedure::new(comment_on_task, context);
540        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
541
542        self.execute_procedure_and_wait(procedure_with_id).await
543    }
544
545    /// Executes a procedure and waits for the result.
546    async fn execute_procedure_and_wait(
547        &self,
548        procedure_with_id: ProcedureWithId,
549    ) -> Result<(ProcedureId, Option<Output>)> {
550        let procedure_id = procedure_with_id.id;
551
552        let mut watcher = self
553            .procedure_manager
554            .submit(procedure_with_id)
555            .await
556            .context(SubmitProcedureSnafu)?;
557
558        let output = watcher::wait(&mut watcher)
559            .await
560            .context(WaitProcedureSnafu)?;
561
562        Ok((procedure_id, output))
563    }
564
565    /// Submits a procedure and returns the procedure id.
566    async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
567        let procedure_id = procedure_with_id.id;
568        let _ = self
569            .procedure_manager
570            .submit(procedure_with_id)
571            .await
572            .context(SubmitProcedureSnafu)?;
573
574        Ok(procedure_id)
575    }
576
577    pub async fn submit_ddl_task(
578        &self,
579        ctx: &ExecutorContext,
580        request: SubmitDdlTaskRequest,
581    ) -> Result<SubmitDdlTaskResponse> {
582        let span = ctx
583            .tracing_context
584            .as_ref()
585            .map(TracingContext::from_w3c)
586            .unwrap_or_else(TracingContext::from_current_span)
587            .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
588        let ddl_options = DdlOptions {
589            wait: request.wait,
590            timeout: request.timeout,
591        };
592        async move {
593            debug!("Submitting Ddl task: {:?}", request.task);
594            match request.task {
595                CreateTable(create_table_task) => {
596                    handle_create_table_task(self, create_table_task).await
597                }
598                DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
599                AlterTable(alter_table_task) => {
600                    handle_alter_table_task(self, alter_table_task, ddl_options).await
601                }
602                TruncateTable(truncate_table_task) => {
603                    handle_truncate_table_task(self, truncate_table_task).await
604                }
605                CreateLogicalTables(create_table_tasks) => {
606                    handle_create_logical_table_tasks(self, create_table_tasks).await
607                }
608                AlterLogicalTables(alter_table_tasks) => {
609                    handle_alter_logical_table_tasks(self, alter_table_tasks).await
610                }
611                DropLogicalTables(_) => todo!(),
612                CreateDatabase(create_database_task) => {
613                    handle_create_database_task(self, create_database_task).await
614                }
615                DropDatabase(drop_database_task) => {
616                    handle_drop_database_task(self, drop_database_task).await
617                }
618                AlterDatabase(alter_database_task) => {
619                    handle_alter_database_task(self, alter_database_task).await
620                }
621                CreateFlow(create_flow_task) => {
622                    handle_create_flow_task(self, create_flow_task, request.query_context).await
623                }
624                DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
625                CreateView(create_view_task) => {
626                    handle_create_view_task(self, create_view_task).await
627                }
628                DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
629                CommentOn(comment_on_task) => handle_comment_on_task(self, comment_on_task).await,
630                #[cfg(feature = "enterprise")]
631                CreateTrigger(create_trigger_task) => {
632                    handle_create_trigger_task(self, create_trigger_task, request.query_context)
633                        .await
634                }
635                #[cfg(feature = "enterprise")]
636                DropTrigger(drop_trigger_task) => {
637                    handle_drop_trigger_task(self, drop_trigger_task, request.query_context).await
638                }
639            }
640        }
641        .trace(span)
642        .await
643    }
644}
645
646async fn handle_truncate_table_task(
647    ddl_manager: &DdlManager,
648    truncate_table_task: TruncateTableTask,
649) -> Result<SubmitDdlTaskResponse> {
650    let table_id = truncate_table_task.table_id;
651    let table_metadata_manager = &ddl_manager.table_metadata_manager();
652    let table_ref = truncate_table_task.table_ref();
653
654    let table_info_value = table_metadata_manager
655        .table_info_manager()
656        .get(table_id)
657        .await?
658        .with_context(|| TableInfoNotFoundSnafu {
659            table: table_ref.to_string(),
660        })?;
661    let physical_table_id = table_metadata_manager
662        .table_route_manager()
663        .get_physical_table_id(table_id)
664        .await?;
665    ensure!(
666        physical_table_id == table_id,
667        error::UnexpectedSnafu {
668            err_msg: "Truncate table is only supported for physical tables."
669        }
670    );
671
672    let (id, _) = ddl_manager
673        .submit_truncate_table_task(truncate_table_task, table_info_value)
674        .await?;
675
676    info!("Table: {table_id} is truncated via procedure_id {id:?}");
677
678    Ok(SubmitDdlTaskResponse {
679        key: id.to_string().into(),
680        ..Default::default()
681    })
682}
683
684async fn handle_alter_table_task(
685    ddl_manager: &DdlManager,
686    alter_table_task: AlterTableTask,
687    ddl_options: DdlOptions,
688) -> Result<SubmitDdlTaskResponse> {
689    let table_ref = alter_table_task.table_ref();
690
691    let table_id = ddl_manager
692        .table_metadata_manager()
693        .table_name_manager()
694        .get(TableNameKey::new(
695            table_ref.catalog,
696            table_ref.schema,
697            table_ref.table,
698        ))
699        .await?
700        .with_context(|| TableNotFoundSnafu {
701            table_name: table_ref.to_string(),
702        })?
703        .table_id();
704
705    let table_route_value = ddl_manager
706        .table_metadata_manager()
707        .table_route_manager()
708        .table_route_storage()
709        .get(table_id)
710        .await?
711        .context(TableRouteNotFoundSnafu { table_id })?;
712    ensure!(
713        table_route_value.is_physical(),
714        UnexpectedLogicalRouteTableSnafu {
715            err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
716        }
717    );
718
719    let (id, _) = ddl_manager
720        .submit_alter_table_task(table_id, alter_table_task, ddl_options)
721        .await?;
722
723    info!("Table: {table_id} is altered via procedure_id {id:?}");
724
725    Ok(SubmitDdlTaskResponse {
726        key: id.to_string().into(),
727        ..Default::default()
728    })
729}
730
731async fn handle_drop_table_task(
732    ddl_manager: &DdlManager,
733    drop_table_task: DropTableTask,
734) -> Result<SubmitDdlTaskResponse> {
735    let table_id = drop_table_task.table_id;
736    let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
737
738    info!("Table: {table_id} is dropped via procedure_id {id:?}");
739
740    Ok(SubmitDdlTaskResponse {
741        key: id.to_string().into(),
742        ..Default::default()
743    })
744}
745
746async fn handle_create_table_task(
747    ddl_manager: &DdlManager,
748    create_table_task: CreateTableTask,
749) -> Result<SubmitDdlTaskResponse> {
750    let (id, output) = ddl_manager
751        .submit_create_table_task(create_table_task)
752        .await?;
753
754    let procedure_id = id.to_string();
755    let output = output.context(ProcedureOutputSnafu {
756        procedure_id: &procedure_id,
757        err_msg: "empty output",
758    })?;
759    let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
760        procedure_id: &procedure_id,
761        err_msg: "downcast to `u32`",
762    })?);
763    info!("Table: {table_id} is created via procedure_id {id:?}");
764
765    Ok(SubmitDdlTaskResponse {
766        key: procedure_id.into(),
767        table_ids: vec![table_id],
768    })
769}
770
771async fn handle_create_logical_table_tasks(
772    ddl_manager: &DdlManager,
773    create_table_tasks: Vec<CreateTableTask>,
774) -> Result<SubmitDdlTaskResponse> {
775    ensure!(
776        !create_table_tasks.is_empty(),
777        EmptyDdlTasksSnafu {
778            name: "create logical tables"
779        }
780    );
781    let physical_table_id = utils::check_and_get_physical_table_id(
782        ddl_manager.table_metadata_manager(),
783        &create_table_tasks,
784    )
785    .await?;
786    let num_logical_tables = create_table_tasks.len();
787
788    let (id, output) = ddl_manager
789        .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
790        .await?;
791
792    info!(
793        "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}"
794    );
795
796    let procedure_id = id.to_string();
797    let output = output.context(ProcedureOutputSnafu {
798        procedure_id: &procedure_id,
799        err_msg: "empty output",
800    })?;
801    let table_ids = output
802        .downcast_ref::<Vec<TableId>>()
803        .context(ProcedureOutputSnafu {
804            procedure_id: &procedure_id,
805            err_msg: "downcast to `Vec<TableId>`",
806        })?
807        .clone();
808
809    Ok(SubmitDdlTaskResponse {
810        key: procedure_id.into(),
811        table_ids,
812    })
813}
814
815async fn handle_create_database_task(
816    ddl_manager: &DdlManager,
817    create_database_task: CreateDatabaseTask,
818) -> Result<SubmitDdlTaskResponse> {
819    let (id, _) = ddl_manager
820        .submit_create_database(create_database_task.clone())
821        .await?;
822
823    let procedure_id = id.to_string();
824    info!(
825        "Database {}.{} is created via procedure_id {id:?}",
826        create_database_task.catalog, create_database_task.schema
827    );
828
829    Ok(SubmitDdlTaskResponse {
830        key: procedure_id.into(),
831        ..Default::default()
832    })
833}
834
835async fn handle_drop_database_task(
836    ddl_manager: &DdlManager,
837    drop_database_task: DropDatabaseTask,
838) -> Result<SubmitDdlTaskResponse> {
839    let (id, _) = ddl_manager
840        .submit_drop_database(drop_database_task.clone())
841        .await?;
842
843    let procedure_id = id.to_string();
844    info!(
845        "Database {}.{} is dropped via procedure_id {id:?}",
846        drop_database_task.catalog, drop_database_task.schema
847    );
848
849    Ok(SubmitDdlTaskResponse {
850        key: procedure_id.into(),
851        ..Default::default()
852    })
853}
854
855async fn handle_alter_database_task(
856    ddl_manager: &DdlManager,
857    alter_database_task: AlterDatabaseTask,
858) -> Result<SubmitDdlTaskResponse> {
859    let (id, _) = ddl_manager
860        .submit_alter_database(alter_database_task.clone())
861        .await?;
862
863    let procedure_id = id.to_string();
864    info!(
865        "Database {}.{} is altered via procedure_id {id:?}",
866        alter_database_task.catalog(),
867        alter_database_task.schema()
868    );
869
870    Ok(SubmitDdlTaskResponse {
871        key: procedure_id.into(),
872        ..Default::default()
873    })
874}
875
876async fn handle_drop_flow_task(
877    ddl_manager: &DdlManager,
878    drop_flow_task: DropFlowTask,
879) -> Result<SubmitDdlTaskResponse> {
880    let (id, _) = ddl_manager
881        .submit_drop_flow_task(drop_flow_task.clone())
882        .await?;
883
884    let procedure_id = id.to_string();
885    info!(
886        "Flow {}.{}({}) is dropped via procedure_id {id:?}",
887        drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
888    );
889
890    Ok(SubmitDdlTaskResponse {
891        key: procedure_id.into(),
892        ..Default::default()
893    })
894}
895
/// Forwards a drop-trigger task to the trigger DDL manager held by `ddl_manager`.
///
/// Returns an `UnsupportedSnafu` error (operation `"drop trigger"`) when no trigger
/// DDL manager is set; otherwise returns whatever the trigger DDL manager returns.
#[cfg(feature = "enterprise")]
async fn handle_drop_trigger_task(
    ddl_manager: &DdlManager,
    drop_trigger_task: DropTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_manager) => {
            trigger_manager
                .drop_trigger(
                    drop_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => crate::error::UnsupportedSnafu {
            operation: "drop trigger",
        }
        .fail(),
    }
}
919
920async fn handle_drop_view_task(
921    ddl_manager: &DdlManager,
922    drop_view_task: DropViewTask,
923) -> Result<SubmitDdlTaskResponse> {
924    let (id, _) = ddl_manager
925        .submit_drop_view_task(drop_view_task.clone())
926        .await?;
927
928    let procedure_id = id.to_string();
929    info!(
930        "View {}({}) is dropped via procedure_id {id:?}",
931        drop_view_task.table_ref(),
932        drop_view_task.view_id,
933    );
934
935    Ok(SubmitDdlTaskResponse {
936        key: procedure_id.into(),
937        ..Default::default()
938    })
939}
940
941async fn handle_create_flow_task(
942    ddl_manager: &DdlManager,
943    create_flow_task: CreateFlowTask,
944    query_context: QueryContext,
945) -> Result<SubmitDdlTaskResponse> {
946    let (id, output) = ddl_manager
947        .submit_create_flow_task(create_flow_task.clone(), query_context)
948        .await?;
949
950    let procedure_id = id.to_string();
951    let output = output.context(ProcedureOutputSnafu {
952        procedure_id: &procedure_id,
953        err_msg: "empty output",
954    })?;
955    let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
956        procedure_id: &procedure_id,
957        err_msg: "downcast to `u32`",
958    })?);
959    if !create_flow_task.or_replace {
960        info!(
961            "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
962            create_flow_task.catalog_name, create_flow_task.flow_name,
963        );
964    } else {
965        info!(
966            "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
967            create_flow_task.catalog_name, create_flow_task.flow_name,
968        );
969    }
970
971    Ok(SubmitDdlTaskResponse {
972        key: procedure_id.into(),
973        ..Default::default()
974    })
975}
976
/// Forwards a create-trigger task to the trigger DDL manager held by `ddl_manager`.
///
/// Returns an `UnsupportedSnafu` error (operation `"create trigger"`) when no
/// trigger DDL manager is set; otherwise returns whatever the trigger DDL manager
/// returns.
#[cfg(feature = "enterprise")]
async fn handle_create_trigger_task(
    ddl_manager: &DdlManager,
    create_trigger_task: CreateTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_manager) => {
            trigger_manager
                .create_trigger(
                    create_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => crate::error::UnsupportedSnafu {
            operation: "create trigger",
        }
        .fail(),
    }
}
1000
1001async fn handle_alter_logical_table_tasks(
1002    ddl_manager: &DdlManager,
1003    alter_table_tasks: Vec<AlterTableTask>,
1004) -> Result<SubmitDdlTaskResponse> {
1005    ensure!(
1006        !alter_table_tasks.is_empty(),
1007        EmptyDdlTasksSnafu {
1008            name: "alter logical tables"
1009        }
1010    );
1011
1012    // Use the physical table id in the first logical table, then it will be checked in the procedure.
1013    let first_table = TableNameKey {
1014        catalog: &alter_table_tasks[0].alter_table.catalog_name,
1015        schema: &alter_table_tasks[0].alter_table.schema_name,
1016        table: &alter_table_tasks[0].alter_table.table_name,
1017    };
1018    let physical_table_id =
1019        utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
1020    let num_logical_tables = alter_table_tasks.len();
1021
1022    let (id, _) = ddl_manager
1023        .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
1024        .await?;
1025
1026    info!(
1027        "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}"
1028    );
1029
1030    let procedure_id = id.to_string();
1031
1032    Ok(SubmitDdlTaskResponse {
1033        key: procedure_id.into(),
1034        ..Default::default()
1035    })
1036}
1037
1038/// Handle the `[CreateViewTask]` and returns the DDL response when success.
1039async fn handle_create_view_task(
1040    ddl_manager: &DdlManager,
1041    create_view_task: CreateViewTask,
1042) -> Result<SubmitDdlTaskResponse> {
1043    let (id, output) = ddl_manager
1044        .submit_create_view_task(create_view_task)
1045        .await?;
1046
1047    let procedure_id = id.to_string();
1048    let output = output.context(ProcedureOutputSnafu {
1049        procedure_id: &procedure_id,
1050        err_msg: "empty output",
1051    })?;
1052    let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
1053        procedure_id: &procedure_id,
1054        err_msg: "downcast to `u32`",
1055    })?);
1056    info!("View: {view_id} is created via procedure_id {id:?}");
1057
1058    Ok(SubmitDdlTaskResponse {
1059        key: procedure_id.into(),
1060        table_ids: vec![view_id],
1061    })
1062}
1063
1064async fn handle_comment_on_task(
1065    ddl_manager: &DdlManager,
1066    comment_on_task: CommentOnTask,
1067) -> Result<SubmitDdlTaskResponse> {
1068    let (id, _) = ddl_manager
1069        .submit_comment_on_task(comment_on_task.clone())
1070        .await?;
1071
1072    let procedure_id = id.to_string();
1073    info!(
1074        "Comment on {}.{}.{} is updated via procedure_id {id:?}",
1075        comment_on_task.catalog_name, comment_on_task.schema_name, comment_on_task.object_name
1076    );
1077
1078    Ok(SubmitDdlTaskResponse {
1079        key: procedure_id.into(),
1080        ..Default::default()
1081    })
1082}
1083
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use common_error::ext::BoxedError;
    use common_procedure::local::LocalManager;
    use common_procedure::test_util::InMemoryPoisonStore;
    use common_procedure::{BoxedProcedure, ProcedureManagerRef};
    use store_api::storage::TableId;
    use table::table_name::TableName;

    use super::DdlManager;
    use crate::cache_invalidator::DummyCacheInvalidator;
    use crate::ddl::alter_table::AlterTableProcedure;
    use crate::ddl::create_table::CreateTableProcedure;
    use crate::ddl::drop_table::DropTableProcedure;
    use crate::ddl::flow_meta::FlowMetadataAllocator;
    use crate::ddl::table_meta::TableMetadataAllocator;
    use crate::ddl::truncate_table::TruncateTableProcedure;
    use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use crate::ddl_manager::RepartitionProcedureFactory;
    use crate::key::TableMetadataManager;
    use crate::key::flow::FlowMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef};
    use crate::peer::Peer;
    use crate::region_keeper::MemoryRegionKeeper;
    use crate::region_registry::LeaderRegionRegistry;
    use crate::sequence::SequenceBuilder;
    use crate::state_store::KvStateStore;
    use crate::wal_provider::WalProvider;

    /// A dummy that implements both [`DatanodeManager`] and [`FlownodeManager`].
    ///
    /// Neither lookup is implemented; calling one panics via `unimplemented!()`.
    pub struct DummyDatanodeManager;

    #[async_trait::async_trait]
    impl DatanodeManager for DummyDatanodeManager {
        async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
            unimplemented!()
        }
    }

    #[async_trait::async_trait]
    impl FlownodeManager for DummyDatanodeManager {
        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
            unimplemented!()
        }
    }

    /// A dummy [`RepartitionProcedureFactory`]: `create` is unimplemented and
    /// `register_loaders` succeeds without registering anything.
    struct DummyRepartitionProcedureFactory;

    #[async_trait::async_trait]
    impl RepartitionProcedureFactory for DummyRepartitionProcedureFactory {
        fn create(
            &self,
            _ddl_ctx: &DdlContext,
            _table_name: TableName,
            _table_id: TableId,
            _from_exprs: Vec<String>,
            _to_exprs: Vec<String>,
            _timeout: Option<Duration>,
        ) -> std::result::Result<BoxedProcedure, BoxedError> {
            unimplemented!()
        }

        fn register_loaders(
            &self,
            _ddl_ctx: &DdlContext,
            _procedure_manager: &ProcedureManagerRef,
        ) -> std::result::Result<(), BoxedError> {
            Ok(())
        }
    }

    /// Builds a `DdlManager` over in-memory backends with loader registration
    /// enabled, then checks the table procedure loaders are present in the
    /// procedure manager.
    #[test]
    fn test_try_new() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
            Arc::new(WalProvider::default()),
        ));
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
        ));

        let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let procedure_manager = Arc::new(LocalManager::new(
            Default::default(),
            state_store,
            poison_manager,
            None,
            None,
        ));

        // Fail right here if construction fails, instead of letting the error
        // resurface below as an unexplained "loader missing" assertion.
        let _ddl_manager = DdlManager::try_new(
            DdlContext {
                node_manager: Arc::new(DummyDatanodeManager),
                cache_invalidator: Arc::new(DummyCacheInvalidator),
                table_metadata_manager,
                table_metadata_allocator,
                flow_metadata_manager,
                flow_metadata_allocator,
                memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
                leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
                region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
            },
            procedure_manager.clone(),
            Arc::new(DummyRepartitionProcedureFactory),
            true,
        )
        .expect("failed to construct `DdlManager`");

        let expected_loaders = [
            CreateTableProcedure::TYPE_NAME,
            AlterTableProcedure::TYPE_NAME,
            DropTableProcedure::TYPE_NAME,
            TruncateTableProcedure::TYPE_NAME,
        ];

        for loader in expected_loaders {
            assert!(
                procedure_manager.contains_loader(loader),
                "procedure loader `{loader}` is not registered"
            );
        }
    }
}