1use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::Repartition;
19use api::v1::alter_table_expr::Kind;
20use common_error::ext::BoxedError;
21use common_procedure::{
22 BoxedProcedure, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef,
23 ProcedureWithId, watcher,
24};
25use common_telemetry::tracing_context::{FutureExt, TracingContext};
26use common_telemetry::{debug, info, tracing};
27use derive_builder::Builder;
28use snafu::{OptionExt, ResultExt, ensure};
29use store_api::storage::TableId;
30use table::table_name::TableName;
31
32use crate::ddl::alter_database::AlterDatabaseProcedure;
33use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
34use crate::ddl::alter_table::AlterTableProcedure;
35use crate::ddl::comment_on::CommentOnProcedure;
36use crate::ddl::create_database::CreateDatabaseProcedure;
37use crate::ddl::create_flow::CreateFlowProcedure;
38use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
39use crate::ddl::create_table::CreateTableProcedure;
40use crate::ddl::create_view::CreateViewProcedure;
41use crate::ddl::drop_database::DropDatabaseProcedure;
42use crate::ddl::drop_flow::DropFlowProcedure;
43use crate::ddl::drop_table::DropTableProcedure;
44use crate::ddl::drop_view::DropViewProcedure;
45use crate::ddl::truncate_table::TruncateTableProcedure;
46use crate::ddl::{DdlContext, utils};
47use crate::error::{
48 CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu,
49 RegisterProcedureLoaderSnafu, RegisterRepartitionProcedureLoaderSnafu, Result,
50 SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
51 UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
52};
53use crate::key::table_info::TableInfoValue;
54use crate::key::table_name::TableNameKey;
55use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
56use crate::procedure_executor::ExecutorContext;
57#[cfg(feature = "enterprise")]
58use crate::rpc::ddl::DdlTask::CreateTrigger;
59#[cfg(feature = "enterprise")]
60use crate::rpc::ddl::DdlTask::DropTrigger;
61use crate::rpc::ddl::DdlTask::{
62 AlterDatabase, AlterLogicalTables, AlterTable, CommentOn, CreateDatabase, CreateFlow,
63 CreateLogicalTables, CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables,
64 DropTable, DropView, TruncateTable,
65};
66#[cfg(feature = "enterprise")]
67use crate::rpc::ddl::trigger::CreateTriggerTask;
68#[cfg(feature = "enterprise")]
69use crate::rpc::ddl::trigger::DropTriggerTask;
70use crate::rpc::ddl::{
71 AlterDatabaseTask, AlterTableTask, CommentOnTask, CreateDatabaseTask, CreateFlowTask,
72 CreateTableTask, CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask,
73 QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
74};
75use crate::rpc::router::RegionRoute;
76
/// Hook that lets a caller customize a [`DdlManager`] before it is used.
///
/// `C` is a caller-chosen context type that is passed through to
/// [`configure`](DdlManagerConfigurator::configure).
#[async_trait::async_trait]
pub trait DdlManagerConfigurator<C>: Send + Sync {
    /// Consumes `ddl_manager` and returns a [`DdlManager`] to use in its place.
    ///
    /// # Errors
    ///
    /// Implementations report failures as a [`BoxedError`].
    async fn configure(
        &self,
        ddl_manager: DdlManager,
        ctx: C,
    ) -> std::result::Result<DdlManager, BoxedError>;
}
87
/// Shared, dynamically dispatched [`DdlManagerConfigurator`].
pub type DdlManagerConfiguratorRef<C> = Arc<dyn DdlManagerConfigurator<C>>;

/// Shared handle to a [`DdlManager`].
pub type DdlManagerRef = Arc<DdlManager>;

