common_meta/reconciliation/
reconcile_catalog.rs1use std::any::Any;
16use std::fmt::Debug;
17use std::time::Instant;
18
19use common_procedure::error::FromJsonSnafu;
20use common_procedure::{
21 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
22 Result as ProcedureResult, Status,
23};
24use futures::stream::BoxStream;
25use serde::{Deserialize, Serialize};
26use snafu::ResultExt;
27
28use crate::cache_invalidator::CacheInvalidatorRef;
29use crate::error::Result;
30use crate::key::TableMetadataManagerRef;
31use crate::lock_key::CatalogLock;
32use crate::metrics;
33use crate::node_manager::NodeManagerRef;
34use crate::reconciliation::reconcile_catalog::start::ReconcileCatalogStart;
35use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
36use crate::reconciliation::utils::{
37 wait_for_inflight_subprocedures, Context, ReconcileCatalogMetrics, SubprocedureMeta,
38};
39
40pub(crate) mod end;
41pub(crate) mod reconcile_databases;
42pub(crate) mod start;
43
44pub(crate) struct ReconcileCatalogContext {
45 pub node_manager: NodeManagerRef,
46 pub table_metadata_manager: TableMetadataManagerRef,
47 pub cache_invalidator: CacheInvalidatorRef,
48 persistent_ctx: PersistentContext,
49 volatile_ctx: VolatileContext,
50}
51
52impl ReconcileCatalogContext {
53 pub fn new(ctx: Context, persistent_ctx: PersistentContext) -> Self {
54 Self {
55 node_manager: ctx.node_manager,
56 table_metadata_manager: ctx.table_metadata_manager,
57 cache_invalidator: ctx.cache_invalidator,
58 persistent_ctx,
59 volatile_ctx: VolatileContext::default(),
60 }
61 }
62
63 pub(crate) async fn wait_for_inflight_subprocedure(
64 &mut self,
65 procedure_ctx: &ProcedureContext,
66 ) -> Result<()> {
67 if let Some(subprocedure) = self.volatile_ctx.inflight_subprocedure.take() {
68 let subprocedures = [subprocedure];
69 let result = wait_for_inflight_subprocedures(
70 procedure_ctx,
71 &subprocedures,
72 self.persistent_ctx.fast_fail,
73 )
74 .await?;
75 self.volatile_ctx.metrics += result.into();
76 }
77 Ok(())
78 }
79}
80
81#[derive(Debug, Serialize, Deserialize)]
82pub(crate) struct PersistentContext {
83 catalog: String,
84 fast_fail: bool,
85 resolve_strategy: ResolveStrategy,
86 parallelism: usize,
87}
88
89impl PersistentContext {
90 pub fn new(
91 catalog: String,
92 fast_fail: bool,
93 resolve_strategy: ResolveStrategy,
94 parallelism: usize,
95 ) -> Self {
96 Self {
97 catalog,
98 fast_fail,
99 resolve_strategy,
100 parallelism,
101 }
102 }
103}
104
105pub(crate) struct VolatileContext {
106 schemas: Option<BoxStream<'static, Result<String>>>,
108 inflight_subprocedure: Option<SubprocedureMeta>,
110 metrics: ReconcileCatalogMetrics,
112 start_time: Instant,
114}
115
116impl Default for VolatileContext {
117 fn default() -> Self {
118 Self {
119 schemas: None,
120 inflight_subprocedure: None,
121 metrics: Default::default(),
122 start_time: Instant::now(),
123 }
124 }
125}
126
127pub struct ReconcileCatalogProcedure {
128 pub context: ReconcileCatalogContext,
129 state: Box<dyn State>,
130}
131
132impl ReconcileCatalogProcedure {
133 pub const TYPE_NAME: &'static str = "metasrv-procedure::ReconcileCatalog";
134
135 pub fn new(
136 ctx: Context,
137 catalog: String,
138 fast_fail: bool,
139 resolve_strategy: ResolveStrategy,
140 parallelism: usize,
141 ) -> Self {
142 let persistent_ctx =
143 PersistentContext::new(catalog, fast_fail, resolve_strategy, parallelism);
144 let context = ReconcileCatalogContext::new(ctx, persistent_ctx);
145 let state = Box::new(ReconcileCatalogStart);
146 Self { context, state }
147 }
148
149 pub(crate) fn from_json(ctx: Context, json: &str) -> ProcedureResult<Self> {
150 let ProcedureDataOwned {
151 state,
152 persistent_ctx,
153 } = serde_json::from_str(json).context(FromJsonSnafu)?;
154 let context = ReconcileCatalogContext::new(ctx, persistent_ctx);
155 Ok(Self { context, state })
156 }
157}
158
159#[derive(Debug, Serialize)]
160struct ProcedureData<'a> {
161 state: &'a dyn State,
162 persistent_ctx: &'a PersistentContext,
163}
164
165#[derive(Debug, Deserialize)]
166struct ProcedureDataOwned {
167 state: Box<dyn State>,
168 persistent_ctx: PersistentContext,
169}
170
171#[async_trait::async_trait]
172impl Procedure for ReconcileCatalogProcedure {
173 fn type_name(&self) -> &str {
174 Self::TYPE_NAME
175 }
176
177 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
178 let state = &mut self.state;
179
180 let procedure_name = Self::TYPE_NAME;
181 let step = state.name();
182 let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE
183 .with_label_values(&[procedure_name, step])
184 .start_timer();
185 match state.next(&mut self.context, _ctx).await {
186 Ok((next, status)) => {
187 *state = next;
188 Ok(status)
189 }
190 Err(e) => {
191 if e.is_retry_later() {
192 metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
193 .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE])
194 .inc();
195 Err(ProcedureError::retry_later(e))
196 } else {
197 metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
198 .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL])
199 .inc();
200 Err(ProcedureError::external(e))
201 }
202 }
203 }
204 }
205
206 fn dump(&self) -> ProcedureResult<String> {
207 let data = ProcedureData {
208 state: self.state.as_ref(),
209 persistent_ctx: &self.context.persistent_ctx,
210 };
211 serde_json::to_string(&data).context(FromJsonSnafu)
212 }
213
214 fn lock_key(&self) -> LockKey {
215 let catalog = &self.context.persistent_ctx.catalog;
216
217 LockKey::new(vec![CatalogLock::Write(catalog).into()])
218 }
219}
220
221#[async_trait::async_trait]
222#[typetag::serde(tag = "reconcile_catalog_state")]
223pub(crate) trait State: Sync + Send + Debug {
224 fn name(&self) -> &'static str {
225 let type_name = std::any::type_name::<Self>();
226 type_name.split("::").last().unwrap_or(type_name)
228 }
229
230 async fn next(
231 &mut self,
232 ctx: &mut ReconcileCatalogContext,
233 procedure_ctx: &ProcedureContext,
234 ) -> Result<(Box<dyn State>, Status)>;
235
236 fn as_any(&self) -> &dyn Any;
237}