common_meta/
ddl_manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::meta::ProcedureDetailResponse;
18use common_procedure::{
19    watcher, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId,
20};
21use common_telemetry::tracing_context::{FutureExt, TracingContext};
22use common_telemetry::{debug, info, tracing};
23use derive_builder::Builder;
24use snafu::{ensure, OptionExt, ResultExt};
25use store_api::storage::TableId;
26
27use crate::ddl::alter_database::AlterDatabaseProcedure;
28use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
29use crate::ddl::alter_table::AlterTableProcedure;
30use crate::ddl::create_database::CreateDatabaseProcedure;
31use crate::ddl::create_flow::CreateFlowProcedure;
32use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
33use crate::ddl::create_table::CreateTableProcedure;
34use crate::ddl::create_view::CreateViewProcedure;
35use crate::ddl::drop_database::DropDatabaseProcedure;
36use crate::ddl::drop_flow::DropFlowProcedure;
37use crate::ddl::drop_table::DropTableProcedure;
38use crate::ddl::drop_view::DropViewProcedure;
39use crate::ddl::truncate_table::TruncateTableProcedure;
40use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor};
41use crate::error::{
42    EmptyDdlTasksSnafu, ParseProcedureIdSnafu, ProcedureNotFoundSnafu, ProcedureOutputSnafu,
43    QueryProcedureSnafu, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu,
44    TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
45    UnexpectedLogicalRouteTableSnafu, UnsupportedSnafu, WaitProcedureSnafu,
46};
47use crate::key::table_info::TableInfoValue;
48use crate::key::table_name::TableNameKey;
49use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
50#[cfg(feature = "enterprise")]
51use crate::rpc::ddl::trigger::CreateTriggerTask;
52#[cfg(feature = "enterprise")]
53use crate::rpc::ddl::trigger::DropTriggerTask;
54#[cfg(feature = "enterprise")]
55use crate::rpc::ddl::DdlTask::CreateTrigger;
56#[cfg(feature = "enterprise")]
57use crate::rpc::ddl::DdlTask::DropTrigger;
58use crate::rpc::ddl::DdlTask::{
59    AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
60    CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
61    TruncateTable,
62};
63use crate::rpc::ddl::{
64    AlterDatabaseTask, AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask,
65    CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
66    SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
67};
68use crate::rpc::procedure;
69use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
70use crate::rpc::router::RegionRoute;
71
72pub type DdlManagerRef = Arc<DdlManager>;
73
74pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
75
76/// The [DdlManager] provides the ability to execute Ddl.
77#[derive(Builder)]
78pub struct DdlManager {
79    ddl_context: DdlContext,
80    procedure_manager: ProcedureManagerRef,
81    #[cfg(feature = "enterprise")]
82    trigger_ddl_manager: Option<TriggerDdlManagerRef>,
83}
84
/// Handler for trigger-related DDL tasks (for example creating or dropping
/// a trigger). Only available with the `enterprise` feature.
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
pub trait TriggerDdlManager: Send + Sync {
    /// Handles a [CreateTriggerTask] and returns the DDL response.
    ///
    /// The caller hands over the procedure manager, the DDL context and the
    /// query context of the request.
    async fn create_trigger(
        &self,
        create_trigger_task: CreateTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Handles a [DropTriggerTask] and returns the DDL response.
    ///
    /// The caller hands over the procedure manager, the DDL context and the
    /// query context of the request.
    async fn drop_trigger(
        &self,
        drop_trigger_task: DropTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Exposes the implementor as [`std::any::Any`].
    fn as_any(&self) -> &dyn std::any::Any;
}
108
/// Reference-counted ([Arc]) trait object of a [TriggerDdlManager].
#[cfg(feature = "enterprise")]
pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
111
// Expands to a `(type name, loader factory)` pair for one procedure type.
//
// The factory takes a `DdlContext` and returns a boxed loader; each call of
// that loader clones the context and rebuilds the procedure with
// `<procedure>::from_json`.
macro_rules! procedure_loader_entry {
    ($proc_ty:ident) => {
        (
            $proc_ty::TYPE_NAME,
            &|ddl_ctx: DdlContext| -> BoxedProcedureLoader {
                Box::new(move |serialized: &str| {
                    // The loader may be called repeatedly, so it hands out a clone every time.
                    $proc_ty::from_json(serialized, ddl_ctx.clone())
                        .map(|procedure| Box::new(procedure) as _)
                })
            },
        )
    };
}
125
// Expands to a `Vec` holding one `procedure_loader_entry!` expansion per
// listed procedure type, in the order given.
//
// The list is comma-separated; an optional trailing comma is accepted so
// multi-line invocations can be formatted freely.
macro_rules! procedure_loader {
    ($($procedure:ident),* $(,)?) => {
        vec![
            $(procedure_loader_entry!($procedure)),*
        ]
    };
}
133
134impl DdlManager {
135    /// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
136    pub fn try_new(
137        ddl_context: DdlContext,
138        procedure_manager: ProcedureManagerRef,
139        register_loaders: bool,
140    ) -> Result<Self> {
141        let manager = Self {
142            ddl_context,
143            procedure_manager,
144            #[cfg(feature = "enterprise")]
145            trigger_ddl_manager: None,
146        };
147        if register_loaders {
148            manager.register_loaders()?;
149        }
150        Ok(manager)
151    }
152
153    #[cfg(feature = "enterprise")]
154    pub fn with_trigger_ddl_manager(
155        mut self,
156        trigger_ddl_manager: Option<TriggerDdlManagerRef>,
157    ) -> Self {
158        self.trigger_ddl_manager = trigger_ddl_manager;
159        self
160    }
161
162    /// Returns the [TableMetadataManagerRef].
163    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
164        &self.ddl_context.table_metadata_manager
165    }
166
167    /// Returns the [DdlContext]
168    pub fn create_context(&self) -> DdlContext {
169        self.ddl_context.clone()
170    }
171
172    /// Registers all Ddl loaders.
173    pub fn register_loaders(&self) -> Result<()> {
174        let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
175            CreateTableProcedure,
176            CreateLogicalTablesProcedure,
177            CreateViewProcedure,
178            CreateFlowProcedure,
179            AlterTableProcedure,
180            AlterLogicalTablesProcedure,
181            AlterDatabaseProcedure,
182            DropTableProcedure,
183            DropFlowProcedure,
184            TruncateTableProcedure,
185            CreateDatabaseProcedure,
186            DropDatabaseProcedure,
187            DropViewProcedure
188        );
189
190        for (type_name, loader_factory) in loaders {
191            let context = self.create_context();
192            self.procedure_manager
193                .register_loader(type_name, loader_factory(context))
194                .context(RegisterProcedureLoaderSnafu { type_name })?;
195        }
196
197        Ok(())
198    }
199
200    /// Submits and executes an alter table task.
201    #[tracing::instrument(skip_all)]
202    pub async fn submit_alter_table_task(
203        &self,
204        table_id: TableId,
205        alter_table_task: AlterTableTask,
206    ) -> Result<(ProcedureId, Option<Output>)> {
207        let context = self.create_context();
208
209        let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
210
211        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
212
213        self.submit_procedure(procedure_with_id).await
214    }
215
216    /// Submits and executes a create table task.
217    #[tracing::instrument(skip_all)]
218    pub async fn submit_create_table_task(
219        &self,
220        create_table_task: CreateTableTask,
221    ) -> Result<(ProcedureId, Option<Output>)> {
222        let context = self.create_context();
223
224        let procedure = CreateTableProcedure::new(create_table_task, context);
225
226        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
227
228        self.submit_procedure(procedure_with_id).await
229    }
230
231    /// Submits and executes a `[CreateViewTask]`.
232    #[tracing::instrument(skip_all)]
233    pub async fn submit_create_view_task(
234        &self,
235        create_view_task: CreateViewTask,
236    ) -> Result<(ProcedureId, Option<Output>)> {
237        let context = self.create_context();
238
239        let procedure = CreateViewProcedure::new(create_view_task, context);
240
241        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
242
243        self.submit_procedure(procedure_with_id).await
244    }
245
246    /// Submits and executes a create multiple logical table tasks.
247    #[tracing::instrument(skip_all)]
248    pub async fn submit_create_logical_table_tasks(
249        &self,
250        create_table_tasks: Vec<CreateTableTask>,
251        physical_table_id: TableId,
252    ) -> Result<(ProcedureId, Option<Output>)> {
253        let context = self.create_context();
254
255        let procedure =
256            CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
257
258        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
259
260        self.submit_procedure(procedure_with_id).await
261    }
262
263    /// Submits and executes alter multiple table tasks.
264    #[tracing::instrument(skip_all)]
265    pub async fn submit_alter_logical_table_tasks(
266        &self,
267        alter_table_tasks: Vec<AlterTableTask>,
268        physical_table_id: TableId,
269    ) -> Result<(ProcedureId, Option<Output>)> {
270        let context = self.create_context();
271
272        let procedure =
273            AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
274
275        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
276
277        self.submit_procedure(procedure_with_id).await
278    }
279
280    /// Submits and executes a drop table task.
281    #[tracing::instrument(skip_all)]
282    pub async fn submit_drop_table_task(
283        &self,
284        drop_table_task: DropTableTask,
285    ) -> Result<(ProcedureId, Option<Output>)> {
286        let context = self.create_context();
287
288        let procedure = DropTableProcedure::new(drop_table_task, context);
289
290        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
291
292        self.submit_procedure(procedure_with_id).await
293    }
294
295    /// Submits and executes a create database task.
296    #[tracing::instrument(skip_all)]
297    pub async fn submit_create_database(
298        &self,
299        CreateDatabaseTask {
300            catalog,
301            schema,
302            create_if_not_exists,
303            options,
304        }: CreateDatabaseTask,
305    ) -> Result<(ProcedureId, Option<Output>)> {
306        let context = self.create_context();
307        let procedure =
308            CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
309        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
310
311        self.submit_procedure(procedure_with_id).await
312    }
313
314    /// Submits and executes a drop table task.
315    #[tracing::instrument(skip_all)]
316    pub async fn submit_drop_database(
317        &self,
318        DropDatabaseTask {
319            catalog,
320            schema,
321            drop_if_exists,
322        }: DropDatabaseTask,
323    ) -> Result<(ProcedureId, Option<Output>)> {
324        let context = self.create_context();
325        let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
326        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
327
328        self.submit_procedure(procedure_with_id).await
329    }
330
331    pub async fn submit_alter_database(
332        &self,
333        alter_database_task: AlterDatabaseTask,
334    ) -> Result<(ProcedureId, Option<Output>)> {
335        let context = self.create_context();
336        let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
337        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
338
339        self.submit_procedure(procedure_with_id).await
340    }
341
342    /// Submits and executes a create flow task.
343    #[tracing::instrument(skip_all)]
344    pub async fn submit_create_flow_task(
345        &self,
346        create_flow: CreateFlowTask,
347        query_context: QueryContext,
348    ) -> Result<(ProcedureId, Option<Output>)> {
349        let context = self.create_context();
350        let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
351        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
352
353        self.submit_procedure(procedure_with_id).await
354    }
355
356    /// Submits and executes a drop flow task.
357    #[tracing::instrument(skip_all)]
358    pub async fn submit_drop_flow_task(
359        &self,
360        drop_flow: DropFlowTask,
361    ) -> Result<(ProcedureId, Option<Output>)> {
362        let context = self.create_context();
363        let procedure = DropFlowProcedure::new(drop_flow, context);
364        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
365
366        self.submit_procedure(procedure_with_id).await
367    }
368
369    /// Submits and executes a drop view task.
370    #[tracing::instrument(skip_all)]
371    pub async fn submit_drop_view_task(
372        &self,
373        drop_view: DropViewTask,
374    ) -> Result<(ProcedureId, Option<Output>)> {
375        let context = self.create_context();
376        let procedure = DropViewProcedure::new(drop_view, context);
377        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
378
379        self.submit_procedure(procedure_with_id).await
380    }
381
382    /// Submits and executes a truncate table task.
383    #[tracing::instrument(skip_all)]
384    pub async fn submit_truncate_table_task(
385        &self,
386        truncate_table_task: TruncateTableTask,
387        table_info_value: DeserializedValueWithBytes<TableInfoValue>,
388        region_routes: Vec<RegionRoute>,
389    ) -> Result<(ProcedureId, Option<Output>)> {
390        let context = self.create_context();
391        let procedure = TruncateTableProcedure::new(
392            truncate_table_task,
393            table_info_value,
394            region_routes,
395            context,
396        );
397
398        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
399
400        self.submit_procedure(procedure_with_id).await
401    }
402
403    async fn submit_procedure(
404        &self,
405        procedure_with_id: ProcedureWithId,
406    ) -> Result<(ProcedureId, Option<Output>)> {
407        let procedure_id = procedure_with_id.id;
408
409        let mut watcher = self
410            .procedure_manager
411            .submit(procedure_with_id)
412            .await
413            .context(SubmitProcedureSnafu)?;
414
415        let output = watcher::wait(&mut watcher)
416            .await
417            .context(WaitProcedureSnafu)?;
418
419        Ok((procedure_id, output))
420    }
421}
422
423async fn handle_truncate_table_task(
424    ddl_manager: &DdlManager,
425    truncate_table_task: TruncateTableTask,
426) -> Result<SubmitDdlTaskResponse> {
427    let table_id = truncate_table_task.table_id;
428    let table_metadata_manager = &ddl_manager.table_metadata_manager();
429    let table_ref = truncate_table_task.table_ref();
430
431    let (table_info_value, table_route_value) =
432        table_metadata_manager.get_full_table_info(table_id).await?;
433
434    let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
435        table: table_ref.to_string(),
436    })?;
437
438    let table_route_value = table_route_value.context(TableRouteNotFoundSnafu { table_id })?;
439
440    let table_route = table_route_value.into_inner().region_routes()?.clone();
441
442    let (id, _) = ddl_manager
443        .submit_truncate_table_task(truncate_table_task, table_info_value, table_route)
444        .await?;
445
446    info!("Table: {table_id} is truncated via procedure_id {id:?}");
447
448    Ok(SubmitDdlTaskResponse {
449        key: id.to_string().into(),
450        ..Default::default()
451    })
452}
453
454async fn handle_alter_table_task(
455    ddl_manager: &DdlManager,
456    alter_table_task: AlterTableTask,
457) -> Result<SubmitDdlTaskResponse> {
458    let table_ref = alter_table_task.table_ref();
459
460    let table_id = ddl_manager
461        .table_metadata_manager()
462        .table_name_manager()
463        .get(TableNameKey::new(
464            table_ref.catalog,
465            table_ref.schema,
466            table_ref.table,
467        ))
468        .await?
469        .with_context(|| TableNotFoundSnafu {
470            table_name: table_ref.to_string(),
471        })?
472        .table_id();
473
474    let table_route_value = ddl_manager
475        .table_metadata_manager()
476        .table_route_manager()
477        .table_route_storage()
478        .get(table_id)
479        .await?
480        .context(TableRouteNotFoundSnafu { table_id })?;
481    ensure!(
482        table_route_value.is_physical(),
483        UnexpectedLogicalRouteTableSnafu {
484            err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
485        }
486    );
487
488    let (id, _) = ddl_manager
489        .submit_alter_table_task(table_id, alter_table_task)
490        .await?;
491
492    info!("Table: {table_id} is altered via procedure_id {id:?}");
493
494    Ok(SubmitDdlTaskResponse {
495        key: id.to_string().into(),
496        ..Default::default()
497    })
498}
499
500async fn handle_drop_table_task(
501    ddl_manager: &DdlManager,
502    drop_table_task: DropTableTask,
503) -> Result<SubmitDdlTaskResponse> {
504    let table_id = drop_table_task.table_id;
505    let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
506
507    info!("Table: {table_id} is dropped via procedure_id {id:?}");
508
509    Ok(SubmitDdlTaskResponse {
510        key: id.to_string().into(),
511        ..Default::default()
512    })
513}
514
515async fn handle_create_table_task(
516    ddl_manager: &DdlManager,
517    create_table_task: CreateTableTask,
518) -> Result<SubmitDdlTaskResponse> {
519    let (id, output) = ddl_manager
520        .submit_create_table_task(create_table_task)
521        .await?;
522
523    let procedure_id = id.to_string();
524    let output = output.context(ProcedureOutputSnafu {
525        procedure_id: &procedure_id,
526        err_msg: "empty output",
527    })?;
528    let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
529        procedure_id: &procedure_id,
530        err_msg: "downcast to `u32`",
531    })?);
532    info!("Table: {table_id} is created via procedure_id {id:?}");
533
534    Ok(SubmitDdlTaskResponse {
535        key: procedure_id.into(),
536        table_ids: vec![table_id],
537    })
538}
539
540async fn handle_create_logical_table_tasks(
541    ddl_manager: &DdlManager,
542    create_table_tasks: Vec<CreateTableTask>,
543) -> Result<SubmitDdlTaskResponse> {
544    ensure!(
545        !create_table_tasks.is_empty(),
546        EmptyDdlTasksSnafu {
547            name: "create logical tables"
548        }
549    );
550    let physical_table_id = utils::check_and_get_physical_table_id(
551        ddl_manager.table_metadata_manager(),
552        &create_table_tasks,
553    )
554    .await?;
555    let num_logical_tables = create_table_tasks.len();
556
557    let (id, output) = ddl_manager
558        .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
559        .await?;
560
561    info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}");
562
563    let procedure_id = id.to_string();
564    let output = output.context(ProcedureOutputSnafu {
565        procedure_id: &procedure_id,
566        err_msg: "empty output",
567    })?;
568    let table_ids = output
569        .downcast_ref::<Vec<TableId>>()
570        .context(ProcedureOutputSnafu {
571            procedure_id: &procedure_id,
572            err_msg: "downcast to `Vec<TableId>`",
573        })?
574        .clone();
575
576    Ok(SubmitDdlTaskResponse {
577        key: procedure_id.into(),
578        table_ids,
579    })
580}
581
582async fn handle_create_database_task(
583    ddl_manager: &DdlManager,
584    create_database_task: CreateDatabaseTask,
585) -> Result<SubmitDdlTaskResponse> {
586    let (id, _) = ddl_manager
587        .submit_create_database(create_database_task.clone())
588        .await?;
589
590    let procedure_id = id.to_string();
591    info!(
592        "Database {}.{} is created via procedure_id {id:?}",
593        create_database_task.catalog, create_database_task.schema
594    );
595
596    Ok(SubmitDdlTaskResponse {
597        key: procedure_id.into(),
598        ..Default::default()
599    })
600}
601
602async fn handle_drop_database_task(
603    ddl_manager: &DdlManager,
604    drop_database_task: DropDatabaseTask,
605) -> Result<SubmitDdlTaskResponse> {
606    let (id, _) = ddl_manager
607        .submit_drop_database(drop_database_task.clone())
608        .await?;
609
610    let procedure_id = id.to_string();
611    info!(
612        "Database {}.{} is dropped via procedure_id {id:?}",
613        drop_database_task.catalog, drop_database_task.schema
614    );
615
616    Ok(SubmitDdlTaskResponse {
617        key: procedure_id.into(),
618        ..Default::default()
619    })
620}
621
622async fn handle_alter_database_task(
623    ddl_manager: &DdlManager,
624    alter_database_task: AlterDatabaseTask,
625) -> Result<SubmitDdlTaskResponse> {
626    let (id, _) = ddl_manager
627        .submit_alter_database(alter_database_task.clone())
628        .await?;
629
630    let procedure_id = id.to_string();
631    info!(
632        "Database {}.{} is altered via procedure_id {id:?}",
633        alter_database_task.catalog(),
634        alter_database_task.schema()
635    );
636
637    Ok(SubmitDdlTaskResponse {
638        key: procedure_id.into(),
639        ..Default::default()
640    })
641}
642
643async fn handle_drop_flow_task(
644    ddl_manager: &DdlManager,
645    drop_flow_task: DropFlowTask,
646) -> Result<SubmitDdlTaskResponse> {
647    let (id, _) = ddl_manager
648        .submit_drop_flow_task(drop_flow_task.clone())
649        .await?;
650
651    let procedure_id = id.to_string();
652    info!(
653        "Flow {}.{}({}) is dropped via procedure_id {id:?}",
654        drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
655    );
656
657    Ok(SubmitDdlTaskResponse {
658        key: procedure_id.into(),
659        ..Default::default()
660    })
661}
662
/// Handles a [DropTriggerTask] by delegating to the configured trigger DDL
/// manager.
///
/// # Errors
///
/// Returns an unsupported-operation error when no trigger DDL manager is
/// configured; otherwise returns whatever the delegate returns.
#[cfg(feature = "enterprise")]
async fn handle_drop_trigger_task(
    ddl_manager: &DdlManager,
    drop_trigger_task: DropTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_manager) => {
            trigger_manager
                .drop_trigger(
                    drop_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => UnsupportedSnafu {
            operation: "drop trigger",
        }
        .fail(),
    }
}
684
685async fn handle_drop_view_task(
686    ddl_manager: &DdlManager,
687    drop_view_task: DropViewTask,
688) -> Result<SubmitDdlTaskResponse> {
689    let (id, _) = ddl_manager
690        .submit_drop_view_task(drop_view_task.clone())
691        .await?;
692
693    let procedure_id = id.to_string();
694    info!(
695        "View {}({}) is dropped via procedure_id {id:?}",
696        drop_view_task.table_ref(),
697        drop_view_task.view_id,
698    );
699
700    Ok(SubmitDdlTaskResponse {
701        key: procedure_id.into(),
702        ..Default::default()
703    })
704}
705
706async fn handle_create_flow_task(
707    ddl_manager: &DdlManager,
708    create_flow_task: CreateFlowTask,
709    query_context: QueryContext,
710) -> Result<SubmitDdlTaskResponse> {
711    let (id, output) = ddl_manager
712        .submit_create_flow_task(create_flow_task.clone(), query_context)
713        .await?;
714
715    let procedure_id = id.to_string();
716    let output = output.context(ProcedureOutputSnafu {
717        procedure_id: &procedure_id,
718        err_msg: "empty output",
719    })?;
720    let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
721        procedure_id: &procedure_id,
722        err_msg: "downcast to `u32`",
723    })?);
724    if !create_flow_task.or_replace {
725        info!(
726            "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
727            create_flow_task.catalog_name, create_flow_task.flow_name,
728        );
729    } else {
730        info!(
731            "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
732            create_flow_task.catalog_name, create_flow_task.flow_name,
733        );
734    }
735
736    Ok(SubmitDdlTaskResponse {
737        key: procedure_id.into(),
738        ..Default::default()
739    })
740}
741
/// Handles a [CreateTriggerTask] by delegating to the configured trigger DDL
/// manager.
///
/// # Errors
///
/// Returns an unsupported-operation error when no trigger DDL manager is
/// configured; otherwise returns whatever the delegate returns.
#[cfg(feature = "enterprise")]
async fn handle_create_trigger_task(
    ddl_manager: &DdlManager,
    create_trigger_task: CreateTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_manager) => {
            trigger_manager
                .create_trigger(
                    create_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => UnsupportedSnafu {
            operation: "create trigger",
        }
        .fail(),
    }
}
763
764async fn handle_alter_logical_table_tasks(
765    ddl_manager: &DdlManager,
766    alter_table_tasks: Vec<AlterTableTask>,
767) -> Result<SubmitDdlTaskResponse> {
768    ensure!(
769        !alter_table_tasks.is_empty(),
770        EmptyDdlTasksSnafu {
771            name: "alter logical tables"
772        }
773    );
774
775    // Use the physical table id in the first logical table, then it will be checked in the procedure.
776    let first_table = TableNameKey {
777        catalog: &alter_table_tasks[0].alter_table.catalog_name,
778        schema: &alter_table_tasks[0].alter_table.schema_name,
779        table: &alter_table_tasks[0].alter_table.table_name,
780    };
781    let physical_table_id =
782        utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
783    let num_logical_tables = alter_table_tasks.len();
784
785    let (id, _) = ddl_manager
786        .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
787        .await?;
788
789    info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}");
790
791    let procedure_id = id.to_string();
792
793    Ok(SubmitDdlTaskResponse {
794        key: procedure_id.into(),
795        ..Default::default()
796    })
797}
798
799/// Handle the `[CreateViewTask]` and returns the DDL response when success.
800async fn handle_create_view_task(
801    ddl_manager: &DdlManager,
802    create_view_task: CreateViewTask,
803) -> Result<SubmitDdlTaskResponse> {
804    let (id, output) = ddl_manager
805        .submit_create_view_task(create_view_task)
806        .await?;
807
808    let procedure_id = id.to_string();
809    let output = output.context(ProcedureOutputSnafu {
810        procedure_id: &procedure_id,
811        err_msg: "empty output",
812    })?;
813    let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
814        procedure_id: &procedure_id,
815        err_msg: "downcast to `u32`",
816    })?);
817    info!("View: {view_id} is created via procedure_id {id:?}");
818
819    Ok(SubmitDdlTaskResponse {
820        key: procedure_id.into(),
821        table_ids: vec![view_id],
822    })
823}
824
825/// TODO(dennis): let [`DdlManager`] implement [`ProcedureExecutor`] looks weird, find some way to refactor it.
826#[async_trait::async_trait]
827impl ProcedureExecutor for DdlManager {
828    async fn submit_ddl_task(
829        &self,
830        ctx: &ExecutorContext,
831        request: SubmitDdlTaskRequest,
832    ) -> Result<SubmitDdlTaskResponse> {
833        let span = ctx
834            .tracing_context
835            .as_ref()
836            .map(TracingContext::from_w3c)
837            .unwrap_or(TracingContext::from_current_span())
838            .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
839        async move {
840            debug!("Submitting Ddl task: {:?}", request.task);
841            match request.task {
842                CreateTable(create_table_task) => {
843                    handle_create_table_task(self, create_table_task).await
844                }
845                DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
846                AlterTable(alter_table_task) => {
847                    handle_alter_table_task(self, alter_table_task).await
848                }
849                TruncateTable(truncate_table_task) => {
850                    handle_truncate_table_task(self, truncate_table_task).await
851                }
852                CreateLogicalTables(create_table_tasks) => {
853                    handle_create_logical_table_tasks(self, create_table_tasks).await
854                }
855                AlterLogicalTables(alter_table_tasks) => {
856                    handle_alter_logical_table_tasks(self, alter_table_tasks).await
857                }
858                DropLogicalTables(_) => todo!(),
859                CreateDatabase(create_database_task) => {
860                    handle_create_database_task(self, create_database_task).await
861                }
862                DropDatabase(drop_database_task) => {
863                    handle_drop_database_task(self, drop_database_task).await
864                }
865                AlterDatabase(alter_database_task) => {
866                    handle_alter_database_task(self, alter_database_task).await
867                }
868                CreateFlow(create_flow_task) => {
869                    handle_create_flow_task(self, create_flow_task, request.query_context.into())
870                        .await
871                }
872                DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
873                CreateView(create_view_task) => {
874                    handle_create_view_task(self, create_view_task).await
875                }
876                DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
877                #[cfg(feature = "enterprise")]
878                CreateTrigger(create_trigger_task) => {
879                    handle_create_trigger_task(
880                        self,
881                        create_trigger_task,
882                        request.query_context.into(),
883                    )
884                    .await
885                }
886                #[cfg(feature = "enterprise")]
887                DropTrigger(drop_trigger_task) => {
888                    handle_drop_trigger_task(self, drop_trigger_task, request.query_context.into())
889                        .await
890                }
891            }
892        }
893        .trace(span)
894        .await
895    }
896
897    async fn migrate_region(
898        &self,
899        _ctx: &ExecutorContext,
900        _request: MigrateRegionRequest,
901    ) -> Result<MigrateRegionResponse> {
902        UnsupportedSnafu {
903            operation: "migrate_region",
904        }
905        .fail()
906    }
907
908    async fn query_procedure_state(
909        &self,
910        _ctx: &ExecutorContext,
911        pid: &str,
912    ) -> Result<ProcedureStateResponse> {
913        let pid =
914            ProcedureId::parse_str(pid).with_context(|_| ParseProcedureIdSnafu { key: pid })?;
915
916        let state = self
917            .procedure_manager
918            .procedure_state(pid)
919            .await
920            .context(QueryProcedureSnafu)?
921            .context(ProcedureNotFoundSnafu {
922                pid: pid.to_string(),
923            })?;
924
925        Ok(procedure::procedure_state_to_pb_response(&state))
926    }
927
928    async fn list_procedures(&self, _ctx: &ExecutorContext) -> Result<ProcedureDetailResponse> {
929        let metas = self
930            .procedure_manager
931            .list_procedures()
932            .await
933            .context(QueryProcedureSnafu)?;
934        Ok(procedure::procedure_details_to_pb_response(metas))
935    }
936}
937
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_procedure::local::LocalManager;
    use common_procedure::test_util::InMemoryPoisonStore;

    use super::DdlManager;
    use crate::cache_invalidator::DummyCacheInvalidator;
    use crate::ddl::alter_table::AlterTableProcedure;
    use crate::ddl::create_table::CreateTableProcedure;
    use crate::ddl::drop_table::DropTableProcedure;
    use crate::ddl::flow_meta::FlowMetadataAllocator;
    use crate::ddl::table_meta::TableMetadataAllocator;
    use crate::ddl::truncate_table::TruncateTableProcedure;
    use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use crate::key::flow::FlowMetadataManager;
    use crate::key::TableMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
    use crate::peer::Peer;
    use crate::region_keeper::MemoryRegionKeeper;
    use crate::region_registry::LeaderRegionRegistry;
    use crate::sequence::SequenceBuilder;
    use crate::state_store::KvStateStore;
    use crate::wal_options_allocator::WalOptionsAllocator;

    /// A dummy implemented [NodeManager].
    ///
    /// Both accessors are `unimplemented!()`, so this type is only usable in
    /// tests that never resolve a datanode or flownode client.
    pub struct DummyDatanodeManager;

    #[async_trait::async_trait]
    impl NodeManager for DummyDatanodeManager {
        async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
            unimplemented!()
        }

        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
            unimplemented!()
        }
    }

    /// Builds a [`DdlManager`] on top of in-memory components and checks that
    /// the table-related procedure loaders are present in the procedure
    /// manager afterwards.
    #[test]
    fn test_try_new() {
        // Everything below shares a single in-memory key-value backend.
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
            Arc::new(WalOptionsAllocator::default()),
        ));
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
        ));

        let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let procedure_manager = Arc::new(LocalManager::new(
            Default::default(),
            state_store,
            poison_manager,
            None,
        ));

        // Fail loudly if construction itself errors, rather than letting the
        // failure surface later as an unexplained missing-loader assertion.
        let _ = DdlManager::try_new(
            DdlContext {
                node_manager: Arc::new(DummyDatanodeManager),
                cache_invalidator: Arc::new(DummyCacheInvalidator),
                table_metadata_manager,
                table_metadata_allocator,
                flow_metadata_manager,
                flow_metadata_allocator,
                memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
                leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
                region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
            },
            procedure_manager.clone(),
            // NOTE(review): presumably the "register loaders" flag — confirm
            // against `DdlManager::try_new`.
            true,
        )
        .expect("DdlManager::try_new should succeed");

        let expected_loaders = [
            CreateTableProcedure::TYPE_NAME,
            AlterTableProcedure::TYPE_NAME,
            DropTableProcedure::TYPE_NAME,
            TruncateTableProcedure::TYPE_NAME,
        ];

        for loader in expected_loaders {
            assert!(
                procedure_manager.contains_loader(loader),
                "missing procedure loader: {}",
                loader
            );
        }
    }
}