1use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::Repartition;
19use api::v1::alter_table_expr::Kind;
20use api::v1::repartition::Source as PbRepartitionSource;
21use common_error::ext::BoxedError;
22use common_procedure::{
23 BoxedProcedure, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef,
24 ProcedureWithId, watcher,
25};
26use common_telemetry::tracing_context::{FutureExt, TracingContext};
27use common_telemetry::{debug, info, tracing};
28use derive_builder::Builder;
29use snafu::{OptionExt, ResultExt, ensure};
30use store_api::storage::TableId;
31use table::table_name::TableName;
32
33use crate::ddl::alter_database::AlterDatabaseProcedure;
34use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
35use crate::ddl::alter_table::AlterTableProcedure;
36use crate::ddl::comment_on::CommentOnProcedure;
37use crate::ddl::create_database::CreateDatabaseProcedure;
38use crate::ddl::create_flow::CreateFlowProcedure;
39use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
40use crate::ddl::create_table::CreateTableProcedure;
41use crate::ddl::create_view::CreateViewProcedure;
42use crate::ddl::drop_database::DropDatabaseProcedure;
43use crate::ddl::drop_flow::DropFlowProcedure;
44use crate::ddl::drop_table::DropTableProcedure;
45use crate::ddl::drop_view::DropViewProcedure;
46use crate::ddl::truncate_table::TruncateTableProcedure;
47use crate::ddl::{DdlContext, utils};
48use crate::error::{
49 self, CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu,
50 RegisterProcedureLoaderSnafu, RegisterRepartitionProcedureLoaderSnafu, Result,
51 SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
52 UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
53};
54use crate::key::table_info::TableInfoValue;
55use crate::key::table_name::TableNameKey;
56use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
57use crate::procedure_executor::ExecutorContext;
58#[cfg(feature = "enterprise")]
59use crate::rpc::ddl::DdlTask::CreateTrigger;
60#[cfg(feature = "enterprise")]
61use crate::rpc::ddl::DdlTask::DropTrigger;
62use crate::rpc::ddl::DdlTask::{
63 AlterDatabase, AlterLogicalTables, AlterTable, CommentOn, CreateDatabase, CreateFlow,
64 CreateLogicalTables, CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables,
65 DropTable, DropView, TruncateTable,
66};
67#[cfg(feature = "enterprise")]
68use crate::rpc::ddl::trigger::CreateTriggerTask;
69#[cfg(feature = "enterprise")]
70use crate::rpc::ddl::trigger::DropTriggerTask;
71use crate::rpc::ddl::{
72 AlterDatabaseTask, AlterTableTask, CommentOnTask, CreateDatabaseTask, CreateFlowTask,
73 CreateTableTask, CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask,
74 QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
75};
76
77#[async_trait::async_trait]
79pub trait DdlManagerConfigurator<C>: Send + Sync {
80 async fn configure(
82 &self,
83 ddl_manager: DdlManager,
84 ctx: C,
85 ) -> std::result::Result<DdlManager, BoxedError>;
86}
87
88pub type DdlManagerConfiguratorRef<C> = Arc<dyn DdlManagerConfigurator<C>>;
89
90pub type DdlManagerRef = Arc<DdlManager>;
91
92pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
93
94#[derive(Builder)]
96pub struct DdlManager {
97 ddl_context: DdlContext,
98 procedure_manager: ProcedureManagerRef,
99 repartition_procedure_factory: RepartitionProcedureFactoryRef,
100 #[cfg(feature = "enterprise")]
101 trigger_ddl_manager: Option<TriggerDdlManagerRef>,
102}
103
104#[cfg(feature = "enterprise")]
107#[async_trait::async_trait]
108pub trait TriggerDdlManager: Send + Sync {
109 async fn create_trigger(
110 &self,
111 create_trigger_task: CreateTriggerTask,
112 procedure_manager: ProcedureManagerRef,
113 ddl_context: DdlContext,
114 query_context: QueryContext,
115 ) -> Result<SubmitDdlTaskResponse>;
116
117 async fn drop_trigger(
118 &self,
119 drop_trigger_task: DropTriggerTask,
120 procedure_manager: ProcedureManagerRef,
121 ddl_context: DdlContext,
122 query_context: QueryContext,
123 ) -> Result<SubmitDdlTaskResponse>;
124
125 fn as_any(&self) -> &dyn std::any::Any;
126}
127
128#[cfg(feature = "enterprise")]
129pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
130
131macro_rules! procedure_loader_entry {
132 ($procedure:ident) => {
133 (
134 $procedure::TYPE_NAME,
135 &|context: DdlContext| -> BoxedProcedureLoader {
136 Box::new(move |json: &str| {
137 let context = context.clone();
138 $procedure::from_json(json, context).map(|p| Box::new(p) as _)
139 })
140 },
141 )
142 };
143}
144
145macro_rules! procedure_loader {
146 ($($procedure:ident),*) => {
147 vec![
148 $(procedure_loader_entry!($procedure)),*
149 ]
150 };
151}
152
153pub type RepartitionProcedureFactoryRef = Arc<dyn RepartitionProcedureFactory>;
154
155pub enum RepartitionSource {
156 Partitioned {
157 exprs: Vec<String>,
158 target_partition_columns: Option<Vec<String>>,
164 },
165 Unpartitioned {
166 partition_columns: Vec<String>,
167 },
168}
169
170pub trait RepartitionProcedureFactory: Send + Sync {
171 fn create(
172 &self,
173 ddl_ctx: &DdlContext,
174 table_name: TableName,
175 table_id: TableId,
176 source: RepartitionSource,
177 to_exprs: Vec<String>,
178 timeout: Option<Duration>,
179 ) -> std::result::Result<BoxedProcedure, BoxedError>;
180
181 fn register_loaders(
182 &self,
183 ddl_ctx: &DdlContext,
184 procedure_manager: &ProcedureManagerRef,
185 ) -> std::result::Result<(), BoxedError>;
186}
187
188#[derive(Debug, Clone, Copy)]
193pub struct DdlOptions {
194 pub timeout: Duration,
198 pub wait: bool,
205}
206
207impl DdlManager {
208 pub fn try_new(
210 ddl_context: DdlContext,
211 procedure_manager: ProcedureManagerRef,
212 repartition_procedure_factory: RepartitionProcedureFactoryRef,
213 register_loaders: bool,
214 ) -> Result<Self> {
215 let manager = Self {
216 ddl_context,
217 procedure_manager,
218 repartition_procedure_factory,
219 #[cfg(feature = "enterprise")]
220 trigger_ddl_manager: None,
221 };
222 if register_loaders {
223 manager.register_loaders()?;
224 }
225 Ok(manager)
226 }
227
228 #[cfg(feature = "enterprise")]
229 pub fn with_trigger_ddl_manager(mut self, trigger_ddl_manager: TriggerDdlManagerRef) -> Self {
230 self.trigger_ddl_manager = Some(trigger_ddl_manager);
231 self
232 }
233
234 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
236 &self.ddl_context.table_metadata_manager
237 }
238
239 pub fn create_context(&self) -> DdlContext {
241 self.ddl_context.clone()
242 }
243
244 pub fn register_loaders(&self) -> Result<()> {
246 let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
247 CreateTableProcedure,
248 CreateLogicalTablesProcedure,
249 CreateViewProcedure,
250 CreateFlowProcedure,
251 AlterTableProcedure,
252 AlterLogicalTablesProcedure,
253 AlterDatabaseProcedure,
254 DropTableProcedure,
255 DropFlowProcedure,
256 TruncateTableProcedure,
257 CreateDatabaseProcedure,
258 DropDatabaseProcedure,
259 DropViewProcedure,
260 CommentOnProcedure
261 );
262
263 for (type_name, loader_factory) in loaders {
264 let context = self.create_context();
265 self.procedure_manager
266 .register_loader(type_name, loader_factory(context))
267 .context(RegisterProcedureLoaderSnafu { type_name })?;
268 }
269
270 self.repartition_procedure_factory
271 .register_loaders(&self.ddl_context, &self.procedure_manager)
272 .context(RegisterRepartitionProcedureLoaderSnafu)?;
273
274 Ok(())
275 }
276
277 async fn submit_repartition_task(
296 &self,
297 table_id: TableId,
298 table_name: TableName,
299 repartition: Repartition,
300 wait: bool,
301 timeout: Duration,
302 ) -> Result<(ProcedureId, Option<Output>)> {
303 let context = self.create_context();
304
305 let into_partition_exprs = repartition.into_partition_exprs;
306 let source = repartition.source;
307
308 let source = match source {
309 Some(PbRepartitionSource::PartitionExprs(source)) => RepartitionSource::Partitioned {
310 exprs: source.exprs,
311 target_partition_columns: source
312 .target_partition_columns
313 .map(|columns| columns.columns),
314 },
315 Some(PbRepartitionSource::Unpartitioned(source)) => RepartitionSource::Unpartitioned {
316 partition_columns: source.partition_columns,
317 },
318 None => {
319 #[allow(deprecated)]
321 RepartitionSource::Partitioned {
322 exprs: repartition.from_partition_exprs,
323 target_partition_columns: None,
324 }
325 }
326 };
327
328 let procedure = self
329 .repartition_procedure_factory
330 .create(
331 &context,
332 table_name,
333 table_id,
334 source,
335 into_partition_exprs,
336 Some(timeout),
337 )
338 .context(CreateRepartitionProcedureSnafu)?;
339 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
340 if wait {
341 self.execute_procedure_and_wait(procedure_with_id).await
342 } else {
343 self.submit_procedure(procedure_with_id)
344 .await
345 .map(|p| (p, None))
346 }
347 }
348
349 #[tracing::instrument(skip_all)]
351 pub async fn submit_alter_table_task(
352 &self,
353 table_id: TableId,
354 alter_table_task: AlterTableTask,
355 ddl_options: DdlOptions,
356 ) -> Result<(ProcedureId, Option<Output>)> {
357 let mut alter_table_task = alter_table_task;
359 if let Some(Kind::Repartition(_)) = alter_table_task.alter_table.kind.as_ref()
360 && let Kind::Repartition(repartition) =
361 alter_table_task.alter_table.kind.take().unwrap()
362 {
363 let table_name = TableName::new(
364 alter_table_task.alter_table.catalog_name,
365 alter_table_task.alter_table.schema_name,
366 alter_table_task.alter_table.table_name,
367 );
368 return self
369 .submit_repartition_task(
370 table_id,
371 table_name,
372 repartition,
373 ddl_options.wait,
374 ddl_options.timeout,
375 )
376 .await;
377 }
378
379 let context = self.create_context();
380 let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
381
382 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
383
384 self.execute_procedure_and_wait(procedure_with_id).await
385 }
386
387 #[tracing::instrument(skip_all)]
389 pub async fn submit_create_table_task(
390 &self,
391 create_table_task: CreateTableTask,
392 query_context: QueryContext,
393 ) -> Result<(ProcedureId, Option<Output>)> {
394 let context = self.create_context();
395
396 let procedure = CreateTableProcedure::new_with_query_context(
397 create_table_task,
398 query_context,
399 context,
400 )?;
401
402 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
403
404 self.execute_procedure_and_wait(procedure_with_id).await
405 }
406
407 #[tracing::instrument(skip_all)]
409 pub async fn submit_create_view_task(
410 &self,
411 create_view_task: CreateViewTask,
412 ) -> Result<(ProcedureId, Option<Output>)> {
413 let context = self.create_context();
414
415 let procedure = CreateViewProcedure::new(create_view_task, context);
416
417 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
418
419 self.execute_procedure_and_wait(procedure_with_id).await
420 }
421
422 #[tracing::instrument(skip_all)]
424 pub async fn submit_create_logical_table_tasks(
425 &self,
426 create_table_tasks: Vec<CreateTableTask>,
427 physical_table_id: TableId,
428 ) -> Result<(ProcedureId, Option<Output>)> {
429 let context = self.create_context();
430
431 let procedure =
432 CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
433
434 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
435
436 self.execute_procedure_and_wait(procedure_with_id).await
437 }
438
439 #[tracing::instrument(skip_all)]
441 pub async fn submit_alter_logical_table_tasks(
442 &self,
443 alter_table_tasks: Vec<AlterTableTask>,
444 physical_table_id: TableId,
445 ) -> Result<(ProcedureId, Option<Output>)> {
446 let context = self.create_context();
447
448 let procedure =
449 AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
450
451 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
452
453 self.execute_procedure_and_wait(procedure_with_id).await
454 }
455
456 #[tracing::instrument(skip_all)]
458 pub async fn submit_drop_table_task(
459 &self,
460 drop_table_task: DropTableTask,
461 ) -> Result<(ProcedureId, Option<Output>)> {
462 let context = self.create_context();
463
464 let procedure = DropTableProcedure::new(drop_table_task, context);
465
466 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
467
468 self.execute_procedure_and_wait(procedure_with_id).await
469 }
470
471 #[tracing::instrument(skip_all)]
473 pub async fn submit_create_database(
474 &self,
475 CreateDatabaseTask {
476 catalog,
477 schema,
478 create_if_not_exists,
479 options,
480 }: CreateDatabaseTask,
481 ) -> Result<(ProcedureId, Option<Output>)> {
482 let context = self.create_context();
483 let procedure =
484 CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
485 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
486
487 self.execute_procedure_and_wait(procedure_with_id).await
488 }
489
490 #[tracing::instrument(skip_all)]
492 pub async fn submit_drop_database(
493 &self,
494 DropDatabaseTask {
495 catalog,
496 schema,
497 drop_if_exists,
498 }: DropDatabaseTask,
499 ) -> Result<(ProcedureId, Option<Output>)> {
500 let context = self.create_context();
501 let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
502 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
503
504 self.execute_procedure_and_wait(procedure_with_id).await
505 }
506
507 pub async fn submit_alter_database(
508 &self,
509 alter_database_task: AlterDatabaseTask,
510 ) -> Result<(ProcedureId, Option<Output>)> {
511 let context = self.create_context();
512 let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
513 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
514
515 self.execute_procedure_and_wait(procedure_with_id).await
516 }
517
518 #[tracing::instrument(skip_all)]
520 pub async fn submit_create_flow_task(
521 &self,
522 create_flow: CreateFlowTask,
523 query_context: QueryContext,
524 ) -> Result<(ProcedureId, Option<Output>)> {
525 let context = self.create_context();
526 let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
527 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
528
529 self.execute_procedure_and_wait(procedure_with_id).await
530 }
531
532 #[tracing::instrument(skip_all)]
534 pub async fn submit_drop_flow_task(
535 &self,
536 drop_flow: DropFlowTask,
537 ) -> Result<(ProcedureId, Option<Output>)> {
538 let context = self.create_context();
539 let procedure = DropFlowProcedure::new(drop_flow, context);
540 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
541
542 self.execute_procedure_and_wait(procedure_with_id).await
543 }
544
545 #[tracing::instrument(skip_all)]
547 pub async fn submit_drop_view_task(
548 &self,
549 drop_view: DropViewTask,
550 ) -> Result<(ProcedureId, Option<Output>)> {
551 let context = self.create_context();
552 let procedure = DropViewProcedure::new(drop_view, context);
553 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
554
555 self.execute_procedure_and_wait(procedure_with_id).await
556 }
557
558 #[tracing::instrument(skip_all)]
560 pub async fn submit_truncate_table_task(
561 &self,
562 truncate_table_task: TruncateTableTask,
563 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
564 ) -> Result<(ProcedureId, Option<Output>)> {
565 let context = self.create_context();
566 let procedure = TruncateTableProcedure::new(truncate_table_task, table_info_value, context);
567
568 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
569
570 self.execute_procedure_and_wait(procedure_with_id).await
571 }
572
573 #[tracing::instrument(skip_all)]
575 pub async fn submit_comment_on_task(
576 &self,
577 comment_on_task: CommentOnTask,
578 ) -> Result<(ProcedureId, Option<Output>)> {
579 let context = self.create_context();
580 let procedure = CommentOnProcedure::new(comment_on_task, context);
581 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
582
583 self.execute_procedure_and_wait(procedure_with_id).await
584 }
585
586 async fn execute_procedure_and_wait(
588 &self,
589 procedure_with_id: ProcedureWithId,
590 ) -> Result<(ProcedureId, Option<Output>)> {
591 let procedure_id = procedure_with_id.id;
592
593 let mut watcher = self
594 .procedure_manager
595 .submit(procedure_with_id)
596 .await
597 .context(SubmitProcedureSnafu)?;
598
599 let output = watcher::wait(&mut watcher)
600 .await
601 .context(WaitProcedureSnafu)?;
602
603 Ok((procedure_id, output))
604 }
605
606 async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
608 let procedure_id = procedure_with_id.id;
609 let _ = self
610 .procedure_manager
611 .submit(procedure_with_id)
612 .await
613 .context(SubmitProcedureSnafu)?;
614
615 Ok(procedure_id)
616 }
617
618 pub async fn submit_ddl_task(
619 &self,
620 ctx: &ExecutorContext,
621 request: SubmitDdlTaskRequest,
622 ) -> Result<SubmitDdlTaskResponse> {
623 let span = ctx
624 .tracing_context
625 .as_ref()
626 .map(TracingContext::from_w3c)
627 .unwrap_or_else(TracingContext::from_current_span)
628 .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
629 let ddl_options = DdlOptions {
630 wait: request.wait,
631 timeout: request.timeout,
632 };
633 async move {
634 debug!("Submitting Ddl task: {:?}", request.task);
635 match request.task {
636 CreateTable(create_table_task) => {
637 handle_create_table_task(self, create_table_task, request.query_context).await
638 }
639 DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
640 AlterTable(alter_table_task) => {
641 handle_alter_table_task(self, alter_table_task, ddl_options).await
642 }
643 TruncateTable(truncate_table_task) => {
644 handle_truncate_table_task(self, truncate_table_task).await
645 }
646 CreateLogicalTables(create_table_tasks) => {
647 handle_create_logical_table_tasks(self, create_table_tasks).await
648 }
649 AlterLogicalTables(alter_table_tasks) => {
650 handle_alter_logical_table_tasks(self, alter_table_tasks).await
651 }
652 DropLogicalTables(_) => todo!(),
653 CreateDatabase(create_database_task) => {
654 handle_create_database_task(self, create_database_task).await
655 }
656 DropDatabase(drop_database_task) => {
657 handle_drop_database_task(self, drop_database_task).await
658 }
659 AlterDatabase(alter_database_task) => {
660 handle_alter_database_task(self, alter_database_task).await
661 }
662 CreateFlow(create_flow_task) => {
663 handle_create_flow_task(self, create_flow_task, request.query_context).await
664 }
665 DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
666 CreateView(create_view_task) => {
667 handle_create_view_task(self, create_view_task).await
668 }
669 DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
670 CommentOn(comment_on_task) => handle_comment_on_task(self, comment_on_task).await,
671 #[cfg(feature = "enterprise")]
672 CreateTrigger(create_trigger_task) => {
673 handle_create_trigger_task(self, create_trigger_task, request.query_context)
674 .await
675 }
676 #[cfg(feature = "enterprise")]
677 DropTrigger(drop_trigger_task) => {
678 handle_drop_trigger_task(self, drop_trigger_task, request.query_context).await
679 }
680 }
681 }
682 .trace(span)
683 .await
684 }
685}
686
687async fn handle_truncate_table_task(
688 ddl_manager: &DdlManager,
689 truncate_table_task: TruncateTableTask,
690) -> Result<SubmitDdlTaskResponse> {
691 let table_id = truncate_table_task.table_id;
692 let table_metadata_manager = &ddl_manager.table_metadata_manager();
693 let table_ref = truncate_table_task.table_ref();
694
695 let table_info_value = table_metadata_manager
696 .table_info_manager()
697 .get(table_id)
698 .await?
699 .with_context(|| TableInfoNotFoundSnafu {
700 table: table_ref.to_string(),
701 })?;
702 let physical_table_id = table_metadata_manager
703 .table_route_manager()
704 .get_physical_table_id(table_id)
705 .await?;
706 ensure!(
707 physical_table_id == table_id,
708 error::UnexpectedSnafu {
709 err_msg: "Truncate table is only supported for physical tables."
710 }
711 );
712
713 let (id, _) = ddl_manager
714 .submit_truncate_table_task(truncate_table_task, table_info_value)
715 .await?;
716
717 info!("Table: {table_id} is truncated via procedure_id {id:?}");
718
719 Ok(SubmitDdlTaskResponse {
720 key: id.to_string().into(),
721 ..Default::default()
722 })
723}
724
725async fn handle_alter_table_task(
726 ddl_manager: &DdlManager,
727 alter_table_task: AlterTableTask,
728 ddl_options: DdlOptions,
729) -> Result<SubmitDdlTaskResponse> {
730 let table_ref = alter_table_task.table_ref();
731
732 let table_id = ddl_manager
733 .table_metadata_manager()
734 .table_name_manager()
735 .get(TableNameKey::new(
736 table_ref.catalog,
737 table_ref.schema,
738 table_ref.table,
739 ))
740 .await?
741 .with_context(|| TableNotFoundSnafu {
742 table_name: table_ref.to_string(),
743 })?
744 .table_id();
745
746 let table_route_value = ddl_manager
747 .table_metadata_manager()
748 .table_route_manager()
749 .table_route_storage()
750 .get(table_id)
751 .await?
752 .context(TableRouteNotFoundSnafu { table_id })?;
753 ensure!(
754 table_route_value.is_physical(),
755 UnexpectedLogicalRouteTableSnafu {
756 err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
757 }
758 );
759
760 let (id, _) = ddl_manager
761 .submit_alter_table_task(table_id, alter_table_task, ddl_options)
762 .await?;
763
764 info!("Table: {table_id} is altered via procedure_id {id:?}");
765
766 Ok(SubmitDdlTaskResponse {
767 key: id.to_string().into(),
768 ..Default::default()
769 })
770}
771
772async fn handle_drop_table_task(
773 ddl_manager: &DdlManager,
774 drop_table_task: DropTableTask,
775) -> Result<SubmitDdlTaskResponse> {
776 let table_id = drop_table_task.table_id;
777 let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
778
779 info!("Table: {table_id} is dropped via procedure_id {id:?}");
780
781 Ok(SubmitDdlTaskResponse {
782 key: id.to_string().into(),
783 ..Default::default()
784 })
785}
786
787async fn handle_create_table_task(
788 ddl_manager: &DdlManager,
789 create_table_task: CreateTableTask,
790 query_context: QueryContext,
791) -> Result<SubmitDdlTaskResponse> {
792 let (id, output) = ddl_manager
793 .submit_create_table_task(create_table_task, query_context)
794 .await?;
795
796 let procedure_id = id.to_string();
797 let output = output.context(ProcedureOutputSnafu {
798 procedure_id: &procedure_id,
799 err_msg: "empty output",
800 })?;
801 let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
802 procedure_id: &procedure_id,
803 err_msg: "downcast to `u32`",
804 })?);
805 info!("Table: {table_id} is created via procedure_id {id:?}");
806
807 Ok(SubmitDdlTaskResponse {
808 key: procedure_id.into(),
809 table_ids: vec![table_id],
810 })
811}
812
813async fn handle_create_logical_table_tasks(
814 ddl_manager: &DdlManager,
815 create_table_tasks: Vec<CreateTableTask>,
816) -> Result<SubmitDdlTaskResponse> {
817 ensure!(
818 !create_table_tasks.is_empty(),
819 EmptyDdlTasksSnafu {
820 name: "create logical tables"
821 }
822 );
823 let physical_table_id = utils::check_and_get_physical_table_id(
824 ddl_manager.table_metadata_manager(),
825 &create_table_tasks,
826 )
827 .await?;
828 let num_logical_tables = create_table_tasks.len();
829
830 let (id, output) = ddl_manager
831 .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
832 .await?;
833
834 info!(
835 "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}"
836 );
837
838 let procedure_id = id.to_string();
839 let output = output.context(ProcedureOutputSnafu {
840 procedure_id: &procedure_id,
841 err_msg: "empty output",
842 })?;
843 let table_ids = output
844 .downcast_ref::<Vec<TableId>>()
845 .context(ProcedureOutputSnafu {
846 procedure_id: &procedure_id,
847 err_msg: "downcast to `Vec<TableId>`",
848 })?
849 .clone();
850
851 Ok(SubmitDdlTaskResponse {
852 key: procedure_id.into(),
853 table_ids,
854 })
855}
856
857async fn handle_create_database_task(
858 ddl_manager: &DdlManager,
859 create_database_task: CreateDatabaseTask,
860) -> Result<SubmitDdlTaskResponse> {
861 let (id, _) = ddl_manager
862 .submit_create_database(create_database_task.clone())
863 .await?;
864
865 let procedure_id = id.to_string();
866 info!(
867 "Database {}.{} is created via procedure_id {id:?}",
868 create_database_task.catalog, create_database_task.schema
869 );
870
871 Ok(SubmitDdlTaskResponse {
872 key: procedure_id.into(),
873 ..Default::default()
874 })
875}
876
877async fn handle_drop_database_task(
878 ddl_manager: &DdlManager,
879 drop_database_task: DropDatabaseTask,
880) -> Result<SubmitDdlTaskResponse> {
881 let (id, _) = ddl_manager
882 .submit_drop_database(drop_database_task.clone())
883 .await?;
884
885 let procedure_id = id.to_string();
886 info!(
887 "Database {}.{} is dropped via procedure_id {id:?}",
888 drop_database_task.catalog, drop_database_task.schema
889 );
890
891 Ok(SubmitDdlTaskResponse {
892 key: procedure_id.into(),
893 ..Default::default()
894 })
895}
896
897async fn handle_alter_database_task(
898 ddl_manager: &DdlManager,
899 alter_database_task: AlterDatabaseTask,
900) -> Result<SubmitDdlTaskResponse> {
901 let (id, _) = ddl_manager
902 .submit_alter_database(alter_database_task.clone())
903 .await?;
904
905 let procedure_id = id.to_string();
906 info!(
907 "Database {}.{} is altered via procedure_id {id:?}",
908 alter_database_task.catalog(),
909 alter_database_task.schema()
910 );
911
912 Ok(SubmitDdlTaskResponse {
913 key: procedure_id.into(),
914 ..Default::default()
915 })
916}
917
918async fn handle_drop_flow_task(
919 ddl_manager: &DdlManager,
920 drop_flow_task: DropFlowTask,
921) -> Result<SubmitDdlTaskResponse> {
922 let (id, _) = ddl_manager
923 .submit_drop_flow_task(drop_flow_task.clone())
924 .await?;
925
926 let procedure_id = id.to_string();
927 info!(
928 "Flow {}.{}({}) is dropped via procedure_id {id:?}",
929 drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
930 );
931
932 Ok(SubmitDdlTaskResponse {
933 key: procedure_id.into(),
934 ..Default::default()
935 })
936}
937
938#[cfg(feature = "enterprise")]
939async fn handle_drop_trigger_task(
940 ddl_manager: &DdlManager,
941 drop_trigger_task: DropTriggerTask,
942 query_context: QueryContext,
943) -> Result<SubmitDdlTaskResponse> {
944 let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
945 use crate::error::UnsupportedSnafu;
946
947 return UnsupportedSnafu {
948 operation: "drop trigger",
949 }
950 .fail();
951 };
952
953 m.drop_trigger(
954 drop_trigger_task,
955 ddl_manager.procedure_manager.clone(),
956 ddl_manager.ddl_context.clone(),
957 query_context,
958 )
959 .await
960}
961
962async fn handle_drop_view_task(
963 ddl_manager: &DdlManager,
964 drop_view_task: DropViewTask,
965) -> Result<SubmitDdlTaskResponse> {
966 let (id, _) = ddl_manager
967 .submit_drop_view_task(drop_view_task.clone())
968 .await?;
969
970 let procedure_id = id.to_string();
971 info!(
972 "View {}({}) is dropped via procedure_id {id:?}",
973 drop_view_task.table_ref(),
974 drop_view_task.view_id,
975 );
976
977 Ok(SubmitDdlTaskResponse {
978 key: procedure_id.into(),
979 ..Default::default()
980 })
981}
982
983async fn handle_create_flow_task(
984 ddl_manager: &DdlManager,
985 create_flow_task: CreateFlowTask,
986 query_context: QueryContext,
987) -> Result<SubmitDdlTaskResponse> {
988 let (id, output) = ddl_manager
989 .submit_create_flow_task(create_flow_task.clone(), query_context)
990 .await?;
991
992 let procedure_id = id.to_string();
993 let output = output.context(ProcedureOutputSnafu {
994 procedure_id: &procedure_id,
995 err_msg: "empty output",
996 })?;
997 let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
998 procedure_id: &procedure_id,
999 err_msg: "downcast to `u32`",
1000 })?);
1001 if !create_flow_task.or_replace {
1002 info!(
1003 "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
1004 create_flow_task.catalog_name, create_flow_task.flow_name,
1005 );
1006 } else {
1007 info!(
1008 "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
1009 create_flow_task.catalog_name, create_flow_task.flow_name,
1010 );
1011 }
1012
1013 Ok(SubmitDdlTaskResponse {
1014 key: procedure_id.into(),
1015 ..Default::default()
1016 })
1017}
1018
1019#[cfg(feature = "enterprise")]
1020async fn handle_create_trigger_task(
1021 ddl_manager: &DdlManager,
1022 create_trigger_task: CreateTriggerTask,
1023 query_context: QueryContext,
1024) -> Result<SubmitDdlTaskResponse> {
1025 let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
1026 use crate::error::UnsupportedSnafu;
1027
1028 return UnsupportedSnafu {
1029 operation: "create trigger",
1030 }
1031 .fail();
1032 };
1033
1034 m.create_trigger(
1035 create_trigger_task,
1036 ddl_manager.procedure_manager.clone(),
1037 ddl_manager.ddl_context.clone(),
1038 query_context,
1039 )
1040 .await
1041}
1042
1043async fn handle_alter_logical_table_tasks(
1044 ddl_manager: &DdlManager,
1045 alter_table_tasks: Vec<AlterTableTask>,
1046) -> Result<SubmitDdlTaskResponse> {
1047 ensure!(
1048 !alter_table_tasks.is_empty(),
1049 EmptyDdlTasksSnafu {
1050 name: "alter logical tables"
1051 }
1052 );
1053
1054 let first_table = TableNameKey {
1056 catalog: &alter_table_tasks[0].alter_table.catalog_name,
1057 schema: &alter_table_tasks[0].alter_table.schema_name,
1058 table: &alter_table_tasks[0].alter_table.table_name,
1059 };
1060 let physical_table_id =
1061 utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
1062 let num_logical_tables = alter_table_tasks.len();
1063
1064 let (id, _) = ddl_manager
1065 .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
1066 .await?;
1067
1068 info!(
1069 "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}"
1070 );
1071
1072 let procedure_id = id.to_string();
1073
1074 Ok(SubmitDdlTaskResponse {
1075 key: procedure_id.into(),
1076 ..Default::default()
1077 })
1078}
1079
1080async fn handle_create_view_task(
1082 ddl_manager: &DdlManager,
1083 create_view_task: CreateViewTask,
1084) -> Result<SubmitDdlTaskResponse> {
1085 let (id, output) = ddl_manager
1086 .submit_create_view_task(create_view_task)
1087 .await?;
1088
1089 let procedure_id = id.to_string();
1090 let output = output.context(ProcedureOutputSnafu {
1091 procedure_id: &procedure_id,
1092 err_msg: "empty output",
1093 })?;
1094 let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
1095 procedure_id: &procedure_id,
1096 err_msg: "downcast to `u32`",
1097 })?);
1098 info!("View: {view_id} is created via procedure_id {id:?}");
1099
1100 Ok(SubmitDdlTaskResponse {
1101 key: procedure_id.into(),
1102 table_ids: vec![view_id],
1103 })
1104}
1105
1106async fn handle_comment_on_task(
1107 ddl_manager: &DdlManager,
1108 comment_on_task: CommentOnTask,
1109) -> Result<SubmitDdlTaskResponse> {
1110 let (id, _) = ddl_manager
1111 .submit_comment_on_task(comment_on_task.clone())
1112 .await?;
1113
1114 let procedure_id = id.to_string();
1115 info!(
1116 "Comment on {}.{}.{} is updated via procedure_id {id:?}",
1117 comment_on_task.catalog_name, comment_on_task.schema_name, comment_on_task.object_name
1118 );
1119
1120 Ok(SubmitDdlTaskResponse {
1121 key: procedure_id.into(),
1122 ..Default::default()
1123 })
1124}
1125
1126#[cfg(test)]
1127mod tests {
1128 use std::sync::Arc;
1129 use std::time::Duration;
1130
1131 use common_error::ext::BoxedError;
1132 use common_procedure::local::LocalManager;
1133 use common_procedure::test_util::InMemoryPoisonStore;
1134 use common_procedure::{BoxedProcedure, ProcedureManagerRef};
1135 use store_api::storage::TableId;
1136 use table::table_name::TableName;
1137
1138 use super::DdlManager;
1139 use crate::cache_invalidator::DummyCacheInvalidator;
1140 use crate::ddl::alter_table::AlterTableProcedure;
1141 use crate::ddl::create_table::CreateTableProcedure;
1142 use crate::ddl::drop_table::DropTableProcedure;
1143 use crate::ddl::flow_meta::FlowMetadataAllocator;
1144 use crate::ddl::table_meta::TableMetadataAllocator;
1145 use crate::ddl::truncate_table::TruncateTableProcedure;
1146 use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
1147 use crate::ddl_manager::{RepartitionProcedureFactory, RepartitionSource};
1148 use crate::key::TableMetadataManager;
1149 use crate::key::flow::FlowMetadataManager;
1150 use crate::kv_backend::memory::MemoryKvBackend;
1151 use crate::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef};
1152 use crate::peer::Peer;
1153 use crate::region_keeper::MemoryRegionKeeper;
1154 use crate::region_registry::LeaderRegionRegistry;
1155 use crate::sequence::SequenceBuilder;
1156 use crate::state_store::KvStateStore;
1157 use crate::wal_provider::WalProvider;
1158
1159 pub struct DummyDatanodeManager;
1161
1162 #[async_trait::async_trait]
1163 impl DatanodeManager for DummyDatanodeManager {
1164 async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
1165 unimplemented!()
1166 }
1167 }
1168
1169 #[async_trait::async_trait]
1170 impl FlownodeManager for DummyDatanodeManager {
1171 async fn flownode(&self, _node: &Peer) -> FlownodeRef {
1172 unimplemented!()
1173 }
1174 }
1175
1176 struct DummyRepartitionProcedureFactory;
1177
1178 #[async_trait::async_trait]
1179 impl RepartitionProcedureFactory for DummyRepartitionProcedureFactory {
1180 fn create(
1181 &self,
1182 _ddl_ctx: &DdlContext,
1183 _table_name: TableName,
1184 _table_id: TableId,
1185 _source: RepartitionSource,
1186 _to_exprs: Vec<String>,
1187 _timeout: Option<Duration>,
1188 ) -> std::result::Result<BoxedProcedure, BoxedError> {
1189 unimplemented!()
1190 }
1191
1192 fn register_loaders(
1193 &self,
1194 _ddl_ctx: &DdlContext,
1195 _procedure_manager: &ProcedureManagerRef,
1196 ) -> std::result::Result<(), BoxedError> {
1197 Ok(())
1198 }
1199 }
1200
1201 #[test]
1202 fn test_try_new() {
1203 let kv_backend = Arc::new(MemoryKvBackend::new());
1204 let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
1205 let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
1206 Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
1207 Arc::new(WalProvider::default()),
1208 ));
1209 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
1210 let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
1211 Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
1212 ));
1213
1214 let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
1215 let poison_manager = Arc::new(InMemoryPoisonStore::default());
1216 let procedure_manager = Arc::new(LocalManager::new(
1217 Default::default(),
1218 state_store,
1219 poison_manager,
1220 None,
1221 None,
1222 ));
1223
1224 let _ = DdlManager::try_new(
1225 DdlContext {
1226 node_manager: Arc::new(DummyDatanodeManager),
1227 cache_invalidator: Arc::new(DummyCacheInvalidator),
1228 table_metadata_manager,
1229 table_metadata_allocator,
1230 flow_metadata_manager,
1231 flow_metadata_allocator,
1232 memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
1233 leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
1234 region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
1235 },
1236 procedure_manager.clone(),
1237 Arc::new(DummyRepartitionProcedureFactory),
1238 true,
1239 );
1240
1241 let expected_loaders = vec![
1242 CreateTableProcedure::TYPE_NAME,
1243 AlterTableProcedure::TYPE_NAME,
1244 DropTableProcedure::TYPE_NAME,
1245 TruncateTableProcedure::TYPE_NAME,
1246 ];
1247
1248 for loader in expected_loaders {
1249 assert!(procedure_manager.contains_loader(loader));
1250 }
1251 }
1252}