common_meta/reconciliation/reconcile_catalog/
reconcile_databases.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
18use common_telemetry::info;
19use futures::TryStreamExt;
20use serde::{Deserialize, Serialize};
21
22use crate::error::Result;
23use crate::reconciliation::reconcile_catalog::end::ReconcileCatalogEnd;
24use crate::reconciliation::reconcile_catalog::{ReconcileCatalogContext, State};
25use crate::reconciliation::reconcile_database::ReconcileDatabaseProcedure;
26use crate::reconciliation::utils::{Context, SubprocedureMeta};
27
28#[derive(Debug, Serialize, Deserialize)]
29pub(crate) struct ReconcileDatabases;
30
31#[async_trait::async_trait]
32#[typetag::serde]
33impl State for ReconcileDatabases {
34    async fn next(
35        &mut self,
36        ctx: &mut ReconcileCatalogContext,
37        procedure_ctx: &ProcedureContext,
38    ) -> Result<(Box<dyn State>, Status)> {
39        // Waits for inflight subprocedure first.
40        ctx.wait_for_inflight_subprocedure(procedure_ctx).await?;
41
42        if ctx.volatile_ctx.schemas.as_deref().is_none() {
43            let schemas = ctx
44                .table_metadata_manager
45                .schema_manager()
46                .schema_names(&ctx.persistent_ctx.catalog);
47            ctx.volatile_ctx.schemas = Some(schemas);
48        }
49
50        if let Some(catalog) = ctx
51            .volatile_ctx
52            .schemas
53            .as_mut()
54            .unwrap()
55            .try_next()
56            .await?
57        {
58            return Self::schedule_reconcile_database(ctx, catalog);
59        }
60
61        Ok((Box::new(ReconcileCatalogEnd), Status::executing(false)))
62    }
63
64    fn as_any(&self) -> &dyn Any {
65        self
66    }
67}
68
69impl ReconcileDatabases {
70    fn schedule_reconcile_database(
71        ctx: &mut ReconcileCatalogContext,
72        schema: String,
73    ) -> Result<(Box<dyn State>, Status)> {
74        let context = Context {
75            node_manager: ctx.node_manager.clone(),
76            table_metadata_manager: ctx.table_metadata_manager.clone(),
77            cache_invalidator: ctx.cache_invalidator.clone(),
78        };
79        info!(
80            "Scheduling reconcile database: {}, catalog: {}",
81            schema, ctx.persistent_ctx.catalog
82        );
83        let procedure = ReconcileDatabaseProcedure::new(
84            context,
85            ctx.persistent_ctx.catalog.clone(),
86            schema.clone(),
87            ctx.persistent_ctx.fast_fail,
88            ctx.persistent_ctx.parallelism,
89            ctx.persistent_ctx.resolve_strategy,
90            true,
91        );
92        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
93        ctx.volatile_ctx.inflight_subprocedure = Some(SubprocedureMeta::new_reconcile_database(
94            procedure_with_id.id,
95            ctx.persistent_ctx.catalog.clone(),
96            schema,
97        ));
98
99        Ok((
100            Box::new(ReconcileDatabases),
101            Status::suspended(vec![procedure_with_id], false),
102        ))
103    }
104}