common_meta/
ddl_manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::meta::ProcedureDetailResponse;
18use common_procedure::{
19    watcher, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId,
20};
21use common_telemetry::tracing_context::{FutureExt, TracingContext};
22use common_telemetry::{debug, info, tracing};
23use derive_builder::Builder;
24use snafu::{ensure, OptionExt, ResultExt};
25use store_api::storage::TableId;
26
27use crate::ddl::alter_database::AlterDatabaseProcedure;
28use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
29use crate::ddl::alter_table::AlterTableProcedure;
30use crate::ddl::create_database::CreateDatabaseProcedure;
31use crate::ddl::create_flow::CreateFlowProcedure;
32use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
33use crate::ddl::create_table::CreateTableProcedure;
34use crate::ddl::create_view::CreateViewProcedure;
35use crate::ddl::drop_database::DropDatabaseProcedure;
36use crate::ddl::drop_flow::DropFlowProcedure;
37use crate::ddl::drop_table::DropTableProcedure;
38use crate::ddl::drop_view::DropViewProcedure;
39use crate::ddl::truncate_table::TruncateTableProcedure;
40use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor};
41use crate::error::{
42    EmptyDdlTasksSnafu, ParseProcedureIdSnafu, ProcedureNotFoundSnafu, ProcedureOutputSnafu,
43    QueryProcedureSnafu, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu,
44    TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
45    UnexpectedLogicalRouteTableSnafu, UnsupportedSnafu, WaitProcedureSnafu,
46};
47use crate::key::table_info::TableInfoValue;
48use crate::key::table_name::TableNameKey;
49use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
50#[cfg(feature = "enterprise")]
51use crate::rpc::ddl::trigger::CreateTriggerTask;
52#[cfg(feature = "enterprise")]
53use crate::rpc::ddl::DdlTask::CreateTrigger;
54use crate::rpc::ddl::DdlTask::{
55    AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
56    CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
57    TruncateTable,
58};
59use crate::rpc::ddl::{
60    AlterDatabaseTask, AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask,
61    CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
62    SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
63};
64use crate::rpc::procedure;
65use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
66use crate::rpc::router::RegionRoute;
67
68pub type DdlManagerRef = Arc<DdlManager>;
69
70pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
71
72/// The [DdlManager] provides the ability to execute Ddl.
73#[derive(Builder)]
74pub struct DdlManager {
75    ddl_context: DdlContext,
76    procedure_manager: ProcedureManagerRef,
77    #[cfg(feature = "enterprise")]
78    trigger_ddl_manager: Option<TriggerDdlManagerRef>,
79}
80
81/// This trait is responsible for handling DDL tasks about triggers. e.g.,
82/// create trigger, drop trigger, etc.
83#[cfg(feature = "enterprise")]
84#[async_trait::async_trait]
85pub trait TriggerDdlManager: Send + Sync {
86    async fn create_trigger(
87        &self,
88        create_trigger_task: CreateTriggerTask,
89        procedure_manager: ProcedureManagerRef,
90        ddl_context: DdlContext,
91        query_context: QueryContext,
92    ) -> Result<SubmitDdlTaskResponse>;
93
94    fn as_any(&self) -> &dyn std::any::Any;
95}
96
97#[cfg(feature = "enterprise")]
98pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
99
100macro_rules! procedure_loader_entry {
101    ($procedure:ident) => {
102        (
103            $procedure::TYPE_NAME,
104            &|context: DdlContext| -> BoxedProcedureLoader {
105                Box::new(move |json: &str| {
106                    let context = context.clone();
107                    $procedure::from_json(json, context).map(|p| Box::new(p) as _)
108                })
109            },
110        )
111    };
112}
113
114macro_rules! procedure_loader {
115    ($($procedure:ident),*) => {
116        vec![
117            $(procedure_loader_entry!($procedure)),*
118        ]
119    };
120}
121
122impl DdlManager {
123    /// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
124    pub fn try_new(
125        ddl_context: DdlContext,
126        procedure_manager: ProcedureManagerRef,
127        register_loaders: bool,
128        #[cfg(feature = "enterprise")] trigger_ddl_manager: Option<TriggerDdlManagerRef>,
129    ) -> Result<Self> {
130        let manager = Self {
131            ddl_context,
132            procedure_manager,
133            #[cfg(feature = "enterprise")]
134            trigger_ddl_manager,
135        };
136        if register_loaders {
137            manager.register_loaders()?;
138        }
139        Ok(manager)
140    }
141
142    /// Returns the [TableMetadataManagerRef].
143    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
144        &self.ddl_context.table_metadata_manager
145    }
146
147    /// Returns the [DdlContext]
148    pub fn create_context(&self) -> DdlContext {
149        self.ddl_context.clone()
150    }
151
152    /// Registers all Ddl loaders.
153    pub fn register_loaders(&self) -> Result<()> {
154        let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
155            CreateTableProcedure,
156            CreateLogicalTablesProcedure,
157            CreateViewProcedure,
158            CreateFlowProcedure,
159            AlterTableProcedure,
160            AlterLogicalTablesProcedure,
161            AlterDatabaseProcedure,
162            DropTableProcedure,
163            DropFlowProcedure,
164            TruncateTableProcedure,
165            CreateDatabaseProcedure,
166            DropDatabaseProcedure,
167            DropViewProcedure
168        );
169
170        for (type_name, loader_factory) in loaders {
171            let context = self.create_context();
172            self.procedure_manager
173                .register_loader(type_name, loader_factory(context))
174                .context(RegisterProcedureLoaderSnafu { type_name })?;
175        }
176
177        Ok(())
178    }
179
180    /// Submits and executes an alter table task.
181    #[tracing::instrument(skip_all)]
182    pub async fn submit_alter_table_task(
183        &self,
184        table_id: TableId,
185        alter_table_task: AlterTableTask,
186    ) -> Result<(ProcedureId, Option<Output>)> {
187        let context = self.create_context();
188
189        let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
190
191        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
192
193        self.submit_procedure(procedure_with_id).await
194    }
195
196    /// Submits and executes a create table task.
197    #[tracing::instrument(skip_all)]
198    pub async fn submit_create_table_task(
199        &self,
200        create_table_task: CreateTableTask,
201    ) -> Result<(ProcedureId, Option<Output>)> {
202        let context = self.create_context();
203
204        let procedure = CreateTableProcedure::new(create_table_task, context);
205
206        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
207
208        self.submit_procedure(procedure_with_id).await
209    }
210
211    /// Submits and executes a `[CreateViewTask]`.
212    #[tracing::instrument(skip_all)]
213    pub async fn submit_create_view_task(
214        &self,
215        create_view_task: CreateViewTask,
216    ) -> Result<(ProcedureId, Option<Output>)> {
217        let context = self.create_context();
218
219        let procedure = CreateViewProcedure::new(create_view_task, context);
220
221        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
222
223        self.submit_procedure(procedure_with_id).await
224    }
225
226    /// Submits and executes a create multiple logical table tasks.
227    #[tracing::instrument(skip_all)]
228    pub async fn submit_create_logical_table_tasks(
229        &self,
230        create_table_tasks: Vec<CreateTableTask>,
231        physical_table_id: TableId,
232    ) -> Result<(ProcedureId, Option<Output>)> {
233        let context = self.create_context();
234
235        let procedure =
236            CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
237
238        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
239
240        self.submit_procedure(procedure_with_id).await
241    }
242
243    /// Submits and executes alter multiple table tasks.
244    #[tracing::instrument(skip_all)]
245    pub async fn submit_alter_logical_table_tasks(
246        &self,
247        alter_table_tasks: Vec<AlterTableTask>,
248        physical_table_id: TableId,
249    ) -> Result<(ProcedureId, Option<Output>)> {
250        let context = self.create_context();
251
252        let procedure =
253            AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
254
255        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
256
257        self.submit_procedure(procedure_with_id).await
258    }
259
260    /// Submits and executes a drop table task.
261    #[tracing::instrument(skip_all)]
262    pub async fn submit_drop_table_task(
263        &self,
264        drop_table_task: DropTableTask,
265    ) -> Result<(ProcedureId, Option<Output>)> {
266        let context = self.create_context();
267
268        let procedure = DropTableProcedure::new(drop_table_task, context);
269
270        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
271
272        self.submit_procedure(procedure_with_id).await
273    }
274
275    /// Submits and executes a create database task.
276    #[tracing::instrument(skip_all)]
277    pub async fn submit_create_database(
278        &self,
279        CreateDatabaseTask {
280            catalog,
281            schema,
282            create_if_not_exists,
283            options,
284        }: CreateDatabaseTask,
285    ) -> Result<(ProcedureId, Option<Output>)> {
286        let context = self.create_context();
287        let procedure =
288            CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
289        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
290
291        self.submit_procedure(procedure_with_id).await
292    }
293
294    /// Submits and executes a drop table task.
295    #[tracing::instrument(skip_all)]
296    pub async fn submit_drop_database(
297        &self,
298        DropDatabaseTask {
299            catalog,
300            schema,
301            drop_if_exists,
302        }: DropDatabaseTask,
303    ) -> Result<(ProcedureId, Option<Output>)> {
304        let context = self.create_context();
305        let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
306        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
307
308        self.submit_procedure(procedure_with_id).await
309    }
310
311    pub async fn submit_alter_database(
312        &self,
313        alter_database_task: AlterDatabaseTask,
314    ) -> Result<(ProcedureId, Option<Output>)> {
315        let context = self.create_context();
316        let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
317        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
318
319        self.submit_procedure(procedure_with_id).await
320    }
321
322    /// Submits and executes a create flow task.
323    #[tracing::instrument(skip_all)]
324    pub async fn submit_create_flow_task(
325        &self,
326        create_flow: CreateFlowTask,
327        query_context: QueryContext,
328    ) -> Result<(ProcedureId, Option<Output>)> {
329        let context = self.create_context();
330        let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
331        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
332
333        self.submit_procedure(procedure_with_id).await
334    }
335
336    /// Submits and executes a drop flow task.
337    #[tracing::instrument(skip_all)]
338    pub async fn submit_drop_flow_task(
339        &self,
340        drop_flow: DropFlowTask,
341    ) -> Result<(ProcedureId, Option<Output>)> {
342        let context = self.create_context();
343        let procedure = DropFlowProcedure::new(drop_flow, context);
344        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
345
346        self.submit_procedure(procedure_with_id).await
347    }
348
349    /// Submits and executes a drop view task.
350    #[tracing::instrument(skip_all)]
351    pub async fn submit_drop_view_task(
352        &self,
353        drop_view: DropViewTask,
354    ) -> Result<(ProcedureId, Option<Output>)> {
355        let context = self.create_context();
356        let procedure = DropViewProcedure::new(drop_view, context);
357        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
358
359        self.submit_procedure(procedure_with_id).await
360    }
361
362    /// Submits and executes a truncate table task.
363    #[tracing::instrument(skip_all)]
364    pub async fn submit_truncate_table_task(
365        &self,
366        truncate_table_task: TruncateTableTask,
367        table_info_value: DeserializedValueWithBytes<TableInfoValue>,
368        region_routes: Vec<RegionRoute>,
369    ) -> Result<(ProcedureId, Option<Output>)> {
370        let context = self.create_context();
371        let procedure = TruncateTableProcedure::new(
372            truncate_table_task,
373            table_info_value,
374            region_routes,
375            context,
376        );
377
378        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
379
380        self.submit_procedure(procedure_with_id).await
381    }
382
383    async fn submit_procedure(
384        &self,
385        procedure_with_id: ProcedureWithId,
386    ) -> Result<(ProcedureId, Option<Output>)> {
387        let procedure_id = procedure_with_id.id;
388
389        let mut watcher = self
390            .procedure_manager
391            .submit(procedure_with_id)
392            .await
393            .context(SubmitProcedureSnafu)?;
394
395        let output = watcher::wait(&mut watcher)
396            .await
397            .context(WaitProcedureSnafu)?;
398
399        Ok((procedure_id, output))
400    }
401}
402
403async fn handle_truncate_table_task(
404    ddl_manager: &DdlManager,
405    truncate_table_task: TruncateTableTask,
406) -> Result<SubmitDdlTaskResponse> {
407    let table_id = truncate_table_task.table_id;
408    let table_metadata_manager = &ddl_manager.table_metadata_manager();
409    let table_ref = truncate_table_task.table_ref();
410
411    let (table_info_value, table_route_value) =
412        table_metadata_manager.get_full_table_info(table_id).await?;
413
414    let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
415        table: table_ref.to_string(),
416    })?;
417
418    let table_route_value = table_route_value.context(TableRouteNotFoundSnafu { table_id })?;
419
420    let table_route = table_route_value.into_inner().region_routes()?.clone();
421
422    let (id, _) = ddl_manager
423        .submit_truncate_table_task(truncate_table_task, table_info_value, table_route)
424        .await?;
425
426    info!("Table: {table_id} is truncated via procedure_id {id:?}");
427
428    Ok(SubmitDdlTaskResponse {
429        key: id.to_string().into(),
430        ..Default::default()
431    })
432}
433
434async fn handle_alter_table_task(
435    ddl_manager: &DdlManager,
436    alter_table_task: AlterTableTask,
437) -> Result<SubmitDdlTaskResponse> {
438    let table_ref = alter_table_task.table_ref();
439
440    let table_id = ddl_manager
441        .table_metadata_manager()
442        .table_name_manager()
443        .get(TableNameKey::new(
444            table_ref.catalog,
445            table_ref.schema,
446            table_ref.table,
447        ))
448        .await?
449        .with_context(|| TableNotFoundSnafu {
450            table_name: table_ref.to_string(),
451        })?
452        .table_id();
453
454    let table_route_value = ddl_manager
455        .table_metadata_manager()
456        .table_route_manager()
457        .table_route_storage()
458        .get(table_id)
459        .await?
460        .context(TableRouteNotFoundSnafu { table_id })?;
461    ensure!(
462        table_route_value.is_physical(),
463        UnexpectedLogicalRouteTableSnafu {
464            err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
465        }
466    );
467
468    let (id, _) = ddl_manager
469        .submit_alter_table_task(table_id, alter_table_task)
470        .await?;
471
472    info!("Table: {table_id} is altered via procedure_id {id:?}");
473
474    Ok(SubmitDdlTaskResponse {
475        key: id.to_string().into(),
476        ..Default::default()
477    })
478}
479
480async fn handle_drop_table_task(
481    ddl_manager: &DdlManager,
482    drop_table_task: DropTableTask,
483) -> Result<SubmitDdlTaskResponse> {
484    let table_id = drop_table_task.table_id;
485    let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
486
487    info!("Table: {table_id} is dropped via procedure_id {id:?}");
488
489    Ok(SubmitDdlTaskResponse {
490        key: id.to_string().into(),
491        ..Default::default()
492    })
493}
494
495async fn handle_create_table_task(
496    ddl_manager: &DdlManager,
497    create_table_task: CreateTableTask,
498) -> Result<SubmitDdlTaskResponse> {
499    let (id, output) = ddl_manager
500        .submit_create_table_task(create_table_task)
501        .await?;
502
503    let procedure_id = id.to_string();
504    let output = output.context(ProcedureOutputSnafu {
505        procedure_id: &procedure_id,
506        err_msg: "empty output",
507    })?;
508    let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
509        procedure_id: &procedure_id,
510        err_msg: "downcast to `u32`",
511    })?);
512    info!("Table: {table_id} is created via procedure_id {id:?}");
513
514    Ok(SubmitDdlTaskResponse {
515        key: procedure_id.into(),
516        table_ids: vec![table_id],
517    })
518}
519
520async fn handle_create_logical_table_tasks(
521    ddl_manager: &DdlManager,
522    create_table_tasks: Vec<CreateTableTask>,
523) -> Result<SubmitDdlTaskResponse> {
524    ensure!(
525        !create_table_tasks.is_empty(),
526        EmptyDdlTasksSnafu {
527            name: "create logical tables"
528        }
529    );
530    let physical_table_id = utils::check_and_get_physical_table_id(
531        ddl_manager.table_metadata_manager(),
532        &create_table_tasks,
533    )
534    .await?;
535    let num_logical_tables = create_table_tasks.len();
536
537    let (id, output) = ddl_manager
538        .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
539        .await?;
540
541    info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}");
542
543    let procedure_id = id.to_string();
544    let output = output.context(ProcedureOutputSnafu {
545        procedure_id: &procedure_id,
546        err_msg: "empty output",
547    })?;
548    let table_ids = output
549        .downcast_ref::<Vec<TableId>>()
550        .context(ProcedureOutputSnafu {
551            procedure_id: &procedure_id,
552            err_msg: "downcast to `Vec<TableId>`",
553        })?
554        .clone();
555
556    Ok(SubmitDdlTaskResponse {
557        key: procedure_id.into(),
558        table_ids,
559    })
560}
561
562async fn handle_create_database_task(
563    ddl_manager: &DdlManager,
564    create_database_task: CreateDatabaseTask,
565) -> Result<SubmitDdlTaskResponse> {
566    let (id, _) = ddl_manager
567        .submit_create_database(create_database_task.clone())
568        .await?;
569
570    let procedure_id = id.to_string();
571    info!(
572        "Database {}.{} is created via procedure_id {id:?}",
573        create_database_task.catalog, create_database_task.schema
574    );
575
576    Ok(SubmitDdlTaskResponse {
577        key: procedure_id.into(),
578        ..Default::default()
579    })
580}
581
582async fn handle_drop_database_task(
583    ddl_manager: &DdlManager,
584    drop_database_task: DropDatabaseTask,
585) -> Result<SubmitDdlTaskResponse> {
586    let (id, _) = ddl_manager
587        .submit_drop_database(drop_database_task.clone())
588        .await?;
589
590    let procedure_id = id.to_string();
591    info!(
592        "Database {}.{} is dropped via procedure_id {id:?}",
593        drop_database_task.catalog, drop_database_task.schema
594    );
595
596    Ok(SubmitDdlTaskResponse {
597        key: procedure_id.into(),
598        ..Default::default()
599    })
600}
601
602async fn handle_alter_database_task(
603    ddl_manager: &DdlManager,
604    alter_database_task: AlterDatabaseTask,
605) -> Result<SubmitDdlTaskResponse> {
606    let (id, _) = ddl_manager
607        .submit_alter_database(alter_database_task.clone())
608        .await?;
609
610    let procedure_id = id.to_string();
611    info!(
612        "Database {}.{} is altered via procedure_id {id:?}",
613        alter_database_task.catalog(),
614        alter_database_task.schema()
615    );
616
617    Ok(SubmitDdlTaskResponse {
618        key: procedure_id.into(),
619        ..Default::default()
620    })
621}
622
623async fn handle_drop_flow_task(
624    ddl_manager: &DdlManager,
625    drop_flow_task: DropFlowTask,
626) -> Result<SubmitDdlTaskResponse> {
627    let (id, _) = ddl_manager
628        .submit_drop_flow_task(drop_flow_task.clone())
629        .await?;
630
631    let procedure_id = id.to_string();
632    info!(
633        "Flow {}.{}({}) is dropped via procedure_id {id:?}",
634        drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
635    );
636
637    Ok(SubmitDdlTaskResponse {
638        key: procedure_id.into(),
639        ..Default::default()
640    })
641}
642
643async fn handle_drop_view_task(
644    ddl_manager: &DdlManager,
645    drop_view_task: DropViewTask,
646) -> Result<SubmitDdlTaskResponse> {
647    let (id, _) = ddl_manager
648        .submit_drop_view_task(drop_view_task.clone())
649        .await?;
650
651    let procedure_id = id.to_string();
652    info!(
653        "View {}({}) is dropped via procedure_id {id:?}",
654        drop_view_task.table_ref(),
655        drop_view_task.view_id,
656    );
657
658    Ok(SubmitDdlTaskResponse {
659        key: procedure_id.into(),
660        ..Default::default()
661    })
662}
663
664async fn handle_create_flow_task(
665    ddl_manager: &DdlManager,
666    create_flow_task: CreateFlowTask,
667    query_context: QueryContext,
668) -> Result<SubmitDdlTaskResponse> {
669    let (id, output) = ddl_manager
670        .submit_create_flow_task(create_flow_task.clone(), query_context)
671        .await?;
672
673    let procedure_id = id.to_string();
674    let output = output.context(ProcedureOutputSnafu {
675        procedure_id: &procedure_id,
676        err_msg: "empty output",
677    })?;
678    let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
679        procedure_id: &procedure_id,
680        err_msg: "downcast to `u32`",
681    })?);
682    if !create_flow_task.or_replace {
683        info!(
684            "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
685            create_flow_task.catalog_name, create_flow_task.flow_name,
686        );
687    } else {
688        info!(
689            "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
690            create_flow_task.catalog_name, create_flow_task.flow_name,
691        );
692    }
693
694    Ok(SubmitDdlTaskResponse {
695        key: procedure_id.into(),
696        ..Default::default()
697    })
698}
699
700#[cfg(feature = "enterprise")]
701async fn handle_create_trigger_task(
702    ddl_manager: &DdlManager,
703    create_trigger_task: CreateTriggerTask,
704    query_context: QueryContext,
705) -> Result<SubmitDdlTaskResponse> {
706    let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
707        return UnsupportedSnafu {
708            operation: "create trigger",
709        }
710        .fail();
711    };
712
713    m.create_trigger(
714        create_trigger_task,
715        ddl_manager.procedure_manager.clone(),
716        ddl_manager.ddl_context.clone(),
717        query_context,
718    )
719    .await
720}
721
722async fn handle_alter_logical_table_tasks(
723    ddl_manager: &DdlManager,
724    alter_table_tasks: Vec<AlterTableTask>,
725) -> Result<SubmitDdlTaskResponse> {
726    ensure!(
727        !alter_table_tasks.is_empty(),
728        EmptyDdlTasksSnafu {
729            name: "alter logical tables"
730        }
731    );
732
733    // Use the physical table id in the first logical table, then it will be checked in the procedure.
734    let first_table = TableNameKey {
735        catalog: &alter_table_tasks[0].alter_table.catalog_name,
736        schema: &alter_table_tasks[0].alter_table.schema_name,
737        table: &alter_table_tasks[0].alter_table.table_name,
738    };
739    let physical_table_id =
740        utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
741    let num_logical_tables = alter_table_tasks.len();
742
743    let (id, _) = ddl_manager
744        .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
745        .await?;
746
747    info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}");
748
749    let procedure_id = id.to_string();
750
751    Ok(SubmitDdlTaskResponse {
752        key: procedure_id.into(),
753        ..Default::default()
754    })
755}
756
757/// Handle the `[CreateViewTask]` and returns the DDL response when success.
758async fn handle_create_view_task(
759    ddl_manager: &DdlManager,
760    create_view_task: CreateViewTask,
761) -> Result<SubmitDdlTaskResponse> {
762    let (id, output) = ddl_manager
763        .submit_create_view_task(create_view_task)
764        .await?;
765
766    let procedure_id = id.to_string();
767    let output = output.context(ProcedureOutputSnafu {
768        procedure_id: &procedure_id,
769        err_msg: "empty output",
770    })?;
771    let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
772        procedure_id: &procedure_id,
773        err_msg: "downcast to `u32`",
774    })?);
775    info!("View: {view_id} is created via procedure_id {id:?}");
776
777    Ok(SubmitDdlTaskResponse {
778        key: procedure_id.into(),
779        table_ids: vec![view_id],
780    })
781}
782
783/// TODO(dennis): let [`DdlManager`] implement [`ProcedureExecutor`] looks weird, find some way to refactor it.
784#[async_trait::async_trait]
785impl ProcedureExecutor for DdlManager {
786    async fn submit_ddl_task(
787        &self,
788        ctx: &ExecutorContext,
789        request: SubmitDdlTaskRequest,
790    ) -> Result<SubmitDdlTaskResponse> {
791        let span = ctx
792            .tracing_context
793            .as_ref()
794            .map(TracingContext::from_w3c)
795            .unwrap_or(TracingContext::from_current_span())
796            .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
797        async move {
798            debug!("Submitting Ddl task: {:?}", request.task);
799            match request.task {
800                CreateTable(create_table_task) => {
801                    handle_create_table_task(self, create_table_task).await
802                }
803                DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
804                AlterTable(alter_table_task) => {
805                    handle_alter_table_task(self, alter_table_task).await
806                }
807                TruncateTable(truncate_table_task) => {
808                    handle_truncate_table_task(self, truncate_table_task).await
809                }
810                CreateLogicalTables(create_table_tasks) => {
811                    handle_create_logical_table_tasks(self, create_table_tasks).await
812                }
813                AlterLogicalTables(alter_table_tasks) => {
814                    handle_alter_logical_table_tasks(self, alter_table_tasks).await
815                }
816                DropLogicalTables(_) => todo!(),
817                CreateDatabase(create_database_task) => {
818                    handle_create_database_task(self, create_database_task).await
819                }
820                DropDatabase(drop_database_task) => {
821                    handle_drop_database_task(self, drop_database_task).await
822                }
823                AlterDatabase(alter_database_task) => {
824                    handle_alter_database_task(self, alter_database_task).await
825                }
826                CreateFlow(create_flow_task) => {
827                    handle_create_flow_task(self, create_flow_task, request.query_context.into())
828                        .await
829                }
830                #[cfg(feature = "enterprise")]
831                CreateTrigger(create_trigger_task) => {
832                    handle_create_trigger_task(
833                        self,
834                        create_trigger_task,
835                        request.query_context.into(),
836                    )
837                    .await
838                }
839                DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
840                CreateView(create_view_task) => {
841                    handle_create_view_task(self, create_view_task).await
842                }
843                DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
844            }
845        }
846        .trace(span)
847        .await
848    }
849
850    async fn migrate_region(
851        &self,
852        _ctx: &ExecutorContext,
853        _request: MigrateRegionRequest,
854    ) -> Result<MigrateRegionResponse> {
855        UnsupportedSnafu {
856            operation: "migrate_region",
857        }
858        .fail()
859    }
860
861    async fn query_procedure_state(
862        &self,
863        _ctx: &ExecutorContext,
864        pid: &str,
865    ) -> Result<ProcedureStateResponse> {
866        let pid =
867            ProcedureId::parse_str(pid).with_context(|_| ParseProcedureIdSnafu { key: pid })?;
868
869        let state = self
870            .procedure_manager
871            .procedure_state(pid)
872            .await
873            .context(QueryProcedureSnafu)?
874            .context(ProcedureNotFoundSnafu {
875                pid: pid.to_string(),
876            })?;
877
878        Ok(procedure::procedure_state_to_pb_response(&state))
879    }
880
881    async fn list_procedures(&self, _ctx: &ExecutorContext) -> Result<ProcedureDetailResponse> {
882        let metas = self
883            .procedure_manager
884            .list_procedures()
885            .await
886            .context(QueryProcedureSnafu)?;
887        Ok(procedure::procedure_details_to_pb_response(metas))
888    }
889}
890
891#[cfg(test)]
892mod tests {
893    use std::sync::Arc;
894
895    use common_procedure::local::LocalManager;
896    use common_procedure::test_util::InMemoryPoisonStore;
897
898    use super::DdlManager;
899    use crate::cache_invalidator::DummyCacheInvalidator;
900    use crate::ddl::alter_table::AlterTableProcedure;
901    use crate::ddl::create_table::CreateTableProcedure;
902    use crate::ddl::drop_table::DropTableProcedure;
903    use crate::ddl::flow_meta::FlowMetadataAllocator;
904    use crate::ddl::table_meta::TableMetadataAllocator;
905    use crate::ddl::truncate_table::TruncateTableProcedure;
906    use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
907    use crate::key::flow::FlowMetadataManager;
908    use crate::key::TableMetadataManager;
909    use crate::kv_backend::memory::MemoryKvBackend;
910    use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
911    use crate::peer::Peer;
912    use crate::region_keeper::MemoryRegionKeeper;
913    use crate::region_registry::LeaderRegionRegistry;
914    use crate::sequence::SequenceBuilder;
915    use crate::state_store::KvStateStore;
916    use crate::wal_options_allocator::WalOptionsAllocator;
917
918    /// A dummy implemented [NodeManager].
919    pub struct DummyDatanodeManager;
920
921    #[async_trait::async_trait]
922    impl NodeManager for DummyDatanodeManager {
923        async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
924            unimplemented!()
925        }
926
927        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
928            unimplemented!()
929        }
930    }
931
932    #[test]
933    fn test_try_new() {
934        let kv_backend = Arc::new(MemoryKvBackend::new());
935        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
936        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
937            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
938            Arc::new(WalOptionsAllocator::default()),
939        ));
940        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
941        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
942            Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
943        ));
944
945        let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
946        let poison_manager = Arc::new(InMemoryPoisonStore::default());
947        let procedure_manager = Arc::new(LocalManager::new(
948            Default::default(),
949            state_store,
950            poison_manager,
951        ));
952
953        let _ = DdlManager::try_new(
954            DdlContext {
955                node_manager: Arc::new(DummyDatanodeManager),
956                cache_invalidator: Arc::new(DummyCacheInvalidator),
957                table_metadata_manager,
958                table_metadata_allocator,
959                flow_metadata_manager,
960                flow_metadata_allocator,
961                memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
962                leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
963                region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
964            },
965            procedure_manager.clone(),
966            true,
967            #[cfg(feature = "enterprise")]
968            None,
969        );
970
971        let expected_loaders = vec![
972            CreateTableProcedure::TYPE_NAME,
973            AlterTableProcedure::TYPE_NAME,
974            DropTableProcedure::TYPE_NAME,
975            TruncateTableProcedure::TYPE_NAME,
976        ];
977
978        for loader in expected_loaders {
979            assert!(procedure_manager.contains_loader(loader));
980        }
981    }
982}