1use std::sync::Arc;
16
17use common_error::ext::BoxedError;
18use common_procedure::{
19 BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId, watcher,
20};
21use common_telemetry::tracing_context::{FutureExt, TracingContext};
22use common_telemetry::{debug, info, tracing};
23use derive_builder::Builder;
24use snafu::{OptionExt, ResultExt, ensure};
25use store_api::storage::TableId;
26
27use crate::ddl::alter_database::AlterDatabaseProcedure;
28use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
29use crate::ddl::alter_table::AlterTableProcedure;
30use crate::ddl::comment_on::CommentOnProcedure;
31use crate::ddl::create_database::CreateDatabaseProcedure;
32use crate::ddl::create_flow::CreateFlowProcedure;
33use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
34use crate::ddl::create_table::CreateTableProcedure;
35use crate::ddl::create_view::CreateViewProcedure;
36use crate::ddl::drop_database::DropDatabaseProcedure;
37use crate::ddl::drop_flow::DropFlowProcedure;
38use crate::ddl::drop_table::DropTableProcedure;
39use crate::ddl::drop_view::DropViewProcedure;
40use crate::ddl::truncate_table::TruncateTableProcedure;
41use crate::ddl::{DdlContext, utils};
42use crate::error::{
43 EmptyDdlTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
44 SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
45 UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
46};
47use crate::key::table_info::TableInfoValue;
48use crate::key::table_name::TableNameKey;
49use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
50use crate::procedure_executor::ExecutorContext;
51#[cfg(feature = "enterprise")]
52use crate::rpc::ddl::DdlTask::CreateTrigger;
53#[cfg(feature = "enterprise")]
54use crate::rpc::ddl::DdlTask::DropTrigger;
55use crate::rpc::ddl::DdlTask::{
56 AlterDatabase, AlterLogicalTables, AlterTable, CommentOn, CreateDatabase, CreateFlow,
57 CreateLogicalTables, CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables,
58 DropTable, DropView, TruncateTable,
59};
60#[cfg(feature = "enterprise")]
61use crate::rpc::ddl::trigger::CreateTriggerTask;
62#[cfg(feature = "enterprise")]
63use crate::rpc::ddl::trigger::DropTriggerTask;
64use crate::rpc::ddl::{
65 AlterDatabaseTask, AlterTableTask, CommentOnTask, CreateDatabaseTask, CreateFlowTask,
66 CreateTableTask, CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask,
67 QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
68};
69use crate::rpc::router::RegionRoute;
70
/// Hook for customizing a [`DdlManager`] after it has been constructed.
///
/// An implementation receives the manager by value together with a caller-defined context of
/// type `C`, and returns the (possibly modified) manager or a [`BoxedError`].
#[async_trait::async_trait]
pub trait DdlManagerConfigurator<C>: Send + Sync {
    /// Consumes `ddl_manager` and returns the configured manager, or an error.
    async fn configure(
        &self,
        ddl_manager: DdlManager,
        ctx: C,
    ) -> std::result::Result<DdlManager, BoxedError>;
}
81
/// Shared, reference-counted handle to a [`DdlManagerConfigurator`] trait object.
pub type DdlManagerConfiguratorRef<C> = Arc<dyn DdlManagerConfigurator<C>>;
83
/// Shared, reference-counted handle to a [`DdlManager`].
pub type DdlManagerRef = Arc<DdlManager>;
85
/// A function that builds a [`BoxedProcedureLoader`] from a [`DdlContext`].
pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
87
/// Entry point for submitting DDL tasks as procedures.
#[derive(Builder)]
pub struct DdlManager {
    /// Context that is cloned into every procedure created by this manager.
    ddl_context: DdlContext,
    /// Procedure manager used to register loaders and to submit procedures.
    procedure_manager: ProcedureManagerRef,
    /// Optional delegate for trigger DDL tasks; only present with the `enterprise` feature.
    #[cfg(feature = "enterprise")]
    trigger_ddl_manager: Option<TriggerDdlManagerRef>,
}
96
/// Delegate that executes trigger-related DDL tasks (`enterprise` feature only).
///
/// Each method receives the task together with the procedure manager, the DDL context and the
/// query context of the request, and returns the response for the submitted task.
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
pub trait TriggerDdlManager: Send + Sync {
    /// Executes a create-trigger task.
    async fn create_trigger(
        &self,
        create_trigger_task: CreateTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Executes a drop-trigger task.
    async fn drop_trigger(
        &self,
        drop_trigger_task: DropTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Returns `self` as [`std::any::Any`], which allows downcasting to the concrete type.
    fn as_any(&self) -> &dyn std::any::Any;
}
120
/// Shared, reference-counted handle to a [`TriggerDdlManager`] trait object.
#[cfg(feature = "enterprise")]
pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
123
/// Expands to a `(type name, loader factory)` tuple for a single procedure type.
///
/// The first element is `$procedure::TYPE_NAME`. The second is a reference to a closure that,
/// given a [`DdlContext`], returns a boxed loader; the loader rebuilds the procedure from its
/// JSON form via `$procedure::from_json`.
macro_rules! procedure_loader_entry {
    ($procedure:ident) => {
        (
            $procedure::TYPE_NAME,
            &|context: DdlContext| -> BoxedProcedureLoader {
                Box::new(move |json: &str| {
                    // The loader can be invoked repeatedly, so every call gets its own clone.
                    let context = context.clone();
                    $procedure::from_json(json, context).map(|p| Box::new(p) as _)
                })
            },
        )
    };
}
137
/// Expands a comma-separated list of procedure types into a `Vec` holding one
/// `procedure_loader_entry!` tuple per type.
macro_rules! procedure_loader {
    ($($procedure:ident),*) => {
        vec![
            $(procedure_loader_entry!($procedure)),*
        ]
    };
}
145
146impl DdlManager {
147 pub fn try_new(
149 ddl_context: DdlContext,
150 procedure_manager: ProcedureManagerRef,
151 register_loaders: bool,
152 ) -> Result<Self> {
153 let manager = Self {
154 ddl_context,
155 procedure_manager,
156 #[cfg(feature = "enterprise")]
157 trigger_ddl_manager: None,
158 };
159 if register_loaders {
160 manager.register_loaders()?;
161 }
162 Ok(manager)
163 }
164
165 #[cfg(feature = "enterprise")]
166 pub fn with_trigger_ddl_manager(mut self, trigger_ddl_manager: TriggerDdlManagerRef) -> Self {
167 self.trigger_ddl_manager = Some(trigger_ddl_manager);
168 self
169 }
170
171 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
173 &self.ddl_context.table_metadata_manager
174 }
175
176 pub fn create_context(&self) -> DdlContext {
178 self.ddl_context.clone()
179 }
180
181 pub fn register_loaders(&self) -> Result<()> {
183 let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
184 CreateTableProcedure,
185 CreateLogicalTablesProcedure,
186 CreateViewProcedure,
187 CreateFlowProcedure,
188 AlterTableProcedure,
189 AlterLogicalTablesProcedure,
190 AlterDatabaseProcedure,
191 DropTableProcedure,
192 DropFlowProcedure,
193 TruncateTableProcedure,
194 CreateDatabaseProcedure,
195 DropDatabaseProcedure,
196 DropViewProcedure,
197 CommentOnProcedure
198 );
199
200 for (type_name, loader_factory) in loaders {
201 let context = self.create_context();
202 self.procedure_manager
203 .register_loader(type_name, loader_factory(context))
204 .context(RegisterProcedureLoaderSnafu { type_name })?;
205 }
206
207 Ok(())
208 }
209
210 #[tracing::instrument(skip_all)]
212 pub async fn submit_alter_table_task(
213 &self,
214 table_id: TableId,
215 alter_table_task: AlterTableTask,
216 ) -> Result<(ProcedureId, Option<Output>)> {
217 let context = self.create_context();
218
219 let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
220
221 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
222
223 self.submit_procedure(procedure_with_id).await
224 }
225
226 #[tracing::instrument(skip_all)]
228 pub async fn submit_create_table_task(
229 &self,
230 create_table_task: CreateTableTask,
231 ) -> Result<(ProcedureId, Option<Output>)> {
232 let context = self.create_context();
233
234 let procedure = CreateTableProcedure::new(create_table_task, context);
235
236 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
237
238 self.submit_procedure(procedure_with_id).await
239 }
240
241 #[tracing::instrument(skip_all)]
243 pub async fn submit_create_view_task(
244 &self,
245 create_view_task: CreateViewTask,
246 ) -> Result<(ProcedureId, Option<Output>)> {
247 let context = self.create_context();
248
249 let procedure = CreateViewProcedure::new(create_view_task, context);
250
251 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
252
253 self.submit_procedure(procedure_with_id).await
254 }
255
256 #[tracing::instrument(skip_all)]
258 pub async fn submit_create_logical_table_tasks(
259 &self,
260 create_table_tasks: Vec<CreateTableTask>,
261 physical_table_id: TableId,
262 ) -> Result<(ProcedureId, Option<Output>)> {
263 let context = self.create_context();
264
265 let procedure =
266 CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
267
268 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
269
270 self.submit_procedure(procedure_with_id).await
271 }
272
273 #[tracing::instrument(skip_all)]
275 pub async fn submit_alter_logical_table_tasks(
276 &self,
277 alter_table_tasks: Vec<AlterTableTask>,
278 physical_table_id: TableId,
279 ) -> Result<(ProcedureId, Option<Output>)> {
280 let context = self.create_context();
281
282 let procedure =
283 AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
284
285 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
286
287 self.submit_procedure(procedure_with_id).await
288 }
289
290 #[tracing::instrument(skip_all)]
292 pub async fn submit_drop_table_task(
293 &self,
294 drop_table_task: DropTableTask,
295 ) -> Result<(ProcedureId, Option<Output>)> {
296 let context = self.create_context();
297
298 let procedure = DropTableProcedure::new(drop_table_task, context);
299
300 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
301
302 self.submit_procedure(procedure_with_id).await
303 }
304
305 #[tracing::instrument(skip_all)]
307 pub async fn submit_create_database(
308 &self,
309 CreateDatabaseTask {
310 catalog,
311 schema,
312 create_if_not_exists,
313 options,
314 }: CreateDatabaseTask,
315 ) -> Result<(ProcedureId, Option<Output>)> {
316 let context = self.create_context();
317 let procedure =
318 CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
319 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
320
321 self.submit_procedure(procedure_with_id).await
322 }
323
324 #[tracing::instrument(skip_all)]
326 pub async fn submit_drop_database(
327 &self,
328 DropDatabaseTask {
329 catalog,
330 schema,
331 drop_if_exists,
332 }: DropDatabaseTask,
333 ) -> Result<(ProcedureId, Option<Output>)> {
334 let context = self.create_context();
335 let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
336 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
337
338 self.submit_procedure(procedure_with_id).await
339 }
340
341 pub async fn submit_alter_database(
342 &self,
343 alter_database_task: AlterDatabaseTask,
344 ) -> Result<(ProcedureId, Option<Output>)> {
345 let context = self.create_context();
346 let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
347 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
348
349 self.submit_procedure(procedure_with_id).await
350 }
351
352 #[tracing::instrument(skip_all)]
354 pub async fn submit_create_flow_task(
355 &self,
356 create_flow: CreateFlowTask,
357 query_context: QueryContext,
358 ) -> Result<(ProcedureId, Option<Output>)> {
359 let context = self.create_context();
360 let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
361 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
362
363 self.submit_procedure(procedure_with_id).await
364 }
365
366 #[tracing::instrument(skip_all)]
368 pub async fn submit_drop_flow_task(
369 &self,
370 drop_flow: DropFlowTask,
371 ) -> Result<(ProcedureId, Option<Output>)> {
372 let context = self.create_context();
373 let procedure = DropFlowProcedure::new(drop_flow, context);
374 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
375
376 self.submit_procedure(procedure_with_id).await
377 }
378
379 #[tracing::instrument(skip_all)]
381 pub async fn submit_drop_view_task(
382 &self,
383 drop_view: DropViewTask,
384 ) -> Result<(ProcedureId, Option<Output>)> {
385 let context = self.create_context();
386 let procedure = DropViewProcedure::new(drop_view, context);
387 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
388
389 self.submit_procedure(procedure_with_id).await
390 }
391
392 #[tracing::instrument(skip_all)]
394 pub async fn submit_truncate_table_task(
395 &self,
396 truncate_table_task: TruncateTableTask,
397 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
398 region_routes: Vec<RegionRoute>,
399 ) -> Result<(ProcedureId, Option<Output>)> {
400 let context = self.create_context();
401 let procedure = TruncateTableProcedure::new(
402 truncate_table_task,
403 table_info_value,
404 region_routes,
405 context,
406 );
407
408 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
409
410 self.submit_procedure(procedure_with_id).await
411 }
412
413 #[tracing::instrument(skip_all)]
415 pub async fn submit_comment_on_task(
416 &self,
417 comment_on_task: CommentOnTask,
418 ) -> Result<(ProcedureId, Option<Output>)> {
419 let context = self.create_context();
420 let procedure = CommentOnProcedure::new(comment_on_task, context);
421 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
422
423 self.submit_procedure(procedure_with_id).await
424 }
425
426 async fn submit_procedure(
427 &self,
428 procedure_with_id: ProcedureWithId,
429 ) -> Result<(ProcedureId, Option<Output>)> {
430 let procedure_id = procedure_with_id.id;
431
432 let mut watcher = self
433 .procedure_manager
434 .submit(procedure_with_id)
435 .await
436 .context(SubmitProcedureSnafu)?;
437
438 let output = watcher::wait(&mut watcher)
439 .await
440 .context(WaitProcedureSnafu)?;
441
442 Ok((procedure_id, output))
443 }
444
445 pub async fn submit_ddl_task(
446 &self,
447 ctx: &ExecutorContext,
448 request: SubmitDdlTaskRequest,
449 ) -> Result<SubmitDdlTaskResponse> {
450 let span = ctx
451 .tracing_context
452 .as_ref()
453 .map(TracingContext::from_w3c)
454 .unwrap_or_else(TracingContext::from_current_span)
455 .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
456 async move {
457 debug!("Submitting Ddl task: {:?}", request.task);
458 match request.task {
459 CreateTable(create_table_task) => {
460 handle_create_table_task(self, create_table_task).await
461 }
462 DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
463 AlterTable(alter_table_task) => {
464 handle_alter_table_task(self, alter_table_task).await
465 }
466 TruncateTable(truncate_table_task) => {
467 handle_truncate_table_task(self, truncate_table_task).await
468 }
469 CreateLogicalTables(create_table_tasks) => {
470 handle_create_logical_table_tasks(self, create_table_tasks).await
471 }
472 AlterLogicalTables(alter_table_tasks) => {
473 handle_alter_logical_table_tasks(self, alter_table_tasks).await
474 }
475 DropLogicalTables(_) => todo!(),
476 CreateDatabase(create_database_task) => {
477 handle_create_database_task(self, create_database_task).await
478 }
479 DropDatabase(drop_database_task) => {
480 handle_drop_database_task(self, drop_database_task).await
481 }
482 AlterDatabase(alter_database_task) => {
483 handle_alter_database_task(self, alter_database_task).await
484 }
485 CreateFlow(create_flow_task) => {
486 handle_create_flow_task(self, create_flow_task, request.query_context.into())
487 .await
488 }
489 DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
490 CreateView(create_view_task) => {
491 handle_create_view_task(self, create_view_task).await
492 }
493 DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
494 CommentOn(comment_on_task) => handle_comment_on_task(self, comment_on_task).await,
495 #[cfg(feature = "enterprise")]
496 CreateTrigger(create_trigger_task) => {
497 handle_create_trigger_task(
498 self,
499 create_trigger_task,
500 request.query_context.into(),
501 )
502 .await
503 }
504 #[cfg(feature = "enterprise")]
505 DropTrigger(drop_trigger_task) => {
506 handle_drop_trigger_task(self, drop_trigger_task, request.query_context.into())
507 .await
508 }
509 }
510 }
511 .trace(span)
512 .await
513 }
514}
515
516async fn handle_truncate_table_task(
517 ddl_manager: &DdlManager,
518 truncate_table_task: TruncateTableTask,
519) -> Result<SubmitDdlTaskResponse> {
520 let table_id = truncate_table_task.table_id;
521 let table_metadata_manager = &ddl_manager.table_metadata_manager();
522 let table_ref = truncate_table_task.table_ref();
523
524 let (table_info_value, table_route_value) =
525 table_metadata_manager.get_full_table_info(table_id).await?;
526
527 let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
528 table: table_ref.to_string(),
529 })?;
530
531 let table_route_value = table_route_value.context(TableRouteNotFoundSnafu { table_id })?;
532
533 let table_route = table_route_value.into_inner().region_routes()?.clone();
534
535 let (id, _) = ddl_manager
536 .submit_truncate_table_task(truncate_table_task, table_info_value, table_route)
537 .await?;
538
539 info!("Table: {table_id} is truncated via procedure_id {id:?}");
540
541 Ok(SubmitDdlTaskResponse {
542 key: id.to_string().into(),
543 ..Default::default()
544 })
545}
546
547async fn handle_alter_table_task(
548 ddl_manager: &DdlManager,
549 alter_table_task: AlterTableTask,
550) -> Result<SubmitDdlTaskResponse> {
551 let table_ref = alter_table_task.table_ref();
552
553 let table_id = ddl_manager
554 .table_metadata_manager()
555 .table_name_manager()
556 .get(TableNameKey::new(
557 table_ref.catalog,
558 table_ref.schema,
559 table_ref.table,
560 ))
561 .await?
562 .with_context(|| TableNotFoundSnafu {
563 table_name: table_ref.to_string(),
564 })?
565 .table_id();
566
567 let table_route_value = ddl_manager
568 .table_metadata_manager()
569 .table_route_manager()
570 .table_route_storage()
571 .get(table_id)
572 .await?
573 .context(TableRouteNotFoundSnafu { table_id })?;
574 ensure!(
575 table_route_value.is_physical(),
576 UnexpectedLogicalRouteTableSnafu {
577 err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
578 }
579 );
580
581 let (id, _) = ddl_manager
582 .submit_alter_table_task(table_id, alter_table_task)
583 .await?;
584
585 info!("Table: {table_id} is altered via procedure_id {id:?}");
586
587 Ok(SubmitDdlTaskResponse {
588 key: id.to_string().into(),
589 ..Default::default()
590 })
591}
592
593async fn handle_drop_table_task(
594 ddl_manager: &DdlManager,
595 drop_table_task: DropTableTask,
596) -> Result<SubmitDdlTaskResponse> {
597 let table_id = drop_table_task.table_id;
598 let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
599
600 info!("Table: {table_id} is dropped via procedure_id {id:?}");
601
602 Ok(SubmitDdlTaskResponse {
603 key: id.to_string().into(),
604 ..Default::default()
605 })
606}
607
608async fn handle_create_table_task(
609 ddl_manager: &DdlManager,
610 create_table_task: CreateTableTask,
611) -> Result<SubmitDdlTaskResponse> {
612 let (id, output) = ddl_manager
613 .submit_create_table_task(create_table_task)
614 .await?;
615
616 let procedure_id = id.to_string();
617 let output = output.context(ProcedureOutputSnafu {
618 procedure_id: &procedure_id,
619 err_msg: "empty output",
620 })?;
621 let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
622 procedure_id: &procedure_id,
623 err_msg: "downcast to `u32`",
624 })?);
625 info!("Table: {table_id} is created via procedure_id {id:?}");
626
627 Ok(SubmitDdlTaskResponse {
628 key: procedure_id.into(),
629 table_ids: vec![table_id],
630 })
631}
632
633async fn handle_create_logical_table_tasks(
634 ddl_manager: &DdlManager,
635 create_table_tasks: Vec<CreateTableTask>,
636) -> Result<SubmitDdlTaskResponse> {
637 ensure!(
638 !create_table_tasks.is_empty(),
639 EmptyDdlTasksSnafu {
640 name: "create logical tables"
641 }
642 );
643 let physical_table_id = utils::check_and_get_physical_table_id(
644 ddl_manager.table_metadata_manager(),
645 &create_table_tasks,
646 )
647 .await?;
648 let num_logical_tables = create_table_tasks.len();
649
650 let (id, output) = ddl_manager
651 .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
652 .await?;
653
654 info!(
655 "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}"
656 );
657
658 let procedure_id = id.to_string();
659 let output = output.context(ProcedureOutputSnafu {
660 procedure_id: &procedure_id,
661 err_msg: "empty output",
662 })?;
663 let table_ids = output
664 .downcast_ref::<Vec<TableId>>()
665 .context(ProcedureOutputSnafu {
666 procedure_id: &procedure_id,
667 err_msg: "downcast to `Vec<TableId>`",
668 })?
669 .clone();
670
671 Ok(SubmitDdlTaskResponse {
672 key: procedure_id.into(),
673 table_ids,
674 })
675}
676
677async fn handle_create_database_task(
678 ddl_manager: &DdlManager,
679 create_database_task: CreateDatabaseTask,
680) -> Result<SubmitDdlTaskResponse> {
681 let (id, _) = ddl_manager
682 .submit_create_database(create_database_task.clone())
683 .await?;
684
685 let procedure_id = id.to_string();
686 info!(
687 "Database {}.{} is created via procedure_id {id:?}",
688 create_database_task.catalog, create_database_task.schema
689 );
690
691 Ok(SubmitDdlTaskResponse {
692 key: procedure_id.into(),
693 ..Default::default()
694 })
695}
696
697async fn handle_drop_database_task(
698 ddl_manager: &DdlManager,
699 drop_database_task: DropDatabaseTask,
700) -> Result<SubmitDdlTaskResponse> {
701 let (id, _) = ddl_manager
702 .submit_drop_database(drop_database_task.clone())
703 .await?;
704
705 let procedure_id = id.to_string();
706 info!(
707 "Database {}.{} is dropped via procedure_id {id:?}",
708 drop_database_task.catalog, drop_database_task.schema
709 );
710
711 Ok(SubmitDdlTaskResponse {
712 key: procedure_id.into(),
713 ..Default::default()
714 })
715}
716
717async fn handle_alter_database_task(
718 ddl_manager: &DdlManager,
719 alter_database_task: AlterDatabaseTask,
720) -> Result<SubmitDdlTaskResponse> {
721 let (id, _) = ddl_manager
722 .submit_alter_database(alter_database_task.clone())
723 .await?;
724
725 let procedure_id = id.to_string();
726 info!(
727 "Database {}.{} is altered via procedure_id {id:?}",
728 alter_database_task.catalog(),
729 alter_database_task.schema()
730 );
731
732 Ok(SubmitDdlTaskResponse {
733 key: procedure_id.into(),
734 ..Default::default()
735 })
736}
737
738async fn handle_drop_flow_task(
739 ddl_manager: &DdlManager,
740 drop_flow_task: DropFlowTask,
741) -> Result<SubmitDdlTaskResponse> {
742 let (id, _) = ddl_manager
743 .submit_drop_flow_task(drop_flow_task.clone())
744 .await?;
745
746 let procedure_id = id.to_string();
747 info!(
748 "Flow {}.{}({}) is dropped via procedure_id {id:?}",
749 drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
750 );
751
752 Ok(SubmitDdlTaskResponse {
753 key: procedure_id.into(),
754 ..Default::default()
755 })
756}
757
/// Forwards a drop-trigger task to the configured trigger DDL manager.
///
/// # Errors
///
/// Fails with `Unsupported` when no trigger DDL manager is configured; otherwise returns the
/// delegate's result.
#[cfg(feature = "enterprise")]
async fn handle_drop_trigger_task(
    ddl_manager: &DdlManager,
    drop_trigger_task: DropTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_ddl_manager) => {
            trigger_ddl_manager
                .drop_trigger(
                    drop_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => {
            use crate::error::UnsupportedSnafu;

            UnsupportedSnafu {
                operation: "drop trigger",
            }
            .fail()
        }
    }
}
781
782async fn handle_drop_view_task(
783 ddl_manager: &DdlManager,
784 drop_view_task: DropViewTask,
785) -> Result<SubmitDdlTaskResponse> {
786 let (id, _) = ddl_manager
787 .submit_drop_view_task(drop_view_task.clone())
788 .await?;
789
790 let procedure_id = id.to_string();
791 info!(
792 "View {}({}) is dropped via procedure_id {id:?}",
793 drop_view_task.table_ref(),
794 drop_view_task.view_id,
795 );
796
797 Ok(SubmitDdlTaskResponse {
798 key: procedure_id.into(),
799 ..Default::default()
800 })
801}
802
803async fn handle_create_flow_task(
804 ddl_manager: &DdlManager,
805 create_flow_task: CreateFlowTask,
806 query_context: QueryContext,
807) -> Result<SubmitDdlTaskResponse> {
808 let (id, output) = ddl_manager
809 .submit_create_flow_task(create_flow_task.clone(), query_context)
810 .await?;
811
812 let procedure_id = id.to_string();
813 let output = output.context(ProcedureOutputSnafu {
814 procedure_id: &procedure_id,
815 err_msg: "empty output",
816 })?;
817 let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
818 procedure_id: &procedure_id,
819 err_msg: "downcast to `u32`",
820 })?);
821 if !create_flow_task.or_replace {
822 info!(
823 "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
824 create_flow_task.catalog_name, create_flow_task.flow_name,
825 );
826 } else {
827 info!(
828 "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
829 create_flow_task.catalog_name, create_flow_task.flow_name,
830 );
831 }
832
833 Ok(SubmitDdlTaskResponse {
834 key: procedure_id.into(),
835 ..Default::default()
836 })
837}
838
/// Forwards a create-trigger task to the configured trigger DDL manager.
///
/// # Errors
///
/// Fails with `Unsupported` when no trigger DDL manager is configured; otherwise returns the
/// delegate's result.
#[cfg(feature = "enterprise")]
async fn handle_create_trigger_task(
    ddl_manager: &DdlManager,
    create_trigger_task: CreateTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_ddl_manager) => {
            trigger_ddl_manager
                .create_trigger(
                    create_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => {
            use crate::error::UnsupportedSnafu;

            UnsupportedSnafu {
                operation: "create trigger",
            }
            .fail()
        }
    }
}
862
863async fn handle_alter_logical_table_tasks(
864 ddl_manager: &DdlManager,
865 alter_table_tasks: Vec<AlterTableTask>,
866) -> Result<SubmitDdlTaskResponse> {
867 ensure!(
868 !alter_table_tasks.is_empty(),
869 EmptyDdlTasksSnafu {
870 name: "alter logical tables"
871 }
872 );
873
874 let first_table = TableNameKey {
876 catalog: &alter_table_tasks[0].alter_table.catalog_name,
877 schema: &alter_table_tasks[0].alter_table.schema_name,
878 table: &alter_table_tasks[0].alter_table.table_name,
879 };
880 let physical_table_id =
881 utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
882 let num_logical_tables = alter_table_tasks.len();
883
884 let (id, _) = ddl_manager
885 .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
886 .await?;
887
888 info!(
889 "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}"
890 );
891
892 let procedure_id = id.to_string();
893
894 Ok(SubmitDdlTaskResponse {
895 key: procedure_id.into(),
896 ..Default::default()
897 })
898}
899
900async fn handle_create_view_task(
902 ddl_manager: &DdlManager,
903 create_view_task: CreateViewTask,
904) -> Result<SubmitDdlTaskResponse> {
905 let (id, output) = ddl_manager
906 .submit_create_view_task(create_view_task)
907 .await?;
908
909 let procedure_id = id.to_string();
910 let output = output.context(ProcedureOutputSnafu {
911 procedure_id: &procedure_id,
912 err_msg: "empty output",
913 })?;
914 let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
915 procedure_id: &procedure_id,
916 err_msg: "downcast to `u32`",
917 })?);
918 info!("View: {view_id} is created via procedure_id {id:?}");
919
920 Ok(SubmitDdlTaskResponse {
921 key: procedure_id.into(),
922 table_ids: vec![view_id],
923 })
924}
925
926async fn handle_comment_on_task(
927 ddl_manager: &DdlManager,
928 comment_on_task: CommentOnTask,
929) -> Result<SubmitDdlTaskResponse> {
930 let (id, _) = ddl_manager
931 .submit_comment_on_task(comment_on_task.clone())
932 .await?;
933
934 let procedure_id = id.to_string();
935 info!(
936 "Comment on {}.{}.{} is updated via procedure_id {id:?}",
937 comment_on_task.catalog_name, comment_on_task.schema_name, comment_on_task.object_name
938 );
939
940 Ok(SubmitDdlTaskResponse {
941 key: procedure_id.into(),
942 ..Default::default()
943 })
944}
945
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_procedure::local::LocalManager;
    use common_procedure::test_util::InMemoryPoisonStore;

    use super::DdlManager;
    use crate::cache_invalidator::DummyCacheInvalidator;
    use crate::ddl::alter_database::AlterDatabaseProcedure;
    use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
    use crate::ddl::alter_table::AlterTableProcedure;
    use crate::ddl::comment_on::CommentOnProcedure;
    use crate::ddl::create_database::CreateDatabaseProcedure;
    use crate::ddl::create_flow::CreateFlowProcedure;
    use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
    use crate::ddl::create_table::CreateTableProcedure;
    use crate::ddl::create_view::CreateViewProcedure;
    use crate::ddl::drop_database::DropDatabaseProcedure;
    use crate::ddl::drop_flow::DropFlowProcedure;
    use crate::ddl::drop_table::DropTableProcedure;
    use crate::ddl::drop_view::DropViewProcedure;
    use crate::ddl::flow_meta::FlowMetadataAllocator;
    use crate::ddl::table_meta::TableMetadataAllocator;
    use crate::ddl::truncate_table::TruncateTableProcedure;
    use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use crate::key::TableMetadataManager;
    use crate::key::flow::FlowMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef};
    use crate::peer::Peer;
    use crate::region_keeper::MemoryRegionKeeper;
    use crate::region_registry::LeaderRegionRegistry;
    use crate::sequence::SequenceBuilder;
    use crate::state_store::KvStateStore;
    use crate::wal_options_allocator::WalOptionsAllocator;

    /// Stub node manager for tests; its lookup methods are `unimplemented!()`.
    pub struct DummyDatanodeManager;

    #[async_trait::async_trait]
    impl DatanodeManager for DummyDatanodeManager {
        async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
            unimplemented!()
        }
    }

    #[async_trait::async_trait]
    impl FlownodeManager for DummyDatanodeManager {
        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
            unimplemented!()
        }
    }

    /// `try_new` with `register_loaders = true` must succeed and register a loader for every
    /// DDL procedure type.
    #[test]
    fn test_try_new() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
            Arc::new(WalOptionsAllocator::default()),
        ));
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
        ));

        let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let procedure_manager = Arc::new(LocalManager::new(
            Default::default(),
            state_store,
            poison_manager,
            None,
            None,
        ));

        // A registration failure must fail the test instead of being silently discarded.
        let result = DdlManager::try_new(
            DdlContext {
                node_manager: Arc::new(DummyDatanodeManager),
                cache_invalidator: Arc::new(DummyCacheInvalidator),
                table_metadata_manager,
                table_metadata_allocator,
                flow_metadata_manager,
                flow_metadata_allocator,
                memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
                leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
                region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
            },
            procedure_manager.clone(),
            true,
        );
        assert!(result.is_ok(), "failed to build the DdlManager");

        // Keep in sync with the list in `DdlManager::register_loaders`.
        let expected_loaders = vec![
            CreateTableProcedure::TYPE_NAME,
            CreateLogicalTablesProcedure::TYPE_NAME,
            CreateViewProcedure::TYPE_NAME,
            CreateFlowProcedure::TYPE_NAME,
            AlterTableProcedure::TYPE_NAME,
            AlterLogicalTablesProcedure::TYPE_NAME,
            AlterDatabaseProcedure::TYPE_NAME,
            DropTableProcedure::TYPE_NAME,
            DropFlowProcedure::TYPE_NAME,
            TruncateTableProcedure::TYPE_NAME,
            CreateDatabaseProcedure::TYPE_NAME,
            DropDatabaseProcedure::TYPE_NAME,
            DropViewProcedure::TYPE_NAME,
            CommentOnProcedure::TYPE_NAME,
        ];

        for loader in expected_loaders {
            assert!(
                procedure_manager.contains_loader(loader),
                "loader `{loader}` is not registered"
            );
        }
    }
}