1use std::sync::Arc;
16
17use api::v1::meta::ProcedureDetailResponse;
18use common_procedure::{
19 watcher, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId,
20};
21use common_telemetry::tracing_context::{FutureExt, TracingContext};
22use common_telemetry::{debug, info, tracing};
23use derive_builder::Builder;
24use snafu::{ensure, OptionExt, ResultExt};
25use store_api::storage::TableId;
26
27use crate::ddl::alter_database::AlterDatabaseProcedure;
28use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
29use crate::ddl::alter_table::AlterTableProcedure;
30use crate::ddl::create_database::CreateDatabaseProcedure;
31use crate::ddl::create_flow::CreateFlowProcedure;
32use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
33use crate::ddl::create_table::CreateTableProcedure;
34use crate::ddl::create_view::CreateViewProcedure;
35use crate::ddl::drop_database::DropDatabaseProcedure;
36use crate::ddl::drop_flow::DropFlowProcedure;
37use crate::ddl::drop_table::DropTableProcedure;
38use crate::ddl::drop_view::DropViewProcedure;
39use crate::ddl::truncate_table::TruncateTableProcedure;
40use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor};
41use crate::error::{
42 EmptyDdlTasksSnafu, ParseProcedureIdSnafu, ProcedureNotFoundSnafu, ProcedureOutputSnafu,
43 QueryProcedureSnafu, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu,
44 TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
45 UnexpectedLogicalRouteTableSnafu, UnsupportedSnafu, WaitProcedureSnafu,
46};
47use crate::key::table_info::TableInfoValue;
48use crate::key::table_name::TableNameKey;
49use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
50use crate::rpc::ddl::DdlTask::{
51 AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
52 CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
53 TruncateTable,
54};
55use crate::rpc::ddl::{
56 AlterDatabaseTask, AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask,
57 CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
58 SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
59};
60use crate::rpc::procedure;
61use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
62use crate::rpc::router::RegionRoute;
63
/// Shared, reference-counted handle to a [`DdlManager`].
pub type DdlManagerRef = Arc<DdlManager>;
65
/// Signature of a factory that turns a [`DdlContext`] into a [`BoxedProcedureLoader`].
pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
67
/// Builds DDL procedures and submits them to a procedure manager.
#[derive(Builder)]
pub struct DdlManager {
    /// Context cloned into every procedure (and procedure loader) this manager creates.
    ddl_context: DdlContext,
    /// Procedure manager used to register loaders, submit procedures and query their state.
    procedure_manager: ProcedureManagerRef,
}
74
/// Expands to a `(type name, loader factory)` tuple for the procedure type `$procedure`.
///
/// The first element is `$procedure::TYPE_NAME`. The second is a reference to a closure
/// that takes a [`DdlContext`] and returns a [`BoxedProcedureLoader`]; that loader
/// rebuilds the procedure from its JSON representation via `$procedure::from_json`.
macro_rules! procedure_loader_entry {
    ($procedure:ident) => {
        (
            $procedure::TYPE_NAME,
            &|context: DdlContext| -> BoxedProcedureLoader {
                Box::new(move |json: &str| {
                    // Each loader invocation hands its own clone of the context to `from_json`.
                    let context = context.clone();
                    $procedure::from_json(json, context).map(|p| Box::new(p) as _)
                })
            },
        )
    };
}
88
/// Expands to a `Vec` holding one `procedure_loader_entry!` tuple per listed procedure
/// type, in the order given.
macro_rules! procedure_loader {
    ($($procedure:ident),*) => {
        vec![
            $(procedure_loader_entry!($procedure)),*
        ]
    };
}
96
97impl DdlManager {
98 pub fn try_new(
100 ddl_context: DdlContext,
101 procedure_manager: ProcedureManagerRef,
102 register_loaders: bool,
103 ) -> Result<Self> {
104 let manager = Self {
105 ddl_context,
106 procedure_manager,
107 };
108 if register_loaders {
109 manager.register_loaders()?;
110 }
111 Ok(manager)
112 }
113
114 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
116 &self.ddl_context.table_metadata_manager
117 }
118
119 pub fn create_context(&self) -> DdlContext {
121 self.ddl_context.clone()
122 }
123
124 pub fn register_loaders(&self) -> Result<()> {
126 let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
127 CreateTableProcedure,
128 CreateLogicalTablesProcedure,
129 CreateViewProcedure,
130 CreateFlowProcedure,
131 AlterTableProcedure,
132 AlterLogicalTablesProcedure,
133 AlterDatabaseProcedure,
134 DropTableProcedure,
135 DropFlowProcedure,
136 TruncateTableProcedure,
137 CreateDatabaseProcedure,
138 DropDatabaseProcedure,
139 DropViewProcedure
140 );
141
142 for (type_name, loader_factory) in loaders {
143 let context = self.create_context();
144 self.procedure_manager
145 .register_loader(type_name, loader_factory(context))
146 .context(RegisterProcedureLoaderSnafu { type_name })?;
147 }
148
149 Ok(())
150 }
151
152 #[tracing::instrument(skip_all)]
154 pub async fn submit_alter_table_task(
155 &self,
156 table_id: TableId,
157 alter_table_task: AlterTableTask,
158 ) -> Result<(ProcedureId, Option<Output>)> {
159 let context = self.create_context();
160
161 let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
162
163 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
164
165 self.submit_procedure(procedure_with_id).await
166 }
167
168 #[tracing::instrument(skip_all)]
170 pub async fn submit_create_table_task(
171 &self,
172 create_table_task: CreateTableTask,
173 ) -> Result<(ProcedureId, Option<Output>)> {
174 let context = self.create_context();
175
176 let procedure = CreateTableProcedure::new(create_table_task, context);
177
178 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
179
180 self.submit_procedure(procedure_with_id).await
181 }
182
183 #[tracing::instrument(skip_all)]
185 pub async fn submit_create_view_task(
186 &self,
187 create_view_task: CreateViewTask,
188 ) -> Result<(ProcedureId, Option<Output>)> {
189 let context = self.create_context();
190
191 let procedure = CreateViewProcedure::new(create_view_task, context);
192
193 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
194
195 self.submit_procedure(procedure_with_id).await
196 }
197
198 #[tracing::instrument(skip_all)]
200 pub async fn submit_create_logical_table_tasks(
201 &self,
202 create_table_tasks: Vec<CreateTableTask>,
203 physical_table_id: TableId,
204 ) -> Result<(ProcedureId, Option<Output>)> {
205 let context = self.create_context();
206
207 let procedure =
208 CreateLogicalTablesProcedure::new(create_table_tasks, physical_table_id, context);
209
210 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
211
212 self.submit_procedure(procedure_with_id).await
213 }
214
215 #[tracing::instrument(skip_all)]
217 pub async fn submit_alter_logical_table_tasks(
218 &self,
219 alter_table_tasks: Vec<AlterTableTask>,
220 physical_table_id: TableId,
221 ) -> Result<(ProcedureId, Option<Output>)> {
222 let context = self.create_context();
223
224 let procedure =
225 AlterLogicalTablesProcedure::new(alter_table_tasks, physical_table_id, context);
226
227 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
228
229 self.submit_procedure(procedure_with_id).await
230 }
231
232 #[tracing::instrument(skip_all)]
234 pub async fn submit_drop_table_task(
235 &self,
236 drop_table_task: DropTableTask,
237 ) -> Result<(ProcedureId, Option<Output>)> {
238 let context = self.create_context();
239
240 let procedure = DropTableProcedure::new(drop_table_task, context);
241
242 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
243
244 self.submit_procedure(procedure_with_id).await
245 }
246
247 #[tracing::instrument(skip_all)]
249 pub async fn submit_create_database(
250 &self,
251 CreateDatabaseTask {
252 catalog,
253 schema,
254 create_if_not_exists,
255 options,
256 }: CreateDatabaseTask,
257 ) -> Result<(ProcedureId, Option<Output>)> {
258 let context = self.create_context();
259 let procedure =
260 CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
261 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
262
263 self.submit_procedure(procedure_with_id).await
264 }
265
266 #[tracing::instrument(skip_all)]
268 pub async fn submit_drop_database(
269 &self,
270 DropDatabaseTask {
271 catalog,
272 schema,
273 drop_if_exists,
274 }: DropDatabaseTask,
275 ) -> Result<(ProcedureId, Option<Output>)> {
276 let context = self.create_context();
277 let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
278 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
279
280 self.submit_procedure(procedure_with_id).await
281 }
282
283 pub async fn submit_alter_database(
284 &self,
285 alter_database_task: AlterDatabaseTask,
286 ) -> Result<(ProcedureId, Option<Output>)> {
287 let context = self.create_context();
288 let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
289 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
290
291 self.submit_procedure(procedure_with_id).await
292 }
293
294 #[tracing::instrument(skip_all)]
296 pub async fn submit_create_flow_task(
297 &self,
298 create_flow: CreateFlowTask,
299 query_context: QueryContext,
300 ) -> Result<(ProcedureId, Option<Output>)> {
301 let context = self.create_context();
302 let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
303 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
304
305 self.submit_procedure(procedure_with_id).await
306 }
307
308 #[tracing::instrument(skip_all)]
310 pub async fn submit_drop_flow_task(
311 &self,
312 drop_flow: DropFlowTask,
313 ) -> Result<(ProcedureId, Option<Output>)> {
314 let context = self.create_context();
315 let procedure = DropFlowProcedure::new(drop_flow, context);
316 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
317
318 self.submit_procedure(procedure_with_id).await
319 }
320
321 #[tracing::instrument(skip_all)]
323 pub async fn submit_drop_view_task(
324 &self,
325 drop_view: DropViewTask,
326 ) -> Result<(ProcedureId, Option<Output>)> {
327 let context = self.create_context();
328 let procedure = DropViewProcedure::new(drop_view, context);
329 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
330
331 self.submit_procedure(procedure_with_id).await
332 }
333
334 #[tracing::instrument(skip_all)]
336 pub async fn submit_truncate_table_task(
337 &self,
338 truncate_table_task: TruncateTableTask,
339 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
340 region_routes: Vec<RegionRoute>,
341 ) -> Result<(ProcedureId, Option<Output>)> {
342 let context = self.create_context();
343 let procedure = TruncateTableProcedure::new(
344 truncate_table_task,
345 table_info_value,
346 region_routes,
347 context,
348 );
349
350 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
351
352 self.submit_procedure(procedure_with_id).await
353 }
354
355 async fn submit_procedure(
356 &self,
357 procedure_with_id: ProcedureWithId,
358 ) -> Result<(ProcedureId, Option<Output>)> {
359 let procedure_id = procedure_with_id.id;
360
361 let mut watcher = self
362 .procedure_manager
363 .submit(procedure_with_id)
364 .await
365 .context(SubmitProcedureSnafu)?;
366
367 let output = watcher::wait(&mut watcher)
368 .await
369 .context(WaitProcedureSnafu)?;
370
371 Ok((procedure_id, output))
372 }
373}
374
375async fn handle_truncate_table_task(
376 ddl_manager: &DdlManager,
377 truncate_table_task: TruncateTableTask,
378) -> Result<SubmitDdlTaskResponse> {
379 let table_id = truncate_table_task.table_id;
380 let table_metadata_manager = &ddl_manager.table_metadata_manager();
381 let table_ref = truncate_table_task.table_ref();
382
383 let (table_info_value, table_route_value) =
384 table_metadata_manager.get_full_table_info(table_id).await?;
385
386 let table_info_value = table_info_value.with_context(|| TableInfoNotFoundSnafu {
387 table: table_ref.to_string(),
388 })?;
389
390 let table_route_value = table_route_value.context(TableRouteNotFoundSnafu { table_id })?;
391
392 let table_route = table_route_value.into_inner().region_routes()?.clone();
393
394 let (id, _) = ddl_manager
395 .submit_truncate_table_task(truncate_table_task, table_info_value, table_route)
396 .await?;
397
398 info!("Table: {table_id} is truncated via procedure_id {id:?}");
399
400 Ok(SubmitDdlTaskResponse {
401 key: id.to_string().into(),
402 ..Default::default()
403 })
404}
405
406async fn handle_alter_table_task(
407 ddl_manager: &DdlManager,
408 alter_table_task: AlterTableTask,
409) -> Result<SubmitDdlTaskResponse> {
410 let table_ref = alter_table_task.table_ref();
411
412 let table_id = ddl_manager
413 .table_metadata_manager()
414 .table_name_manager()
415 .get(TableNameKey::new(
416 table_ref.catalog,
417 table_ref.schema,
418 table_ref.table,
419 ))
420 .await?
421 .with_context(|| TableNotFoundSnafu {
422 table_name: table_ref.to_string(),
423 })?
424 .table_id();
425
426 let table_route_value = ddl_manager
427 .table_metadata_manager()
428 .table_route_manager()
429 .table_route_storage()
430 .get(table_id)
431 .await?
432 .context(TableRouteNotFoundSnafu { table_id })?;
433 ensure!(
434 table_route_value.is_physical(),
435 UnexpectedLogicalRouteTableSnafu {
436 err_msg: format!("{:?} is a non-physical TableRouteValue.", table_ref),
437 }
438 );
439
440 let (id, _) = ddl_manager
441 .submit_alter_table_task(table_id, alter_table_task)
442 .await?;
443
444 info!("Table: {table_id} is altered via procedure_id {id:?}");
445
446 Ok(SubmitDdlTaskResponse {
447 key: id.to_string().into(),
448 ..Default::default()
449 })
450}
451
452async fn handle_drop_table_task(
453 ddl_manager: &DdlManager,
454 drop_table_task: DropTableTask,
455) -> Result<SubmitDdlTaskResponse> {
456 let table_id = drop_table_task.table_id;
457 let (id, _) = ddl_manager.submit_drop_table_task(drop_table_task).await?;
458
459 info!("Table: {table_id} is dropped via procedure_id {id:?}");
460
461 Ok(SubmitDdlTaskResponse {
462 key: id.to_string().into(),
463 ..Default::default()
464 })
465}
466
467async fn handle_create_table_task(
468 ddl_manager: &DdlManager,
469 create_table_task: CreateTableTask,
470) -> Result<SubmitDdlTaskResponse> {
471 let (id, output) = ddl_manager
472 .submit_create_table_task(create_table_task)
473 .await?;
474
475 let procedure_id = id.to_string();
476 let output = output.context(ProcedureOutputSnafu {
477 procedure_id: &procedure_id,
478 err_msg: "empty output",
479 })?;
480 let table_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
481 procedure_id: &procedure_id,
482 err_msg: "downcast to `u32`",
483 })?);
484 info!("Table: {table_id} is created via procedure_id {id:?}");
485
486 Ok(SubmitDdlTaskResponse {
487 key: procedure_id.into(),
488 table_ids: vec![table_id],
489 })
490}
491
492async fn handle_create_logical_table_tasks(
493 ddl_manager: &DdlManager,
494 create_table_tasks: Vec<CreateTableTask>,
495) -> Result<SubmitDdlTaskResponse> {
496 ensure!(
497 !create_table_tasks.is_empty(),
498 EmptyDdlTasksSnafu {
499 name: "create logical tables"
500 }
501 );
502 let physical_table_id = utils::check_and_get_physical_table_id(
503 ddl_manager.table_metadata_manager(),
504 &create_table_tasks,
505 )
506 .await?;
507 let num_logical_tables = create_table_tasks.len();
508
509 let (id, output) = ddl_manager
510 .submit_create_logical_table_tasks(create_table_tasks, physical_table_id)
511 .await?;
512
513 info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is created via procedure_id {id:?}");
514
515 let procedure_id = id.to_string();
516 let output = output.context(ProcedureOutputSnafu {
517 procedure_id: &procedure_id,
518 err_msg: "empty output",
519 })?;
520 let table_ids = output
521 .downcast_ref::<Vec<TableId>>()
522 .context(ProcedureOutputSnafu {
523 procedure_id: &procedure_id,
524 err_msg: "downcast to `Vec<TableId>`",
525 })?
526 .clone();
527
528 Ok(SubmitDdlTaskResponse {
529 key: procedure_id.into(),
530 table_ids,
531 })
532}
533
534async fn handle_create_database_task(
535 ddl_manager: &DdlManager,
536 create_database_task: CreateDatabaseTask,
537) -> Result<SubmitDdlTaskResponse> {
538 let (id, _) = ddl_manager
539 .submit_create_database(create_database_task.clone())
540 .await?;
541
542 let procedure_id = id.to_string();
543 info!(
544 "Database {}.{} is created via procedure_id {id:?}",
545 create_database_task.catalog, create_database_task.schema
546 );
547
548 Ok(SubmitDdlTaskResponse {
549 key: procedure_id.into(),
550 ..Default::default()
551 })
552}
553
554async fn handle_drop_database_task(
555 ddl_manager: &DdlManager,
556 drop_database_task: DropDatabaseTask,
557) -> Result<SubmitDdlTaskResponse> {
558 let (id, _) = ddl_manager
559 .submit_drop_database(drop_database_task.clone())
560 .await?;
561
562 let procedure_id = id.to_string();
563 info!(
564 "Database {}.{} is dropped via procedure_id {id:?}",
565 drop_database_task.catalog, drop_database_task.schema
566 );
567
568 Ok(SubmitDdlTaskResponse {
569 key: procedure_id.into(),
570 ..Default::default()
571 })
572}
573
574async fn handle_alter_database_task(
575 ddl_manager: &DdlManager,
576 alter_database_task: AlterDatabaseTask,
577) -> Result<SubmitDdlTaskResponse> {
578 let (id, _) = ddl_manager
579 .submit_alter_database(alter_database_task.clone())
580 .await?;
581
582 let procedure_id = id.to_string();
583 info!(
584 "Database {}.{} is altered via procedure_id {id:?}",
585 alter_database_task.catalog(),
586 alter_database_task.schema()
587 );
588
589 Ok(SubmitDdlTaskResponse {
590 key: procedure_id.into(),
591 ..Default::default()
592 })
593}
594
595async fn handle_drop_flow_task(
596 ddl_manager: &DdlManager,
597 drop_flow_task: DropFlowTask,
598) -> Result<SubmitDdlTaskResponse> {
599 let (id, _) = ddl_manager
600 .submit_drop_flow_task(drop_flow_task.clone())
601 .await?;
602
603 let procedure_id = id.to_string();
604 info!(
605 "Flow {}.{}({}) is dropped via procedure_id {id:?}",
606 drop_flow_task.catalog_name, drop_flow_task.flow_name, drop_flow_task.flow_id,
607 );
608
609 Ok(SubmitDdlTaskResponse {
610 key: procedure_id.into(),
611 ..Default::default()
612 })
613}
614
615async fn handle_drop_view_task(
616 ddl_manager: &DdlManager,
617 drop_view_task: DropViewTask,
618) -> Result<SubmitDdlTaskResponse> {
619 let (id, _) = ddl_manager
620 .submit_drop_view_task(drop_view_task.clone())
621 .await?;
622
623 let procedure_id = id.to_string();
624 info!(
625 "View {}({}) is dropped via procedure_id {id:?}",
626 drop_view_task.table_ref(),
627 drop_view_task.view_id,
628 );
629
630 Ok(SubmitDdlTaskResponse {
631 key: procedure_id.into(),
632 ..Default::default()
633 })
634}
635
636async fn handle_create_flow_task(
637 ddl_manager: &DdlManager,
638 create_flow_task: CreateFlowTask,
639 query_context: QueryContext,
640) -> Result<SubmitDdlTaskResponse> {
641 let (id, output) = ddl_manager
642 .submit_create_flow_task(create_flow_task.clone(), query_context)
643 .await?;
644
645 let procedure_id = id.to_string();
646 let output = output.context(ProcedureOutputSnafu {
647 procedure_id: &procedure_id,
648 err_msg: "empty output",
649 })?;
650 let flow_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
651 procedure_id: &procedure_id,
652 err_msg: "downcast to `u32`",
653 })?);
654 if !create_flow_task.or_replace {
655 info!(
656 "Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
657 create_flow_task.catalog_name, create_flow_task.flow_name,
658 );
659 } else {
660 info!(
661 "Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
662 create_flow_task.catalog_name, create_flow_task.flow_name,
663 );
664 }
665
666 Ok(SubmitDdlTaskResponse {
667 key: procedure_id.into(),
668 ..Default::default()
669 })
670}
671
672async fn handle_alter_logical_table_tasks(
673 ddl_manager: &DdlManager,
674 alter_table_tasks: Vec<AlterTableTask>,
675) -> Result<SubmitDdlTaskResponse> {
676 ensure!(
677 !alter_table_tasks.is_empty(),
678 EmptyDdlTasksSnafu {
679 name: "alter logical tables"
680 }
681 );
682
683 let first_table = TableNameKey {
685 catalog: &alter_table_tasks[0].alter_table.catalog_name,
686 schema: &alter_table_tasks[0].alter_table.schema_name,
687 table: &alter_table_tasks[0].alter_table.table_name,
688 };
689 let physical_table_id =
690 utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?;
691 let num_logical_tables = alter_table_tasks.len();
692
693 let (id, _) = ddl_manager
694 .submit_alter_logical_table_tasks(alter_table_tasks, physical_table_id)
695 .await?;
696
697 info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}");
698
699 let procedure_id = id.to_string();
700
701 Ok(SubmitDdlTaskResponse {
702 key: procedure_id.into(),
703 ..Default::default()
704 })
705}
706
707async fn handle_create_view_task(
709 ddl_manager: &DdlManager,
710 create_view_task: CreateViewTask,
711) -> Result<SubmitDdlTaskResponse> {
712 let (id, output) = ddl_manager
713 .submit_create_view_task(create_view_task)
714 .await?;
715
716 let procedure_id = id.to_string();
717 let output = output.context(ProcedureOutputSnafu {
718 procedure_id: &procedure_id,
719 err_msg: "empty output",
720 })?;
721 let view_id = *(output.downcast_ref::<u32>().context(ProcedureOutputSnafu {
722 procedure_id: &procedure_id,
723 err_msg: "downcast to `u32`",
724 })?);
725 info!("View: {view_id} is created via procedure_id {id:?}");
726
727 Ok(SubmitDdlTaskResponse {
728 key: procedure_id.into(),
729 table_ids: vec![view_id],
730 })
731}
732
733#[async_trait::async_trait]
735impl ProcedureExecutor for DdlManager {
736 async fn submit_ddl_task(
737 &self,
738 ctx: &ExecutorContext,
739 request: SubmitDdlTaskRequest,
740 ) -> Result<SubmitDdlTaskResponse> {
741 let span = ctx
742 .tracing_context
743 .as_ref()
744 .map(TracingContext::from_w3c)
745 .unwrap_or(TracingContext::from_current_span())
746 .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
747 async move {
748 debug!("Submitting Ddl task: {:?}", request.task);
749 match request.task {
750 CreateTable(create_table_task) => {
751 handle_create_table_task(self, create_table_task).await
752 }
753 DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
754 AlterTable(alter_table_task) => {
755 handle_alter_table_task(self, alter_table_task).await
756 }
757 TruncateTable(truncate_table_task) => {
758 handle_truncate_table_task(self, truncate_table_task).await
759 }
760 CreateLogicalTables(create_table_tasks) => {
761 handle_create_logical_table_tasks(self, create_table_tasks).await
762 }
763 AlterLogicalTables(alter_table_tasks) => {
764 handle_alter_logical_table_tasks(self, alter_table_tasks).await
765 }
766 DropLogicalTables(_) => todo!(),
767 CreateDatabase(create_database_task) => {
768 handle_create_database_task(self, create_database_task).await
769 }
770 DropDatabase(drop_database_task) => {
771 handle_drop_database_task(self, drop_database_task).await
772 }
773 AlterDatabase(alter_database_task) => {
774 handle_alter_database_task(self, alter_database_task).await
775 }
776 CreateFlow(create_flow_task) => {
777 handle_create_flow_task(self, create_flow_task, request.query_context.into())
778 .await
779 }
780 DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
781 CreateView(create_view_task) => {
782 handle_create_view_task(self, create_view_task).await
783 }
784 DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
785 }
786 }
787 .trace(span)
788 .await
789 }
790
791 async fn migrate_region(
792 &self,
793 _ctx: &ExecutorContext,
794 _request: MigrateRegionRequest,
795 ) -> Result<MigrateRegionResponse> {
796 UnsupportedSnafu {
797 operation: "migrate_region",
798 }
799 .fail()
800 }
801
802 async fn query_procedure_state(
803 &self,
804 _ctx: &ExecutorContext,
805 pid: &str,
806 ) -> Result<ProcedureStateResponse> {
807 let pid =
808 ProcedureId::parse_str(pid).with_context(|_| ParseProcedureIdSnafu { key: pid })?;
809
810 let state = self
811 .procedure_manager
812 .procedure_state(pid)
813 .await
814 .context(QueryProcedureSnafu)?
815 .context(ProcedureNotFoundSnafu {
816 pid: pid.to_string(),
817 })?;
818
819 Ok(procedure::procedure_state_to_pb_response(&state))
820 }
821
822 async fn list_procedures(&self, _ctx: &ExecutorContext) -> Result<ProcedureDetailResponse> {
823 let metas = self
824 .procedure_manager
825 .list_procedures()
826 .await
827 .context(QueryProcedureSnafu)?;
828 Ok(procedure::procedure_details_to_pb_response(metas))
829 }
830}
831
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_procedure::local::LocalManager;
    use common_procedure::test_util::InMemoryPoisonStore;

    use super::DdlManager;
    use crate::cache_invalidator::DummyCacheInvalidator;
    use crate::ddl::alter_database::AlterDatabaseProcedure;
    use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
    use crate::ddl::alter_table::AlterTableProcedure;
    use crate::ddl::create_database::CreateDatabaseProcedure;
    use crate::ddl::create_flow::CreateFlowProcedure;
    use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
    use crate::ddl::create_table::CreateTableProcedure;
    use crate::ddl::create_view::CreateViewProcedure;
    use crate::ddl::drop_database::DropDatabaseProcedure;
    use crate::ddl::drop_flow::DropFlowProcedure;
    use crate::ddl::drop_table::DropTableProcedure;
    use crate::ddl::drop_view::DropViewProcedure;
    use crate::ddl::flow_meta::FlowMetadataAllocator;
    use crate::ddl::table_meta::TableMetadataAllocator;
    use crate::ddl::truncate_table::TruncateTableProcedure;
    use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
    use crate::key::flow::FlowMetadataManager;
    use crate::key::TableMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
    use crate::peer::Peer;
    use crate::region_keeper::MemoryRegionKeeper;
    use crate::region_registry::LeaderRegionRegistry;
    use crate::sequence::SequenceBuilder;
    use crate::state_store::KvStateStore;
    use crate::wal_options_allocator::WalOptionsAllocator;

    /// A [`NodeManager`] stub whose methods are never expected to be called.
    pub struct DummyDatanodeManager;

    #[async_trait::async_trait]
    impl NodeManager for DummyDatanodeManager {
        async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
            unimplemented!()
        }

        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
            unimplemented!()
        }
    }

    /// `try_new` with `register_loaders = true` must succeed and register a loader for
    /// every DDL procedure type.
    #[test]
    fn test_try_new() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
            Arc::new(WalOptionsAllocator::default()),
        ));
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()),
        ));

        let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let procedure_manager = Arc::new(LocalManager::new(
            Default::default(),
            state_store,
            poison_manager,
        ));

        // Do not discard the result: a failed loader registration must fail the test here.
        let manager = DdlManager::try_new(
            DdlContext {
                node_manager: Arc::new(DummyDatanodeManager),
                cache_invalidator: Arc::new(DummyCacheInvalidator),
                table_metadata_manager,
                table_metadata_allocator,
                flow_metadata_manager,
                flow_metadata_allocator,
                memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
                leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
                region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
            },
            procedure_manager.clone(),
            true,
        );
        assert!(manager.is_ok(), "registering the DDL loaders failed");

        // Mirrors the list passed to `procedure_loader!` in `register_loaders`.
        let expected_loaders = vec![
            CreateTableProcedure::TYPE_NAME,
            CreateLogicalTablesProcedure::TYPE_NAME,
            CreateViewProcedure::TYPE_NAME,
            CreateFlowProcedure::TYPE_NAME,
            AlterTableProcedure::TYPE_NAME,
            AlterLogicalTablesProcedure::TYPE_NAME,
            AlterDatabaseProcedure::TYPE_NAME,
            DropTableProcedure::TYPE_NAME,
            DropFlowProcedure::TYPE_NAME,
            TruncateTableProcedure::TYPE_NAME,
            CreateDatabaseProcedure::TYPE_NAME,
            DropDatabaseProcedure::TYPE_NAME,
            DropViewProcedure::TYPE_NAME,
        ];

        for loader in expected_loaders {
            assert!(
                procedure_manager.contains_loader(loader),
                "missing procedure loader: {loader}"
            );
        }
    }
}