common_meta/
ddl_manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::Repartition;
19use api::v1::alter_table_expr::Kind;
20use common_error::ext::BoxedError;
21use common_procedure::{
22    BoxedProcedure, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef,
23    ProcedureWithId, watcher,
24};
25use common_telemetry::tracing_context::{FutureExt, TracingContext};
26use common_telemetry::{debug, info, tracing};
27use derive_builder::Builder;
28use snafu::{OptionExt, ResultExt, ensure};
29use store_api::storage::TableId;
30use table::table_name::TableName;
31
32use crate::ddl::alter_database::AlterDatabaseProcedure;
33use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
34use crate::ddl::alter_table::AlterTableProcedure;
35use crate::ddl::comment_on::CommentOnProcedure;
36use crate::ddl::create_database::CreateDatabaseProcedure;
37use crate::ddl::create_flow::CreateFlowProcedure;
38use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
39use crate::ddl::create_table::CreateTableProcedure;
40use crate::ddl::create_view::CreateViewProcedure;
41use crate::ddl::drop_database::DropDatabaseProcedure;
42use crate::ddl::drop_flow::DropFlowProcedure;
43use crate::ddl::drop_table::DropTableProcedure;
44use crate::ddl::drop_view::DropViewProcedure;
45use crate::ddl::truncate_table::TruncateTableProcedure;
46use crate::ddl::{DdlContext, utils};
47use crate::error::{
48    CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu,
49    RegisterProcedureLoaderSnafu, RegisterRepartitionProcedureLoaderSnafu, Result,
50    SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
51    UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
52};
53use crate::key::table_info::TableInfoValue;
54use crate::key::table_name::TableNameKey;
55use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
56use crate::procedure_executor::ExecutorContext;
57#[cfg(feature = "enterprise")]
58use crate::rpc::ddl::DdlTask::CreateTrigger;
59#[cfg(feature = "enterprise")]
60use crate::rpc::ddl::DdlTask::DropTrigger;
61use crate::rpc::ddl::DdlTask::{
62    AlterDatabase, AlterLogicalTables, AlterTable, CommentOn, CreateDatabase, CreateFlow,
63    CreateLogicalTables, CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables,
64    DropTable, DropView, TruncateTable,
65};
66#[cfg(feature = "enterprise")]
67use crate::rpc::ddl::trigger::CreateTriggerTask;
68#[cfg(feature = "enterprise")]
69use crate::rpc::ddl::trigger::DropTriggerTask;
70use crate::rpc::ddl::{
71    AlterDatabaseTask, AlterTableTask, CommentOnTask, CreateDatabaseTask, CreateFlowTask,
72    CreateTableTask, CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask,
73    QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
74};
75use crate::rpc::router::RegionRoute;
76
/// A configurator that customizes or enhances a [`DdlManager`].
///
/// The type parameter `C` is the caller-chosen context type that is handed to
/// [`DdlManagerConfigurator::configure`].
#[async_trait::async_trait]
pub trait DdlManagerConfigurator<C>: Send + Sync {
    /// Configures the given [`DdlManager`] using the provided context `ctx`.
    ///
    /// Takes ownership of `ddl_manager` and yields a [`DdlManager`] on success,
    /// or a [`BoxedError`] on failure.
    async fn configure(
        &self,
        ddl_manager: DdlManager,
        ctx: C,
    ) -> std::result::Result<DdlManager, BoxedError>;
}
87
/// A shared, reference-counted [`DdlManagerConfigurator`] trait object.
pub type DdlManagerConfiguratorRef<C> = Arc<dyn DdlManagerConfigurator<C>>;

/// A shared, reference-counted [`DdlManager`].
pub type DdlManagerRef = Arc<DdlManager>;

