common_meta/reconciliation/reconcile_database/
start.rs1use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, Status};
18use common_telemetry::info;
19use serde::{Deserialize, Serialize};
20use snafu::ensure;
21
22use crate::error::{self, Result};
23use crate::key::schema_name::SchemaNameKey;
24use crate::reconciliation::reconcile_database::reconcile_tables::ReconcileTables;
25use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State};
26
27#[derive(Debug, Serialize, Deserialize)]
28pub(crate) struct ReconcileDatabaseStart;
29
30#[async_trait::async_trait]
31#[typetag::serde]
32impl State for ReconcileDatabaseStart {
33 async fn next(
34 &mut self,
35 ctx: &mut ReconcileDatabaseContext,
36 procedure_ctx: &ProcedureContext,
37 ) -> Result<(Box<dyn State>, Status)> {
38 let exists = ctx
39 .table_metadata_manager
40 .schema_manager()
41 .exists(SchemaNameKey {
42 catalog: &ctx.persistent_ctx.catalog,
43 schema: &ctx.persistent_ctx.schema,
44 })
45 .await?;
46
47 ensure!(
48 exists,
49 error::SchemaNotFoundSnafu {
50 table_schema: &ctx.persistent_ctx.schema,
51 },
52 );
53 info!(
54 "Reconcile database: {}, catalog: {}, procedure_id: {}",
55 ctx.persistent_ctx.schema, ctx.persistent_ctx.catalog, procedure_ctx.procedure_id,
56 );
57 Ok((Box::new(ReconcileTables), Status::executing(true)))
58 }
59
60 fn as_any(&self) -> &dyn Any {
61 self
62 }
63}