common_meta/reconciliation/
reconcile_table.rs1pub(crate) mod reconcile_regions;
16pub(crate) mod reconciliation_end;
17pub(crate) mod reconciliation_start;
18pub(crate) mod resolve_column_metadata;
19pub(crate) mod update_table_info;
20
21use std::any::Any;
22use std::fmt::Debug;
23
24use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
25use common_procedure::{
26 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
27 Result as ProcedureResult, Status,
28};
29use serde::{Deserialize, Serialize};
30use snafu::ResultExt;
31use store_api::metadata::ColumnMetadata;
32use store_api::storage::TableId;
33use table::metadata::RawTableMeta;
34use table::table_name::TableName;
35use tonic::async_trait;
36
37use crate::cache_invalidator::CacheInvalidatorRef;
38use crate::error::Result;
39use crate::key::table_info::TableInfoValue;
40use crate::key::table_route::PhysicalTableRouteValue;
41use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
42use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
43use crate::metrics;
44use crate::node_manager::NodeManagerRef;
45use crate::reconciliation::reconcile_table::reconciliation_start::ReconciliationStart;
46use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
47use crate::reconciliation::utils::{
48 build_table_meta_from_column_metadatas, Context, ReconcileTableMetrics,
49};
50
51pub struct ReconcileTableContext {
52 pub node_manager: NodeManagerRef,
53 pub table_metadata_manager: TableMetadataManagerRef,
54 pub cache_invalidator: CacheInvalidatorRef,
55 pub persistent_ctx: PersistentContext,
56 pub volatile_ctx: VolatileContext,
57}
58
59impl ReconcileTableContext {
60 pub fn new(ctx: Context, persistent_ctx: PersistentContext) -> Self {
62 Self {
63 node_manager: ctx.node_manager,
64 table_metadata_manager: ctx.table_metadata_manager,
65 cache_invalidator: ctx.cache_invalidator,
66 persistent_ctx,
67 volatile_ctx: VolatileContext::default(),
68 }
69 }
70
71 pub(crate) fn table_name(&self) -> &TableName {
73 &self.persistent_ctx.table_name
74 }
75
76 pub(crate) fn table_id(&self) -> TableId {
78 self.persistent_ctx.table_id
79 }
80
81 pub(crate) fn build_table_meta(
83 &self,
84 column_metadatas: &[ColumnMetadata],
85 ) -> Result<RawTableMeta> {
86 let table_info_value = self.persistent_ctx.table_info_value.as_ref().unwrap();
88 let table_id = self.table_id();
89 let table_ref = self.table_name().table_ref();
90 let name_to_ids = table_info_value.table_info.name_to_ids();
91 let table_meta = build_table_meta_from_column_metadatas(
92 table_id,
93 table_ref,
94 &table_info_value.table_info.meta,
95 name_to_ids,
96 column_metadatas,
97 )?;
98
99 Ok(table_meta)
100 }
101
102 pub(crate) fn mut_metrics(&mut self) -> &mut ReconcileTableMetrics {
104 &mut self.volatile_ctx.metrics
105 }
106
107 pub(crate) fn metrics(&self) -> &ReconcileTableMetrics {
109 &self.volatile_ctx.metrics
110 }
111}
112
113#[derive(Debug, Serialize, Deserialize)]
114pub(crate) struct PersistentContext {
115 pub(crate) table_id: TableId,
116 pub(crate) table_name: TableName,
117 pub(crate) resolve_strategy: ResolveStrategy,
118 pub(crate) table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
121 pub(crate) physical_table_route: Option<PhysicalTableRouteValue>,
124 pub(crate) is_subprocedure: bool,
126}
127
128impl PersistentContext {
129 pub(crate) fn new(
130 table_id: TableId,
131 table_name: TableName,
132 resolve_strategy: ResolveStrategy,
133 is_subprocedure: bool,
134 ) -> Self {
135 Self {
136 table_id,
137 table_name,
138 resolve_strategy,
139 table_info_value: None,
140 physical_table_route: None,
141 is_subprocedure,
142 }
143 }
144}
145
146#[derive(Default)]
147pub(crate) struct VolatileContext {
148 pub(crate) table_meta: Option<RawTableMeta>,
149 pub(crate) metrics: ReconcileTableMetrics,
150}
151
152pub struct ReconcileTableProcedure {
153 pub context: ReconcileTableContext,
154 state: Box<dyn State>,
155}
156
157impl ReconcileTableProcedure {
158 pub fn new(
160 ctx: Context,
161 table_id: TableId,
162 table_name: TableName,
163 resolve_strategy: ResolveStrategy,
164 is_subprocedure: bool,
165 ) -> Self {
166 let persistent_ctx =
167 PersistentContext::new(table_id, table_name, resolve_strategy, is_subprocedure);
168 let context = ReconcileTableContext::new(ctx, persistent_ctx);
169 let state = Box::new(ReconciliationStart);
170 Self { context, state }
171 }
172}
173
174impl ReconcileTableProcedure {
175 pub const TYPE_NAME: &'static str = "metasrv-procedure::ReconcileTable";
176
177 pub(crate) fn from_json(ctx: Context, json: &str) -> ProcedureResult<Self> {
178 let ProcedureDataOwned {
179 state,
180 persistent_ctx,
181 } = serde_json::from_str(json).context(FromJsonSnafu)?;
182 let context = ReconcileTableContext::new(ctx, persistent_ctx);
183 Ok(Self { context, state })
184 }
185}
186
187#[derive(Debug, Serialize)]
188struct ProcedureData<'a> {
189 state: &'a dyn State,
190 persistent_ctx: &'a PersistentContext,
191}
192
193#[derive(Debug, Deserialize)]
194struct ProcedureDataOwned {
195 state: Box<dyn State>,
196 persistent_ctx: PersistentContext,
197}
198
199#[async_trait]
200impl Procedure for ReconcileTableProcedure {
201 fn type_name(&self) -> &str {
202 Self::TYPE_NAME
203 }
204
205 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
206 let state = &mut self.state;
207
208 let procedure_name = Self::TYPE_NAME;
209 let step = state.name();
210 let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE
211 .with_label_values(&[procedure_name, step])
212 .start_timer();
213 match state.next(&mut self.context, _ctx).await {
214 Ok((next, status)) => {
215 *state = next;
216 Ok(status)
217 }
218 Err(e) => {
219 if e.is_retry_later() {
220 metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
221 .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE])
222 .inc();
223 Err(ProcedureError::retry_later(e))
224 } else {
225 metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
226 .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL])
227 .inc();
228 Err(ProcedureError::external(e))
229 }
230 }
231 }
232 }
233
234 fn dump(&self) -> ProcedureResult<String> {
235 let data = ProcedureData {
236 state: self.state.as_ref(),
237 persistent_ctx: &self.context.persistent_ctx,
238 };
239 serde_json::to_string(&data).context(ToJsonSnafu)
240 }
241
242 fn lock_key(&self) -> LockKey {
243 let table_ref = &self.context.table_name().table_ref();
244
245 if self.context.persistent_ctx.is_subprocedure {
246 return LockKey::new(vec![TableNameLock::new(
249 table_ref.catalog,
250 table_ref.schema,
251 table_ref.table,
252 )
253 .into()]);
254 }
255
256 LockKey::new(vec![
257 CatalogLock::Read(table_ref.catalog).into(),
258 SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
259 TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
260 ])
261 }
262}
263
264#[async_trait::async_trait]
265#[typetag::serde(tag = "reconcile_table_state")]
266pub(crate) trait State: Sync + Send + Debug {
267 fn name(&self) -> &'static str {
268 let type_name = std::any::type_name::<Self>();
269 type_name.split("::").last().unwrap_or(type_name)
271 }
272
273 async fn next(
274 &mut self,
275 ctx: &mut ReconcileTableContext,
276 procedure_ctx: &ProcedureContext,
277 ) -> Result<(Box<dyn State>, Status)>;
278
279 fn as_any(&self) -> &dyn Any;
280}