1use std::sync::Arc;
16
17use common_procedure::{
18 watcher, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId,
19};
20use common_telemetry::tracing_context::{FutureExt, TracingContext};
21use common_telemetry::{debug, info, tracing};
22use derive_builder::Builder;
23use snafu::{ensure, OptionExt, ResultExt};
24use store_api::storage::TableId;
25
26use crate::ddl::alter_database::AlterDatabaseProcedure;
27use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
28use crate::ddl::alter_table::AlterTableProcedure;
29use crate::ddl::create_database::CreateDatabaseProcedure;
30use crate::ddl::create_flow::CreateFlowProcedure;
31use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
32use crate::ddl::create_table::CreateTableProcedure;
33use crate::ddl::create_view::CreateViewProcedure;
34use crate::ddl::drop_database::DropDatabaseProcedure;
35use crate::ddl::drop_flow::DropFlowProcedure;
36use crate::ddl::drop_table::DropTableProcedure;
37use crate::ddl::drop_view::DropViewProcedure;
38use crate::ddl::truncate_table::TruncateTableProcedure;
39use crate::ddl::{utils, DdlContext};
40use crate::error::{
41 EmptyDdlTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
42 SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
43 UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
44};
45use crate::key::table_info::TableInfoValue;
46use crate::key::table_name::TableNameKey;
47use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
48use crate::procedure_executor::ExecutorContext;
49#[cfg(feature = "enterprise")]
50use crate::rpc::ddl::trigger::CreateTriggerTask;
51#[cfg(feature = "enterprise")]
52use crate::rpc::ddl::trigger::DropTriggerTask;
53#[cfg(feature = "enterprise")]
54use crate::rpc::ddl::DdlTask::CreateTrigger;
55#[cfg(feature = "enterprise")]
56use crate::rpc::ddl::DdlTask::DropTrigger;
57use crate::rpc::ddl::DdlTask::{
58 AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
59 CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
60 TruncateTable,
61};
62use crate::rpc::ddl::{
63 AlterDatabaseTask, AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask,
64 CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
65 SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
66};
67use crate::rpc::router::RegionRoute;
68
69pub type DdlManagerRef = Arc<DdlManager>;
70
71pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
72
73#[derive(Builder)]
75pub struct DdlManager {
76 ddl_context: DdlContext,
77 procedure_manager: ProcedureManagerRef,
78 #[cfg(feature = "enterprise")]
79 trigger_ddl_manager: Option<TriggerDdlManagerRef>,
80}
81
82#[cfg(feature = "enterprise")]
85#[async_trait::async_trait]
86pub trait TriggerDdlManager: Send + Sync {
87 async fn create_trigger(
88 &self,
89 create_trigger_task: CreateTriggerTask,
90 procedure_manager: ProcedureManagerRef,
91 ddl_context: DdlContext,
92 query_context: QueryContext,
93 ) -> Result<SubmitDdlTaskResponse>;
94
95 async fn drop_trigger(
96 &self,
97 drop_trigger_task: DropTriggerTask,
98 procedure_manager: ProcedureManagerRef,
99 ddl_context: DdlContext,
100 query_context: QueryContext,
101 ) -> Result<SubmitDdlTaskResponse>;
102
103 fn as_any(&self) -> &dyn std::any::Any;
104}
105
106#[cfg(feature = "enterprise")]
107pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
108
109macro_rules! procedure_loader_entry {
110 ($procedure:ident) => {
111 (
112 $procedure::TYPE_NAME,
113 &|context: DdlContext| -> BoxedProcedureLoader {
114 Box::new(move |json: &str| {
115 let context = context.clone();
116 $procedure::from_json(json, context).map(|p| Box::new(p) as _)
117 })
118 },
119 )
120 };
121}
122
123macro_rules! procedure_loader {
124 ($($procedure:ident),*) => {
125 vec![
126 $(procedure_loader_entry!($procedure)),*
127 ]
128 };
129}
130
131impl DdlManager {
132 pub fn try_new(
134 ddl_context: DdlContext,
135 procedure_manager: ProcedureManagerRef,
136 register_loaders: bool,
137 ) -> Result<Self> {
138 let manager = Self {
139 ddl_context,
140 procedure_manager,
141 #[cfg(feature = "enterprise")]
142 trigger_ddl_manager: None,
143 };
144 if register_loaders {
145 manager.register_loaders()?;
146 }
147 Ok(manager)
148 }
149
150 #[cfg(feature = "enterprise")]
151 pub fn with_trigger_ddl_manager(
152 mut self,
153 trigger_ddl_manager: Option<TriggerDdlManagerRef>,
154 ) -> Self {
155 self.trigger_ddl_manager = trigger_ddl_manager;
156 self
157 }
158
159 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
161 &self.ddl_context.table_metadata_manager
162 }
163
164 pub fn create_context(&self) -> DdlContext {
166 self.ddl_context.clone()
167 }
168
169 pub fn register_loaders(&self) -> Result<()> {
171 let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
172 CreateTableProcedure,
173 CreateLogicalTablesProcedure,
174 CreateViewProcedure,
175 CreateFlowProcedure,
176 AlterTableProcedure,
177 AlterLogicalTablesProcedure,
178 AlterDatabaseProcedure,
179 DropTableProcedure,
180 DropFlowProcedure,
181 TruncateTableProcedure,
182 CreateDatabaseProcedure,
183 DropDatabaseProcedure,
184 DropViewProcedure
185 );
186
187 for (type_name, loader_factory) in loaders {
188 let context = self.create_context();
189 self.procedure_manager
190 .register_loader(type_name, loader_factory(context))
191 .context(RegisterProcedureLoaderSnafu { type_name })?;
192 }
193
194 Ok(())
195 }
196
197 #[tracing::instrument(skip_all)]
199 pub async fn submit_alter_table_task(
200 &self,
201 table_id: TableId,
202 alter_table_task: AlterTableTask,
203 ) -> Result<(ProcedureId, Option<Output>)> {
204 let context = self.create_context();
205
206 let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
207
208 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
209
210 self.submit_procedure(procedure_with_id).await
211 }
212
213 #[tracing::instrument(skip_all)]
215 pub async fn submit_create_table_task(
216 &self,
217 create_table_task: CreateTableTask,
218 ) -> Result<(ProcedureId, Option<Output>)> {
219 let context = self.create_context();
220
221 let procedure = CreateTableProcedure::new(create_table_task, context);
222
223 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
224
225 self.submit_procedure(procedure_with_id).await
226 }
227
228 #[tracing::instrument(skip_all)]
230 pub async fn submit_create_view_task(
231 &self,
232 create_view_task: CreateViewTask,
233 ) -> Result<(ProcedureId, Option<Output>)> {
234 let context = self.create_context();
235
236 let procedure = CreateViewProcedure::new(create_view_task, context);
237
238 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
239
240 self.submit_procedure(procedure_with_id).await
241 }
242
243 #[tracing::instrument(skip_all)]
245 pub async fn submit_create_logical_table_tasks(
246 &self,
247 create_table_tasks: Vec<CreateTableTask>,
248 physical_table_id: TableId,
249 ) -> Result<(ProcedureId, Option<Output>)> {
250 let context = self.create_context();
251
252 let procedure =
253 CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
254
255 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
256
257 self.submit_procedure(procedure_with_id).await
258 }
259
260 #[tracing::instrument(skip_all)]
262 pub async fn submit_alter_logical_table_tasks(
263 &self,
264 alter_table_tasks: Vec<AlterTableTask>,
265 physical_table_id: TableId,
266 ) -> Result<(ProcedureId, Option<Output>)> {
267 let context = self.create_context();
268
269 let procedure =
270 AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
271
272 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
273
274 self.submit_procedure(procedure_with_id).await
275 }
276
277 #[tracing::instrument(skip_all)]
279 pub async fn submit_drop_table_task(
280 &self,
281 drop_table_task: DropTableTask,
282 ) -> Result<(ProcedureId, Option<Output>)> {
283 let context = self.create_context();
284
285 let procedure = DropTableProcedure::new(drop_table_task, context);
286
287 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
288
289 self.submit_procedure(procedure_with_id).await
290 }
291
292 #[tracing::instrument(skip_all)]
294 pub async fn submit_create_database(
295 &self,
296 CreateDatabaseTask {
297 catalog,
298 schema,
299 create_if_not_exists,
300 options,
301 }: CreateDatabaseTask,
302 ) -> Result<(ProcedureId, Option<Output>)> {
303 let context = self.create_context();
304 let procedure =
305 CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
306 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
307
308 self.submit_procedure(procedure_with_id).await
309 }
310
311 #[tracing::instrument(skip_all)]
313 pub async fn submit_drop_database(
314 &self,
315 DropDatabaseTask {
316 catalog,
317 schema,
318 drop_if_exists,
319 }: DropDatabaseTask,
320 ) -> Result<(ProcedureId, Option<Output>)> {
321 let context = self.create_context();
322 let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
323 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
324
325 self.submit_procedure(procedure_with_id).await
326 }
327
328 pub async fn submit_alter_database(
329 &self,
330 alter_database_task: AlterDatabaseTask,
331 ) -> Result<(ProcedureId, Option<Output>)> {
332 let context = self.create_context();
333 let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
334 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
335
336 self.submit_procedure(procedure_with_id).await
337 }
338
339 #[tracing::instrument(skip_all)]
341 pub async fn submit_create_flow_task(
342 &self,
343 create_flow: CreateFlowTask,
344 query_context: QueryContext,
345 ) -> Result<(ProcedureId, Option<Output>)> {
346 let context = self.create_context();
347 let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
348 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
349
350 self.submit_procedure(procedure_with_id).await
351 }
352
353 #[tracing::instrument(skip_all)]
355 pub async fn submit_drop_flow_task(
356 &self,
357 drop_flow: DropFlowTask,
358 ) -> Result<(ProcedureId, Option<Output>)> {
359 let context = self.create_context();
360 let procedure = DropFlowProcedure::new(drop_flow, context);
361 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
362
363 self.submit_procedure(procedure_with_id).await
364 }
365
366 #[tracing::instrument(skip_all)]
368 pub async fn submit_drop_view_task(
369 &self,
370 drop_view: DropViewTask,
371 ) -> Result<(ProcedureId, Option<Output>)> {
372 let context = self.create_context();
373 let procedure = DropViewProcedure::new(drop_view, context);
374 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
375
376 self.submit_procedure(procedure_with_id).await
377 }
378
379 #[tracing::instrument(skip_all)]
381 pub async fn submit_truncate_table_task(
382 &self,
383 truncate_table_task: TruncateTableTask,
384 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
385 region_routes: Vec<RegionRoute>,
386 ) -> Result<(ProcedureId, Option<Output>)> {
387 let context = self.create_context();
388 let procedure = TruncateTableProcedure::new(
389 truncate_table_task,
390 table_info_value,
391 region_routes,
392 context,
393 );
394
395 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
396
397 self.submit_procedure(procedure_with_id).await
398 }
399
400 async fn submit_procedure(
401 &self,
402 procedure_with_id: ProcedureWithId,
403 ) -> Result<(ProcedureId, Option<Output>)> {
404 let procedure_id = procedure_with_id.id;
405
406 let mut watcher = self
407 .procedure_manager
408 .submit(procedure_with_id)
409 .await
410 .context(SubmitProcedureSnafu)?;
411
412 let output = watcher::wait(&mut watcher)
413 .await
414 .context(WaitProcedureSnafu)?;
415
416 Ok((procedure_id, output))
417 }
418
419 pub async fn submit_ddl_task(
420 &self,
421 ctx: &ExecutorContext,
422 request: SubmitDdlTaskRequest,
423 ) -> Result<SubmitDdlTaskResponse> {
424 let span = ctx
425 .tracing_context
426 .as_ref()
427 .map(TracingContext::from_w3c)
428 .unwrap_or_else(TracingContext::from_current_span)
429 .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
430 async move {
431 debug!("Submitting Ddl task: {:?}", request.task);
432 match request.task {
433 CreateTable(create_table_task) => {
434 handle_create_table_task(self, create_table_task).await
435 }
436 DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
437 AlterTable(alter_table_task) => {
438 handle_alter_table_task(self, alter_table_task).await
439 }
440 TruncateTable(truncate_table_task) => {
441 handle_truncate_table_task(self, truncate_table_task).await
442 }
443 CreateLogicalTables(create_table_tasks) => {
444 handle_create_logical_table_tasks(self, create_table_tasks).await
445 }
446 AlterLogicalTables(alter_table_tasks) => {
447 handle_alter_logical_table_tasks(self, alter_table_tasks).await
448 }
449 DropLogicalTables(_) => todo!(),
450 CreateDatabase(create_database_task) => {
451 handle_create_database_task(self, create_database_task).await
452 }
453 DropDatabase(drop_database_task) => {
454 handle_drop_database_task(self, drop_database_task).await
455 }
456 AlterDatabase(alter_database_task) => {
457 handle_alter_database_task(self, alter_database_task).await
458 }
459 CreateFlow(create_flow_task) => {
460 handle_create_flow_task(self, create_flow_task, request.query_context.into())
461 .await
462 }
463 DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
464 CreateView(create_view_task) => {
465 handle_create_view_task(self, create_view_task).await
466 }
467 DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
468 #[cfg(feature = "enterprise")]
469 CreateTrigger(create_trigger_task) => {
470 handle_create_trigger_task(
471 self,
472 create_trigger_task,
473 request.query_context.into(),
474 )
475 .await
476 }
477 #[cfg(feature = "enterprise")]
478 DropTrigger(drop_trigger_task) => {
479 handle_drop_trigger_task(self, drop_trigger_task, request.query_context.into())
480 .await
481 }
482 }
483 }
484 .trace(span)
485 .await
486 }
487}
488
489async fn handle_truncate_table_task(
490 ddl_manager: &DdlManager,
491 truncate_table_task: TruncateTableTask,
492) -> Result<SubmitDdlTaskResponse> {
493 let table_id = truncate_table_task.table_id;
494 let table_metadata_manager = &ddl_manager.table_metadata_manager();
495 let table_ref = truncate_table_task.table_ref();
496
497 let (table_info_value, table_route_value) =
498 table_metadata_manager.get_full_table_info(table_id).await?;
499
500 let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
501 table: table_ref.to_string(),
502 })?;
503
504 let table_route_value = table_route_value.context(TableRouteNotFoundSnafu { table_id })?;
505
506 let table_route = table_route_value.into_inner().region_routes()?.clone();
507
508 let (id, _) = ddl_manager
509 .submit_truncate_table_task(truncate_table_task, table_info_value, table_route)
510 .await?;
511
512 info!("Table: {table_id} is truncated via procedure_id {id:?}");
513
514 Ok(SubmitDdlTaskResponse {
515 key: id.to_string().into(),
516 ..Default::default()
517 })
518}
519
520async fn handle_alter_table_task(
521 ddl_manager: &DdlManager,
522 alter_table_task: AlterTableTask,
523) -> Result<SubmitDdlTaskResponse> {
524 let table_ref = alter_table_task.table_ref();
525
526 let table_id = ddl_manager
527 .table_metadata_manager()
528 .table_name_manager()
529 .get(TableNameKey::new(
530 table_ref.catalog,
531 table_ref.schema,
532 table_ref.table,
533 ))
534 .await?
535 .with_context(|| TableNotFoundSnafu {
536 table_name: table_ref.to_string(),
537 })?
538 .table_id();
539
540 let table_route_value = ddl_manager
541 .table_metadata_manager()
542 .table_route_manager()
543 .table_route_storage()
544 .get(table_id)
545 .await?
546 .context(TableRouteNotFoundSnafu { table_id })?;
547 ensure!(
548 table_route_value.is_physical(),
549 UnexpectedLogicalRouteTableSnafu {
550 err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
551 }
552 );
553
554 let (id, _) = ddl_manager
555 .submit_alter_table_task(table_id, alter_table_task)
556 .await?;
557
558 info!("Table: {table_id} is altered via procedure_id {id:?}");
559
560 Ok(SubmitDdlTaskResponse {
561 key: id.to_string().into(),
562 ..Default::default()
563 })
564}
565
566async fn handle_drop_table_task(
567 ddl_manager: &DdlManager,
568 drop_table_task: DropTableTask,
569) -> Result<SubmitDdlTaskResponse> {
570 let table_id = drop_table_task.table_id;
571 let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
572
573 info!("Table: {table_id} is dropped via procedure_id {id:?}");
574
575 Ok(SubmitDdlTaskResponse {
576 key: id.to_string().into(),
577 ..Default::default()
578 })
579}
580
581async fn handle_create_table_task(
582 ddl_manager: &DdlManager,
583 create_table_task: CreateTableTask,
584) -> Result<SubmitDdlTaskResponse> {
585 let (id, output) = ddl_manager
586 .submit_create_table_task(create_table_task)
587 .await?;
588
589 let procedure_id = id.to_string();
590 let output = output.context(ProcedureOutputSnafu {
591 procedure_id: &procedure_id,
592 err_msg: "empty output",
593 })?;
594 let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
595 procedure_id: &procedure_id,
596 err_msg: "downcast to `u32`",
597 })?);
598 info!("Table: {table_id} is created via procedure_id {id:?}");
599
600 Ok(SubmitDdlTaskResponse {
601 key: procedure_id.into(),
602 table_ids: vec![table_id],
603 })
604}
605
606async fn handle_create_logical_table_tasks(
607 ddl_manager: &DdlManager,
608 create_table_tasks: Vec<CreateTableTask>,
609) -> Result<SubmitDdlTaskResponse> {
610 ensure!(
611 !create_table_tasks.is_empty(),
612 EmptyDdlTasksSnafu {
613 name: "create logical tables"
614 }
615 );
616 let physical_table_id = utils::check_and_get_physical_table_id(
617 ddl_manager.table_metadata_manager(),
618 &create_table_tasks,
619 )
620 .await?;
621 let num_logical_tables = create_table_tasks.len();
622
623 let (id, output) = ddl_manager
624 .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
625 .await?;
626
627 info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}");
628
629 let procedure_id = id.to_string();
630 let output = output.context(ProcedureOutputSnafu {
631 procedure_id: &procedure_id,
632 err_msg: "empty output",
633 })?;
634 let table_ids = output
635 .downcast_ref::<Vec<TableId>>()
636 .context(ProcedureOutputSnafu {
637 procedure_id: &procedure_id,
638 err_msg: "downcast to `Vec<TableId>`",
639 })?
640 .clone();
641
642 Ok(SubmitDdlTaskResponse {
643 key: procedure_id.into(),
644 table_ids,
645 })
646}
647
648async fn handle_create_database_task(
649 ddl_manager: &DdlManager,
650 create_database_task: CreateDatabaseTask,
651) -> Result<SubmitDdlTaskResponse> {
652 let (id, _) = ddl_manager
653 .submit_create_database(create_database_task.clone())
654 .await?;
655
656 let procedure_id = id.to_string();
657 info!(
658 "Database {}.{} is created via procedure_id {id:?}",
659 create_database_task.catalog, create_database_task.schema
660 );
661
662 Ok(SubmitDdlTaskResponse {
663 key: procedure_id.into(),
664 ..Default::default()
665 })
666}
667
668async fn handle_drop_database_task(
669 ddl_manager: &DdlManager,
670 drop_database_task: DropDatabaseTask,
671) -> Result<SubmitDdlTaskResponse> {
672 let (id, _) = ddl_manager
673 .submit_drop_database(drop_database_task.clone())
674 .await?;
675
676 let procedure_id = id.to_string();
677 info!(
678 "Database {}.{} is dropped via procedure_id {id:?}",
679 drop_database_task.catalog, drop_database_task.schema
680 );
681
682 Ok(SubmitDdlTaskResponse {
683 key: procedure_id.into(),
684 ..Default::default()
685 })
686}
687
688async fn handle_alter_database_task(
689 ddl_manager: &DdlManager,
690 alter_database_task: AlterDatabaseTask,
691) -> Result<SubmitDdlTaskResponse> {
692 let (id, _) = ddl_manager
693 .submit_alter_database(alter_database_task.clone())
694 .await?;
695
696 let procedure_id = id.to_string();
697 info!(
698 "Database {}.{} is altered via procedure_id {id:?}",
699 alter_database_task.catalog(),
700 alter_database_task.schema()
701 );
702
703 Ok(SubmitDdlTaskResponse {
704 key: procedure_id.into(),
705 ..Default::default()
706 })
707}
708
709async fn handle_drop_flow_task(
710 ddl_manager: &DdlManager,
711 drop_flow_task: DropFlowTask,
712) -> Result<SubmitDdlTaskResponse> {
713 let (id, _) = ddl_manager
714 .submit_drop_flow_task(drop_flow_task.clone())
715 .await?;
716
717 let procedure_id = id.to_string();
718 info!(
719 "Flow {}.{}({}) is dropped via procedure_id {id:?}",
720 drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
721 );
722
723 Ok(SubmitDdlTaskResponse {
724 key: procedure_id.into(),
725 ..Default::default()
726 })
727}
728
729#[cfg(feature = "enterprise")]
730async fn handle_drop_trigger_task(
731 ddl_manager: &DdlManager,
732 drop_trigger_task: DropTriggerTask,
733 query_context: QueryContext,
734) -> Result<SubmitDdlTaskResponse> {
735 let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
736 use crate::error::UnsupportedSnafu;
737
738 return UnsupportedSnafu {
739 operation: "drop trigger",
740 }
741 .fail();
742 };
743
744 m.drop_trigger(
745 drop_trigger_task,
746 ddl_manager.procedure_manager.clone(),
747 ddl_manager.ddl_context.clone(),
748 query_context,
749 )
750 .await
751}
752
753async fn handle_drop_view_task(
754 ddl_manager: &DdlManager,
755 drop_view_task: DropViewTask,
756) -> Result<SubmitDdlTaskResponse> {
757 let (id, _) = ddl_manager
758 .submit_drop_view_task(drop_view_task.clone())
759 .await?;
760
761 let procedure_id = id.to_string();
762 info!(
763 "View {}({}) is dropped via procedure_id {id:?}",
764 drop_view_task.table_ref(),
765 drop_view_task.view_id,
766 );
767
768 Ok(SubmitDdlTaskResponse {
769 key: procedure_id.into(),
770 ..Default::default()
771 })
772}
773
774async fn handle_create_flow_task(
775 ddl_manager: &DdlManager,
776 create_flow_task: CreateFlowTask,
777 query_context: QueryContext,
778) -> Result<SubmitDdlTaskResponse> {
779 let (id, output) = ddl_manager
780 .submit_create_flow_task(create_flow_task.clone(), query_context)
781 .await?;
782
783 let procedure_id = id.to_string();
784 let output = output.context(ProcedureOutputSnafu {
785 procedure_id: &procedure_id,
786 err_msg: "empty output",
787 })?;
788 let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
789 procedure_id: &procedure_id,
790 err_msg: "downcast to `u32`",
791 })?);
792 if !create_flow_task.or_replace {
793 info!(
794 "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
795 create_flow_task.catalog_name, create_flow_task.flow_name,
796 );
797 } else {
798 info!(
799 "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
800 create_flow_task.catalog_name, create_flow_task.flow_name,
801 );
802 }
803
804 Ok(SubmitDdlTaskResponse {
805 key: procedure_id.into(),
806 ..Default::default()
807 })
808}
809
810#[cfg(feature = "enterprise")]
811async fn handle_create_trigger_task(
812 ddl_manager: &DdlManager,
813 create_trigger_task: CreateTriggerTask,
814 query_context: QueryContext,
815) -> Result<SubmitDdlTaskResponse> {
816 let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
817 use crate::error::UnsupportedSnafu;
818
819 return UnsupportedSnafu {
820 operation: "create trigger",
821 }
822 .fail();
823 };
824
825 m.create_trigger(
826 create_trigger_task,
827 ddl_manager.procedure_manager.clone(),
828 ddl_manager.ddl_context.clone(),
829 query_context,
830 )
831 .await
832}
833
834async fn handle_alter_logical_table_tasks(
835 ddl_manager: &DdlManager,
836 alter_table_tasks: Vec<AlterTableTask>,
837) -> Result<SubmitDdlTaskResponse> {
838 ensure!(
839 !alter_table_tasks.is_empty(),
840 EmptyDdlTasksSnafu {
841 name: "alter logical tables"
842 }
843 );
844
845 let first_table = TableNameKey {
847 catalog: &alter_table_tasks[0].alter_table.catalog_name,
848 schema: &alter_table_tasks[0].alter_table.schema_name,
849 table: &alter_table_tasks[0].alter_table.table_name,
850 };
851 let physical_table_id =
852 utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
853 let num_logical_tables = alter_table_tasks.len();
854
855 let (id, _) = ddl_manager
856 .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
857 .await?;
858
859 info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}");
860
861 let procedure_id = id.to_string();
862
863 Ok(SubmitDdlTaskResponse {
864 key: procedure_id.into(),
865 ..Default::default()
866 })
867}
868
869async fn handle_create_view_task(
871 ddl_manager: &DdlManager,
872 create_view_task: CreateViewTask,
873) -> Result<SubmitDdlTaskResponse> {
874 let (id, output) = ddl_manager
875 .submit_create_view_task(create_view_task)
876 .await?;
877
878 let procedure_id = id.to_string();
879 let output = output.context(ProcedureOutputSnafu {
880 procedure_id: &procedure_id,
881 err_msg: "empty output",
882 })?;
883 let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
884 procedure_id: &procedure_id,
885 err_msg: "downcast to `u32`",
886 })?);
887 info!("View: {view_id} is created via procedure_id {id:?}");
888
889 Ok(SubmitDdlTaskResponse {
890 key: procedure_id.into(),
891 table_ids: vec![view_id],
892 })
893}
894
895#[cfg(test)]
896mod tests {
897 use std::sync::Arc;
898
899 use common_procedure::local::LocalManager;
900 use common_procedure::test_util::InMemoryPoisonStore;
901
902 use super::DdlManager;
903 use crate::cache_invalidator::DummyCacheInvalidator;
904 use crate::ddl::alter_table::AlterTableProcedure;
905 use crate::ddl::create_table::CreateTableProcedure;
906 use crate::ddl::drop_table::DropTableProcedure;
907 use crate::ddl::flow_meta::FlowMetadataAllocator;
908 use crate::ddl::table_meta::TableMetadataAllocator;
909 use crate::ddl::truncate_table::TruncateTableProcedure;
910 use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
911 use crate::key::flow::FlowMetadataManager;
912 use crate::key::TableMetadataManager;
913 use crate::kv_backend::memory::MemoryKvBackend;
914 use crate::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef};
915 use crate::peer::Peer;
916 use crate::region_keeper::MemoryRegionKeeper;
917 use crate::region_registry::LeaderRegionRegistry;
918 use crate::sequence::SequenceBuilder;
919 use crate::state_store::KvStateStore;
920 use crate::wal_options_allocator::WalOptionsAllocator;
921
922 pub struct DummyDatanodeManager;
924
925 #[async_trait::async_trait]
926 impl DatanodeManager for DummyDatanodeManager {
927 async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
928 unimplemented!()
929 }
930 }
931
932 #[async_trait::async_trait]
933 impl FlownodeManager for DummyDatanodeManager {
934 async fn flownode(&self, _node: &Peer) -> FlownodeRef {
935 unimplemented!()
936 }
937 }
938
939 #[test]
940 fn test_try_new() {
941 let kv_backend = Arc::new(MemoryKvBackend::new());
942 let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
943 let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
944 Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
945 Arc::new(WalOptionsAllocator::default()),
946 ));
947 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
948 let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
949 Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
950 ));
951
952 let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
953 let poison_manager = Arc::new(InMemoryPoisonStore::default());
954 let procedure_manager = Arc::new(LocalManager::new(
955 Default::default(),
956 state_store,
957 poison_manager,
958 None,
959 None,
960 ));
961
962 let _ = DdlManager::try_new(
963 DdlContext {
964 node_manager: Arc::new(DummyDatanodeManager),
965 cache_invalidator: Arc::new(DummyCacheInvalidator),
966 table_metadata_manager,
967 table_metadata_allocator,
968 flow_metadata_manager,
969 flow_metadata_allocator,
970 memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
971 leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
972 region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
973 },
974 procedure_manager.clone(),
975 true,
976 );
977
978 let expected_loaders = vec![
979 CreateTableProcedure::TYPE_NAME,
980 AlterTableProcedure::TYPE_NAME,
981 DropTableProcedure::TYPE_NAME,
982 TruncateTableProcedure::TYPE_NAME,
983 ];
984
985 for loader in expected_loaders {
986 assert!(procedure_manager.contains_loader(loader));
987 }
988 }
989}