common_meta/reconciliation/
reconcile_database.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod end;
16pub(crate) mod reconcile_logical_tables;
17pub(crate) mod reconcile_tables;
18pub(crate) mod start;
19
20use std::any::Any;
21use std::collections::HashMap;
22use std::fmt::Debug;
23use std::time::Instant;
24
25use async_trait::async_trait;
26use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
27use common_procedure::{
28    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
29    Result as ProcedureResult, Status,
30};
31use futures::stream::BoxStream;
32use serde::{Deserialize, Serialize};
33use snafu::ResultExt;
34use store_api::storage::TableId;
35use table::table_name::TableName;
36
37use crate::cache_invalidator::CacheInvalidatorRef;
38use crate::error::Result;
39use crate::key::table_name::TableNameValue;
40use crate::key::TableMetadataManagerRef;
41use crate::lock_key::{CatalogLock, SchemaLock};
42use crate::metrics;
43use crate::node_manager::NodeManagerRef;
44use crate::reconciliation::reconcile_database::start::ReconcileDatabaseStart;
45use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
46use crate::reconciliation::utils::{
47    wait_for_inflight_subprocedures, Context, ReconcileDatabaseMetrics, SubprocedureMeta,
48};
/// Default parallelism value (64).
///
/// NOTE(review): not referenced in this part of the file — presumably the
/// default for the `parallelism` argument of the reconciliation procedures;
/// confirm at the call sites.
pub(crate) const DEFAULT_PARALLELISM: usize = 64;
50
/// Execution context handed to every [`State`] of the
/// [`ReconcileDatabaseProcedure`].
///
/// Bundles the handles taken from [`Context`] with the procedure's
/// serializable state ([`PersistentContext`]) and its in-memory working state
/// ([`VolatileContext`]).
pub(crate) struct ReconcileDatabaseContext {
    /// Node manager handle, moved out of the [`Context`] passed to `new`.
    pub node_manager: NodeManagerRef,
    /// Table metadata manager handle, moved out of the [`Context`] passed to `new`.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// Cache invalidator handle, moved out of the [`Context`] passed to `new`.
    pub cache_invalidator: CacheInvalidatorRef,
    /// State that `dump` serializes and `from_json` restores.
    persistent_ctx: PersistentContext,
    /// In-memory state; starts from `VolatileContext::default()` every time
    /// the context is constructed.
    volatile_ctx: VolatileContext,
}
58
59impl ReconcileDatabaseContext {
60    pub fn new(ctx: Context, persistent_ctx: PersistentContext) -> Self {
61        Self {
62            node_manager: ctx.node_manager,
63            table_metadata_manager: ctx.table_metadata_manager,
64            cache_invalidator: ctx.cache_invalidator,
65            persistent_ctx,
66            volatile_ctx: VolatileContext::default(),
67        }
68    }
69
70    /// Waits for inflight subprocedures to complete.
71    pub(crate) async fn wait_for_inflight_subprocedures(
72        &mut self,
73        procedure_ctx: &ProcedureContext,
74    ) -> Result<()> {
75        if !self.volatile_ctx.inflight_subprocedures.is_empty() {
76            let result = wait_for_inflight_subprocedures(
77                procedure_ctx,
78                &self.volatile_ctx.inflight_subprocedures,
79                self.persistent_ctx.fail_fast,
80            )
81            .await?;
82
83            // Collects result into metrics
84            let metrics = result.into();
85            self.volatile_ctx.inflight_subprocedures.clear();
86            self.volatile_ctx.metrics += metrics;
87        }
88
89        Ok(())
90    }
91
92    /// Returns the immutable metrics.
93    pub(crate) fn metrics(&self) -> &ReconcileDatabaseMetrics {
94        &self.volatile_ctx.metrics
95    }
96}
97
/// The part of the procedure's state that is serialized by `dump` and
/// restored by `from_json`.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct PersistentContext {
    /// Catalog name; used when building the procedure's lock keys.
    catalog: String,
    /// Schema name; used when building the procedure's lock keys.
    schema: String,
    /// Forwarded to the `wait_for_inflight_subprocedures` helper when waiting
    /// on subprocedures.
    fail_fast: bool,
    /// NOTE(review): presumably bounds how many subprocedures run at once —
    /// not read in this part of the file; confirm in the state implementations.
    parallelism: usize,
    /// NOTE(review): presumably handed to table reconciliation to resolve
    /// column metadata — not read in this part of the file; confirm.
    resolve_strategy: ResolveStrategy,
    /// When `true`, `lock_key` takes only the schema write lock and skips the
    /// catalog read lock.
    is_subprocedure: bool,
}
107
impl PersistentContext {
    /// Creates a [`PersistentContext`], storing each argument in the field of
    /// the same name.
    pub fn new(
        catalog: String,
        schema: String,
        fail_fast: bool,
        parallelism: usize,
        resolve_strategy: ResolveStrategy,
        is_subprocedure: bool,
    ) -> Self {
        Self {
            catalog,
            schema,
            fail_fast,
            parallelism,
            resolve_strategy,
            is_subprocedure,
        }
    }
}
127
/// In-memory working state of the procedure; it is not part of the data
/// serialized by `dump`.
///
/// A fresh default value is created whenever a [`ReconcileDatabaseContext`]
/// is constructed, which includes restoring the procedure via `from_json`.
pub(crate) struct VolatileContext {
    /// Stores pending physical tables as `(table id, table name)` pairs.
    pending_tables: Vec<(TableId, TableName)>,
    /// Stores pending logical tables associated with each physical table.
    ///
    /// - Key: Table ID of the physical table.
    /// - Value: Vector of (TableId, TableName) tuples representing logical tables belonging to the physical table.
    pending_logical_tables: HashMap<TableId, Vec<(TableId, TableName)>>,
    /// Stores inflight subprocedures; cleared after they have been waited on
    /// successfully.
    inflight_subprocedures: Vec<SubprocedureMeta>,
    /// Stores the stream of tables; `None` in a freshly created context.
    tables: Option<BoxStream<'static, Result<(String, TableNameValue)>>>,
    /// The metrics of reconciling database, accumulated from the results of
    /// finished subprocedures.
    metrics: ReconcileDatabaseMetrics,
    /// The start time of the reconciliation; set to `Instant::now()` when the
    /// volatile context is created.
    start_time: Instant,
}
145
146impl Default for VolatileContext {
147    fn default() -> Self {
148        Self {
149            pending_tables: vec![],
150            pending_logical_tables: HashMap::new(),
151            inflight_subprocedures: vec![],
152            tables: None,
153            metrics: ReconcileDatabaseMetrics::default(),
154            start_time: Instant::now(),
155        }
156    }
157}
158
/// Procedure that reconciles a single database (a catalog/schema pair),
/// implemented as a state machine over boxed [`State`] values.
pub struct ReconcileDatabaseProcedure {
    /// Context passed to the current state on every `execute` call.
    pub context: ReconcileDatabaseContext,
    /// Current state; replaced by the state returned from `State::next`.
    state: Box<dyn State>,
}
163
164impl ReconcileDatabaseProcedure {
165    pub const TYPE_NAME: &'static str = "metasrv-procedure::ReconcileDatabase";
166
167    pub fn new(
168        ctx: Context,
169        catalog: String,
170        schema: String,
171        fail_fast: bool,
172        parallelism: usize,
173        resolve_strategy: ResolveStrategy,
174        is_subprocedure: bool,
175    ) -> Self {
176        let persistent_ctx = PersistentContext::new(
177            catalog,
178            schema,
179            fail_fast,
180            parallelism,
181            resolve_strategy,
182            is_subprocedure,
183        );
184        let context = ReconcileDatabaseContext::new(ctx, persistent_ctx);
185        let state = Box::new(ReconcileDatabaseStart);
186        Self { context, state }
187    }
188
189    pub(crate) fn from_json(ctx: Context, json: &str) -> ProcedureResult<Self> {
190        let ProcedureDataOwned {
191            state,
192            persistent_ctx,
193        } = serde_json::from_str(json).context(FromJsonSnafu)?;
194        let context = ReconcileDatabaseContext::new(ctx, persistent_ctx);
195        Ok(Self { context, state })
196    }
197}
198
/// Borrowed view of the procedure's serializable data; `dump` serializes it
/// to JSON without taking ownership of the state or the persistent context.
#[derive(Debug, Serialize)]
struct ProcedureData<'a> {
    /// The procedure's current state.
    state: &'a dyn State,
    /// The procedure's persistent context.
    persistent_ctx: &'a PersistentContext,
}
204
/// Owned counterpart of [`ProcedureData`]; `from_json` deserializes into it
/// when restoring a procedure.
#[derive(Debug, Deserialize)]
struct ProcedureDataOwned {
    /// The state the procedure resumes from.
    state: Box<dyn State>,
    /// The restored persistent context.
    persistent_ctx: PersistentContext,
}
210
211#[async_trait]
212impl Procedure for ReconcileDatabaseProcedure {
213    fn type_name(&self) -> &str {
214        Self::TYPE_NAME
215    }
216
217    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
218        let state = &mut self.state;
219
220        let procedure_name = Self::TYPE_NAME;
221        let step = state.name();
222        let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE
223            .with_label_values(&[procedure_name, step])
224            .start_timer();
225        match state.next(&mut self.context, _ctx).await {
226            Ok((next, status)) => {
227                *state = next;
228                Ok(status)
229            }
230            Err(e) => {
231                if e.is_retry_later() {
232                    metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
233                        .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE])
234                        .inc();
235                    Err(ProcedureError::retry_later(e))
236                } else {
237                    metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
238                        .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL])
239                        .inc();
240                    Err(ProcedureError::external(e))
241                }
242            }
243        }
244    }
245
246    fn dump(&self) -> ProcedureResult<String> {
247        let data = ProcedureData {
248            state: self.state.as_ref(),
249            persistent_ctx: &self.context.persistent_ctx,
250        };
251        serde_json::to_string(&data).context(ToJsonSnafu)
252    }
253
254    fn lock_key(&self) -> LockKey {
255        let catalog = &self.context.persistent_ctx.catalog;
256        let schema = &self.context.persistent_ctx.schema;
257        // If the procedure is a subprocedure, only lock the schema.
258        if self.context.persistent_ctx.is_subprocedure {
259            return LockKey::new(vec![SchemaLock::write(catalog, schema).into()]);
260        }
261
262        LockKey::new(vec![
263            CatalogLock::Read(catalog).into(),
264            SchemaLock::write(catalog, schema).into(),
265        ])
266    }
267}
268
269#[async_trait::async_trait]
270#[typetag::serde(tag = "reconcile_database_state")]
271pub(crate) trait State: Sync + Send + Debug {
272    fn name(&self) -> &'static str {
273        let type_name = std::any::type_name::<Self>();
274        // short name
275        type_name.split("::").last().unwrap_or(type_name)
276    }
277
278    async fn next(
279        &mut self,
280        ctx: &mut ReconcileDatabaseContext,
281        procedure_ctx: &ProcedureContext,
282    ) -> Result<(Box<dyn State>, Status)>;
283
284    fn as_any(&self) -> &dyn Any;
285}