common_meta/reconciliation/
reconcile_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod reconcile_regions;
16pub(crate) mod reconciliation_end;
17pub(crate) mod reconciliation_start;
18pub(crate) mod resolve_column_metadata;
19pub(crate) mod update_table_info;
20
21use std::any::Any;
22use std::fmt::Debug;
23
24use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
25use common_procedure::{
26    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
27    Result as ProcedureResult, Status,
28};
29use serde::{Deserialize, Serialize};
30use snafu::ResultExt;
31use store_api::metadata::ColumnMetadata;
32use store_api::storage::TableId;
33use table::metadata::RawTableMeta;
34use table::table_name::TableName;
35use tonic::async_trait;
36
37use crate::cache_invalidator::CacheInvalidatorRef;
38use crate::error::Result;
39use crate::key::table_info::TableInfoValue;
40use crate::key::table_route::PhysicalTableRouteValue;
41use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
42use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
43use crate::metrics;
44use crate::node_manager::NodeManagerRef;
45use crate::reconciliation::reconcile_table::reconciliation_start::ReconciliationStart;
46use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
47use crate::reconciliation::utils::{
48    build_table_meta_from_column_metadatas, Context, ReconcileTableMetrics,
49};
50
/// Context handed to every [`State`] of the table reconciliation procedure.
///
/// It bundles the service handles taken from [`Context`] with the procedure's
/// serializable ([`PersistentContext`]) and in-memory ([`VolatileContext`]) data.
pub struct ReconcileTableContext {
    /// Node manager handle, taken from [`Context`].
    pub node_manager: NodeManagerRef,
    /// Table metadata manager handle, taken from [`Context`].
    pub table_metadata_manager: TableMetadataManagerRef,
    /// Cache invalidator handle, taken from [`Context`].
    pub cache_invalidator: CacheInvalidatorRef,
    /// Data that is written out by the procedure's `dump` and read back by `from_json`.
    pub persistent_ctx: PersistentContext,
    /// Data that is not serialized; it starts from its default value whenever the
    /// context is built.
    pub volatile_ctx: VolatileContext,
}
58
59impl ReconcileTableContext {
60    /// Creates a new [`ReconcileTableContext`] with the given [`Context`] and [`PersistentContext`].
61    pub fn new(ctx: Context, persistent_ctx: PersistentContext) -> Self {
62        Self {
63            node_manager: ctx.node_manager,
64            table_metadata_manager: ctx.table_metadata_manager,
65            cache_invalidator: ctx.cache_invalidator,
66            persistent_ctx,
67            volatile_ctx: VolatileContext::default(),
68        }
69    }
70
71    /// Returns the physical table name.
72    pub(crate) fn table_name(&self) -> &TableName {
73        &self.persistent_ctx.table_name
74    }
75
76    /// Returns the physical table id.
77    pub(crate) fn table_id(&self) -> TableId {
78        self.persistent_ctx.table_id
79    }
80
81    /// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s.
82    pub(crate) fn build_table_meta(
83        &self,
84        column_metadatas: &[ColumnMetadata],
85    ) -> Result<RawTableMeta> {
86        // Safety: The table info value is set in `ReconciliationStart` state.
87        let table_info_value = self.persistent_ctx.table_info_value.as_ref().unwrap();
88        let table_id = self.table_id();
89        let table_ref = self.table_name().table_ref();
90        let name_to_ids = table_info_value.table_info.name_to_ids();
91        let table_meta = build_table_meta_from_column_metadatas(
92            table_id,
93            table_ref,
94            &table_info_value.table_info.meta,
95            name_to_ids,
96            column_metadatas,
97        )?;
98
99        Ok(table_meta)
100    }
101
102    /// Returns a mutable reference to the metrics.
103    pub(crate) fn mut_metrics(&mut self) -> &mut ReconcileTableMetrics {
104        &mut self.volatile_ctx.metrics
105    }
106
107    /// Returns a reference to the metrics.
108    pub(crate) fn metrics(&self) -> &ReconcileTableMetrics {
109        &self.volatile_ctx.metrics
110    }
111}
112
/// The serializable part of the procedure context.
///
/// It is written out together with the current state when the procedure is dumped
/// and read back when the procedure is rebuilt from JSON.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct PersistentContext {
    /// The id of the physical table.
    pub(crate) table_id: TableId,
    /// The name of the physical table.
    pub(crate) table_name: TableName,
    /// The resolve strategy supplied when the procedure was created.
    pub(crate) resolve_strategy: ResolveStrategy,
    /// The table info value.
    /// The value will be set in `ReconciliationStart` state.
    pub(crate) table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
    /// The physical table route.
    /// The value will be set in `ReconciliationStart` state.
    pub(crate) physical_table_route: Option<PhysicalTableRouteValue>,
    /// Whether the procedure is a subprocedure.
    pub(crate) is_subprocedure: bool,
}
127
128impl PersistentContext {
129    pub(crate) fn new(
130        table_id: TableId,
131        table_name: TableName,
132        resolve_strategy: ResolveStrategy,
133        is_subprocedure: bool,
134    ) -> Self {
135        Self {
136            table_id,
137            table_name,
138            resolve_strategy,
139            table_info_value: None,
140            physical_table_route: None,
141            is_subprocedure,
142        }
143    }
144}
145
/// The in-memory part of the procedure context.
///
/// It is not part of the data written by the procedure's `dump`; a freshly built
/// context always starts from `VolatileContext::default()`.
#[derive(Default)]
pub(crate) struct VolatileContext {
    /// A table meta, `None` by default.
    // NOTE(review): presumably filled in by one of the states and consumed by a
    // later one; confirm against the state implementations.
    pub(crate) table_meta: Option<RawTableMeta>,
    /// The metrics exposed through `ReconcileTableContext::metrics` and
    /// `ReconcileTableContext::mut_metrics`.
    pub(crate) metrics: ReconcileTableMetrics,
}
151
/// The table reconciliation procedure, driven as a sequence of [`State`]s.
pub struct ReconcileTableProcedure {
    /// The context passed to every state transition.
    pub context: ReconcileTableContext,
    /// The current state; on a successful step it is replaced by the state returned
    /// from [`State::next`].
    state: Box<dyn State>,
}
156
157impl ReconcileTableProcedure {
158    /// Creates a new [`ReconcileTableProcedure`] with the given [`Context`] and [`PersistentContext`].
159    pub fn new(
160        ctx: Context,
161        table_id: TableId,
162        table_name: TableName,
163        resolve_strategy: ResolveStrategy,
164        is_subprocedure: bool,
165    ) -> Self {
166        let persistent_ctx =
167            PersistentContext::new(table_id, table_name, resolve_strategy, is_subprocedure);
168        let context = ReconcileTableContext::new(ctx, persistent_ctx);
169        let state = Box::new(ReconciliationStart);
170        Self { context, state }
171    }
172}
173
174impl ReconcileTableProcedure {
175    pub const TYPE_NAME: &'static str = "metasrv-procedure::ReconcileTable";
176
177    pub(crate) fn from_json(ctx: Context, json: &str) -> ProcedureResult<Self> {
178        let ProcedureDataOwned {
179            state,
180            persistent_ctx,
181        } = serde_json::from_str(json).context(FromJsonSnafu)?;
182        let context = ReconcileTableContext::new(ctx, persistent_ctx);
183        Ok(Self { context, state })
184    }
185}
186
/// Borrowed view of the data written out by the procedure's `dump`: the current
/// state and the persistent context.
#[derive(Debug, Serialize)]
struct ProcedureData<'a> {
    /// The current state of the procedure.
    state: &'a dyn State,
    /// The persistent context of the procedure.
    persistent_ctx: &'a PersistentContext,
}
192
/// Owned counterpart of [`ProcedureData`], used when deserializing a procedure
/// from JSON.
#[derive(Debug, Deserialize)]
struct ProcedureDataOwned {
    /// The state the procedure was in when it was serialized.
    state: Box<dyn State>,
    /// The persistent context that was serialized with the state.
    persistent_ctx: PersistentContext,
}
198
199#[async_trait]
200impl Procedure for ReconcileTableProcedure {
201    fn type_name(&self) -> &str {
202        Self::TYPE_NAME
203    }
204
205    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
206        let state = &mut self.state;
207
208        let procedure_name = Self::TYPE_NAME;
209        let step = state.name();
210        let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE
211            .with_label_values(&[procedure_name, step])
212            .start_timer();
213        match state.next(&mut self.context, _ctx).await {
214            Ok((next, status)) => {
215                *state = next;
216                Ok(status)
217            }
218            Err(e) => {
219                if e.is_retry_later() {
220                    metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
221                        .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE])
222                        .inc();
223                    Err(ProcedureError::retry_later(e))
224                } else {
225                    metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
226                        .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL])
227                        .inc();
228                    Err(ProcedureError::external(e))
229                }
230            }
231        }
232    }
233
234    fn dump(&self) -> ProcedureResult<String> {
235        let data = ProcedureData {
236            state: self.state.as_ref(),
237            persistent_ctx: &self.context.persistent_ctx,
238        };
239        serde_json::to_string(&data).context(ToJsonSnafu)
240    }
241
242    fn lock_key(&self) -> LockKey {
243        let table_ref = &self.context.table_name().table_ref();
244
245        if self.context.persistent_ctx.is_subprocedure {
246            // The catalog and schema are already locked by the parent procedure.
247            // Only lock the table name.
248            return LockKey::new(vec![TableNameLock::new(
249                table_ref.catalog,
250                table_ref.schema,
251                table_ref.table,
252            )
253            .into()]);
254        }
255
256        LockKey::new(vec![
257            CatalogLock::Read(table_ref.catalog).into(),
258            SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
259            TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
260        ])
261    }
262}
263
264#[async_trait::async_trait]
265#[typetag::serde(tag = "reconcile_table_state")]
266pub(crate) trait State: Sync + Send + Debug {
267    fn name(&self) -> &'static str {
268        let type_name = std::any::type_name::<Self>();
269        // short name
270        type_name.split("::").last().unwrap_or(type_name)
271    }
272
273    async fn next(
274        &mut self,
275        ctx: &mut ReconcileTableContext,
276        procedure_ctx: &ProcedureContext,
277    ) -> Result<(Box<dyn State>, Status)>;
278
279    fn as_any(&self) -> &dyn Any;
280}