common_meta/reconciliation/reconcile_table/
resolve_column_metadata.rs1use std::any::Any;
16
17use async_trait::async_trait;
18use common_procedure::{Context as ProcedureContext, Status};
19use common_telemetry::info;
20use serde::{Deserialize, Serialize};
21use snafu::OptionExt;
22use store_api::metadata::RegionMetadata;
23use strum::AsRefStr;
24
25use crate::error::{self, MissingColumnIdsSnafu, Result};
26use crate::reconciliation::reconcile_table::reconcile_regions::ReconcileRegions;
27use crate::reconciliation::reconcile_table::update_table_info::UpdateTableInfo;
28use crate::reconciliation::reconcile_table::{ReconcileTableContext, State};
29use crate::reconciliation::utils::{
30 build_column_metadata_from_table_info, check_column_metadatas_consistent,
31 resolve_column_metadatas_with_latest, resolve_column_metadatas_with_metasrv,
32 ResolveColumnMetadataResult,
33};
34
35#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default, AsRefStr)]
37pub enum ResolveStrategy {
38 #[default]
39 UseLatest,
41
42 UseMetasrv,
44
45 AbortOnConflict,
47}
48
49impl From<api::v1::meta::ResolveStrategy> for ResolveStrategy {
50 fn from(strategy: api::v1::meta::ResolveStrategy) -> Self {
51 match strategy {
52 api::v1::meta::ResolveStrategy::UseMetasrv => Self::UseMetasrv,
53 api::v1::meta::ResolveStrategy::UseLatest => Self::UseLatest,
54 api::v1::meta::ResolveStrategy::AbortOnConflict => Self::AbortOnConflict,
55 }
56 }
57}
58
59#[derive(Debug, Serialize, Deserialize)]
61pub struct ResolveColumnMetadata {
62 strategy: ResolveStrategy,
63 region_metadata: Vec<RegionMetadata>,
64}
65
66impl ResolveColumnMetadata {
67 pub fn new(strategy: ResolveStrategy, region_metadata: Vec<RegionMetadata>) -> Self {
68 Self {
69 strategy,
70 region_metadata,
71 }
72 }
73}
74
75#[async_trait]
76#[typetag::serde]
77impl State for ResolveColumnMetadata {
78 async fn next(
79 &mut self,
80 ctx: &mut ReconcileTableContext,
81 _procedure_ctx: &ProcedureContext,
82 ) -> Result<(Box<dyn State>, Status)> {
83 let table_id = ctx.persistent_ctx.table_id;
84 let table_name = &ctx.persistent_ctx.table_name;
85
86 let table_info_value = ctx
87 .table_metadata_manager
88 .table_info_manager()
89 .get(table_id)
90 .await?
91 .with_context(|| error::TableNotFoundSnafu {
92 table_name: table_name.to_string(),
93 })?;
94 ctx.persistent_ctx.table_info_value = Some(table_info_value);
95
96 if let Some(column_metadatas) = check_column_metadatas_consistent(&self.region_metadata) {
97 let table_info_value = ctx.persistent_ctx.table_info_value.clone().unwrap();
99 info!(
100 "Column metadatas are consistent for table: {}, table_id: {}.",
101 table_name, table_id
102 );
103
104 ctx.mut_metrics().resolve_column_metadata_result =
106 Some(ResolveColumnMetadataResult::Consistent);
107 return Ok((
108 Box::new(UpdateTableInfo::new(table_info_value, column_metadatas)),
109 Status::executing(false),
110 ));
111 };
112
113 match self.strategy {
114 ResolveStrategy::UseMetasrv => {
115 let table_info_value = ctx.persistent_ctx.table_info_value.as_ref().unwrap();
116 let name_to_ids = table_info_value
117 .table_info
118 .name_to_ids()
119 .context(MissingColumnIdsSnafu)?;
120 let column_metadata = build_column_metadata_from_table_info(
121 &table_info_value.table_info.meta.schema.column_schemas,
122 &table_info_value.table_info.meta.primary_key_indices,
123 &name_to_ids,
124 )?;
125
126 let region_ids =
127 resolve_column_metadatas_with_metasrv(&column_metadata, &self.region_metadata)?;
128
129 let metrics = ctx.mut_metrics();
131 metrics.resolve_column_metadata_result =
132 Some(ResolveColumnMetadataResult::Inconsistent(self.strategy));
133 Ok((
134 Box::new(ReconcileRegions::new(column_metadata, region_ids)),
135 Status::executing(true),
136 ))
137 }
138 ResolveStrategy::UseLatest => {
139 let (column_metadatas, region_ids) =
140 resolve_column_metadatas_with_latest(&self.region_metadata)?;
141
142 let metrics = ctx.mut_metrics();
144 metrics.resolve_column_metadata_result =
145 Some(ResolveColumnMetadataResult::Inconsistent(self.strategy));
146 Ok((
147 Box::new(ReconcileRegions::new(column_metadatas, region_ids)),
148 Status::executing(true),
149 ))
150 }
151 ResolveStrategy::AbortOnConflict => {
152 let table_name = table_name.to_string();
153
154 let metrics = ctx.mut_metrics();
156 metrics.resolve_column_metadata_result =
157 Some(ResolveColumnMetadataResult::Inconsistent(self.strategy));
158 error::ColumnMetadataConflictsSnafu {
159 table_name,
160 table_id,
161 }
162 .fail()
163 }
164 }
165 }
166
167 fn as_any(&self) -> &dyn Any {
168 self
169 }
170}