common_meta/
ddl_manager.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use common_procedure::{
18    watcher, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId,
19};
20use common_telemetry::tracing_context::{FutureExt, TracingContext};
21use common_telemetry::{debug, info, tracing};
22use derive_builder::Builder;
23use snafu::{ensure, OptionExt, ResultExt};
24use store_api::storage::TableId;
25
26use crate::ddl::alter_database::AlterDatabaseProcedure;
27use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
28use crate::ddl::alter_table::AlterTableProcedure;
29use crate::ddl::create_database::CreateDatabaseProcedure;
30use crate::ddl::create_flow::CreateFlowProcedure;
31use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
32use crate::ddl::create_table::CreateTableProcedure;
33use crate::ddl::create_view::CreateViewProcedure;
34use crate::ddl::drop_database::DropDatabaseProcedure;
35use crate::ddl::drop_flow::DropFlowProcedure;
36use crate::ddl::drop_table::DropTableProcedure;
37use crate::ddl::drop_view::DropViewProcedure;
38use crate::ddl::truncate_table::TruncateTableProcedure;
39use crate::ddl::{utils, DdlContext};
40use crate::error::{
41    EmptyDdlTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
42    SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
43    UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
44};
45use crate::key::table_info::TableInfoValue;
46use crate::key::table_name::TableNameKey;
47use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
48use crate::procedure_executor::ExecutorContext;
49#[cfg(feature = "enterprise")]
50use crate::rpc::ddl::trigger::CreateTriggerTask;
51#[cfg(feature = "enterprise")]
52use crate::rpc::ddl::trigger::DropTriggerTask;
53#[cfg(feature = "enterprise")]
54use crate::rpc::ddl::DdlTask::CreateTrigger;
55#[cfg(feature = "enterprise")]
56use crate::rpc::ddl::DdlTask::DropTrigger;
57use crate::rpc::ddl::DdlTask::{
58    AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
59    CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
60    TruncateTable,
61};
62use crate::rpc::ddl::{
63    AlterDatabaseTask, AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask,
64    CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
65    SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
66};
67use crate::rpc::router::RegionRoute;
68
/// A shared, reference-counted handle to a [DdlManager].
pub type DdlManagerRef = Arc<DdlManager>;
70
/// A factory that turns a [DdlContext] into a [BoxedProcedureLoader].
pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
72
/// The [DdlManager] provides the ability to execute Ddl.
///
/// It builds a procedure for each Ddl task and submits it to the procedure manager.
#[derive(Builder)]
pub struct DdlManager {
    /// Context that is cloned into every procedure and procedure loader created by the manager.
    ddl_context: DdlContext,
    /// Procedure manager used to register procedure loaders and to submit procedures.
    procedure_manager: ProcedureManagerRef,
    /// Optional handler of trigger Ddl tasks; when it is `None`, trigger tasks are rejected
    /// as unsupported.
    #[cfg(feature = "enterprise")]
    trigger_ddl_manager: Option<TriggerDdlManagerRef>,
}
81
/// This trait is responsible for handling DDL tasks about triggers. e.g.,
/// create trigger, drop trigger, etc.
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
pub trait TriggerDdlManager: Send + Sync {
    /// Handles a [CreateTriggerTask].
    ///
    /// The implementation is handed the procedure manager, the Ddl context and the query
    /// context of the request, and returns the Ddl response of the task.
    async fn create_trigger(
        &self,
        create_trigger_task: CreateTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Handles a [DropTriggerTask].
    ///
    /// The implementation is handed the procedure manager, the Ddl context and the query
    /// context of the request, and returns the Ddl response of the task.
    async fn drop_trigger(
        &self,
        drop_trigger_task: DropTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Returns the implementation as a [std::any::Any] reference.
    // NOTE(review): presumably used to downcast to the concrete implementation — confirm
    // against the implementors.
    fn as_any(&self) -> &dyn std::any::Any;
}
105
/// A shared, reference-counted handle to a [TriggerDdlManager] implementation.
#[cfg(feature = "enterprise")]
pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
108
/// Expands to a `(type name, loader factory)` tuple for the procedure type `$procedure`.
///
/// The factory takes a [DdlContext] and returns a loader that rebuilds the procedure from
/// its JSON representation through `$procedure::from_json`, handing each rebuilt procedure
/// its own clone of the context.
macro_rules! procedure_loader_entry {
    ($procedure:ident) => {
        (
            $procedure::TYPE_NAME,
            &|ddl_context: DdlContext| -> BoxedProcedureLoader {
                Box::new(move |json: &str| {
                    $procedure::from_json(json, ddl_context.clone())
                        .map(|procedure| Box::new(procedure) as _)
                })
            },
        )
    };
}
122
/// Expands to a `Vec` holding one `procedure_loader_entry!` tuple for each of the listed
/// procedure types.
macro_rules! procedure_loader {
    ($($procedure:ident),*) => {
        vec![
            $(procedure_loader_entry!($procedure)),*
        ]
    };
}
130
131impl DdlManager {
132    /// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
133    pub fn try_new(
134        ddl_context: DdlContext,
135        procedure_manager: ProcedureManagerRef,
136        register_loaders: bool,
137    ) -> Result<Self> {
138        let manager = Self {
139            ddl_context,
140            procedure_manager,
141            #[cfg(feature = "enterprise")]
142            trigger_ddl_manager: None,
143        };
144        if register_loaders {
145            manager.register_loaders()?;
146        }
147        Ok(manager)
148    }
149
150    #[cfg(feature = "enterprise")]
151    pub fn with_trigger_ddl_manager(
152        mut self,
153        trigger_ddl_manager: Option<TriggerDdlManagerRef>,
154    ) -> Self {
155        self.trigger_ddl_manager = trigger_ddl_manager;
156        self
157    }
158
159    /// Returns the [TableMetadataManagerRef].
160    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
161        &self.ddl_context.table_metadata_manager
162    }
163
164    /// Returns the [DdlContext]
165    pub fn create_context(&self) -> DdlContext {
166        self.ddl_context.clone()
167    }
168
169    /// Registers all Ddl loaders.
170    pub fn register_loaders(&self) -> Result<()> {
171        let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
172            CreateTableProcedure,
173            CreateLogicalTablesProcedure,
174            CreateViewProcedure,
175            CreateFlowProcedure,
176            AlterTableProcedure,
177            AlterLogicalTablesProcedure,
178            AlterDatabaseProcedure,
179            DropTableProcedure,
180            DropFlowProcedure,
181            TruncateTableProcedure,
182            CreateDatabaseProcedure,
183            DropDatabaseProcedure,
184            DropViewProcedure
185        );
186
187        for (type_name, loader_factory) in loaders {
188            let context = self.create_context();
189            self.procedure_manager
190                .register_loader(type_name, loader_factory(context))
191                .context(RegisterProcedureLoaderSnafu { type_name })?;
192        }
193
194        Ok(())
195    }
196
197    /// Submits and executes an alter table task.
198    #[tracing::instrument(skip_all)]
199    pub async fn submit_alter_table_task(
200        &self,
201        table_id: TableId,
202        alter_table_task: AlterTableTask,
203    ) -> Result<(ProcedureId, Option<Output>)> {
204        let context = self.create_context();
205
206        let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
207
208        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
209
210        self.submit_procedure(procedure_with_id).await
211    }
212
213    /// Submits and executes a create table task.
214    #[tracing::instrument(skip_all)]
215    pub async fn submit_create_table_task(
216        &self,
217        create_table_task: CreateTableTask,
218    ) -> Result<(ProcedureId, Option<Output>)> {
219        let context = self.create_context();
220
221        let procedure = CreateTableProcedure::new(create_table_task, context);
222
223        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
224
225        self.submit_procedure(procedure_with_id).await
226    }
227
228    /// Submits and executes a `[CreateViewTask]`.
229    #[tracing::instrument(skip_all)]
230    pub async fn submit_create_view_task(
231        &self,
232        create_view_task: CreateViewTask,
233    ) -> Result<(ProcedureId, Option<Output>)> {
234        let context = self.create_context();
235
236        let procedure = CreateViewProcedure::new(create_view_task, context);
237
238        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
239
240        self.submit_procedure(procedure_with_id).await
241    }
242
243    /// Submits and executes a create multiple logical table tasks.
244    #[tracing::instrument(skip_all)]
245    pub async fn submit_create_logical_table_tasks(
246        &self,
247        create_table_tasks: Vec<CreateTableTask>,
248        physical_table_id: TableId,
249    ) -> Result<(ProcedureId, Option<Output>)> {
250        let context = self.create_context();
251
252        let procedure =
253            CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
254
255        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
256
257        self.submit_procedure(procedure_with_id).await
258    }
259
260    /// Submits and executes alter multiple table tasks.
261    #[tracing::instrument(skip_all)]
262    pub async fn submit_alter_logical_table_tasks(
263        &self,
264        alter_table_tasks: Vec<AlterTableTask>,
265        physical_table_id: TableId,
266    ) -> Result<(ProcedureId, Option<Output>)> {
267        let context = self.create_context();
268
269        let procedure =
270            AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
271
272        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
273
274        self.submit_procedure(procedure_with_id).await
275    }
276
277    /// Submits and executes a drop table task.
278    #[tracing::instrument(skip_all)]
279    pub async fn submit_drop_table_task(
280        &self,
281        drop_table_task: DropTableTask,
282    ) -> Result<(ProcedureId, Option<Output>)> {
283        let context = self.create_context();
284
285        let procedure = DropTableProcedure::new(drop_table_task, context);
286
287        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
288
289        self.submit_procedure(procedure_with_id).await
290    }
291
292    /// Submits and executes a create database task.
293    #[tracing::instrument(skip_all)]
294    pub async fn submit_create_database(
295        &self,
296        CreateDatabaseTask {
297            catalog,
298            schema,
299            create_if_not_exists,
300            options,
301        }: CreateDatabaseTask,
302    ) -> Result<(ProcedureId, Option<Output>)> {
303        let context = self.create_context();
304        let procedure =
305            CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
306        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
307
308        self.submit_procedure(procedure_with_id).await
309    }
310
311    /// Submits and executes a drop table task.
312    #[tracing::instrument(skip_all)]
313    pub async fn submit_drop_database(
314        &self,
315        DropDatabaseTask {
316            catalog,
317            schema,
318            drop_if_exists,
319        }: DropDatabaseTask,
320    ) -> Result<(ProcedureId, Option<Output>)> {
321        let context = self.create_context();
322        let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
323        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
324
325        self.submit_procedure(procedure_with_id).await
326    }
327
328    pub async fn submit_alter_database(
329        &self,
330        alter_database_task: AlterDatabaseTask,
331    ) -> Result<(ProcedureId, Option<Output>)> {
332        let context = self.create_context();
333        let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
334        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
335
336        self.submit_procedure(procedure_with_id).await
337    }
338
339    /// Submits and executes a create flow task.
340    #[tracing::instrument(skip_all)]
341    pub async fn submit_create_flow_task(
342        &self,
343        create_flow: CreateFlowTask,
344        query_context: QueryContext,
345    ) -> Result<(ProcedureId, Option<Output>)> {
346        let context = self.create_context();
347        let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
348        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
349
350        self.submit_procedure(procedure_with_id).await
351    }
352
353    /// Submits and executes a drop flow task.
354    #[tracing::instrument(skip_all)]
355    pub async fn submit_drop_flow_task(
356        &self,
357        drop_flow: DropFlowTask,
358    ) -> Result<(ProcedureId, Option<Output>)> {
359        let context = self.create_context();
360        let procedure = DropFlowProcedure::new(drop_flow, context);
361        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
362
363        self.submit_procedure(procedure_with_id).await
364    }
365
366    /// Submits and executes a drop view task.
367    #[tracing::instrument(skip_all)]
368    pub async fn submit_drop_view_task(
369        &self,
370        drop_view: DropViewTask,
371    ) -> Result<(ProcedureId, Option<Output>)> {
372        let context = self.create_context();
373        let procedure = DropViewProcedure::new(drop_view, context);
374        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
375
376        self.submit_procedure(procedure_with_id).await
377    }
378
379    /// Submits and executes a truncate table task.
380    #[tracing::instrument(skip_all)]
381    pub async fn submit_truncate_table_task(
382        &self,
383        truncate_table_task: TruncateTableTask,
384        table_info_value: DeserializedValueWithBytes<TableInfoValue>,
385        region_routes: Vec<RegionRoute>,
386    ) -> Result<(ProcedureId, Option<Output>)> {
387        let context = self.create_context();
388        let procedure = TruncateTableProcedure::new(
389            truncate_table_task,
390            table_info_value,
391            region_routes,
392            context,
393        );
394
395        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
396
397        self.submit_procedure(procedure_with_id).await
398    }
399
400    async fn submit_procedure(
401        &self,
402        procedure_with_id: ProcedureWithId,
403    ) -> Result<(ProcedureId, Option<Output>)> {
404        let procedure_id = procedure_with_id.id;
405
406        let mut watcher = self
407            .procedure_manager
408            .submit(procedure_with_id)
409            .await
410            .context(SubmitProcedureSnafu)?;
411
412        let output = watcher::wait(&mut watcher)
413            .await
414            .context(WaitProcedureSnafu)?;
415
416        Ok((procedure_id, output))
417    }
418
419    pub async fn submit_ddl_task(
420        &self,
421        ctx: &ExecutorContext,
422        request: SubmitDdlTaskRequest,
423    ) -> Result<SubmitDdlTaskResponse> {
424        let span = ctx
425            .tracing_context
426            .as_ref()
427            .map(TracingContext::from_w3c)
428            .unwrap_or_else(TracingContext::from_current_span)
429            .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
430        async move {
431            debug!("Submitting Ddl task: {:?}", request.task);
432            match request.task {
433                CreateTable(create_table_task) => {
434                    handle_create_table_task(self, create_table_task).await
435                }
436                DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
437                AlterTable(alter_table_task) => {
438                    handle_alter_table_task(self, alter_table_task).await
439                }
440                TruncateTable(truncate_table_task) => {
441                    handle_truncate_table_task(self, truncate_table_task).await
442                }
443                CreateLogicalTables(create_table_tasks) => {
444                    handle_create_logical_table_tasks(self, create_table_tasks).await
445                }
446                AlterLogicalTables(alter_table_tasks) => {
447                    handle_alter_logical_table_tasks(self, alter_table_tasks).await
448                }
449                DropLogicalTables(_) => todo!(),
450                CreateDatabase(create_database_task) => {
451                    handle_create_database_task(self, create_database_task).await
452                }
453                DropDatabase(drop_database_task) => {
454                    handle_drop_database_task(self, drop_database_task).await
455                }
456                AlterDatabase(alter_database_task) => {
457                    handle_alter_database_task(self, alter_database_task).await
458                }
459                CreateFlow(create_flow_task) => {
460                    handle_create_flow_task(self, create_flow_task, request.query_context.into())
461                        .await
462                }
463                DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
464                CreateView(create_view_task) => {
465                    handle_create_view_task(self, create_view_task).await
466                }
467                DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
468                #[cfg(feature = "enterprise")]
469                CreateTrigger(create_trigger_task) => {
470                    handle_create_trigger_task(
471                        self,
472                        create_trigger_task,
473                        request.query_context.into(),
474                    )
475                    .await
476                }
477                #[cfg(feature = "enterprise")]
478                DropTrigger(drop_trigger_task) => {
479                    handle_drop_trigger_task(self, drop_trigger_task, request.query_context.into())
480                        .await
481                }
482            }
483        }
484        .trace(span)
485        .await
486    }
487}
488
489async fn handle_truncate_table_task(
490    ddl_manager: &DdlManager,
491    truncate_table_task: TruncateTableTask,
492) -> Result<SubmitDdlTaskResponse> {
493    let table_id = truncate_table_task.table_id;
494    let table_metadata_manager = &ddl_manager.table_metadata_manager();
495    let table_ref = truncate_table_task.table_ref();
496
497    let (table_info_value, table_route_value) =
498        table_metadata_manager.get_full_table_info(table_id).await?;
499
500    let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
501        table: table_ref.to_string(),
502    })?;
503
504    let table_route_value = table_route_value.context(TableRouteNotFoundSnafu { table_id })?;
505
506    let table_route = table_route_value.into_inner().region_routes()?.clone();
507
508    let (id, _) = ddl_manager
509        .submit_truncate_table_task(truncate_table_task, table_info_value, table_route)
510        .await?;
511
512    info!("Table: {table_id} is truncated via procedure_id {id:?}");
513
514    Ok(SubmitDdlTaskResponse {
515        key: id.to_string().into(),
516        ..Default::default()
517    })
518}
519
520async fn handle_alter_table_task(
521    ddl_manager: &DdlManager,
522    alter_table_task: AlterTableTask,
523) -> Result<SubmitDdlTaskResponse> {
524    let table_ref = alter_table_task.table_ref();
525
526    let table_id = ddl_manager
527        .table_metadata_manager()
528        .table_name_manager()
529        .get(TableNameKey::new(
530            table_ref.catalog,
531            table_ref.schema,
532            table_ref.table,
533        ))
534        .await?
535        .with_context(|| TableNotFoundSnafu {
536            table_name: table_ref.to_string(),
537        })?
538        .table_id();
539
540    let table_route_value = ddl_manager
541        .table_metadata_manager()
542        .table_route_manager()
543        .table_route_storage()
544        .get(table_id)
545        .await?
546        .context(TableRouteNotFoundSnafu { table_id })?;
547    ensure!(
548        table_route_value.is_physical(),
549        UnexpectedLogicalRouteTableSnafu {
550            err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
551        }
552    );
553
554    let (id, _) = ddl_manager
555        .submit_alter_table_task(table_id, alter_table_task)
556        .await?;
557
558    info!("Table: {table_id} is altered via procedure_id {id:?}");
559
560    Ok(SubmitDdlTaskResponse {
561        key: id.to_string().into(),
562        ..Default::default()
563    })
564}
565
566async fn handle_drop_table_task(
567    ddl_manager: &DdlManager,
568    drop_table_task: DropTableTask,
569) -> Result<SubmitDdlTaskResponse> {
570    let table_id = drop_table_task.table_id;
571    let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
572
573    info!("Table: {table_id} is dropped via procedure_id {id:?}");
574
575    Ok(SubmitDdlTaskResponse {
576        key: id.to_string().into(),
577        ..Default::default()
578    })
579}
580
581async fn handle_create_table_task(
582    ddl_manager: &DdlManager,
583    create_table_task: CreateTableTask,
584) -> Result<SubmitDdlTaskResponse> {
585    let (id, output) = ddl_manager
586        .submit_create_table_task(create_table_task)
587        .await?;
588
589    let procedure_id = id.to_string();
590    let output = output.context(ProcedureOutputSnafu {
591        procedure_id: &procedure_id,
592        err_msg: "empty output",
593    })?;
594    let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
595        procedure_id: &procedure_id,
596        err_msg: "downcast to `u32`",
597    })?);
598    info!("Table: {table_id} is created via procedure_id {id:?}");
599
600    Ok(SubmitDdlTaskResponse {
601        key: procedure_id.into(),
602        table_ids: vec![table_id],
603    })
604}
605
606async fn handle_create_logical_table_tasks(
607    ddl_manager: &DdlManager,
608    create_table_tasks: Vec<CreateTableTask>,
609) -> Result<SubmitDdlTaskResponse> {
610    ensure!(
611        !create_table_tasks.is_empty(),
612        EmptyDdlTasksSnafu {
613            name: "create logical tables"
614        }
615    );
616    let physical_table_id = utils::check_and_get_physical_table_id(
617        ddl_manager.table_metadata_manager(),
618        &create_table_tasks,
619    )
620    .await?;
621    let num_logical_tables = create_table_tasks.len();
622
623    let (id, output) = ddl_manager
624        .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
625        .await?;
626
627    info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}");
628
629    let procedure_id = id.to_string();
630    let output = output.context(ProcedureOutputSnafu {
631        procedure_id: &procedure_id,
632        err_msg: "empty output",
633    })?;
634    let table_ids = output
635        .downcast_ref::<Vec<TableId>>()
636        .context(ProcedureOutputSnafu {
637            procedure_id: &procedure_id,
638            err_msg: "downcast to `Vec<TableId>`",
639        })?
640        .clone();
641
642    Ok(SubmitDdlTaskResponse {
643        key: procedure_id.into(),
644        table_ids,
645    })
646}
647
648async fn handle_create_database_task(
649    ddl_manager: &DdlManager,
650    create_database_task: CreateDatabaseTask,
651) -> Result<SubmitDdlTaskResponse> {
652    let (id, _) = ddl_manager
653        .submit_create_database(create_database_task.clone())
654        .await?;
655
656    let procedure_id = id.to_string();
657    info!(
658        "Database {}.{} is created via procedure_id {id:?}",
659        create_database_task.catalog, create_database_task.schema
660    );
661
662    Ok(SubmitDdlTaskResponse {
663        key: procedure_id.into(),
664        ..Default::default()
665    })
666}
667
668async fn handle_drop_database_task(
669    ddl_manager: &DdlManager,
670    drop_database_task: DropDatabaseTask,
671) -> Result<SubmitDdlTaskResponse> {
672    let (id, _) = ddl_manager
673        .submit_drop_database(drop_database_task.clone())
674        .await?;
675
676    let procedure_id = id.to_string();
677    info!(
678        "Database {}.{} is dropped via procedure_id {id:?}",
679        drop_database_task.catalog, drop_database_task.schema
680    );
681
682    Ok(SubmitDdlTaskResponse {
683        key: procedure_id.into(),
684        ..Default::default()
685    })
686}
687
688async fn handle_alter_database_task(
689    ddl_manager: &DdlManager,
690    alter_database_task: AlterDatabaseTask,
691) -> Result<SubmitDdlTaskResponse> {
692    let (id, _) = ddl_manager
693        .submit_alter_database(alter_database_task.clone())
694        .await?;
695
696    let procedure_id = id.to_string();
697    info!(
698        "Database {}.{} is altered via procedure_id {id:?}",
699        alter_database_task.catalog(),
700        alter_database_task.schema()
701    );
702
703    Ok(SubmitDdlTaskResponse {
704        key: procedure_id.into(),
705        ..Default::default()
706    })
707}
708
709async fn handle_drop_flow_task(
710    ddl_manager: &DdlManager,
711    drop_flow_task: DropFlowTask,
712) -> Result<SubmitDdlTaskResponse> {
713    let (id, _) = ddl_manager
714        .submit_drop_flow_task(drop_flow_task.clone())
715        .await?;
716
717    let procedure_id = id.to_string();
718    info!(
719        "Flow {}.{}({}) is dropped via procedure_id {id:?}",
720        drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
721    );
722
723    Ok(SubmitDdlTaskResponse {
724        key: procedure_id.into(),
725        ..Default::default()
726    })
727}
728
/// Handles a [DropTriggerTask] by delegating it to the configured trigger Ddl manager.
///
/// Fails with an `Unsupported` error when no trigger Ddl manager is configured.
#[cfg(feature = "enterprise")]
async fn handle_drop_trigger_task(
    ddl_manager: &DdlManager,
    drop_trigger_task: DropTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    use crate::error::UnsupportedSnafu;

    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_ddl_manager) => {
            trigger_ddl_manager
                .drop_trigger(
                    drop_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => UnsupportedSnafu {
            operation: "drop trigger",
        }
        .fail(),
    }
}
752
753async fn handle_drop_view_task(
754    ddl_manager: &DdlManager,
755    drop_view_task: DropViewTask,
756) -> Result<SubmitDdlTaskResponse> {
757    let (id, _) = ddl_manager
758        .submit_drop_view_task(drop_view_task.clone())
759        .await?;
760
761    let procedure_id = id.to_string();
762    info!(
763        "View {}({}) is dropped via procedure_id {id:?}",
764        drop_view_task.table_ref(),
765        drop_view_task.view_id,
766    );
767
768    Ok(SubmitDdlTaskResponse {
769        key: procedure_id.into(),
770        ..Default::default()
771    })
772}
773
774async fn handle_create_flow_task(
775    ddl_manager: &DdlManager,
776    create_flow_task: CreateFlowTask,
777    query_context: QueryContext,
778) -> Result<SubmitDdlTaskResponse> {
779    let (id, output) = ddl_manager
780        .submit_create_flow_task(create_flow_task.clone(), query_context)
781        .await?;
782
783    let procedure_id = id.to_string();
784    let output = output.context(ProcedureOutputSnafu {
785        procedure_id: &procedure_id,
786        err_msg: "empty output",
787    })?;
788    let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
789        procedure_id: &procedure_id,
790        err_msg: "downcast to `u32`",
791    })?);
792    if !create_flow_task.or_replace {
793        info!(
794            "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
795            create_flow_task.catalog_name, create_flow_task.flow_name,
796        );
797    } else {
798        info!(
799            "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
800            create_flow_task.catalog_name, create_flow_task.flow_name,
801        );
802    }
803
804    Ok(SubmitDdlTaskResponse {
805        key: procedure_id.into(),
806        ..Default::default()
807    })
808}
809
/// Handles a [CreateTriggerTask] by delegating it to the configured trigger Ddl manager.
///
/// Fails with an `Unsupported` error when no trigger Ddl manager is configured.
#[cfg(feature = "enterprise")]
async fn handle_create_trigger_task(
    ddl_manager: &DdlManager,
    create_trigger_task: CreateTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    use crate::error::UnsupportedSnafu;

    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_ddl_manager) => {
            trigger_ddl_manager
                .create_trigger(
                    create_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => UnsupportedSnafu {
            operation: "create trigger",
        }
        .fail(),
    }
}
833
834async fn handle_alter_logical_table_tasks(
835    ddl_manager: &DdlManager,
836    alter_table_tasks: Vec<AlterTableTask>,
837) -> Result<SubmitDdlTaskResponse> {
838    ensure!(
839        !alter_table_tasks.is_empty(),
840        EmptyDdlTasksSnafu {
841            name: "alter logical tables"
842        }
843    );
844
845    // Use the physical table id in the first logical table, then it will be checked in the procedure.
846    let first_table = TableNameKey {
847        catalog: &alter_table_tasks[0].alter_table.catalog_name,
848        schema: &alter_table_tasks[0].alter_table.schema_name,
849        table: &alter_table_tasks[0].alter_table.table_name,
850    };
851    let physical_table_id =
852        utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
853    let num_logical_tables = alter_table_tasks.len();
854
855    let (id, _) = ddl_manager
856        .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
857        .await?;
858
859    info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}");
860
861    let procedure_id = id.to_string();
862
863    Ok(SubmitDdlTaskResponse {
864        key: procedure_id.into(),
865        ..Default::default()
866    })
867}
868
869/// Handle the `[CreateViewTask]` and returns the DDL response when success.
870async fn handle_create_view_task(
871    ddl_manager: &DdlManager,
872    create_view_task: CreateViewTask,
873) -> Result<SubmitDdlTaskResponse> {
874    let (id, output) = ddl_manager
875        .submit_create_view_task(create_view_task)
876        .await?;
877
878    let procedure_id = id.to_string();
879    let output = output.context(ProcedureOutputSnafu {
880        procedure_id: &procedure_id,
881        err_msg: "empty output",
882    })?;
883    let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
884        procedure_id: &procedure_id,
885        err_msg: "downcast to `u32`",
886    })?);
887    info!("View: {view_id} is created via procedure_id {id:?}");
888
889    Ok(SubmitDdlTaskResponse {
890        key: procedure_id.into(),
891        table_ids: vec![view_id],
892    })
893}
894
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_procedure::local::LocalManager;
    use common_procedure::test_util::InMemoryPoisonStore;

    use super::DdlManager;
    use crate::cache_invalidator::DummyCacheInvalidator;
    use crate::ddl::alter_table::AlterTableProcedure;
    use crate::ddl::create_table::CreateTableProcedure;
    use crate::ddl::drop_table::DropTableProcedure;
    use crate::ddl::flow_meta::FlowMetadataAllocator;
    use crate::ddl::table_meta::TableMetadataAllocator;
    use crate::ddl::truncate_table::TruncateTableProcedure;
    use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use crate::key::flow::FlowMetadataManager;
    use crate::key::TableMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef};
    use crate::peer::Peer;
    use crate::region_keeper::MemoryRegionKeeper;
    use crate::region_registry::LeaderRegionRegistry;
    use crate::sequence::SequenceBuilder;
    use crate::state_store::KvStateStore;
    use crate::wal_options_allocator::WalOptionsAllocator;

    /// A stub implementing both [`DatanodeManager`] and [`FlownodeManager`].
    ///
    /// It only exists to fill the `node_manager` field of [`DdlContext`]; both lookup methods
    /// are `unimplemented!()` and panic if they are ever called.
    pub struct DummyDatanodeManager;

    #[async_trait::async_trait]
    impl DatanodeManager for DummyDatanodeManager {
        async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
            unimplemented!()
        }
    }

    #[async_trait::async_trait]
    impl FlownodeManager for DummyDatanodeManager {
        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
            unimplemented!()
        }
    }

    /// Builds a [`DdlManager`] on in-memory backends with loader registration enabled and
    /// checks that the table-related procedure loaders are known to the procedure manager.
    #[test]
    fn test_try_new() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
            Arc::new(WalOptionsAllocator::default()),
        ));
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
        ));

        let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let procedure_manager = Arc::new(LocalManager::new(
            Default::default(),
            state_store,
            poison_manager,
            None,
            None,
        ));

        // Fail right here, with the underlying error, if construction (and thereby loader
        // registration) goes wrong, instead of surfacing it as a bare assertion failure below.
        let _ddl_manager = DdlManager::try_new(
            DdlContext {
                node_manager: Arc::new(DummyDatanodeManager),
                cache_invalidator: Arc::new(DummyCacheInvalidator),
                table_metadata_manager,
                table_metadata_allocator,
                flow_metadata_manager,
                flow_metadata_allocator,
                memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
                leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
                region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
            },
            procedure_manager.clone(),
            true,
        )
        .expect("DdlManager::try_new should succeed with in-memory backends");

        let expected_loaders = [
            CreateTableProcedure::TYPE_NAME,
            AlterTableProcedure::TYPE_NAME,
            DropTableProcedure::TYPE_NAME,
            TruncateTableProcedure::TYPE_NAME,
        ];

        for loader in expected_loaders {
            assert!(
                procedure_manager.contains_loader(loader),
                "procedure loader `{loader}` is not registered"
            );
        }
    }
}