common_meta/reconciliation/
reconcile_database.rs1pub(crate) mod end;
16pub(crate) mod reconcile_logical_tables;
17pub(crate) mod reconcile_tables;
18pub(crate) mod start;
19
20use std::any::Any;
21use std::collections::HashMap;
22use std::fmt::Debug;
23use std::time::Instant;
24
25use async_trait::async_trait;
26use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
27use common_procedure::{
28 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
29 Result as ProcedureResult, Status,
30};
31use futures::stream::BoxStream;
32use serde::{Deserialize, Serialize};
33use snafu::ResultExt;
34use store_api::storage::TableId;
35use table::table_name::TableName;
36
37use crate::cache_invalidator::CacheInvalidatorRef;
38use crate::error::Result;
39use crate::key::table_name::TableNameValue;
40use crate::key::TableMetadataManagerRef;
41use crate::lock_key::{CatalogLock, SchemaLock};
42use crate::metrics;
43use crate::node_manager::NodeManagerRef;
44use crate::reconciliation::reconcile_database::start::ReconcileDatabaseStart;
45use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
46use crate::reconciliation::utils::{
47 wait_for_inflight_subprocedures, Context, ReconcileDatabaseMetrics, SubprocedureMeta,
48};
/// Default parallelism value: 64.
///
/// NOTE(review): nothing in this file reads the constant. Presumably callers
/// pass it as the `parallelism` argument of `ReconcileDatabaseProcedure::new`
/// when no explicit value is configured — confirm against the call sites.
pub(crate) const DEFAULT_PARALLELISM: usize = 64;
50
51pub(crate) struct ReconcileDatabaseContext {
52 pub node_manager: NodeManagerRef,
53 pub table_metadata_manager: TableMetadataManagerRef,
54 pub cache_invalidator: CacheInvalidatorRef,
55 persistent_ctx: PersistentContext,
56 volatile_ctx: VolatileContext,
57}
58
59impl ReconcileDatabaseContext {
60 pub fn new(ctx: Context, persistent_ctx: PersistentContext) -> Self {
61 Self {
62 node_manager: ctx.node_manager,
63 table_metadata_manager: ctx.table_metadata_manager,
64 cache_invalidator: ctx.cache_invalidator,
65 persistent_ctx,
66 volatile_ctx: VolatileContext::default(),
67 }
68 }
69
70 pub(crate) async fn wait_for_inflight_subprocedures(
72 &mut self,
73 procedure_ctx: &ProcedureContext,
74 ) -> Result<()> {
75 if !self.volatile_ctx.inflight_subprocedures.is_empty() {
76 let result = wait_for_inflight_subprocedures(
77 procedure_ctx,
78 &self.volatile_ctx.inflight_subprocedures,
79 self.persistent_ctx.fail_fast,
80 )
81 .await?;
82
83 let metrics = result.into();
85 self.volatile_ctx.inflight_subprocedures.clear();
86 self.volatile_ctx.metrics += metrics;
87 }
88
89 Ok(())
90 }
91
92 pub(crate) fn metrics(&self) -> &ReconcileDatabaseMetrics {
94 &self.volatile_ctx.metrics
95 }
96}
97
98#[derive(Debug, Serialize, Deserialize)]
99pub(crate) struct PersistentContext {
100 catalog: String,
101 schema: String,
102 fail_fast: bool,
103 parallelism: usize,
104 resolve_strategy: ResolveStrategy,
105 is_subprocedure: bool,
106}
107
108impl PersistentContext {
109 pub fn new(
110 catalog: String,
111 schema: String,
112 fail_fast: bool,
113 parallelism: usize,
114 resolve_strategy: ResolveStrategy,
115 is_subprocedure: bool,
116 ) -> Self {
117 Self {
118 catalog,
119 schema,
120 fail_fast,
121 parallelism,
122 resolve_strategy,
123 is_subprocedure,
124 }
125 }
126}
127
128pub(crate) struct VolatileContext {
129 pending_tables: Vec<(TableId, TableName)>,
131 pending_logical_tables: HashMap<TableId, Vec<(TableId, TableName)>>,
136 inflight_subprocedures: Vec<SubprocedureMeta>,
138 tables: Option<BoxStream<'static, Result<(String, TableNameValue)>>>,
140 metrics: ReconcileDatabaseMetrics,
142 start_time: Instant,
144}
145
146impl Default for VolatileContext {
147 fn default() -> Self {
148 Self {
149 pending_tables: vec![],
150 pending_logical_tables: HashMap::new(),
151 inflight_subprocedures: vec![],
152 tables: None,
153 metrics: ReconcileDatabaseMetrics::default(),
154 start_time: Instant::now(),
155 }
156 }
157}
158
159pub struct ReconcileDatabaseProcedure {
160 pub context: ReconcileDatabaseContext,
161 state: Box<dyn State>,
162}
163
164impl ReconcileDatabaseProcedure {
165 pub const TYPE_NAME: &'static str = "metasrv-procedure::ReconcileDatabase";
166
167 pub fn new(
168 ctx: Context,
169 catalog: String,
170 schema: String,
171 fail_fast: bool,
172 parallelism: usize,
173 resolve_strategy: ResolveStrategy,
174 is_subprocedure: bool,
175 ) -> Self {
176 let persistent_ctx = PersistentContext::new(
177 catalog,
178 schema,
179 fail_fast,
180 parallelism,
181 resolve_strategy,
182 is_subprocedure,
183 );
184 let context = ReconcileDatabaseContext::new(ctx, persistent_ctx);
185 let state = Box::new(ReconcileDatabaseStart);
186 Self { context, state }
187 }
188
189 pub(crate) fn from_json(ctx: Context, json: &str) -> ProcedureResult<Self> {
190 let ProcedureDataOwned {
191 state,
192 persistent_ctx,
193 } = serde_json::from_str(json).context(FromJsonSnafu)?;
194 let context = ReconcileDatabaseContext::new(ctx, persistent_ctx);
195 Ok(Self { context, state })
196 }
197}
198
199#[derive(Debug, Serialize)]
200struct ProcedureData<'a> {
201 state: &'a dyn State,
202 persistent_ctx: &'a PersistentContext,
203}
204
205#[derive(Debug, Deserialize)]
206struct ProcedureDataOwned {
207 state: Box<dyn State>,
208 persistent_ctx: PersistentContext,
209}
210
211#[async_trait]
212impl Procedure for ReconcileDatabaseProcedure {
213 fn type_name(&self) -> &str {
214 Self::TYPE_NAME
215 }
216
217 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
218 let state = &mut self.state;
219
220 let procedure_name = Self::TYPE_NAME;
221 let step = state.name();
222 let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE
223 .with_label_values(&[procedure_name, step])
224 .start_timer();
225 match state.next(&mut self.context, _ctx).await {
226 Ok((next, status)) => {
227 *state = next;
228 Ok(status)
229 }
230 Err(e) => {
231 if e.is_retry_later() {
232 metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
233 .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE])
234 .inc();
235 Err(ProcedureError::retry_later(e))
236 } else {
237 metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
238 .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL])
239 .inc();
240 Err(ProcedureError::external(e))
241 }
242 }
243 }
244 }
245
246 fn dump(&self) -> ProcedureResult<String> {
247 let data = ProcedureData {
248 state: self.state.as_ref(),
249 persistent_ctx: &self.context.persistent_ctx,
250 };
251 serde_json::to_string(&data).context(ToJsonSnafu)
252 }
253
254 fn lock_key(&self) -> LockKey {
255 let catalog = &self.context.persistent_ctx.catalog;
256 let schema = &self.context.persistent_ctx.schema;
257 if self.context.persistent_ctx.is_subprocedure {
259 return LockKey::new(vec![SchemaLock::write(catalog, schema).into()]);
260 }
261
262 LockKey::new(vec![
263 CatalogLock::Read(catalog).into(),
264 SchemaLock::write(catalog, schema).into(),
265 ])
266 }
267}
268
269#[async_trait::async_trait]
270#[typetag::serde(tag = "reconcile_database_state")]
271pub(crate) trait State: Sync + Send + Debug {
272 fn name(&self) -> &'static str {
273 let type_name = std::any::type_name::<Self>();
274 type_name.split("::").last().unwrap_or(type_name)
276 }
277
278 async fn next(
279 &mut self,
280 ctx: &mut ReconcileDatabaseContext,
281 procedure_ctx: &ProcedureContext,
282 ) -> Result<(Box<dyn State>, Status)>;
283
284 fn as_any(&self) -> &dyn Any;
285}