1use std::sync::Arc;
16
17use api::v1::meta::ProcedureDetailResponse;
18use common_procedure::{
19 watcher, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId,
20};
21use common_telemetry::tracing_context::{FutureExt, TracingContext};
22use common_telemetry::{debug, info, tracing};
23use derive_builder::Builder;
24use snafu::{ensure, OptionExt, ResultExt};
25use store_api::storage::TableId;
26
27use crate::ddl::alter_database::AlterDatabaseProcedure;
28use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
29use crate::ddl::alter_table::AlterTableProcedure;
30use crate::ddl::create_database::CreateDatabaseProcedure;
31use crate::ddl::create_flow::CreateFlowProcedure;
32use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
33use crate::ddl::create_table::CreateTableProcedure;
34use crate::ddl::create_view::CreateViewProcedure;
35use crate::ddl::drop_database::DropDatabaseProcedure;
36use crate::ddl::drop_flow::DropFlowProcedure;
37use crate::ddl::drop_table::DropTableProcedure;
38use crate::ddl::drop_view::DropViewProcedure;
39use crate::ddl::truncate_table::TruncateTableProcedure;
40use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor};
41use crate::error::{
42 EmptyDdlTasksSnafu, ParseProcedureIdSnafu, ProcedureNotFoundSnafu, ProcedureOutputSnafu,
43 QueryProcedureSnafu, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu,
44 TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
45 UnexpectedLogicalRouteTableSnafu, UnsupportedSnafu, WaitProcedureSnafu,
46};
47use crate::key::table_info::TableInfoValue;
48use crate::key::table_name::TableNameKey;
49use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
50#[cfg(feature = "enterprise")]
51use crate::rpc::ddl::trigger::CreateTriggerTask;
52#[cfg(feature = "enterprise")]
53use crate::rpc::ddl::trigger::DropTriggerTask;
54#[cfg(feature = "enterprise")]
55use crate::rpc::ddl::DdlTask::CreateTrigger;
56#[cfg(feature = "enterprise")]
57use crate::rpc::ddl::DdlTask::DropTrigger;
58use crate::rpc::ddl::DdlTask::{
59 AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
60 CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
61 TruncateTable,
62};
63use crate::rpc::ddl::{
64 AlterDatabaseTask, AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask,
65 CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
66 SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
67};
68use crate::rpc::procedure;
69use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
70use crate::rpc::router::RegionRoute;
71
/// Shared, reference-counted handle to a [`DdlManager`].
pub type DdlManagerRef = Arc<DdlManager>;
73
/// A function object that turns a [`DdlContext`] into a [`BoxedProcedureLoader`].
///
/// The type is unsized (`dyn Fn`), so it is always used behind a reference or a box.
pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
75
/// Executes DDL tasks by wrapping each of them in a procedure and submitting that
/// procedure to the procedure manager.
#[derive(Builder)]
pub struct DdlManager {
    /// Context that is cloned into every procedure (and procedure loader) this manager creates.
    ddl_context: DdlContext,
    /// Procedure manager that loaders are registered with and procedures are submitted to.
    procedure_manager: ProcedureManagerRef,
    /// Optional handler for trigger DDL; the field only exists with the `enterprise` feature.
    #[cfg(feature = "enterprise")]
    trigger_ddl_manager: Option<TriggerDdlManagerRef>,
}
84
/// Handler for trigger DDL tasks; only compiled with the `enterprise` feature.
///
/// [`DdlManager`] forwards create/drop trigger tasks to an implementation of this
/// trait, handing over clones of its procedure manager and DDL context together
/// with the query context of the request.
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
pub trait TriggerDdlManager: Send + Sync {
    /// Handles a create-trigger task and returns the response for the submitted DDL task.
    async fn create_trigger(
        &self,
        create_trigger_task: CreateTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Handles a drop-trigger task and returns the response for the submitted DDL task.
    async fn drop_trigger(
        &self,
        drop_trigger_task: DropTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Exposes the implementation as [`std::any::Any`] so callers can downcast it.
    fn as_any(&self) -> &dyn std::any::Any;
}
108
/// Shared, reference-counted handle to a [`TriggerDdlManager`] implementation.
#[cfg(feature = "enterprise")]
pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
111
/// Expands to a `(type name, loader factory)` pair for one procedure type.
///
/// The first element is `$procedure::TYPE_NAME`. The second is a reference to a
/// closure that takes a `DdlContext` and returns a `BoxedProcedureLoader`; that
/// loader rebuilds the procedure from JSON via `$procedure::from_json`, passing it
/// a fresh clone of the context on every call.
macro_rules! procedure_loader_entry {
    ($procedure:ident) => {
        (
            $procedure::TYPE_NAME,
            &|ddl_context: DdlContext| -> BoxedProcedureLoader {
                Box::new(move |payload: &str| {
                    // Clone per call so the loader stays callable any number of times.
                    $procedure::from_json(payload, ddl_context.clone())
                        .map(|procedure| Box::new(procedure) as _)
                })
            },
        )
    };
}
125
/// Expands to a `Vec` holding one `procedure_loader_entry!` result per listed
/// procedure type, in the order given.
///
/// The list is comma separated; an optional trailing comma is accepted so that
/// multi-line invocations can end every line with a comma.
macro_rules! procedure_loader {
    ($($procedure:ident),* $(,)?) => {
        vec![
            $(procedure_loader_entry!($procedure)),*
        ]
    };
}
133
134impl DdlManager {
135 pub fn try_new(
137 ddl_context: DdlContext,
138 procedure_manager: ProcedureManagerRef,
139 register_loaders: bool,
140 ) -> Result<Self> {
141 let manager = Self {
142 ddl_context,
143 procedure_manager,
144 #[cfg(feature = "enterprise")]
145 trigger_ddl_manager: None,
146 };
147 if register_loaders {
148 manager.register_loaders()?;
149 }
150 Ok(manager)
151 }
152
153 #[cfg(feature = "enterprise")]
154 pub fn with_trigger_ddl_manager(
155 mut self,
156 trigger_ddl_manager: Option<TriggerDdlManagerRef>,
157 ) -> Self {
158 self.trigger_ddl_manager = trigger_ddl_manager;
159 self
160 }
161
162 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
164 &self.ddl_context.table_metadata_manager
165 }
166
167 pub fn create_context(&self) -> DdlContext {
169 self.ddl_context.clone()
170 }
171
172 pub fn register_loaders(&self) -> Result<()> {
174 let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
175 CreateTableProcedure,
176 CreateLogicalTablesProcedure,
177 CreateViewProcedure,
178 CreateFlowProcedure,
179 AlterTableProcedure,
180 AlterLogicalTablesProcedure,
181 AlterDatabaseProcedure,
182 DropTableProcedure,
183 DropFlowProcedure,
184 TruncateTableProcedure,
185 CreateDatabaseProcedure,
186 DropDatabaseProcedure,
187 DropViewProcedure
188 );
189
190 for (type_name, loader_factory) in loaders {
191 let context = self.create_context();
192 self.procedure_manager
193 .register_loader(type_name, loader_factory(context))
194 .context(RegisterProcedureLoaderSnafu { type_name })?;
195 }
196
197 Ok(())
198 }
199
200 #[tracing::instrument(skip_all)]
202 pub async fn submit_alter_table_task(
203 &self,
204 table_id: TableId,
205 alter_table_task: AlterTableTask,
206 ) -> Result<(ProcedureId, Option<Output>)> {
207 let context = self.create_context();
208
209 let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
210
211 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
212
213 self.submit_procedure(procedure_with_id).await
214 }
215
216 #[tracing::instrument(skip_all)]
218 pub async fn submit_create_table_task(
219 &self,
220 create_table_task: CreateTableTask,
221 ) -> Result<(ProcedureId, Option<Output>)> {
222 let context = self.create_context();
223
224 let procedure = CreateTableProcedure::new(create_table_task, context);
225
226 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
227
228 self.submit_procedure(procedure_with_id).await
229 }
230
231 #[tracing::instrument(skip_all)]
233 pub async fn submit_create_view_task(
234 &self,
235 create_view_task: CreateViewTask,
236 ) -> Result<(ProcedureId, Option<Output>)> {
237 let context = self.create_context();
238
239 let procedure = CreateViewProcedure::new(create_view_task, context);
240
241 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
242
243 self.submit_procedure(procedure_with_id).await
244 }
245
246 #[tracing::instrument(skip_all)]
248 pub async fn submit_create_logical_table_tasks(
249 &self,
250 create_table_tasks: Vec<CreateTableTask>,
251 physical_table_id: TableId,
252 ) -> Result<(ProcedureId, Option<Output>)> {
253 let context = self.create_context();
254
255 let procedure =
256 CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
257
258 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
259
260 self.submit_procedure(procedure_with_id).await
261 }
262
263 #[tracing::instrument(skip_all)]
265 pub async fn submit_alter_logical_table_tasks(
266 &self,
267 alter_table_tasks: Vec<AlterTableTask>,
268 physical_table_id: TableId,
269 ) -> Result<(ProcedureId, Option<Output>)> {
270 let context = self.create_context();
271
272 let procedure =
273 AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
274
275 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
276
277 self.submit_procedure(procedure_with_id).await
278 }
279
280 #[tracing::instrument(skip_all)]
282 pub async fn submit_drop_table_task(
283 &self,
284 drop_table_task: DropTableTask,
285 ) -> Result<(ProcedureId, Option<Output>)> {
286 let context = self.create_context();
287
288 let procedure = DropTableProcedure::new(drop_table_task, context);
289
290 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
291
292 self.submit_procedure(procedure_with_id).await
293 }
294
295 #[tracing::instrument(skip_all)]
297 pub async fn submit_create_database(
298 &self,
299 CreateDatabaseTask {
300 catalog,
301 schema,
302 create_if_not_exists,
303 options,
304 }: CreateDatabaseTask,
305 ) -> Result<(ProcedureId, Option<Output>)> {
306 let context = self.create_context();
307 let procedure =
308 CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
309 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
310
311 self.submit_procedure(procedure_with_id).await
312 }
313
314 #[tracing::instrument(skip_all)]
316 pub async fn submit_drop_database(
317 &self,
318 DropDatabaseTask {
319 catalog,
320 schema,
321 drop_if_exists,
322 }: DropDatabaseTask,
323 ) -> Result<(ProcedureId, Option<Output>)> {
324 let context = self.create_context();
325 let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
326 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
327
328 self.submit_procedure(procedure_with_id).await
329 }
330
331 pub async fn submit_alter_database(
332 &self,
333 alter_database_task: AlterDatabaseTask,
334 ) -> Result<(ProcedureId, Option<Output>)> {
335 let context = self.create_context();
336 let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
337 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
338
339 self.submit_procedure(procedure_with_id).await
340 }
341
342 #[tracing::instrument(skip_all)]
344 pub async fn submit_create_flow_task(
345 &self,
346 create_flow: CreateFlowTask,
347 query_context: QueryContext,
348 ) -> Result<(ProcedureId, Option<Output>)> {
349 let context = self.create_context();
350 let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
351 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
352
353 self.submit_procedure(procedure_with_id).await
354 }
355
356 #[tracing::instrument(skip_all)]
358 pub async fn submit_drop_flow_task(
359 &self,
360 drop_flow: DropFlowTask,
361 ) -> Result<(ProcedureId, Option<Output>)> {
362 let context = self.create_context();
363 let procedure = DropFlowProcedure::new(drop_flow, context);
364 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
365
366 self.submit_procedure(procedure_with_id).await
367 }
368
369 #[tracing::instrument(skip_all)]
371 pub async fn submit_drop_view_task(
372 &self,
373 drop_view: DropViewTask,
374 ) -> Result<(ProcedureId, Option<Output>)> {
375 let context = self.create_context();
376 let procedure = DropViewProcedure::new(drop_view, context);
377 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
378
379 self.submit_procedure(procedure_with_id).await
380 }
381
382 #[tracing::instrument(skip_all)]
384 pub async fn submit_truncate_table_task(
385 &self,
386 truncate_table_task: TruncateTableTask,
387 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
388 region_routes: Vec<RegionRoute>,
389 ) -> Result<(ProcedureId, Option<Output>)> {
390 let context = self.create_context();
391 let procedure = TruncateTableProcedure::new(
392 truncate_table_task,
393 table_info_value,
394 region_routes,
395 context,
396 );
397
398 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
399
400 self.submit_procedure(procedure_with_id).await
401 }
402
403 async fn submit_procedure(
404 &self,
405 procedure_with_id: ProcedureWithId,
406 ) -> Result<(ProcedureId, Option<Output>)> {
407 let procedure_id = procedure_with_id.id;
408
409 let mut watcher = self
410 .procedure_manager
411 .submit(procedure_with_id)
412 .await
413 .context(SubmitProcedureSnafu)?;
414
415 let output = watcher::wait(&mut watcher)
416 .await
417 .context(WaitProcedureSnafu)?;
418
419 Ok((procedure_id, output))
420 }
421}
422
423async fn handle_truncate_table_task(
424 ddl_manager: &DdlManager,
425 truncate_table_task: TruncateTableTask,
426) -> Result<SubmitDdlTaskResponse> {
427 let table_id = truncate_table_task.table_id;
428 let table_metadata_manager = &ddl_manager.table_metadata_manager();
429 let table_ref = truncate_table_task.table_ref();
430
431 let (table_info_value, table_route_value) =
432 table_metadata_manager.get_full_table_info(table_id).await?;
433
434 let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
435 table: table_ref.to_string(),
436 })?;
437
438 let table_route_value = table_route_value.context(TableRouteNotFoundSnafu { table_id })?;
439
440 let table_route = table_route_value.into_inner().region_routes()?.clone();
441
442 let (id, _) = ddl_manager
443 .submit_truncate_table_task(truncate_table_task, table_info_value, table_route)
444 .await?;
445
446 info!("Table: {table_id} is truncated via procedure_id {id:?}");
447
448 Ok(SubmitDdlTaskResponse {
449 key: id.to_string().into(),
450 ..Default::default()
451 })
452}
453
454async fn handle_alter_table_task(
455 ddl_manager: &DdlManager,
456 alter_table_task: AlterTableTask,
457) -> Result<SubmitDdlTaskResponse> {
458 let table_ref = alter_table_task.table_ref();
459
460 let table_id = ddl_manager
461 .table_metadata_manager()
462 .table_name_manager()
463 .get(TableNameKey::new(
464 table_ref.catalog,
465 table_ref.schema,
466 table_ref.table,
467 ))
468 .await?
469 .with_context(|| TableNotFoundSnafu {
470 table_name: table_ref.to_string(),
471 })?
472 .table_id();
473
474 let table_route_value = ddl_manager
475 .table_metadata_manager()
476 .table_route_manager()
477 .table_route_storage()
478 .get(table_id)
479 .await?
480 .context(TableRouteNotFoundSnafu { table_id })?;
481 ensure!(
482 table_route_value.is_physical(),
483 UnexpectedLogicalRouteTableSnafu {
484 err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
485 }
486 );
487
488 let (id, _) = ddl_manager
489 .submit_alter_table_task(table_id, alter_table_task)
490 .await?;
491
492 info!("Table: {table_id} is altered via procedure_id {id:?}");
493
494 Ok(SubmitDdlTaskResponse {
495 key: id.to_string().into(),
496 ..Default::default()
497 })
498}
499
500async fn handle_drop_table_task(
501 ddl_manager: &DdlManager,
502 drop_table_task: DropTableTask,
503) -> Result<SubmitDdlTaskResponse> {
504 let table_id = drop_table_task.table_id;
505 let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
506
507 info!("Table: {table_id} is dropped via procedure_id {id:?}");
508
509 Ok(SubmitDdlTaskResponse {
510 key: id.to_string().into(),
511 ..Default::default()
512 })
513}
514
515async fn handle_create_table_task(
516 ddl_manager: &DdlManager,
517 create_table_task: CreateTableTask,
518) -> Result<SubmitDdlTaskResponse> {
519 let (id, output) = ddl_manager
520 .submit_create_table_task(create_table_task)
521 .await?;
522
523 let procedure_id = id.to_string();
524 let output = output.context(ProcedureOutputSnafu {
525 procedure_id: &procedure_id,
526 err_msg: "empty output",
527 })?;
528 let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
529 procedure_id: &procedure_id,
530 err_msg: "downcast to `u32`",
531 })?);
532 info!("Table: {table_id} is created via procedure_id {id:?}");
533
534 Ok(SubmitDdlTaskResponse {
535 key: procedure_id.into(),
536 table_ids: vec![table_id],
537 })
538}
539
540async fn handle_create_logical_table_tasks(
541 ddl_manager: &DdlManager,
542 create_table_tasks: Vec<CreateTableTask>,
543) -> Result<SubmitDdlTaskResponse> {
544 ensure!(
545 !create_table_tasks.is_empty(),
546 EmptyDdlTasksSnafu {
547 name: "create logical tables"
548 }
549 );
550 let physical_table_id = utils::check_and_get_physical_table_id(
551 ddl_manager.table_metadata_manager(),
552 &create_table_tasks,
553 )
554 .await?;
555 let num_logical_tables = create_table_tasks.len();
556
557 let (id, output) = ddl_manager
558 .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
559 .await?;
560
561 info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}");
562
563 let procedure_id = id.to_string();
564 let output = output.context(ProcedureOutputSnafu {
565 procedure_id: &procedure_id,
566 err_msg: "empty output",
567 })?;
568 let table_ids = output
569 .downcast_ref::<Vec<TableId>>()
570 .context(ProcedureOutputSnafu {
571 procedure_id: &procedure_id,
572 err_msg: "downcast to `Vec<TableId>`",
573 })?
574 .clone();
575
576 Ok(SubmitDdlTaskResponse {
577 key: procedure_id.into(),
578 table_ids,
579 })
580}
581
582async fn handle_create_database_task(
583 ddl_manager: &DdlManager,
584 create_database_task: CreateDatabaseTask,
585) -> Result<SubmitDdlTaskResponse> {
586 let (id, _) = ddl_manager
587 .submit_create_database(create_database_task.clone())
588 .await?;
589
590 let procedure_id = id.to_string();
591 info!(
592 "Database {}.{} is created via procedure_id {id:?}",
593 create_database_task.catalog, create_database_task.schema
594 );
595
596 Ok(SubmitDdlTaskResponse {
597 key: procedure_id.into(),
598 ..Default::default()
599 })
600}
601
602async fn handle_drop_database_task(
603 ddl_manager: &DdlManager,
604 drop_database_task: DropDatabaseTask,
605) -> Result<SubmitDdlTaskResponse> {
606 let (id, _) = ddl_manager
607 .submit_drop_database(drop_database_task.clone())
608 .await?;
609
610 let procedure_id = id.to_string();
611 info!(
612 "Database {}.{} is dropped via procedure_id {id:?}",
613 drop_database_task.catalog, drop_database_task.schema
614 );
615
616 Ok(SubmitDdlTaskResponse {
617 key: procedure_id.into(),
618 ..Default::default()
619 })
620}
621
622async fn handle_alter_database_task(
623 ddl_manager: &DdlManager,
624 alter_database_task: AlterDatabaseTask,
625) -> Result<SubmitDdlTaskResponse> {
626 let (id, _) = ddl_manager
627 .submit_alter_database(alter_database_task.clone())
628 .await?;
629
630 let procedure_id = id.to_string();
631 info!(
632 "Database {}.{} is altered via procedure_id {id:?}",
633 alter_database_task.catalog(),
634 alter_database_task.schema()
635 );
636
637 Ok(SubmitDdlTaskResponse {
638 key: procedure_id.into(),
639 ..Default::default()
640 })
641}
642
643async fn handle_drop_flow_task(
644 ddl_manager: &DdlManager,
645 drop_flow_task: DropFlowTask,
646) -> Result<SubmitDdlTaskResponse> {
647 let (id, _) = ddl_manager
648 .submit_drop_flow_task(drop_flow_task.clone())
649 .await?;
650
651 let procedure_id = id.to_string();
652 info!(
653 "Flow {}.{}({}) is dropped via procedure_id {id:?}",
654 drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
655 );
656
657 Ok(SubmitDdlTaskResponse {
658 key: procedure_id.into(),
659 ..Default::default()
660 })
661}
662
/// Handles a drop-trigger task (enterprise feature only).
///
/// Delegates to the configured trigger DDL manager, passing clones of the
/// procedure manager and the DDL context. Returns an `Unsupported` error when
/// no trigger DDL manager is configured.
#[cfg(feature = "enterprise")]
async fn handle_drop_trigger_task(
    ddl_manager: &DdlManager,
    drop_trigger_task: DropTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_manager) => {
            trigger_manager
                .drop_trigger(
                    drop_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => UnsupportedSnafu {
            operation: "drop trigger",
        }
        .fail(),
    }
}
684
685async fn handle_drop_view_task(
686 ddl_manager: &DdlManager,
687 drop_view_task: DropViewTask,
688) -> Result<SubmitDdlTaskResponse> {
689 let (id, _) = ddl_manager
690 .submit_drop_view_task(drop_view_task.clone())
691 .await?;
692
693 let procedure_id = id.to_string();
694 info!(
695 "View {}({}) is dropped via procedure_id {id:?}",
696 drop_view_task.table_ref(),
697 drop_view_task.view_id,
698 );
699
700 Ok(SubmitDdlTaskResponse {
701 key: procedure_id.into(),
702 ..Default::default()
703 })
704}
705
706async fn handle_create_flow_task(
707 ddl_manager: &DdlManager,
708 create_flow_task: CreateFlowTask,
709 query_context: QueryContext,
710) -> Result<SubmitDdlTaskResponse> {
711 let (id, output) = ddl_manager
712 .submit_create_flow_task(create_flow_task.clone(), query_context)
713 .await?;
714
715 let procedure_id = id.to_string();
716 let output = output.context(ProcedureOutputSnafu {
717 procedure_id: &procedure_id,
718 err_msg: "empty output",
719 })?;
720 let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
721 procedure_id: &procedure_id,
722 err_msg: "downcast to `u32`",
723 })?);
724 if !create_flow_task.or_replace {
725 info!(
726 "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
727 create_flow_task.catalog_name, create_flow_task.flow_name,
728 );
729 } else {
730 info!(
731 "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
732 create_flow_task.catalog_name, create_flow_task.flow_name,
733 );
734 }
735
736 Ok(SubmitDdlTaskResponse {
737 key: procedure_id.into(),
738 ..Default::default()
739 })
740}
741
/// Handles a create-trigger task (enterprise feature only).
///
/// Delegates to the configured trigger DDL manager, passing clones of the
/// procedure manager and the DDL context. Returns an `Unsupported` error when
/// no trigger DDL manager is configured.
#[cfg(feature = "enterprise")]
async fn handle_create_trigger_task(
    ddl_manager: &DdlManager,
    create_trigger_task: CreateTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_manager) => {
            trigger_manager
                .create_trigger(
                    create_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => UnsupportedSnafu {
            operation: "create trigger",
        }
        .fail(),
    }
}
763
764async fn handle_alter_logical_table_tasks(
765 ddl_manager: &DdlManager,
766 alter_table_tasks: Vec<AlterTableTask>,
767) -> Result<SubmitDdlTaskResponse> {
768 ensure!(
769 !alter_table_tasks.is_empty(),
770 EmptyDdlTasksSnafu {
771 name: "alter logical tables"
772 }
773 );
774
775 let first_table = TableNameKey {
777 catalog: &alter_table_tasks[0].alter_table.catalog_name,
778 schema: &alter_table_tasks[0].alter_table.schema_name,
779 table: &alter_table_tasks[0].alter_table.table_name,
780 };
781 let physical_table_id =
782 utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
783 let num_logical_tables = alter_table_tasks.len();
784
785 let (id, _) = ddl_manager
786 .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
787 .await?;
788
789 info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}");
790
791 let procedure_id = id.to_string();
792
793 Ok(SubmitDdlTaskResponse {
794 key: procedure_id.into(),
795 ..Default::default()
796 })
797}
798
799async fn handle_create_view_task(
801 ddl_manager: &DdlManager,
802 create_view_task: CreateViewTask,
803) -> Result<SubmitDdlTaskResponse> {
804 let (id, output) = ddl_manager
805 .submit_create_view_task(create_view_task)
806 .await?;
807
808 let procedure_id = id.to_string();
809 let output = output.context(ProcedureOutputSnafu {
810 procedure_id: &procedure_id,
811 err_msg: "empty output",
812 })?;
813 let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
814 procedure_id: &procedure_id,
815 err_msg: "downcast to `u32`",
816 })?);
817 info!("View: {view_id} is created via procedure_id {id:?}");
818
819 Ok(SubmitDdlTaskResponse {
820 key: procedure_id.into(),
821 table_ids: vec![view_id],
822 })
823}
824
825#[async_trait::async_trait]
827impl ProcedureExecutor for DdlManager {
828 async fn submit_ddl_task(
829 &self,
830 ctx: &ExecutorContext,
831 request: SubmitDdlTaskRequest,
832 ) -> Result<SubmitDdlTaskResponse> {
833 let span = ctx
834 .tracing_context
835 .as_ref()
836 .map(TracingContext::from_w3c)
837 .unwrap_or(TracingContext::from_current_span())
838 .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
839 async move {
840 debug!("Submitting Ddl task: {:?}", request.task);
841 match request.task {
842 CreateTable(create_table_task) => {
843 handle_create_table_task(self, create_table_task).await
844 }
845 DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
846 AlterTable(alter_table_task) => {
847 handle_alter_table_task(self, alter_table_task).await
848 }
849 TruncateTable(truncate_table_task) => {
850 handle_truncate_table_task(self, truncate_table_task).await
851 }
852 CreateLogicalTables(create_table_tasks) => {
853 handle_create_logical_table_tasks(self, create_table_tasks).await
854 }
855 AlterLogicalTables(alter_table_tasks) => {
856 handle_alter_logical_table_tasks(self, alter_table_tasks).await
857 }
858 DropLogicalTables(_) => todo!(),
859 CreateDatabase(create_database_task) => {
860 handle_create_database_task(self, create_database_task).await
861 }
862 DropDatabase(drop_database_task) => {
863 handle_drop_database_task(self, drop_database_task).await
864 }
865 AlterDatabase(alter_database_task) => {
866 handle_alter_database_task(self, alter_database_task).await
867 }
868 CreateFlow(create_flow_task) => {
869 handle_create_flow_task(self, create_flow_task, request.query_context.into())
870 .await
871 }
872 DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
873 CreateView(create_view_task) => {
874 handle_create_view_task(self, create_view_task).await
875 }
876 DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
877 #[cfg(feature = "enterprise")]
878 CreateTrigger(create_trigger_task) => {
879 handle_create_trigger_task(
880 self,
881 create_trigger_task,
882 request.query_context.into(),
883 )
884 .await
885 }
886 #[cfg(feature = "enterprise")]
887 DropTrigger(drop_trigger_task) => {
888 handle_drop_trigger_task(self, drop_trigger_task, request.query_context.into())
889 .await
890 }
891 }
892 }
893 .trace(span)
894 .await
895 }
896
897 async fn migrate_region(
898 &self,
899 _ctx: &ExecutorContext,
900 _request: MigrateRegionRequest,
901 ) -> Result<MigrateRegionResponse> {
902 UnsupportedSnafu {
903 operation: "migrate_region",
904 }
905 .fail()
906 }
907
908 async fn query_procedure_state(
909 &self,
910 _ctx: &ExecutorContext,
911 pid: &str,
912 ) -> Result<ProcedureStateResponse> {
913 let pid =
914 ProcedureId::parse_str(pid).with_context(|_| ParseProcedureIdSnafu { key: pid })?;
915
916 let state = self
917 .procedure_manager
918 .procedure_state(pid)
919 .await
920 .context(QueryProcedureSnafu)?
921 .context(ProcedureNotFoundSnafu {
922 pid: pid.to_string(),
923 })?;
924
925 Ok(procedure::procedure_state_to_pb_response(&state))
926 }
927
928 async fn list_procedures(&self, _ctx: &ExecutorContext) -> Result<ProcedureDetailResponse> {
929 let metas = self
930 .procedure_manager
931 .list_procedures()
932 .await
933 .context(QueryProcedureSnafu)?;
934 Ok(procedure::procedure_details_to_pb_response(metas))
935 }
936}
937
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_procedure::local::LocalManager;
    use common_procedure::test_util::InMemoryPoisonStore;

    use super::DdlManager;
    use crate::cache_invalidator::DummyCacheInvalidator;
    use crate::ddl::alter_database::AlterDatabaseProcedure;
    use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
    use crate::ddl::alter_table::AlterTableProcedure;
    use crate::ddl::create_database::CreateDatabaseProcedure;
    use crate::ddl::create_flow::CreateFlowProcedure;
    use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
    use crate::ddl::create_table::CreateTableProcedure;
    use crate::ddl::create_view::CreateViewProcedure;
    use crate::ddl::drop_database::DropDatabaseProcedure;
    use crate::ddl::drop_flow::DropFlowProcedure;
    use crate::ddl::drop_table::DropTableProcedure;
    use crate::ddl::drop_view::DropViewProcedure;
    use crate::ddl::flow_meta::FlowMetadataAllocator;
    use crate::ddl::table_meta::TableMetadataAllocator;
    use crate::ddl::truncate_table::TruncateTableProcedure;
    use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use crate::key::flow::FlowMetadataManager;
    use crate::key::TableMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
    use crate::peer::Peer;
    use crate::region_keeper::MemoryRegionKeeper;
    use crate::region_registry::LeaderRegionRegistry;
    use crate::sequence::SequenceBuilder;
    use crate::state_store::KvStateStore;
    use crate::wal_options_allocator::WalOptionsAllocator;

    /// A [`NodeManager`] whose methods are never expected to be called in these tests.
    pub struct DummyDatanodeManager;

    #[async_trait::async_trait]
    impl NodeManager for DummyDatanodeManager {
        async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
            unimplemented!()
        }

        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
            unimplemented!()
        }
    }

    /// `try_new` with `register_loaders = true` must succeed and leave a loader
    /// registered for every DDL procedure type.
    #[test]
    fn test_try_new() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
            Arc::new(WalOptionsAllocator::default()),
        ));
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
        ));

        let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let procedure_manager = Arc::new(LocalManager::new(
            Default::default(),
            state_store,
            poison_manager,
            None,
        ));

        // Unwrap instead of discarding the result, so that a failed loader
        // registration fails the test at its source.
        let _manager = DdlManager::try_new(
            DdlContext {
                node_manager: Arc::new(DummyDatanodeManager),
                cache_invalidator: Arc::new(DummyCacheInvalidator),
                table_metadata_manager,
                table_metadata_allocator,
                flow_metadata_manager,
                flow_metadata_allocator,
                memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
                leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
                region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
            },
            procedure_manager.clone(),
            true,
        )
        .unwrap();

        // Mirrors the list in `DdlManager::register_loaders`.
        let expected_loaders = [
            CreateTableProcedure::TYPE_NAME,
            CreateLogicalTablesProcedure::TYPE_NAME,
            CreateViewProcedure::TYPE_NAME,
            CreateFlowProcedure::TYPE_NAME,
            AlterTableProcedure::TYPE_NAME,
            AlterLogicalTablesProcedure::TYPE_NAME,
            AlterDatabaseProcedure::TYPE_NAME,
            DropTableProcedure::TYPE_NAME,
            DropFlowProcedure::TYPE_NAME,
            TruncateTableProcedure::TYPE_NAME,
            CreateDatabaseProcedure::TYPE_NAME,
            DropDatabaseProcedure::TYPE_NAME,
            DropViewProcedure::TYPE_NAME,
        ];

        for loader in expected_loaders {
            assert!(
                procedure_manager.contains_loader(loader),
                "missing procedure loader for {loader}"
            );
        }
    }
}