/// A function that turns a [`DdlContext`] into a [`BoxedProcedureLoader`].
pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
93
/// The [DdlManager] provides the ability to execute Ddl.
#[derive(Builder)]
pub struct DdlManager {
    /// Context that is cloned and handed to each procedure this manager creates.
    ddl_context: DdlContext,
    /// Procedure manager that procedures are submitted to and loaders are registered with.
    procedure_manager: ProcedureManagerRef,
    /// Factory that builds repartition procedures and registers their loaders.
    repartition_procedure_factory: RepartitionProcedureFactoryRef,
    /// Optional handler for trigger DDL tasks; only present with the `enterprise` feature.
    #[cfg(feature = "enterprise")]
    trigger_ddl_manager: Option<TriggerDdlManagerRef>,
}
103
/// This trait is responsible for handling DDL tasks about triggers. e.g.,
/// create trigger, drop trigger, etc.
///
/// Only compiled when the `enterprise` feature is enabled.
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
pub trait TriggerDdlManager: Send + Sync {
    /// Handles a [`CreateTriggerTask`].
    ///
    /// The implementation receives the procedure manager, a [`DdlContext`] and
    /// the [`QueryContext`] of the request, and produces the
    /// [`SubmitDdlTaskResponse`] for the task.
    async fn create_trigger(
        &self,
        create_trigger_task: CreateTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Handles a [`DropTriggerTask`].
    ///
    /// Receives the same collaborators as [`TriggerDdlManager::create_trigger`]
    /// and produces the [`SubmitDdlTaskResponse`] for the task.
    async fn drop_trigger(
        &self,
        drop_trigger_task: DropTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Exposes `self` as [`std::any::Any`].
    // NOTE(review): presumably so callers can downcast to the concrete implementation — confirm.
    fn as_any(&self) -> &dyn std::any::Any;
}

/// A shared, reference-counted [`TriggerDdlManager`] trait object.
#[cfg(feature = "enterprise")]
pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
130
/// Expands to one `(type name, loader factory)` pair for the procedure type `$procedure`.
///
/// The first element is `$procedure::TYPE_NAME`. The second is a reference to a
/// closure that takes a [`DdlContext`] and returns a [`BoxedProcedureLoader`];
/// that loader clones the context on every call and builds the procedure with
/// `$procedure::from_json`, boxing the result.
macro_rules! procedure_loader_entry {
    ($procedure:ident) => {
        (
            $procedure::TYPE_NAME,
            &|context: DdlContext| -> BoxedProcedureLoader {
                Box::new(move |json: &str| {
                    // The loader may be called repeatedly, so each call gets its own clone.
                    let context = context.clone();
                    $procedure::from_json(json, context).map(|p| Box::new(p) as _)
                })
            },
        )
    };
}
144
/// Expands a comma-separated list of procedure types into a `Vec` with one
/// `procedure_loader_entry!` pair per type, in the order given.
macro_rules! procedure_loader {
    ($($procedure:ident),*) => {
        vec![
            $(procedure_loader_entry!($procedure)),*
        ]
    };
}
152
/// A shared, reference-counted [`RepartitionProcedureFactory`] trait object.
pub type RepartitionProcedureFactoryRef = Arc<dyn RepartitionProcedureFactory>;

/// Builds repartition procedures and registers the loaders they need.
pub trait RepartitionProcedureFactory: Send + Sync {
    /// Creates a repartition procedure for the table identified by `table_name`
    /// and `table_id`.
    ///
    /// `from_exprs` and `to_exprs` are the original and target partition
    /// expressions; `timeout` is an optional timeout forwarded by the caller.
    /// Returns the boxed procedure, or a [`BoxedError`] on failure.
    fn create(
        &self,
        ddl_ctx: &DdlContext,
        table_name: TableName,
        table_id: TableId,
        from_exprs: Vec<String>,
        to_exprs: Vec<String>,
        timeout: Option<Duration>,
    ) -> std::result::Result<BoxedProcedure, BoxedError>;

    /// Registers the procedure loaders of this factory with `procedure_manager`.
    ///
    /// Returns a [`BoxedError`] on failure.
    fn register_loaders(
        &self,
        ddl_ctx: &DdlContext,
        procedure_manager: &ProcedureManagerRef,
    ) -> std::result::Result<(), BoxedError>;
}
172
/// The options for DDL tasks.
///
/// Note: These options may not be utilized by all procedures.
/// At present, they are specifically applied in `RepartitionProcedure`.
#[derive(Debug, Clone, Copy)]
pub struct DdlOptions {
    /// The timeout will be passed to the procedure.
    ///
    /// Note: Each procedure may implement its own timeout handling mechanism.
    pub timeout: Duration,
    /// The flag that controls whether to wait for the procedure to complete.
    ///
    /// If `wait` is `true`, the call waits for the procedure to complete (success or failure) and the result is returned.
    /// Otherwise, the procedure is submitted and the [ProcedureId](common_procedure::ProcedureId) is returned immediately.
    ///
    /// Note: The value of `wait` is independent of the `timeout` option. If a procedure ignores the `timeout` and `wait` is set to true, the operation does not return until the procedure completes.
    pub wait: bool,
}
191
192impl DdlManager {
193    /// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
194    pub fn try_new(
195        ddl_context: DdlContext,
196        procedure_manager: ProcedureManagerRef,
197        repartition_procedure_factory: RepartitionProcedureFactoryRef,
198        register_loaders: bool,
199    ) -> Result<Self> {
200        let manager = Self {
201            ddl_context,
202            procedure_manager,
203            repartition_procedure_factory,
204            #[cfg(feature = "enterprise")]
205            trigger_ddl_manager: None,
206        };
207        if register_loaders {
208            manager.register_loaders()?;
209        }
210        Ok(manager)
211    }
212
213    #[cfg(feature = "enterprise")]
214    pub fn with_trigger_ddl_manager(mut self, trigger_ddl_manager: TriggerDdlManagerRef) -> Self {
215        self.trigger_ddl_manager = Some(trigger_ddl_manager);
216        self
217    }
218
219    /// Returns the [TableMetadataManagerRef].
220    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
221        &self.ddl_context.table_metadata_manager
222    }
223
224    /// Returns the [DdlContext]
225    pub fn create_context(&self) -> DdlContext {
226        self.ddl_context.clone()
227    }
228
229    /// Registers all Ddl loaders.
230    pub fn register_loaders(&self) -> Result<()> {
231        let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
232            CreateTableProcedure,
233            CreateLogicalTablesProcedure,
234            CreateViewProcedure,
235            CreateFlowProcedure,
236            AlterTableProcedure,
237            AlterLogicalTablesProcedure,
238            AlterDatabaseProcedure,
239            DropTableProcedure,
240            DropFlowProcedure,
241            TruncateTableProcedure,
242            CreateDatabaseProcedure,
243            DropDatabaseProcedure,
244            DropViewProcedure,
245            CommentOnProcedure
246        );
247
248        for (type_name, loader_factory) in loaders {
249            let context = self.create_context();
250            self.procedure_manager
251                .register_loader(type_name, loader_factory(context))
252                .context(RegisterProcedureLoaderSnafu { type_name })?;
253        }
254
255        self.repartition_procedure_factory
256            .register_loaders(&self.ddl_context, &self.procedure_manager)
257            .context(RegisterRepartitionProcedureLoaderSnafu)?;
258
259        Ok(())
260    }
261
262    /// Submits a repartition procedure for the specified table.
263    ///
264    /// This creates a repartition procedure using the provided `table_id`,
265    /// `table_name`, and `Repartition` configuration, and then either executes it
266    /// to completion or just submits it for asynchronous execution.
267    ///
268    /// The `Repartition` argument contains the original (`from_partition_exprs`)
269    /// and target (`into_partition_exprs`) partition expressions that define how
270    /// the table should be repartitioned.
271    ///
272    /// The `wait` flag controls whether this method waits for the repartition
273    /// procedure to finish:
274    /// - If `wait` is `true`, the procedure is executed and this method awaits
275    ///   its completion, returning both the generated `ProcedureId` and the
276    ///   final `Output` of the procedure.
277    /// - If `wait` is `false`, the procedure is only submitted to the procedure
278    ///   manager for asynchronous execution, and this method returns the
279    ///   `ProcedureId` along with `None` as the output.
280    async fn submit_repartition_task(
281        &self,
282        table_id: TableId,
283        table_name: TableName,
284        Repartition {
285            from_partition_exprs,
286            into_partition_exprs,
287        }: Repartition,
288        wait: bool,
289        timeout: Duration,
290    ) -> Result<(ProcedureId, Option<Output>)> {
291        let context = self.create_context();
292
293        let procedure = self
294            .repartition_procedure_factory
295            .create(
296                &context,
297                table_name,
298                table_id,
299                from_partition_exprs,
300                into_partition_exprs,
301                Some(timeout),
302            )
303            .context(CreateRepartitionProcedureSnafu)?;
304        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
305        if wait {
306            self.execute_procedure_and_wait(procedure_with_id).await
307        } else {
308            self.submit_procedure(procedure_with_id)
309                .await
310                .map(|p| (p, None))
311        }
312    }
313
314    /// Submits and executes an alter table task.
315    #[tracing::instrument(skip_all)]
316    pub async fn submit_alter_table_task(
317        &self,
318        table_id: TableId,
319        alter_table_task: AlterTableTask,
320        ddl_options: DdlOptions,
321    ) -> Result<(ProcedureId, Option<Output>)> {
322        // make alter_table_task mutable so we can call .take() on its field
323        let mut alter_table_task = alter_table_task;
324        if let Some(Kind::Repartition(_)) = alter_table_task.alter_table.kind.as_ref()
325            && let Kind::Repartition(repartition) =
326                alter_table_task.alter_table.kind.take().unwrap()
327        {
328            let table_name = TableName::new(
329                alter_table_task.alter_table.catalog_name,
330                alter_table_task.alter_table.schema_name,
331                alter_table_task.alter_table.table_name,
332            );
333            return self
334                .submit_repartition_task(
335                    table_id,
336                    table_name,
337                    repartition,
338                    ddl_options.wait,
339                    ddl_options.timeout,
340                )
341                .await;
342        }
343
344        let context = self.create_context();
345        let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
346
347        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
348
349        self.execute_procedure_and_wait(procedure_with_id).await
350    }
351
352    /// Submits and executes a create table task.
353    #[tracing::instrument(skip_all)]
354    pub async fn submit_create_table_task(
355        &self,
356        create_table_task: CreateTableTask,
357    ) -> Result<(ProcedureId, Option<Output>)> {
358        let context = self.create_context();
359
360        let procedure = CreateTableProcedure::new(create_table_task, context)?;
361
362        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
363
364        self.execute_procedure_and_wait(procedure_with_id).await
365    }
366
367    /// Submits and executes a `[CreateViewTask]`.
368    #[tracing::instrument(skip_all)]
369    pub async fn submit_create_view_task(
370        &self,
371        create_view_task: CreateViewTask,
372    ) -> Result<(ProcedureId, Option<Output>)> {
373        let context = self.create_context();
374
375        let procedure = CreateViewProcedure::new(create_view_task, context);
376
377        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
378
379        self.execute_procedure_and_wait(procedure_with_id).await
380    }
381
382    /// Submits and executes a create multiple logical table tasks.
383    #[tracing::instrument(skip_all)]
384    pub async fn submit_create_logical_table_tasks(
385        &self,
386        create_table_tasks: Vec<CreateTableTask>,
387        physical_table_id: TableId,
388    ) -> Result<(ProcedureId, Option<Output>)> {
389        let context = self.create_context();
390
391        let procedure =
392            CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
393
394        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
395
396        self.execute_procedure_and_wait(procedure_with_id).await
397    }
398
399    /// Submits and executes alter multiple table tasks.
400    #[tracing::instrument(skip_all)]
401    pub async fn submit_alter_logical_table_tasks(
402        &self,
403        alter_table_tasks: Vec<AlterTableTask>,
404        physical_table_id: TableId,
405    ) -> Result<(ProcedureId, Option<Output>)> {
406        let context = self.create_context();
407
408        let procedure =
409            AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
410
411        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
412
413        self.execute_procedure_and_wait(procedure_with_id).await
414    }
415
416    /// Submits and executes a drop table task.
417    #[tracing::instrument(skip_all)]
418    pub async fn submit_drop_table_task(
419        &self,
420        drop_table_task: DropTableTask,
421    ) -> Result<(ProcedureId, Option<Output>)> {
422        let context = self.create_context();
423
424        let procedure = DropTableProcedure::new(drop_table_task, context);
425
426        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
427
428        self.execute_procedure_and_wait(procedure_with_id).await
429    }
430
431    /// Submits and executes a create database task.
432    #[tracing::instrument(skip_all)]
433    pub async fn submit_create_database(
434        &self,
435        CreateDatabaseTask {
436            catalog,
437            schema,
438            create_if_not_exists,
439            options,
440        }: CreateDatabaseTask,
441    ) -> Result<(ProcedureId, Option<Output>)> {
442        let context = self.create_context();
443        let procedure =
444            CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
445        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
446
447        self.execute_procedure_and_wait(procedure_with_id).await
448    }
449
450    /// Submits and executes a drop table task.
451    #[tracing::instrument(skip_all)]
452    pub async fn submit_drop_database(
453        &self,
454        DropDatabaseTask {
455            catalog,
456            schema,
457            drop_if_exists,
458        }: DropDatabaseTask,
459    ) -> Result<(ProcedureId, Option<Output>)> {
460        let context = self.create_context();
461        let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
462        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
463
464        self.execute_procedure_and_wait(procedure_with_id).await
465    }
466
467    pub async fn submit_alter_database(
468        &self,
469        alter_database_task: AlterDatabaseTask,
470    ) -> Result<(ProcedureId, Option<Output>)> {
471        let context = self.create_context();
472        let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
473        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
474
475        self.execute_procedure_and_wait(procedure_with_id).await
476    }
477
478    /// Submits and executes a create flow task.
479    #[tracing::instrument(skip_all)]
480    pub async fn submit_create_flow_task(
481        &self,
482        create_flow: CreateFlowTask,
483        query_context: QueryContext,
484    ) -> Result<(ProcedureId, Option<Output>)> {
485        let context = self.create_context();
486        let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
487        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
488
489        self.execute_procedure_and_wait(procedure_with_id).await
490    }
491
492    /// Submits and executes a drop flow task.
493    #[tracing::instrument(skip_all)]
494    pub async fn submit_drop_flow_task(
495        &self,
496        drop_flow: DropFlowTask,
497    ) -> Result<(ProcedureId, Option<Output>)> {
498        let context = self.create_context();
499        let procedure = DropFlowProcedure::new(drop_flow, context);
500        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
501
502        self.execute_procedure_and_wait(procedure_with_id).await
503    }
504
505    /// Submits and executes a drop view task.
506    #[tracing::instrument(skip_all)]
507    pub async fn submit_drop_view_task(
508        &self,
509        drop_view: DropViewTask,
510    ) -> Result<(ProcedureId, Option<Output>)> {
511        let context = self.create_context();
512        let procedure = DropViewProcedure::new(drop_view, context);
513        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
514
515        self.execute_procedure_and_wait(procedure_with_id).await
516    }
517
518    /// Submits and executes a truncate table task.
519    #[tracing::instrument(skip_all)]
520    pub async fn submit_truncate_table_task(
521        &self,
522        truncate_table_task: TruncateTableTask,
523        table_info_value: DeserializedValueWithBytes<TableInfoValue>,
524        region_routes: Vec<RegionRoute>,
525    ) -> Result<(ProcedureId, Option<Output>)> {
526        let context = self.create_context();
527        let procedure = TruncateTableProcedure::new(
528            truncate_table_task,
529            table_info_value,
530            region_routes,
531            context,
532        );
533
534        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
535
536        self.execute_procedure_and_wait(procedure_with_id).await
537    }
538
539    /// Submits and executes a comment on task.
540    #[tracing::instrument(skip_all)]
541    pub async fn submit_comment_on_task(
542        &self,
543        comment_on_task: CommentOnTask,
544    ) -> Result<(ProcedureId, Option<Output>)> {
545        let context = self.create_context();
546        let procedure = CommentOnProcedure::new(comment_on_task, context);
547        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
548
549        self.execute_procedure_and_wait(procedure_with_id).await
550    }
551
552    /// Executes a procedure and waits for the result.
553    async fn execute_procedure_and_wait(
554        &self,
555        procedure_with_id: ProcedureWithId,
556    ) -> Result<(ProcedureId, Option<Output>)> {
557        let procedure_id = procedure_with_id.id;
558
559        let mut watcher = self
560            .procedure_manager
561            .submit(procedure_with_id)
562            .await
563            .context(SubmitProcedureSnafu)?;
564
565        let output = watcher::wait(&mut watcher)
566            .await
567            .context(WaitProcedureSnafu)?;
568
569        Ok((procedure_id, output))
570    }
571
572    /// Submits a procedure and returns the procedure id.
573    async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
574        let procedure_id = procedure_with_id.id;
575        let _ = self
576            .procedure_manager
577            .submit(procedure_with_id)
578            .await
579            .context(SubmitProcedureSnafu)?;
580
581        Ok(procedure_id)
582    }
583
    /// Dispatches a DDL request to the handler matching its task kind and
    /// returns that handler's [SubmitDdlTaskResponse].
    ///
    /// The whole dispatch runs inside a `DdlManager::submit_ddl_task` span.
    /// The span is attached to the tracing context carried by `ctx` when there
    /// is one, and to the current span's context otherwise.
    ///
    /// `request.wait` and `request.timeout` are packed into [DdlOptions] and
    /// are only forwarded to the alter table handler.
    ///
    /// # Panics
    ///
    /// Panics on a `DropLogicalTables` task, which is still a `todo!()`.
    pub async fn submit_ddl_task(
        &self,
        ctx: &ExecutorContext,
        request: SubmitDdlTaskRequest,
    ) -> Result<SubmitDdlTaskResponse> {
        // Prefer the caller-provided tracing context; fall back to the current span.
        let span = ctx
            .tracing_context
            .as_ref()
            .map(TracingContext::from_w3c)
            .unwrap_or_else(TracingContext::from_current_span)
            .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
        let ddl_options = DdlOptions {
            wait: request.wait,
            timeout: request.timeout,
        };
        async move {
            debug!("Submitting Ddl task: {:?}", request.task);
            match request.task {
                CreateTable(create_table_task) => {
                    handle_create_table_task(self, create_table_task).await
                }
                DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
                AlterTable(alter_table_task) => {
                    handle_alter_table_task(self, alter_table_task, ddl_options).await
                }
                TruncateTable(truncate_table_task) => {
                    handle_truncate_table_task(self, truncate_table_task).await
                }
                CreateLogicalTables(create_table_tasks) => {
                    handle_create_logical_table_tasks(self, create_table_tasks).await
                }
                AlterLogicalTables(alter_table_tasks) => {
                    handle_alter_logical_table_tasks(self, alter_table_tasks).await
                }
                // NOTE(review): not implemented; a request carrying this task
                // panics here instead of returning an error.
                DropLogicalTables(_) => todo!(),
                CreateDatabase(create_database_task) => {
                    handle_create_database_task(self, create_database_task).await
                }
                DropDatabase(drop_database_task) => {
                    handle_drop_database_task(self, drop_database_task).await
                }
                AlterDatabase(alter_database_task) => {
                    handle_alter_database_task(self, alter_database_task).await
                }
                CreateFlow(create_flow_task) => {
                    handle_create_flow_task(self, create_flow_task, request.query_context.into())
                        .await
                }
                DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
                CreateView(create_view_task) => {
                    handle_create_view_task(self, create_view_task).await
                }
                DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
                CommentOn(comment_on_task) => handle_comment_on_task(self, comment_on_task).await,
                #[cfg(feature = "enterprise")]
                CreateTrigger(create_trigger_task) => {
                    handle_create_trigger_task(
                        self,
                        create_trigger_task,
                        request.query_context.into(),
                    )
                    .await
                }
                #[cfg(feature = "enterprise")]
                DropTrigger(drop_trigger_task) => {
                    handle_drop_trigger_task(self, drop_trigger_task, request.query_context.into())
                        .await
                }
            }
        }
        .trace(span)
        .await
    }
