1use std::sync::Arc;
16
17use api::v1::meta::ProcedureDetailResponse;
18use common_procedure::{
19 watcher, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId,
20};
21use common_telemetry::tracing_context::{FutureExt, TracingContext};
22use common_telemetry::{debug, info, tracing};
23use derive_builder::Builder;
24use snafu::{ensure, OptionExt, ResultExt};
25use store_api::storage::TableId;
26
27use crate::ddl::alter_database::AlterDatabaseProcedure;
28use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
29use crate::ddl::alter_table::AlterTableProcedure;
30use crate::ddl::create_database::CreateDatabaseProcedure;
31use crate::ddl::create_flow::CreateFlowProcedure;
32use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
33use crate::ddl::create_table::CreateTableProcedure;
34use crate::ddl::create_view::CreateViewProcedure;
35use crate::ddl::drop_database::DropDatabaseProcedure;
36use crate::ddl::drop_flow::DropFlowProcedure;
37use crate::ddl::drop_table::DropTableProcedure;
38use crate::ddl::drop_view::DropViewProcedure;
39use crate::ddl::truncate_table::TruncateTableProcedure;
40use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor};
41use crate::error::{
42 EmptyDdlTasksSnafu, ParseProcedureIdSnafu, ProcedureNotFoundSnafu, ProcedureOutputSnafu,
43 QueryProcedureSnafu, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu,
44 TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
45 UnexpectedLogicalRouteTableSnafu, UnsupportedSnafu, WaitProcedureSnafu,
46};
47use crate::key::table_info::TableInfoValue;
48use crate::key::table_name::TableNameKey;
49use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
50#[cfg(feature = "enterprise")]
51use crate::rpc::ddl::trigger::CreateTriggerTask;
52#[cfg(feature = "enterprise")]
53use crate::rpc::ddl::DdlTask::CreateTrigger;
54use crate::rpc::ddl::DdlTask::{
55 AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
56 CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
57 TruncateTable,
58};
59use crate::rpc::ddl::{
60 AlterDatabaseTask, AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask,
61 CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
62 SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
63};
64use crate::rpc::procedure;
65use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
66use crate::rpc::router::RegionRoute;
67
68pub type DdlManagerRef = Arc<DdlManager>;
69
70pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
71
72#[derive(Builder)]
74pub struct DdlManager {
75 ddl_context: DdlContext,
76 procedure_manager: ProcedureManagerRef,
77 #[cfg(feature = "enterprise")]
78 trigger_ddl_manager: Option<TriggerDdlManagerRef>,
79}
80
81#[cfg(feature = "enterprise")]
84#[async_trait::async_trait]
85pub trait TriggerDdlManager: Send + Sync {
86 async fn create_trigger(
87 &self,
88 create_trigger_task: CreateTriggerTask,
89 procedure_manager: ProcedureManagerRef,
90 ddl_context: DdlContext,
91 query_context: QueryContext,
92 ) -> Result<SubmitDdlTaskResponse>;
93
94 fn as_any(&self) -> &dyn std::any::Any;
95}
96
97#[cfg(feature = "enterprise")]
98pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
99
100macro_rules! procedure_loader_entry {
101 ($procedure:ident) => {
102 (
103 $procedure::TYPE_NAME,
104 &|context: DdlContext| -> BoxedProcedureLoader {
105 Box::new(move |json: &str| {
106 let context = context.clone();
107 $procedure::from_json(json, context).map(|p| Box::new(p) as _)
108 })
109 },
110 )
111 };
112}
113
114macro_rules! procedure_loader {
115 ($($procedure:ident),*) => {
116 vec![
117 $(procedure_loader_entry!($procedure)),*
118 ]
119 };
120}
121
122impl DdlManager {
123 pub fn try_new(
125 ddl_context: DdlContext,
126 procedure_manager: ProcedureManagerRef,
127 register_loaders: bool,
128 #[cfg(feature = "enterprise")] trigger_ddl_manager: Option<TriggerDdlManagerRef>,
129 ) -> Result<Self> {
130 let manager = Self {
131 ddl_context,
132 procedure_manager,
133 #[cfg(feature = "enterprise")]
134 trigger_ddl_manager,
135 };
136 if register_loaders {
137 manager.register_loaders()?;
138 }
139 Ok(manager)
140 }
141
142 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
144 &self.ddl_context.table_metadata_manager
145 }
146
147 pub fn create_context(&self) -> DdlContext {
149 self.ddl_context.clone()
150 }
151
152 pub fn register_loaders(&self) -> Result<()> {
154 let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
155 CreateTableProcedure,
156 CreateLogicalTablesProcedure,
157 CreateViewProcedure,
158 CreateFlowProcedure,
159 AlterTableProcedure,
160 AlterLogicalTablesProcedure,
161 AlterDatabaseProcedure,
162 DropTableProcedure,
163 DropFlowProcedure,
164 TruncateTableProcedure,
165 CreateDatabaseProcedure,
166 DropDatabaseProcedure,
167 DropViewProcedure
168 );
169
170 for (type_name, loader_factory) in loaders {
171 let context = self.create_context();
172 self.procedure_manager
173 .register_loader(type_name, loader_factory(context))
174 .context(RegisterProcedureLoaderSnafu { type_name })?;
175 }
176
177 Ok(())
178 }
179
180 #[tracing::instrument(skip_all)]
182 pub async fn submit_alter_table_task(
183 &self,
184 table_id: TableId,
185 alter_table_task: AlterTableTask,
186 ) -> Result<(ProcedureId, Option<Output>)> {
187 let context = self.create_context();
188
189 let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
190
191 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
192
193 self.submit_procedure(procedure_with_id).await
194 }
195
196 #[tracing::instrument(skip_all)]
198 pub async fn submit_create_table_task(
199 &self,
200 create_table_task: CreateTableTask,
201 ) -> Result<(ProcedureId, Option<Output>)> {
202 let context = self.create_context();
203
204 let procedure = CreateTableProcedure::new(create_table_task, context);
205
206 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
207
208 self.submit_procedure(procedure_with_id).await
209 }
210
211 #[tracing::instrument(skip_all)]
213 pub async fn submit_create_view_task(
214 &self,
215 create_view_task: CreateViewTask,
216 ) -> Result<(ProcedureId, Option<Output>)> {
217 let context = self.create_context();
218
219 let procedure = CreateViewProcedure::new(create_view_task, context);
220
221 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
222
223 self.submit_procedure(procedure_with_id).await
224 }
225
226 #[tracing::instrument(skip_all)]
228 pub async fn submit_create_logical_table_tasks(
229 &self,
230 create_table_tasks: Vec<CreateTableTask>,
231 physical_table_id: TableId,
232 ) -> Result<(ProcedureId, Option<Output>)> {
233 let context = self.create_context();
234
235 let procedure =
236 CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
237
238 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
239
240 self.submit_procedure(procedure_with_id).await
241 }
242
243 #[tracing::instrument(skip_all)]
245 pub async fn submit_alter_logical_table_tasks(
246 &self,
247 alter_table_tasks: Vec<AlterTableTask>,
248 physical_table_id: TableId,
249 ) -> Result<(ProcedureId, Option<Output>)> {
250 let context = self.create_context();
251
252 let procedure =
253 AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
254
255 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
256
257 self.submit_procedure(procedure_with_id).await
258 }
259
260 #[tracing::instrument(skip_all)]
262 pub async fn submit_drop_table_task(
263 &self,
264 drop_table_task: DropTableTask,
265 ) -> Result<(ProcedureId, Option<Output>)> {
266 let context = self.create_context();
267
268 let procedure = DropTableProcedure::new(drop_table_task, context);
269
270 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
271
272 self.submit_procedure(procedure_with_id).await
273 }
274
275 #[tracing::instrument(skip_all)]
277 pub async fn submit_create_database(
278 &self,
279 CreateDatabaseTask {
280 catalog,
281 schema,
282 create_if_not_exists,
283 options,
284 }: CreateDatabaseTask,
285 ) -> Result<(ProcedureId, Option<Output>)> {
286 let context = self.create_context();
287 let procedure =
288 CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
289 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
290
291 self.submit_procedure(procedure_with_id).await
292 }
293
294 #[tracing::instrument(skip_all)]
296 pub async fn submit_drop_database(
297 &self,
298 DropDatabaseTask {
299 catalog,
300 schema,
301 drop_if_exists,
302 }: DropDatabaseTask,
303 ) -> Result<(ProcedureId, Option<Output>)> {
304 let context = self.create_context();
305 let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
306 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
307
308 self.submit_procedure(procedure_with_id).await
309 }
310
311 pub async fn submit_alter_database(
312 &self,
313 alter_database_task: AlterDatabaseTask,
314 ) -> Result<(ProcedureId, Option<Output>)> {
315 let context = self.create_context();
316 let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
317 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
318
319 self.submit_procedure(procedure_with_id).await
320 }
321
322 #[tracing::instrument(skip_all)]
324 pub async fn submit_create_flow_task(
325 &self,
326 create_flow: CreateFlowTask,
327 query_context: QueryContext,
328 ) -> Result<(ProcedureId, Option<Output>)> {
329 let context = self.create_context();
330 let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
331 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
332
333 self.submit_procedure(procedure_with_id).await
334 }
335
336 #[tracing::instrument(skip_all)]
338 pub async fn submit_drop_flow_task(
339 &self,
340 drop_flow: DropFlowTask,
341 ) -> Result<(ProcedureId, Option<Output>)> {
342 let context = self.create_context();
343 let procedure = DropFlowProcedure::new(drop_flow, context);
344 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
345
346 self.submit_procedure(procedure_with_id).await
347 }
348
349 #[tracing::instrument(skip_all)]
351 pub async fn submit_drop_view_task(
352 &self,
353 drop_view: DropViewTask,
354 ) -> Result<(ProcedureId, Option<Output>)> {
355 let context = self.create_context();
356 let procedure = DropViewProcedure::new(drop_view, context);
357 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
358
359 self.submit_procedure(procedure_with_id).await
360 }
361
362 #[tracing::instrument(skip_all)]
364 pub async fn submit_truncate_table_task(
365 &self,
366 truncate_table_task: TruncateTableTask,
367 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
368 region_routes: Vec<RegionRoute>,
369 ) -> Result<(ProcedureId, Option<Output>)> {
370 let context = self.create_context();
371 let procedure = TruncateTableProcedure::new(
372 truncate_table_task,
373 table_info_value,
374 region_routes,
375 context,
376 );
377
378 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
379
380 self.submit_procedure(procedure_with_id).await
381 }
382
383 async fn submit_procedure(
384 &self,
385 procedure_with_id: ProcedureWithId,
386 ) -> Result<(ProcedureId, Option<Output>)> {
387 let procedure_id = procedure_with_id.id;
388
389 let mut watcher = self
390 .procedure_manager
391 .submit(procedure_with_id)
392 .await
393 .context(SubmitProcedureSnafu)?;
394
395 let output = watcher::wait(&mut watcher)
396 .await
397 .context(WaitProcedureSnafu)?;
398
399 Ok((procedure_id, output))
400 }
401}
402
403async fn handle_truncate_table_task(
404 ddl_manager: &DdlManager,
405 truncate_table_task: TruncateTableTask,
406) -> Result<SubmitDdlTaskResponse> {
407 let table_id = truncate_table_task.table_id;
408 let table_metadata_manager = &ddl_manager.table_metadata_manager();
409 let table_ref = truncate_table_task.table_ref();
410
411 let (table_info_value, table_route_value) =
412 table_metadata_manager.get_full_table_info(table_id).await?;
413
414 let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
415 table: table_ref.to_string(),
416 })?;
417
418 let table_route_value = table_route_value.context(TableRouteNotFoundSnafu { table_id })?;
419
420 let table_route = table_route_value.into_inner().region_routes()?.clone();
421
422 let (id, _) = ddl_manager
423 .submit_truncate_table_task(truncate_table_task, table_info_value, table_route)
424 .await?;
425
426 info!("Table: {table_id} is truncated via procedure_id {id:?}");
427
428 Ok(SubmitDdlTaskResponse {
429 key: id.to_string().into(),
430 ..Default::default()
431 })
432}
433
434async fn handle_alter_table_task(
435 ddl_manager: &DdlManager,
436 alter_table_task: AlterTableTask,
437) -> Result<SubmitDdlTaskResponse> {
438 let table_ref = alter_table_task.table_ref();
439
440 let table_id = ddl_manager
441 .table_metadata_manager()
442 .table_name_manager()
443 .get(TableNameKey::new(
444 table_ref.catalog,
445 table_ref.schema,
446 table_ref.table,
447 ))
448 .await?
449 .with_context(|| TableNotFoundSnafu {
450 table_name: table_ref.to_string(),
451 })?
452 .table_id();
453
454 let table_route_value = ddl_manager
455 .table_metadata_manager()
456 .table_route_manager()
457 .table_route_storage()
458 .get(table_id)
459 .await?
460 .context(TableRouteNotFoundSnafu { table_id })?;
461 ensure!(
462 table_route_value.is_physical(),
463 UnexpectedLogicalRouteTableSnafu {
464 err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
465 }
466 );
467
468 let (id, _) = ddl_manager
469 .submit_alter_table_task(table_id, alter_table_task)
470 .await?;
471
472 info!("Table: {table_id} is altered via procedure_id {id:?}");
473
474 Ok(SubmitDdlTaskResponse {
475 key: id.to_string().into(),
476 ..Default::default()
477 })
478}
479
480async fn handle_drop_table_task(
481 ddl_manager: &DdlManager,
482 drop_table_task: DropTableTask,
483) -> Result<SubmitDdlTaskResponse> {
484 let table_id = drop_table_task.table_id;
485 let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
486
487 info!("Table: {table_id} is dropped via procedure_id {id:?}");
488
489 Ok(SubmitDdlTaskResponse {
490 key: id.to_string().into(),
491 ..Default::default()
492 })
493}
494
495async fn handle_create_table_task(
496 ddl_manager: &DdlManager,
497 create_table_task: CreateTableTask,
498) -> Result<SubmitDdlTaskResponse> {
499 let (id, output) = ddl_manager
500 .submit_create_table_task(create_table_task)
501 .await?;
502
503 let procedure_id = id.to_string();
504 let output = output.context(ProcedureOutputSnafu {
505 procedure_id: &procedure_id,
506 err_msg: "empty output",
507 })?;
508 let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
509 procedure_id: &procedure_id,
510 err_msg: "downcast to `u32`",
511 })?);
512 info!("Table: {table_id} is created via procedure_id {id:?}");
513
514 Ok(SubmitDdlTaskResponse {
515 key: procedure_id.into(),
516 table_ids: vec![table_id],
517 })
518}
519
520async fn handle_create_logical_table_tasks(
521 ddl_manager: &DdlManager,
522 create_table_tasks: Vec<CreateTableTask>,
523) -> Result<SubmitDdlTaskResponse> {
524 ensure!(
525 !create_table_tasks.is_empty(),
526 EmptyDdlTasksSnafu {
527 name: "create logical tables"
528 }
529 );
530 let physical_table_id = utils::check_and_get_physical_table_id(
531 ddl_manager.table_metadata_manager(),
532 &create_table_tasks,
533 )
534 .await?;
535 let num_logical_tables = create_table_tasks.len();
536
537 let (id, output) = ddl_manager
538 .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
539 .await?;
540
541 info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}");
542
543 let procedure_id = id.to_string();
544 let output = output.context(ProcedureOutputSnafu {
545 procedure_id: &procedure_id,
546 err_msg: "empty output",
547 })?;
548 let table_ids = output
549 .downcast_ref::<Vec<TableId>>()
550 .context(ProcedureOutputSnafu {
551 procedure_id: &procedure_id,
552 err_msg: "downcast to `Vec<TableId>`",
553 })?
554 .clone();
555
556 Ok(SubmitDdlTaskResponse {
557 key: procedure_id.into(),
558 table_ids,
559 })
560}
561
562async fn handle_create_database_task(
563 ddl_manager: &DdlManager,
564 create_database_task: CreateDatabaseTask,
565) -> Result<SubmitDdlTaskResponse> {
566 let (id, _) = ddl_manager
567 .submit_create_database(create_database_task.clone())
568 .await?;
569
570 let procedure_id = id.to_string();
571 info!(
572 "Database {}.{} is created via procedure_id {id:?}",
573 create_database_task.catalog, create_database_task.schema
574 );
575
576 Ok(SubmitDdlTaskResponse {
577 key: procedure_id.into(),
578 ..Default::default()
579 })
580}
581
582async fn handle_drop_database_task(
583 ddl_manager: &DdlManager,
584 drop_database_task: DropDatabaseTask,
585) -> Result<SubmitDdlTaskResponse> {
586 let (id, _) = ddl_manager
587 .submit_drop_database(drop_database_task.clone())
588 .await?;
589
590 let procedure_id = id.to_string();
591 info!(
592 "Database {}.{} is dropped via procedure_id {id:?}",
593 drop_database_task.catalog, drop_database_task.schema
594 );
595
596 Ok(SubmitDdlTaskResponse {
597 key: procedure_id.into(),
598 ..Default::default()
599 })
600}
601
602async fn handle_alter_database_task(
603 ddl_manager: &DdlManager,
604 alter_database_task: AlterDatabaseTask,
605) -> Result<SubmitDdlTaskResponse> {
606 let (id, _) = ddl_manager
607 .submit_alter_database(alter_database_task.clone())
608 .await?;
609
610 let procedure_id = id.to_string();
611 info!(
612 "Database {}.{} is altered via procedure_id {id:?}",
613 alter_database_task.catalog(),
614 alter_database_task.schema()
615 );
616
617 Ok(SubmitDdlTaskResponse {
618 key: procedure_id.into(),
619 ..Default::default()
620 })
621}
622
623async fn handle_drop_flow_task(
624 ddl_manager: &DdlManager,
625 drop_flow_task: DropFlowTask,
626) -> Result<SubmitDdlTaskResponse> {
627 let (id, _) = ddl_manager
628 .submit_drop_flow_task(drop_flow_task.clone())
629 .await?;
630
631 let procedure_id = id.to_string();
632 info!(
633 "Flow {}.{}({}) is dropped via procedure_id {id:?}",
634 drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
635 );
636
637 Ok(SubmitDdlTaskResponse {
638 key: procedure_id.into(),
639 ..Default::default()
640 })
641}
642
643async fn handle_drop_view_task(
644 ddl_manager: &DdlManager,
645 drop_view_task: DropViewTask,
646) -> Result<SubmitDdlTaskResponse> {
647 let (id, _) = ddl_manager
648 .submit_drop_view_task(drop_view_task.clone())
649 .await?;
650
651 let procedure_id = id.to_string();
652 info!(
653 "View {}({}) is dropped via procedure_id {id:?}",
654 drop_view_task.table_ref(),
655 drop_view_task.view_id,
656 );
657
658 Ok(SubmitDdlTaskResponse {
659 key: procedure_id.into(),
660 ..Default::default()
661 })
662}
663
664async fn handle_create_flow_task(
665 ddl_manager: &DdlManager,
666 create_flow_task: CreateFlowTask,
667 query_context: QueryContext,
668) -> Result<SubmitDdlTaskResponse> {
669 let (id, output) = ddl_manager
670 .submit_create_flow_task(create_flow_task.clone(), query_context)
671 .await?;
672
673 let procedure_id = id.to_string();
674 let output = output.context(ProcedureOutputSnafu {
675 procedure_id: &procedure_id,
676 err_msg: "empty output",
677 })?;
678 let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
679 procedure_id: &procedure_id,
680 err_msg: "downcast to `u32`",
681 })?);
682 if !create_flow_task.or_replace {
683 info!(
684 "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
685 create_flow_task.catalog_name, create_flow_task.flow_name,
686 );
687 } else {
688 info!(
689 "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
690 create_flow_task.catalog_name, create_flow_task.flow_name,
691 );
692 }
693
694 Ok(SubmitDdlTaskResponse {
695 key: procedure_id.into(),
696 ..Default::default()
697 })
698}
699
700#[cfg(feature = "enterprise")]
701async fn handle_create_trigger_task(
702 ddl_manager: &DdlManager,
703 create_trigger_task: CreateTriggerTask,
704 query_context: QueryContext,
705) -> Result<SubmitDdlTaskResponse> {
706 let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
707 return UnsupportedSnafu {
708 operation: "create trigger",
709 }
710 .fail();
711 };
712
713 m.create_trigger(
714 create_trigger_task,
715 ddl_manager.procedure_manager.clone(),
716 ddl_manager.ddl_context.clone(),
717 query_context,
718 )
719 .await
720}
721
722async fn handle_alter_logical_table_tasks(
723 ddl_manager: &DdlManager,
724 alter_table_tasks: Vec<AlterTableTask>,
725) -> Result<SubmitDdlTaskResponse> {
726 ensure!(
727 !alter_table_tasks.is_empty(),
728 EmptyDdlTasksSnafu {
729 name: "alter logical tables"
730 }
731 );
732
733 let first_table = TableNameKey {
735 catalog: &alter_table_tasks[0].alter_table.catalog_name,
736 schema: &alter_table_tasks[0].alter_table.schema_name,
737 table: &alter_table_tasks[0].alter_table.table_name,
738 };
739 let physical_table_id =
740 utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
741 let num_logical_tables = alter_table_tasks.len();
742
743 let (id, _) = ddl_manager
744 .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
745 .await?;
746
747 info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}");
748
749 let procedure_id = id.to_string();
750
751 Ok(SubmitDdlTaskResponse {
752 key: procedure_id.into(),
753 ..Default::default()
754 })
755}
756
757async fn handle_create_view_task(
759 ddl_manager: &DdlManager,
760 create_view_task: CreateViewTask,
761) -> Result<SubmitDdlTaskResponse> {
762 let (id, output) = ddl_manager
763 .submit_create_view_task(create_view_task)
764 .await?;
765
766 let procedure_id = id.to_string();
767 let output = output.context(ProcedureOutputSnafu {
768 procedure_id: &procedure_id,
769 err_msg: "empty output",
770 })?;
771 let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
772 procedure_id: &procedure_id,
773 err_msg: "downcast to `u32`",
774 })?);
775 info!("View: {view_id} is created via procedure_id {id:?}");
776
777 Ok(SubmitDdlTaskResponse {
778 key: procedure_id.into(),
779 table_ids: vec![view_id],
780 })
781}
782
783#[async_trait::async_trait]
785impl ProcedureExecutor for DdlManager {
786 async fn submit_ddl_task(
787 &self,
788 ctx: &ExecutorContext,
789 request: SubmitDdlTaskRequest,
790 ) -> Result<SubmitDdlTaskResponse> {
791 let span = ctx
792 .tracing_context
793 .as_ref()
794 .map(TracingContext::from_w3c)
795 .unwrap_or(TracingContext::from_current_span())
796 .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
797 async move {
798 debug!("Submitting Ddl task: {:?}", request.task);
799 match request.task {
800 CreateTable(create_table_task) => {
801 handle_create_table_task(self, create_table_task).await
802 }
803 DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
804 AlterTable(alter_table_task) => {
805 handle_alter_table_task(self, alter_table_task).await
806 }
807 TruncateTable(truncate_table_task) => {
808 handle_truncate_table_task(self, truncate_table_task).await
809 }
810 CreateLogicalTables(create_table_tasks) => {
811 handle_create_logical_table_tasks(self, create_table_tasks).await
812 }
813 AlterLogicalTables(alter_table_tasks) => {
814 handle_alter_logical_table_tasks(self, alter_table_tasks).await
815 }
816 DropLogicalTables(_) => todo!(),
817 CreateDatabase(create_database_task) => {
818 handle_create_database_task(self, create_database_task).await
819 }
820 DropDatabase(drop_database_task) => {
821 handle_drop_database_task(self, drop_database_task).await
822 }
823 AlterDatabase(alter_database_task) => {
824 handle_alter_database_task(self, alter_database_task).await
825 }
826 CreateFlow(create_flow_task) => {
827 handle_create_flow_task(self, create_flow_task, request.query_context.into())
828 .await
829 }
830 #[cfg(feature = "enterprise")]
831 CreateTrigger(create_trigger_task) => {
832 handle_create_trigger_task(
833 self,
834 create_trigger_task,
835 request.query_context.into(),
836 )
837 .await
838 }
839 DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
840 CreateView(create_view_task) => {
841 handle_create_view_task(self, create_view_task).await
842 }
843 DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
844 }
845 }
846 .trace(span)
847 .await
848 }
849
850 async fn migrate_region(
851 &self,
852 _ctx: &ExecutorContext,
853 _request: MigrateRegionRequest,
854 ) -> Result<MigrateRegionResponse> {
855 UnsupportedSnafu {
856 operation: "migrate_region",
857 }
858 .fail()
859 }
860
861 async fn query_procedure_state(
862 &self,
863 _ctx: &ExecutorContext,
864 pid: &str,
865 ) -> Result<ProcedureStateResponse> {
866 let pid =
867 ProcedureId::parse_str(pid).with_context(|_| ParseProcedureIdSnafu { key: pid })?;
868
869 let state = self
870 .procedure_manager
871 .procedure_state(pid)
872 .await
873 .context(QueryProcedureSnafu)?
874 .context(ProcedureNotFoundSnafu {
875 pid: pid.to_string(),
876 })?;
877
878 Ok(procedure::procedure_state_to_pb_response(&state))
879 }
880
881 async fn list_procedures(&self, _ctx: &ExecutorContext) -> Result<ProcedureDetailResponse> {
882 let metas = self
883 .procedure_manager
884 .list_procedures()
885 .await
886 .context(QueryProcedureSnafu)?;
887 Ok(procedure::procedure_details_to_pb_response(metas))
888 }
889}
890
891#[cfg(test)]
892mod tests {
893 use std::sync::Arc;
894
895 use common_procedure::local::LocalManager;
896 use common_procedure::test_util::InMemoryPoisonStore;
897
898 use super::DdlManager;
899 use crate::cache_invalidator::DummyCacheInvalidator;
900 use crate::ddl::alter_table::AlterTableProcedure;
901 use crate::ddl::create_table::CreateTableProcedure;
902 use crate::ddl::drop_table::DropTableProcedure;
903 use crate::ddl::flow_meta::FlowMetadataAllocator;
904 use crate::ddl::table_meta::TableMetadataAllocator;
905 use crate::ddl::truncate_table::TruncateTableProcedure;
906 use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
907 use crate::key::flow::FlowMetadataManager;
908 use crate::key::TableMetadataManager;
909 use crate::kv_backend::memory::MemoryKvBackend;
910 use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
911 use crate::peer::Peer;
912 use crate::region_keeper::MemoryRegionKeeper;
913 use crate::region_registry::LeaderRegionRegistry;
914 use crate::sequence::SequenceBuilder;
915 use crate::state_store::KvStateStore;
916 use crate::wal_options_allocator::WalOptionsAllocator;
917
918 pub struct DummyDatanodeManager;
920
921 #[async_trait::async_trait]
922 impl NodeManager for DummyDatanodeManager {
923 async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
924 unimplemented!()
925 }
926
927 async fn flownode(&self, _node: &Peer) -> FlownodeRef {
928 unimplemented!()
929 }
930 }
931
932 #[test]
933 fn test_try_new() {
934 let kv_backend = Arc::new(MemoryKvBackend::new());
935 let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
936 let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
937 Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
938 Arc::new(WalOptionsAllocator::default()),
939 ));
940 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
941 let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
942 Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
943 ));
944
945 let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
946 let poison_manager = Arc::new(InMemoryPoisonStore::default());
947 let procedure_manager = Arc::new(LocalManager::new(
948 Default::default(),
949 state_store,
950 poison_manager,
951 ));
952
953 let _ = DdlManager::try_new(
954 DdlContext {
955 node_manager: Arc::new(DummyDatanodeManager),
956 cache_invalidator: Arc::new(DummyCacheInvalidator),
957 table_metadata_manager,
958 table_metadata_allocator,
959 flow_metadata_manager,
960 flow_metadata_allocator,
961 memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
962 leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
963 region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
964 },
965 procedure_manager.clone(),
966 true,
967 #[cfg(feature = "enterprise")]
968 None,
969 );
970
971 let expected_loaders = vec![
972 CreateTableProcedure::TYPE_NAME,
973 AlterTableProcedure::TYPE_NAME,
974 DropTableProcedure::TYPE_NAME,
975 TruncateTableProcedure::TYPE_NAME,
976 ];
977
978 for loader in expected_loaders {
979 assert!(procedure_manager.contains_loader(loader));
980 }
981 }
982}