common_meta/reconciliation/
reconcile_catalog.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
use std::any::Any;
use std::fmt::Debug;
use std::time::Instant;

use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
    Result as ProcedureResult, Status,
};
use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;

use crate::cache_invalidator::CacheInvalidatorRef;
use crate::error::Result;
use crate::key::TableMetadataManagerRef;
use crate::lock_key::CatalogLock;
use crate::metrics;
use crate::node_manager::NodeManagerRef;
use crate::reconciliation::reconcile_catalog::start::ReconcileCatalogStart;
use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
use crate::reconciliation::utils::{
    wait_for_inflight_subprocedures, Context, ReconcileCatalogMetrics, SubprocedureMeta,
};
39
40pub(crate) mod end;
41pub(crate) mod reconcile_databases;
42pub(crate) mod start;
43
44pub(crate) struct ReconcileCatalogContext {
45    pub node_manager: NodeManagerRef,
46    pub table_metadata_manager: TableMetadataManagerRef,
47    pub cache_invalidator: CacheInvalidatorRef,
48    persistent_ctx: PersistentContext,
49    volatile_ctx: VolatileContext,
50}
51
52impl ReconcileCatalogContext {
53    pub fn new(ctx: Context, persistent_ctx: PersistentContext) -> Self {
54        Self {
55            node_manager: ctx.node_manager,
56            table_metadata_manager: ctx.table_metadata_manager,
57            cache_invalidator: ctx.cache_invalidator,
58            persistent_ctx,
59            volatile_ctx: VolatileContext::default(),
60        }
61    }
62
63    pub(crate) async fn wait_for_inflight_subprocedure(
64        &mut self,
65        procedure_ctx: &ProcedureContext,
66    ) -> Result<()> {
67        if let Some(subprocedure) = self.volatile_ctx.inflight_subprocedure.take() {
68            let subprocedures = [subprocedure];
69            let result = wait_for_inflight_subprocedures(
70                procedure_ctx,
71                &subprocedures,
72                self.persistent_ctx.fast_fail,
73            )
74            .await?;
75            self.volatile_ctx.metrics += result.into();
76        }
77        Ok(())
78    }
79}
80
81#[derive(Debug, Serialize, Deserialize)]
82pub(crate) struct PersistentContext {
83    catalog: String,
84    fast_fail: bool,
85    resolve_strategy: ResolveStrategy,
86    parallelism: usize,
87}
88
89impl PersistentContext {
90    pub fn new(
91        catalog: String,
92        fast_fail: bool,
93        resolve_strategy: ResolveStrategy,
94        parallelism: usize,
95    ) -> Self {
96        Self {
97            catalog,
98            fast_fail,
99            resolve_strategy,
100            parallelism,
101        }
102    }
103}
104
105pub(crate) struct VolatileContext {
106    /// Stores the stream of catalogs.
107    schemas: Option<BoxStream<'static, Result<String>>>,
108    /// Stores the inflight subprocedure.
109    inflight_subprocedure: Option<SubprocedureMeta>,
110    /// Stores the metrics of reconciling catalog.
111    metrics: ReconcileCatalogMetrics,
112    /// The start time of the reconciliation.
113    start_time: Instant,
114}
115
116impl Default for VolatileContext {
117    fn default() -> Self {
118        Self {
119            schemas: None,
120            inflight_subprocedure: None,
121            metrics: Default::default(),
122            start_time: Instant::now(),
123        }
124    }
125}
126
127pub struct ReconcileCatalogProcedure {
128    pub context: ReconcileCatalogContext,
129    state: Box<dyn State>,
130}
131
132impl ReconcileCatalogProcedure {
133    pub const TYPE_NAME: &'static str = "metasrv-procedure::ReconcileCatalog";
134
135    pub fn new(
136        ctx: Context,
137        catalog: String,
138        fast_fail: bool,
139        resolve_strategy: ResolveStrategy,
140        parallelism: usize,
141    ) -> Self {
142        let persistent_ctx =
143            PersistentContext::new(catalog, fast_fail, resolve_strategy, parallelism);
144        let context = ReconcileCatalogContext::new(ctx, persistent_ctx);
145        let state = Box::new(ReconcileCatalogStart);
146        Self { context, state }
147    }
148
149    pub(crate) fn from_json(ctx: Context, json: &str) -> ProcedureResult<Self> {
150        let ProcedureDataOwned {
151            state,
152            persistent_ctx,
153        } = serde_json::from_str(json).context(FromJsonSnafu)?;
154        let context = ReconcileCatalogContext::new(ctx, persistent_ctx);
155        Ok(Self { context, state })
156    }
157}
158
159#[derive(Debug, Serialize)]
160struct ProcedureData<'a> {
161    state: &'a dyn State,
162    persistent_ctx: &'a PersistentContext,
163}
164
165#[derive(Debug, Deserialize)]
166struct ProcedureDataOwned {
167    state: Box<dyn State>,
168    persistent_ctx: PersistentContext,
169}
170
171#[async_trait::async_trait]
172impl Procedure for ReconcileCatalogProcedure {
173    fn type_name(&self) -> &str {
174        Self::TYPE_NAME
175    }
176
177    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
178        let state = &mut self.state;
179
180        let procedure_name = Self::TYPE_NAME;
181        let step = state.name();
182        let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE
183            .with_label_values(&[procedure_name, step])
184            .start_timer();
185        match state.next(&mut self.context, _ctx).await {
186            Ok((next, status)) => {
187                *state = next;
188                Ok(status)
189            }
190            Err(e) => {
191                if e.is_retry_later() {
192                    metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
193                        .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE])
194                        .inc();
195                    Err(ProcedureError::retry_later(e))
196                } else {
197                    metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
198                        .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL])
199                        .inc();
200                    Err(ProcedureError::external(e))
201                }
202            }
203        }
204    }
205
206    fn dump(&self) -> ProcedureResult<String> {
207        let data = ProcedureData {
208            state: self.state.as_ref(),
209            persistent_ctx: &self.context.persistent_ctx,
210        };
211        serde_json::to_string(&data).context(FromJsonSnafu)
212    }
213
214    fn lock_key(&self) -> LockKey {
215        let catalog = &self.context.persistent_ctx.catalog;
216
217        LockKey::new(vec![CatalogLock::Write(catalog).into()])
218    }
219}
220
221#[async_trait::async_trait]
222#[typetag::serde(tag = "reconcile_catalog_state")]
223pub(crate) trait State: Sync + Send + Debug {
224    fn name(&self) -> &'static str {
225        let type_name = std::any::type_name::<Self>();
226        // short name
227        type_name.split("::").last().unwrap_or(type_name)
228    }
229
230    async fn next(
231        &mut self,
232        ctx: &mut ReconcileCatalogContext,
233        procedure_ctx: &ProcedureContext,
234    ) -> Result<(Box<dyn State>, Status)>;
235
236    fn as_any(&self) -> &dyn Any;
237}