common_meta/reconciliation/reconcile_database/
start.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, Status};
18use common_telemetry::info;
19use serde::{Deserialize, Serialize};
20use snafu::ensure;
21
22use crate::error::{self, Result};
23use crate::key::schema_name::SchemaNameKey;
24use crate::reconciliation::reconcile_database::reconcile_tables::ReconcileTables;
25use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State};
26
27#[derive(Debug, Serialize, Deserialize)]
28pub(crate) struct ReconcileDatabaseStart;
29
30#[async_trait::async_trait]
31#[typetag::serde]
32impl State for ReconcileDatabaseStart {
33    async fn next(
34        &mut self,
35        ctx: &mut ReconcileDatabaseContext,
36        procedure_ctx: &ProcedureContext,
37    ) -> Result<(Box<dyn State>, Status)> {
38        let exists = ctx
39            .table_metadata_manager
40            .schema_manager()
41            .exists(SchemaNameKey {
42                catalog: &ctx.persistent_ctx.catalog,
43                schema: &ctx.persistent_ctx.schema,
44            })
45            .await?;
46
47        ensure!(
48            exists,
49            error::SchemaNotFoundSnafu {
50                table_schema: &ctx.persistent_ctx.schema,
51            },
52        );
53        info!(
54            "Reconcile database: {}, catalog: {}, procedure_id: {}",
55            ctx.persistent_ctx.schema, ctx.persistent_ctx.catalog, procedure_ctx.procedure_id,
56        );
57        Ok((Box::new(ReconcileTables), Status::executing(true)))
58    }
59
60    fn as_any(&self) -> &dyn Any {
61        self
62    }
63}