Skip to main content

common_meta/
ddl_manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::Repartition;
19use api::v1::alter_table_expr::Kind;
20use common_error::ext::BoxedError;
21use common_procedure::{
22    BoxedProcedure, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef,
23    ProcedureWithId, watcher,
24};
25use common_telemetry::tracing_context::{FutureExt, TracingContext};
26use common_telemetry::{debug, info, tracing};
27use derive_builder::Builder;
28use snafu::{OptionExt, ResultExt, ensure};
29use store_api::storage::TableId;
30use table::table_name::TableName;
31
32use crate::ddl::alter_database::AlterDatabaseProcedure;
33use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
34use crate::ddl::alter_table::AlterTableProcedure;
35use crate::ddl::comment_on::CommentOnProcedure;
36use crate::ddl::create_database::CreateDatabaseProcedure;
37use crate::ddl::create_flow::CreateFlowProcedure;
38use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
39use crate::ddl::create_table::CreateTableProcedure;
40use crate::ddl::create_view::CreateViewProcedure;
41use crate::ddl::drop_database::DropDatabaseProcedure;
42use crate::ddl::drop_flow::DropFlowProcedure;
43use crate::ddl::drop_table::DropTableProcedure;
44use crate::ddl::drop_view::DropViewProcedure;
45use crate::ddl::truncate_table::TruncateTableProcedure;
46use crate::ddl::{DdlContext, utils};
47use crate::error::{
48    self, CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu,
49    RegisterProcedureLoaderSnafu, RegisterRepartitionProcedureLoaderSnafu, Result,
50    SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
51    UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
52};
53use crate::key::table_info::TableInfoValue;
54use crate::key::table_name::TableNameKey;
55use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
56use crate::procedure_executor::ExecutorContext;
57#[cfg(feature = "enterprise")]
58use crate::rpc::ddl::DdlTask::CreateTrigger;
59#[cfg(feature = "enterprise")]
60use crate::rpc::ddl::DdlTask::DropTrigger;
61use crate::rpc::ddl::DdlTask::{
62    AlterDatabase, AlterLogicalTables, AlterTable, CommentOn, CreateDatabase, CreateFlow,
63    CreateLogicalTables, CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables,
64    DropTable, DropView, TruncateTable,
65};
66#[cfg(feature = "enterprise")]
67use crate::rpc::ddl::trigger::CreateTriggerTask;
68#[cfg(feature = "enterprise")]
69use crate::rpc::ddl::trigger::DropTriggerTask;
70use crate::rpc::ddl::{
71    AlterDatabaseTask, AlterTableTask, CommentOnTask, CreateDatabaseTask, CreateFlowTask,
72    CreateTableTask, CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask,
73    QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
74};
75
/// A configurator that customizes or enhances a [`DdlManager`].
///
/// `C` is the caller-defined context type passed to [`DdlManagerConfigurator::configure`].
#[async_trait::async_trait]
pub trait DdlManagerConfigurator<C>: Send + Sync {
    /// Configures the given [`DdlManager`] using the provided context `ctx`.
    ///
    /// The manager is taken by value and handed back on success; failures are
    /// reported as a [`BoxedError`].
    async fn configure(
        &self,
        ddl_manager: DdlManager,
        ctx: C,
    ) -> std::result::Result<DdlManager, BoxedError>;
}
86
/// Shared, reference-counted handle to a [`DdlManagerConfigurator`] trait object.
pub type DdlManagerConfiguratorRef<C> = Arc<dyn DdlManagerConfigurator<C>>;

/// Shared, reference-counted handle to a [`DdlManager`].
pub type DdlManagerRef = Arc<DdlManager>;

