1use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::Repartition;
19use api::v1::alter_table_expr::Kind;
20use common_error::ext::BoxedError;
21use common_procedure::{
22 BoxedProcedure, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef,
23 ProcedureWithId, watcher,
24};
25use common_telemetry::tracing_context::{FutureExt, TracingContext};
26use common_telemetry::{debug, info, tracing};
27use derive_builder::Builder;
28use snafu::{OptionExt, ResultExt, ensure};
29use store_api::storage::TableId;
30use table::table_name::TableName;
31
32use crate::ddl::alter_database::AlterDatabaseProcedure;
33use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
34use crate::ddl::alter_table::AlterTableProcedure;
35use crate::ddl::comment_on::CommentOnProcedure;
36use crate::ddl::create_database::CreateDatabaseProcedure;
37use crate::ddl::create_flow::CreateFlowProcedure;
38use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
39use crate::ddl::create_table::CreateTableProcedure;
40use crate::ddl::create_view::CreateViewProcedure;
41use crate::ddl::drop_database::DropDatabaseProcedure;
42use crate::ddl::drop_flow::DropFlowProcedure;
43use crate::ddl::drop_table::DropTableProcedure;
44use crate::ddl::drop_view::DropViewProcedure;
45use crate::ddl::truncate_table::TruncateTableProcedure;
46use crate::ddl::{DdlContext, utils};
47use crate::error::{
48 CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu,
49 RegisterProcedureLoaderSnafu, RegisterRepartitionProcedureLoaderSnafu, Result,
50 SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
51 UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
52};
53use crate::key::table_info::TableInfoValue;
54use crate::key::table_name::TableNameKey;
55use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
56use crate::procedure_executor::ExecutorContext;
57#[cfg(feature = "enterprise")]
58use crate::rpc::ddl::DdlTask::CreateTrigger;
59#[cfg(feature = "enterprise")]
60use crate::rpc::ddl::DdlTask::DropTrigger;
61use crate::rpc::ddl::DdlTask::{
62 AlterDatabase, AlterLogicalTables, AlterTable, CommentOn, CreateDatabase, CreateFlow,
63 CreateLogicalTables, CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables,
64 DropTable, DropView, TruncateTable,
65};
66#[cfg(feature = "enterprise")]
67use crate::rpc::ddl::trigger::CreateTriggerTask;
68#[cfg(feature = "enterprise")]
69use crate::rpc::ddl::trigger::DropTriggerTask;
70use crate::rpc::ddl::{
71 AlterDatabaseTask, AlterTableTask, CommentOnTask, CreateDatabaseTask, CreateFlowTask,
72 CreateTableTask, CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask,
73 QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
74};
75use crate::rpc::router::RegionRoute;
76
#[async_trait::async_trait]
/// Hook for customizing a [`DdlManager`].
///
/// The type parameter `C` is a caller-defined context that is handed to the hook unchanged.
pub trait DdlManagerConfigurator<C>: Send + Sync {
    /// Takes ownership of `ddl_manager` together with the context `ctx` and returns the
    /// [`DdlManager`] to use from then on, or a boxed error if configuration fails.
    async fn configure(
        &self,
        ddl_manager: DdlManager,
        ctx: C,
    ) -> std::result::Result<DdlManager, BoxedError>;
}
87
/// Shared, reference-counted handle to a [`DdlManagerConfigurator`] trait object.
pub type DdlManagerConfiguratorRef<C> = Arc<dyn DdlManagerConfigurator<C>>;

/// Shared, reference-counted handle to a [`DdlManager`].
pub type DdlManagerRef = Arc<DdlManager>;

