common_meta/reconciliation/reconcile_table/
resolve_column_metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use async_trait::async_trait;
18use common_procedure::{Context as ProcedureContext, Status};
19use common_telemetry::info;
20use serde::{Deserialize, Serialize};
21use snafu::OptionExt;
22use store_api::metadata::RegionMetadata;
23use strum::AsRefStr;
24
25use crate::error::{self, MissingColumnIdsSnafu, Result};
26use crate::reconciliation::reconcile_table::reconcile_regions::ReconcileRegions;
27use crate::reconciliation::reconcile_table::update_table_info::UpdateTableInfo;
28use crate::reconciliation::reconcile_table::{ReconcileTableContext, State};
29use crate::reconciliation::utils::{
30    build_column_metadata_from_table_info, check_column_metadatas_consistent,
31    resolve_column_metadatas_with_latest, resolve_column_metadatas_with_metasrv,
32    ResolveColumnMetadataResult,
33};
34
35/// Strategy for resolving column metadata inconsistencies.
36#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default, AsRefStr)]
37pub enum ResolveStrategy {
38    #[default]
39    /// Trusts the latest column metadata from datanode.
40    UseLatest,
41
42    /// Always uses the column metadata from metasrv.
43    UseMetasrv,
44
45    /// Aborts the resolution process if inconsistencies are detected.
46    AbortOnConflict,
47}
48
49impl From<api::v1::meta::ResolveStrategy> for ResolveStrategy {
50    fn from(strategy: api::v1::meta::ResolveStrategy) -> Self {
51        match strategy {
52            api::v1::meta::ResolveStrategy::UseMetasrv => Self::UseMetasrv,
53            api::v1::meta::ResolveStrategy::UseLatest => Self::UseLatest,
54            api::v1::meta::ResolveStrategy::AbortOnConflict => Self::AbortOnConflict,
55        }
56    }
57}
58
59/// State responsible for resolving inconsistencies in column metadata across physical regions.
60#[derive(Debug, Serialize, Deserialize)]
61pub struct ResolveColumnMetadata {
62    strategy: ResolveStrategy,
63    region_metadata: Vec<RegionMetadata>,
64}
65
66impl ResolveColumnMetadata {
67    pub fn new(strategy: ResolveStrategy, region_metadata: Vec<RegionMetadata>) -> Self {
68        Self {
69            strategy,
70            region_metadata,
71        }
72    }
73}
74
75#[async_trait]
76#[typetag::serde]
77impl State for ResolveColumnMetadata {
78    async fn next(
79        &mut self,
80        ctx: &mut ReconcileTableContext,
81        _procedure_ctx: &ProcedureContext,
82    ) -> Result<(Box<dyn State>, Status)> {
83        let table_id = ctx.persistent_ctx.table_id;
84        let table_name = &ctx.persistent_ctx.table_name;
85
86        let table_info_value = ctx
87            .table_metadata_manager
88            .table_info_manager()
89            .get(table_id)
90            .await?
91            .with_context(|| error::TableNotFoundSnafu {
92                table_name: table_name.to_string(),
93            })?;
94        ctx.persistent_ctx.table_info_value = Some(table_info_value);
95
96        if let Some(column_metadatas) = check_column_metadatas_consistent(&self.region_metadata) {
97            // Safety: fetched in the above.
98            let table_info_value = ctx.persistent_ctx.table_info_value.clone().unwrap();
99            info!(
100                "Column metadatas are consistent for table: {}, table_id: {}.",
101                table_name, table_id
102            );
103
104            // Update metrics.
105            ctx.mut_metrics().resolve_column_metadata_result =
106                Some(ResolveColumnMetadataResult::Consistent);
107            return Ok((
108                Box::new(UpdateTableInfo::new(table_info_value, column_metadatas)),
109                Status::executing(false),
110            ));
111        };
112
113        match self.strategy {
114            ResolveStrategy::UseMetasrv => {
115                let table_info_value = ctx.persistent_ctx.table_info_value.as_ref().unwrap();
116                let name_to_ids = table_info_value
117                    .table_info
118                    .name_to_ids()
119                    .context(MissingColumnIdsSnafu)?;
120                let column_metadata = build_column_metadata_from_table_info(
121                    &table_info_value.table_info.meta.schema.column_schemas,
122                    &table_info_value.table_info.meta.primary_key_indices,
123                    &name_to_ids,
124                )?;
125
126                let region_ids =
127                    resolve_column_metadatas_with_metasrv(&column_metadata, &self.region_metadata)?;
128
129                // Update metrics.
130                let metrics = ctx.mut_metrics();
131                metrics.resolve_column_metadata_result =
132                    Some(ResolveColumnMetadataResult::Inconsistent(self.strategy));
133                Ok((
134                    Box::new(ReconcileRegions::new(column_metadata, region_ids)),
135                    Status::executing(true),
136                ))
137            }
138            ResolveStrategy::UseLatest => {
139                let (column_metadatas, region_ids) =
140                    resolve_column_metadatas_with_latest(&self.region_metadata)?;
141
142                // Update metrics.
143                let metrics = ctx.mut_metrics();
144                metrics.resolve_column_metadata_result =
145                    Some(ResolveColumnMetadataResult::Inconsistent(self.strategy));
146                Ok((
147                    Box::new(ReconcileRegions::new(column_metadatas, region_ids)),
148                    Status::executing(true),
149                ))
150            }
151            ResolveStrategy::AbortOnConflict => {
152                let table_name = table_name.to_string();
153
154                // Update metrics.
155                let metrics = ctx.mut_metrics();
156                metrics.resolve_column_metadata_result =
157                    Some(ResolveColumnMetadataResult::Inconsistent(self.strategy));
158                error::ColumnMetadataConflictsSnafu {
159                    table_name,
160                    table_id,
161                }
162                .fail()
163            }
164        }
165    }
166
167    fn as_any(&self) -> &dyn Any {
168        self
169    }
170}