/// A function that builds a [`BoxedProcedureLoader`] from a [`DdlContext`].
pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
92
/// The [DdlManager] provides the ability to execute Ddl.
#[derive(Builder)]
pub struct DdlManager {
    /// Context that is cloned and handed to the procedures this manager creates.
    ddl_context: DdlContext,
    /// Procedure manager used to register loaders and to submit procedures.
    procedure_manager: ProcedureManagerRef,
    /// Factory that creates repartition procedures and registers their loaders.
    repartition_procedure_factory: RepartitionProcedureFactoryRef,
    /// Optional handler for trigger DDL tasks; only present in `enterprise` builds.
    #[cfg(feature = "enterprise")]
    trigger_ddl_manager: Option<TriggerDdlManagerRef>,
}
102
/// This trait is responsible for handling DDL tasks about triggers. e.g.,
/// create trigger, drop trigger, etc.
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
pub trait TriggerDdlManager: Send + Sync {
    /// Handles a create-trigger DDL task.
    ///
    /// The implementor receives the procedure manager, the DDL context and the
    /// query context of the request, and returns a [`SubmitDdlTaskResponse`].
    async fn create_trigger(
        &self,
        create_trigger_task: CreateTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Handles a drop-trigger DDL task.
    ///
    /// Takes the same collaborators as [`TriggerDdlManager::create_trigger`] and
    /// returns a [`SubmitDdlTaskResponse`].
    async fn drop_trigger(
        &self,
        drop_trigger_task: DropTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Exposes the implementor as [`std::any::Any`].
    ///
    /// NOTE(review): presumably so callers can downcast to the concrete type — confirm.
    fn as_any(&self) -> &dyn std::any::Any;
}
126
/// Shared, reference-counted handle to a [`TriggerDdlManager`] trait object.
#[cfg(feature = "enterprise")]
pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
129
// Expands to a `(type name, loader factory)` tuple for a single procedure type.
//
// The factory takes a `DdlContext` and returns a `BoxedProcedureLoader`. That
// loader rebuilds the procedure from its JSON form via `$procedure::from_json`,
// cloning the captured context on every call.
macro_rules! procedure_loader_entry {
    ($procedure:ident) => {
        (
            $procedure::TYPE_NAME,
            &|context: DdlContext| -> BoxedProcedureLoader {
                Box::new(move |json: &str| {
                    // The loader may be invoked repeatedly, so it hands out a clone
                    // and keeps the captured context for later calls.
                    let context = context.clone();
                    $procedure::from_json(json, context).map(|p| Box::new(p) as _)
                })
            },
        )
    };
}
143
// Builds a `Vec` with one `procedure_loader_entry!` tuple per listed procedure
// type, in the order the types are given.
macro_rules! procedure_loader {
    ($($procedure:ident),*) => {
        vec![
            $(procedure_loader_entry!($procedure)),*
        ]
    };
}
151
/// Shared, reference-counted handle to a [`RepartitionProcedureFactory`] trait object.
pub type RepartitionProcedureFactoryRef = Arc<dyn RepartitionProcedureFactory>;

/// Creates repartition procedures and registers the loaders they need.
pub trait RepartitionProcedureFactory: Send + Sync {
    /// Creates a repartition procedure for the table identified by `table_name`
    /// and `table_id`.
    ///
    /// [`DdlManager`] passes the `from_partition_exprs` of the repartition request
    /// as `from_exprs` and its `into_partition_exprs` as `to_exprs`. `timeout` is
    /// forwarded as given; how it is honoured is up to the implementation.
    fn create(
        &self,
        ddl_ctx: &DdlContext,
        table_name: TableName,
        table_id: TableId,
        from_exprs: Vec<String>,
        to_exprs: Vec<String>,
        timeout: Option<Duration>,
    ) -> std::result::Result<BoxedProcedure, BoxedError>;

    /// Registers procedure loaders with `procedure_manager`.
    ///
    /// Called by [`DdlManager::register_loaders`] after the built-in Ddl loaders
    /// have been registered.
    fn register_loaders(
        &self,
        ddl_ctx: &DdlContext,
        procedure_manager: &ProcedureManagerRef,
    ) -> std::result::Result<(), BoxedError>;
}
171
/// The options for DDL tasks.
///
/// Note: These options may not be utilized by all procedures.
/// At present, they are specifically applied in `RepartitionProcedure`.
#[derive(Debug, Clone, Copy)]
pub struct DdlOptions {
    /// The timeout will be passed to the procedure.
    ///
    /// Note: Each procedure may implement its own timeout handling mechanism.
    pub timeout: Duration,
    /// The flag that controls whether to wait for the procedure to complete.
    ///
    /// If `wait` is `true`, the caller waits for completion (success or failure) and the result is returned.
    /// Otherwise, the procedure is submitted and the [ProcedureId](common_procedure::ProcedureId) is returned immediately.
    ///
    /// Note: The value of `wait` is independent of the `timeout` option. If a procedure ignores the `timeout` and `wait` is set to `true`, the operation does not return until the procedure completes.
    pub wait: bool,
}
190
191impl DdlManager {
192    /// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
193    pub fn try_new(
194        ddl_context: DdlContext,
195        procedure_manager: ProcedureManagerRef,
196        repartition_procedure_factory: RepartitionProcedureFactoryRef,
197        register_loaders: bool,
198    ) -> Result<Self> {
199        let manager = Self {
200            ddl_context,
201            procedure_manager,
202            repartition_procedure_factory,
203            #[cfg(feature = "enterprise")]
204            trigger_ddl_manager: None,
205        };
206        if register_loaders {
207            manager.register_loaders()?;
208        }
209        Ok(manager)
210    }
211
212    #[cfg(feature = "enterprise")]
213    pub fn with_trigger_ddl_manager(mut self, trigger_ddl_manager: TriggerDdlManagerRef) -> Self {
214        self.trigger_ddl_manager = Some(trigger_ddl_manager);
215        self
216    }
217
218    /// Returns the [TableMetadataManagerRef].
219    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
220        &self.ddl_context.table_metadata_manager
221    }
222
223    /// Returns the [DdlContext]
224    pub fn create_context(&self) -> DdlContext {
225        self.ddl_context.clone()
226    }
227
228    /// Registers all Ddl loaders.
229    pub fn register_loaders(&self) -> Result<()> {
230        let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
231            CreateTableProcedure,
232            CreateLogicalTablesProcedure,
233            CreateViewProcedure,
234            CreateFlowProcedure,
235            AlterTableProcedure,
236            AlterLogicalTablesProcedure,
237            AlterDatabaseProcedure,
238            DropTableProcedure,
239            DropFlowProcedure,
240            TruncateTableProcedure,
241            CreateDatabaseProcedure,
242            DropDatabaseProcedure,
243            DropViewProcedure,
244            CommentOnProcedure
245        );
246
247        for (type_name, loader_factory) in loaders {
248            let context = self.create_context();
249            self.procedure_manager
250                .register_loader(type_name, loader_factory(context))
251                .context(RegisterProcedureLoaderSnafu { type_name })?;
252        }
253
254        self.repartition_procedure_factory
255            .register_loaders(&self.ddl_context, &self.procedure_manager)
256            .context(RegisterRepartitionProcedureLoaderSnafu)?;
257
258        Ok(())
259    }
260
261    /// Submits a repartition procedure for the specified table.
262    ///
263    /// This creates a repartition procedure using the provided `table_id`,
264    /// `table_name`, and `Repartition` configuration, and then either executes it
265    /// to completion or just submits it for asynchronous execution.
266    ///
267    /// The `Repartition` argument contains the original (`from_partition_exprs`)
268    /// and target (`into_partition_exprs`) partition expressions that define how
269    /// the table should be repartitioned.
270    ///
271    /// The `wait` flag controls whether this method waits for the repartition
272    /// procedure to finish:
273    /// - If `wait` is `true`, the procedure is executed and this method awaits
274    ///   its completion, returning both the generated `ProcedureId` and the
275    ///   final `Output` of the procedure.
276    /// - If `wait` is `false`, the procedure is only submitted to the procedure
277    ///   manager for asynchronous execution, and this method returns the
278    ///   `ProcedureId` along with `None` as the output.
279    async fn submit_repartition_task(
280        &self,
281        table_id: TableId,
282        table_name: TableName,
283        Repartition {
284            from_partition_exprs,
285            into_partition_exprs,
286        }: Repartition,
287        wait: bool,
288        timeout: Duration,
289    ) -> Result<(ProcedureId, Option<Output>)> {
290        let context = self.create_context();
291
292        let procedure = self
293            .repartition_procedure_factory
294            .create(
295                &context,
296                table_name,
297                table_id,
298                from_partition_exprs,
299                into_partition_exprs,
300                Some(timeout),
301            )
302            .context(CreateRepartitionProcedureSnafu)?;
303        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
304        if wait {
305            self.execute_procedure_and_wait(procedure_with_id).await
306        } else {
307            self.submit_procedure(procedure_with_id)
308                .await
309                .map(|p| (p, None))
310        }
311    }
312
313    /// Submits and executes an alter table task.
314    #[tracing::instrument(skip_all)]
315    pub async fn submit_alter_table_task(
316        &self,
317        table_id: TableId,
318        alter_table_task: AlterTableTask,
319        ddl_options: DdlOptions,
320    ) -> Result<(ProcedureId, Option<Output>)> {
321        // make alter_table_task mutable so we can call .take() on its field
322        let mut alter_table_task = alter_table_task;
323        if let Some(Kind::Repartition(_)) = alter_table_task.alter_table.kind.as_ref()
324            && let Kind::Repartition(repartition) =
325                alter_table_task.alter_table.kind.take().unwrap()
326        {
327            let table_name = TableName::new(
328                alter_table_task.alter_table.catalog_name,
329                alter_table_task.alter_table.schema_name,
330                alter_table_task.alter_table.table_name,
331            );
332            return self
333                .submit_repartition_task(
334                    table_id,
335                    table_name,
336                    repartition,
337                    ddl_options.wait,
338                    ddl_options.timeout,
339                )
340                .await;
341        }
342
343        let context = self.create_context();
344        let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
345
346        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
347
348        self.execute_procedure_and_wait(procedure_with_id).await
349    }
350
351    /// Submits and executes a create table task.
352    #[tracing::instrument(skip_all)]
353    pub async fn submit_create_table_task(
354        &self,
355        create_table_task: CreateTableTask,
356        query_context: QueryContext,
357    ) -> Result<(ProcedureId, Option<Output>)> {
358        let context = self.create_context();
359
360        let procedure = CreateTableProcedure::new_with_query_context(
361            create_table_task,
362            query_context,
363            context,
364        )?;
365
366        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
367
368        self.execute_procedure_and_wait(procedure_with_id).await
369    }
370
371    /// Submits and executes a `[CreateViewTask]`.
372    #[tracing::instrument(skip_all)]
373    pub async fn submit_create_view_task(
374        &self,
375        create_view_task: CreateViewTask,
376    ) -> Result<(ProcedureId, Option<Output>)> {
377        let context = self.create_context();
378
379        let procedure = CreateViewProcedure::new(create_view_task, context);
380
381        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
382
383        self.execute_procedure_and_wait(procedure_with_id).await
384    }
385
386    /// Submits and executes a create multiple logical table tasks.
387    #[tracing::instrument(skip_all)]
388    pub async fn submit_create_logical_table_tasks(
389        &self,
390        create_table_tasks: Vec<CreateTableTask>,
391        physical_table_id: TableId,
392    ) -> Result<(ProcedureId, Option<Output>)> {
393        let context = self.create_context();
394
395        let procedure =
396            CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
397
398        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
399
400        self.execute_procedure_and_wait(procedure_with_id).await
401    }
402
403    /// Submits and executes alter multiple table tasks.
404    #[tracing::instrument(skip_all)]
405    pub async fn submit_alter_logical_table_tasks(
406        &self,
407        alter_table_tasks: Vec<AlterTableTask>,
408        physical_table_id: TableId,
409    ) -> Result<(ProcedureId, Option<Output>)> {
410        let context = self.create_context();
411
412        let procedure =
413            AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
414
415        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
416
417        self.execute_procedure_and_wait(procedure_with_id).await
418    }
419
420    /// Submits and executes a drop table task.
421    #[tracing::instrument(skip_all)]
422    pub async fn submit_drop_table_task(
423        &self,
424        drop_table_task: DropTableTask,
425    ) -> Result<(ProcedureId, Option<Output>)> {
426        let context = self.create_context();
427
428        let procedure = DropTableProcedure::new(drop_table_task, context);
429
430        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
431
432        self.execute_procedure_and_wait(procedure_with_id).await
433    }
434
435    /// Submits and executes a create database task.
436    #[tracing::instrument(skip_all)]
437    pub async fn submit_create_database(
438        &self,
439        CreateDatabaseTask {
440            catalog,
441            schema,
442            create_if_not_exists,
443            options,
444        }: CreateDatabaseTask,
445    ) -> Result<(ProcedureId, Option<Output>)> {
446        let context = self.create_context();
447        let procedure =
448            CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
449        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
450
451        self.execute_procedure_and_wait(procedure_with_id).await
452    }
453
454    /// Submits and executes a drop table task.
455    #[tracing::instrument(skip_all)]
456    pub async fn submit_drop_database(
457        &self,
458        DropDatabaseTask {
459            catalog,
460            schema,
461            drop_if_exists,
462        }: DropDatabaseTask,
463    ) -> Result<(ProcedureId, Option<Output>)> {
464        let context = self.create_context();
465        let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
466        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
467
468        self.execute_procedure_and_wait(procedure_with_id).await
469    }
470
471    pub async fn submit_alter_database(
472        &self,
473        alter_database_task: AlterDatabaseTask,
474    ) -> Result<(ProcedureId, Option<Output>)> {
475        let context = self.create_context();
476        let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
477        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
478
479        self.execute_procedure_and_wait(procedure_with_id).await
480    }
481
482    /// Submits and executes a create flow task.
483    #[tracing::instrument(skip_all)]
484    pub async fn submit_create_flow_task(
485        &self,
486        create_flow: CreateFlowTask,
487        query_context: QueryContext,
488    ) -> Result<(ProcedureId, Option<Output>)> {
489        let context = self.create_context();
490        let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
491        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
492
493        self.execute_procedure_and_wait(procedure_with_id).await
494    }
495
496    /// Submits and executes a drop flow task.
497    #[tracing::instrument(skip_all)]
498    pub async fn submit_drop_flow_task(
499        &self,
500        drop_flow: DropFlowTask,
501    ) -> Result<(ProcedureId, Option<Output>)> {
502        let context = self.create_context();
503        let procedure = DropFlowProcedure::new(drop_flow, context);
504        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
505
506        self.execute_procedure_and_wait(procedure_with_id).await
507    }
508
509    /// Submits and executes a drop view task.
510    #[tracing::instrument(skip_all)]
511    pub async fn submit_drop_view_task(
512        &self,
513        drop_view: DropViewTask,
514    ) -> Result<(ProcedureId, Option<Output>)> {
515        let context = self.create_context();
516        let procedure = DropViewProcedure::new(drop_view, context);
517        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
518
519        self.execute_procedure_and_wait(procedure_with_id).await
520    }
521
522    /// Submits and executes a truncate table task.
523    #[tracing::instrument(skip_all)]
524    pub async fn submit_truncate_table_task(
525        &self,
526        truncate_table_task: TruncateTableTask,
527        table_info_value: DeserializedValueWithBytes<TableInfoValue>,
528    ) -> Result<(ProcedureId, Option<Output>)> {
529        let context = self.create_context();
530        let procedure = TruncateTableProcedure::new(truncate_table_task, table_info_value, context);
531
532        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
533
534        self.execute_procedure_and_wait(procedure_with_id).await
535    }
536
537    /// Submits and executes a comment on task.
538    #[tracing::instrument(skip_all)]
539    pub async fn submit_comment_on_task(
540        &self,
541        comment_on_task: CommentOnTask,
542    ) -> Result<(ProcedureId, Option<Output>)> {
543        let context = self.create_context();
544        let procedure = CommentOnProcedure::new(comment_on_task, context);
545        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
546
547        self.execute_procedure_and_wait(procedure_with_id).await
548    }
549
550    /// Executes a procedure and waits for the result.
551    async fn execute_procedure_and_wait(
552        &self,
553        procedure_with_id: ProcedureWithId,
554    ) -> Result<(ProcedureId, Option<Output>)> {
555        let procedure_id = procedure_with_id.id;
556
557        let mut watcher = self
558            .procedure_manager
559            .submit(procedure_with_id)
560            .await
561            .context(SubmitProcedureSnafu)?;
562
563        let output = watcher::wait(&mut watcher)
564            .await
565            .context(WaitProcedureSnafu)?;
566
567        Ok((procedure_id, output))
568    }
569
570    /// Submits a procedure and returns the procedure id.
571    async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
572        let procedure_id = procedure_with_id.id;
573        let _ = self
574            .procedure_manager
575            .submit(procedure_with_id)
576            .await
577            .context(SubmitProcedureSnafu)?;
578
579        Ok(procedure_id)
580    }
581
582    pub async fn submit_ddl_task(
583        &self,
584        ctx: &ExecutorContext,
585        request: SubmitDdlTaskRequest,
586    ) -> Result<SubmitDdlTaskResponse> {
587        let span = ctx
588            .tracing_context
589            .as_ref()
590            .map(TracingContext::from_w3c)
591            .unwrap_or_else(TracingContext::from_current_span)
592            .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
593        let ddl_options = DdlOptions {
594            wait: request.wait,
595            timeout: request.timeout,
596        };
597        async move {
598            debug!("Submitting Ddl task: {:?}", request.task);
599            match request.task {
600                CreateTable(create_table_task) => {
601                    handle_create_table_task(self, create_table_task, request.query_context).await
602                }
603                DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
604                AlterTable(alter_table_task) => {
605                    handle_alter_table_task(self, alter_table_task, ddl_options).await
606                }
607                TruncateTable(truncate_table_task) => {
608                    handle_truncate_table_task(self, truncate_table_task).await
609                }
610                CreateLogicalTables(create_table_tasks) => {
611                    handle_create_logical_table_tasks(self, create_table_tasks).await
612                }
613                AlterLogicalTables(alter_table_tasks) => {
614                    handle_alter_logical_table_tasks(self, alter_table_tasks).await
615                }
616                DropLogicalTables(_) => todo!(),
617                CreateDatabase(create_database_task) => {
618                    handle_create_database_task(self, create_database_task).await
619                }
620                DropDatabase(drop_database_task) => {
621                    handle_drop_database_task(self, drop_database_task).await
622                }
623                AlterDatabase(alter_database_task) => {
624                    handle_alter_database_task(self, alter_database_task).await
625                }
626                CreateFlow(create_flow_task) => {
627                    handle_create_flow_task(self, create_flow_task, request.query_context).await
628                }
629                DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
630                CreateView(create_view_task) => {
631                    handle_create_view_task(self, create_view_task).await
632                }
633                DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
634                CommentOn(comment_on_task) => handle_comment_on_task(self, comment_on_task).await,
635                #[cfg(feature = "enterprise")]
636                CreateTrigger(create_trigger_task) => {
637                    handle_create_trigger_task(self, create_trigger_task, request.query_context)
638                        .await
639                }
640                #[cfg(feature = "enterprise")]
641                DropTrigger(drop_trigger_task) => {
642                    handle_drop_trigger_task(self, drop_trigger_task, request.query_context).await
643                }
644            }
645        }
646        .trace(span)
647        .await
648    }
649}
650
651async fn handle_truncate_table_task(
652    ddl_manager: &DdlManager,
653    truncate_table_task: TruncateTableTask,
654) -> Result<SubmitDdlTaskResponse> {
655    let table_id = truncate_table_task.table_id;
656    let table_metadata_manager = &ddl_manager.table_metadata_manager();
657    let table_ref = truncate_table_task.table_ref();
658
659    let table_info_value = table_metadata_manager
660        .table_info_manager()
661        .get(table_id)
662        .await?
663        .with_context(|| TableInfoNotFoundSnafu {
664            table: table_ref.to_string(),
665        })?;
666    let physical_table_id = table_metadata_manager
667        .table_route_manager()
668        .get_physical_table_id(table_id)
669        .await?;
670    ensure!(
671        physical_table_id == table_id,
672        error::UnexpectedSnafu {
673            err_msg: "Truncate table is only supported for physical tables."
674        }
675    );
676
677    let (id, _) = ddl_manager
678        .submit_truncate_table_task(truncate_table_task, table_info_value)
679        .await?;
680
681    info!("Table: {table_id} is truncated via procedure_id {id:?}");
682
683    Ok(SubmitDdlTaskResponse {
684        key: id.to_string().into(),
685        ..Default::default()
686    })
687}
688
689async fn handle_alter_table_task(
690    ddl_manager: &DdlManager,
691    alter_table_task: AlterTableTask,
692    ddl_options: DdlOptions,
693) -> Result<SubmitDdlTaskResponse> {
694    let table_ref = alter_table_task.table_ref();
695
696    let table_id = ddl_manager
697        .table_metadata_manager()
698        .table_name_manager()
699        .get(TableNameKey::new(
700            table_ref.catalog,
701            table_ref.schema,
702            table_ref.table,
703        ))
704        .await?
705        .with_context(|| TableNotFoundSnafu {
706            table_name: table_ref.to_string(),
707        })?
708        .table_id();
709
710    let table_route_value = ddl_manager
711        .table_metadata_manager()
712        .table_route_manager()
713        .table_route_storage()
714        .get(table_id)
715        .await?
716        .context(TableRouteNotFoundSnafu { table_id })?;
717    ensure!(
718        table_route_value.is_physical(),
719        UnexpectedLogicalRouteTableSnafu {
720            err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
721        }
722    );
723
724    let (id, _) = ddl_manager
725        .submit_alter_table_task(table_id, alter_table_task, ddl_options)
726        .await?;
727
728    info!("Table: {table_id} is altered via procedure_id {id:?}");
729
730    Ok(SubmitDdlTaskResponse {
731        key: id.to_string().into(),
732        ..Default::default()
733    })
734}
735
736async fn handle_drop_table_task(
737    ddl_manager: &DdlManager,
738    drop_table_task: DropTableTask,
739) -> Result<SubmitDdlTaskResponse> {
740    let table_id = drop_table_task.table_id;
741    let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
742
743    info!("Table: {table_id} is dropped via procedure_id {id:?}");
744
745    Ok(SubmitDdlTaskResponse {
746        key: id.to_string().into(),
747        ..Default::default()
748    })
749}
750
751async fn handle_create_table_task(
752    ddl_manager: &DdlManager,
753    create_table_task: CreateTableTask,
754    query_context: QueryContext,
755) -> Result<SubmitDdlTaskResponse> {
756    let (id, output) = ddl_manager
757        .submit_create_table_task(create_table_task, query_context)
758        .await?;
759
760    let procedure_id = id.to_string();
761    let output = output.context(ProcedureOutputSnafu {
762        procedure_id: &procedure_id,
763        err_msg: "empty output",
764    })?;
765    let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
766        procedure_id: &procedure_id,
767        err_msg: "downcast to `u32`",
768    })?);
769    info!("Table: {table_id} is created via procedure_id {id:?}");
770
771    Ok(SubmitDdlTaskResponse {
772        key: procedure_id.into(),
773        table_ids: vec![table_id],
774    })
775}
776
777async fn handle_create_logical_table_tasks(
778    ddl_manager: &DdlManager,
779    create_table_tasks: Vec<CreateTableTask>,
780) -> Result<SubmitDdlTaskResponse> {
781    ensure!(
782        !create_table_tasks.is_empty(),
783        EmptyDdlTasksSnafu {
784            name: "create logical tables"
785        }
786    );
787    let physical_table_id = utils::check_and_get_physical_table_id(
788        ddl_manager.table_metadata_manager(),
789        &create_table_tasks,
790    )
791    .await?;
792    let num_logical_tables = create_table_tasks.len();
793
794    let (id, output) = ddl_manager
795        .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
796        .await?;
797
798    info!(
799        "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}"
800    );
801
802    let procedure_id = id.to_string();
803    let output = output.context(ProcedureOutputSnafu {
804        procedure_id: &procedure_id,
805        err_msg: "empty output",
806    })?;
807    let table_ids = output
808        .downcast_ref::<Vec<TableId>>()
809        .context(ProcedureOutputSnafu {
810            procedure_id: &procedure_id,
811            err_msg: "downcast to `Vec<TableId>`",
812        })?
813        .clone();
814
815    Ok(SubmitDdlTaskResponse {
816        key: procedure_id.into(),
817        table_ids,
818    })
819}
820
821async fn handle_create_database_task(
822    ddl_manager: &DdlManager,
823    create_database_task: CreateDatabaseTask,
824) -> Result<SubmitDdlTaskResponse> {
825    let (id, _) = ddl_manager
826        .submit_create_database(create_database_task.clone())
827        .await?;
828
829    let procedure_id = id.to_string();
830    info!(
831        "Database {}.{} is created via procedure_id {id:?}",
832        create_database_task.catalog, create_database_task.schema
833    );
834
835    Ok(SubmitDdlTaskResponse {
836        key: procedure_id.into(),
837        ..Default::default()
838    })
839}
840
841async fn handle_drop_database_task(
842    ddl_manager: &DdlManager,
843    drop_database_task: DropDatabaseTask,
844) -> Result<SubmitDdlTaskResponse> {
845    let (id, _) = ddl_manager
846        .submit_drop_database(drop_database_task.clone())
847        .await?;
848
849    let procedure_id = id.to_string();
850    info!(
851        "Database {}.{} is dropped via procedure_id {id:?}",
852        drop_database_task.catalog, drop_database_task.schema
853    );
854
855    Ok(SubmitDdlTaskResponse {
856        key: procedure_id.into(),
857        ..Default::default()
858    })
859}
860
861async fn handle_alter_database_task(
862    ddl_manager: &DdlManager,
863    alter_database_task: AlterDatabaseTask,
864) -> Result<SubmitDdlTaskResponse> {
865    let (id, _) = ddl_manager
866        .submit_alter_database(alter_database_task.clone())
867        .await?;
868
869    let procedure_id = id.to_string();
870    info!(
871        "Database {}.{} is altered via procedure_id {id:?}",
872        alter_database_task.catalog(),
873        alter_database_task.schema()
874    );
875
876    Ok(SubmitDdlTaskResponse {
877        key: procedure_id.into(),
878        ..Default::default()
879    })
880}
881
882async fn handle_drop_flow_task(
883    ddl_manager: &DdlManager,
884    drop_flow_task: DropFlowTask,
885) -> Result<SubmitDdlTaskResponse> {
886    let (id, _) = ddl_manager
887        .submit_drop_flow_task(drop_flow_task.clone())
888        .await?;
889
890    let procedure_id = id.to_string();
891    info!(
892        "Flow {}.{}({}) is dropped via procedure_id {id:?}",
893        drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
894    );
895
896    Ok(SubmitDdlTaskResponse {
897        key: procedure_id.into(),
898        ..Default::default()
899    })
900}
901
/// Drops a trigger by delegating to the configured trigger DDL manager.
///
/// Fails through `UnsupportedSnafu` (operation "drop trigger") when
/// `ddl_manager.trigger_ddl_manager` is not set.
#[cfg(feature = "enterprise")]
async fn handle_drop_trigger_task(
    ddl_manager: &DdlManager,
    drop_trigger_task: DropTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_ddl_manager) => {
            trigger_ddl_manager
                .drop_trigger(
                    drop_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => {
            use crate::error::UnsupportedSnafu;

            UnsupportedSnafu {
                operation: "drop trigger",
            }
            .fail()
        }
    }
}
925
926async fn handle_drop_view_task(
927    ddl_manager: &DdlManager,
928    drop_view_task: DropViewTask,
929) -> Result<SubmitDdlTaskResponse> {
930    let (id, _) = ddl_manager
931        .submit_drop_view_task(drop_view_task.clone())
932        .await?;
933
934    let procedure_id = id.to_string();
935    info!(
936        "View {}({}) is dropped via procedure_id {id:?}",
937        drop_view_task.table_ref(),
938        drop_view_task.view_id,
939    );
940
941    Ok(SubmitDdlTaskResponse {
942        key: procedure_id.into(),
943        ..Default::default()
944    })
945}
946
947async fn handle_create_flow_task(
948    ddl_manager: &DdlManager,
949    create_flow_task: CreateFlowTask,
950    query_context: QueryContext,
951) -> Result<SubmitDdlTaskResponse> {
952    let (id, output) = ddl_manager
953        .submit_create_flow_task(create_flow_task.clone(), query_context)
954        .await?;
955
956    let procedure_id = id.to_string();
957    let output = output.context(ProcedureOutputSnafu {
958        procedure_id: &procedure_id,
959        err_msg: "empty output",
960    })?;
961    let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
962        procedure_id: &procedure_id,
963        err_msg: "downcast to `u32`",
964    })?);
965    if !create_flow_task.or_replace {
966        info!(
967            "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
968            create_flow_task.catalog_name, create_flow_task.flow_name,
969        );
970    } else {
971        info!(
972            "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
973            create_flow_task.catalog_name, create_flow_task.flow_name,
974        );
975    }
976
977    Ok(SubmitDdlTaskResponse {
978        key: procedure_id.into(),
979        ..Default::default()
980    })
981}
982
/// Creates a trigger by delegating to the configured trigger DDL manager.
///
/// Fails through `UnsupportedSnafu` (operation "create trigger") when
/// `ddl_manager.trigger_ddl_manager` is not set.
#[cfg(feature = "enterprise")]
async fn handle_create_trigger_task(
    ddl_manager: &DdlManager,
    create_trigger_task: CreateTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_ddl_manager) => {
            trigger_ddl_manager
                .create_trigger(
                    create_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => {
            use crate::error::UnsupportedSnafu;

            UnsupportedSnafu {
                operation: "create trigger",
            }
            .fail()
        }
    }
}
1006
1007async fn handle_alter_logical_table_tasks(
1008    ddl_manager: &DdlManager,
1009    alter_table_tasks: Vec<AlterTableTask>,
1010) -> Result<SubmitDdlTaskResponse> {
1011    ensure!(
1012        !alter_table_tasks.is_empty(),
1013        EmptyDdlTasksSnafu {
1014            name: "alter logical tables"
1015        }
1016    );
1017
1018    // Use the physical table id in the first logical table, then it will be checked in the procedure.
1019    let first_table = TableNameKey {
1020        catalog: &alter_table_tasks[0].alter_table.catalog_name,
1021        schema: &alter_table_tasks[0].alter_table.schema_name,
1022        table: &alter_table_tasks[0].alter_table.table_name,
1023    };
1024    let physical_table_id =
1025        utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
1026    let num_logical_tables = alter_table_tasks.len();
1027
1028    let (id, _) = ddl_manager
1029        .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
1030        .await?;
1031
1032    info!(
1033        "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}"
1034    );
1035
1036    let procedure_id = id.to_string();
1037
1038    Ok(SubmitDdlTaskResponse {
1039        key: procedure_id.into(),
1040        ..Default::default()
1041    })
1042}
1043
1044/// Handle the `[CreateViewTask]` and returns the DDL response when success.
1045async fn handle_create_view_task(
1046    ddl_manager: &DdlManager,
1047    create_view_task: CreateViewTask,
1048) -> Result<SubmitDdlTaskResponse> {
1049    let (id, output) = ddl_manager
1050        .submit_create_view_task(create_view_task)
1051        .await?;
1052
1053    let procedure_id = id.to_string();
1054    let output = output.context(ProcedureOutputSnafu {
1055        procedure_id: &procedure_id,
1056        err_msg: "empty output",
1057    })?;
1058    let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
1059        procedure_id: &procedure_id,
1060        err_msg: "downcast to `u32`",
1061    })?);
1062    info!("View: {view_id} is created via procedure_id {id:?}");
1063
1064    Ok(SubmitDdlTaskResponse {
1065        key: procedure_id.into(),
1066        table_ids: vec![view_id],
1067    })
1068}
1069
1070async fn handle_comment_on_task(
1071    ddl_manager: &DdlManager,
1072    comment_on_task: CommentOnTask,
1073) -> Result<SubmitDdlTaskResponse> {
1074    let (id, _) = ddl_manager
1075        .submit_comment_on_task(comment_on_task.clone())
1076        .await?;
1077
1078    let procedure_id = id.to_string();
1079    info!(
1080        "Comment on {}.{}.{} is updated via procedure_id {id:?}",
1081        comment_on_task.catalog_name, comment_on_task.schema_name, comment_on_task.object_name
1082    );
1083
1084    Ok(SubmitDdlTaskResponse {
1085        key: procedure_id.into(),
1086        ..Default::default()
1087    })
1088}
1089
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use common_error::ext::BoxedError;
    use common_procedure::local::LocalManager;
    use common_procedure::test_util::InMemoryPoisonStore;
    use common_procedure::{BoxedProcedure, ProcedureManagerRef};
    use store_api::storage::TableId;
    use table::table_name::TableName;

    use super::DdlManager;
    use crate::cache_invalidator::DummyCacheInvalidator;
    use crate::ddl::alter_table::AlterTableProcedure;
    use crate::ddl::create_table::CreateTableProcedure;
    use crate::ddl::drop_table::DropTableProcedure;
    use crate::ddl::flow_meta::FlowMetadataAllocator;
    use crate::ddl::table_meta::TableMetadataAllocator;
    use crate::ddl::truncate_table::TruncateTableProcedure;
    use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use crate::ddl_manager::RepartitionProcedureFactory;
    use crate::key::TableMetadataManager;
    use crate::key::flow::FlowMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef};
    use crate::peer::Peer;
    use crate::region_keeper::MemoryRegionKeeper;
    use crate::region_registry::LeaderRegionRegistry;
    use crate::sequence::SequenceBuilder;
    use crate::state_store::KvStateStore;
    use crate::wal_provider::WalProvider;

    /// Stand-in node manager for tests: it only exists to populate
    /// `DdlContext::node_manager`, and both node lookups are unimplemented.
    pub struct DummyDatanodeManager;

    #[async_trait::async_trait]
    impl DatanodeManager for DummyDatanodeManager {
        async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
            unimplemented!()
        }
    }

    #[async_trait::async_trait]
    impl FlownodeManager for DummyDatanodeManager {
        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
            unimplemented!()
        }
    }

    /// Repartition factory stub: `create` is unimplemented and
    /// `register_loaders` succeeds without registering anything.
    struct DummyRepartitionProcedureFactory;

    #[async_trait::async_trait]
    impl RepartitionProcedureFactory for DummyRepartitionProcedureFactory {
        fn create(
            &self,
            _ddl_ctx: &DdlContext,
            _table_name: TableName,
            _table_id: TableId,
            _from_exprs: Vec<String>,
            _to_exprs: Vec<String>,
            _timeout: Option<Duration>,
        ) -> std::result::Result<BoxedProcedure, BoxedError> {
            unimplemented!()
        }

        fn register_loaders(
            &self,
            _ddl_ctx: &DdlContext,
            _procedure_manager: &ProcedureManagerRef,
        ) -> std::result::Result<(), BoxedError> {
            Ok(())
        }
    }

    /// Builds a `DdlManager` on top of a `MemoryKvBackend` and checks that the
    /// table procedure loaders are present in the procedure manager afterwards.
    #[test]
    fn test_try_new() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
            Arc::new(WalProvider::default()),
        ));
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
        ));

        let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let procedure_manager = Arc::new(LocalManager::new(
            Default::default(),
            state_store,
            poison_manager,
            None,
            None,
        ));

        // Fail loudly if construction itself fails instead of discarding the
        // result and relying on the loader checks below to notice.
        let _ = DdlManager::try_new(
            DdlContext {
                node_manager: Arc::new(DummyDatanodeManager),
                cache_invalidator: Arc::new(DummyCacheInvalidator),
                table_metadata_manager,
                table_metadata_allocator,
                flow_metadata_manager,
                flow_metadata_allocator,
                memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
                leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
                region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
            },
            procedure_manager.clone(),
            Arc::new(DummyRepartitionProcedureFactory),
            true,
        )
        .expect("failed to construct DdlManager");

        let expected_loaders = [
            CreateTableProcedure::TYPE_NAME,
            AlterTableProcedure::TYPE_NAME,
            DropTableProcedure::TYPE_NAME,
            TruncateTableProcedure::TYPE_NAME,
        ];

        for loader in expected_loaders {
            assert!(
                procedure_manager.contains_loader(loader),
                "missing procedure loader: {loader}"
            );
        }
    }
}