common_meta/reconciliation/
reconcile_logical_tables.rs1pub(crate) mod reconcile_regions;
16pub(crate) mod reconciliation_end;
17pub(crate) mod reconciliation_start;
18pub(crate) mod resolve_table_metadatas;
19pub(crate) mod update_table_infos;
20
21use std::any::Any;
22use std::fmt::Debug;
23
24use async_trait::async_trait;
25use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
26use common_procedure::{
27 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
28 Result as ProcedureResult, Status,
29};
30use serde::{Deserialize, Serialize};
31use snafu::ResultExt;
32use store_api::metadata::ColumnMetadata;
33use store_api::storage::TableId;
34use table::metadata::RawTableInfo;
35use table::table_name::TableName;
36
37use crate::cache_invalidator::CacheInvalidatorRef;
38use crate::error::Result;
39use crate::key::table_info::TableInfoValue;
40use crate::key::table_route::PhysicalTableRouteValue;
41use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
42use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
43use crate::metrics;
44use crate::node_manager::NodeManagerRef;
45use crate::reconciliation::reconcile_logical_tables::reconciliation_start::ReconciliationStart;
46use crate::reconciliation::utils::{Context, ReconcileLogicalTableMetrics};
47
48pub struct ReconcileLogicalTablesContext {
49 pub node_manager: NodeManagerRef,
50 pub table_metadata_manager: TableMetadataManagerRef,
51 pub cache_invalidator: CacheInvalidatorRef,
52 pub persistent_ctx: PersistentContext,
53 pub volatile_ctx: VolatileContext,
54}
55
56impl ReconcileLogicalTablesContext {
57 pub fn new(ctx: Context, persistent_ctx: PersistentContext) -> Self {
59 Self {
60 node_manager: ctx.node_manager,
61 table_metadata_manager: ctx.table_metadata_manager,
62 cache_invalidator: ctx.cache_invalidator,
63 persistent_ctx,
64 volatile_ctx: VolatileContext::default(),
65 }
66 }
67
68 pub(crate) fn table_name(&self) -> &TableName {
70 &self.persistent_ctx.table_name
71 }
72
73 pub(crate) fn table_id(&self) -> TableId {
75 self.persistent_ctx.table_id
76 }
77
78 pub(crate) fn mut_metrics(&mut self) -> &mut ReconcileLogicalTableMetrics {
80 &mut self.volatile_ctx.metrics
81 }
82
83 pub(crate) fn metrics(&self) -> &ReconcileLogicalTableMetrics {
85 &self.volatile_ctx.metrics
86 }
87}
88
89#[derive(Debug, Serialize, Deserialize)]
90pub(crate) struct PersistentContext {
91 pub(crate) table_id: TableId,
92 pub(crate) table_name: TableName,
93 pub(crate) logical_tables: Vec<TableName>,
96 pub(crate) logical_table_ids: Vec<TableId>,
99 pub(crate) table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
102 pub(crate) physical_table_route: Option<PhysicalTableRouteValue>,
105 pub(crate) update_table_infos: Vec<(TableId, Vec<ColumnMetadata>)>,
108 pub(crate) create_tables: Vec<(TableId, RawTableInfo)>,
111 pub(crate) is_subprocedure: bool,
113}
114
115impl PersistentContext {
116 pub(crate) fn new(
117 table_id: TableId,
118 table_name: TableName,
119 logical_tables: Vec<(TableId, TableName)>,
120 is_subprocedure: bool,
121 ) -> Self {
122 let (logical_table_ids, logical_tables) = logical_tables.into_iter().unzip();
123
124 Self {
125 table_id,
126 table_name,
127 logical_tables,
128 logical_table_ids,
129 table_info_value: None,
130 physical_table_route: None,
131 update_table_infos: vec![],
132 create_tables: vec![],
133 is_subprocedure,
134 }
135 }
136}
137
138#[derive(Default)]
139pub(crate) struct VolatileContext {
140 pub(crate) metrics: ReconcileLogicalTableMetrics,
141}
142
143pub struct ReconcileLogicalTablesProcedure {
144 pub context: ReconcileLogicalTablesContext,
145 state: Box<dyn State>,
146}
147
148#[derive(Debug, Serialize)]
149struct ProcedureData<'a> {
150 state: &'a dyn State,
151 persistent_ctx: &'a PersistentContext,
152}
153
154#[derive(Debug, Deserialize)]
155struct ProcedureDataOwned {
156 state: Box<dyn State>,
157 persistent_ctx: PersistentContext,
158}
159
160impl ReconcileLogicalTablesProcedure {
161 pub const TYPE_NAME: &'static str = "metasrv-procedure::ReconcileLogicalTables";
162
163 pub fn new(
164 ctx: Context,
165 table_id: TableId,
166 table_name: TableName,
167 logical_tables: Vec<(TableId, TableName)>,
168 is_subprocedure: bool,
169 ) -> Self {
170 let persistent_ctx =
171 PersistentContext::new(table_id, table_name, logical_tables, is_subprocedure);
172 let context = ReconcileLogicalTablesContext::new(ctx, persistent_ctx);
173 let state = Box::new(ReconciliationStart);
174 Self { context, state }
175 }
176
177 pub(crate) fn from_json(ctx: Context, json: &str) -> ProcedureResult<Self> {
178 let ProcedureDataOwned {
179 state,
180 persistent_ctx,
181 } = serde_json::from_str(json).context(FromJsonSnafu)?;
182 let context = ReconcileLogicalTablesContext::new(ctx, persistent_ctx);
183 Ok(Self { context, state })
184 }
185}
186
187#[async_trait]
188impl Procedure for ReconcileLogicalTablesProcedure {
189 fn type_name(&self) -> &str {
190 Self::TYPE_NAME
191 }
192
193 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
194 let state = &mut self.state;
195
196 let procedure_name = Self::TYPE_NAME;
197 let step = state.name();
198 let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE
199 .with_label_values(&[procedure_name, step])
200 .start_timer();
201 match state.next(&mut self.context, _ctx).await {
202 Ok((next, status)) => {
203 *state = next;
204 Ok(status)
205 }
206 Err(e) => {
207 if e.is_retry_later() {
208 metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
209 .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE])
210 .inc();
211 Err(ProcedureError::retry_later(e))
212 } else {
213 metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
214 .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL])
215 .inc();
216 Err(ProcedureError::external(e))
217 }
218 }
219 }
220 }
221
222 fn dump(&self) -> ProcedureResult<String> {
223 let data = ProcedureData {
224 state: self.state.as_ref(),
225 persistent_ctx: &self.context.persistent_ctx,
226 };
227 serde_json::to_string(&data).context(ToJsonSnafu)
228 }
229
230 fn lock_key(&self) -> LockKey {
231 let table_ref = &self.context.table_name().table_ref();
232
233 let mut table_ids = self
234 .context
235 .persistent_ctx
236 .logical_table_ids
237 .iter()
238 .map(|t| TableLock::Write(*t).into())
239 .collect::<Vec<_>>();
240 table_ids.sort_unstable();
241 table_ids.push(TableLock::Read(self.context.table_id()).into());
242 if self.context.persistent_ctx.is_subprocedure {
243 return LockKey::new(table_ids);
246 }
247 let mut keys = vec![
248 CatalogLock::Read(table_ref.catalog).into(),
249 SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
250 ];
251 keys.extend(table_ids);
252 LockKey::new(keys)
253 }
254}
255
256#[async_trait::async_trait]
257#[typetag::serde(tag = "reconcile_logical_tables_state")]
258pub(crate) trait State: Sync + Send + Debug {
259 fn name(&self) -> &'static str {
260 let type_name = std::any::type_name::<Self>();
261 type_name.split("::").last().unwrap_or(type_name)
263 }
264
265 async fn next(
266 &mut self,
267 ctx: &mut ReconcileLogicalTablesContext,
268 procedure_ctx: &ProcedureContext,
269 ) -> Result<(Box<dyn State>, Status)>;
270
271 fn as_any(&self) -> &dyn Any;
272}