1use std::sync::Arc;
16
17use common_procedure::{
18 BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId, watcher,
19};
20use common_telemetry::tracing_context::{FutureExt, TracingContext};
21use common_telemetry::{debug, info, tracing};
22use derive_builder::Builder;
23use snafu::{OptionExt, ResultExt, ensure};
24use store_api::storage::TableId;
25
26use crate::ddl::alter_database::AlterDatabaseProcedure;
27use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
28use crate::ddl::alter_table::AlterTableProcedure;
29use crate::ddl::create_database::CreateDatabaseProcedure;
30use crate::ddl::create_flow::CreateFlowProcedure;
31use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
32use crate::ddl::create_table::CreateTableProcedure;
33use crate::ddl::create_view::CreateViewProcedure;
34use crate::ddl::drop_database::DropDatabaseProcedure;
35use crate::ddl::drop_flow::DropFlowProcedure;
36use crate::ddl::drop_table::DropTableProcedure;
37use crate::ddl::drop_view::DropViewProcedure;
38use crate::ddl::truncate_table::TruncateTableProcedure;
39use crate::ddl::{DdlContext, utils};
40use crate::error::{
41 EmptyDdlTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
42 SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
43 UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
44};
45use crate::key::table_info::TableInfoValue;
46use crate::key::table_name::TableNameKey;
47use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
48use crate::procedure_executor::ExecutorContext;
49#[cfg(feature = "enterprise")]
50use crate::rpc::ddl::DdlTask::CreateTrigger;
51#[cfg(feature = "enterprise")]
52use crate::rpc::ddl::DdlTask::DropTrigger;
53use crate::rpc::ddl::DdlTask::{
54 AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
55 CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
56 TruncateTable,
57};
58#[cfg(feature = "enterprise")]
59use crate::rpc::ddl::trigger::CreateTriggerTask;
60#[cfg(feature = "enterprise")]
61use crate::rpc::ddl::trigger::DropTriggerTask;
62use crate::rpc::ddl::{
63 AlterDatabaseTask, AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask,
64 CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
65 SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
66};
67use crate::rpc::router::RegionRoute;
68
/// Shared, reference-counted handle to a [`DdlManager`].
pub type DdlManagerRef = Arc<DdlManager>;
70
/// A factory that turns a [`DdlContext`] into a [`BoxedProcedureLoader`].
///
/// `DdlManager::register_loaders` calls one such factory per procedure type.
pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
72
#[derive(Builder)]
/// Builds DDL procedures from DDL tasks and submits them to the procedure manager.
pub struct DdlManager {
    /// Context that is cloned into every procedure created by this manager.
    ddl_context: DdlContext,
    /// Procedure manager used to register procedure loaders and submit procedures.
    procedure_manager: ProcedureManagerRef,
    /// Optional delegate for trigger DDL tasks; when `None`, trigger tasks are
    /// answered with an "unsupported" error.
    #[cfg(feature = "enterprise")]
    trigger_ddl_manager: Option<TriggerDdlManagerRef>,
}
81
/// Delegate that executes trigger DDL tasks on behalf of [`DdlManager`].
///
/// Only available with the `enterprise` feature. The manager hands over its
/// procedure manager and a clone of its DDL context on every call.
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
pub trait TriggerDdlManager: Send + Sync {
    /// Handles a create-trigger task and returns the DDL response for it.
    async fn create_trigger(
        &self,
        create_trigger_task: CreateTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Handles a drop-trigger task and returns the DDL response for it.
    async fn drop_trigger(
        &self,
        drop_trigger_task: DropTriggerTask,
        procedure_manager: ProcedureManagerRef,
        ddl_context: DdlContext,
        query_context: QueryContext,
    ) -> Result<SubmitDdlTaskResponse>;

    /// Exposes the implementation as `&dyn Any`.
    // NOTE(review): presumably intended for downcasting to the concrete type — confirm.
    fn as_any(&self) -> &dyn std::any::Any;
}
105
/// Shared, reference-counted handle to a [`TriggerDdlManager`] implementation.
#[cfg(feature = "enterprise")]
pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
108
/// Expands to a `(type name, loader factory)` tuple for a single procedure type.
///
/// The procedure type must provide a `TYPE_NAME` constant and a
/// `from_json(json, context)` constructor. The produced factory captures a
/// [`DdlContext`] and hands a fresh clone of it to `from_json` on every load.
macro_rules! procedure_loader_entry {
    ($proc_ty:ident) => {
        (
            $proc_ty::TYPE_NAME,
            &|ddl_context: DdlContext| -> BoxedProcedureLoader {
                Box::new(move |payload: &str| {
                    $proc_ty::from_json(payload, ddl_context.clone())
                        .map(|loaded| Box::new(loaded) as _)
                })
            },
        )
    };
}
122
/// Expands a comma-separated list of procedure types into a `Vec` holding one
/// `procedure_loader_entry!` tuple per type, in the order given.
macro_rules! procedure_loader {
    ($($proc_ty:ident),*) => {
        vec![$(procedure_loader_entry!($proc_ty)),*]
    };
}
130
131impl DdlManager {
132 pub fn try_new(
134 ddl_context: DdlContext,
135 procedure_manager: ProcedureManagerRef,
136 register_loaders: bool,
137 ) -> Result<Self> {
138 let manager = Self {
139 ddl_context,
140 procedure_manager,
141 #[cfg(feature = "enterprise")]
142 trigger_ddl_manager: None,
143 };
144 if register_loaders {
145 manager.register_loaders()?;
146 }
147 Ok(manager)
148 }
149
150 #[cfg(feature = "enterprise")]
151 pub fn with_trigger_ddl_manager(
152 mut self,
153 trigger_ddl_manager: Option<TriggerDdlManagerRef>,
154 ) -> Self {
155 self.trigger_ddl_manager = trigger_ddl_manager;
156 self
157 }
158
159 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
161 &self.ddl_context.table_metadata_manager
162 }
163
164 pub fn create_context(&self) -> DdlContext {
166 self.ddl_context.clone()
167 }
168
169 pub fn register_loaders(&self) -> Result<()> {
171 let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
172 CreateTableProcedure,
173 CreateLogicalTablesProcedure,
174 CreateViewProcedure,
175 CreateFlowProcedure,
176 AlterTableProcedure,
177 AlterLogicalTablesProcedure,
178 AlterDatabaseProcedure,
179 DropTableProcedure,
180 DropFlowProcedure,
181 TruncateTableProcedure,
182 CreateDatabaseProcedure,
183 DropDatabaseProcedure,
184 DropViewProcedure
185 );
186
187 for (type_name, loader_factory) in loaders {
188 let context = self.create_context();
189 self.procedure_manager
190 .register_loader(type_name, loader_factory(context))
191 .context(RegisterProcedureLoaderSnafu { type_name })?;
192 }
193
194 Ok(())
195 }
196
197 #[tracing::instrument(skip_all)]
199 pub async fn submit_alter_table_task(
200 &self,
201 table_id: TableId,
202 alter_table_task: AlterTableTask,
203 ) -> Result<(ProcedureId, Option<Output>)> {
204 let context = self.create_context();
205
206 let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
207
208 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
209
210 self.submit_procedure(procedure_with_id).await
211 }
212
213 #[tracing::instrument(skip_all)]
215 pub async fn submit_create_table_task(
216 &self,
217 create_table_task: CreateTableTask,
218 ) -> Result<(ProcedureId, Option<Output>)> {
219 let context = self.create_context();
220
221 let procedure = CreateTableProcedure::new(create_table_task, context);
222
223 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
224
225 self.submit_procedure(procedure_with_id).await
226 }
227
228 #[tracing::instrument(skip_all)]
230 pub async fn submit_create_view_task(
231 &self,
232 create_view_task: CreateViewTask,
233 ) -> Result<(ProcedureId, Option<Output>)> {
234 let context = self.create_context();
235
236 let procedure = CreateViewProcedure::new(create_view_task, context);
237
238 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
239
240 self.submit_procedure(procedure_with_id).await
241 }
242
243 #[tracing::instrument(skip_all)]
245 pub async fn submit_create_logical_table_tasks(
246 &self,
247 create_table_tasks: Vec<CreateTableTask>,
248 physical_table_id: TableId,
249 ) -> Result<(ProcedureId, Option<Output>)> {
250 let context = self.create_context();
251
252 let procedure =
253 CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
254
255 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
256
257 self.submit_procedure(procedure_with_id).await
258 }
259
260 #[tracing::instrument(skip_all)]
262 pub async fn submit_alter_logical_table_tasks(
263 &self,
264 alter_table_tasks: Vec<AlterTableTask>,
265 physical_table_id: TableId,
266 ) -> Result<(ProcedureId, Option<Output>)> {
267 let context = self.create_context();
268
269 let procedure =
270 AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
271
272 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
273
274 self.submit_procedure(procedure_with_id).await
275 }
276
277 #[tracing::instrument(skip_all)]
279 pub async fn submit_drop_table_task(
280 &self,
281 drop_table_task: DropTableTask,
282 ) -> Result<(ProcedureId, Option<Output>)> {
283 let context = self.create_context();
284
285 let procedure = DropTableProcedure::new(drop_table_task, context);
286
287 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
288
289 self.submit_procedure(procedure_with_id).await
290 }
291
292 #[tracing::instrument(skip_all)]
294 pub async fn submit_create_database(
295 &self,
296 CreateDatabaseTask {
297 catalog,
298 schema,
299 create_if_not_exists,
300 options,
301 }: CreateDatabaseTask,
302 ) -> Result<(ProcedureId, Option<Output>)> {
303 let context = self.create_context();
304 let procedure =
305 CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
306 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
307
308 self.submit_procedure(procedure_with_id).await
309 }
310
311 #[tracing::instrument(skip_all)]
313 pub async fn submit_drop_database(
314 &self,
315 DropDatabaseTask {
316 catalog,
317 schema,
318 drop_if_exists,
319 }: DropDatabaseTask,
320 ) -> Result<(ProcedureId, Option<Output>)> {
321 let context = self.create_context();
322 let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
323 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
324
325 self.submit_procedure(procedure_with_id).await
326 }
327
328 pub async fn submit_alter_database(
329 &self,
330 alter_database_task: AlterDatabaseTask,
331 ) -> Result<(ProcedureId, Option<Output>)> {
332 let context = self.create_context();
333 let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
334 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
335
336 self.submit_procedure(procedure_with_id).await
337 }
338
339 #[tracing::instrument(skip_all)]
341 pub async fn submit_create_flow_task(
342 &self,
343 create_flow: CreateFlowTask,
344 query_context: QueryContext,
345 ) -> Result<(ProcedureId, Option<Output>)> {
346 let context = self.create_context();
347 let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
348 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
349
350 self.submit_procedure(procedure_with_id).await
351 }
352
353 #[tracing::instrument(skip_all)]
355 pub async fn submit_drop_flow_task(
356 &self,
357 drop_flow: DropFlowTask,
358 ) -> Result<(ProcedureId, Option<Output>)> {
359 let context = self.create_context();
360 let procedure = DropFlowProcedure::new(drop_flow, context);
361 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
362
363 self.submit_procedure(procedure_with_id).await
364 }
365
366 #[tracing::instrument(skip_all)]
368 pub async fn submit_drop_view_task(
369 &self,
370 drop_view: DropViewTask,
371 ) -> Result<(ProcedureId, Option<Output>)> {
372 let context = self.create_context();
373 let procedure = DropViewProcedure::new(drop_view, context);
374 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
375
376 self.submit_procedure(procedure_with_id).await
377 }
378
379 #[tracing::instrument(skip_all)]
381 pub async fn submit_truncate_table_task(
382 &self,
383 truncate_table_task: TruncateTableTask,
384 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
385 region_routes: Vec<RegionRoute>,
386 ) -> Result<(ProcedureId, Option<Output>)> {
387 let context = self.create_context();
388 let procedure = TruncateTableProcedure::new(
389 truncate_table_task,
390 table_info_value,
391 region_routes,
392 context,
393 );
394
395 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
396
397 self.submit_procedure(procedure_with_id).await
398 }
399
400 async fn submit_procedure(
401 &self,
402 procedure_with_id: ProcedureWithId,
403 ) -> Result<(ProcedureId, Option<Output>)> {
404 let procedure_id = procedure_with_id.id;
405
406 let mut watcher = self
407 .procedure_manager
408 .submit(procedure_with_id)
409 .await
410 .context(SubmitProcedureSnafu)?;
411
412 let output = watcher::wait(&mut watcher)
413 .await
414 .context(WaitProcedureSnafu)?;
415
416 Ok((procedure_id, output))
417 }
418
419 pub async fn submit_ddl_task(
420 &self,
421 ctx: &ExecutorContext,
422 request: SubmitDdlTaskRequest,
423 ) -> Result<SubmitDdlTaskResponse> {
424 let span = ctx
425 .tracing_context
426 .as_ref()
427 .map(TracingContext::from_w3c)
428 .unwrap_or_else(TracingContext::from_current_span)
429 .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
430 async move {
431 debug!("Submitting Ddl task: {:?}", request.task);
432 match request.task {
433 CreateTable(create_table_task) => {
434 handle_create_table_task(self, create_table_task).await
435 }
436 DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
437 AlterTable(alter_table_task) => {
438 handle_alter_table_task(self, alter_table_task).await
439 }
440 TruncateTable(truncate_table_task) => {
441 handle_truncate_table_task(self, truncate_table_task).await
442 }
443 CreateLogicalTables(create_table_tasks) => {
444 handle_create_logical_table_tasks(self, create_table_tasks).await
445 }
446 AlterLogicalTables(alter_table_tasks) => {
447 handle_alter_logical_table_tasks(self, alter_table_tasks).await
448 }
449 DropLogicalTables(_) => todo!(),
450 CreateDatabase(create_database_task) => {
451 handle_create_database_task(self, create_database_task).await
452 }
453 DropDatabase(drop_database_task) => {
454 handle_drop_database_task(self, drop_database_task).await
455 }
456 AlterDatabase(alter_database_task) => {
457 handle_alter_database_task(self, alter_database_task).await
458 }
459 CreateFlow(create_flow_task) => {
460 handle_create_flow_task(self, create_flow_task, request.query_context.into())
461 .await
462 }
463 DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
464 CreateView(create_view_task) => {
465 handle_create_view_task(self, create_view_task).await
466 }
467 DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
468 #[cfg(feature = "enterprise")]
469 CreateTrigger(create_trigger_task) => {
470 handle_create_trigger_task(
471 self,
472 create_trigger_task,
473 request.query_context.into(),
474 )
475 .await
476 }
477 #[cfg(feature = "enterprise")]
478 DropTrigger(drop_trigger_task) => {
479 handle_drop_trigger_task(self, drop_trigger_task, request.query_context.into())
480 .await
481 }
482 }
483 }
484 .trace(span)
485 .await
486 }
487}
488
489async fn handle_truncate_table_task(
490 ddl_manager: &DdlManager,
491 truncate_table_task: TruncateTableTask,
492) -> Result<SubmitDdlTaskResponse> {
493 let table_id = truncate_table_task.table_id;
494 let table_metadata_manager = &ddl_manager.table_metadata_manager();
495 let table_ref = truncate_table_task.table_ref();
496
497 let (table_info_value, table_route_value) =
498 table_metadata_manager.get_full_table_info(table_id).await?;
499
500 let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
501 table: table_ref.to_string(),
502 })?;
503
504 let table_route_value = table_route_value.context(TableRouteNotFoundSnafu { table_id })?;
505
506 let table_route = table_route_value.into_inner().region_routes()?.clone();
507
508 let (id, _) = ddl_manager
509 .submit_truncate_table_task(truncate_table_task, table_info_value, table_route)
510 .await?;
511
512 info!("Table: {table_id} is truncated via procedure_id {id:?}");
513
514 Ok(SubmitDdlTaskResponse {
515 key: id.to_string().into(),
516 ..Default::default()
517 })
518}
519
520async fn handle_alter_table_task(
521 ddl_manager: &DdlManager,
522 alter_table_task: AlterTableTask,
523) -> Result<SubmitDdlTaskResponse> {
524 let table_ref = alter_table_task.table_ref();
525
526 let table_id = ddl_manager
527 .table_metadata_manager()
528 .table_name_manager()
529 .get(TableNameKey::new(
530 table_ref.catalog,
531 table_ref.schema,
532 table_ref.table,
533 ))
534 .await?
535 .with_context(|| TableNotFoundSnafu {
536 table_name: table_ref.to_string(),
537 })?
538 .table_id();
539
540 let table_route_value = ddl_manager
541 .table_metadata_manager()
542 .table_route_manager()
543 .table_route_storage()
544 .get(table_id)
545 .await?
546 .context(TableRouteNotFoundSnafu { table_id })?;
547 ensure!(
548 table_route_value.is_physical(),
549 UnexpectedLogicalRouteTableSnafu {
550 err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
551 }
552 );
553
554 let (id, _) = ddl_manager
555 .submit_alter_table_task(table_id, alter_table_task)
556 .await?;
557
558 info!("Table: {table_id} is altered via procedure_id {id:?}");
559
560 Ok(SubmitDdlTaskResponse {
561 key: id.to_string().into(),
562 ..Default::default()
563 })
564}
565
566async fn handle_drop_table_task(
567 ddl_manager: &DdlManager,
568 drop_table_task: DropTableTask,
569) -> Result<SubmitDdlTaskResponse> {
570 let table_id = drop_table_task.table_id;
571 let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
572
573 info!("Table: {table_id} is dropped via procedure_id {id:?}");
574
575 Ok(SubmitDdlTaskResponse {
576 key: id.to_string().into(),
577 ..Default::default()
578 })
579}
580
581async fn handle_create_table_task(
582 ddl_manager: &DdlManager,
583 create_table_task: CreateTableTask,
584) -> Result<SubmitDdlTaskResponse> {
585 let (id, output) = ddl_manager
586 .submit_create_table_task(create_table_task)
587 .await?;
588
589 let procedure_id = id.to_string();
590 let output = output.context(ProcedureOutputSnafu {
591 procedure_id: &procedure_id,
592 err_msg: "empty output",
593 })?;
594 let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
595 procedure_id: &procedure_id,
596 err_msg: "downcast to `u32`",
597 })?);
598 info!("Table: {table_id} is created via procedure_id {id:?}");
599
600 Ok(SubmitDdlTaskResponse {
601 key: procedure_id.into(),
602 table_ids: vec![table_id],
603 })
604}
605
606async fn handle_create_logical_table_tasks(
607 ddl_manager: &DdlManager,
608 create_table_tasks: Vec<CreateTableTask>,
609) -> Result<SubmitDdlTaskResponse> {
610 ensure!(
611 !create_table_tasks.is_empty(),
612 EmptyDdlTasksSnafu {
613 name: "create logical tables"
614 }
615 );
616 let physical_table_id = utils::check_and_get_physical_table_id(
617 ddl_manager.table_metadata_manager(),
618 &create_table_tasks,
619 )
620 .await?;
621 let num_logical_tables = create_table_tasks.len();
622
623 let (id, output) = ddl_manager
624 .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
625 .await?;
626
627 info!(
628 "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}"
629 );
630
631 let procedure_id = id.to_string();
632 let output = output.context(ProcedureOutputSnafu {
633 procedure_id: &procedure_id,
634 err_msg: "empty output",
635 })?;
636 let table_ids = output
637 .downcast_ref::<Vec<TableId>>()
638 .context(ProcedureOutputSnafu {
639 procedure_id: &procedure_id,
640 err_msg: "downcast to `Vec<TableId>`",
641 })?
642 .clone();
643
644 Ok(SubmitDdlTaskResponse {
645 key: procedure_id.into(),
646 table_ids,
647 })
648}
649
650async fn handle_create_database_task(
651 ddl_manager: &DdlManager,
652 create_database_task: CreateDatabaseTask,
653) -> Result<SubmitDdlTaskResponse> {
654 let (id, _) = ddl_manager
655 .submit_create_database(create_database_task.clone())
656 .await?;
657
658 let procedure_id = id.to_string();
659 info!(
660 "Database {}.{} is created via procedure_id {id:?}",
661 create_database_task.catalog, create_database_task.schema
662 );
663
664 Ok(SubmitDdlTaskResponse {
665 key: procedure_id.into(),
666 ..Default::default()
667 })
668}
669
670async fn handle_drop_database_task(
671 ddl_manager: &DdlManager,
672 drop_database_task: DropDatabaseTask,
673) -> Result<SubmitDdlTaskResponse> {
674 let (id, _) = ddl_manager
675 .submit_drop_database(drop_database_task.clone())
676 .await?;
677
678 let procedure_id = id.to_string();
679 info!(
680 "Database {}.{} is dropped via procedure_id {id:?}",
681 drop_database_task.catalog, drop_database_task.schema
682 );
683
684 Ok(SubmitDdlTaskResponse {
685 key: procedure_id.into(),
686 ..Default::default()
687 })
688}
689
690async fn handle_alter_database_task(
691 ddl_manager: &DdlManager,
692 alter_database_task: AlterDatabaseTask,
693) -> Result<SubmitDdlTaskResponse> {
694 let (id, _) = ddl_manager
695 .submit_alter_database(alter_database_task.clone())
696 .await?;
697
698 let procedure_id = id.to_string();
699 info!(
700 "Database {}.{} is altered via procedure_id {id:?}",
701 alter_database_task.catalog(),
702 alter_database_task.schema()
703 );
704
705 Ok(SubmitDdlTaskResponse {
706 key: procedure_id.into(),
707 ..Default::default()
708 })
709}
710
711async fn handle_drop_flow_task(
712 ddl_manager: &DdlManager,
713 drop_flow_task: DropFlowTask,
714) -> Result<SubmitDdlTaskResponse> {
715 let (id, _) = ddl_manager
716 .submit_drop_flow_task(drop_flow_task.clone())
717 .await?;
718
719 let procedure_id = id.to_string();
720 info!(
721 "Flow {}.{}({}) is dropped via procedure_id {id:?}",
722 drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
723 );
724
725 Ok(SubmitDdlTaskResponse {
726 key: procedure_id.into(),
727 ..Default::default()
728 })
729}
730
/// Handles a drop-trigger task by forwarding it to the configured trigger DDL
/// manager.
///
/// # Errors
///
/// Returns an "unsupported" error when no trigger DDL manager is configured;
/// otherwise returns whatever the delegate returns.
#[cfg(feature = "enterprise")]
async fn handle_drop_trigger_task(
    ddl_manager: &DdlManager,
    drop_trigger_task: DropTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    use crate::error::UnsupportedSnafu;

    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_manager) => {
            trigger_manager
                .drop_trigger(
                    drop_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => UnsupportedSnafu {
            operation: "drop trigger",
        }
        .fail(),
    }
}
754
755async fn handle_drop_view_task(
756 ddl_manager: &DdlManager,
757 drop_view_task: DropViewTask,
758) -> Result<SubmitDdlTaskResponse> {
759 let (id, _) = ddl_manager
760 .submit_drop_view_task(drop_view_task.clone())
761 .await?;
762
763 let procedure_id = id.to_string();
764 info!(
765 "View {}({}) is dropped via procedure_id {id:?}",
766 drop_view_task.table_ref(),
767 drop_view_task.view_id,
768 );
769
770 Ok(SubmitDdlTaskResponse {
771 key: procedure_id.into(),
772 ..Default::default()
773 })
774}
775
776async fn handle_create_flow_task(
777 ddl_manager: &DdlManager,
778 create_flow_task: CreateFlowTask,
779 query_context: QueryContext,
780) -> Result<SubmitDdlTaskResponse> {
781 let (id, output) = ddl_manager
782 .submit_create_flow_task(create_flow_task.clone(), query_context)
783 .await?;
784
785 let procedure_id = id.to_string();
786 let output = output.context(ProcedureOutputSnafu {
787 procedure_id: &procedure_id,
788 err_msg: "empty output",
789 })?;
790 let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
791 procedure_id: &procedure_id,
792 err_msg: "downcast to `u32`",
793 })?);
794 if !create_flow_task.or_replace {
795 info!(
796 "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
797 create_flow_task.catalog_name, create_flow_task.flow_name,
798 );
799 } else {
800 info!(
801 "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
802 create_flow_task.catalog_name, create_flow_task.flow_name,
803 );
804 }
805
806 Ok(SubmitDdlTaskResponse {
807 key: procedure_id.into(),
808 ..Default::default()
809 })
810}
811
/// Handles a create-trigger task by forwarding it to the configured trigger DDL
/// manager.
///
/// # Errors
///
/// Returns an "unsupported" error when no trigger DDL manager is configured;
/// otherwise returns whatever the delegate returns.
#[cfg(feature = "enterprise")]
async fn handle_create_trigger_task(
    ddl_manager: &DdlManager,
    create_trigger_task: CreateTriggerTask,
    query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
    use crate::error::UnsupportedSnafu;

    match ddl_manager.trigger_ddl_manager.as_ref() {
        Some(trigger_manager) => {
            trigger_manager
                .create_trigger(
                    create_trigger_task,
                    ddl_manager.procedure_manager.clone(),
                    ddl_manager.ddl_context.clone(),
                    query_context,
                )
                .await
        }
        None => UnsupportedSnafu {
            operation: "create trigger",
        }
        .fail(),
    }
}
835
836async fn handle_alter_logical_table_tasks(
837 ddl_manager: &DdlManager,
838 alter_table_tasks: Vec<AlterTableTask>,
839) -> Result<SubmitDdlTaskResponse> {
840 ensure!(
841 !alter_table_tasks.is_empty(),
842 EmptyDdlTasksSnafu {
843 name: "alter logical tables"
844 }
845 );
846
847 let first_table = TableNameKey {
849 catalog: &alter_table_tasks[0].alter_table.catalog_name,
850 schema: &alter_table_tasks[0].alter_table.schema_name,
851 table: &alter_table_tasks[0].alter_table.table_name,
852 };
853 let physical_table_id =
854 utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
855 let num_logical_tables = alter_table_tasks.len();
856
857 let (id, _) = ddl_manager
858 .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
859 .await?;
860
861 info!(
862 "{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}"
863 );
864
865 let procedure_id = id.to_string();
866
867 Ok(SubmitDdlTaskResponse {
868 key: procedure_id.into(),
869 ..Default::default()
870 })
871}
872
873async fn handle_create_view_task(
875 ddl_manager: &DdlManager,
876 create_view_task: CreateViewTask,
877) -> Result<SubmitDdlTaskResponse> {
878 let (id, output) = ddl_manager
879 .submit_create_view_task(create_view_task)
880 .await?;
881
882 let procedure_id = id.to_string();
883 let output = output.context(ProcedureOutputSnafu {
884 procedure_id: &procedure_id,
885 err_msg: "empty output",
886 })?;
887 let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
888 procedure_id: &procedure_id,
889 err_msg: "downcast to `u32`",
890 })?);
891 info!("View: {view_id} is created via procedure_id {id:?}");
892
893 Ok(SubmitDdlTaskResponse {
894 key: procedure_id.into(),
895 table_ids: vec![view_id],
896 })
897}
898
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_procedure::local::LocalManager;
    use common_procedure::test_util::InMemoryPoisonStore;

    use super::DdlManager;
    use crate::cache_invalidator::DummyCacheInvalidator;
    use crate::ddl::alter_database::AlterDatabaseProcedure;
    use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
    use crate::ddl::alter_table::AlterTableProcedure;
    use crate::ddl::create_database::CreateDatabaseProcedure;
    use crate::ddl::create_flow::CreateFlowProcedure;
    use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
    use crate::ddl::create_table::CreateTableProcedure;
    use crate::ddl::create_view::CreateViewProcedure;
    use crate::ddl::drop_database::DropDatabaseProcedure;
    use crate::ddl::drop_flow::DropFlowProcedure;
    use crate::ddl::drop_table::DropTableProcedure;
    use crate::ddl::drop_view::DropViewProcedure;
    use crate::ddl::flow_meta::FlowMetadataAllocator;
    use crate::ddl::table_meta::TableMetadataAllocator;
    use crate::ddl::truncate_table::TruncateTableProcedure;
    use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use crate::key::TableMetadataManager;
    use crate::key::flow::FlowMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef};
    use crate::peer::Peer;
    use crate::region_keeper::MemoryRegionKeeper;
    use crate::region_registry::LeaderRegionRegistry;
    use crate::sequence::SequenceBuilder;
    use crate::state_store::KvStateStore;
    use crate::wal_options_allocator::WalOptionsAllocator;

    /// Node manager stub: constructing a `DdlManager` needs one, but this test
    /// never resolves a node, so both lookups are left unimplemented.
    pub struct DummyDatanodeManager;

    #[async_trait::async_trait]
    impl DatanodeManager for DummyDatanodeManager {
        async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
            unimplemented!()
        }
    }

    #[async_trait::async_trait]
    impl FlownodeManager for DummyDatanodeManager {
        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
            unimplemented!()
        }
    }

    /// `try_new` with `register_loaders = true` must succeed and register a loader
    /// for every DDL procedure type.
    #[test]
    fn test_try_new() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
            Arc::new(WalOptionsAllocator::default()),
        ));
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
        ));

        let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let procedure_manager = Arc::new(LocalManager::new(
            Default::default(),
            state_store,
            poison_manager,
            None,
            None,
        ));

        // Fail right here, with the real error, if loader registration breaks,
        // instead of discarding the result and failing later on an assert.
        let _manager = DdlManager::try_new(
            DdlContext {
                node_manager: Arc::new(DummyDatanodeManager),
                cache_invalidator: Arc::new(DummyCacheInvalidator),
                table_metadata_manager,
                table_metadata_allocator,
                flow_metadata_manager,
                flow_metadata_allocator,
                memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
                leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
                region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
            },
            procedure_manager.clone(),
            true,
        )
        .expect("registering DDL procedure loaders should succeed");

        // Mirrors the list in `DdlManager::register_loaders`.
        let expected_loaders = [
            CreateTableProcedure::TYPE_NAME,
            CreateLogicalTablesProcedure::TYPE_NAME,
            CreateViewProcedure::TYPE_NAME,
            CreateFlowProcedure::TYPE_NAME,
            AlterTableProcedure::TYPE_NAME,
            AlterLogicalTablesProcedure::TYPE_NAME,
            AlterDatabaseProcedure::TYPE_NAME,
            DropTableProcedure::TYPE_NAME,
            DropFlowProcedure::TYPE_NAME,
            TruncateTableProcedure::TYPE_NAME,
            CreateDatabaseProcedure::TYPE_NAME,
            DropDatabaseProcedure::TYPE_NAME,
            DropViewProcedure::TYPE_NAME,
        ];

        for loader in expected_loaders {
            assert!(
                procedure_manager.contains_loader(loader),
                "missing procedure loader: {loader}"
            );
        }
    }
}