common_meta/
ddl_manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_error::ext::BoxedError;
18use common_procedure::{
19    BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId, watcher,
20};
21use common_telemetry::tracing_context::{FutureExt, TracingContext};
22use common_telemetry::{debug, info, tracing};
23use derive_builder::Builder;
24use snafu::{OptionExt, ResultExt, ensure};
25use store_api::storage::TableId;
26
27use crate::ddl::alter_database::AlterDatabaseProcedure;
28use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
29use crate::ddl::alter_table::AlterTableProcedure;
30use crate::ddl::comment_on::CommentOnProcedure;
31use crate::ddl::create_database::CreateDatabaseProcedure;
32use crate::ddl::create_flow::CreateFlowProcedure;
33use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
34use crate::ddl::create_table::CreateTableProcedure;
35use crate::ddl::create_view::CreateViewProcedure;
36use crate::ddl::drop_database::DropDatabaseProcedure;
37use crate::ddl::drop_flow::DropFlowProcedure;
38use crate::ddl::drop_table::DropTableProcedure;
39use crate::ddl::drop_view::DropViewProcedure;
40use crate::ddl::truncate_table::TruncateTableProcedure;
41use crate::ddl::{DdlContext, utils};
42use crate::error::{
43    EmptyDdlTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
44    SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
45    UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
46};
47use crate::key::table_info::TableInfoValue;
48use crate::key::table_name::TableNameKey;
49use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
50use crate::procedure_executor::ExecutorContext;
51#[cfg(feature = "enterprise")]
52use crate::rpc::ddl::DdlTask::CreateTrigger;
53#[cfg(feature = "enterprise")]
54use crate::rpc::ddl::DdlTask::DropTrigger;
55use crate::rpc::ddl::DdlTask::{
56    AlterDatabase, AlterLogicalTables, AlterTable, CommentOn, CreateDatabase, CreateFlow,
57    CreateLogicalTables, CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables,
58    DropTable, DropView, TruncateTable,
59};
60#[cfg(feature = "enterprise")]
61use crate::rpc::ddl::trigger::CreateTriggerTask;
62#[cfg(feature = "enterprise")]
63use crate::rpc::ddl::trigger::DropTriggerTask;
64use crate::rpc::ddl::{
65    AlterDatabaseTask, AlterTableTask, CommentOnTask, CreateDatabaseTask, CreateFlowTask,
66    CreateTableTask, CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask,
67    QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
68};
69use crate::rpc::router::RegionRoute;
70
/// A configurator that customizes or enhances a [`DdlManager`].
///
/// The type parameter `C` is the caller-supplied context value handed to
/// [`DdlManagerConfigurator::configure`].
#[async_trait::async_trait]
pub trait DdlManagerConfigurator<C>: Send + Sync {
    /// Configures the given [`DdlManager`] using the provided context `ctx`.
    ///
    /// Takes ownership of `ddl_manager` and returns the configured manager, or a
    /// [`BoxedError`] when configuration fails.
    async fn configure(
        &self,
        ddl_manager: DdlManager,
        ctx: C,
    ) -> std::result::Result<DdlManager, BoxedError>;
}
81
/// Shared, reference-counted handle to a [`DdlManagerConfigurator`] trait object.
pub type DdlManagerConfiguratorRef<C> = Arc<dyn DdlManagerConfigurator<C>>;

/// Shared, reference-counted handle to a [`DdlManager`].
pub type DdlManagerRef = Arc<DdlManager>;