/// Shape of a closure that builds a [`BoxedProcedureLoader`] from a [`DdlContext`].
pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
93
/// Turns DDL tasks into procedures and hands them to the procedure manager.
#[derive(Builder)]
pub struct DdlManager {
    /// Context that is cloned into every procedure created by this manager.
    ddl_context: DdlContext,
    /// Procedure manager that loaders are registered with and procedures are submitted to.
    procedure_manager: ProcedureManagerRef,
    /// Builds repartition procedures and registers their loaders.
    repartition_procedure_factory: RepartitionProcedureFactoryRef,
    /// Handler for trigger DDL tasks; while `None`, trigger tasks are rejected as unsupported.
    #[cfg(feature = "enterprise")]
    trigger_ddl_manager: Option<TriggerDdlManagerRef>,
}
103
/// Handler for trigger-related DDL tasks (only compiled with the `enterprise` feature).
///
/// [`DdlManager`] forwards `CreateTrigger` / `DropTrigger` tasks to an implementation of this
/// trait, passing along its own procedure manager and a clone of its [`DdlContext`].
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
pub trait TriggerDdlManager: Send + Sync {
    /// Handles a create-trigger task and produces the response returned to the submitter.
    async fn create_trigger(
        &self,
        create_trigger_task: CreateTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Handles a drop-trigger task and produces the response returned to the submitter.
    async fn drop_trigger(
        &self,
        drop_trigger_task: DropTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Exposes the implementation as [`std::any::Any`] so callers can downcast it.
    fn as_any(&self) -> &dyn std::any::Any;
}
127
/// Shared, dynamically dispatched [`TriggerDdlManager`].
#[cfg(feature = "enterprise")]
pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
130
/// Expands to a `(type name, loader factory)` pair for the procedure type `$procedure`.
///
/// The first element is `$procedure::TYPE_NAME`. The second is a reference to a closure that
/// takes a [`DdlContext`] and returns a [`BoxedProcedureLoader`]; that loader clones the
/// context on every call and rebuilds the procedure with `$procedure::from_json`.
macro_rules! procedure_loader_entry {
    ($procedure:ident) => {
        (
            $procedure::TYPE_NAME,
            &|context: DdlContext| -> BoxedProcedureLoader {
                Box::new(move |json: &str| {
                    // The loader may be invoked many times, so each call gets its own clone.
                    let context = context.clone();
                    $procedure::from_json(json, context).map(|p| Box::new(p) as _)
                })
            },
        )
    };
}
144
/// Expands a comma-separated list of procedure types into a `Vec` containing one
/// `procedure_loader_entry!` pair per type, in the order given.
macro_rules! procedure_loader {
    ($($procedure:ident),*) => {
        vec![
            $(procedure_loader_entry!($procedure)),*
        ]
    };
}
152
/// Shared, dynamically dispatched [`RepartitionProcedureFactory`].
pub type RepartitionProcedureFactoryRef = Arc<dyn RepartitionProcedureFactory>;
154
/// Builds repartition procedures and registers the loaders needed to restore them.
pub trait RepartitionProcedureFactory: Send + Sync {
    /// Creates a repartition procedure for the table identified by `table_name` / `table_id`.
    ///
    /// `from_exprs` and `to_exprs` carry the `from_partition_exprs` and
    /// `into_partition_exprs` of the repartition request; `timeout` is an optional time
    /// limit whose exact interpretation is left to the implementation.
    ///
    /// # Errors
    ///
    /// Implementations report failures as a [`BoxedError`].
    fn create(
        &self,
        ddl_ctx: &DdlContext,
        table_name: TableName,
        table_id: TableId,
        from_exprs: Vec<String>,
        to_exprs: Vec<String>,
        timeout: Option<Duration>,
    ) -> std::result::Result<BoxedProcedure, BoxedError>;

    /// Registers the factory's procedure loaders with `procedure_manager`.
    ///
    /// # Errors
    ///
    /// Implementations report failures as a [`BoxedError`].
    fn register_loaders(
        &self,
        ddl_ctx: &DdlContext,
        procedure_manager: &ProcedureManagerRef,
    ) -> std::result::Result<(), BoxedError>;
}
172
/// Per-request options taken from the `wait` and `timeout` fields of a DDL request.
///
/// In this file they are only consulted when an alter-table task is a repartition.
#[derive(Debug, Clone, Copy)]
pub struct DdlOptions {
    /// Time limit forwarded to the repartition procedure factory.
    pub timeout: Duration,
    /// Whether to wait for the repartition procedure to finish (`true`) or to return as soon
    /// as it has been submitted (`false`).
    pub wait: bool,
}
191
192impl DdlManager {
193 pub fn try_new(
195 ddl_context: DdlContext,
196 procedure_manager: ProcedureManagerRef,
197 repartition_procedure_factory: RepartitionProcedureFactoryRef,
198 register_loaders: bool,
199 ) -> Result<Self> {
200 let manager = Self {
201 ddl_context,
202 procedure_manager,
203 repartition_procedure_factory,
204 #[cfg(feature = "enterprise")]
205 trigger_ddl_manager: None,
206 };
207 if register_loaders {
208 manager.register_loaders()?;
209 }
210 Ok(manager)
211 }
212
213 #[cfg(feature = "enterprise")]
214 pub fn with_trigger_ddl_manager(mut self, trigger_ddl_manager: TriggerDdlManagerRef) -> Self {
215 self.trigger_ddl_manager = Some(trigger_ddl_manager);
216 self
217 }
218
219 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
221 &self.ddl_context.table_metadata_manager
222 }
223
224 pub fn create_context(&self) -> DdlContext {
226 self.ddl_context.clone()
227 }
228
229 pub fn register_loaders(&self) -> Result<()> {
231 let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
232 CreateTableProcedure,
233 CreateLogicalTablesProcedure,
234 CreateViewProcedure,
235 CreateFlowProcedure,
236 AlterTableProcedure,
237 AlterLogicalTablesProcedure,
238 AlterDatabaseProcedure,
239 DropTableProcedure,
240 DropFlowProcedure,
241 TruncateTableProcedure,
242 CreateDatabaseProcedure,
243 DropDatabaseProcedure,
244 DropViewProcedure,
245 CommentOnProcedure
246 );
247
248 for (type_name, loader_factory) in loaders {
249 let context = self.create_context();
250 self.procedure_manager
251 .register_loader(type_name, loader_factory(context))
252 .context(RegisterProcedureLoaderSnafu { type_name })?;
253 }
254
255 self.repartition_procedure_factory
256 .register_loaders(&self.ddl_context, &self.procedure_manager)
257 .context(RegisterRepartitionProcedureLoaderSnafu)?;
258
259 Ok(())
260 }
261
262 async fn submit_repartition_task(
281 &self,
282 table_id: TableId,
283 table_name: TableName,
284 Repartition {
285 from_partition_exprs,
286 into_partition_exprs,
287 }: Repartition,
288 wait: bool,
289 timeout: Duration,
290 ) -> Result<(ProcedureId, Option<Output>)> {
291 let context = self.create_context();
292
293 let procedure = self
294 .repartition_procedure_factory
295 .create(
296 &context,
297 table_name,
298 table_id,
299 from_partition_exprs,
300 into_partition_exprs,
301 Some(timeout),
302 )
303 .context(CreateRepartitionProcedureSnafu)?;
304 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
305 if wait {
306 self.execute_procedure_and_wait(procedure_with_id).await
307 } else {
308 self.submit_procedure(procedure_with_id)
309 .await
310 .map(|p| (p, None))
311 }
312 }
313
314 #[tracing::instrument(skip_all)]
316 pub async fn submit_alter_table_task(
317 &self,
318 table_id: TableId,
319 alter_table_task: AlterTableTask,
320 ddl_options: DdlOptions,
321 ) -> Result<(ProcedureId, Option<Output>)> {
322 let mut alter_table_task = alter_table_task;
324 if let Some(Kind::Repartition(_)) = alter_table_task.alter_table.kind.as_ref()
325 && let Kind::Repartition(repartition) =
326 alter_table_task.alter_table.kind.take().unwrap()
327 {
328 let table_name = TableName::new(
329 alter_table_task.alter_table.catalog_name,
330 alter_table_task.alter_table.schema_name,
331 alter_table_task.alter_table.table_name,
332 );
333 return self
334 .submit_repartition_task(
335 table_id,
336 table_name,
337 repartition,
338 ddl_options.wait,
339 ddl_options.timeout,
340 )
341 .await;
342 }
343
344 let context = self.create_context();
345 let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
346
347 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
348
349 self.execute_procedure_and_wait(procedure_with_id).await
350 }
351
352 #[tracing::instrument(skip_all)]
354 pub async fn submit_create_table_task(
355 &self,
356 create_table_task: CreateTableTask,
357 ) -> Result<(ProcedureId, Option<Output>)> {
358 let context = self.create_context();
359
360 let procedure = CreateTableProcedure::new(create_table_task, context)?;
361
362 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
363
364 self.execute_procedure_and_wait(procedure_with_id).await
365 }
366
367 #[tracing::instrument(skip_all)]
369 pub async fn submit_create_view_task(
370 &self,
371 create_view_task: CreateViewTask,
372 ) -> Result<(ProcedureId, Option<Output>)> {
373 let context = self.create_context();
374
375 let procedure = CreateViewProcedure::new(create_view_task, context);
376
377 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
378
379 self.execute_procedure_and_wait(procedure_with_id).await
380 }
381
382 #[tracing::instrument(skip_all)]
384 pub async fn submit_create_logical_table_tasks(
385 &self,
386 create_table_tasks: Vec<CreateTableTask>,
387 physical_table_id: TableId,
388 ) -> Result<(ProcedureId, Option<Output>)> {
389 let context = self.create_context();
390
391 let procedure =
392 CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
393
394 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
395
396 self.execute_procedure_and_wait(procedure_with_id).await
397 }
398
399 #[tracing::instrument(skip_all)]
401 pub async fn submit_alter_logical_table_tasks(
402 &self,
403 alter_table_tasks: Vec<AlterTableTask>,
404 physical_table_id: TableId,
405 ) -> Result<(ProcedureId, Option<Output>)> {
406 let context = self.create_context();
407
408 let procedure =
409 AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
410
411 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
412
413 self.execute_procedure_and_wait(procedure_with_id).await
414 }
415
416 #[tracing::instrument(skip_all)]
418 pub async fn submit_drop_table_task(
419 &self,
420 drop_table_task: DropTableTask,
421 ) -> Result<(ProcedureId, Option<Output>)> {
422 let context = self.create_context();
423
424 let procedure = DropTableProcedure::new(drop_table_task, context);
425
426 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
427
428 self.execute_procedure_and_wait(procedure_with_id).await
429 }
430
431 #[tracing::instrument(skip_all)]
433 pub async fn submit_create_database(
434 &self,
435 CreateDatabaseTask {
436 catalog,
437 schema,
438 create_if_not_exists,
439 options,
440 }: CreateDatabaseTask,
441 ) -> Result<(ProcedureId, Option<Output>)> {
442 let context = self.create_context();
443 let procedure =
444 CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
445 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
446
447 self.execute_procedure_and_wait(procedure_with_id).await
448 }
449
450 #[tracing::instrument(skip_all)]
452 pub async fn submit_drop_database(
453 &self,
454 DropDatabaseTask {
455 catalog,
456 schema,
457 drop_if_exists,
458 }: DropDatabaseTask,
459 ) -> Result<(ProcedureId, Option<Output>)> {
460 let context = self.create_context();
461 let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
462 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
463
464 self.execute_procedure_and_wait(procedure_with_id).await
465 }
466
467 pub async fn submit_alter_database(
468 &self,
469 alter_database_task: AlterDatabaseTask,
470 ) -> Result<(ProcedureId, Option<Output>)> {
471 let context = self.create_context();
472 let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
473 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
474
475 self.execute_procedure_and_wait(procedure_with_id).await
476 }
477
478 #[tracing::instrument(skip_all)]
480 pub async fn submit_create_flow_task(
481 &self,
482 create_flow: CreateFlowTask,
483 query_context: QueryContext,
484 ) -> Result<(ProcedureId, Option<Output>)> {
485 let context = self.create_context();
486 let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
487 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
488
489 self.execute_procedure_and_wait(procedure_with_id).await
490 }
491
492 #[tracing::instrument(skip_all)]
494 pub async fn submit_drop_flow_task(
495 &self,
496 drop_flow: DropFlowTask,
497 ) -> Result<(ProcedureId, Option<Output>)> {
498 let context = self.create_context();
499 let procedure = DropFlowProcedure::new(drop_flow, context);
500 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
501
502 self.execute_procedure_and_wait(procedure_with_id).await
503 }
504
505 #[tracing::instrument(skip_all)]
507 pub async fn submit_drop_view_task(
508 &self,
509 drop_view: DropViewTask,
510 ) -> Result<(ProcedureId, Option<Output>)> {
511 let context = self.create_context();
512 let procedure = DropViewProcedure::new(drop_view, context);
513 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
514
515 self.execute_procedure_and_wait(procedure_with_id).await
516 }
517
518 #[tracing::instrument(skip_all)]
520 pub async fn submit_truncate_table_task(
521 &self,
522 truncate_table_task: TruncateTableTask,
523 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
524 region_routes: Vec<RegionRoute>,
525 ) -> Result<(ProcedureId, Option<Output>)> {
526 let context = self.create_context();
527 let procedure = TruncateTableProcedure::new(
528 truncate_table_task,
529 table_info_value,
530 region_routes,
531 context,
532 );
533
534 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
535
536 self.execute_procedure_and_wait(procedure_with_id).await
537 }
538
539 #[tracing::instrument(skip_all)]
541 pub async fn submit_comment_on_task(
542 &self,
543 comment_on_task: CommentOnTask,
544 ) -> Result<(ProcedureId, Option<Output>)> {
545 let context = self.create_context();
546 let procedure = CommentOnProcedure::new(comment_on_task, context);
547 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
548
549 self.execute_procedure_and_wait(procedure_with_id).await
550 }
551
552 async fn execute_procedure_and_wait(
554 &self,
555 procedure_with_id: ProcedureWithId,
556 ) -> Result<(ProcedureId, Option<Output>)> {
557 let procedure_id = procedure_with_id.id;
558
559 let mut watcher = self
560 .procedure_manager
561 .submit(procedure_with_id)
562 .await
563 .context(SubmitProcedureSnafu)?;
564
565 let output = watcher::wait(&mut watcher)
566 .await
567 .context(WaitProcedureSnafu)?;
568
569 Ok((procedure_id, output))
570 }
571
572 async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
574 let procedure_id = procedure_with_id.id;
575 let _ = self
576 .procedure_manager
577 .submit(procedure_with_id)
578 .await
579 .context(SubmitProcedureSnafu)?;
580
581 Ok(procedure_id)
582 }
583
584 pub async fn submit_ddl_task(
585 &self,
586 ctx: &ExecutorContext,
587 request: SubmitDdlTaskRequest,
588 ) -> Result<SubmitDdlTaskResponse> {
589 let span = ctx
590 .tracing_context
591 .as_ref()
592 .map(TracingContext::from_w3c)
593 .unwrap_or_else(TracingContext::from_current_span)
594 .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
595 let ddl_options = DdlOptions {
596 wait: request.wait,
597 timeout: request.timeout,
598 };
599 async move {
600 debug!("Submitting Ddl task: {:?}", request.task);
601 match request.task {
602 CreateTable(create_table_task) => {
603 handle_create_table_task(self, create_table_task).await
604 }
605 DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
606 AlterTable(alter_table_task) => {
607 handle_alter_table_task(self, alter_table_task, ddl_options).await
608 }
609 TruncateTable(truncate_table_task) => {
610 handle_truncate_table_task(self, truncate_table_task).await
611 }
612 CreateLogicalTables(create_table_tasks) => {
613 handle_create_logical_table_tasks(self, create_table_tasks).await
614 }
615 AlterLogicalTables(alter_table_tasks) => {
616 handle_alter_logical_table_tasks(self, alter_table_tasks).await
617 }
618 DropLogicalTables(_) => todo!(),
619 CreateDatabase(create_database_task) => {
620 handle_create_database_task(self, create_database_task).await
621 }
622 DropDatabase(drop_database_task) => {
623 handle_drop_database_task(self, drop_database_task).await
624 }
625 AlterDatabase(alter_database_task) => {
626 handle_alter_database_task(self, alter_database_task).await
627 }
628 CreateFlow(create_flow_task) => {
629 handle_create_flow_task(self, create_flow_task, request.query_context.into())
630 .await
631 }
632 DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
633 CreateView(create_view_task) => {
634 handle_create_view_task(self, create_view_task).await
635 }
636 DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
637 CommentOn(comment_on_task) => handle_comment_on_task(self, comment_on_task).await,
638 #[cfg(feature = "enterprise")]
639 CreateTrigger(create_trigger_task) => {
640 handle_create_trigger_task(
641 self,
642 create_trigger_task,
643 request.query_context.into(),
644 )
645 .await
646 }
647 #[cfg(feature = "enterprise")]
648 DropTrigger(drop_trigger_task) => {
649 handle_drop_trigger_task(self, drop_trigger_task, request.query_context.into())
650 .await
651 }
652 }
653 }
654 .trace(span)
655 .await
656 }
657}
658
659async fn handle_truncate_table_task(
660 ddl_manager: &DdlManager,
661 truncate_table_task: TruncateTableTask,
662) -> Result<SubmitDdlTaskResponse> {
663 let table_id = truncate_table_task.table_id;
664 let table_metadata_manager = &ddl_manager.table_metadata_manager();
665 let table_ref = truncate_table_task.table_ref();
666
667 let (table_info_value, table_route_value) =
668 table_metadata_manager.get_full_table_info(table_id).await?;
669
670 let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
671 table: table_ref.to_string(),
672 })?;
673
674 let table_route_value = table_route_value.context(TableRouteNotFoundSnafu { table_id })?;
675
676 let table_route = table_route_value.into_inner().region_routes()?.clone();
677
678 let (id, _) = ddl_manager
679 .submit_truncate_table_task(truncate_table_task, table_info_value, table_route)
680 .await?;
681
682 info!("Table: {table_id} is truncated via procedure_id {id:?}");
683
684 Ok(SubmitDdlTaskResponse {
685 key: id.to_string().into(),
686 ..Default::default()
687 })
688}
689
690async fn handle_alter_table_task(
691 ddl_manager: &DdlManager,
692 alter_table_task: AlterTableTask,
693 ddl_options: DdlOptions,
694) -> Result<SubmitDdlTaskResponse> {
695 let table_ref = alter_table_task.table_ref();
696
697 let table_id = ddl_manager
698 .table_metadata_manager()
699 .table_name_manager()
700 .get(TableNameKey::new(
701 table_ref.catalog,
702 table_ref.schema,
703 table_ref.table,
704 ))
705 .await?
706 .with_context(|| TableNotFoundSnafu {
707 table_name: table_ref.to_string(),
708 })?
709 .table_id();
710
711 let table_route_value = ddl_manager
712 .table_metadata_manager()
713 .table_route_manager()
714 .table_route_storage()
715 .get(table_id)
716 .await?
717 .context(TableRouteNotFoundSnafu { table_id })?;
718 ensure!(
719 table_route_value.is_physical(),
720 UnexpectedLogicalRouteTableSnafu {
721 err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
722 }
723 );
724
725 let (id, _) = ddl_manager
726 .submit_alter_table_task(table_id, alter_table_task, ddl_options)
727 .await?;
728
729 info!("Table: {table_id} is altered via procedure_id {id:?}");
730
731 Ok(SubmitDdlTaskResponse {
732 key: id.to_string().into(),
733 ..Default::default()
734 })
735}
736
737async fn handle_drop_table_task(
738 ddl_manager: &DdlManager,
739 drop_table_task: DropTableTask,
740) -> Result<SubmitDdlTaskResponse> {
741 let table_id = drop_table_task.table_id;
742 let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
743
744 info!("Table: {table_id} is dropped via procedure_id {id:?}");
745
746 Ok(SubmitDdlTaskResponse {
747 key: id.to_string().into(),
748 ..Default::default()
749 })
750}
751
752async fn handle_create_table_task(
753 ddl_manager: &DdlManager,
754 create_table_task: CreateTableTask,
755) -> Result<SubmitDdlTaskResponse> {
756 let (id, output) = ddl_manager
757 .submit_create_table_task(create_table_task)
758 .await?;
759
760 let procedure_id = id.to_string();
761 let output = output.context(ProcedureOutputSnafu {
762 procedure_id: &procedure_id,
763 err_msg: "empty output",
764 })?;
765 let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
766 procedure_id: &procedure_id,
767 err_msg: "downcast to `u32`",
768 })?);
769 info!("Table: {table_id} is created via procedure_id {id:?}");
770
771 Ok(SubmitDdlTaskResponse {
772 key: procedure_id.into(),
773 table_ids: vec![table_id],
774 })
775}
776
777async fn handle_create_logical_table_tasks(
778 ddl_manager: &DdlManager,
779 create_table_tasks: Vec<CreateTableTask>,
780) -> Result<SubmitDdlTaskResponse> {
781 ensure!(
782 !create_table_tasks.is_empty(),
783 EmptyDdlTasksSnafu {
784 name: "create logical tables"
785 }
786 );
787 let physical_table_id = utils::check_and_get_physical_table_id(
788 ddl_manager.table_metadata_manager(),
789 &create_table_tasks,
790 )
791 .await?;
792 let num_logical_tables = create_table_tasks.len();
793
794 let (id, output) = ddl_manager
795 .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
796 .await?;
797
798 info!(
799 "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}"
800 );
801
802 let procedure_id = id.to_string();
803 let output = output.context(ProcedureOutputSnafu {
804 procedure_id: &procedure_id,
805 err_msg: "empty output",
806 })?;
807 let table_ids = output
808 .downcast_ref::<Vec<TableId>>()
809 .context(ProcedureOutputSnafu {
810 procedure_id: &procedure_id,
811 err_msg: "downcast to `Vec<TableId>`",
812 })?
813 .clone();
814
815 Ok(SubmitDdlTaskResponse {
816 key: procedure_id.into(),
817 table_ids,
818 })
819}
820
821async fn handle_create_database_task(
822 ddl_manager: &DdlManager,
823 create_database_task: CreateDatabaseTask,
824) -> Result<SubmitDdlTaskResponse> {
825 let (id, _) = ddl_manager
826 .submit_create_database(create_database_task.clone())
827 .await?;
828
829 let procedure_id = id.to_string();
830 info!(
831 "Database {}.{} is created via procedure_id {id:?}",
832 create_database_task.catalog, create_database_task.schema
833 );
834
835 Ok(SubmitDdlTaskResponse {
836 key: procedure_id.into(),
837 ..Default::default()
838 })
839}
840
841async fn handle_drop_database_task(
842 ddl_manager: &DdlManager,
843 drop_database_task: DropDatabaseTask,
844) -> Result<SubmitDdlTaskResponse> {
845 let (id, _) = ddl_manager
846 .submit_drop_database(drop_database_task.clone())
847 .await?;
848
849 let procedure_id = id.to_string();
850 info!(
851 "Database {}.{} is dropped via procedure_id {id:?}",
852 drop_database_task.catalog, drop_database_task.schema
853 );
854
855 Ok(SubmitDdlTaskResponse {
856 key: procedure_id.into(),
857 ..Default::default()
858 })
859}
860
861async fn handle_alter_database_task(
862 ddl_manager: &DdlManager,
863 alter_database_task: AlterDatabaseTask,
864) -> Result<SubmitDdlTaskResponse> {
865 let (id, _) = ddl_manager
866 .submit_alter_database(alter_database_task.clone())
867 .await?;
868
869 let procedure_id = id.to_string();
870 info!(
871 "Database {}.{} is altered via procedure_id {id:?}",
872 alter_database_task.catalog(),
873 alter_database_task.schema()
874 );
875
876 Ok(SubmitDdlTaskResponse {
877 key: procedure_id.into(),
878 ..Default::default()
879 })
880}
881
882async fn handle_drop_flow_task(
883 ddl_manager: &DdlManager,
884 drop_flow_task: DropFlowTask,
885) -> Result<SubmitDdlTaskResponse> {
886 let (id, _) = ddl_manager
887 .submit_drop_flow_task(drop_flow_task.clone())
888 .await?;
889
890 let procedure_id = id.to_string();
891 info!(
892 "Flow {}.{}({}) is dropped via procedure_id {id:?}",
893 drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
894 );
895
896 Ok(SubmitDdlTaskResponse {
897 key: procedure_id.into(),
898 ..Default::default()
899 })
900}
901
/// Forwards a drop-trigger task to the configured trigger DDL manager.
///
/// Fails with an `Unsupported` error when no trigger DDL manager has been installed.
#[cfg(feature = "enterprise")]
async fn handle_drop_trigger_task(
    ddl_manager: &DdlManager,
    drop_trigger_task: DropTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    use crate::error::UnsupportedSnafu;

    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_manager) => {
            let procedure_manager = ddl_manager.procedure_manager.clone();
            let ddl_context = ddl_manager.ddl_context.clone();
            trigger_manager
                .drop_trigger(
                    drop_trigger_task,
                    procedure_manager,
                    ddl_context,
                    query_context,
                )
                .await
        }
        None => UnsupportedSnafu {
            operation: "drop trigger",
        }
        .fail(),
    }
}
925
926async fn handle_drop_view_task(
927 ddl_manager: &DdlManager,
928 drop_view_task: DropViewTask,
929) -> Result<SubmitDdlTaskResponse> {
930 let (id, _) = ddl_manager
931 .submit_drop_view_task(drop_view_task.clone())
932 .await?;
933
934 let procedure_id = id.to_string();
935 info!(
936 "View {}({}) is dropped via procedure_id {id:?}",
937 drop_view_task.table_ref(),
938 drop_view_task.view_id,
939 );
940
941 Ok(SubmitDdlTaskResponse {
942 key: procedure_id.into(),
943 ..Default::default()
944 })
945}
946
947async fn handle_create_flow_task(
948 ddl_manager: &DdlManager,
949 create_flow_task: CreateFlowTask,
950 query_context: QueryContext,
951) -> Result<SubmitDdlTaskResponse> {
952 let (id, output) = ddl_manager
953 .submit_create_flow_task(create_flow_task.clone(), query_context)
954 .await?;
955
956 let procedure_id = id.to_string();
957 let output = output.context(ProcedureOutputSnafu {
958 procedure_id: &procedure_id,
959 err_msg: "empty output",
960 })?;
961 let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
962 procedure_id: &procedure_id,
963 err_msg: "downcast to `u32`",
964 })?);
965 if !create_flow_task.or_replace {
966 info!(
967 "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
968 create_flow_task.catalog_name, create_flow_task.flow_name,
969 );
970 } else {
971 info!(
972 "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
973 create_flow_task.catalog_name, create_flow_task.flow_name,
974 );
975 }
976
977 Ok(SubmitDdlTaskResponse {
978 key: procedure_id.into(),
979 ..Default::default()
980 })
981}
982
/// Forwards a create-trigger task to the configured trigger DDL manager.
///
/// Fails with an `Unsupported` error when no trigger DDL manager has been installed.
#[cfg(feature = "enterprise")]
async fn handle_create_trigger_task(
    ddl_manager: &DdlManager,
    create_trigger_task: CreateTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    use crate::error::UnsupportedSnafu;

    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_manager) => {
            let procedure_manager = ddl_manager.procedure_manager.clone();
            let ddl_context = ddl_manager.ddl_context.clone();
            trigger_manager
                .create_trigger(
                    create_trigger_task,
                    procedure_manager,
                    ddl_context,
                    query_context,
                )
                .await
        }
        None => UnsupportedSnafu {
            operation: "create trigger",
        }
        .fail(),
    }
}
1006
1007async fn handle_alter_logical_table_tasks(
1008 ddl_manager: &DdlManager,
1009 alter_table_tasks: Vec<AlterTableTask>,
1010) -> Result<SubmitDdlTaskResponse> {
1011 ensure!(
1012 !alter_table_tasks.is_empty(),
1013 EmptyDdlTasksSnafu {
1014 name: "alter logical tables"
1015 }
1016 );
1017
1018 let first_table = TableNameKey {
1020 catalog: &alter_table_tasks[0].alter_table.catalog_name,
1021 schema: &alter_table_tasks[0].alter_table.schema_name,
1022 table: &alter_table_tasks[0].alter_table.table_name,
1023 };
1024 let physical_table_id =
1025 utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
1026 let num_logical_tables = alter_table_tasks.len();
1027
1028 let (id, _) = ddl_manager
1029 .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
1030 .await?;
1031
1032 info!(
1033 "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}"
1034 );
1035
1036 let procedure_id = id.to_string();
1037
1038 Ok(SubmitDdlTaskResponse {
1039 key: procedure_id.into(),
1040 ..Default::default()
1041 })
1042}
1043
1044async fn handle_create_view_task(
1046 ddl_manager: &DdlManager,
1047 create_view_task: CreateViewTask,
1048) -> Result<SubmitDdlTaskResponse> {
1049 let (id, output) = ddl_manager
1050 .submit_create_view_task(create_view_task)
1051 .await?;
1052
1053 let procedure_id = id.to_string();
1054 let output = output.context(ProcedureOutputSnafu {
1055 procedure_id: &procedure_id,
1056 err_msg: "empty output",
1057 })?;
1058 let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
1059 procedure_id: &procedure_id,
1060 err_msg: "downcast to `u32`",
1061 })?);
1062 info!("View: {view_id} is created via procedure_id {id:?}");
1063
1064 Ok(SubmitDdlTaskResponse {
1065 key: procedure_id.into(),
1066 table_ids: vec![view_id],
1067 })
1068}
1069
1070async fn handle_comment_on_task(
1071 ddl_manager: &DdlManager,
1072 comment_on_task: CommentOnTask,
1073) -> Result<SubmitDdlTaskResponse> {
1074 let (id, _) = ddl_manager
1075 .submit_comment_on_task(comment_on_task.clone())
1076 .await?;
1077
1078 let procedure_id = id.to_string();
1079 info!(
1080 "Comment on {}.{}.{} is updated via procedure_id {id:?}",
1081 comment_on_task.catalog_name, comment_on_task.schema_name, comment_on_task.object_name
1082 );
1083
1084 Ok(SubmitDdlTaskResponse {
1085 key: procedure_id.into(),
1086 ..Default::default()
1087 })
1088}
1089
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use common_error::ext::BoxedError;
    use common_procedure::local::LocalManager;
    use common_procedure::test_util::InMemoryPoisonStore;
    use common_procedure::{BoxedProcedure, ProcedureManagerRef};
    use store_api::storage::TableId;
    use table::table_name::TableName;

    use super::DdlManager;
    use crate::cache_invalidator::DummyCacheInvalidator;
    use crate::ddl::alter_database::AlterDatabaseProcedure;
    use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
    use crate::ddl::alter_table::AlterTableProcedure;
    use crate::ddl::comment_on::CommentOnProcedure;
    use crate::ddl::create_database::CreateDatabaseProcedure;
    use crate::ddl::create_flow::CreateFlowProcedure;
    use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
    use crate::ddl::create_table::CreateTableProcedure;
    use crate::ddl::create_view::CreateViewProcedure;
    use crate::ddl::drop_database::DropDatabaseProcedure;
    use crate::ddl::drop_flow::DropFlowProcedure;
    use crate::ddl::drop_table::DropTableProcedure;
    use crate::ddl::drop_view::DropViewProcedure;
    use crate::ddl::flow_meta::FlowMetadataAllocator;
    use crate::ddl::table_meta::TableMetadataAllocator;
    use crate::ddl::truncate_table::TruncateTableProcedure;
    use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use crate::ddl_manager::RepartitionProcedureFactory;
    use crate::key::TableMetadataManager;
    use crate::key::flow::FlowMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef};
    use crate::peer::Peer;
    use crate::region_keeper::MemoryRegionKeeper;
    use crate::region_registry::LeaderRegionRegistry;
    use crate::sequence::SequenceBuilder;
    use crate::state_store::KvStateStore;
    use crate::wal_provider::WalProvider;

    /// Node manager stub; the test never resolves a datanode or flownode, so both lookups
    /// are left unimplemented.
    pub struct DummyDatanodeManager;

    #[async_trait::async_trait]
    impl DatanodeManager for DummyDatanodeManager {
        async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
            unimplemented!()
        }
    }

    #[async_trait::async_trait]
    impl FlownodeManager for DummyDatanodeManager {
        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
            unimplemented!()
        }
    }

    /// Factory stub: registering loaders is a no-op and creating a procedure is never
    /// exercised by the test.
    struct DummyRepartitionProcedureFactory;

    // The trait has no async methods, so no `#[async_trait]` attribute is needed here.
    impl RepartitionProcedureFactory for DummyRepartitionProcedureFactory {
        fn create(
            &self,
            _ddl_ctx: &DdlContext,
            _table_name: TableName,
            _table_id: TableId,
            _from_exprs: Vec<String>,
            _to_exprs: Vec<String>,
            _timeout: Option<Duration>,
        ) -> std::result::Result<BoxedProcedure, BoxedError> {
            unimplemented!()
        }

        fn register_loaders(
            &self,
            _ddl_ctx: &DdlContext,
            _procedure_manager: &ProcedureManagerRef,
        ) -> std::result::Result<(), BoxedError> {
            Ok(())
        }
    }

    /// `try_new(.., true)` must succeed and leave a loader registered for every built-in
    /// DDL procedure type.
    #[test]
    fn test_try_new() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
            Arc::new(WalProvider::default()),
        ));
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
        ));

        let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let procedure_manager = Arc::new(LocalManager::new(
            Default::default(),
            state_store,
            poison_manager,
            None,
            None,
        ));

        // Fail loudly if registration errors out instead of discarding the result and
        // tripping over a less informative assertion further down.
        let _manager = DdlManager::try_new(
            DdlContext {
                node_manager: Arc::new(DummyDatanodeManager),
                cache_invalidator: Arc::new(DummyCacheInvalidator),
                table_metadata_manager,
                table_metadata_allocator,
                flow_metadata_manager,
                flow_metadata_allocator,
                memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
                leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
                region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
            },
            procedure_manager.clone(),
            Arc::new(DummyRepartitionProcedureFactory),
            true,
        )
        .expect("DdlManager::try_new should register all procedure loaders");

        // Mirrors the list in `DdlManager::register_loaders`.
        let expected_loaders = [
            CreateTableProcedure::TYPE_NAME,
            CreateLogicalTablesProcedure::TYPE_NAME,
            CreateViewProcedure::TYPE_NAME,
            CreateFlowProcedure::TYPE_NAME,
            AlterTableProcedure::TYPE_NAME,
            AlterLogicalTablesProcedure::TYPE_NAME,
            AlterDatabaseProcedure::TYPE_NAME,
            DropTableProcedure::TYPE_NAME,
            DropFlowProcedure::TYPE_NAME,
            TruncateTableProcedure::TYPE_NAME,
            CreateDatabaseProcedure::TYPE_NAME,
            DropDatabaseProcedure::TYPE_NAME,
            DropViewProcedure::TYPE_NAME,
            CommentOnProcedure::TYPE_NAME,
        ];

        for loader in expected_loaders {
            assert!(
                procedure_manager.contains_loader(loader),
                "missing procedure loader for {loader}"
            );
        }
    }
}