common_meta/reconciliation/reconcile_database/
start.rs1use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, Status};
18use common_telemetry::info;
19use serde::{Deserialize, Serialize};
20use snafu::ensure;
21
22use crate::error::{self, Result};
23use crate::key::schema_name::SchemaNameKey;
24use crate::reconciliation::reconcile_database::reconcile_tables::ReconcileTables;
25use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State};
26
27#[derive(Debug, Serialize, Deserialize)]
28pub(crate) struct ReconcileDatabaseStart;
29
30#[async_trait::async_trait]
31#[typetag::serde]
32impl State for ReconcileDatabaseStart {
33    async fn next(
34        &mut self,
35        ctx: &mut ReconcileDatabaseContext,
36        procedure_ctx: &ProcedureContext,
37    ) -> Result<(Box<dyn State>, Status)> {
38        let exists = ctx
39            .table_metadata_manager
40            .schema_manager()
41            .exists(SchemaNameKey {
42                catalog: &ctx.persistent_ctx.catalog,
43                schema: &ctx.persistent_ctx.schema,
44            })
45            .await?;
46
47        ensure!(
48            exists,
49            error::SchemaNotFoundSnafu {
50                table_schema: &ctx.persistent_ctx.schema,
51            },
52        );
53        info!(
54            "Reconcile database: {}, catalog: {}, procedure_id: {}",
55            ctx.persistent_ctx.schema, ctx.persistent_ctx.catalog, procedure_ctx.procedure_id,
56        );
57        Ok((Box::new(ReconcileTables), Status::executing(true)))
58    }
59
60    fn as_any(&self) -> &dyn Any {
61        self
62    }
63}