/// Function type that turns a [`DdlContext`] into a [`BoxedProcedureLoader`].
pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
93
#[derive(Builder)]
/// Entry point for DDL tasks: builds the procedure matching each task and hands it to the
/// procedure manager.
pub struct DdlManager {
    /// Context that is cloned into every procedure created by this manager.
    ddl_context: DdlContext,
    /// Receives the procedures to run and the procedure loaders to register.
    procedure_manager: ProcedureManagerRef,
    /// Creates repartition procedures and registers their loaders.
    repartition_procedure_factory: RepartitionProcedureFactoryRef,
    /// Backend for trigger DDL tasks; while it is `None`, trigger tasks are rejected as
    /// unsupported.
    #[cfg(feature = "enterprise")]
    trigger_ddl_manager: Option<TriggerDdlManagerRef>,
}
103
#[cfg(feature = "enterprise")]
/// Backend that executes trigger DDL tasks on behalf of [`DdlManager`].
///
/// Only available with the `enterprise` feature.
#[async_trait::async_trait]
pub trait TriggerDdlManager: Send + Sync {
    /// Executes a create-trigger task.
    ///
    /// The procedure manager, DDL context and query context of the calling [`DdlManager`]
    /// request are passed in; how they are used is up to the implementation.
    async fn create_trigger(
        &self,
        create_trigger_task: CreateTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Executes a drop-trigger task.
    ///
    /// Receives the same collaborators as [`TriggerDdlManager::create_trigger`].
    async fn drop_trigger(
        &self,
        drop_trigger_task: DropTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Returns the implementor as `&dyn Any`.
    ///
    /// NOTE(review): presumably intended for downcasting to the concrete type — confirm
    /// against the implementors.
    fn as_any(&self) -> &dyn std::any::Any;
}

/// Shared, reference-counted handle to a [`TriggerDdlManager`] trait object.
#[cfg(feature = "enterprise")]
pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
130
/// Expands to a `(type name, loader factory)` tuple for the procedure type `$procedure`.
///
/// The first element is `$procedure::TYPE_NAME`. The second is a reference to a closure that
/// takes a [`DdlContext`] and returns a [`BoxedProcedureLoader`]; that loader rebuilds the
/// procedure from its JSON form through `$procedure::from_json`.
macro_rules! procedure_loader_entry {
    ($procedure:ident) => {
        (
            $procedure::TYPE_NAME,
            &|context: DdlContext| -> BoxedProcedureLoader {
                Box::new(move |json: &str| {
                    // `from_json` takes the context by value, so pass a clone and keep the
                    // captured context for later invocations of the loader.
                    let context = context.clone();
                    $procedure::from_json(json, context).map(|p| Box::new(p) as _)
                })
            },
        )
    };
}
144
/// Expands to a `vec![...]` holding one `procedure_loader_entry!` tuple per listed
/// procedure type, in the order given.
macro_rules! procedure_loader {
    ($($procedure:ident),*) => {
        vec![
            $(procedure_loader_entry!($procedure)),*
        ]
    };
}
152
/// Shared, reference-counted handle to a [`RepartitionProcedureFactory`] trait object.
pub type RepartitionProcedureFactoryRef = Arc<dyn RepartitionProcedureFactory>;

/// Factory that supplies the repartition procedure and its loaders to [`DdlManager`].
pub trait RepartitionProcedureFactory: Send + Sync {
    /// Creates a repartition procedure for the table identified by `table_name` and
    /// `table_id`.
    ///
    /// `from_exprs` and `to_exprs` carry the source and target partition expressions as
    /// strings, and `timeout` is an optional time limit; their exact interpretation is left
    /// to the implementation.
    ///
    /// # Errors
    ///
    /// Returns a boxed error when the procedure cannot be created.
    fn create(
        &self,
        ddl_ctx: &DdlContext,
        table_name: TableName,
        table_id: TableId,
        from_exprs: Vec<String>,
        to_exprs: Vec<String>,
        timeout: Option<Duration>,
    ) -> std::result::Result<BoxedProcedure, BoxedError>;

    /// Registers the loaders needed by the repartition procedure with `procedure_manager`.
    ///
    /// # Errors
    ///
    /// Returns a boxed error when registration fails.
    fn register_loaders(
        &self,
        ddl_ctx: &DdlContext,
        procedure_manager: &ProcedureManagerRef,
    ) -> std::result::Result<(), BoxedError>;
}
172
/// Per-request options that control how a DDL task is executed.
#[derive(Debug, Clone, Copy)]
pub struct DdlOptions {
    /// Time limit for the task.
    ///
    /// Within this file it is only consumed by repartition tasks, where it is forwarded to
    /// the repartition procedure factory.
    pub timeout: Duration,
    /// Whether the caller wants to wait for the procedure to finish.
    ///
    /// Within this file only repartition tasks honor `false` (submit without waiting); all
    /// other tasks wait for completion regardless of this flag.
    pub wait: bool,
}
191
192impl DdlManager {
193 pub fn try_new(
195 ddl_context: DdlContext,
196 procedure_manager: ProcedureManagerRef,
197 repartition_procedure_factory: RepartitionProcedureFactoryRef,
198 register_loaders: bool,
199 ) -> Result<Self> {
200 let manager = Self {
201 ddl_context,
202 procedure_manager,
203 repartition_procedure_factory,
204 #[cfg(feature = "enterprise")]
205 trigger_ddl_manager: None,
206 };
207 if register_loaders {
208 manager.register_loaders()?;
209 }
210 Ok(manager)
211 }
212
    /// Sets the trigger DDL backend and returns the updated manager (builder style).
    #[cfg(feature = "enterprise")]
    pub fn with_trigger_ddl_manager(mut self, trigger_ddl_manager: TriggerDdlManagerRef) -> Self {
        self.trigger_ddl_manager = Some(trigger_ddl_manager);
        self
    }
218
    /// Returns the table metadata manager held by the DDL context.
    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.ddl_context.table_metadata_manager
    }
223
    /// Returns a clone of the DDL context, ready to be moved into a procedure.
    pub fn create_context(&self) -> DdlContext {
        self.ddl_context.clone()
    }
228
229 pub fn register_loaders(&self) -> Result<()> {
231 let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
232 CreateTableProcedure,
233 CreateLogicalTablesProcedure,
234 CreateViewProcedure,
235 CreateFlowProcedure,
236 AlterTableProcedure,
237 AlterLogicalTablesProcedure,
238 AlterDatabaseProcedure,
239 DropTableProcedure,
240 DropFlowProcedure,
241 TruncateTableProcedure,
242 CreateDatabaseProcedure,
243 DropDatabaseProcedure,
244 DropViewProcedure,
245 CommentOnProcedure
246 );
247
248 for (type_name, loader_factory) in loaders {
249 let context = self.create_context();
250 self.procedure_manager
251 .register_loader(type_name, loader_factory(context))
252 .context(RegisterProcedureLoaderSnafu { type_name })?;
253 }
254
255 self.repartition_procedure_factory
256 .register_loaders(&self.ddl_context, &self.procedure_manager)
257 .context(RegisterRepartitionProcedureLoaderSnafu)?;
258
259 Ok(())
260 }
261
262 async fn submit_repartition_task(
281 &self,
282 table_id: TableId,
283 table_name: TableName,
284 Repartition {
285 from_partition_exprs,
286 into_partition_exprs,
287 }: Repartition,
288 wait: bool,
289 timeout: Duration,
290 ) -> Result<(ProcedureId, Option<Output>)> {
291 let context = self.create_context();
292
293 let procedure = self
294 .repartition_procedure_factory
295 .create(
296 &context,
297 table_name,
298 table_id,
299 from_partition_exprs,
300 into_partition_exprs,
301 Some(timeout),
302 )
303 .context(CreateRepartitionProcedureSnafu)?;
304 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
305 if wait {
306 self.execute_procedure_and_wait(procedure_with_id).await
307 } else {
308 self.submit_procedure(procedure_with_id)
309 .await
310 .map(|p| (p, None))
311 }
312 }
313
314 #[tracing::instrument(skip_all)]
316 pub async fn submit_alter_table_task(
317 &self,
318 table_id: TableId,
319 alter_table_task: AlterTableTask,
320 ddl_options: DdlOptions,
321 ) -> Result<(ProcedureId, Option<Output>)> {
322 let mut alter_table_task = alter_table_task;
324 if let Some(Kind::Repartition(_)) = alter_table_task.alter_table.kind.as_ref()
325 && let Kind::Repartition(repartition) =
326 alter_table_task.alter_table.kind.take().unwrap()
327 {
328 let table_name = TableName::new(
329 alter_table_task.alter_table.catalog_name,
330 alter_table_task.alter_table.schema_name,
331 alter_table_task.alter_table.table_name,
332 );
333 return self
334 .submit_repartition_task(
335 table_id,
336 table_name,
337 repartition,
338 ddl_options.wait,
339 ddl_options.timeout,
340 )
341 .await;
342 }
343
344 let context = self.create_context();
345 let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
346
347 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
348
349 self.execute_procedure_and_wait(procedure_with_id).await
350 }
351
352 #[tracing::instrument(skip_all)]
354 pub async fn submit_create_table_task(
355 &self,
356 create_table_task: CreateTableTask,
357 ) -> Result<(ProcedureId, Option<Output>)> {
358 let context = self.create_context();
359
360 let procedure = CreateTableProcedure::new(create_table_task, context)?;
361
362 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
363
364 self.execute_procedure_and_wait(procedure_with_id).await
365 }
366
367 #[tracing::instrument(skip_all)]
369 pub async fn submit_create_view_task(
370 &self,
371 create_view_task: CreateViewTask,
372 ) -> Result<(ProcedureId, Option<Output>)> {
373 let context = self.create_context();
374
375 let procedure = CreateViewProcedure::new(create_view_task, context);
376
377 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
378
379 self.execute_procedure_and_wait(procedure_with_id).await
380 }
381
382 #[tracing::instrument(skip_all)]
384 pub async fn submit_create_logical_table_tasks(
385 &self,
386 create_table_tasks: Vec<CreateTableTask>,
387 physical_table_id: TableId,
388 ) -> Result<(ProcedureId, Option<Output>)> {
389 let context = self.create_context();
390
391 let procedure =
392 CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
393
394 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
395
396 self.execute_procedure_and_wait(procedure_with_id).await
397 }
398
399 #[tracing::instrument(skip_all)]
401 pub async fn submit_alter_logical_table_tasks(
402 &self,
403 alter_table_tasks: Vec<AlterTableTask>,
404 physical_table_id: TableId,
405 ) -> Result<(ProcedureId, Option<Output>)> {
406 let context = self.create_context();
407
408 let procedure =
409 AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
410
411 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
412
413 self.execute_procedure_and_wait(procedure_with_id).await
414 }
415
416 #[tracing::instrument(skip_all)]
418 pub async fn submit_drop_table_task(
419 &self,
420 drop_table_task: DropTableTask,
421 ) -> Result<(ProcedureId, Option<Output>)> {
422 let context = self.create_context();
423
424 let procedure = DropTableProcedure::new(drop_table_task, context);
425
426 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
427
428 self.execute_procedure_and_wait(procedure_with_id).await
429 }
430
431 #[tracing::instrument(skip_all)]
433 pub async fn submit_create_database(
434 &self,
435 CreateDatabaseTask {
436 catalog,
437 schema,
438 create_if_not_exists,
439 options,
440 }: CreateDatabaseTask,
441 ) -> Result<(ProcedureId, Option<Output>)> {
442 let context = self.create_context();
443 let procedure =
444 CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
445 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
446
447 self.execute_procedure_and_wait(procedure_with_id).await
448 }
449
450 #[tracing::instrument(skip_all)]
452 pub async fn submit_drop_database(
453 &self,
454 DropDatabaseTask {
455 catalog,
456 schema,
457 drop_if_exists,
458 }: DropDatabaseTask,
459 ) -> Result<(ProcedureId, Option<Output>)> {
460 let context = self.create_context();
461 let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
462 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
463
464 self.execute_procedure_and_wait(procedure_with_id).await
465 }
466
467 pub async fn submit_alter_database(
468 &self,
469 alter_database_task: AlterDatabaseTask,
470 ) -> Result<(ProcedureId, Option<Output>)> {
471 let context = self.create_context();
472 let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
473 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
474
475 self.execute_procedure_and_wait(procedure_with_id).await
476 }
477
478 #[tracing::instrument(skip_all)]
480 pub async fn submit_create_flow_task(
481 &self,
482 create_flow: CreateFlowTask,
483 query_context: QueryContext,
484 ) -> Result<(ProcedureId, Option<Output>)> {
485 let context = self.create_context();
486 let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
487 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
488
489 self.execute_procedure_and_wait(procedure_with_id).await
490 }
491
492 #[tracing::instrument(skip_all)]
494 pub async fn submit_drop_flow_task(
495 &self,
496 drop_flow: DropFlowTask,
497 ) -> Result<(ProcedureId, Option<Output>)> {
498 let context = self.create_context();
499 let procedure = DropFlowProcedure::new(drop_flow, context);
500 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
501
502 self.execute_procedure_and_wait(procedure_with_id).await
503 }
504
505 #[tracing::instrument(skip_all)]
507 pub async fn submit_drop_view_task(
508 &self,
509 drop_view: DropViewTask,
510 ) -> Result<(ProcedureId, Option<Output>)> {
511 let context = self.create_context();
512 let procedure = DropViewProcedure::new(drop_view, context);
513 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
514
515 self.execute_procedure_and_wait(procedure_with_id).await
516 }
517
518 #[tracing::instrument(skip_all)]
520 pub async fn submit_truncate_table_task(
521 &self,
522 truncate_table_task: TruncateTableTask,
523 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
524 region_routes: Vec<RegionRoute>,
525 ) -> Result<(ProcedureId, Option<Output>)> {
526 let context = self.create_context();
527 let procedure = TruncateTableProcedure::new(
528 truncate_table_task,
529 table_info_value,
530 region_routes,
531 context,
532 );
533
534 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
535
536 self.execute_procedure_and_wait(procedure_with_id).await
537 }
538
539 #[tracing::instrument(skip_all)]
541 pub async fn submit_comment_on_task(
542 &self,
543 comment_on_task: CommentOnTask,
544 ) -> Result<(ProcedureId, Option<Output>)> {
545 let context = self.create_context();
546 let procedure = CommentOnProcedure::new(comment_on_task, context);
547 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
548
549 self.execute_procedure_and_wait(procedure_with_id).await
550 }
551
552 async fn execute_procedure_and_wait(
554 &self,
555 procedure_with_id: ProcedureWithId,
556 ) -> Result<(ProcedureId, Option<Output>)> {
557 let procedure_id = procedure_with_id.id;
558
559 let mut watcher = self
560 .procedure_manager
561 .submit(procedure_with_id)
562 .await
563 .context(SubmitProcedureSnafu)?;
564
565 let output = watcher::wait(&mut watcher)
566 .await
567 .context(WaitProcedureSnafu)?;
568
569 Ok((procedure_id, output))
570 }
571
572 async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
574 let procedure_id = procedure_with_id.id;
575 let _ = self
576 .procedure_manager
577 .submit(procedure_with_id)
578 .await
579 .context(SubmitProcedureSnafu)?;
580
581 Ok(procedure_id)
582 }
583
584 pub async fn submit_ddl_task(
585 &self,
586 ctx: &ExecutorContext,
587 request: SubmitDdlTaskRequest,
588 ) -> Result<SubmitDdlTaskResponse> {
589 let span = ctx
590 .tracing_context
591 .as_ref()
592 .map(TracingContext::from_w3c)
593 .unwrap_or_else(TracingContext::from_current_span)
594 .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
595 let ddl_options = DdlOptions {
596 wait: request.wait,
597 timeout: request.timeout,
598 };
599 async move {
600 debug!("Submitting Ddl task: {:?}", request.task);
601 match request.task {
602 CreateTable(create_table_task) => {
603 handle_create_table_task(self, create_table_task).await
604 }
605 DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
606 AlterTable(alter_table_task) => {
607 handle_alter_table_task(self, alter_table_task, ddl_options).await
608 }
609 TruncateTable(truncate_table_task) => {
610 handle_truncate_table_task(self, truncate_table_task).await
611 }
612 CreateLogicalTables(create_table_tasks) => {
613 handle_create_logical_table_tasks(self, create_table_tasks).await
614 }
615 AlterLogicalTables(alter_table_tasks) => {
616 handle_alter_logical_table_tasks(self, alter_table_tasks).await
617 }
618 DropLogicalTables(_) => todo!(),
619 CreateDatabase(create_database_task) => {
620 handle_create_database_task(self, create_database_task).await
621 }
622 DropDatabase(drop_database_task) => {
623 handle_drop_database_task(self, drop_database_task).await
624 }
625 AlterDatabase(alter_database_task) => {
626 handle_alter_database_task(self, alter_database_task).await
627 }
628 CreateFlow(create_flow_task) => {
629 handle_create_flow_task(self, create_flow_task, request.query_context).await
630 }
631 DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
632 CreateView(create_view_task) => {
633 handle_create_view_task(self, create_view_task).await
634 }
635 DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
636 CommentOn(comment_on_task) => handle_comment_on_task(self, comment_on_task).await,
637 #[cfg(feature = "enterprise")]
638 CreateTrigger(create_trigger_task) => {
639 handle_create_trigger_task(self, create_trigger_task, request.query_context)
640 .await
641 }
642 #[cfg(feature = "enterprise")]
643 DropTrigger(drop_trigger_task) => {
644 handle_drop_trigger_task(self, drop_trigger_task, request.query_context).await
645 }
646 }
647 }
648 .trace(span)
649 .await
650 }
651}
652
653async fn handle_truncate_table_task(
654 ddl_manager: &DdlManager,
655 truncate_table_task: TruncateTableTask,
656) -> Result<SubmitDdlTaskResponse> {
657 let table_id = truncate_table_task.table_id;
658 let table_metadata_manager = &ddl_manager.table_metadata_manager();
659 let table_ref = truncate_table_task.table_ref();
660
661 let (table_info_value, table_route_value) =
662 table_metadata_manager.get_full_table_info(table_id).await?;
663
664 let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
665 table: table_ref.to_string(),
666 })?;
667
668 let table_route_value = table_route_value.context(TableRouteNotFoundSnafu { table_id })?;
669
670 let table_route = table_route_value.into_inner().region_routes()?.clone();
671
672 let (id, _) = ddl_manager
673 .submit_truncate_table_task(truncate_table_task, table_info_value, table_route)
674 .await?;
675
676 info!("Table: {table_id} is truncated via procedure_id {id:?}");
677
678 Ok(SubmitDdlTaskResponse {
679 key: id.to_string().into(),
680 ..Default::default()
681 })
682}
683
684async fn handle_alter_table_task(
685 ddl_manager: &DdlManager,
686 alter_table_task: AlterTableTask,
687 ddl_options: DdlOptions,
688) -> Result<SubmitDdlTaskResponse> {
689 let table_ref = alter_table_task.table_ref();
690
691 let table_id = ddl_manager
692 .table_metadata_manager()
693 .table_name_manager()
694 .get(TableNameKey::new(
695 table_ref.catalog,
696 table_ref.schema,
697 table_ref.table,
698 ))
699 .await?
700 .with_context(|| TableNotFoundSnafu {
701 table_name: table_ref.to_string(),
702 })?
703 .table_id();
704
705 let table_route_value = ddl_manager
706 .table_metadata_manager()
707 .table_route_manager()
708 .table_route_storage()
709 .get(table_id)
710 .await?
711 .context(TableRouteNotFoundSnafu { table_id })?;
712 ensure!(
713 table_route_value.is_physical(),
714 UnexpectedLogicalRouteTableSnafu {
715 err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
716 }
717 );
718
719 let (id, _) = ddl_manager
720 .submit_alter_table_task(table_id, alter_table_task, ddl_options)
721 .await?;
722
723 info!("Table: {table_id} is altered via procedure_id {id:?}");
724
725 Ok(SubmitDdlTaskResponse {
726 key: id.to_string().into(),
727 ..Default::default()
728 })
729}
730
731async fn handle_drop_table_task(
732 ddl_manager: &DdlManager,
733 drop_table_task: DropTableTask,
734) -> Result<SubmitDdlTaskResponse> {
735 let table_id = drop_table_task.table_id;
736 let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
737
738 info!("Table: {table_id} is dropped via procedure_id {id:?}");
739
740 Ok(SubmitDdlTaskResponse {
741 key: id.to_string().into(),
742 ..Default::default()
743 })
744}
745
746async fn handle_create_table_task(
747 ddl_manager: &DdlManager,
748 create_table_task: CreateTableTask,
749) -> Result<SubmitDdlTaskResponse> {
750 let (id, output) = ddl_manager
751 .submit_create_table_task(create_table_task)
752 .await?;
753
754 let procedure_id = id.to_string();
755 let output = output.context(ProcedureOutputSnafu {
756 procedure_id: &procedure_id,
757 err_msg: "empty output",
758 })?;
759 let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
760 procedure_id: &procedure_id,
761 err_msg: "downcast to `u32`",
762 })?);
763 info!("Table: {table_id} is created via procedure_id {id:?}");
764
765 Ok(SubmitDdlTaskResponse {
766 key: procedure_id.into(),
767 table_ids: vec![table_id],
768 })
769}
770
771async fn handle_create_logical_table_tasks(
772 ddl_manager: &DdlManager,
773 create_table_tasks: Vec<CreateTableTask>,
774) -> Result<SubmitDdlTaskResponse> {
775 ensure!(
776 !create_table_tasks.is_empty(),
777 EmptyDdlTasksSnafu {
778 name: "create logical tables"
779 }
780 );
781 let physical_table_id = utils::check_and_get_physical_table_id(
782 ddl_manager.table_metadata_manager(),
783 &create_table_tasks,
784 )
785 .await?;
786 let num_logical_tables = create_table_tasks.len();
787
788 let (id, output) = ddl_manager
789 .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
790 .await?;
791
792 info!(
793 "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}"
794 );
795
796 let procedure_id = id.to_string();
797 let output = output.context(ProcedureOutputSnafu {
798 procedure_id: &procedure_id,
799 err_msg: "empty output",
800 })?;
801 let table_ids = output
802 .downcast_ref::<Vec<TableId>>()
803 .context(ProcedureOutputSnafu {
804 procedure_id: &procedure_id,
805 err_msg: "downcast to `Vec<TableId>`",
806 })?
807 .clone();
808
809 Ok(SubmitDdlTaskResponse {
810 key: procedure_id.into(),
811 table_ids,
812 })
813}
814
815async fn handle_create_database_task(
816 ddl_manager: &DdlManager,
817 create_database_task: CreateDatabaseTask,
818) -> Result<SubmitDdlTaskResponse> {
819 let (id, _) = ddl_manager
820 .submit_create_database(create_database_task.clone())
821 .await?;
822
823 let procedure_id = id.to_string();
824 info!(
825 "Database {}.{} is created via procedure_id {id:?}",
826 create_database_task.catalog, create_database_task.schema
827 );
828
829 Ok(SubmitDdlTaskResponse {
830 key: procedure_id.into(),
831 ..Default::default()
832 })
833}
834
835async fn handle_drop_database_task(
836 ddl_manager: &DdlManager,
837 drop_database_task: DropDatabaseTask,
838) -> Result<SubmitDdlTaskResponse> {
839 let (id, _) = ddl_manager
840 .submit_drop_database(drop_database_task.clone())
841 .await?;
842
843 let procedure_id = id.to_string();
844 info!(
845 "Database {}.{} is dropped via procedure_id {id:?}",
846 drop_database_task.catalog, drop_database_task.schema
847 );
848
849 Ok(SubmitDdlTaskResponse {
850 key: procedure_id.into(),
851 ..Default::default()
852 })
853}
854
855async fn handle_alter_database_task(
856 ddl_manager: &DdlManager,
857 alter_database_task: AlterDatabaseTask,
858) -> Result<SubmitDdlTaskResponse> {
859 let (id, _) = ddl_manager
860 .submit_alter_database(alter_database_task.clone())
861 .await?;
862
863 let procedure_id = id.to_string();
864 info!(
865 "Database {}.{} is altered via procedure_id {id:?}",
866 alter_database_task.catalog(),
867 alter_database_task.schema()
868 );
869
870 Ok(SubmitDdlTaskResponse {
871 key: procedure_id.into(),
872 ..Default::default()
873 })
874}
875
876async fn handle_drop_flow_task(
877 ddl_manager: &DdlManager,
878 drop_flow_task: DropFlowTask,
879) -> Result<SubmitDdlTaskResponse> {
880 let (id, _) = ddl_manager
881 .submit_drop_flow_task(drop_flow_task.clone())
882 .await?;
883
884 let procedure_id = id.to_string();
885 info!(
886 "Flow {}.{}({}) is dropped via procedure_id {id:?}",
887 drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
888 );
889
890 Ok(SubmitDdlTaskResponse {
891 key: procedure_id.into(),
892 ..Default::default()
893 })
894}
895
/// Handles a drop-trigger task by delegating to the configured trigger DDL backend.
///
/// # Errors
///
/// Returns an `Unsupported` error when no trigger DDL backend is configured; otherwise
/// returns whatever the backend returns.
#[cfg(feature = "enterprise")]
async fn handle_drop_trigger_task(
    ddl_manager: &DdlManager,
    drop_trigger_task: DropTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_manager) => {
            trigger_manager
                .drop_trigger(
                    drop_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => {
            use crate::error::UnsupportedSnafu;

            UnsupportedSnafu {
                operation: "drop trigger",
            }
            .fail()
        }
    }
}
919
920async fn handle_drop_view_task(
921 ddl_manager: &DdlManager,
922 drop_view_task: DropViewTask,
923) -> Result<SubmitDdlTaskResponse> {
924 let (id, _) = ddl_manager
925 .submit_drop_view_task(drop_view_task.clone())
926 .await?;
927
928 let procedure_id = id.to_string();
929 info!(
930 "View {}({}) is dropped via procedure_id {id:?}",
931 drop_view_task.table_ref(),
932 drop_view_task.view_id,
933 );
934
935 Ok(SubmitDdlTaskResponse {
936 key: procedure_id.into(),
937 ..Default::default()
938 })
939}
940
941async fn handle_create_flow_task(
942 ddl_manager: &DdlManager,
943 create_flow_task: CreateFlowTask,
944 query_context: QueryContext,
945) -> Result<SubmitDdlTaskResponse> {
946 let (id, output) = ddl_manager
947 .submit_create_flow_task(create_flow_task.clone(), query_context)
948 .await?;
949
950 let procedure_id = id.to_string();
951 let output = output.context(ProcedureOutputSnafu {
952 procedure_id: &procedure_id,
953 err_msg: "empty output",
954 })?;
955 let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
956 procedure_id: &procedure_id,
957 err_msg: "downcast to `u32`",
958 })?);
959 if !create_flow_task.or_replace {
960 info!(
961 "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
962 create_flow_task.catalog_name, create_flow_task.flow_name,
963 );
964 } else {
965 info!(
966 "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
967 create_flow_task.catalog_name, create_flow_task.flow_name,
968 );
969 }
970
971 Ok(SubmitDdlTaskResponse {
972 key: procedure_id.into(),
973 ..Default::default()
974 })
975}
976
/// Handles a create-trigger task by delegating to the configured trigger DDL backend.
///
/// # Errors
///
/// Returns an `Unsupported` error when no trigger DDL backend is configured; otherwise
/// returns whatever the backend returns.
#[cfg(feature = "enterprise")]
async fn handle_create_trigger_task(
    ddl_manager: &DdlManager,
    create_trigger_task: CreateTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_manager) => {
            trigger_manager
                .create_trigger(
                    create_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => {
            use crate::error::UnsupportedSnafu;

            UnsupportedSnafu {
                operation: "create trigger",
            }
            .fail()
        }
    }
}
1000
1001async fn handle_alter_logical_table_tasks(
1002 ddl_manager: &DdlManager,
1003 alter_table_tasks: Vec<AlterTableTask>,
1004) -> Result<SubmitDdlTaskResponse> {
1005 ensure!(
1006 !alter_table_tasks.is_empty(),
1007 EmptyDdlTasksSnafu {
1008 name: "alter logical tables"
1009 }
1010 );
1011
1012 let first_table = TableNameKey {
1014 catalog: &alter_table_tasks[0].alter_table.catalog_name,
1015 schema: &alter_table_tasks[0].alter_table.schema_name,
1016 table: &alter_table_tasks[0].alter_table.table_name,
1017 };
1018 let physical_table_id =
1019 utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
1020 let num_logical_tables = alter_table_tasks.len();
1021
1022 let (id, _) = ddl_manager
1023 .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
1024 .await?;
1025
1026 info!(
1027 "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}"
1028 );
1029
1030 let procedure_id = id.to_string();
1031
1032 Ok(SubmitDdlTaskResponse {
1033 key: procedure_id.into(),
1034 ..Default::default()
1035 })
1036}
1037
1038async fn handle_create_view_task(
1040 ddl_manager: &DdlManager,
1041 create_view_task: CreateViewTask,
1042) -> Result<SubmitDdlTaskResponse> {
1043 let (id, output) = ddl_manager
1044 .submit_create_view_task(create_view_task)
1045 .await?;
1046
1047 let procedure_id = id.to_string();
1048 let output = output.context(ProcedureOutputSnafu {
1049 procedure_id: &procedure_id,
1050 err_msg: "empty output",
1051 })?;
1052 let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
1053 procedure_id: &procedure_id,
1054 err_msg: "downcast to `u32`",
1055 })?);
1056 info!("View: {view_id} is created via procedure_id {id:?}");
1057
1058 Ok(SubmitDdlTaskResponse {
1059 key: procedure_id.into(),
1060 table_ids: vec![view_id],
1061 })
1062}
1063
1064async fn handle_comment_on_task(
1065 ddl_manager: &DdlManager,
1066 comment_on_task: CommentOnTask,
1067) -> Result<SubmitDdlTaskResponse> {
1068 let (id, _) = ddl_manager
1069 .submit_comment_on_task(comment_on_task.clone())
1070 .await?;
1071
1072 let procedure_id = id.to_string();
1073 info!(
1074 "Comment on {}.{}.{} is updated via procedure_id {id:?}",
1075 comment_on_task.catalog_name, comment_on_task.schema_name, comment_on_task.object_name
1076 );
1077
1078 Ok(SubmitDdlTaskResponse {
1079 key: procedure_id.into(),
1080 ..Default::default()
1081 })
1082}
1083
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use common_error::ext::BoxedError;
    use common_procedure::local::LocalManager;
    use common_procedure::test_util::InMemoryPoisonStore;
    use common_procedure::{BoxedProcedure, ProcedureManagerRef};
    use store_api::storage::TableId;
    use table::table_name::TableName;

    use super::DdlManager;
    use crate::cache_invalidator::DummyCacheInvalidator;
    use crate::ddl::alter_database::AlterDatabaseProcedure;
    use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
    use crate::ddl::alter_table::AlterTableProcedure;
    use crate::ddl::comment_on::CommentOnProcedure;
    use crate::ddl::create_database::CreateDatabaseProcedure;
    use crate::ddl::create_flow::CreateFlowProcedure;
    use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
    use crate::ddl::create_table::CreateTableProcedure;
    use crate::ddl::create_view::CreateViewProcedure;
    use crate::ddl::drop_database::DropDatabaseProcedure;
    use crate::ddl::drop_flow::DropFlowProcedure;
    use crate::ddl::drop_table::DropTableProcedure;
    use crate::ddl::drop_view::DropViewProcedure;
    use crate::ddl::flow_meta::FlowMetadataAllocator;
    use crate::ddl::table_meta::TableMetadataAllocator;
    use crate::ddl::truncate_table::TruncateTableProcedure;
    use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use crate::ddl_manager::RepartitionProcedureFactory;
    use crate::key::TableMetadataManager;
    use crate::key::flow::FlowMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef};
    use crate::peer::Peer;
    use crate::region_keeper::MemoryRegionKeeper;
    use crate::region_registry::LeaderRegionRegistry;
    use crate::sequence::SequenceBuilder;
    use crate::state_store::KvStateStore;
    use crate::wal_provider::WalProvider;

    /// Node manager stub whose lookups are never expected to be called by these tests.
    pub struct DummyDatanodeManager;

    #[async_trait::async_trait]
    impl DatanodeManager for DummyDatanodeManager {
        async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
            unimplemented!()
        }
    }

    #[async_trait::async_trait]
    impl FlownodeManager for DummyDatanodeManager {
        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
            unimplemented!()
        }
    }

    /// Repartition factory stub: registers nothing and never creates a procedure.
    struct DummyRepartitionProcedureFactory;

    // `RepartitionProcedureFactory` has no async methods, so no `#[async_trait]` is needed.
    impl RepartitionProcedureFactory for DummyRepartitionProcedureFactory {
        fn create(
            &self,
            _ddl_ctx: &DdlContext,
            _table_name: TableName,
            _table_id: TableId,
            _from_exprs: Vec<String>,
            _to_exprs: Vec<String>,
            _timeout: Option<Duration>,
        ) -> std::result::Result<BoxedProcedure, BoxedError> {
            unimplemented!()
        }

        fn register_loaders(
            &self,
            _ddl_ctx: &DdlContext,
            _procedure_manager: &ProcedureManagerRef,
        ) -> std::result::Result<(), BoxedError> {
            Ok(())
        }
    }

    /// `DdlManager::try_new` with `register_loaders = true` must succeed and register the
    /// loader of every DDL procedure.
    #[test]
    fn test_try_new() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
            Arc::new(WalProvider::default()),
        ));
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
        ));

        let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let procedure_manager = Arc::new(LocalManager::new(
            Default::default(),
            state_store,
            poison_manager,
            None,
            None,
        ));

        // Unwrap instead of discarding the result, so a registration failure is reported
        // here rather than as a confusing "missing loader" assertion below.
        let _ddl_manager = DdlManager::try_new(
            DdlContext {
                node_manager: Arc::new(DummyDatanodeManager),
                cache_invalidator: Arc::new(DummyCacheInvalidator),
                table_metadata_manager,
                table_metadata_allocator,
                flow_metadata_manager,
                flow_metadata_allocator,
                memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
                leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
                region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
            },
            procedure_manager.clone(),
            Arc::new(DummyRepartitionProcedureFactory),
            true,
        )
        .unwrap();

        // Mirrors the list passed to `procedure_loader!` in `DdlManager::register_loaders`.
        let expected_loaders = [
            CreateTableProcedure::TYPE_NAME,
            CreateLogicalTablesProcedure::TYPE_NAME,
            CreateViewProcedure::TYPE_NAME,
            CreateFlowProcedure::TYPE_NAME,
            AlterTableProcedure::TYPE_NAME,
            AlterLogicalTablesProcedure::TYPE_NAME,
            AlterDatabaseProcedure::TYPE_NAME,
            DropTableProcedure::TYPE_NAME,
            DropFlowProcedure::TYPE_NAME,
            TruncateTableProcedure::TYPE_NAME,
            CreateDatabaseProcedure::TYPE_NAME,
            DropDatabaseProcedure::TYPE_NAME,
            DropViewProcedure::TYPE_NAME,
            CommentOnProcedure::TYPE_NAME,
        ];

        for loader in expected_loaders {
            assert!(
                procedure_manager.contains_loader(loader),
                "missing procedure loader for `{loader}`"
            );
        }
    }
}