1use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::Repartition;
19use api::v1::alter_table_expr::Kind;
20use common_error::ext::BoxedError;
21use common_procedure::{
22 BoxedProcedure, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef,
23 ProcedureWithId, watcher,
24};
25use common_telemetry::tracing_context::{FutureExt, TracingContext};
26use common_telemetry::{debug, info, tracing};
27use derive_builder::Builder;
28use snafu::{OptionExt, ResultExt, ensure};
29use store_api::storage::TableId;
30use table::table_name::TableName;
31
32use crate::ddl::alter_database::AlterDatabaseProcedure;
33use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
34use crate::ddl::alter_table::AlterTableProcedure;
35use crate::ddl::comment_on::CommentOnProcedure;
36use crate::ddl::create_database::CreateDatabaseProcedure;
37use crate::ddl::create_flow::CreateFlowProcedure;
38use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
39use crate::ddl::create_table::CreateTableProcedure;
40use crate::ddl::create_view::CreateViewProcedure;
41use crate::ddl::drop_database::DropDatabaseProcedure;
42use crate::ddl::drop_flow::DropFlowProcedure;
43use crate::ddl::drop_table::DropTableProcedure;
44use crate::ddl::drop_view::DropViewProcedure;
45use crate::ddl::truncate_table::TruncateTableProcedure;
46use crate::ddl::{DdlContext, utils};
47use crate::error::{
48 self, CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu,
49 RegisterProcedureLoaderSnafu, RegisterRepartitionProcedureLoaderSnafu, Result,
50 SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
51 UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
52};
53use crate::key::table_info::TableInfoValue;
54use crate::key::table_name::TableNameKey;
55use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
56use crate::procedure_executor::ExecutorContext;
57#[cfg(feature = "enterprise")]
58use crate::rpc::ddl::DdlTask::CreateTrigger;
59#[cfg(feature = "enterprise")]
60use crate::rpc::ddl::DdlTask::DropTrigger;
61use crate::rpc::ddl::DdlTask::{
62 AlterDatabase, AlterLogicalTables, AlterTable, CommentOn, CreateDatabase, CreateFlow,
63 CreateLogicalTables, CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables,
64 DropTable, DropView, TruncateTable,
65};
66#[cfg(feature = "enterprise")]
67use crate::rpc::ddl::trigger::CreateTriggerTask;
68#[cfg(feature = "enterprise")]
69use crate::rpc::ddl::trigger::DropTriggerTask;
70use crate::rpc::ddl::{
71 AlterDatabaseTask, AlterTableTask, CommentOnTask, CreateDatabaseTask, CreateFlowTask,
72 CreateTableTask, CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask,
73 QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
74};
75
76#[async_trait::async_trait]
78pub trait DdlManagerConfigurator<C>: Send + Sync {
79 async fn configure(
81 &self,
82 ddl_manager: DdlManager,
83 ctx: C,
84 ) -> std::result::Result<DdlManager, BoxedError>;
85}
86
87pub type DdlManagerConfiguratorRef<C> = Arc<dyn DdlManagerConfigurator<C>>;
88
89pub type DdlManagerRef = Arc<DdlManager>;
90
91pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
92
93#[derive(Builder)]
95pub struct DdlManager {
96 ddl_context: DdlContext,
97 procedure_manager: ProcedureManagerRef,
98 repartition_procedure_factory: RepartitionProcedureFactoryRef,
99 #[cfg(feature = "enterprise")]
100 trigger_ddl_manager: Option<TriggerDdlManagerRef>,
101}
102
103#[cfg(feature = "enterprise")]
106#[async_trait::async_trait]
107pub trait TriggerDdlManager: Send + Sync {
108 async fn create_trigger(
109 &self,
110 create_trigger_task: CreateTriggerTask,
111 procedure_manager: ProcedureManagerRef,
112 ddl_context: DdlContext,
113 query_context: QueryContext,
114 ) -> Result<SubmitDdlTaskResponse>;
115
116 async fn drop_trigger(
117 &self,
118 drop_trigger_task: DropTriggerTask,
119 procedure_manager: ProcedureManagerRef,
120 ddl_context: DdlContext,
121 query_context: QueryContext,
122 ) -> Result<SubmitDdlTaskResponse>;
123
124 fn as_any(&self) -> &dyn std::any::Any;
125}
126
127#[cfg(feature = "enterprise")]
128pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
129
130macro_rules! procedure_loader_entry {
131 ($procedure:ident) => {
132 (
133 $procedure::TYPE_NAME,
134 &|context: DdlContext| -> BoxedProcedureLoader {
135 Box::new(move |json: &str| {
136 let context = context.clone();
137 $procedure::from_json(json, context).map(|p| Box::new(p) as _)
138 })
139 },
140 )
141 };
142}
143
144macro_rules! procedure_loader {
145 ($($procedure:ident),*) => {
146 vec![
147 $(procedure_loader_entry!($procedure)),*
148 ]
149 };
150}
151
152pub type RepartitionProcedureFactoryRef = Arc<dyn RepartitionProcedureFactory>;
153
154pub trait RepartitionProcedureFactory: Send + Sync {
155 fn create(
156 &self,
157 ddl_ctx: &DdlContext,
158 table_name: TableName,
159 table_id: TableId,
160 from_exprs: Vec<String>,
161 to_exprs: Vec<String>,
162 timeout: Option<Duration>,
163 ) -> std::result::Result<BoxedProcedure, BoxedError>;
164
165 fn register_loaders(
166 &self,
167 ddl_ctx: &DdlContext,
168 procedure_manager: &ProcedureManagerRef,
169 ) -> std::result::Result<(), BoxedError>;
170}
171
172#[derive(Debug, Clone, Copy)]
177pub struct DdlOptions {
178 pub timeout: Duration,
182 pub wait: bool,
189}
190
191impl DdlManager {
192 pub fn try_new(
194 ddl_context: DdlContext,
195 procedure_manager: ProcedureManagerRef,
196 repartition_procedure_factory: RepartitionProcedureFactoryRef,
197 register_loaders: bool,
198 ) -> Result<Self> {
199 let manager = Self {
200 ddl_context,
201 procedure_manager,
202 repartition_procedure_factory,
203 #[cfg(feature = "enterprise")]
204 trigger_ddl_manager: None,
205 };
206 if register_loaders {
207 manager.register_loaders()?;
208 }
209 Ok(manager)
210 }
211
212 #[cfg(feature = "enterprise")]
213 pub fn with_trigger_ddl_manager(mut self, trigger_ddl_manager: TriggerDdlManagerRef) -> Self {
214 self.trigger_ddl_manager = Some(trigger_ddl_manager);
215 self
216 }
217
218 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
220 &self.ddl_context.table_metadata_manager
221 }
222
223 pub fn create_context(&self) -> DdlContext {
225 self.ddl_context.clone()
226 }
227
228 pub fn register_loaders(&self) -> Result<()> {
230 let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
231 CreateTableProcedure,
232 CreateLogicalTablesProcedure,
233 CreateViewProcedure,
234 CreateFlowProcedure,
235 AlterTableProcedure,
236 AlterLogicalTablesProcedure,
237 AlterDatabaseProcedure,
238 DropTableProcedure,
239 DropFlowProcedure,
240 TruncateTableProcedure,
241 CreateDatabaseProcedure,
242 DropDatabaseProcedure,
243 DropViewProcedure,
244 CommentOnProcedure
245 );
246
247 for (type_name, loader_factory) in loaders {
248 let context = self.create_context();
249 self.procedure_manager
250 .register_loader(type_name, loader_factory(context))
251 .context(RegisterProcedureLoaderSnafu { type_name })?;
252 }
253
254 self.repartition_procedure_factory
255 .register_loaders(&self.ddl_context, &self.procedure_manager)
256 .context(RegisterRepartitionProcedureLoaderSnafu)?;
257
258 Ok(())
259 }
260
261 async fn submit_repartition_task(
280 &self,
281 table_id: TableId,
282 table_name: TableName,
283 Repartition {
284 from_partition_exprs,
285 into_partition_exprs,
286 }: Repartition,
287 wait: bool,
288 timeout: Duration,
289 ) -> Result<(ProcedureId, Option<Output>)> {
290 let context = self.create_context();
291
292 let procedure = self
293 .repartition_procedure_factory
294 .create(
295 &context,
296 table_name,
297 table_id,
298 from_partition_exprs,
299 into_partition_exprs,
300 Some(timeout),
301 )
302 .context(CreateRepartitionProcedureSnafu)?;
303 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
304 if wait {
305 self.execute_procedure_and_wait(procedure_with_id).await
306 } else {
307 self.submit_procedure(procedure_with_id)
308 .await
309 .map(|p| (p, None))
310 }
311 }
312
313 #[tracing::instrument(skip_all)]
315 pub async fn submit_alter_table_task(
316 &self,
317 table_id: TableId,
318 alter_table_task: AlterTableTask,
319 ddl_options: DdlOptions,
320 ) -> Result<(ProcedureId, Option<Output>)> {
321 let mut alter_table_task = alter_table_task;
323 if let Some(Kind::Repartition(_)) = alter_table_task.alter_table.kind.as_ref()
324 && let Kind::Repartition(repartition) =
325 alter_table_task.alter_table.kind.take().unwrap()
326 {
327 let table_name = TableName::new(
328 alter_table_task.alter_table.catalog_name,
329 alter_table_task.alter_table.schema_name,
330 alter_table_task.alter_table.table_name,
331 );
332 return self
333 .submit_repartition_task(
334 table_id,
335 table_name,
336 repartition,
337 ddl_options.wait,
338 ddl_options.timeout,
339 )
340 .await;
341 }
342
343 let context = self.create_context();
344 let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
345
346 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
347
348 self.execute_procedure_and_wait(procedure_with_id).await
349 }
350
351 #[tracing::instrument(skip_all)]
353 pub async fn submit_create_table_task(
354 &self,
355 create_table_task: CreateTableTask,
356 query_context: QueryContext,
357 ) -> Result<(ProcedureId, Option<Output>)> {
358 let context = self.create_context();
359
360 let procedure = CreateTableProcedure::new_with_query_context(
361 create_table_task,
362 query_context,
363 context,
364 )?;
365
366 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
367
368 self.execute_procedure_and_wait(procedure_with_id).await
369 }
370
371 #[tracing::instrument(skip_all)]
373 pub async fn submit_create_view_task(
374 &self,
375 create_view_task: CreateViewTask,
376 ) -> Result<(ProcedureId, Option<Output>)> {
377 let context = self.create_context();
378
379 let procedure = CreateViewProcedure::new(create_view_task, context);
380
381 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
382
383 self.execute_procedure_and_wait(procedure_with_id).await
384 }
385
386 #[tracing::instrument(skip_all)]
388 pub async fn submit_create_logical_table_tasks(
389 &self,
390 create_table_tasks: Vec<CreateTableTask>,
391 physical_table_id: TableId,
392 ) -> Result<(ProcedureId, Option<Output>)> {
393 let context = self.create_context();
394
395 let procedure =
396 CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
397
398 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
399
400 self.execute_procedure_and_wait(procedure_with_id).await
401 }
402
403 #[tracing::instrument(skip_all)]
405 pub async fn submit_alter_logical_table_tasks(
406 &self,
407 alter_table_tasks: Vec<AlterTableTask>,
408 physical_table_id: TableId,
409 ) -> Result<(ProcedureId, Option<Output>)> {
410 let context = self.create_context();
411
412 let procedure =
413 AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
414
415 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
416
417 self.execute_procedure_and_wait(procedure_with_id).await
418 }
419
420 #[tracing::instrument(skip_all)]
422 pub async fn submit_drop_table_task(
423 &self,
424 drop_table_task: DropTableTask,
425 ) -> Result<(ProcedureId, Option<Output>)> {
426 let context = self.create_context();
427
428 let procedure = DropTableProcedure::new(drop_table_task, context);
429
430 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
431
432 self.execute_procedure_and_wait(procedure_with_id).await
433 }
434
435 #[tracing::instrument(skip_all)]
437 pub async fn submit_create_database(
438 &self,
439 CreateDatabaseTask {
440 catalog,
441 schema,
442 create_if_not_exists,
443 options,
444 }: CreateDatabaseTask,
445 ) -> Result<(ProcedureId, Option<Output>)> {
446 let context = self.create_context();
447 let procedure =
448 CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
449 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
450
451 self.execute_procedure_and_wait(procedure_with_id).await
452 }
453
454 #[tracing::instrument(skip_all)]
456 pub async fn submit_drop_database(
457 &self,
458 DropDatabaseTask {
459 catalog,
460 schema,
461 drop_if_exists,
462 }: DropDatabaseTask,
463 ) -> Result<(ProcedureId, Option<Output>)> {
464 let context = self.create_context();
465 let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
466 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
467
468 self.execute_procedure_and_wait(procedure_with_id).await
469 }
470
471 pub async fn submit_alter_database(
472 &self,
473 alter_database_task: AlterDatabaseTask,
474 ) -> Result<(ProcedureId, Option<Output>)> {
475 let context = self.create_context();
476 let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
477 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
478
479 self.execute_procedure_and_wait(procedure_with_id).await
480 }
481
482 #[tracing::instrument(skip_all)]
484 pub async fn submit_create_flow_task(
485 &self,
486 create_flow: CreateFlowTask,
487 query_context: QueryContext,
488 ) -> Result<(ProcedureId, Option<Output>)> {
489 let context = self.create_context();
490 let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
491 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
492
493 self.execute_procedure_and_wait(procedure_with_id).await
494 }
495
496 #[tracing::instrument(skip_all)]
498 pub async fn submit_drop_flow_task(
499 &self,
500 drop_flow: DropFlowTask,
501 ) -> Result<(ProcedureId, Option<Output>)> {
502 let context = self.create_context();
503 let procedure = DropFlowProcedure::new(drop_flow, context);
504 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
505
506 self.execute_procedure_and_wait(procedure_with_id).await
507 }
508
509 #[tracing::instrument(skip_all)]
511 pub async fn submit_drop_view_task(
512 &self,
513 drop_view: DropViewTask,
514 ) -> Result<(ProcedureId, Option<Output>)> {
515 let context = self.create_context();
516 let procedure = DropViewProcedure::new(drop_view, context);
517 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
518
519 self.execute_procedure_and_wait(procedure_with_id).await
520 }
521
522 #[tracing::instrument(skip_all)]
524 pub async fn submit_truncate_table_task(
525 &self,
526 truncate_table_task: TruncateTableTask,
527 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
528 ) -> Result<(ProcedureId, Option<Output>)> {
529 let context = self.create_context();
530 let procedure = TruncateTableProcedure::new(truncate_table_task, table_info_value, context);
531
532 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
533
534 self.execute_procedure_and_wait(procedure_with_id).await
535 }
536
537 #[tracing::instrument(skip_all)]
539 pub async fn submit_comment_on_task(
540 &self,
541 comment_on_task: CommentOnTask,
542 ) -> Result<(ProcedureId, Option<Output>)> {
543 let context = self.create_context();
544 let procedure = CommentOnProcedure::new(comment_on_task, context);
545 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
546
547 self.execute_procedure_and_wait(procedure_with_id).await
548 }
549
550 async fn execute_procedure_and_wait(
552 &self,
553 procedure_with_id: ProcedureWithId,
554 ) -> Result<(ProcedureId, Option<Output>)> {
555 let procedure_id = procedure_with_id.id;
556
557 let mut watcher = self
558 .procedure_manager
559 .submit(procedure_with_id)
560 .await
561 .context(SubmitProcedureSnafu)?;
562
563 let output = watcher::wait(&mut watcher)
564 .await
565 .context(WaitProcedureSnafu)?;
566
567 Ok((procedure_id, output))
568 }
569
570 async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
572 let procedure_id = procedure_with_id.id;
573 let _ = self
574 .procedure_manager
575 .submit(procedure_with_id)
576 .await
577 .context(SubmitProcedureSnafu)?;
578
579 Ok(procedure_id)
580 }
581
582 pub async fn submit_ddl_task(
583 &self,
584 ctx: &ExecutorContext,
585 request: SubmitDdlTaskRequest,
586 ) -> Result<SubmitDdlTaskResponse> {
587 let span = ctx
588 .tracing_context
589 .as_ref()
590 .map(TracingContext::from_w3c)
591 .unwrap_or_else(TracingContext::from_current_span)
592 .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
593 let ddl_options = DdlOptions {
594 wait: request.wait,
595 timeout: request.timeout,
596 };
597 async move {
598 debug!("Submitting Ddl task: {:?}", request.task);
599 match request.task {
600 CreateTable(create_table_task) => {
601 handle_create_table_task(self, create_table_task, request.query_context).await
602 }
603 DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
604 AlterTable(alter_table_task) => {
605 handle_alter_table_task(self, alter_table_task, ddl_options).await
606 }
607 TruncateTable(truncate_table_task) => {
608 handle_truncate_table_task(self, truncate_table_task).await
609 }
610 CreateLogicalTables(create_table_tasks) => {
611 handle_create_logical_table_tasks(self, create_table_tasks).await
612 }
613 AlterLogicalTables(alter_table_tasks) => {
614 handle_alter_logical_table_tasks(self, alter_table_tasks).await
615 }
616 DropLogicalTables(_) => todo!(),
617 CreateDatabase(create_database_task) => {
618 handle_create_database_task(self, create_database_task).await
619 }
620 DropDatabase(drop_database_task) => {
621 handle_drop_database_task(self, drop_database_task).await
622 }
623 AlterDatabase(alter_database_task) => {
624 handle_alter_database_task(self, alter_database_task).await
625 }
626 CreateFlow(create_flow_task) => {
627 handle_create_flow_task(self, create_flow_task, request.query_context).await
628 }
629 DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
630 CreateView(create_view_task) => {
631 handle_create_view_task(self, create_view_task).await
632 }
633 DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
634 CommentOn(comment_on_task) => handle_comment_on_task(self, comment_on_task).await,
635 #[cfg(feature = "enterprise")]
636 CreateTrigger(create_trigger_task) => {
637 handle_create_trigger_task(self, create_trigger_task, request.query_context)
638 .await
639 }
640 #[cfg(feature = "enterprise")]
641 DropTrigger(drop_trigger_task) => {
642 handle_drop_trigger_task(self, drop_trigger_task, request.query_context).await
643 }
644 }
645 }
646 .trace(span)
647 .await
648 }
649}
650
651async fn handle_truncate_table_task(
652 ddl_manager: &DdlManager,
653 truncate_table_task: TruncateTableTask,
654) -> Result<SubmitDdlTaskResponse> {
655 let table_id = truncate_table_task.table_id;
656 let table_metadata_manager = &ddl_manager.table_metadata_manager();
657 let table_ref = truncate_table_task.table_ref();
658
659 let table_info_value = table_metadata_manager
660 .table_info_manager()
661 .get(table_id)
662 .await?
663 .with_context(|| TableInfoNotFoundSnafu {
664 table: table_ref.to_string(),
665 })?;
666 let physical_table_id = table_metadata_manager
667 .table_route_manager()
668 .get_physical_table_id(table_id)
669 .await?;
670 ensure!(
671 physical_table_id == table_id,
672 error::UnexpectedSnafu {
673 err_msg: "Truncate table is only supported for physical tables."
674 }
675 );
676
677 let (id, _) = ddl_manager
678 .submit_truncate_table_task(truncate_table_task, table_info_value)
679 .await?;
680
681 info!("Table: {table_id} is truncated via procedure_id {id:?}");
682
683 Ok(SubmitDdlTaskResponse {
684 key: id.to_string().into(),
685 ..Default::default()
686 })
687}
688
689async fn handle_alter_table_task(
690 ddl_manager: &DdlManager,
691 alter_table_task: AlterTableTask,
692 ddl_options: DdlOptions,
693) -> Result<SubmitDdlTaskResponse> {
694 let table_ref = alter_table_task.table_ref();
695
696 let table_id = ddl_manager
697 .table_metadata_manager()
698 .table_name_manager()
699 .get(TableNameKey::new(
700 table_ref.catalog,
701 table_ref.schema,
702 table_ref.table,
703 ))
704 .await?
705 .with_context(|| TableNotFoundSnafu {
706 table_name: table_ref.to_string(),
707 })?
708 .table_id();
709
710 let table_route_value = ddl_manager
711 .table_metadata_manager()
712 .table_route_manager()
713 .table_route_storage()
714 .get(table_id)
715 .await?
716 .context(TableRouteNotFoundSnafu { table_id })?;
717 ensure!(
718 table_route_value.is_physical(),
719 UnexpectedLogicalRouteTableSnafu {
720 err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
721 }
722 );
723
724 let (id, _) = ddl_manager
725 .submit_alter_table_task(table_id, alter_table_task, ddl_options)
726 .await?;
727
728 info!("Table: {table_id} is altered via procedure_id {id:?}");
729
730 Ok(SubmitDdlTaskResponse {
731 key: id.to_string().into(),
732 ..Default::default()
733 })
734}
735
736async fn handle_drop_table_task(
737 ddl_manager: &DdlManager,
738 drop_table_task: DropTableTask,
739) -> Result<SubmitDdlTaskResponse> {
740 let table_id = drop_table_task.table_id;
741 let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
742
743 info!("Table: {table_id} is dropped via procedure_id {id:?}");
744
745 Ok(SubmitDdlTaskResponse {
746 key: id.to_string().into(),
747 ..Default::default()
748 })
749}
750
751async fn handle_create_table_task(
752 ddl_manager: &DdlManager,
753 create_table_task: CreateTableTask,
754 query_context: QueryContext,
755) -> Result<SubmitDdlTaskResponse> {
756 let (id, output) = ddl_manager
757 .submit_create_table_task(create_table_task, query_context)
758 .await?;
759
760 let procedure_id = id.to_string();
761 let output = output.context(ProcedureOutputSnafu {
762 procedure_id: &procedure_id,
763 err_msg: "empty output",
764 })?;
765 let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
766 procedure_id: &procedure_id,
767 err_msg: "downcast to `u32`",
768 })?);
769 info!("Table: {table_id} is created via procedure_id {id:?}");
770
771 Ok(SubmitDdlTaskResponse {
772 key: procedure_id.into(),
773 table_ids: vec![table_id],
774 })
775}
776
777async fn handle_create_logical_table_tasks(
778 ddl_manager: &DdlManager,
779 create_table_tasks: Vec<CreateTableTask>,
780) -> Result<SubmitDdlTaskResponse> {
781 ensure!(
782 !create_table_tasks.is_empty(),
783 EmptyDdlTasksSnafu {
784 name: "create logical tables"
785 }
786 );
787 let physical_table_id = utils::check_and_get_physical_table_id(
788 ddl_manager.table_metadata_manager(),
789 &create_table_tasks,
790 )
791 .await?;
792 let num_logical_tables = create_table_tasks.len();
793
794 let (id, output) = ddl_manager
795 .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
796 .await?;
797
798 info!(
799 "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}"
800 );
801
802 let procedure_id = id.to_string();
803 let output = output.context(ProcedureOutputSnafu {
804 procedure_id: &procedure_id,
805 err_msg: "empty output",
806 })?;
807 let table_ids = output
808 .downcast_ref::<Vec<TableId>>()
809 .context(ProcedureOutputSnafu {
810 procedure_id: &procedure_id,
811 err_msg: "downcast to `Vec<TableId>`",
812 })?
813 .clone();
814
815 Ok(SubmitDdlTaskResponse {
816 key: procedure_id.into(),
817 table_ids,
818 })
819}
820
821async fn handle_create_database_task(
822 ddl_manager: &DdlManager,
823 create_database_task: CreateDatabaseTask,
824) -> Result<SubmitDdlTaskResponse> {
825 let (id, _) = ddl_manager
826 .submit_create_database(create_database_task.clone())
827 .await?;
828
829 let procedure_id = id.to_string();
830 info!(
831 "Database {}.{} is created via procedure_id {id:?}",
832 create_database_task.catalog, create_database_task.schema
833 );
834
835 Ok(SubmitDdlTaskResponse {
836 key: procedure_id.into(),
837 ..Default::default()
838 })
839}
840
841async fn handle_drop_database_task(
842 ddl_manager: &DdlManager,
843 drop_database_task: DropDatabaseTask,
844) -> Result<SubmitDdlTaskResponse> {
845 let (id, _) = ddl_manager
846 .submit_drop_database(drop_database_task.clone())
847 .await?;
848
849 let procedure_id = id.to_string();
850 info!(
851 "Database {}.{} is dropped via procedure_id {id:?}",
852 drop_database_task.catalog, drop_database_task.schema
853 );
854
855 Ok(SubmitDdlTaskResponse {
856 key: procedure_id.into(),
857 ..Default::default()
858 })
859}
860
861async fn handle_alter_database_task(
862 ddl_manager: &DdlManager,
863 alter_database_task: AlterDatabaseTask,
864) -> Result<SubmitDdlTaskResponse> {
865 let (id, _) = ddl_manager
866 .submit_alter_database(alter_database_task.clone())
867 .await?;
868
869 let procedure_id = id.to_string();
870 info!(
871 "Database {}.{} is altered via procedure_id {id:?}",
872 alter_database_task.catalog(),
873 alter_database_task.schema()
874 );
875
876 Ok(SubmitDdlTaskResponse {
877 key: procedure_id.into(),
878 ..Default::default()
879 })
880}
881
882async fn handle_drop_flow_task(
883 ddl_manager: &DdlManager,
884 drop_flow_task: DropFlowTask,
885) -> Result<SubmitDdlTaskResponse> {
886 let (id, _) = ddl_manager
887 .submit_drop_flow_task(drop_flow_task.clone())
888 .await?;
889
890 let procedure_id = id.to_string();
891 info!(
892 "Flow {}.{}({}) is dropped via procedure_id {id:?}",
893 drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
894 );
895
896 Ok(SubmitDdlTaskResponse {
897 key: procedure_id.into(),
898 ..Default::default()
899 })
900}
901
902#[cfg(feature = "enterprise")]
903async fn handle_drop_trigger_task(
904 ddl_manager: &DdlManager,
905 drop_trigger_task: DropTriggerTask,
906 query_context: QueryContext,
907) -> Result<SubmitDdlTaskResponse> {
908 let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
909 use crate::error::UnsupportedSnafu;
910
911 return UnsupportedSnafu {
912 operation: "drop trigger",
913 }
914 .fail();
915 };
916
917 m.drop_trigger(
918 drop_trigger_task,
919 ddl_manager.procedure_manager.clone(),
920 ddl_manager.ddl_context.clone(),
921 query_context,
922 )
923 .await
924}
925
926async fn handle_drop_view_task(
927 ddl_manager: &DdlManager,
928 drop_view_task: DropViewTask,
929) -> Result<SubmitDdlTaskResponse> {
930 let (id, _) = ddl_manager
931 .submit_drop_view_task(drop_view_task.clone())
932 .await?;
933
934 let procedure_id = id.to_string();
935 info!(
936 "View {}({}) is dropped via procedure_id {id:?}",
937 drop_view_task.table_ref(),
938 drop_view_task.view_id,
939 );
940
941 Ok(SubmitDdlTaskResponse {
942 key: procedure_id.into(),
943 ..Default::default()
944 })
945}
946
947async fn handle_create_flow_task(
948 ddl_manager: &DdlManager,
949 create_flow_task: CreateFlowTask,
950 query_context: QueryContext,
951) -> Result<SubmitDdlTaskResponse> {
952 let (id, output) = ddl_manager
953 .submit_create_flow_task(create_flow_task.clone(), query_context)
954 .await?;
955
956 let procedure_id = id.to_string();
957 let output = output.context(ProcedureOutputSnafu {
958 procedure_id: &procedure_id,
959 err_msg: "empty output",
960 })?;
961 let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
962 procedure_id: &procedure_id,
963 err_msg: "downcast to `u32`",
964 })?);
965 if !create_flow_task.or_replace {
966 info!(
967 "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
968 create_flow_task.catalog_name, create_flow_task.flow_name,
969 );
970 } else {
971 info!(
972 "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
973 create_flow_task.catalog_name, create_flow_task.flow_name,
974 );
975 }
976
977 Ok(SubmitDdlTaskResponse {
978 key: procedure_id.into(),
979 ..Default::default()
980 })
981}
982
983#[cfg(feature = "enterprise")]
984async fn handle_create_trigger_task(
985 ddl_manager: &DdlManager,
986 create_trigger_task: CreateTriggerTask,
987 query_context: QueryContext,
988) -> Result<SubmitDdlTaskResponse> {
989 let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
990 use crate::error::UnsupportedSnafu;
991
992 return UnsupportedSnafu {
993 operation: "create trigger",
994 }
995 .fail();
996 };
997
998 m.create_trigger(
999 create_trigger_task,
1000 ddl_manager.procedure_manager.clone(),
1001 ddl_manager.ddl_context.clone(),
1002 query_context,
1003 )
1004 .await
1005}
1006
1007async fn handle_alter_logical_table_tasks(
1008 ddl_manager: &DdlManager,
1009 alter_table_tasks: Vec<AlterTableTask>,
1010) -> Result<SubmitDdlTaskResponse> {
1011 ensure!(
1012 !alter_table_tasks.is_empty(),
1013 EmptyDdlTasksSnafu {
1014 name: "alter logical tables"
1015 }
1016 );
1017
1018 let first_table = TableNameKey {
1020 catalog: &alter_table_tasks[0].alter_table.catalog_name,
1021 schema: &alter_table_tasks[0].alter_table.schema_name,
1022 table: &alter_table_tasks[0].alter_table.table_name,
1023 };
1024 let physical_table_id =
1025 utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
1026 let num_logical_tables = alter_table_tasks.len();
1027
1028 let (id, _) = ddl_manager
1029 .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
1030 .await?;
1031
1032 info!(
1033 "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}"
1034 );
1035
1036 let procedure_id = id.to_string();
1037
1038 Ok(SubmitDdlTaskResponse {
1039 key: procedure_id.into(),
1040 ..Default::default()
1041 })
1042}
1043
1044async fn handle_create_view_task(
1046 ddl_manager: &DdlManager,
1047 create_view_task: CreateViewTask,
1048) -> Result<SubmitDdlTaskResponse> {
1049 let (id, output) = ddl_manager
1050 .submit_create_view_task(create_view_task)
1051 .await?;
1052
1053 let procedure_id = id.to_string();
1054 let output = output.context(ProcedureOutputSnafu {
1055 procedure_id: &procedure_id,
1056 err_msg: "empty output",
1057 })?;
1058 let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
1059 procedure_id: &procedure_id,
1060 err_msg: "downcast to `u32`",
1061 })?);
1062 info!("View: {view_id} is created via procedure_id {id:?}");
1063
1064 Ok(SubmitDdlTaskResponse {
1065 key: procedure_id.into(),
1066 table_ids: vec![view_id],
1067 })
1068}
1069
1070async fn handle_comment_on_task(
1071 ddl_manager: &DdlManager,
1072 comment_on_task: CommentOnTask,
1073) -> Result<SubmitDdlTaskResponse> {
1074 let (id, _) = ddl_manager
1075 .submit_comment_on_task(comment_on_task.clone())
1076 .await?;
1077
1078 let procedure_id = id.to_string();
1079 info!(
1080 "Comment on {}.{}.{} is updated via procedure_id {id:?}",
1081 comment_on_task.catalog_name, comment_on_task.schema_name, comment_on_task.object_name
1082 );
1083
1084 Ok(SubmitDdlTaskResponse {
1085 key: procedure_id.into(),
1086 ..Default::default()
1087 })
1088}
1089
1090#[cfg(test)]
1091mod tests {
1092 use std::sync::Arc;
1093 use std::time::Duration;
1094
1095 use common_error::ext::BoxedError;
1096 use common_procedure::local::LocalManager;
1097 use common_procedure::test_util::InMemoryPoisonStore;
1098 use common_procedure::{BoxedProcedure, ProcedureManagerRef};
1099 use store_api::storage::TableId;
1100 use table::table_name::TableName;
1101
1102 use super::DdlManager;
1103 use crate::cache_invalidator::DummyCacheInvalidator;
1104 use crate::ddl::alter_table::AlterTableProcedure;
1105 use crate::ddl::create_table::CreateTableProcedure;
1106 use crate::ddl::drop_table::DropTableProcedure;
1107 use crate::ddl::flow_meta::FlowMetadataAllocator;
1108 use crate::ddl::table_meta::TableMetadataAllocator;
1109 use crate::ddl::truncate_table::TruncateTableProcedure;
1110 use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
1111 use crate::ddl_manager::RepartitionProcedureFactory;
1112 use crate::key::TableMetadataManager;
1113 use crate::key::flow::FlowMetadataManager;
1114 use crate::kv_backend::memory::MemoryKvBackend;
1115 use crate::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef};
1116 use crate::peer::Peer;
1117 use crate::region_keeper::MemoryRegionKeeper;
1118 use crate::region_registry::LeaderRegionRegistry;
1119 use crate::sequence::SequenceBuilder;
1120 use crate::state_store::KvStateStore;
1121 use crate::wal_provider::WalProvider;
1122
1123 pub struct DummyDatanodeManager;
1125
1126 #[async_trait::async_trait]
1127 impl DatanodeManager for DummyDatanodeManager {
1128 async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
1129 unimplemented!()
1130 }
1131 }
1132
1133 #[async_trait::async_trait]
1134 impl FlownodeManager for DummyDatanodeManager {
1135 async fn flownode(&self, _node: &Peer) -> FlownodeRef {
1136 unimplemented!()
1137 }
1138 }
1139
1140 struct DummyRepartitionProcedureFactory;
1141
1142 #[async_trait::async_trait]
1143 impl RepartitionProcedureFactory for DummyRepartitionProcedureFactory {
1144 fn create(
1145 &self,
1146 _ddl_ctx: &DdlContext,
1147 _table_name: TableName,
1148 _table_id: TableId,
1149 _from_exprs: Vec<String>,
1150 _to_exprs: Vec<String>,
1151 _timeout: Option<Duration>,
1152 ) -> std::result::Result<BoxedProcedure, BoxedError> {
1153 unimplemented!()
1154 }
1155
1156 fn register_loaders(
1157 &self,
1158 _ddl_ctx: &DdlContext,
1159 _procedure_manager: &ProcedureManagerRef,
1160 ) -> std::result::Result<(), BoxedError> {
1161 Ok(())
1162 }
1163 }
1164
1165 #[test]
1166 fn test_try_new() {
1167 let kv_backend = Arc::new(MemoryKvBackend::new());
1168 let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
1169 let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
1170 Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
1171 Arc::new(WalProvider::default()),
1172 ));
1173 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
1174 let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
1175 Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
1176 ));
1177
1178 let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
1179 let poison_manager = Arc::new(InMemoryPoisonStore::default());
1180 let procedure_manager = Arc::new(LocalManager::new(
1181 Default::default(),
1182 state_store,
1183 poison_manager,
1184 None,
1185 None,
1186 ));
1187
1188 let _ = DdlManager::try_new(
1189 DdlContext {
1190 node_manager: Arc::new(DummyDatanodeManager),
1191 cache_invalidator: Arc::new(DummyCacheInvalidator),
1192 table_metadata_manager,
1193 table_metadata_allocator,
1194 flow_metadata_manager,
1195 flow_metadata_allocator,
1196 memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
1197 leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
1198 region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
1199 },
1200 procedure_manager.clone(),
1201 Arc::new(DummyRepartitionProcedureFactory),
1202 true,
1203 );
1204
1205 let expected_loaders = vec![
1206 CreateTableProcedure::TYPE_NAME,
1207 AlterTableProcedure::TYPE_NAME,
1208 DropTableProcedure::TYPE_NAME,
1209 TruncateTableProcedure::TYPE_NAME,
1210 ];
1211
1212 for loader in expected_loaders {
1213 assert!(procedure_manager.contains_loader(loader));
1214 }
1215 }
1216}