common_meta/reconciliation/
reconcile_logical_tables.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod reconcile_regions;
16pub(crate) mod reconciliation_end;
17pub(crate) mod reconciliation_start;
18pub(crate) mod resolve_table_metadatas;
19pub(crate) mod update_table_infos;
20
21use std::any::Any;
22use std::fmt::Debug;
23
24use async_trait::async_trait;
25use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
26use common_procedure::{
27    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
28    Result as ProcedureResult, Status,
29};
30use serde::{Deserialize, Serialize};
31use snafu::ResultExt;
32use store_api::metadata::ColumnMetadata;
33use store_api::storage::TableId;
34use table::metadata::RawTableInfo;
35use table::table_name::TableName;
36
37use crate::cache_invalidator::CacheInvalidatorRef;
38use crate::error::Result;
39use crate::key::table_info::TableInfoValue;
40use crate::key::table_route::PhysicalTableRouteValue;
41use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
42use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
43use crate::metrics;
44use crate::node_manager::NodeManagerRef;
45use crate::reconciliation::reconcile_logical_tables::reconciliation_start::ReconciliationStart;
46use crate::reconciliation::utils::{Context, ReconcileLogicalTableMetrics};
47
48pub struct ReconcileLogicalTablesContext {
49    pub node_manager: NodeManagerRef,
50    pub table_metadata_manager: TableMetadataManagerRef,
51    pub cache_invalidator: CacheInvalidatorRef,
52    pub persistent_ctx: PersistentContext,
53    pub volatile_ctx: VolatileContext,
54}
55
56impl ReconcileLogicalTablesContext {
57    /// Creates a new [`ReconcileLogicalTablesContext`] with the given [`Context`] and [`PersistentContext`].
58    pub fn new(ctx: Context, persistent_ctx: PersistentContext) -> Self {
59        Self {
60            node_manager: ctx.node_manager,
61            table_metadata_manager: ctx.table_metadata_manager,
62            cache_invalidator: ctx.cache_invalidator,
63            persistent_ctx,
64            volatile_ctx: VolatileContext::default(),
65        }
66    }
67
68    /// Returns the physical table name.
69    pub(crate) fn table_name(&self) -> &TableName {
70        &self.persistent_ctx.table_name
71    }
72
73    /// Returns the physical table id.
74    pub(crate) fn table_id(&self) -> TableId {
75        self.persistent_ctx.table_id
76    }
77
78    /// Returns a mutable reference to the metrics.
79    pub(crate) fn mut_metrics(&mut self) -> &mut ReconcileLogicalTableMetrics {
80        &mut self.volatile_ctx.metrics
81    }
82
83    /// Returns a reference to the metrics.
84    pub(crate) fn metrics(&self) -> &ReconcileLogicalTableMetrics {
85        &self.volatile_ctx.metrics
86    }
87}
88
89#[derive(Debug, Serialize, Deserialize)]
90pub(crate) struct PersistentContext {
91    pub(crate) table_id: TableId,
92    pub(crate) table_name: TableName,
93    // The logical tables need to be reconciled.
94    // The logical tables belongs to the physical table.
95    pub(crate) logical_tables: Vec<TableName>,
96    // The logical table ids.
97    // The value will be set in `ReconciliationStart` state.
98    pub(crate) logical_table_ids: Vec<TableId>,
99    /// The table info value.
100    /// The value will be set in `ReconciliationStart` state.
101    pub(crate) table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
102    // The physical table route.
103    // The value will be set in `ReconciliationStart` state.
104    pub(crate) physical_table_route: Option<PhysicalTableRouteValue>,
105    // The table infos to be updated.
106    // The value will be set in `ResolveTableMetadatas` state.
107    pub(crate) update_table_infos: Vec<(TableId, Vec<ColumnMetadata>)>,
108    // The table infos to be created.
109    // The value will be set in `ResolveTableMetadatas` state.
110    pub(crate) create_tables: Vec<(TableId, RawTableInfo)>,
111    // Whether the procedure is a subprocedure.
112    pub(crate) is_subprocedure: bool,
113}
114
115impl PersistentContext {
116    pub(crate) fn new(
117        table_id: TableId,
118        table_name: TableName,
119        logical_tables: Vec<(TableId, TableName)>,
120        is_subprocedure: bool,
121    ) -> Self {
122        let (logical_table_ids, logical_tables) = logical_tables.into_iter().unzip();
123
124        Self {
125            table_id,
126            table_name,
127            logical_tables,
128            logical_table_ids,
129            table_info_value: None,
130            physical_table_route: None,
131            update_table_infos: vec![],
132            create_tables: vec![],
133            is_subprocedure,
134        }
135    }
136}
137
138#[derive(Default)]
139pub(crate) struct VolatileContext {
140    pub(crate) metrics: ReconcileLogicalTableMetrics,
141}
142
143pub struct ReconcileLogicalTablesProcedure {
144    pub context: ReconcileLogicalTablesContext,
145    state: Box<dyn State>,
146}
147
148#[derive(Debug, Serialize)]
149struct ProcedureData<'a> {
150    state: &'a dyn State,
151    persistent_ctx: &'a PersistentContext,
152}
153
154#[derive(Debug, Deserialize)]
155struct ProcedureDataOwned {
156    state: Box<dyn State>,
157    persistent_ctx: PersistentContext,
158}
159
160impl ReconcileLogicalTablesProcedure {
161    pub const TYPE_NAME: &'static str = "metasrv-procedure::ReconcileLogicalTables";
162
163    pub fn new(
164        ctx: Context,
165        table_id: TableId,
166        table_name: TableName,
167        logical_tables: Vec<(TableId, TableName)>,
168        is_subprocedure: bool,
169    ) -> Self {
170        let persistent_ctx =
171            PersistentContext::new(table_id, table_name, logical_tables, is_subprocedure);
172        let context = ReconcileLogicalTablesContext::new(ctx, persistent_ctx);
173        let state = Box::new(ReconciliationStart);
174        Self { context, state }
175    }
176
177    pub(crate) fn from_json(ctx: Context, json: &str) -> ProcedureResult<Self> {
178        let ProcedureDataOwned {
179            state,
180            persistent_ctx,
181        } = serde_json::from_str(json).context(FromJsonSnafu)?;
182        let context = ReconcileLogicalTablesContext::new(ctx, persistent_ctx);
183        Ok(Self { context, state })
184    }
185}
186
187#[async_trait]
188impl Procedure for ReconcileLogicalTablesProcedure {
189    fn type_name(&self) -> &str {
190        Self::TYPE_NAME
191    }
192
193    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
194        let state = &mut self.state;
195
196        let procedure_name = Self::TYPE_NAME;
197        let step = state.name();
198        let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE
199            .with_label_values(&[procedure_name, step])
200            .start_timer();
201        match state.next(&mut self.context, _ctx).await {
202            Ok((next, status)) => {
203                *state = next;
204                Ok(status)
205            }
206            Err(e) => {
207                if e.is_retry_later() {
208                    metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
209                        .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE])
210                        .inc();
211                    Err(ProcedureError::retry_later(e))
212                } else {
213                    metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
214                        .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL])
215                        .inc();
216                    Err(ProcedureError::external(e))
217                }
218            }
219        }
220    }
221
222    fn dump(&self) -> ProcedureResult<String> {
223        let data = ProcedureData {
224            state: self.state.as_ref(),
225            persistent_ctx: &self.context.persistent_ctx,
226        };
227        serde_json::to_string(&data).context(ToJsonSnafu)
228    }
229
230    fn lock_key(&self) -> LockKey {
231        let table_ref = &self.context.table_name().table_ref();
232
233        let mut table_ids = self
234            .context
235            .persistent_ctx
236            .logical_table_ids
237            .iter()
238            .map(|t| TableLock::Write(*t).into())
239            .collect::<Vec<_>>();
240        table_ids.sort_unstable();
241        table_ids.push(TableLock::Read(self.context.table_id()).into());
242        if self.context.persistent_ctx.is_subprocedure {
243            // The catalog and schema are already locked by the parent procedure.
244            // Only lock the table name.
245            return LockKey::new(table_ids);
246        }
247        let mut keys = vec![
248            CatalogLock::Read(table_ref.catalog).into(),
249            SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
250        ];
251        keys.extend(table_ids);
252        LockKey::new(keys)
253    }
254}
255
256#[async_trait::async_trait]
257#[typetag::serde(tag = "reconcile_logical_tables_state")]
258pub(crate) trait State: Sync + Send + Debug {
259    fn name(&self) -> &'static str {
260        let type_name = std::any::type_name::<Self>();
261        // short name
262        type_name.split("::").last().unwrap_or(type_name)
263    }
264
265    async fn next(
266        &mut self,
267        ctx: &mut ReconcileLogicalTablesContext,
268        procedure_ctx: &ProcedureContext,
269    ) -> Result<(Box<dyn State>, Status)>;
270
271    fn as_any(&self) -> &dyn Any;
272}