common_meta/
ddl_manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::meta::ProcedureDetailResponse;
18use common_procedure::{
19    watcher, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId,
20};
21use common_telemetry::tracing_context::{FutureExt, TracingContext};
22use common_telemetry::{debug, info, tracing};
23use derive_builder::Builder;
24use snafu::{ensure, OptionExt, ResultExt};
25use store_api::storage::TableId;
26
27use crate::ddl::alter_database::AlterDatabaseProcedure;
28use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
29use crate::ddl::alter_table::AlterTableProcedure;
30use crate::ddl::create_database::CreateDatabaseProcedure;
31use crate::ddl::create_flow::CreateFlowProcedure;
32use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
33use crate::ddl::create_table::CreateTableProcedure;
34use crate::ddl::create_view::CreateViewProcedure;
35use crate::ddl::drop_database::DropDatabaseProcedure;
36use crate::ddl::drop_flow::DropFlowProcedure;
37use crate::ddl::drop_table::DropTableProcedure;
38use crate::ddl::drop_view::DropViewProcedure;
39use crate::ddl::truncate_table::TruncateTableProcedure;
40use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor};
41use crate::error::{
42    EmptyDdlTasksSnafu, ParseProcedureIdSnafu, ProcedureNotFoundSnafu, ProcedureOutputSnafu,
43    QueryProcedureSnafu, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu,
44    TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
45    UnexpectedLogicalRouteTableSnafu, UnsupportedSnafu, WaitProcedureSnafu,
46};
47use crate::key::table_info::TableInfoValue;
48use crate::key::table_name::TableNameKey;
49use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
50use crate::rpc::ddl::DdlTask::{
51    AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
52    CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
53    TruncateTable,
54};
55use crate::rpc::ddl::{
56    AlterDatabaseTask, AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask,
57    CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
58    SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
59};
60use crate::rpc::procedure;
61use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
62use crate::rpc::router::RegionRoute;
63
64pub type DdlManagerRef = Arc<DdlManager>;
65
66pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
67
68/// The [DdlManager] provides the ability to execute Ddl.
69#[derive(Builder)]
70pub struct DdlManager {
71    ddl_context: DdlContext,
72    procedure_manager: ProcedureManagerRef,
73}
74
75macro_rules! procedure_loader_entry {
76    ($procedure:ident) => {
77        (
78            $procedure::TYPE_NAME,
79            &|context: DdlContext| -> BoxedProcedureLoader {
80                Box::new(move |json: &str| {
81                    let context = context.clone();
82                    $procedure::from_json(json, context).map(|p| Box::new(p) as _)
83                })
84            },
85        )
86    };
87}
88
89macro_rules! procedure_loader {
90    ($($procedure:ident),*) => {
91        vec![
92            $(procedure_loader_entry!($procedure)),*
93        ]
94    };
95}
96
97impl DdlManager {
98    /// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
99    pub fn try_new(
100        ddl_context: DdlContext,
101        procedure_manager: ProcedureManagerRef,
102        register_loaders: bool,
103    ) -> Result<Self> {
104        let manager = Self {
105            ddl_context,
106            procedure_manager,
107        };
108        if register_loaders {
109            manager.register_loaders()?;
110        }
111        Ok(manager)
112    }
113
114    /// Returns the [TableMetadataManagerRef].
115    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
116        &self.ddl_context.table_metadata_manager
117    }
118
119    /// Returns the [DdlContext]
120    pub fn create_context(&self) -> DdlContext {
121        self.ddl_context.clone()
122    }
123
124    /// Registers all Ddl loaders.
125    pub fn register_loaders(&self) -> Result<()> {
126        let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
127            CreateTableProcedure,
128            CreateLogicalTablesProcedure,
129            CreateViewProcedure,
130            CreateFlowProcedure,
131            AlterTableProcedure,
132            AlterLogicalTablesProcedure,
133            AlterDatabaseProcedure,
134            DropTableProcedure,
135            DropFlowProcedure,
136            TruncateTableProcedure,
137            CreateDatabaseProcedure,
138            DropDatabaseProcedure,
139            DropViewProcedure
140        );
141
142        for (type_name, loader_factory) in loaders {
143            let context = self.create_context();
144            self.procedure_manager
145                .register_loader(type_name, loader_factory(context))
146                .context(RegisterProcedureLoaderSnafu { type_name })?;
147        }
148
149        Ok(())
150    }
151
152    /// Submits and executes an alter table task.
153    #[tracing::instrument(skip_all)]
154    pub async fn submit_alter_table_task(
155        &self,
156        table_id: TableId,
157        alter_table_task: AlterTableTask,
158    ) -> Result<(ProcedureId, Option<Output>)> {
159        let context = self.create_context();
160
161        let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
162
163        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
164
165        self.submit_procedure(procedure_with_id).await
166    }
167
168    /// Submits and executes a create table task.
169    #[tracing::instrument(skip_all)]
170    pub async fn submit_create_table_task(
171        &self,
172        create_table_task: CreateTableTask,
173    ) -> Result<(ProcedureId, Option<Output>)> {
174        let context = self.create_context();
175
176        let procedure = CreateTableProcedure::new(create_table_task, context);
177
178        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
179
180        self.submit_procedure(procedure_with_id).await
181    }
182
183    /// Submits and executes a `[CreateViewTask]`.
184    #[tracing::instrument(skip_all)]
185    pub async fn submit_create_view_task(
186        &self,
187        create_view_task: CreateViewTask,
188    ) -> Result<(ProcedureId, Option<Output>)> {
189        let context = self.create_context();
190
191        let procedure = CreateViewProcedure::new(create_view_task, context);
192
193        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
194
195        self.submit_procedure(procedure_with_id).await
196    }
197
198    /// Submits and executes a create multiple logical table tasks.
199    #[tracing::instrument(skip_all)]
200    pub async fn submit_create_logical_table_tasks(
201        &self,
202        create_table_tasks: Vec<CreateTableTask>,
203        physical_table_id: TableId,
204    ) -> Result<(ProcedureId, Option<Output>)> {
205        let context = self.create_context();
206
207        let procedure =
208            CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
209
210        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
211
212        self.submit_procedure(procedure_with_id).await
213    }
214
215    /// Submits and executes alter multiple table tasks.
216    #[tracing::instrument(skip_all)]
217    pub async fn submit_alter_logical_table_tasks(
218        &self,
219        alter_table_tasks: Vec<AlterTableTask>,
220        physical_table_id: TableId,
221    ) -> Result<(ProcedureId, Option<Output>)> {
222        let context = self.create_context();
223
224        let procedure =
225            AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
226
227        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
228
229        self.submit_procedure(procedure_with_id).await
230    }
231
232    /// Submits and executes a drop table task.
233    #[tracing::instrument(skip_all)]
234    pub async fn submit_drop_table_task(
235        &self,
236        drop_table_task: DropTableTask,
237    ) -> Result<(ProcedureId, Option<Output>)> {
238        let context = self.create_context();
239
240        let procedure = DropTableProcedure::new(drop_table_task, context);
241
242        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
243
244        self.submit_procedure(procedure_with_id).await
245    }
246
247    /// Submits and executes a create database task.
248    #[tracing::instrument(skip_all)]
249    pub async fn submit_create_database(
250        &self,
251        CreateDatabaseTask {
252            catalog,
253            schema,
254            create_if_not_exists,
255            options,
256        }: CreateDatabaseTask,
257    ) -> Result<(ProcedureId, Option<Output>)> {
258        let context = self.create_context();
259        let procedure =
260            CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
261        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
262
263        self.submit_procedure(procedure_with_id).await
264    }
265
266    /// Submits and executes a drop table task.
267    #[tracing::instrument(skip_all)]
268    pub async fn submit_drop_database(
269        &self,
270        DropDatabaseTask {
271            catalog,
272            schema,
273            drop_if_exists,
274        }: DropDatabaseTask,
275    ) -> Result<(ProcedureId, Option<Output>)> {
276        let context = self.create_context();
277        let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
278        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
279
280        self.submit_procedure(procedure_with_id).await
281    }
282
283    pub async fn submit_alter_database(
284        &self,
285        alter_database_task: AlterDatabaseTask,
286    ) -> Result<(ProcedureId, Option<Output>)> {
287        let context = self.create_context();
288        let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
289        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
290
291        self.submit_procedure(procedure_with_id).await
292    }
293
294    /// Submits and executes a create flow task.
295    #[tracing::instrument(skip_all)]
296    pub async fn submit_create_flow_task(
297        &self,
298        create_flow: CreateFlowTask,
299        query_context: QueryContext,
300    ) -> Result<(ProcedureId, Option<Output>)> {
301        let context = self.create_context();
302        let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
303        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
304
305        self.submit_procedure(procedure_with_id).await
306    }
307
308    /// Submits and executes a drop flow task.
309    #[tracing::instrument(skip_all)]
310    pub async fn submit_drop_flow_task(
311        &self,
312        drop_flow: DropFlowTask,
313    ) -> Result<(ProcedureId, Option<Output>)> {
314        let context = self.create_context();
315        let procedure = DropFlowProcedure::new(drop_flow, context);
316        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
317
318        self.submit_procedure(procedure_with_id).await
319    }
320
321    /// Submits and executes a drop view task.
322    #[tracing::instrument(skip_all)]
323    pub async fn submit_drop_view_task(
324        &self,
325        drop_view: DropViewTask,
326    ) -> Result<(ProcedureId, Option<Output>)> {
327        let context = self.create_context();
328        let procedure = DropViewProcedure::new(drop_view, context);
329        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
330
331        self.submit_procedure(procedure_with_id).await
332    }
333
334    /// Submits and executes a truncate table task.
335    #[tracing::instrument(skip_all)]
336    pub async fn submit_truncate_table_task(
337        &self,
338        truncate_table_task: TruncateTableTask,
339        table_info_value: DeserializedValueWithBytes<TableInfoValue>,
340        region_routes: Vec<RegionRoute>,
341    ) -> Result<(ProcedureId, Option<Output>)> {
342        let context = self.create_context();
343        let procedure = TruncateTableProcedure::new(
344            truncate_table_task,
345            table_info_value,
346            region_routes,
347            context,
348        );
349
350        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
351
352        self.submit_procedure(procedure_with_id).await
353    }
354
355    async fn submit_procedure(
356        &self,
357        procedure_with_id: ProcedureWithId,
358    ) -> Result<(ProcedureId, Option<Output>)> {
359        let procedure_id = procedure_with_id.id;
360
361        let mut watcher = self
362            .procedure_manager
363            .submit(procedure_with_id)
364            .await
365            .context(SubmitProcedureSnafu)?;
366
367        let output = watcher::wait(&mut watcher)
368            .await
369            .context(WaitProcedureSnafu)?;
370
371        Ok((procedure_id, output))
372    }
373}
374
375async fn handle_truncate_table_task(
376    ddl_manager: &DdlManager,
377    truncate_table_task: TruncateTableTask,
378) -> Result<SubmitDdlTaskResponse> {
379    let table_id = truncate_table_task.table_id;
380    let table_metadata_manager = &ddl_manager.table_metadata_manager();
381    let table_ref = truncate_table_task.table_ref();
382
383    let (table_info_value, table_route_value) =
384        table_metadata_manager.get_full_table_info(table_id).await?;
385
386    let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
387        table: table_ref.to_string(),
388    })?;
389
390    let table_route_value = table_route_value.context(TableRouteNotFoundSnafu { table_id })?;
391
392    let table_route = table_route_value.into_inner().region_routes()?.clone();
393
394    let (id, _) = ddl_manager
395        .submit_truncate_table_task(truncate_table_task, table_info_value, table_route)
396        .await?;
397
398    info!("Table: {table_id} is truncated via procedure_id {id:?}");
399
400    Ok(SubmitDdlTaskResponse {
401        key: id.to_string().into(),
402        ..Default::default()
403    })
404}
405
406async fn handle_alter_table_task(
407    ddl_manager: &DdlManager,
408    alter_table_task: AlterTableTask,
409) -> Result<SubmitDdlTaskResponse> {
410    let table_ref = alter_table_task.table_ref();
411
412    let table_id = ddl_manager
413        .table_metadata_manager()
414        .table_name_manager()
415        .get(TableNameKey::new(
416            table_ref.catalog,
417            table_ref.schema,
418            table_ref.table,
419        ))
420        .await?
421        .with_context(|| TableNotFoundSnafu {
422            table_name: table_ref.to_string(),
423        })?
424        .table_id();
425
426    let table_route_value = ddl_manager
427        .table_metadata_manager()
428        .table_route_manager()
429        .table_route_storage()
430        .get(table_id)
431        .await?
432        .context(TableRouteNotFoundSnafu { table_id })?;
433    ensure!(
434        table_route_value.is_physical(),
435        UnexpectedLogicalRouteTableSnafu {
436            err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
437        }
438    );
439
440    let (id, _) = ddl_manager
441        .submit_alter_table_task(table_id, alter_table_task)
442        .await?;
443
444    info!("Table: {table_id} is altered via procedure_id {id:?}");
445
446    Ok(SubmitDdlTaskResponse {
447        key: id.to_string().into(),
448        ..Default::default()
449    })
450}
451
452async fn handle_drop_table_task(
453    ddl_manager: &DdlManager,
454    drop_table_task: DropTableTask,
455) -> Result<SubmitDdlTaskResponse> {
456    let table_id = drop_table_task.table_id;
457    let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
458
459    info!("Table: {table_id} is dropped via procedure_id {id:?}");
460
461    Ok(SubmitDdlTaskResponse {
462        key: id.to_string().into(),
463        ..Default::default()
464    })
465}
466
467async fn handle_create_table_task(
468    ddl_manager: &DdlManager,
469    create_table_task: CreateTableTask,
470) -> Result<SubmitDdlTaskResponse> {
471    let (id, output) = ddl_manager
472        .submit_create_table_task(create_table_task)
473        .await?;
474
475    let procedure_id = id.to_string();
476    let output = output.context(ProcedureOutputSnafu {
477        procedure_id: &procedure_id,
478        err_msg: "empty output",
479    })?;
480    let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
481        procedure_id: &procedure_id,
482        err_msg: "downcast to `u32`",
483    })?);
484    info!("Table: {table_id} is created via procedure_id {id:?}");
485
486    Ok(SubmitDdlTaskResponse {
487        key: procedure_id.into(),
488        table_ids: vec![table_id],
489    })
490}
491
492async fn handle_create_logical_table_tasks(
493    ddl_manager: &DdlManager,
494    create_table_tasks: Vec<CreateTableTask>,
495) -> Result<SubmitDdlTaskResponse> {
496    ensure!(
497        !create_table_tasks.is_empty(),
498        EmptyDdlTasksSnafu {
499            name: "create logical tables"
500        }
501    );
502    let physical_table_id = utils::check_and_get_physical_table_id(
503        ddl_manager.table_metadata_manager(),
504        &create_table_tasks,
505    )
506    .await?;
507    let num_logical_tables = create_table_tasks.len();
508
509    let (id, output) = ddl_manager
510        .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
511        .await?;
512
513    info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}");
514
515    let procedure_id = id.to_string();
516    let output = output.context(ProcedureOutputSnafu {
517        procedure_id: &procedure_id,
518        err_msg: "empty output",
519    })?;
520    let table_ids = output
521        .downcast_ref::<Vec<TableId>>()
522        .context(ProcedureOutputSnafu {
523            procedure_id: &procedure_id,
524            err_msg: "downcast to `Vec<TableId>`",
525        })?
526        .clone();
527
528    Ok(SubmitDdlTaskResponse {
529        key: procedure_id.into(),
530        table_ids,
531    })
532}
533
534async fn handle_create_database_task(
535    ddl_manager: &DdlManager,
536    create_database_task: CreateDatabaseTask,
537) -> Result<SubmitDdlTaskResponse> {
538    let (id, _) = ddl_manager
539        .submit_create_database(create_database_task.clone())
540        .await?;
541
542    let procedure_id = id.to_string();
543    info!(
544        "Database {}.{} is created via procedure_id {id:?}",
545        create_database_task.catalog, create_database_task.schema
546    );
547
548    Ok(SubmitDdlTaskResponse {
549        key: procedure_id.into(),
550        ..Default::default()
551    })
552}
553
554async fn handle_drop_database_task(
555    ddl_manager: &DdlManager,
556    drop_database_task: DropDatabaseTask,
557) -> Result<SubmitDdlTaskResponse> {
558    let (id, _) = ddl_manager
559        .submit_drop_database(drop_database_task.clone())
560        .await?;
561
562    let procedure_id = id.to_string();
563    info!(
564        "Database {}.{} is dropped via procedure_id {id:?}",
565        drop_database_task.catalog, drop_database_task.schema
566    );
567
568    Ok(SubmitDdlTaskResponse {
569        key: procedure_id.into(),
570        ..Default::default()
571    })
572}
573
574async fn handle_alter_database_task(
575    ddl_manager: &DdlManager,
576    alter_database_task: AlterDatabaseTask,
577) -> Result<SubmitDdlTaskResponse> {
578    let (id, _) = ddl_manager
579        .submit_alter_database(alter_database_task.clone())
580        .await?;
581
582    let procedure_id = id.to_string();
583    info!(
584        "Database {}.{} is altered via procedure_id {id:?}",
585        alter_database_task.catalog(),
586        alter_database_task.schema()
587    );
588
589    Ok(SubmitDdlTaskResponse {
590        key: procedure_id.into(),
591        ..Default::default()
592    })
593}
594
595async fn handle_drop_flow_task(
596    ddl_manager: &DdlManager,
597    drop_flow_task: DropFlowTask,
598) -> Result<SubmitDdlTaskResponse> {
599    let (id, _) = ddl_manager
600        .submit_drop_flow_task(drop_flow_task.clone())
601        .await?;
602
603    let procedure_id = id.to_string();
604    info!(
605        "Flow {}.{}({}) is dropped via procedure_id {id:?}",
606        drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
607    );
608
609    Ok(SubmitDdlTaskResponse {
610        key: procedure_id.into(),
611        ..Default::default()
612    })
613}
614
615async fn handle_drop_view_task(
616    ddl_manager: &DdlManager,
617    drop_view_task: DropViewTask,
618) -> Result<SubmitDdlTaskResponse> {
619    let (id, _) = ddl_manager
620        .submit_drop_view_task(drop_view_task.clone())
621        .await?;
622
623    let procedure_id = id.to_string();
624    info!(
625        "View {}({}) is dropped via procedure_id {id:?}",
626        drop_view_task.table_ref(),
627        drop_view_task.view_id,
628    );
629
630    Ok(SubmitDdlTaskResponse {
631        key: procedure_id.into(),
632        ..Default::default()
633    })
634}
635
636async fn handle_create_flow_task(
637    ddl_manager: &DdlManager,
638    create_flow_task: CreateFlowTask,
639    query_context: QueryContext,
640) -> Result<SubmitDdlTaskResponse> {
641    let (id, output) = ddl_manager
642        .submit_create_flow_task(create_flow_task.clone(), query_context)
643        .await?;
644
645    let procedure_id = id.to_string();
646    let output = output.context(ProcedureOutputSnafu {
647        procedure_id: &procedure_id,
648        err_msg: "empty output",
649    })?;
650    let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
651        procedure_id: &procedure_id,
652        err_msg: "downcast to `u32`",
653    })?);
654    if !create_flow_task.or_replace {
655        info!(
656            "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
657            create_flow_task.catalog_name, create_flow_task.flow_name,
658        );
659    } else {
660        info!(
661            "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
662            create_flow_task.catalog_name, create_flow_task.flow_name,
663        );
664    }
665
666    Ok(SubmitDdlTaskResponse {
667        key: procedure_id.into(),
668        ..Default::default()
669    })
670}
671
672async fn handle_alter_logical_table_tasks(
673    ddl_manager: &DdlManager,
674    alter_table_tasks: Vec<AlterTableTask>,
675) -> Result<SubmitDdlTaskResponse> {
676    ensure!(
677        !alter_table_tasks.is_empty(),
678        EmptyDdlTasksSnafu {
679            name: "alter logical tables"
680        }
681    );
682
683    // Use the physical table id in the first logical table, then it will be checked in the procedure.
684    let first_table = TableNameKey {
685        catalog: &alter_table_tasks[0].alter_table.catalog_name,
686        schema: &alter_table_tasks[0].alter_table.schema_name,
687        table: &alter_table_tasks[0].alter_table.table_name,
688    };
689    let physical_table_id =
690        utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
691    let num_logical_tables = alter_table_tasks.len();
692
693    let (id, _) = ddl_manager
694        .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
695        .await?;
696
697    info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}");
698
699    let procedure_id = id.to_string();
700
701    Ok(SubmitDdlTaskResponse {
702        key: procedure_id.into(),
703        ..Default::default()
704    })
705}
706
707/// Handle the `[CreateViewTask]` and returns the DDL response when success.
708async fn handle_create_view_task(
709    ddl_manager: &DdlManager,
710    create_view_task: CreateViewTask,
711) -> Result<SubmitDdlTaskResponse> {
712    let (id, output) = ddl_manager
713        .submit_create_view_task(create_view_task)
714        .await?;
715
716    let procedure_id = id.to_string();
717    let output = output.context(ProcedureOutputSnafu {
718        procedure_id: &procedure_id,
719        err_msg: "empty output",
720    })?;
721    let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
722        procedure_id: &procedure_id,
723        err_msg: "downcast to `u32`",
724    })?);
725    info!("View: {view_id} is created via procedure_id {id:?}");
726
727    Ok(SubmitDdlTaskResponse {
728        key: procedure_id.into(),
729        table_ids: vec![view_id],
730    })
731}
732
733/// TODO(dennis): let [`DdlManager`] implement [`ProcedureExecutor`] looks weird, find some way to refactor it.
734#[async_trait::async_trait]
735impl ProcedureExecutor for DdlManager {
736    async fn submit_ddl_task(
737        &self,
738        ctx: &ExecutorContext,
739        request: SubmitDdlTaskRequest,
740    ) -> Result<SubmitDdlTaskResponse> {
741        let span = ctx
742            .tracing_context
743            .as_ref()
744            .map(TracingContext::from_w3c)
745            .unwrap_or(TracingContext::from_current_span())
746            .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
747        async move {
748            debug!("Submitting Ddl task: {:?}", request.task);
749            match request.task {
750                CreateTable(create_table_task) => {
751                    handle_create_table_task(self, create_table_task).await
752                }
753                DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
754                AlterTable(alter_table_task) => {
755                    handle_alter_table_task(self, alter_table_task).await
756                }
757                TruncateTable(truncate_table_task) => {
758                    handle_truncate_table_task(self, truncate_table_task).await
759                }
760                CreateLogicalTables(create_table_tasks) => {
761                    handle_create_logical_table_tasks(self, create_table_tasks).await
762                }
763                AlterLogicalTables(alter_table_tasks) => {
764                    handle_alter_logical_table_tasks(self, alter_table_tasks).await
765                }
766                DropLogicalTables(_) => todo!(),
767                CreateDatabase(create_database_task) => {
768                    handle_create_database_task(self, create_database_task).await
769                }
770                DropDatabase(drop_database_task) => {
771                    handle_drop_database_task(self, drop_database_task).await
772                }
773                AlterDatabase(alter_database_task) => {
774                    handle_alter_database_task(self, alter_database_task).await
775                }
776                CreateFlow(create_flow_task) => {
777                    handle_create_flow_task(self, create_flow_task, request.query_context.into())
778                        .await
779                }
780                DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
781                CreateView(create_view_task) => {
782                    handle_create_view_task(self, create_view_task).await
783                }
784                DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
785            }
786        }
787        .trace(span)
788        .await
789    }
790
791    async fn migrate_region(
792        &self,
793        _ctx: &ExecutorContext,
794        _request: MigrateRegionRequest,
795    ) -> Result<MigrateRegionResponse> {
796        UnsupportedSnafu {
797            operation: "migrate_region",
798        }
799        .fail()
800    }
801
802    async fn query_procedure_state(
803        &self,
804        _ctx: &ExecutorContext,
805        pid: &str,
806    ) -> Result<ProcedureStateResponse> {
807        let pid =
808            ProcedureId::parse_str(pid).with_context(|_| ParseProcedureIdSnafu { key: pid })?;
809
810        let state = self
811            .procedure_manager
812            .procedure_state(pid)
813            .await
814            .context(QueryProcedureSnafu)?
815            .context(ProcedureNotFoundSnafu {
816                pid: pid.to_string(),
817            })?;
818
819        Ok(procedure::procedure_state_to_pb_response(&state))
820    }
821
822    async fn list_procedures(&self, _ctx: &ExecutorContext) -> Result<ProcedureDetailResponse> {
823        let metas = self
824            .procedure_manager
825            .list_procedures()
826            .await
827            .context(QueryProcedureSnafu)?;
828        Ok(procedure::procedure_details_to_pb_response(metas))
829    }
830}
831
832#[cfg(test)]
833mod tests {
834    use std::sync::Arc;
835
836    use common_procedure::local::LocalManager;
837    use common_procedure::test_util::InMemoryPoisonStore;
838
839    use super::DdlManager;
840    use crate::cache_invalidator::DummyCacheInvalidator;
841    use crate::ddl::alter_table::AlterTableProcedure;
842    use crate::ddl::create_table::CreateTableProcedure;
843    use crate::ddl::drop_table::DropTableProcedure;
844    use crate::ddl::flow_meta::FlowMetadataAllocator;
845    use crate::ddl::table_meta::TableMetadataAllocator;
846    use crate::ddl::truncate_table::TruncateTableProcedure;
847    use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
848    use crate::key::flow::FlowMetadataManager;
849    use crate::key::TableMetadataManager;
850    use crate::kv_backend::memory::MemoryKvBackend;
851    use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
852    use crate::peer::Peer;
853    use crate::region_keeper::MemoryRegionKeeper;
854    use crate::region_registry::LeaderRegionRegistry;
855    use crate::sequence::SequenceBuilder;
856    use crate::state_store::KvStateStore;
857    use crate::wal_options_allocator::WalOptionsAllocator;
858
859    /// A dummy implemented [NodeManager].
860    pub struct DummyDatanodeManager;
861
862    #[async_trait::async_trait]
863    impl NodeManager for DummyDatanodeManager {
864        async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
865            unimplemented!()
866        }
867
868        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
869            unimplemented!()
870        }
871    }
872
873    #[test]
874    fn test_try_new() {
875        let kv_backend = Arc::new(MemoryKvBackend::new());
876        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
877        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
878            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
879            Arc::new(WalOptionsAllocator::default()),
880        ));
881        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
882        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
883            Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
884        ));
885
886        let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
887        let poison_manager = Arc::new(InMemoryPoisonStore::default());
888        let procedure_manager = Arc::new(LocalManager::new(
889            Default::default(),
890            state_store,
891            poison_manager,
892        ));
893
894        let _ = DdlManager::try_new(
895            DdlContext {
896                node_manager: Arc::new(DummyDatanodeManager),
897                cache_invalidator: Arc::new(DummyCacheInvalidator),
898                table_metadata_manager,
899                table_metadata_allocator,
900                flow_metadata_manager,
901                flow_metadata_allocator,
902                memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
903                leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
904                region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
905            },
906            procedure_manager.clone(),
907            true,
908        );
909
910        let expected_loaders = vec![
911            CreateTableProcedure::TYPE_NAME,
912            AlterTableProcedure::TYPE_NAME,
913            DropTableProcedure::TYPE_NAME,
914            TruncateTableProcedure::TYPE_NAME,
915        ];
916
917        for loader in expected_loaders {
918            assert!(procedure_manager.contains_loader(loader));
919        }
920    }
921}