1use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::Repartition;
19use api::v1::alter_table_expr::Kind;
20use common_error::ext::BoxedError;
21use common_procedure::{
22 BoxedProcedure, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef,
23 ProcedureWithId, watcher,
24};
25use common_telemetry::tracing_context::{FutureExt, TracingContext};
26use common_telemetry::{debug, info, tracing};
27use derive_builder::Builder;
28use snafu::{OptionExt, ResultExt, ensure};
29use store_api::storage::TableId;
30use table::table_name::TableName;
31
32use crate::ddl::alter_database::AlterDatabaseProcedure;
33use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
34use crate::ddl::alter_table::AlterTableProcedure;
35use crate::ddl::comment_on::CommentOnProcedure;
36use crate::ddl::create_database::CreateDatabaseProcedure;
37use crate::ddl::create_flow::CreateFlowProcedure;
38use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
39use crate::ddl::create_table::CreateTableProcedure;
40use crate::ddl::create_view::CreateViewProcedure;
41use crate::ddl::drop_database::DropDatabaseProcedure;
42use crate::ddl::drop_flow::DropFlowProcedure;
43use crate::ddl::drop_table::DropTableProcedure;
44use crate::ddl::drop_view::DropViewProcedure;
45use crate::ddl::truncate_table::TruncateTableProcedure;
46use crate::ddl::{DdlContext, utils};
47use crate::error::{
48 self, CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu,
49 RegisterProcedureLoaderSnafu, RegisterRepartitionProcedureLoaderSnafu, Result,
50 SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
51 UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
52};
53use crate::key::table_info::TableInfoValue;
54use crate::key::table_name::TableNameKey;
55use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
56use crate::procedure_executor::ExecutorContext;
57#[cfg(feature = "enterprise")]
58use crate::rpc::ddl::DdlTask::CreateTrigger;
59#[cfg(feature = "enterprise")]
60use crate::rpc::ddl::DdlTask::DropTrigger;
61use crate::rpc::ddl::DdlTask::{
62 AlterDatabase, AlterLogicalTables, AlterTable, CommentOn, CreateDatabase, CreateFlow,
63 CreateLogicalTables, CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables,
64 DropTable, DropView, TruncateTable,
65};
66#[cfg(feature = "enterprise")]
67use crate::rpc::ddl::trigger::CreateTriggerTask;
68#[cfg(feature = "enterprise")]
69use crate::rpc::ddl::trigger::DropTriggerTask;
70use crate::rpc::ddl::{
71 AlterDatabaseTask, AlterTableTask, CommentOnTask, CreateDatabaseTask, CreateFlowTask,
72 CreateTableTask, CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask,
73 QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
74};
75
76#[async_trait::async_trait]
78pub trait DdlManagerConfigurator<C>: Send + Sync {
79 async fn configure(
81 &self,
82 ddl_manager: DdlManager,
83 ctx: C,
84 ) -> std::result::Result<DdlManager, BoxedError>;
85}
86
87pub type DdlManagerConfiguratorRef<C> = Arc<dyn DdlManagerConfigurator<C>>;
88
89pub type DdlManagerRef = Arc<DdlManager>;
90
91pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
92
93#[derive(Builder)]
95pub struct DdlManager {
96 ddl_context: DdlContext,
97 procedure_manager: ProcedureManagerRef,
98 repartition_procedure_factory: RepartitionProcedureFactoryRef,
99 #[cfg(feature = "enterprise")]
100 trigger_ddl_manager: Option<TriggerDdlManagerRef>,
101}
102
103#[cfg(feature = "enterprise")]
106#[async_trait::async_trait]
107pub trait TriggerDdlManager: Send + Sync {
108 async fn create_trigger(
109 &self,
110 create_trigger_task: CreateTriggerTask,
111 procedure_manager: ProcedureManagerRef,
112 ddl_context: DdlContext,
113 query_context: QueryContext,
114 ) -> Result<SubmitDdlTaskResponse>;
115
116 async fn drop_trigger(
117 &self,
118 drop_trigger_task: DropTriggerTask,
119 procedure_manager: ProcedureManagerRef,
120 ddl_context: DdlContext,
121 query_context: QueryContext,
122 ) -> Result<SubmitDdlTaskResponse>;
123
124 fn as_any(&self) -> &dyn std::any::Any;
125}
126
127#[cfg(feature = "enterprise")]
128pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
129
130macro_rules! procedure_loader_entry {
131 ($procedure:ident) => {
132 (
133 $procedure::TYPE_NAME,
134 &|context: DdlContext| -> BoxedProcedureLoader {
135 Box::new(move |json: &str| {
136 let context = context.clone();
137 $procedure::from_json(json, context).map(|p| Box::new(p) as _)
138 })
139 },
140 )
141 };
142}
143
144macro_rules! procedure_loader {
145 ($($procedure:ident),*) => {
146 vec![
147 $(procedure_loader_entry!($procedure)),*
148 ]
149 };
150}
151
152pub type RepartitionProcedureFactoryRef = Arc<dyn RepartitionProcedureFactory>;
153
154pub trait RepartitionProcedureFactory: Send + Sync {
155 fn create(
156 &self,
157 ddl_ctx: &DdlContext,
158 table_name: TableName,
159 table_id: TableId,
160 from_exprs: Vec<String>,
161 to_exprs: Vec<String>,
162 timeout: Option<Duration>,
163 ) -> std::result::Result<BoxedProcedure, BoxedError>;
164
165 fn register_loaders(
166 &self,
167 ddl_ctx: &DdlContext,
168 procedure_manager: &ProcedureManagerRef,
169 ) -> std::result::Result<(), BoxedError>;
170}
171
172#[derive(Debug, Clone, Copy)]
177pub struct DdlOptions {
178 pub timeout: Duration,
182 pub wait: bool,
189}
190
191impl DdlManager {
192 pub fn try_new(
194 ddl_context: DdlContext,
195 procedure_manager: ProcedureManagerRef,
196 repartition_procedure_factory: RepartitionProcedureFactoryRef,
197 register_loaders: bool,
198 ) -> Result<Self> {
199 let manager = Self {
200 ddl_context,
201 procedure_manager,
202 repartition_procedure_factory,
203 #[cfg(feature = "enterprise")]
204 trigger_ddl_manager: None,
205 };
206 if register_loaders {
207 manager.register_loaders()?;
208 }
209 Ok(manager)
210 }
211
212 #[cfg(feature = "enterprise")]
213 pub fn with_trigger_ddl_manager(mut self, trigger_ddl_manager: TriggerDdlManagerRef) -> Self {
214 self.trigger_ddl_manager = Some(trigger_ddl_manager);
215 self
216 }
217
218 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
220 &self.ddl_context.table_metadata_manager
221 }
222
223 pub fn create_context(&self) -> DdlContext {
225 self.ddl_context.clone()
226 }
227
228 pub fn register_loaders(&self) -> Result<()> {
230 let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
231 CreateTableProcedure,
232 CreateLogicalTablesProcedure,
233 CreateViewProcedure,
234 CreateFlowProcedure,
235 AlterTableProcedure,
236 AlterLogicalTablesProcedure,
237 AlterDatabaseProcedure,
238 DropTableProcedure,
239 DropFlowProcedure,
240 TruncateTableProcedure,
241 CreateDatabaseProcedure,
242 DropDatabaseProcedure,
243 DropViewProcedure,
244 CommentOnProcedure
245 );
246
247 for (type_name, loader_factory) in loaders {
248 let context = self.create_context();
249 self.procedure_manager
250 .register_loader(type_name, loader_factory(context))
251 .context(RegisterProcedureLoaderSnafu { type_name })?;
252 }
253
254 self.repartition_procedure_factory
255 .register_loaders(&self.ddl_context, &self.procedure_manager)
256 .context(RegisterRepartitionProcedureLoaderSnafu)?;
257
258 Ok(())
259 }
260
261 async fn submit_repartition_task(
280 &self,
281 table_id: TableId,
282 table_name: TableName,
283 Repartition {
284 from_partition_exprs,
285 into_partition_exprs,
286 }: Repartition,
287 wait: bool,
288 timeout: Duration,
289 ) -> Result<(ProcedureId, Option<Output>)> {
290 let context = self.create_context();
291
292 let procedure = self
293 .repartition_procedure_factory
294 .create(
295 &context,
296 table_name,
297 table_id,
298 from_partition_exprs,
299 into_partition_exprs,
300 Some(timeout),
301 )
302 .context(CreateRepartitionProcedureSnafu)?;
303 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
304 if wait {
305 self.execute_procedure_and_wait(procedure_with_id).await
306 } else {
307 self.submit_procedure(procedure_with_id)
308 .await
309 .map(|p| (p, None))
310 }
311 }
312
313 #[tracing::instrument(skip_all)]
315 pub async fn submit_alter_table_task(
316 &self,
317 table_id: TableId,
318 alter_table_task: AlterTableTask,
319 ddl_options: DdlOptions,
320 ) -> Result<(ProcedureId, Option<Output>)> {
321 let mut alter_table_task = alter_table_task;
323 if let Some(Kind::Repartition(_)) = alter_table_task.alter_table.kind.as_ref()
324 && let Kind::Repartition(repartition) =
325 alter_table_task.alter_table.kind.take().unwrap()
326 {
327 let table_name = TableName::new(
328 alter_table_task.alter_table.catalog_name,
329 alter_table_task.alter_table.schema_name,
330 alter_table_task.alter_table.table_name,
331 );
332 return self
333 .submit_repartition_task(
334 table_id,
335 table_name,
336 repartition,
337 ddl_options.wait,
338 ddl_options.timeout,
339 )
340 .await;
341 }
342
343 let context = self.create_context();
344 let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
345
346 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
347
348 self.execute_procedure_and_wait(procedure_with_id).await
349 }
350
351 #[tracing::instrument(skip_all)]
353 pub async fn submit_create_table_task(
354 &self,
355 create_table_task: CreateTableTask,
356 ) -> Result<(ProcedureId, Option<Output>)> {
357 let context = self.create_context();
358
359 let procedure = CreateTableProcedure::new(create_table_task, context)?;
360
361 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
362
363 self.execute_procedure_and_wait(procedure_with_id).await
364 }
365
366 #[tracing::instrument(skip_all)]
368 pub async fn submit_create_view_task(
369 &self,
370 create_view_task: CreateViewTask,
371 ) -> Result<(ProcedureId, Option<Output>)> {
372 let context = self.create_context();
373
374 let procedure = CreateViewProcedure::new(create_view_task, context);
375
376 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
377
378 self.execute_procedure_and_wait(procedure_with_id).await
379 }
380
381 #[tracing::instrument(skip_all)]
383 pub async fn submit_create_logical_table_tasks(
384 &self,
385 create_table_tasks: Vec<CreateTableTask>,
386 physical_table_id: TableId,
387 ) -> Result<(ProcedureId, Option<Output>)> {
388 let context = self.create_context();
389
390 let procedure =
391 CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
392
393 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
394
395 self.execute_procedure_and_wait(procedure_with_id).await
396 }
397
398 #[tracing::instrument(skip_all)]
400 pub async fn submit_alter_logical_table_tasks(
401 &self,
402 alter_table_tasks: Vec<AlterTableTask>,
403 physical_table_id: TableId,
404 ) -> Result<(ProcedureId, Option<Output>)> {
405 let context = self.create_context();
406
407 let procedure =
408 AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
409
410 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
411
412 self.execute_procedure_and_wait(procedure_with_id).await
413 }
414
415 #[tracing::instrument(skip_all)]
417 pub async fn submit_drop_table_task(
418 &self,
419 drop_table_task: DropTableTask,
420 ) -> Result<(ProcedureId, Option<Output>)> {
421 let context = self.create_context();
422
423 let procedure = DropTableProcedure::new(drop_table_task, context);
424
425 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
426
427 self.execute_procedure_and_wait(procedure_with_id).await
428 }
429
430 #[tracing::instrument(skip_all)]
432 pub async fn submit_create_database(
433 &self,
434 CreateDatabaseTask {
435 catalog,
436 schema,
437 create_if_not_exists,
438 options,
439 }: CreateDatabaseTask,
440 ) -> Result<(ProcedureId, Option<Output>)> {
441 let context = self.create_context();
442 let procedure =
443 CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
444 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
445
446 self.execute_procedure_and_wait(procedure_with_id).await
447 }
448
449 #[tracing::instrument(skip_all)]
451 pub async fn submit_drop_database(
452 &self,
453 DropDatabaseTask {
454 catalog,
455 schema,
456 drop_if_exists,
457 }: DropDatabaseTask,
458 ) -> Result<(ProcedureId, Option<Output>)> {
459 let context = self.create_context();
460 let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
461 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
462
463 self.execute_procedure_and_wait(procedure_with_id).await
464 }
465
466 pub async fn submit_alter_database(
467 &self,
468 alter_database_task: AlterDatabaseTask,
469 ) -> Result<(ProcedureId, Option<Output>)> {
470 let context = self.create_context();
471 let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
472 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
473
474 self.execute_procedure_and_wait(procedure_with_id).await
475 }
476
477 #[tracing::instrument(skip_all)]
479 pub async fn submit_create_flow_task(
480 &self,
481 create_flow: CreateFlowTask,
482 query_context: QueryContext,
483 ) -> Result<(ProcedureId, Option<Output>)> {
484 let context = self.create_context();
485 let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
486 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
487
488 self.execute_procedure_and_wait(procedure_with_id).await
489 }
490
491 #[tracing::instrument(skip_all)]
493 pub async fn submit_drop_flow_task(
494 &self,
495 drop_flow: DropFlowTask,
496 ) -> Result<(ProcedureId, Option<Output>)> {
497 let context = self.create_context();
498 let procedure = DropFlowProcedure::new(drop_flow, context);
499 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
500
501 self.execute_procedure_and_wait(procedure_with_id).await
502 }
503
504 #[tracing::instrument(skip_all)]
506 pub async fn submit_drop_view_task(
507 &self,
508 drop_view: DropViewTask,
509 ) -> Result<(ProcedureId, Option<Output>)> {
510 let context = self.create_context();
511 let procedure = DropViewProcedure::new(drop_view, context);
512 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
513
514 self.execute_procedure_and_wait(procedure_with_id).await
515 }
516
517 #[tracing::instrument(skip_all)]
519 pub async fn submit_truncate_table_task(
520 &self,
521 truncate_table_task: TruncateTableTask,
522 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
523 ) -> Result<(ProcedureId, Option<Output>)> {
524 let context = self.create_context();
525 let procedure = TruncateTableProcedure::new(truncate_table_task, table_info_value, context);
526
527 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
528
529 self.execute_procedure_and_wait(procedure_with_id).await
530 }
531
532 #[tracing::instrument(skip_all)]
534 pub async fn submit_comment_on_task(
535 &self,
536 comment_on_task: CommentOnTask,
537 ) -> Result<(ProcedureId, Option<Output>)> {
538 let context = self.create_context();
539 let procedure = CommentOnProcedure::new(comment_on_task, context);
540 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
541
542 self.execute_procedure_and_wait(procedure_with_id).await
543 }
544
545 async fn execute_procedure_and_wait(
547 &self,
548 procedure_with_id: ProcedureWithId,
549 ) -> Result<(ProcedureId, Option<Output>)> {
550 let procedure_id = procedure_with_id.id;
551
552 let mut watcher = self
553 .procedure_manager
554 .submit(procedure_with_id)
555 .await
556 .context(SubmitProcedureSnafu)?;
557
558 let output = watcher::wait(&mut watcher)
559 .await
560 .context(WaitProcedureSnafu)?;
561
562 Ok((procedure_id, output))
563 }
564
565 async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
567 let procedure_id = procedure_with_id.id;
568 let _ = self
569 .procedure_manager
570 .submit(procedure_with_id)
571 .await
572 .context(SubmitProcedureSnafu)?;
573
574 Ok(procedure_id)
575 }
576
577 pub async fn submit_ddl_task(
578 &self,
579 ctx: &ExecutorContext,
580 request: SubmitDdlTaskRequest,
581 ) -> Result<SubmitDdlTaskResponse> {
582 let span = ctx
583 .tracing_context
584 .as_ref()
585 .map(TracingContext::from_w3c)
586 .unwrap_or_else(TracingContext::from_current_span)
587 .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
588 let ddl_options = DdlOptions {
589 wait: request.wait,
590 timeout: request.timeout,
591 };
592 async move {
593 debug!("Submitting Ddl task: {:?}", request.task);
594 match request.task {
595 CreateTable(create_table_task) => {
596 handle_create_table_task(self, create_table_task).await
597 }
598 DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
599 AlterTable(alter_table_task) => {
600 handle_alter_table_task(self, alter_table_task, ddl_options).await
601 }
602 TruncateTable(truncate_table_task) => {
603 handle_truncate_table_task(self, truncate_table_task).await
604 }
605 CreateLogicalTables(create_table_tasks) => {
606 handle_create_logical_table_tasks(self, create_table_tasks).await
607 }
608 AlterLogicalTables(alter_table_tasks) => {
609 handle_alter_logical_table_tasks(self, alter_table_tasks).await
610 }
611 DropLogicalTables(_) => todo!(),
612 CreateDatabase(create_database_task) => {
613 handle_create_database_task(self, create_database_task).await
614 }
615 DropDatabase(drop_database_task) => {
616 handle_drop_database_task(self, drop_database_task).await
617 }
618 AlterDatabase(alter_database_task) => {
619 handle_alter_database_task(self, alter_database_task).await
620 }
621 CreateFlow(create_flow_task) => {
622 handle_create_flow_task(self, create_flow_task, request.query_context).await
623 }
624 DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
625 CreateView(create_view_task) => {
626 handle_create_view_task(self, create_view_task).await
627 }
628 DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
629 CommentOn(comment_on_task) => handle_comment_on_task(self, comment_on_task).await,
630 #[cfg(feature = "enterprise")]
631 CreateTrigger(create_trigger_task) => {
632 handle_create_trigger_task(self, create_trigger_task, request.query_context)
633 .await
634 }
635 #[cfg(feature = "enterprise")]
636 DropTrigger(drop_trigger_task) => {
637 handle_drop_trigger_task(self, drop_trigger_task, request.query_context).await
638 }
639 }
640 }
641 .trace(span)
642 .await
643 }
644}
645
646async fn handle_truncate_table_task(
647 ddl_manager: &DdlManager,
648 truncate_table_task: TruncateTableTask,
649) -> Result<SubmitDdlTaskResponse> {
650 let table_id = truncate_table_task.table_id;
651 let table_metadata_manager = &ddl_manager.table_metadata_manager();
652 let table_ref = truncate_table_task.table_ref();
653
654 let table_info_value = table_metadata_manager
655 .table_info_manager()
656 .get(table_id)
657 .await?
658 .with_context(|| TableInfoNotFoundSnafu {
659 table: table_ref.to_string(),
660 })?;
661 let physical_table_id = table_metadata_manager
662 .table_route_manager()
663 .get_physical_table_id(table_id)
664 .await?;
665 ensure!(
666 physical_table_id == table_id,
667 error::UnexpectedSnafu {
668 err_msg: "Truncate table is only supported for physical tables."
669 }
670 );
671
672 let (id, _) = ddl_manager
673 .submit_truncate_table_task(truncate_table_task, table_info_value)
674 .await?;
675
676 info!("Table: {table_id} is truncated via procedure_id {id:?}");
677
678 Ok(SubmitDdlTaskResponse {
679 key: id.to_string().into(),
680 ..Default::default()
681 })
682}
683
684async fn handle_alter_table_task(
685 ddl_manager: &DdlManager,
686 alter_table_task: AlterTableTask,
687 ddl_options: DdlOptions,
688) -> Result<SubmitDdlTaskResponse> {
689 let table_ref = alter_table_task.table_ref();
690
691 let table_id = ddl_manager
692 .table_metadata_manager()
693 .table_name_manager()
694 .get(TableNameKey::new(
695 table_ref.catalog,
696 table_ref.schema,
697 table_ref.table,
698 ))
699 .await?
700 .with_context(|| TableNotFoundSnafu {
701 table_name: table_ref.to_string(),
702 })?
703 .table_id();
704
705 let table_route_value = ddl_manager
706 .table_metadata_manager()
707 .table_route_manager()
708 .table_route_storage()
709 .get(table_id)
710 .await?
711 .context(TableRouteNotFoundSnafu { table_id })?;
712 ensure!(
713 table_route_value.is_physical(),
714 UnexpectedLogicalRouteTableSnafu {
715 err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
716 }
717 );
718
719 let (id, _) = ddl_manager
720 .submit_alter_table_task(table_id, alter_table_task, ddl_options)
721 .await?;
722
723 info!("Table: {table_id} is altered via procedure_id {id:?}");
724
725 Ok(SubmitDdlTaskResponse {
726 key: id.to_string().into(),
727 ..Default::default()
728 })
729}
730
731async fn handle_drop_table_task(
732 ddl_manager: &DdlManager,
733 drop_table_task: DropTableTask,
734) -> Result<SubmitDdlTaskResponse> {
735 let table_id = drop_table_task.table_id;
736 let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
737
738 info!("Table: {table_id} is dropped via procedure_id {id:?}");
739
740 Ok(SubmitDdlTaskResponse {
741 key: id.to_string().into(),
742 ..Default::default()
743 })
744}
745
746async fn handle_create_table_task(
747 ddl_manager: &DdlManager,
748 create_table_task: CreateTableTask,
749) -> Result<SubmitDdlTaskResponse> {
750 let (id, output) = ddl_manager
751 .submit_create_table_task(create_table_task)
752 .await?;
753
754 let procedure_id = id.to_string();
755 let output = output.context(ProcedureOutputSnafu {
756 procedure_id: &procedure_id,
757 err_msg: "empty output",
758 })?;
759 let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
760 procedure_id: &procedure_id,
761 err_msg: "downcast to `u32`",
762 })?);
763 info!("Table: {table_id} is created via procedure_id {id:?}");
764
765 Ok(SubmitDdlTaskResponse {
766 key: procedure_id.into(),
767 table_ids: vec![table_id],
768 })
769}
770
771async fn handle_create_logical_table_tasks(
772 ddl_manager: &DdlManager,
773 create_table_tasks: Vec<CreateTableTask>,
774) -> Result<SubmitDdlTaskResponse> {
775 ensure!(
776 !create_table_tasks.is_empty(),
777 EmptyDdlTasksSnafu {
778 name: "create logical tables"
779 }
780 );
781 let physical_table_id = utils::check_and_get_physical_table_id(
782 ddl_manager.table_metadata_manager(),
783 &create_table_tasks,
784 )
785 .await?;
786 let num_logical_tables = create_table_tasks.len();
787
788 let (id, output) = ddl_manager
789 .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
790 .await?;
791
792 info!(
793 "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}"
794 );
795
796 let procedure_id = id.to_string();
797 let output = output.context(ProcedureOutputSnafu {
798 procedure_id: &procedure_id,
799 err_msg: "empty output",
800 })?;
801 let table_ids = output
802 .downcast_ref::<Vec<TableId>>()
803 .context(ProcedureOutputSnafu {
804 procedure_id: &procedure_id,
805 err_msg: "downcast to `Vec<TableId>`",
806 })?
807 .clone();
808
809 Ok(SubmitDdlTaskResponse {
810 key: procedure_id.into(),
811 table_ids,
812 })
813}
814
815async fn handle_create_database_task(
816 ddl_manager: &DdlManager,
817 create_database_task: CreateDatabaseTask,
818) -> Result<SubmitDdlTaskResponse> {
819 let (id, _) = ddl_manager
820 .submit_create_database(create_database_task.clone())
821 .await?;
822
823 let procedure_id = id.to_string();
824 info!(
825 "Database {}.{} is created via procedure_id {id:?}",
826 create_database_task.catalog, create_database_task.schema
827 );
828
829 Ok(SubmitDdlTaskResponse {
830 key: procedure_id.into(),
831 ..Default::default()
832 })
833}
834
835async fn handle_drop_database_task(
836 ddl_manager: &DdlManager,
837 drop_database_task: DropDatabaseTask,
838) -> Result<SubmitDdlTaskResponse> {
839 let (id, _) = ddl_manager
840 .submit_drop_database(drop_database_task.clone())
841 .await?;
842
843 let procedure_id = id.to_string();
844 info!(
845 "Database {}.{} is dropped via procedure_id {id:?}",
846 drop_database_task.catalog, drop_database_task.schema
847 );
848
849 Ok(SubmitDdlTaskResponse {
850 key: procedure_id.into(),
851 ..Default::default()
852 })
853}
854
855async fn handle_alter_database_task(
856 ddl_manager: &DdlManager,
857 alter_database_task: AlterDatabaseTask,
858) -> Result<SubmitDdlTaskResponse> {
859 let (id, _) = ddl_manager
860 .submit_alter_database(alter_database_task.clone())
861 .await?;
862
863 let procedure_id = id.to_string();
864 info!(
865 "Database {}.{} is altered via procedure_id {id:?}",
866 alter_database_task.catalog(),
867 alter_database_task.schema()
868 );
869
870 Ok(SubmitDdlTaskResponse {
871 key: procedure_id.into(),
872 ..Default::default()
873 })
874}
875
876async fn handle_drop_flow_task(
877 ddl_manager: &DdlManager,
878 drop_flow_task: DropFlowTask,
879) -> Result<SubmitDdlTaskResponse> {
880 let (id, _) = ddl_manager
881 .submit_drop_flow_task(drop_flow_task.clone())
882 .await?;
883
884 let procedure_id = id.to_string();
885 info!(
886 "Flow {}.{}({}) is dropped via procedure_id {id:?}",
887 drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
888 );
889
890 Ok(SubmitDdlTaskResponse {
891 key: procedure_id.into(),
892 ..Default::default()
893 })
894}
895
896#[cfg(feature = "enterprise")]
897async fn handle_drop_trigger_task(
898 ddl_manager: &DdlManager,
899 drop_trigger_task: DropTriggerTask,
900 query_context: QueryContext,
901) -> Result<SubmitDdlTaskResponse> {
902 let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
903 use crate::error::UnsupportedSnafu;
904
905 return UnsupportedSnafu {
906 operation: "drop trigger",
907 }
908 .fail();
909 };
910
911 m.drop_trigger(
912 drop_trigger_task,
913 ddl_manager.procedure_manager.clone(),
914 ddl_manager.ddl_context.clone(),
915 query_context,
916 )
917 .await
918}
919
920async fn handle_drop_view_task(
921 ddl_manager: &DdlManager,
922 drop_view_task: DropViewTask,
923) -> Result<SubmitDdlTaskResponse> {
924 let (id, _) = ddl_manager
925 .submit_drop_view_task(drop_view_task.clone())
926 .await?;
927
928 let procedure_id = id.to_string();
929 info!(
930 "View {}({}) is dropped via procedure_id {id:?}",
931 drop_view_task.table_ref(),
932 drop_view_task.view_id,
933 );
934
935 Ok(SubmitDdlTaskResponse {
936 key: procedure_id.into(),
937 ..Default::default()
938 })
939}
940
941async fn handle_create_flow_task(
942 ddl_manager: &DdlManager,
943 create_flow_task: CreateFlowTask,
944 query_context: QueryContext,
945) -> Result<SubmitDdlTaskResponse> {
946 let (id, output) = ddl_manager
947 .submit_create_flow_task(create_flow_task.clone(), query_context)
948 .await?;
949
950 let procedure_id = id.to_string();
951 let output = output.context(ProcedureOutputSnafu {
952 procedure_id: &procedure_id,
953 err_msg: "empty output",
954 })?;
955 let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
956 procedure_id: &procedure_id,
957 err_msg: "downcast to `u32`",
958 })?);
959 if !create_flow_task.or_replace {
960 info!(
961 "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
962 create_flow_task.catalog_name, create_flow_task.flow_name,
963 );
964 } else {
965 info!(
966 "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
967 create_flow_task.catalog_name, create_flow_task.flow_name,
968 );
969 }
970
971 Ok(SubmitDdlTaskResponse {
972 key: procedure_id.into(),
973 ..Default::default()
974 })
975}
976
977#[cfg(feature = "enterprise")]
978async fn handle_create_trigger_task(
979 ddl_manager: &DdlManager,
980 create_trigger_task: CreateTriggerTask,
981 query_context: QueryContext,
982) -> Result<SubmitDdlTaskResponse> {
983 let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
984 use crate::error::UnsupportedSnafu;
985
986 return UnsupportedSnafu {
987 operation: "create trigger",
988 }
989 .fail();
990 };
991
992 m.create_trigger(
993 create_trigger_task,
994 ddl_manager.procedure_manager.clone(),
995 ddl_manager.ddl_context.clone(),
996 query_context,
997 )
998 .await
999}
1000
1001async fn handle_alter_logical_table_tasks(
1002 ddl_manager: &DdlManager,
1003 alter_table_tasks: Vec<AlterTableTask>,
1004) -> Result<SubmitDdlTaskResponse> {
1005 ensure!(
1006 !alter_table_tasks.is_empty(),
1007 EmptyDdlTasksSnafu {
1008 name: "alter logical tables"
1009 }
1010 );
1011
1012 let first_table = TableNameKey {
1014 catalog: &alter_table_tasks[0].alter_table.catalog_name,
1015 schema: &alter_table_tasks[0].alter_table.schema_name,
1016 table: &alter_table_tasks[0].alter_table.table_name,
1017 };
1018 let physical_table_id =
1019 utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
1020 let num_logical_tables = alter_table_tasks.len();
1021
1022 let (id, _) = ddl_manager
1023 .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
1024 .await?;
1025
1026 info!(
1027 "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}"
1028 );
1029
1030 let procedure_id = id.to_string();
1031
1032 Ok(SubmitDdlTaskResponse {
1033 key: procedure_id.into(),
1034 ..Default::default()
1035 })
1036}
1037
1038async fn handle_create_view_task(
1040 ddl_manager: &DdlManager,
1041 create_view_task: CreateViewTask,
1042) -> Result<SubmitDdlTaskResponse> {
1043 let (id, output) = ddl_manager
1044 .submit_create_view_task(create_view_task)
1045 .await?;
1046
1047 let procedure_id = id.to_string();
1048 let output = output.context(ProcedureOutputSnafu {
1049 procedure_id: &procedure_id,
1050 err_msg: "empty output",
1051 })?;
1052 let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
1053 procedure_id: &procedure_id,
1054 err_msg: "downcast to `u32`",
1055 })?);
1056 info!("View: {view_id} is created via procedure_id {id:?}");
1057
1058 Ok(SubmitDdlTaskResponse {
1059 key: procedure_id.into(),
1060 table_ids: vec![view_id],
1061 })
1062}
1063
1064async fn handle_comment_on_task(
1065 ddl_manager: &DdlManager,
1066 comment_on_task: CommentOnTask,
1067) -> Result<SubmitDdlTaskResponse> {
1068 let (id, _) = ddl_manager
1069 .submit_comment_on_task(comment_on_task.clone())
1070 .await?;
1071
1072 let procedure_id = id.to_string();
1073 info!(
1074 "Comment on {}.{}.{} is updated via procedure_id {id:?}",
1075 comment_on_task.catalog_name, comment_on_task.schema_name, comment_on_task.object_name
1076 );
1077
1078 Ok(SubmitDdlTaskResponse {
1079 key: procedure_id.into(),
1080 ..Default::default()
1081 })
1082}
1083
1084#[cfg(test)]
1085mod tests {
1086 use std::sync::Arc;
1087 use std::time::Duration;
1088
1089 use common_error::ext::BoxedError;
1090 use common_procedure::local::LocalManager;
1091 use common_procedure::test_util::InMemoryPoisonStore;
1092 use common_procedure::{BoxedProcedure, ProcedureManagerRef};
1093 use store_api::storage::TableId;
1094 use table::table_name::TableName;
1095
1096 use super::DdlManager;
1097 use crate::cache_invalidator::DummyCacheInvalidator;
1098 use crate::ddl::alter_table::AlterTableProcedure;
1099 use crate::ddl::create_table::CreateTableProcedure;
1100 use crate::ddl::drop_table::DropTableProcedure;
1101 use crate::ddl::flow_meta::FlowMetadataAllocator;
1102 use crate::ddl::table_meta::TableMetadataAllocator;
1103 use crate::ddl::truncate_table::TruncateTableProcedure;
1104 use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
1105 use crate::ddl_manager::RepartitionProcedureFactory;
1106 use crate::key::TableMetadataManager;
1107 use crate::key::flow::FlowMetadataManager;
1108 use crate::kv_backend::memory::MemoryKvBackend;
1109 use crate::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef};
1110 use crate::peer::Peer;
1111 use crate::region_keeper::MemoryRegionKeeper;
1112 use crate::region_registry::LeaderRegionRegistry;
1113 use crate::sequence::SequenceBuilder;
1114 use crate::state_store::KvStateStore;
1115 use crate::wal_provider::WalProvider;
1116
1117 pub struct DummyDatanodeManager;
1119
1120 #[async_trait::async_trait]
1121 impl DatanodeManager for DummyDatanodeManager {
1122 async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
1123 unimplemented!()
1124 }
1125 }
1126
1127 #[async_trait::async_trait]
1128 impl FlownodeManager for DummyDatanodeManager {
1129 async fn flownode(&self, _node: &Peer) -> FlownodeRef {
1130 unimplemented!()
1131 }
1132 }
1133
1134 struct DummyRepartitionProcedureFactory;
1135
1136 #[async_trait::async_trait]
1137 impl RepartitionProcedureFactory for DummyRepartitionProcedureFactory {
1138 fn create(
1139 &self,
1140 _ddl_ctx: &DdlContext,
1141 _table_name: TableName,
1142 _table_id: TableId,
1143 _from_exprs: Vec<String>,
1144 _to_exprs: Vec<String>,
1145 _timeout: Option<Duration>,
1146 ) -> std::result::Result<BoxedProcedure, BoxedError> {
1147 unimplemented!()
1148 }
1149
1150 fn register_loaders(
1151 &self,
1152 _ddl_ctx: &DdlContext,
1153 _procedure_manager: &ProcedureManagerRef,
1154 ) -> std::result::Result<(), BoxedError> {
1155 Ok(())
1156 }
1157 }
1158
1159 #[test]
1160 fn test_try_new() {
1161 let kv_backend = Arc::new(MemoryKvBackend::new());
1162 let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
1163 let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
1164 Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
1165 Arc::new(WalProvider::default()),
1166 ));
1167 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
1168 let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
1169 Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
1170 ));
1171
1172 let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
1173 let poison_manager = Arc::new(InMemoryPoisonStore::default());
1174 let procedure_manager = Arc::new(LocalManager::new(
1175 Default::default(),
1176 state_store,
1177 poison_manager,
1178 None,
1179 None,
1180 ));
1181
1182 let _ = DdlManager::try_new(
1183 DdlContext {
1184 node_manager: Arc::new(DummyDatanodeManager),
1185 cache_invalidator: Arc::new(DummyCacheInvalidator),
1186 table_metadata_manager,
1187 table_metadata_allocator,
1188 flow_metadata_manager,
1189 flow_metadata_allocator,
1190 memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
1191 leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
1192 region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
1193 },
1194 procedure_manager.clone(),
1195 Arc::new(DummyRepartitionProcedureFactory),
1196 true,
1197 );
1198
1199 let expected_loaders = vec![
1200 CreateTableProcedure::TYPE_NAME,
1201 AlterTableProcedure::TYPE_NAME,
1202 DropTableProcedure::TYPE_NAME,
1203 TruncateTableProcedure::TYPE_NAME,
1204 ];
1205
1206 for loader in expected_loaders {
1207 assert!(procedure_manager.contains_loader(loader));
1208 }
1209 }
1210}