657}
658
659async fn handle_truncate_table_task(
660    ddl_manager: &DdlManager,
661    truncate_table_task: TruncateTableTask,
662) -> Result<SubmitDdlTaskResponse> {
663    let table_id = truncate_table_task.table_id;
664    let table_metadata_manager = &ddl_manager.table_metadata_manager();
665    let table_ref = truncate_table_task.table_ref();
666
667    let (table_info_value, table_route_value) =
668        table_metadata_manager.get_full_table_info(table_id).await?;
669
670    let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
671        table: table_ref.to_string(),
672    })?;
673
674    let table_route_value = table_route_value.context(TableRouteNotFoundSnafu { table_id })?;
675
676    let table_route = table_route_value.into_inner().region_routes()?.clone();
677
678    let (id, _) = ddl_manager
679        .submit_truncate_table_task(truncate_table_task, table_info_value, table_route)
680        .await?;
681
682    info!("Table: {table_id} is truncated via procedure_id {id:?}");
683
684    Ok(SubmitDdlTaskResponse {
685        key: id.to_string().into(),
686        ..Default::default()
687    })
688}
689
690async fn handle_alter_table_task(
691    ddl_manager: &DdlManager,
692    alter_table_task: AlterTableTask,
693    ddl_options: DdlOptions,
694) -> Result<SubmitDdlTaskResponse> {
695    let table_ref = alter_table_task.table_ref();
696
697    let table_id = ddl_manager
698        .table_metadata_manager()
699        .table_name_manager()
700        .get(TableNameKey::new(
701            table_ref.catalog,
702            table_ref.schema,
703            table_ref.table,
704        ))
705        .await?
706        .with_context(|| TableNotFoundSnafu {
707            table_name: table_ref.to_string(),
708        })?
709        .table_id();
710
711    let table_route_value = ddl_manager
712        .table_metadata_manager()
713        .table_route_manager()
714        .table_route_storage()
715        .get(table_id)
716        .await?
717        .context(TableRouteNotFoundSnafu { table_id })?;
718    ensure!(
719        table_route_value.is_physical(),
720        UnexpectedLogicalRouteTableSnafu {
721            err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
722        }
723    );
724
725    let (id, _) = ddl_manager
726        .submit_alter_table_task(table_id, alter_table_task, ddl_options)
727        .await?;
728
729    info!("Table: {table_id} is altered via procedure_id {id:?}");
730
731    Ok(SubmitDdlTaskResponse {
732        key: id.to_string().into(),
733        ..Default::default()
734    })
735}
736
737async fn handle_drop_table_task(
738    ddl_manager: &DdlManager,
739    drop_table_task: DropTableTask,
740) -> Result<SubmitDdlTaskResponse> {
741    let table_id = drop_table_task.table_id;
742    let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
743
744    info!("Table: {table_id} is dropped via procedure_id {id:?}");
745
746    Ok(SubmitDdlTaskResponse {
747        key: id.to_string().into(),
748        ..Default::default()
749    })
750}
751
752async fn handle_create_table_task(
753    ddl_manager: &DdlManager,
754    create_table_task: CreateTableTask,
755) -> Result<SubmitDdlTaskResponse> {
756    let (id, output) = ddl_manager
757        .submit_create_table_task(create_table_task)
758        .await?;
759
760    let procedure_id = id.to_string();
761    let output = output.context(ProcedureOutputSnafu {
762        procedure_id: &procedure_id,
763        err_msg: "empty output",
764    })?;
765    let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
766        procedure_id: &procedure_id,
767        err_msg: "downcast to `u32`",
768    })?);
769    info!("Table: {table_id} is created via procedure_id {id:?}");
770
771    Ok(SubmitDdlTaskResponse {
772        key: procedure_id.into(),
773        table_ids: vec![table_id],
774    })
775}
776
777async fn handle_create_logical_table_tasks(
778    ddl_manager: &DdlManager,
779    create_table_tasks: Vec<CreateTableTask>,
780) -> Result<SubmitDdlTaskResponse> {
781    ensure!(
782        !create_table_tasks.is_empty(),
783        EmptyDdlTasksSnafu {
784            name: "create logical tables"
785        }
786    );
787    let physical_table_id = utils::check_and_get_physical_table_id(
788        ddl_manager.table_metadata_manager(),
789        &create_table_tasks,
790    )
791    .await?;
792    let num_logical_tables = create_table_tasks.len();
793
794    let (id, output) = ddl_manager
795        .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
796        .await?;
797
798    info!(
799        "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}"
800    );
801
802    let procedure_id = id.to_string();
803    let output = output.context(ProcedureOutputSnafu {
804        procedure_id: &procedure_id,
805        err_msg: "empty output",
806    })?;
807    let table_ids = output
808        .downcast_ref::<Vec<TableId>>()
809        .context(ProcedureOutputSnafu {
810            procedure_id: &procedure_id,
811            err_msg: "downcast to `Vec<TableId>`",
812        })?
813        .clone();
814
815    Ok(SubmitDdlTaskResponse {
816        key: procedure_id.into(),
817        table_ids,
818    })
819}
820
821async fn handle_create_database_task(
822    ddl_manager: &DdlManager,
823    create_database_task: CreateDatabaseTask,
824) -> Result<SubmitDdlTaskResponse> {
825    let (id, _) = ddl_manager
826        .submit_create_database(create_database_task.clone())
827        .await?;
828
829    let procedure_id = id.to_string();
830    info!(
831        "Database {}.{} is created via procedure_id {id:?}",
832        create_database_task.catalog, create_database_task.schema
833    );
834
835    Ok(SubmitDdlTaskResponse {
836        key: procedure_id.into(),
837        ..Default::default()
838    })
839}
840
841async fn handle_drop_database_task(
842    ddl_manager: &DdlManager,
843    drop_database_task: DropDatabaseTask,
844) -> Result<SubmitDdlTaskResponse> {
845    let (id, _) = ddl_manager
846        .submit_drop_database(drop_database_task.clone())
847        .await?;
848
849    let procedure_id = id.to_string();
850    info!(
851        "Database {}.{} is dropped via procedure_id {id:?}",
852        drop_database_task.catalog, drop_database_task.schema
853    );
854
855    Ok(SubmitDdlTaskResponse {
856        key: procedure_id.into(),
857        ..Default::default()
858    })
859}
860
861async fn handle_alter_database_task(
862    ddl_manager: &DdlManager,
863    alter_database_task: AlterDatabaseTask,
864) -> Result<SubmitDdlTaskResponse> {
865    let (id, _) = ddl_manager
866        .submit_alter_database(alter_database_task.clone())
867        .await?;
868
869    let procedure_id = id.to_string();
870    info!(
871        "Database {}.{} is altered via procedure_id {id:?}",
872        alter_database_task.catalog(),
873        alter_database_task.schema()
874    );
875
876    Ok(SubmitDdlTaskResponse {
877        key: procedure_id.into(),
878        ..Default::default()
879    })
880}
881
882async fn handle_drop_flow_task(
883    ddl_manager: &DdlManager,
884    drop_flow_task: DropFlowTask,
885) -> Result<SubmitDdlTaskResponse> {
886    let (id, _) = ddl_manager
887        .submit_drop_flow_task(drop_flow_task.clone())
888        .await?;
889
890    let procedure_id = id.to_string();
891    info!(
892        "Flow {}.{}({}) is dropped via procedure_id {id:?}",
893        drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
894    );
895
896    Ok(SubmitDdlTaskResponse {
897        key: procedure_id.into(),
898        ..Default::default()
899    })
900}
901
/// Delegates a drop-trigger task to the configured trigger DDL manager.
///
/// Only compiled with the `enterprise` feature. The trigger manager receives clones of the
/// procedure manager and the DDL context together with the task and query context.
///
/// # Errors
///
/// Returns the error built from `UnsupportedSnafu` (operation `"drop trigger"`) when no
/// trigger DDL manager is configured; otherwise returns whatever `drop_trigger` returns.
#[cfg(feature = "enterprise")]
async fn handle_drop_trigger_task(
    ddl_manager: &DdlManager,
    drop_trigger_task: DropTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_ddl_manager) => {
            trigger_ddl_manager
                .drop_trigger(
                    drop_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => {
            use crate::error::UnsupportedSnafu;

            UnsupportedSnafu {
                operation: "drop trigger",
            }
            .fail()
        }
    }
}
925
926async fn handle_drop_view_task(
927    ddl_manager: &DdlManager,
928    drop_view_task: DropViewTask,
929) -> Result<SubmitDdlTaskResponse> {
930    let (id, _) = ddl_manager
931        .submit_drop_view_task(drop_view_task.clone())
932        .await?;
933
934    let procedure_id = id.to_string();
935    info!(
936        "View {}({}) is dropped via procedure_id {id:?}",
937        drop_view_task.table_ref(),
938        drop_view_task.view_id,
939    );
940
941    Ok(SubmitDdlTaskResponse {
942        key: procedure_id.into(),
943        ..Default::default()
944    })
945}
946
947async fn handle_create_flow_task(
948    ddl_manager: &DdlManager,
949    create_flow_task: CreateFlowTask,
950    query_context: QueryContext,
951) -> Result<SubmitDdlTaskResponse> {
952    let (id, output) = ddl_manager
953        .submit_create_flow_task(create_flow_task.clone(), query_context)
954        .await?;
955
956    let procedure_id = id.to_string();
957    let output = output.context(ProcedureOutputSnafu {
958        procedure_id: &procedure_id,
959        err_msg: "empty output",
960    })?;
961    let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
962        procedure_id: &procedure_id,
963        err_msg: "downcast to `u32`",
964    })?);
965    if !create_flow_task.or_replace {
966        info!(
967            "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
968            create_flow_task.catalog_name, create_flow_task.flow_name,
969        );
970    } else {
971        info!(
972            "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
973            create_flow_task.catalog_name, create_flow_task.flow_name,
974        );
975    }
976
977    Ok(SubmitDdlTaskResponse {
978        key: procedure_id.into(),
979        ..Default::default()
980    })
981}
982
/// Delegates a create-trigger task to the configured trigger DDL manager.
///
/// Only compiled with the `enterprise` feature. The trigger manager receives clones of the
/// procedure manager and the DDL context together with the task and query context.
///
/// # Errors
///
/// Returns the error built from `UnsupportedSnafu` (operation `"create trigger"`) when no
/// trigger DDL manager is configured; otherwise returns whatever `create_trigger` returns.
#[cfg(feature = "enterprise")]
async fn handle_create_trigger_task(
    ddl_manager: &DdlManager,
    create_trigger_task: CreateTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_ddl_manager) => {
            trigger_ddl_manager
                .create_trigger(
                    create_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => {
            use crate::error::UnsupportedSnafu;

            UnsupportedSnafu {
                operation: "create trigger",
            }
            .fail()
        }
    }
}
1006
1007async fn handle_alter_logical_table_tasks(
1008    ddl_manager: &DdlManager,
1009    alter_table_tasks: Vec<AlterTableTask>,
1010) -> Result<SubmitDdlTaskResponse> {
1011    ensure!(
1012        !alter_table_tasks.is_empty(),
1013        EmptyDdlTasksSnafu {
1014            name: "alter logical tables"
1015        }
1016    );
1017
1018    // Use the physical table id in the first logical table, then it will be checked in the procedure.
1019    let first_table = TableNameKey {
1020        catalog: &alter_table_tasks[0].alter_table.catalog_name,
1021        schema: &alter_table_tasks[0].alter_table.schema_name,
1022        table: &alter_table_tasks[0].alter_table.table_name,
1023    };
1024    let physical_table_id =
1025        utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
1026    let num_logical_tables = alter_table_tasks.len();
1027
1028    let (id, _) = ddl_manager
1029        .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
1030        .await?;
1031
1032    info!(
1033        "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}"
1034    );
1035
1036    let procedure_id = id.to_string();
1037
1038    Ok(SubmitDdlTaskResponse {
1039        key: procedure_id.into(),
1040        ..Default::default()
1041    })
1042}
1043
1044/// Handle the `[CreateViewTask]` and returns the DDL response when success.
1045async fn handle_create_view_task(
1046    ddl_manager: &DdlManager,
1047    create_view_task: CreateViewTask,
1048) -> Result<SubmitDdlTaskResponse> {
1049    let (id, output) = ddl_manager
1050        .submit_create_view_task(create_view_task)
1051        .await?;
1052
1053    let procedure_id = id.to_string();
1054    let output = output.context(ProcedureOutputSnafu {
1055        procedure_id: &procedure_id,
1056        err_msg: "empty output",
1057    })?;
1058    let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
1059        procedure_id: &procedure_id,
1060        err_msg: "downcast to `u32`",
1061    })?);
1062    info!("View: {view_id} is created via procedure_id {id:?}");
1063
1064    Ok(SubmitDdlTaskResponse {
1065        key: procedure_id.into(),
1066        table_ids: vec![view_id],
1067    })
1068}
1069
1070async fn handle_comment_on_task(
1071    ddl_manager: &DdlManager,
1072    comment_on_task: CommentOnTask,
1073) -> Result<SubmitDdlTaskResponse> {
1074    let (id, _) = ddl_manager
1075        .submit_comment_on_task(comment_on_task.clone())
1076        .await?;
1077
1078    let procedure_id = id.to_string();
1079    info!(
1080        "Comment on {}.{}.{} is updated via procedure_id {id:?}",
1081        comment_on_task.catalog_name, comment_on_task.schema_name, comment_on_task.object_name
1082    );
1083
1084    Ok(SubmitDdlTaskResponse {
1085        key: procedure_id.into(),
1086        ..Default::default()
1087    })
1088}
1089
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use common_error::ext::BoxedError;
    use common_procedure::local::LocalManager;
    use common_procedure::test_util::InMemoryPoisonStore;
    use common_procedure::{BoxedProcedure, ProcedureManagerRef};
    use store_api::storage::TableId;
    use table::table_name::TableName;

    use super::DdlManager;
    use crate::cache_invalidator::DummyCacheInvalidator;
    use crate::ddl::alter_table::AlterTableProcedure;
    use crate::ddl::create_table::CreateTableProcedure;
    use crate::ddl::drop_table::DropTableProcedure;
    use crate::ddl::flow_meta::FlowMetadataAllocator;
    use crate::ddl::table_meta::TableMetadataAllocator;
    use crate::ddl::truncate_table::TruncateTableProcedure;
    use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use crate::ddl_manager::RepartitionProcedureFactory;
    use crate::key::TableMetadataManager;
    use crate::key::flow::FlowMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef};
    use crate::peer::Peer;
    use crate::region_keeper::MemoryRegionKeeper;
    use crate::region_registry::LeaderRegionRegistry;
    use crate::sequence::SequenceBuilder;
    use crate::state_store::KvStateStore;
    use crate::wal_provider::WalProvider;

    /// Placeholder node manager for tests. It implements both [`DatanodeManager`] and
    /// [`FlownodeManager`], but neither lookup is implemented, so calling them panics.
    pub struct DummyDatanodeManager;

    #[async_trait::async_trait]
    impl DatanodeManager for DummyDatanodeManager {
        async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
            unimplemented!()
        }
    }

    #[async_trait::async_trait]
    impl FlownodeManager for DummyDatanodeManager {
        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
            unimplemented!()
        }
    }

    /// Placeholder repartition factory for tests: loader registration is a no-op that
    /// succeeds, while creating a procedure is not implemented.
    struct DummyRepartitionProcedureFactory;

    #[async_trait::async_trait]
    impl RepartitionProcedureFactory for DummyRepartitionProcedureFactory {
        fn create(
            &self,
            _ddl_ctx: &DdlContext,
            _table_name: TableName,
            _table_id: TableId,
            _from_exprs: Vec<String>,
            _to_exprs: Vec<String>,
            _timeout: Option<Duration>,
        ) -> std::result::Result<BoxedProcedure, BoxedError> {
            unimplemented!()
        }

        fn register_loaders(
            &self,
            _ddl_ctx: &DdlContext,
            _procedure_manager: &ProcedureManagerRef,
        ) -> std::result::Result<(), BoxedError> {
            Ok(())
        }
    }

    /// Checks that building a [`DdlManager`] with loader registration enabled registers the
    /// loaders of the table DDL procedures on the procedure manager.
    #[test]
    fn test_try_new() {
        // Everything below shares one in-memory key-value backend.
        let kv_backend = Arc::new(MemoryKvBackend::new());

        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let table_sequence = Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build());
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            table_sequence,
            Arc::new(WalProvider::default()),
        ));

        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flow_sequence =
            Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build());
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            flow_sequence,
        ));

        let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let procedure_manager = Arc::new(LocalManager::new(
            Default::default(),
            state_store,
            poison_manager,
            None,
            None,
        ));

        let ddl_context = DdlContext {
            node_manager: Arc::new(DummyDatanodeManager),
            cache_invalidator: Arc::new(DummyCacheInvalidator),
            table_metadata_manager,
            table_metadata_allocator,
            flow_metadata_manager,
            flow_metadata_allocator,
            memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        };

        // The returned value is discarded; only the loaders left on the procedure manager
        // are inspected below.
        let _ = DdlManager::try_new(
            ddl_context,
            procedure_manager.clone(),
            Arc::new(DummyRepartitionProcedureFactory),
            true,
        );

        for type_name in [
            CreateTableProcedure::TYPE_NAME,
            AlterTableProcedure::TYPE_NAME,
            DropTableProcedure::TYPE_NAME,
            TruncateTableProcedure::TYPE_NAME,
        ] {
            assert!(procedure_manager.contains_loader(type_name));
        }
    }
}