/// A function that builds a [`BoxedProcedureLoader`] from a [`DdlContext`].
pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
87
/// The [DdlManager] provides the ability to execute Ddl.
///
/// It builds DDL procedures from tasks, submits them to the procedure manager
/// and waits for their result.
#[derive(Builder)]
pub struct DdlManager {
    // Context that is cloned into every DDL procedure this manager creates.
    ddl_context: DdlContext,
    // Procedure manager used to register loaders and to submit procedures.
    procedure_manager: ProcedureManagerRef,
    // Optional handler for trigger DDL tasks; while it is `None`, trigger tasks
    // are rejected as unsupported.
    #[cfg(feature = "enterprise")]
    trigger_ddl_manager: Option<TriggerDdlManagerRef>,
}
96
/// This trait is responsible for handling DDL tasks about triggers. e.g.,
/// create trigger, drop trigger, etc.
///
/// Implementations receive the procedure manager and the DDL context of the
/// calling [`DdlManager`] on every call.
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
pub trait TriggerDdlManager: Send + Sync {
    /// Handles a [`CreateTriggerTask`] and returns the DDL response on success.
    async fn create_trigger(
        &self,
        create_trigger_task: CreateTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Handles a [`DropTriggerTask`] and returns the DDL response on success.
    async fn drop_trigger(
        &self,
        drop_trigger_task: DropTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Exposes the implementation as [`std::any::Any`].
    // NOTE(review): presumably used to downcast to the concrete implementation — confirm with callers.
    fn as_any(&self) -> &dyn std::any::Any;
}
120
/// Shared, reference-counted handle to a [`TriggerDdlManager`] trait object.
#[cfg(feature = "enterprise")]
pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
123
/// Expands to a `(type name, loader factory)` pair for the given procedure type.
///
/// The factory takes a [`DdlContext`] and yields a [`BoxedProcedureLoader`] that
/// rebuilds the procedure from its JSON form through the type's `from_json`.
macro_rules! procedure_loader_entry {
    ($proc:ident) => {
        (
            $proc::TYPE_NAME,
            &|ddl_ctx: DdlContext| -> BoxedProcedureLoader {
                Box::new(move |payload: &str| {
                    // Each load gets its own copy of the context.
                    $proc::from_json(payload, ddl_ctx.clone())
                        .map(|restored| Box::new(restored) as _)
                })
            },
        )
    };
}
137
/// Expands a comma-separated list of procedure types into a `Vec` holding one
/// `procedure_loader_entry!` pair per type.
macro_rules! procedure_loader {
    ($($proc:ident),*) => {
        vec![$(procedure_loader_entry!($proc)),*]
    };
}
145
146impl DdlManager {
147    /// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
148    pub fn try_new(
149        ddl_context: DdlContext,
150        procedure_manager: ProcedureManagerRef,
151        register_loaders: bool,
152    ) -> Result<Self> {
153        let manager = Self {
154            ddl_context,
155            procedure_manager,
156            #[cfg(feature = "enterprise")]
157            trigger_ddl_manager: None,
158        };
159        if register_loaders {
160            manager.register_loaders()?;
161        }
162        Ok(manager)
163    }
164
165    #[cfg(feature = "enterprise")]
166    pub fn with_trigger_ddl_manager(mut self, trigger_ddl_manager: TriggerDdlManagerRef) -> Self {
167        self.trigger_ddl_manager = Some(trigger_ddl_manager);
168        self
169    }
170
171    /// Returns the [TableMetadataManagerRef].
172    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
173        &self.ddl_context.table_metadata_manager
174    }
175
176    /// Returns the [DdlContext]
177    pub fn create_context(&self) -> DdlContext {
178        self.ddl_context.clone()
179    }
180
181    /// Registers all Ddl loaders.
182    pub fn register_loaders(&self) -> Result<()> {
183        let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
184            CreateTableProcedure,
185            CreateLogicalTablesProcedure,
186            CreateViewProcedure,
187            CreateFlowProcedure,
188            AlterTableProcedure,
189            AlterLogicalTablesProcedure,
190            AlterDatabaseProcedure,
191            DropTableProcedure,
192            DropFlowProcedure,
193            TruncateTableProcedure,
194            CreateDatabaseProcedure,
195            DropDatabaseProcedure,
196            DropViewProcedure,
197            CommentOnProcedure
198        );
199
200        for (type_name, loader_factory) in loaders {
201            let context = self.create_context();
202            self.procedure_manager
203                .register_loader(type_name, loader_factory(context))
204                .context(RegisterProcedureLoaderSnafu { type_name })?;
205        }
206
207        Ok(())
208    }
209
210    /// Submits and executes an alter table task.
211    #[tracing::instrument(skip_all)]
212    pub async fn submit_alter_table_task(
213        &self,
214        table_id: TableId,
215        alter_table_task: AlterTableTask,
216    ) -> Result<(ProcedureId, Option<Output>)> {
217        let context = self.create_context();
218
219        let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
220
221        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
222
223        self.submit_procedure(procedure_with_id).await
224    }
225
226    /// Submits and executes a create table task.
227    #[tracing::instrument(skip_all)]
228    pub async fn submit_create_table_task(
229        &self,
230        create_table_task: CreateTableTask,
231    ) -> Result<(ProcedureId, Option<Output>)> {
232        let context = self.create_context();
233
234        let procedure = CreateTableProcedure::new(create_table_task, context);
235
236        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
237
238        self.submit_procedure(procedure_with_id).await
239    }
240
241    /// Submits and executes a `[CreateViewTask]`.
242    #[tracing::instrument(skip_all)]
243    pub async fn submit_create_view_task(
244        &self,
245        create_view_task: CreateViewTask,
246    ) -> Result<(ProcedureId, Option<Output>)> {
247        let context = self.create_context();
248
249        let procedure = CreateViewProcedure::new(create_view_task, context);
250
251        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
252
253        self.submit_procedure(procedure_with_id).await
254    }
255
256    /// Submits and executes a create multiple logical table tasks.
257    #[tracing::instrument(skip_all)]
258    pub async fn submit_create_logical_table_tasks(
259        &self,
260        create_table_tasks: Vec<CreateTableTask>,
261        physical_table_id: TableId,
262    ) -> Result<(ProcedureId, Option<Output>)> {
263        let context = self.create_context();
264
265        let procedure =
266            CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
267
268        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
269
270        self.submit_procedure(procedure_with_id).await
271    }
272
273    /// Submits and executes alter multiple table tasks.
274    #[tracing::instrument(skip_all)]
275    pub async fn submit_alter_logical_table_tasks(
276        &self,
277        alter_table_tasks: Vec<AlterTableTask>,
278        physical_table_id: TableId,
279    ) -> Result<(ProcedureId, Option<Output>)> {
280        let context = self.create_context();
281
282        let procedure =
283            AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
284
285        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
286
287        self.submit_procedure(procedure_with_id).await
288    }
289
290    /// Submits and executes a drop table task.
291    #[tracing::instrument(skip_all)]
292    pub async fn submit_drop_table_task(
293        &self,
294        drop_table_task: DropTableTask,
295    ) -> Result<(ProcedureId, Option<Output>)> {
296        let context = self.create_context();
297
298        let procedure = DropTableProcedure::new(drop_table_task, context);
299
300        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
301
302        self.submit_procedure(procedure_with_id).await
303    }
304
305    /// Submits and executes a create database task.
306    #[tracing::instrument(skip_all)]
307    pub async fn submit_create_database(
308        &self,
309        CreateDatabaseTask {
310            catalog,
311            schema,
312            create_if_not_exists,
313            options,
314        }: CreateDatabaseTask,
315    ) -> Result<(ProcedureId, Option<Output>)> {
316        let context = self.create_context();
317        let procedure =
318            CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
319        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
320
321        self.submit_procedure(procedure_with_id).await
322    }
323
324    /// Submits and executes a drop table task.
325    #[tracing::instrument(skip_all)]
326    pub async fn submit_drop_database(
327        &self,
328        DropDatabaseTask {
329            catalog,
330            schema,
331            drop_if_exists,
332        }: DropDatabaseTask,
333    ) -> Result<(ProcedureId, Option<Output>)> {
334        let context = self.create_context();
335        let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
336        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
337
338        self.submit_procedure(procedure_with_id).await
339    }
340
341    pub async fn submit_alter_database(
342        &self,
343        alter_database_task: AlterDatabaseTask,
344    ) -> Result<(ProcedureId, Option<Output>)> {
345        let context = self.create_context();
346        let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
347        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
348
349        self.submit_procedure(procedure_with_id).await
350    }
351
352    /// Submits and executes a create flow task.
353    #[tracing::instrument(skip_all)]
354    pub async fn submit_create_flow_task(
355        &self,
356        create_flow: CreateFlowTask,
357        query_context: QueryContext,
358    ) -> Result<(ProcedureId, Option<Output>)> {
359        let context = self.create_context();
360        let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
361        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
362
363        self.submit_procedure(procedure_with_id).await
364    }
365
366    /// Submits and executes a drop flow task.
367    #[tracing::instrument(skip_all)]
368    pub async fn submit_drop_flow_task(
369        &self,
370        drop_flow: DropFlowTask,
371    ) -> Result<(ProcedureId, Option<Output>)> {
372        let context = self.create_context();
373        let procedure = DropFlowProcedure::new(drop_flow, context);
374        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
375
376        self.submit_procedure(procedure_with_id).await
377    }
378
379    /// Submits and executes a drop view task.
380    #[tracing::instrument(skip_all)]
381    pub async fn submit_drop_view_task(
382        &self,
383        drop_view: DropViewTask,
384    ) -> Result<(ProcedureId, Option<Output>)> {
385        let context = self.create_context();
386        let procedure = DropViewProcedure::new(drop_view, context);
387        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
388
389        self.submit_procedure(procedure_with_id).await
390    }
391
392    /// Submits and executes a truncate table task.
393    #[tracing::instrument(skip_all)]
394    pub async fn submit_truncate_table_task(
395        &self,
396        truncate_table_task: TruncateTableTask,
397        table_info_value: DeserializedValueWithBytes<TableInfoValue>,
398        region_routes: Vec<RegionRoute>,
399    ) -> Result<(ProcedureId, Option<Output>)> {
400        let context = self.create_context();
401        let procedure = TruncateTableProcedure::new(
402            truncate_table_task,
403            table_info_value,
404            region_routes,
405            context,
406        );
407
408        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
409
410        self.submit_procedure(procedure_with_id).await
411    }
412
413    /// Submits and executes a comment on task.
414    #[tracing::instrument(skip_all)]
415    pub async fn submit_comment_on_task(
416        &self,
417        comment_on_task: CommentOnTask,
418    ) -> Result<(ProcedureId, Option<Output>)> {
419        let context = self.create_context();
420        let procedure = CommentOnProcedure::new(comment_on_task, context);
421        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
422
423        self.submit_procedure(procedure_with_id).await
424    }
425
426    async fn submit_procedure(
427        &self,
428        procedure_with_id: ProcedureWithId,
429    ) -> Result<(ProcedureId, Option<Output>)> {
430        let procedure_id = procedure_with_id.id;
431
432        let mut watcher = self
433            .procedure_manager
434            .submit(procedure_with_id)
435            .await
436            .context(SubmitProcedureSnafu)?;
437
438        let output = watcher::wait(&mut watcher)
439            .await
440            .context(WaitProcedureSnafu)?;
441
442        Ok((procedure_id, output))
443    }
444
445    pub async fn submit_ddl_task(
446        &self,
447        ctx: &ExecutorContext,
448        request: SubmitDdlTaskRequest,
449    ) -> Result<SubmitDdlTaskResponse> {
450        let span = ctx
451            .tracing_context
452            .as_ref()
453            .map(TracingContext::from_w3c)
454            .unwrap_or_else(TracingContext::from_current_span)
455            .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
456        async move {
457            debug!("Submitting Ddl task: {:?}", request.task);
458            match request.task {
459                CreateTable(create_table_task) => {
460                    handle_create_table_task(self, create_table_task).await
461                }
462                DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
463                AlterTable(alter_table_task) => {
464                    handle_alter_table_task(self, alter_table_task).await
465                }
466                TruncateTable(truncate_table_task) => {
467                    handle_truncate_table_task(self, truncate_table_task).await
468                }
469                CreateLogicalTables(create_table_tasks) => {
470                    handle_create_logical_table_tasks(self, create_table_tasks).await
471                }
472                AlterLogicalTables(alter_table_tasks) => {
473                    handle_alter_logical_table_tasks(self, alter_table_tasks).await
474                }
475                DropLogicalTables(_) => todo!(),
476                CreateDatabase(create_database_task) => {
477                    handle_create_database_task(self, create_database_task).await
478                }
479                DropDatabase(drop_database_task) => {
480                    handle_drop_database_task(self, drop_database_task).await
481                }
482                AlterDatabase(alter_database_task) => {
483                    handle_alter_database_task(self, alter_database_task).await
484                }
485                CreateFlow(create_flow_task) => {
486                    handle_create_flow_task(self, create_flow_task, request.query_context.into())
487                        .await
488                }
489                DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
490                CreateView(create_view_task) => {
491                    handle_create_view_task(self, create_view_task).await
492                }
493                DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
494                CommentOn(comment_on_task) => handle_comment_on_task(self, comment_on_task).await,
495                #[cfg(feature = "enterprise")]
496                CreateTrigger(create_trigger_task) => {
497                    handle_create_trigger_task(
498                        self,
499                        create_trigger_task,
500                        request.query_context.into(),
501                    )
502                    .await
503                }
504                #[cfg(feature = "enterprise")]
505                DropTrigger(drop_trigger_task) => {
506                    handle_drop_trigger_task(self, drop_trigger_task, request.query_context.into())
507                        .await
508                }
509            }
510        }
511        .trace(span)
512        .await
513    }
514}
515
516async fn handle_truncate_table_task(
517    ddl_manager: &DdlManager,
518    truncate_table_task: TruncateTableTask,
519) -> Result<SubmitDdlTaskResponse> {
520    let table_id = truncate_table_task.table_id;
521    let table_metadata_manager = &ddl_manager.table_metadata_manager();
522    let table_ref = truncate_table_task.table_ref();
523
524    let (table_info_value, table_route_value) =
525        table_metadata_manager.get_full_table_info(table_id).await?;
526
527    let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
528        table: table_ref.to_string(),
529    })?;
530
531    let table_route_value = table_route_value.context(TableRouteNotFoundSnafu { table_id })?;
532
533    let table_route = table_route_value.into_inner().region_routes()?.clone();
534
535    let (id, _) = ddl_manager
536        .submit_truncate_table_task(truncate_table_task, table_info_value, table_route)
537        .await?;
538
539    info!("Table: {table_id} is truncated via procedure_id {id:?}");
540
541    Ok(SubmitDdlTaskResponse {
542        key: id.to_string().into(),
543        ..Default::default()
544    })
545}
546
547async fn handle_alter_table_task(
548    ddl_manager: &DdlManager,
549    alter_table_task: AlterTableTask,
550) -> Result<SubmitDdlTaskResponse> {
551    let table_ref = alter_table_task.table_ref();
552
553    let table_id = ddl_manager
554        .table_metadata_manager()
555        .table_name_manager()
556        .get(TableNameKey::new(
557            table_ref.catalog,
558            table_ref.schema,
559            table_ref.table,
560        ))
561        .await?
562        .with_context(|| TableNotFoundSnafu {
563            table_name: table_ref.to_string(),
564        })?
565        .table_id();
566
567    let table_route_value = ddl_manager
568        .table_metadata_manager()
569        .table_route_manager()
570        .table_route_storage()
571        .get(table_id)
572        .await?
573        .context(TableRouteNotFoundSnafu { table_id })?;
574    ensure!(
575        table_route_value.is_physical(),
576        UnexpectedLogicalRouteTableSnafu {
577            err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
578        }
579    );
580
581    let (id, _) = ddl_manager
582        .submit_alter_table_task(table_id, alter_table_task)
583        .await?;
584
585    info!("Table: {table_id} is altered via procedure_id {id:?}");
586
587    Ok(SubmitDdlTaskResponse {
588        key: id.to_string().into(),
589        ..Default::default()
590    })
591}
592
593async fn handle_drop_table_task(
594    ddl_manager: &DdlManager,
595    drop_table_task: DropTableTask,
596) -> Result<SubmitDdlTaskResponse> {
597    let table_id = drop_table_task.table_id;
598    let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
599
600    info!("Table: {table_id} is dropped via procedure_id {id:?}");
601
602    Ok(SubmitDdlTaskResponse {
603        key: id.to_string().into(),
604        ..Default::default()
605    })
606}
607
608async fn handle_create_table_task(
609    ddl_manager: &DdlManager,
610    create_table_task: CreateTableTask,
611) -> Result<SubmitDdlTaskResponse> {
612    let (id, output) = ddl_manager
613        .submit_create_table_task(create_table_task)
614        .await?;
615
616    let procedure_id = id.to_string();
617    let output = output.context(ProcedureOutputSnafu {
618        procedure_id: &procedure_id,
619        err_msg: "empty output",
620    })?;
621    let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
622        procedure_id: &procedure_id,
623        err_msg: "downcast to `u32`",
624    })?);
625    info!("Table: {table_id} is created via procedure_id {id:?}");
626
627    Ok(SubmitDdlTaskResponse {
628        key: procedure_id.into(),
629        table_ids: vec![table_id],
630    })
631}
632
633async fn handle_create_logical_table_tasks(
634    ddl_manager: &DdlManager,
635    create_table_tasks: Vec<CreateTableTask>,
636) -> Result<SubmitDdlTaskResponse> {
637    ensure!(
638        !create_table_tasks.is_empty(),
639        EmptyDdlTasksSnafu {
640            name: "create logical tables"
641        }
642    );
643    let physical_table_id = utils::check_and_get_physical_table_id(
644        ddl_manager.table_metadata_manager(),
645        &create_table_tasks,
646    )
647    .await?;
648    let num_logical_tables = create_table_tasks.len();
649
650    let (id, output) = ddl_manager
651        .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
652        .await?;
653
654    info!(
655        "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}"
656    );
657
658    let procedure_id = id.to_string();
659    let output = output.context(ProcedureOutputSnafu {
660        procedure_id: &procedure_id,
661        err_msg: "empty output",
662    })?;
663    let table_ids = output
664        .downcast_ref::<Vec<TableId>>()
665        .context(ProcedureOutputSnafu {
666            procedure_id: &procedure_id,
667            err_msg: "downcast to `Vec<TableId>`",
668        })?
669        .clone();
670
671    Ok(SubmitDdlTaskResponse {
672        key: procedure_id.into(),
673        table_ids,
674    })
675}
676
677async fn handle_create_database_task(
678    ddl_manager: &DdlManager,
679    create_database_task: CreateDatabaseTask,
680) -> Result<SubmitDdlTaskResponse> {
681    let (id, _) = ddl_manager
682        .submit_create_database(create_database_task.clone())
683        .await?;
684
685    let procedure_id = id.to_string();
686    info!(
687        "Database {}.{} is created via procedure_id {id:?}",
688        create_database_task.catalog, create_database_task.schema
689    );
690
691    Ok(SubmitDdlTaskResponse {
692        key: procedure_id.into(),
693        ..Default::default()
694    })
695}
696
697async fn handle_drop_database_task(
698    ddl_manager: &DdlManager,
699    drop_database_task: DropDatabaseTask,
700) -> Result<SubmitDdlTaskResponse> {
701    let (id, _) = ddl_manager
702        .submit_drop_database(drop_database_task.clone())
703        .await?;
704
705    let procedure_id = id.to_string();
706    info!(
707        "Database {}.{} is dropped via procedure_id {id:?}",
708        drop_database_task.catalog, drop_database_task.schema
709    );
710
711    Ok(SubmitDdlTaskResponse {
712        key: procedure_id.into(),
713        ..Default::default()
714    })
715}
716
717async fn handle_alter_database_task(
718    ddl_manager: &DdlManager,
719    alter_database_task: AlterDatabaseTask,
720) -> Result<SubmitDdlTaskResponse> {
721    let (id, _) = ddl_manager
722        .submit_alter_database(alter_database_task.clone())
723        .await?;
724
725    let procedure_id = id.to_string();
726    info!(
727        "Database {}.{} is altered via procedure_id {id:?}",
728        alter_database_task.catalog(),
729        alter_database_task.schema()
730    );
731
732    Ok(SubmitDdlTaskResponse {
733        key: procedure_id.into(),
734        ..Default::default()
735    })
736}
737
738async fn handle_drop_flow_task(
739    ddl_manager: &DdlManager,
740    drop_flow_task: DropFlowTask,
741) -> Result<SubmitDdlTaskResponse> {
742    let (id, _) = ddl_manager
743        .submit_drop_flow_task(drop_flow_task.clone())
744        .await?;
745
746    let procedure_id = id.to_string();
747    info!(
748        "Flow {}.{}({}) is dropped via procedure_id {id:?}",
749        drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
750    );
751
752    Ok(SubmitDdlTaskResponse {
753        key: procedure_id.into(),
754        ..Default::default()
755    })
756}
757
/// Forwards a drop trigger task to the configured [`TriggerDdlManager`].
///
/// Fails with an unsupported-operation error when no trigger DDL manager has
/// been installed on `ddl_manager`.
#[cfg(feature = "enterprise")]
async fn handle_drop_trigger_task(
    ddl_manager: &DdlManager,
    drop_trigger_task: DropTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_manager) => {
            trigger_manager
                .drop_trigger(
                    drop_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => {
            use crate::error::UnsupportedSnafu;

            UnsupportedSnafu {
                operation: "drop trigger",
            }
            .fail()
        }
    }
}
781
782async fn handle_drop_view_task(
783    ddl_manager: &DdlManager,
784    drop_view_task: DropViewTask,
785) -> Result<SubmitDdlTaskResponse> {
786    let (id, _) = ddl_manager
787        .submit_drop_view_task(drop_view_task.clone())
788        .await?;
789
790    let procedure_id = id.to_string();
791    info!(
792        "View {}({}) is dropped via procedure_id {id:?}",
793        drop_view_task.table_ref(),
794        drop_view_task.view_id,
795    );
796
797    Ok(SubmitDdlTaskResponse {
798        key: procedure_id.into(),
799        ..Default::default()
800    })
801}
802
803async fn handle_create_flow_task(
804    ddl_manager: &DdlManager,
805    create_flow_task: CreateFlowTask,
806    query_context: QueryContext,
807) -> Result<SubmitDdlTaskResponse> {
808    let (id, output) = ddl_manager
809        .submit_create_flow_task(create_flow_task.clone(), query_context)
810        .await?;
811
812    let procedure_id = id.to_string();
813    let output = output.context(ProcedureOutputSnafu {
814        procedure_id: &procedure_id,
815        err_msg: "empty output",
816    })?;
817    let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
818        procedure_id: &procedure_id,
819        err_msg: "downcast to `u32`",
820    })?);
821    if !create_flow_task.or_replace {
822        info!(
823            "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
824            create_flow_task.catalog_name, create_flow_task.flow_name,
825        );
826    } else {
827        info!(
828            "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
829            create_flow_task.catalog_name, create_flow_task.flow_name,
830        );
831    }
832
833    Ok(SubmitDdlTaskResponse {
834        key: procedure_id.into(),
835        ..Default::default()
836    })
837}
838
/// Forwards a create trigger task to the configured [`TriggerDdlManager`].
///
/// Fails with an unsupported-operation error when no trigger DDL manager has
/// been installed on `ddl_manager`.
#[cfg(feature = "enterprise")]
async fn handle_create_trigger_task(
    ddl_manager: &DdlManager,
    create_trigger_task: CreateTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_manager) => {
            trigger_manager
                .create_trigger(
                    create_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => {
            use crate::error::UnsupportedSnafu;

            UnsupportedSnafu {
                operation: "create trigger",
            }
            .fail()
        }
    }
}
862
863async fn handle_alter_logical_table_tasks(
864    ddl_manager: &DdlManager,
865    alter_table_tasks: Vec<AlterTableTask>,
866) -> Result<SubmitDdlTaskResponse> {
867    ensure!(
868        !alter_table_tasks.is_empty(),
869        EmptyDdlTasksSnafu {
870            name: "alter logical tables"
871        }
872    );
873
874    // Use the physical table id in the first logical table, then it will be checked in the procedure.
875    let first_table = TableNameKey {
876        catalog: &alter_table_tasks[0].alter_table.catalog_name,
877        schema: &alter_table_tasks[0].alter_table.schema_name,
878        table: &alter_table_tasks[0].alter_table.table_name,
879    };
880    let physical_table_id =
881        utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
882    let num_logical_tables = alter_table_tasks.len();
883
884    let (id, _) = ddl_manager
885        .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
886        .await?;
887
888    info!(
889        "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}"
890    );
891
892    let procedure_id = id.to_string();
893
894    Ok(SubmitDdlTaskResponse {
895        key: procedure_id.into(),
896        ..Default::default()
897    })
898}
899
900/// Handle the `[CreateViewTask]` and returns the DDL response when success.
901async fn handle_create_view_task(
902    ddl_manager: &DdlManager,
903    create_view_task: CreateViewTask,
904) -> Result<SubmitDdlTaskResponse> {
905    let (id, output) = ddl_manager
906        .submit_create_view_task(create_view_task)
907        .await?;
908
909    let procedure_id = id.to_string();
910    let output = output.context(ProcedureOutputSnafu {
911        procedure_id: &procedure_id,
912        err_msg: "empty output",
913    })?;
914    let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
915        procedure_id: &procedure_id,
916        err_msg: "downcast to `u32`",
917    })?);
918    info!("View: {view_id} is created via procedure_id {id:?}");
919
920    Ok(SubmitDdlTaskResponse {
921        key: procedure_id.into(),
922        table_ids: vec![view_id],
923    })
924}
925
926async fn handle_comment_on_task(
927    ddl_manager: &DdlManager,
928    comment_on_task: CommentOnTask,
929) -> Result<SubmitDdlTaskResponse> {
930    let (id, _) = ddl_manager
931        .submit_comment_on_task(comment_on_task.clone())
932        .await?;
933
934    let procedure_id = id.to_string();
935    info!(
936        "Comment on {}.{}.{} is updated via procedure_id {id:?}",
937        comment_on_task.catalog_name, comment_on_task.schema_name, comment_on_task.object_name
938    );
939
940    Ok(SubmitDdlTaskResponse {
941        key: procedure_id.into(),
942        ..Default::default()
943    })
944}
945
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_procedure::local::LocalManager;
    use common_procedure::test_util::InMemoryPoisonStore;

    use super::DdlManager;
    use crate::cache_invalidator::DummyCacheInvalidator;
    use crate::ddl::alter_table::AlterTableProcedure;
    use crate::ddl::create_table::CreateTableProcedure;
    use crate::ddl::drop_table::DropTableProcedure;
    use crate::ddl::flow_meta::FlowMetadataAllocator;
    use crate::ddl::table_meta::TableMetadataAllocator;
    use crate::ddl::truncate_table::TruncateTableProcedure;
    use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use crate::key::TableMetadataManager;
    use crate::key::flow::FlowMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef};
    use crate::peer::Peer;
    use crate::region_keeper::MemoryRegionKeeper;
    use crate::region_registry::LeaderRegionRegistry;
    use crate::sequence::SequenceBuilder;
    use crate::state_store::KvStateStore;
    use crate::wal_options_allocator::WalOptionsAllocator;

    /// Placeholder node manager for tests; both lookups are `unimplemented!()`
    /// and panic if they are ever called.
    pub struct DummyDatanodeManager;

    #[async_trait::async_trait]
    impl DatanodeManager for DummyDatanodeManager {
        async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
            unimplemented!()
        }
    }

    #[async_trait::async_trait]
    impl FlownodeManager for DummyDatanodeManager {
        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
            unimplemented!()
        }
    }

    /// Building a `DdlManager` with loader registration enabled should leave
    /// the table procedure loaders registered in the procedure manager.
    #[test]
    fn test_try_new() {
        // Every metadata component shares one in-memory KV backend.
        let kv = Arc::new(MemoryKvBackend::new());

        let table_meta_mgr = Arc::new(TableMetadataManager::new(kv.clone()));
        let table_sequence = Arc::new(SequenceBuilder::new("test", kv.clone()).build());
        let table_meta_alloc = Arc::new(TableMetadataAllocator::new(
            table_sequence,
            Arc::new(WalOptionsAllocator::default()),
        ));

        let flow_meta_mgr = Arc::new(FlowMetadataManager::new(kv.clone()));
        let flow_sequence = Arc::new(SequenceBuilder::new("flow-test", kv.clone()).build());
        let flow_meta_alloc = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            flow_sequence,
        ));

        let procedure_manager = Arc::new(LocalManager::new(
            Default::default(),
            Arc::new(KvStateStore::new(kv.clone())),
            Arc::new(InMemoryPoisonStore::default()),
            None,
            None,
        ));

        let ddl_context = DdlContext {
            node_manager: Arc::new(DummyDatanodeManager),
            cache_invalidator: Arc::new(DummyCacheInvalidator),
            table_metadata_manager: table_meta_mgr,
            table_metadata_allocator: table_meta_alloc,
            flow_metadata_manager: flow_meta_mgr,
            flow_metadata_allocator: flow_meta_alloc,
            memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        };

        // The manager itself is not needed; the assertions below only inspect the
        // loaders visible through `procedure_manager`.
        let _ = DdlManager::try_new(ddl_context, procedure_manager.clone(), true);

        let expected_loaders = [
            CreateTableProcedure::TYPE_NAME,
            AlterTableProcedure::TYPE_NAME,
            DropTableProcedure::TYPE_NAME,
            TruncateTableProcedure::TYPE_NAME,
        ];

        for type_name in expected_loaders {
            assert!(procedure_manager.contains_loader(type_name));
        }
    }
}