common_meta/
ddl_manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::Repartition;
19use api::v1::alter_table_expr::Kind;
20use common_error::ext::BoxedError;
21use common_procedure::{
22    BoxedProcedure, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef,
23    ProcedureWithId, watcher,
24};
25use common_telemetry::tracing_context::{FutureExt, TracingContext};
26use common_telemetry::{debug, info, tracing};
27use derive_builder::Builder;
28use snafu::{OptionExt, ResultExt, ensure};
29use store_api::storage::TableId;
30use table::table_name::TableName;
31
32use crate::ddl::alter_database::AlterDatabaseProcedure;
33use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
34use crate::ddl::alter_table::AlterTableProcedure;
35use crate::ddl::comment_on::CommentOnProcedure;
36use crate::ddl::create_database::CreateDatabaseProcedure;
37use crate::ddl::create_flow::CreateFlowProcedure;
38use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
39use crate::ddl::create_table::CreateTableProcedure;
40use crate::ddl::create_view::CreateViewProcedure;
41use crate::ddl::drop_database::DropDatabaseProcedure;
42use crate::ddl::drop_flow::DropFlowProcedure;
43use crate::ddl::drop_table::DropTableProcedure;
44use crate::ddl::drop_view::DropViewProcedure;
45use crate::ddl::truncate_table::TruncateTableProcedure;
46use crate::ddl::{DdlContext, utils};
47use crate::error::{
48    CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu,
49    RegisterProcedureLoaderSnafu, RegisterRepartitionProcedureLoaderSnafu, Result,
50    SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
51    UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
52};
53use crate::key::table_info::TableInfoValue;
54use crate::key::table_name::TableNameKey;
55use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
56use crate::procedure_executor::ExecutorContext;
57#[cfg(feature = "enterprise")]
58use crate::rpc::ddl::DdlTask::CreateTrigger;
59#[cfg(feature = "enterprise")]
60use crate::rpc::ddl::DdlTask::DropTrigger;
61use crate::rpc::ddl::DdlTask::{
62    AlterDatabase, AlterLogicalTables, AlterTable, CommentOn, CreateDatabase, CreateFlow,
63    CreateLogicalTables, CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables,
64    DropTable, DropView, TruncateTable,
65};
66#[cfg(feature = "enterprise")]
67use crate::rpc::ddl::trigger::CreateTriggerTask;
68#[cfg(feature = "enterprise")]
69use crate::rpc::ddl::trigger::DropTriggerTask;
70use crate::rpc::ddl::{
71    AlterDatabaseTask, AlterTableTask, CommentOnTask, CreateDatabaseTask, CreateFlowTask,
72    CreateTableTask, CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask,
73    QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
74};
75use crate::rpc::router::RegionRoute;
76
77/// A configurator that customizes or enhances a [`DdlManager`].
78#[async_trait::async_trait]
79pub trait DdlManagerConfigurator<C>: Send + Sync {
80    /// Configures the given [`DdlManager`] using the provided [`DdlManagerConfigureContext`].
81    async fn configure(
82        &self,
83        ddl_manager: DdlManager,
84        ctx: C,
85    ) -> std::result::Result<DdlManager, BoxedError>;
86}
87
88pub type DdlManagerConfiguratorRef<C> = Arc<dyn DdlManagerConfigurator<C>>;
89
90pub type DdlManagerRef = Arc<DdlManager>;
91
92pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
93
94/// The [DdlManager] provides the ability to execute Ddl.
95#[derive(Builder)]
96pub struct DdlManager {
97    ddl_context: DdlContext,
98    procedure_manager: ProcedureManagerRef,
99    repartition_procedure_factory: RepartitionProcedureFactoryRef,
100    #[cfg(feature = "enterprise")]
101    trigger_ddl_manager: Option<TriggerDdlManagerRef>,
102}
103
104/// This trait is responsible for handling DDL tasks about triggers. e.g.,
105/// create trigger, drop trigger, etc.
106#[cfg(feature = "enterprise")]
107#[async_trait::async_trait]
108pub trait TriggerDdlManager: Send + Sync {
109    async fn create_trigger(
110        &self,
111        create_trigger_task: CreateTriggerTask,
112        procedure_manager: ProcedureManagerRef,
113        ddl_context: DdlContext,
114        query_context: QueryContext,
115    ) -> Result<SubmitDdlTaskResponse>;
116
117    async fn drop_trigger(
118        &self,
119        drop_trigger_task: DropTriggerTask,
120        procedure_manager: ProcedureManagerRef,
121        ddl_context: DdlContext,
122        query_context: QueryContext,
123    ) -> Result<SubmitDdlTaskResponse>;
124
125    fn as_any(&self) -> &dyn std::any::Any;
126}
127
128#[cfg(feature = "enterprise")]
129pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
130
131macro_rules! procedure_loader_entry {
132    ($procedure:ident) => {
133        (
134            $procedure::TYPE_NAME,
135            &|context: DdlContext| -> BoxedProcedureLoader {
136                Box::new(move |json: &str| {
137                    let context = context.clone();
138                    $procedure::from_json(json, context).map(|p| Box::new(p) as _)
139                })
140            },
141        )
142    };
143}
144
145macro_rules! procedure_loader {
146    ($($procedure:ident),*) => {
147        vec![
148            $(procedure_loader_entry!($procedure)),*
149        ]
150    };
151}
152
153pub type RepartitionProcedureFactoryRef = Arc<dyn RepartitionProcedureFactory>;
154
155pub trait RepartitionProcedureFactory: Send + Sync {
156    fn create(
157        &self,
158        ddl_ctx: &DdlContext,
159        table_name: TableName,
160        table_id: TableId,
161        from_exprs: Vec<String>,
162        to_exprs: Vec<String>,
163        timeout: Option<Duration>,
164    ) -> std::result::Result<BoxedProcedure, BoxedError>;
165
166    fn register_loaders(
167        &self,
168        ddl_ctx: &DdlContext,
169        procedure_manager: &ProcedureManagerRef,
170    ) -> std::result::Result<(), BoxedError>;
171}
172
173/// The options for DDL tasks.
174///
175/// Note: These options may not be utilized by all procedures.
176/// At present, they are specifically applied in `RepartitionProcedure`.
177#[derive(Debug, Clone, Copy)]
178pub struct DdlOptions {
179    /// The timeout will be passed to the procedure.
180    ///
181    /// Note: Each procedure may implement its own timeout handling mechanism.
182    pub timeout: Duration,
183    /// The flag that controls whether to wait for the procedure to complete.
184    ///
185    /// If wait is `true`, the procedure will wait for completion(success or failure) and the result will be returned.
186    /// Otherwise, the procedure will be submitted and return the [ProcedureId](common_procedure::ProcedureId) immediately.
187    ///
188    /// Note: The value of `wait` is independent of the `timeout` option. If a procedure ignores the `timeout` and `wait` is set to true, the operation returns until the procedure completes.
189    pub wait: bool,
190}
191
192impl DdlManager {
193    /// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
194    pub fn try_new(
195        ddl_context: DdlContext,
196        procedure_manager: ProcedureManagerRef,
197        repartition_procedure_factory: RepartitionProcedureFactoryRef,
198        register_loaders: bool,
199    ) -> Result<Self> {
200        let manager = Self {
201            ddl_context,
202            procedure_manager,
203            repartition_procedure_factory,
204            #[cfg(feature = "enterprise")]
205            trigger_ddl_manager: None,
206        };
207        if register_loaders {
208            manager.register_loaders()?;
209        }
210        Ok(manager)
211    }
212
213    #[cfg(feature = "enterprise")]
214    pub fn with_trigger_ddl_manager(mut self, trigger_ddl_manager: TriggerDdlManagerRef) -> Self {
215        self.trigger_ddl_manager = Some(trigger_ddl_manager);
216        self
217    }
218
219    /// Returns the [TableMetadataManagerRef].
220    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
221        &self.ddl_context.table_metadata_manager
222    }
223
224    /// Returns the [DdlContext]
225    pub fn create_context(&self) -> DdlContext {
226        self.ddl_context.clone()
227    }
228
229    /// Registers all Ddl loaders.
230    pub fn register_loaders(&self) -> Result<()> {
231        let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
232            CreateTableProcedure,
233            CreateLogicalTablesProcedure,
234            CreateViewProcedure,
235            CreateFlowProcedure,
236            AlterTableProcedure,
237            AlterLogicalTablesProcedure,
238            AlterDatabaseProcedure,
239            DropTableProcedure,
240            DropFlowProcedure,
241            TruncateTableProcedure,
242            CreateDatabaseProcedure,
243            DropDatabaseProcedure,
244            DropViewProcedure,
245            CommentOnProcedure
246        );
247
248        for (type_name, loader_factory) in loaders {
249            let context = self.create_context();
250            self.procedure_manager
251                .register_loader(type_name, loader_factory(context))
252                .context(RegisterProcedureLoaderSnafu { type_name })?;
253        }
254
255        self.repartition_procedure_factory
256            .register_loaders(&self.ddl_context, &self.procedure_manager)
257            .context(RegisterRepartitionProcedureLoaderSnafu)?;
258
259        Ok(())
260    }
261
262    /// Submits a repartition procedure for the specified table.
263    ///
264    /// This creates a repartition procedure using the provided `table_id`,
265    /// `table_name`, and `Repartition` configuration, and then either executes it
266    /// to completion or just submits it for asynchronous execution.
267    ///
268    /// The `Repartition` argument contains the original (`from_partition_exprs`)
269    /// and target (`into_partition_exprs`) partition expressions that define how
270    /// the table should be repartitioned.
271    ///
272    /// The `wait` flag controls whether this method waits for the repartition
273    /// procedure to finish:
274    /// - If `wait` is `true`, the procedure is executed and this method awaits
275    ///   its completion, returning both the generated `ProcedureId` and the
276    ///   final `Output` of the procedure.
277    /// - If `wait` is `false`, the procedure is only submitted to the procedure
278    ///   manager for asynchronous execution, and this method returns the
279    ///   `ProcedureId` along with `None` as the output.
280    async fn submit_repartition_task(
281        &self,
282        table_id: TableId,
283        table_name: TableName,
284        Repartition {
285            from_partition_exprs,
286            into_partition_exprs,
287        }: Repartition,
288        wait: bool,
289        timeout: Duration,
290    ) -> Result<(ProcedureId, Option<Output>)> {
291        let context = self.create_context();
292
293        let procedure = self
294            .repartition_procedure_factory
295            .create(
296                &context,
297                table_name,
298                table_id,
299                from_partition_exprs,
300                into_partition_exprs,
301                Some(timeout),
302            )
303            .context(CreateRepartitionProcedureSnafu)?;
304        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
305        if wait {
306            self.execute_procedure_and_wait(procedure_with_id).await
307        } else {
308            self.submit_procedure(procedure_with_id)
309                .await
310                .map(|p| (p, None))
311        }
312    }
313
314    /// Submits and executes an alter table task.
315    #[tracing::instrument(skip_all)]
316    pub async fn submit_alter_table_task(
317        &self,
318        table_id: TableId,
319        alter_table_task: AlterTableTask,
320        ddl_options: DdlOptions,
321    ) -> Result<(ProcedureId, Option<Output>)> {
322        // make alter_table_task mutable so we can call .take() on its field
323        let mut alter_table_task = alter_table_task;
324        if let Some(Kind::Repartition(_)) = alter_table_task.alter_table.kind.as_ref()
325            && let Kind::Repartition(repartition) =
326                alter_table_task.alter_table.kind.take().unwrap()
327        {
328            let table_name = TableName::new(
329                alter_table_task.alter_table.catalog_name,
330                alter_table_task.alter_table.schema_name,
331                alter_table_task.alter_table.table_name,
332            );
333            return self
334                .submit_repartition_task(
335                    table_id,
336                    table_name,
337                    repartition,
338                    ddl_options.wait,
339                    ddl_options.timeout,
340                )
341                .await;
342        }
343
344        let context = self.create_context();
345        let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
346
347        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
348
349        self.execute_procedure_and_wait(procedure_with_id).await
350    }
351
352    /// Submits and executes a create table task.
353    #[tracing::instrument(skip_all)]
354    pub async fn submit_create_table_task(
355        &self,
356        create_table_task: CreateTableTask,
357    ) -> Result<(ProcedureId, Option<Output>)> {
358        let context = self.create_context();
359
360        let procedure = CreateTableProcedure::new(create_table_task, context)?;
361
362        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
363
364        self.execute_procedure_and_wait(procedure_with_id).await
365    }
366
367    /// Submits and executes a `[CreateViewTask]`.
368    #[tracing::instrument(skip_all)]
369    pub async fn submit_create_view_task(
370        &self,
371        create_view_task: CreateViewTask,
372    ) -> Result<(ProcedureId, Option<Output>)> {
373        let context = self.create_context();
374
375        let procedure = CreateViewProcedure::new(create_view_task, context);
376
377        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
378
379        self.execute_procedure_and_wait(procedure_with_id).await
380    }
381
382    /// Submits and executes a create multiple logical table tasks.
383    #[tracing::instrument(skip_all)]
384    pub async fn submit_create_logical_table_tasks(
385        &self,
386        create_table_tasks: Vec<CreateTableTask>,
387        physical_table_id: TableId,
388    ) -> Result<(ProcedureId, Option<Output>)> {
389        let context = self.create_context();
390
391        let procedure =
392            CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
393
394        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
395
396        self.execute_procedure_and_wait(procedure_with_id).await
397    }
398
399    /// Submits and executes alter multiple table tasks.
400    #[tracing::instrument(skip_all)]
401    pub async fn submit_alter_logical_table_tasks(
402        &self,
403        alter_table_tasks: Vec<AlterTableTask>,
404        physical_table_id: TableId,
405    ) -> Result<(ProcedureId, Option<Output>)> {
406        let context = self.create_context();
407
408        let procedure =
409            AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
410
411        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
412
413        self.execute_procedure_and_wait(procedure_with_id).await
414    }
415
416    /// Submits and executes a drop table task.
417    #[tracing::instrument(skip_all)]
418    pub async fn submit_drop_table_task(
419        &self,
420        drop_table_task: DropTableTask,
421    ) -> Result<(ProcedureId, Option<Output>)> {
422        let context = self.create_context();
423
424        let procedure = DropTableProcedure::new(drop_table_task, context);
425
426        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
427
428        self.execute_procedure_and_wait(procedure_with_id).await
429    }
430
431    /// Submits and executes a create database task.
432    #[tracing::instrument(skip_all)]
433    pub async fn submit_create_database(
434        &self,
435        CreateDatabaseTask {
436            catalog,
437            schema,
438            create_if_not_exists,
439            options,
440        }: CreateDatabaseTask,
441    ) -> Result<(ProcedureId, Option<Output>)> {
442        let context = self.create_context();
443        let procedure =
444            CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
445        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
446
447        self.execute_procedure_and_wait(procedure_with_id).await
448    }
449
450    /// Submits and executes a drop table task.
451    #[tracing::instrument(skip_all)]
452    pub async fn submit_drop_database(
453        &self,
454        DropDatabaseTask {
455            catalog,
456            schema,
457            drop_if_exists,
458        }: DropDatabaseTask,
459    ) -> Result<(ProcedureId, Option<Output>)> {
460        let context = self.create_context();
461        let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
462        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
463
464        self.execute_procedure_and_wait(procedure_with_id).await
465    }
466
467    pub async fn submit_alter_database(
468        &self,
469        alter_database_task: AlterDatabaseTask,
470    ) -> Result<(ProcedureId, Option<Output>)> {
471        let context = self.create_context();
472        let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
473        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
474
475        self.execute_procedure_and_wait(procedure_with_id).await
476    }
477
478    /// Submits and executes a create flow task.
479    #[tracing::instrument(skip_all)]
480    pub async fn submit_create_flow_task(
481        &self,
482        create_flow: CreateFlowTask,
483        query_context: QueryContext,
484    ) -> Result<(ProcedureId, Option<Output>)> {
485        let context = self.create_context();
486        let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
487        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
488
489        self.execute_procedure_and_wait(procedure_with_id).await
490    }
491
492    /// Submits and executes a drop flow task.
493    #[tracing::instrument(skip_all)]
494    pub async fn submit_drop_flow_task(
495        &self,
496        drop_flow: DropFlowTask,
497    ) -> Result<(ProcedureId, Option<Output>)> {
498        let context = self.create_context();
499        let procedure = DropFlowProcedure::new(drop_flow, context);
500        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
501
502        self.execute_procedure_and_wait(procedure_with_id).await
503    }
504
505    /// Submits and executes a drop view task.
506    #[tracing::instrument(skip_all)]
507    pub async fn submit_drop_view_task(
508        &self,
509        drop_view: DropViewTask,
510    ) -> Result<(ProcedureId, Option<Output>)> {
511        let context = self.create_context();
512        let procedure = DropViewProcedure::new(drop_view, context);
513        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
514
515        self.execute_procedure_and_wait(procedure_with_id).await
516    }
517
518    /// Submits and executes a truncate table task.
519    #[tracing::instrument(skip_all)]
520    pub async fn submit_truncate_table_task(
521        &self,
522        truncate_table_task: TruncateTableTask,
523        table_info_value: DeserializedValueWithBytes<TableInfoValue>,
524        region_routes: Vec<RegionRoute>,
525    ) -> Result<(ProcedureId, Option<Output>)> {
526        let context = self.create_context();
527        let procedure = TruncateTableProcedure::new(
528            truncate_table_task,
529            table_info_value,
530            region_routes,
531            context,
532        );
533
534        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
535
536        self.execute_procedure_and_wait(procedure_with_id).await
537    }
538
539    /// Submits and executes a comment on task.
540    #[tracing::instrument(skip_all)]
541    pub async fn submit_comment_on_task(
542        &self,
543        comment_on_task: CommentOnTask,
544    ) -> Result<(ProcedureId, Option<Output>)> {
545        let context = self.create_context();
546        let procedure = CommentOnProcedure::new(comment_on_task, context);
547        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
548
549        self.execute_procedure_and_wait(procedure_with_id).await
550    }
551
552    /// Executes a procedure and waits for the result.
553    async fn execute_procedure_and_wait(
554        &self,
555        procedure_with_id: ProcedureWithId,
556    ) -> Result<(ProcedureId, Option<Output>)> {
557        let procedure_id = procedure_with_id.id;
558
559        let mut watcher = self
560            .procedure_manager
561            .submit(procedure_with_id)
562            .await
563            .context(SubmitProcedureSnafu)?;
564
565        let output = watcher::wait(&mut watcher)
566            .await
567            .context(WaitProcedureSnafu)?;
568
569        Ok((procedure_id, output))
570    }
571
572    /// Submits a procedure and returns the procedure id.
573    async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
574        let procedure_id = procedure_with_id.id;
575        let _ = self
576            .procedure_manager
577            .submit(procedure_with_id)
578            .await
579            .context(SubmitProcedureSnafu)?;
580
581        Ok(procedure_id)
582    }
583
584    pub async fn submit_ddl_task(
585        &self,
586        ctx: &ExecutorContext,
587        request: SubmitDdlTaskRequest,
588    ) -> Result<SubmitDdlTaskResponse> {
589        let span = ctx
590            .tracing_context
591            .as_ref()
592            .map(TracingContext::from_w3c)
593            .unwrap_or_else(TracingContext::from_current_span)
594            .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
595        let ddl_options = DdlOptions {
596            wait: request.wait,
597            timeout: request.timeout,
598        };
599        async move {
600            debug!("Submitting Ddl task: {:?}", request.task);
601            match request.task {
602                CreateTable(create_table_task) => {
603                    handle_create_table_task(self, create_table_task).await
604                }
605                DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
606                AlterTable(alter_table_task) => {
607                    handle_alter_table_task(self, alter_table_task, ddl_options).await
608                }
609                TruncateTable(truncate_table_task) => {
610                    handle_truncate_table_task(self, truncate_table_task).await
611                }
612                CreateLogicalTables(create_table_tasks) => {
613                    handle_create_logical_table_tasks(self, create_table_tasks).await
614                }
615                AlterLogicalTables(alter_table_tasks) => {
616                    handle_alter_logical_table_tasks(self, alter_table_tasks).await
617                }
618                DropLogicalTables(_) => todo!(),
619                CreateDatabase(create_database_task) => {
620                    handle_create_database_task(self, create_database_task).await
621                }
622                DropDatabase(drop_database_task) => {
623                    handle_drop_database_task(self, drop_database_task).await
624                }
625                AlterDatabase(alter_database_task) => {
626                    handle_alter_database_task(self, alter_database_task).await
627                }
628                CreateFlow(create_flow_task) => {
629                    handle_create_flow_task(self, create_flow_task, request.query_context).await
630                }
631                DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
632                CreateView(create_view_task) => {
633                    handle_create_view_task(self, create_view_task).await
634                }
635                DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
636                CommentOn(comment_on_task) => handle_comment_on_task(self, comment_on_task).await,
637                #[cfg(feature = "enterprise")]
638                CreateTrigger(create_trigger_task) => {
639                    handle_create_trigger_task(self, create_trigger_task, request.query_context)
640                        .await
641                }
642                #[cfg(feature = "enterprise")]
643                DropTrigger(drop_trigger_task) => {
644                    handle_drop_trigger_task(self, drop_trigger_task, request.query_context).await
645                }
646            }
647        }
648        .trace(span)
649        .await
650    }
651}
652
653async fn handle_truncate_table_task(
654    ddl_manager: &DdlManager,
655    truncate_table_task: TruncateTableTask,
656) -> Result<SubmitDdlTaskResponse> {
657    let table_id = truncate_table_task.table_id;
658    let table_metadata_manager = &ddl_manager.table_metadata_manager();
659    let table_ref = truncate_table_task.table_ref();
660
661    let (table_info_value, table_route_value) =
662        table_metadata_manager.get_full_table_info(table_id).await?;
663
664    let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
665        table: table_ref.to_string(),
666    })?;
667
668    let table_route_value = table_route_value.context(TableRouteNotFoundSnafu { table_id })?;
669
670    let table_route = table_route_value.into_inner().region_routes()?.clone();
671
672    let (id, _) = ddl_manager
673        .submit_truncate_table_task(truncate_table_task, table_info_value, table_route)
674        .await?;
675
676    info!("Table: {table_id} is truncated via procedure_id {id:?}");
677
678    Ok(SubmitDdlTaskResponse {
679        key: id.to_string().into(),
680        ..Default::default()
681    })
682}
683
684async fn handle_alter_table_task(
685    ddl_manager: &DdlManager,
686    alter_table_task: AlterTableTask,
687    ddl_options: DdlOptions,
688) -> Result<SubmitDdlTaskResponse> {
689    let table_ref = alter_table_task.table_ref();
690
691    let table_id = ddl_manager
692        .table_metadata_manager()
693        .table_name_manager()
694        .get(TableNameKey::new(
695            table_ref.catalog,
696            table_ref.schema,
697            table_ref.table,
698        ))
699        .await?
700        .with_context(|| TableNotFoundSnafu {
701            table_name: table_ref.to_string(),
702        })?
703        .table_id();
704
705    let table_route_value = ddl_manager
706        .table_metadata_manager()
707        .table_route_manager()
708        .table_route_storage()
709        .get(table_id)
710        .await?
711        .context(TableRouteNotFoundSnafu { table_id })?;
712    ensure!(
713        table_route_value.is_physical(),
714        UnexpectedLogicalRouteTableSnafu {
715            err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
716        }
717    );
718
719    let (id, _) = ddl_manager
720        .submit_alter_table_task(table_id, alter_table_task, ddl_options)
721        .await?;
722
723    info!("Table: {table_id} is altered via procedure_id {id:?}");
724
725    Ok(SubmitDdlTaskResponse {
726        key: id.to_string().into(),
727        ..Default::default()
728    })
729}
730
731async fn handle_drop_table_task(
732    ddl_manager: &DdlManager,
733    drop_table_task: DropTableTask,
734) -> Result<SubmitDdlTaskResponse> {
735    let table_id = drop_table_task.table_id;
736    let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
737
738    info!("Table: {table_id} is dropped via procedure_id {id:?}");
739
740    Ok(SubmitDdlTaskResponse {
741        key: id.to_string().into(),
742        ..Default::default()
743    })
744}
745
746async fn handle_create_table_task(
747    ddl_manager: &DdlManager,
748    create_table_task: CreateTableTask,
749) -> Result<SubmitDdlTaskResponse> {
750    let (id, output) = ddl_manager
751        .submit_create_table_task(create_table_task)
752        .await?;
753
754    let procedure_id = id.to_string();
755    let output = output.context(ProcedureOutputSnafu {
756        procedure_id: &procedure_id,
757        err_msg: "empty output",
758    })?;
759    let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
760        procedure_id: &procedure_id,
761        err_msg: "downcast to `u32`",
762    })?);
763    info!("Table: {table_id} is created via procedure_id {id:?}");
764
765    Ok(SubmitDdlTaskResponse {
766        key: procedure_id.into(),
767        table_ids: vec![table_id],
768    })
769}
770
771async fn handle_create_logical_table_tasks(
772    ddl_manager: &DdlManager,
773    create_table_tasks: Vec<CreateTableTask>,
774) -> Result<SubmitDdlTaskResponse> {
775    ensure!(
776        !create_table_tasks.is_empty(),
777        EmptyDdlTasksSnafu {
778            name: "create logical tables"
779        }
780    );
781    let physical_table_id = utils::check_and_get_physical_table_id(
782        ddl_manager.table_metadata_manager(),
783        &create_table_tasks,
784    )
785    .await?;
786    let num_logical_tables = create_table_tasks.len();
787
788    let (id, output) = ddl_manager
789        .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
790        .await?;
791
792    info!(
793        "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}"
794    );
795
796    let procedure_id = id.to_string();
797    let output = output.context(ProcedureOutputSnafu {
798        procedure_id: &procedure_id,
799        err_msg: "empty output",
800    })?;
801    let table_ids = output
802        .downcast_ref::<Vec<TableId>>()
803        .context(ProcedureOutputSnafu {
804            procedure_id: &procedure_id,
805            err_msg: "downcast to `Vec<TableId>`",
806        })?
807        .clone();
808
809    Ok(SubmitDdlTaskResponse {
810        key: procedure_id.into(),
811        table_ids,
812    })
813}
814
815async fn handle_create_database_task(
816    ddl_manager: &DdlManager,
817    create_database_task: CreateDatabaseTask,
818) -> Result<SubmitDdlTaskResponse> {
819    let (id, _) = ddl_manager
820        .submit_create_database(create_database_task.clone())
821        .await?;
822
823    let procedure_id = id.to_string();
824    info!(
825        "Database {}.{} is created via procedure_id {id:?}",
826        create_database_task.catalog, create_database_task.schema
827    );
828
829    Ok(SubmitDdlTaskResponse {
830        key: procedure_id.into(),
831        ..Default::default()
832    })
833}
834
835async fn handle_drop_database_task(
836    ddl_manager: &DdlManager,
837    drop_database_task: DropDatabaseTask,
838) -> Result<SubmitDdlTaskResponse> {
839    let (id, _) = ddl_manager
840        .submit_drop_database(drop_database_task.clone())
841        .await?;
842
843    let procedure_id = id.to_string();
844    info!(
845        "Database {}.{} is dropped via procedure_id {id:?}",
846        drop_database_task.catalog, drop_database_task.schema
847    );
848
849    Ok(SubmitDdlTaskResponse {
850        key: procedure_id.into(),
851        ..Default::default()
852    })
853}
854
855async fn handle_alter_database_task(
856    ddl_manager: &DdlManager,
857    alter_database_task: AlterDatabaseTask,
858) -> Result<SubmitDdlTaskResponse> {
859    let (id, _) = ddl_manager
860        .submit_alter_database(alter_database_task.clone())
861        .await?;
862
863    let procedure_id = id.to_string();
864    info!(
865        "Database {}.{} is altered via procedure_id {id:?}",
866        alter_database_task.catalog(),
867        alter_database_task.schema()
868    );
869
870    Ok(SubmitDdlTaskResponse {
871        key: procedure_id.into(),
872        ..Default::default()
873    })
874}
875
876async fn handle_drop_flow_task(
877    ddl_manager: &DdlManager,
878    drop_flow_task: DropFlowTask,
879) -> Result<SubmitDdlTaskResponse> {
880    let (id, _) = ddl_manager
881        .submit_drop_flow_task(drop_flow_task.clone())
882        .await?;
883
884    let procedure_id = id.to_string();
885    info!(
886        "Flow {}.{}({}) is dropped via procedure_id {id:?}",
887        drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
888    );
889
890    Ok(SubmitDdlTaskResponse {
891        key: procedure_id.into(),
892        ..Default::default()
893    })
894}
895
896#[cfg(feature = "enterprise")]
897async fn handle_drop_trigger_task(
898    ddl_manager: &DdlManager,
899    drop_trigger_task: DropTriggerTask,
900    query_context: QueryContext,
901) -> Result<SubmitDdlTaskResponse> {
902    let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
903        use crate::error::UnsupportedSnafu;
904
905        return UnsupportedSnafu {
906            operation: "drop trigger",
907        }
908        .fail();
909    };
910
911    m.drop_trigger(
912        drop_trigger_task,
913        ddl_manager.procedure_manager.clone(),
914        ddl_manager.ddl_context.clone(),
915        query_context,
916    )
917    .await
918}
919
920async fn handle_drop_view_task(
921    ddl_manager: &DdlManager,
922    drop_view_task: DropViewTask,
923) -> Result<SubmitDdlTaskResponse> {
924    let (id, _) = ddl_manager
925        .submit_drop_view_task(drop_view_task.clone())
926        .await?;
927
928    let procedure_id = id.to_string();
929    info!(
930        "View {}({}) is dropped via procedure_id {id:?}",
931        drop_view_task.table_ref(),
932        drop_view_task.view_id,
933    );
934
935    Ok(SubmitDdlTaskResponse {
936        key: procedure_id.into(),
937        ..Default::default()
938    })
939}
940
941async fn handle_create_flow_task(
942    ddl_manager: &DdlManager,
943    create_flow_task: CreateFlowTask,
944    query_context: QueryContext,
945) -> Result<SubmitDdlTaskResponse> {
946    let (id, output) = ddl_manager
947        .submit_create_flow_task(create_flow_task.clone(), query_context)
948        .await?;
949
950    let procedure_id = id.to_string();
951    let output = output.context(ProcedureOutputSnafu {
952        procedure_id: &procedure_id,
953        err_msg: "empty output",
954    })?;
955    let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
956        procedure_id: &procedure_id,
957        err_msg: "downcast to `u32`",
958    })?);
959    if !create_flow_task.or_replace {
960        info!(
961            "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
962            create_flow_task.catalog_name, create_flow_task.flow_name,
963        );
964    } else {
965        info!(
966            "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
967            create_flow_task.catalog_name, create_flow_task.flow_name,
968        );
969    }
970
971    Ok(SubmitDdlTaskResponse {
972        key: procedure_id.into(),
973        ..Default::default()
974    })
975}
976
977#[cfg(feature = "enterprise")]
978async fn handle_create_trigger_task(
979    ddl_manager: &DdlManager,
980    create_trigger_task: CreateTriggerTask,
981    query_context: QueryContext,
982) -> Result<SubmitDdlTaskResponse> {
983    let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
984        use crate::error::UnsupportedSnafu;
985
986        return UnsupportedSnafu {
987            operation: "create trigger",
988        }
989        .fail();
990    };
991
992    m.create_trigger(
993        create_trigger_task,
994        ddl_manager.procedure_manager.clone(),
995        ddl_manager.ddl_context.clone(),
996        query_context,
997    )
998    .await
999}
1000
1001async fn handle_alter_logical_table_tasks(
1002    ddl_manager: &DdlManager,
1003    alter_table_tasks: Vec<AlterTableTask>,
1004) -> Result<SubmitDdlTaskResponse> {
1005    ensure!(
1006        !alter_table_tasks.is_empty(),
1007        EmptyDdlTasksSnafu {
1008            name: "alter logical tables"
1009        }
1010    );
1011
1012    // Use the physical table id in the first logical table, then it will be checked in the procedure.
1013    let first_table = TableNameKey {
1014        catalog: &alter_table_tasks[0].alter_table.catalog_name,
1015        schema: &alter_table_tasks[0].alter_table.schema_name,
1016        table: &alter_table_tasks[0].alter_table.table_name,
1017    };
1018    let physical_table_id =
1019        utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
1020    let num_logical_tables = alter_table_tasks.len();
1021
1022    let (id, _) = ddl_manager
1023        .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
1024        .await?;
1025
1026    info!(
1027        "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}"
1028    );
1029
1030    let procedure_id = id.to_string();
1031
1032    Ok(SubmitDdlTaskResponse {
1033        key: procedure_id.into(),
1034        ..Default::default()
1035    })
1036}
1037
1038/// Handle the `[CreateViewTask]` and returns the DDL response when success.
1039async fn handle_create_view_task(
1040    ddl_manager: &DdlManager,
1041    create_view_task: CreateViewTask,
1042) -> Result<SubmitDdlTaskResponse> {
1043    let (id, output) = ddl_manager
1044        .submit_create_view_task(create_view_task)
1045        .await?;
1046
1047    let procedure_id = id.to_string();
1048    let output = output.context(ProcedureOutputSnafu {
1049        procedure_id: &procedure_id,
1050        err_msg: "empty output",
1051    })?;
1052    let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
1053        procedure_id: &procedure_id,
1054        err_msg: "downcast to `u32`",
1055    })?);
1056    info!("View: {view_id} is created via procedure_id {id:?}");
1057
1058    Ok(SubmitDdlTaskResponse {
1059        key: procedure_id.into(),
1060        table_ids: vec![view_id],
1061    })
1062}
1063
1064async fn handle_comment_on_task(
1065    ddl_manager: &DdlManager,
1066    comment_on_task: CommentOnTask,
1067) -> Result<SubmitDdlTaskResponse> {
1068    let (id, _) = ddl_manager
1069        .submit_comment_on_task(comment_on_task.clone())
1070        .await?;
1071
1072    let procedure_id = id.to_string();
1073    info!(
1074        "Comment on {}.{}.{} is updated via procedure_id {id:?}",
1075        comment_on_task.catalog_name, comment_on_task.schema_name, comment_on_task.object_name
1076    );
1077
1078    Ok(SubmitDdlTaskResponse {
1079        key: procedure_id.into(),
1080        ..Default::default()
1081    })
1082}
1083
1084#[cfg(test)]
1085mod tests {
1086    use std::sync::Arc;
1087    use std::time::Duration;
1088
1089    use common_error::ext::BoxedError;
1090    use common_procedure::local::LocalManager;
1091    use common_procedure::test_util::InMemoryPoisonStore;
1092    use common_procedure::{BoxedProcedure, ProcedureManagerRef};
1093    use store_api::storage::TableId;
1094    use table::table_name::TableName;
1095
1096    use super::DdlManager;
1097    use crate::cache_invalidator::DummyCacheInvalidator;
1098    use crate::ddl::alter_table::AlterTableProcedure;
1099    use crate::ddl::create_table::CreateTableProcedure;
1100    use crate::ddl::drop_table::DropTableProcedure;
1101    use crate::ddl::flow_meta::FlowMetadataAllocator;
1102    use crate::ddl::table_meta::TableMetadataAllocator;
1103    use crate::ddl::truncate_table::TruncateTableProcedure;
1104    use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
1105    use crate::ddl_manager::RepartitionProcedureFactory;
1106    use crate::key::TableMetadataManager;
1107    use crate::key::flow::FlowMetadataManager;
1108    use crate::kv_backend::memory::MemoryKvBackend;
1109    use crate::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef};
1110    use crate::peer::Peer;
1111    use crate::region_keeper::MemoryRegionKeeper;
1112    use crate::region_registry::LeaderRegionRegistry;
1113    use crate::sequence::SequenceBuilder;
1114    use crate::state_store::KvStateStore;
1115    use crate::wal_provider::WalProvider;
1116
1117    /// A dummy implemented [NodeManager].
1118    pub struct DummyDatanodeManager;
1119
1120    #[async_trait::async_trait]
1121    impl DatanodeManager for DummyDatanodeManager {
1122        async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
1123            unimplemented!()
1124        }
1125    }
1126
1127    #[async_trait::async_trait]
1128    impl FlownodeManager for DummyDatanodeManager {
1129        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
1130            unimplemented!()
1131        }
1132    }
1133
1134    struct DummyRepartitionProcedureFactory;
1135
1136    #[async_trait::async_trait]
1137    impl RepartitionProcedureFactory for DummyRepartitionProcedureFactory {
1138        fn create(
1139            &self,
1140            _ddl_ctx: &DdlContext,
1141            _table_name: TableName,
1142            _table_id: TableId,
1143            _from_exprs: Vec<String>,
1144            _to_exprs: Vec<String>,
1145            _timeout: Option<Duration>,
1146        ) -> std::result::Result<BoxedProcedure, BoxedError> {
1147            unimplemented!()
1148        }
1149
1150        fn register_loaders(
1151            &self,
1152            _ddl_ctx: &DdlContext,
1153            _procedure_manager: &ProcedureManagerRef,
1154        ) -> std::result::Result<(), BoxedError> {
1155            Ok(())
1156        }
1157    }
1158
1159    #[test]
1160    fn test_try_new() {
1161        let kv_backend = Arc::new(MemoryKvBackend::new());
1162        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
1163        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
1164            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
1165            Arc::new(WalProvider::default()),
1166        ));
1167        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
1168        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
1169            Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
1170        ));
1171
1172        let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
1173        let poison_manager = Arc::new(InMemoryPoisonStore::default());
1174        let procedure_manager = Arc::new(LocalManager::new(
1175            Default::default(),
1176            state_store,
1177            poison_manager,
1178            None,
1179            None,
1180        ));
1181
1182        let _ = DdlManager::try_new(
1183            DdlContext {
1184                node_manager: Arc::new(DummyDatanodeManager),
1185                cache_invalidator: Arc::new(DummyCacheInvalidator),
1186                table_metadata_manager,
1187                table_metadata_allocator,
1188                flow_metadata_manager,
1189                flow_metadata_allocator,
1190                memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
1191                leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
1192                region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
1193            },
1194            procedure_manager.clone(),
1195            Arc::new(DummyRepartitionProcedureFactory),
1196            true,
1197        );
1198
1199        let expected_loaders = vec![
1200            CreateTableProcedure::TYPE_NAME,
1201            AlterTableProcedure::TYPE_NAME,
1202            DropTableProcedure::TYPE_NAME,
1203            TruncateTableProcedure::TYPE_NAME,
1204        ];
1205
1206        for loader in expected_loaders {
1207            assert!(procedure_manager.contains_loader(loader));
1208        }
1209    }
1210}