common_meta/reconciliation/reconcile_catalog/
reconcile_databases.rs1use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
18use common_telemetry::info;
19use futures::TryStreamExt;
20use serde::{Deserialize, Serialize};
21
22use crate::error::Result;
23use crate::reconciliation::reconcile_catalog::end::ReconcileCatalogEnd;
24use crate::reconciliation::reconcile_catalog::{ReconcileCatalogContext, State};
25use crate::reconciliation::reconcile_database::ReconcileDatabaseProcedure;
26use crate::reconciliation::utils::{Context, SubprocedureMeta};
27
28#[derive(Debug, Serialize, Deserialize)]
29pub(crate) struct ReconcileDatabases;
30
31#[async_trait::async_trait]
32#[typetag::serde]
33impl State for ReconcileDatabases {
34 async fn next(
35 &mut self,
36 ctx: &mut ReconcileCatalogContext,
37 procedure_ctx: &ProcedureContext,
38 ) -> Result<(Box<dyn State>, Status)> {
39 ctx.wait_for_inflight_subprocedure(procedure_ctx).await?;
41
42 if ctx.volatile_ctx.schemas.as_deref().is_none() {
43 let schemas = ctx
44 .table_metadata_manager
45 .schema_manager()
46 .schema_names(&ctx.persistent_ctx.catalog);
47 ctx.volatile_ctx.schemas = Some(schemas);
48 }
49
50 if let Some(catalog) = ctx
51 .volatile_ctx
52 .schemas
53 .as_mut()
54 .unwrap()
55 .try_next()
56 .await?
57 {
58 return Self::schedule_reconcile_database(ctx, catalog);
59 }
60
61 Ok((Box::new(ReconcileCatalogEnd), Status::executing(false)))
62 }
63
64 fn as_any(&self) -> &dyn Any {
65 self
66 }
67}
68
69impl ReconcileDatabases {
70 fn schedule_reconcile_database(
71 ctx: &mut ReconcileCatalogContext,
72 schema: String,
73 ) -> Result<(Box<dyn State>, Status)> {
74 let context = Context {
75 node_manager: ctx.node_manager.clone(),
76 table_metadata_manager: ctx.table_metadata_manager.clone(),
77 cache_invalidator: ctx.cache_invalidator.clone(),
78 };
79 info!(
80 "Scheduling reconcile database: {}, catalog: {}",
81 schema, ctx.persistent_ctx.catalog
82 );
83 let procedure = ReconcileDatabaseProcedure::new(
84 context,
85 ctx.persistent_ctx.catalog.clone(),
86 schema.clone(),
87 ctx.persistent_ctx.fast_fail,
88 ctx.persistent_ctx.parallelism,
89 ctx.persistent_ctx.resolve_strategy,
90 true,
91 );
92 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
93 ctx.volatile_ctx.inflight_subprocedure = Some(SubprocedureMeta::new_reconcile_database(
94 procedure_with_id.id,
95 ctx.persistent_ctx.catalog.clone(),
96 schema,
97 ));
98
99 Ok((
100 Box::new(ReconcileDatabases),
101 Status::suspended(vec![procedure_with_id], false),
102 ))
103 }
104}