common_meta/reconciliation/
manager.rs1use std::sync::Arc;
16
17use common_procedure::{
18 watcher, BoxedProcedure, ProcedureId, ProcedureManagerRef, ProcedureWithId,
19};
20use common_telemetry::{error, info, warn};
21use snafu::{OptionExt, ResultExt};
22use store_api::storage::TableId;
23use table::table_name::TableName;
24use table::table_reference::TableReference;
25
26use crate::cache_invalidator::CacheInvalidatorRef;
27use crate::error::{self, Result, TableNotFoundSnafu};
28use crate::key::table_name::TableNameKey;
29use crate::key::TableMetadataManagerRef;
30use crate::node_manager::NodeManagerRef;
31use crate::reconciliation::reconcile_catalog::ReconcileCatalogProcedure;
32use crate::reconciliation::reconcile_database::{ReconcileDatabaseProcedure, DEFAULT_PARALLELISM};
33use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure;
34use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
35use crate::reconciliation::reconcile_table::ReconcileTableProcedure;
36use crate::reconciliation::utils::Context;
37
38pub type ReconciliationManagerRef = Arc<ReconciliationManager>;
39
40pub struct ReconciliationManager {
42 procedure_manager: ProcedureManagerRef,
43 context: Context,
44}
45
46macro_rules! register_reconcile_loader {
47 ($self:ident, $procedure:ty) => {{
48 let context = $self.context.clone();
49 $self
50 .procedure_manager
51 .register_loader(
52 <$procedure>::TYPE_NAME,
53 Box::new(move |json| {
54 let context = context.clone();
55 let procedure = <$procedure>::from_json(context, json)?;
56 Ok(Box::new(procedure))
57 }),
58 )
59 .context(error::RegisterProcedureLoaderSnafu {
60 type_name: <$procedure>::TYPE_NAME,
61 })?;
62 }};
63}
64
65impl ReconciliationManager {
66 pub fn new(
67 node_manager: NodeManagerRef,
68 table_metadata_manager: TableMetadataManagerRef,
69 cache_invalidator: CacheInvalidatorRef,
70 procedure_manager: ProcedureManagerRef,
71 ) -> Self {
72 Self {
73 procedure_manager,
74 context: Context {
75 node_manager,
76 table_metadata_manager,
77 cache_invalidator,
78 },
79 }
80 }
81
82 pub fn try_start(&self) -> Result<()> {
87 register_reconcile_loader!(self, ReconcileLogicalTablesProcedure);
88 register_reconcile_loader!(self, ReconcileTableProcedure);
89 register_reconcile_loader!(self, ReconcileDatabaseProcedure);
90 register_reconcile_loader!(self, ReconcileCatalogProcedure);
91
92 Ok(())
93 }
94
95 pub async fn reconcile_table(
99 &self,
100 table_ref: TableReference<'_>,
101 resolve_strategy: ResolveStrategy,
102 ) -> Result<ProcedureId> {
103 let table_name_key =
104 TableNameKey::new(table_ref.catalog, table_ref.schema, table_ref.table);
105 let table_metadata_manager = &self.context.table_metadata_manager;
106 let table_id = table_metadata_manager
107 .table_name_manager()
108 .get(table_name_key)
109 .await?
110 .with_context(|| TableNotFoundSnafu {
111 table_name: table_ref.to_string(),
112 })?
113 .table_id();
114 let (physical_table_id, _) = table_metadata_manager
115 .table_route_manager()
116 .get_physical_table_route(table_id)
117 .await?;
118
119 if physical_table_id == table_id {
120 self.reconcile_physical_table(table_id, table_ref.into(), resolve_strategy)
121 .await
122 } else {
123 let physical_table_info = table_metadata_manager
124 .table_info_manager()
125 .get(physical_table_id)
126 .await?
127 .with_context(|| TableNotFoundSnafu {
128 table_name: format!("table_id: {}", physical_table_id),
129 })?;
130
131 self.reconcile_logical_tables(
132 physical_table_id,
133 physical_table_info.table_name(),
134 vec![(table_id, table_ref.into())],
135 )
136 .await
137 }
138 }
139
140 pub async fn reconcile_database(
144 &self,
145 catalog: String,
146 schema: String,
147 resolve_strategy: ResolveStrategy,
148 parallelism: usize,
149 ) -> Result<ProcedureId> {
150 let parallelism = normalize_parallelism(parallelism);
151 let procedure = ReconcileDatabaseProcedure::new(
152 self.context.clone(),
153 catalog,
154 schema,
155 false,
156 parallelism,
157 resolve_strategy,
158 false,
159 );
160 self.spawn_procedure(Box::new(procedure)).await
161 }
162
163 async fn reconcile_physical_table(
164 &self,
165 table_id: TableId,
166 table_name: TableName,
167 resolve_strategy: ResolveStrategy,
168 ) -> Result<ProcedureId> {
169 let procedure = ReconcileTableProcedure::new(
170 self.context.clone(),
171 table_id,
172 table_name,
173 resolve_strategy,
174 false,
175 );
176 self.spawn_procedure(Box::new(procedure)).await
177 }
178
179 async fn reconcile_logical_tables(
180 &self,
181 physical_table_id: TableId,
182 physical_table_name: TableName,
183 logical_tables: Vec<(TableId, TableName)>,
184 ) -> Result<ProcedureId> {
185 let procedure = ReconcileLogicalTablesProcedure::new(
186 self.context.clone(),
187 physical_table_id,
188 physical_table_name,
189 logical_tables,
190 false,
191 );
192 self.spawn_procedure(Box::new(procedure)).await
193 }
194
195 pub async fn reconcile_catalog(
199 &self,
200 catalog: String,
201 resolve_strategy: ResolveStrategy,
202 parallelism: usize,
203 ) -> Result<ProcedureId> {
204 let parallelism = normalize_parallelism(parallelism);
205 let procedure = ReconcileCatalogProcedure::new(
206 self.context.clone(),
207 catalog,
208 false,
209 resolve_strategy,
210 parallelism,
211 );
212 self.spawn_procedure(Box::new(procedure)).await
213 }
214
215 async fn spawn_procedure(&self, procedure: BoxedProcedure) -> Result<ProcedureId> {
216 let procedure_manager = self.procedure_manager.clone();
217 let procedure_with_id = ProcedureWithId::with_random_id(procedure);
218 let procedure_id = procedure_with_id.id;
219 let mut watcher = procedure_manager
220 .submit(procedure_with_id)
221 .await
222 .context(error::SubmitProcedureSnafu)?;
223 common_runtime::spawn_global(async move {
224 if let Err(e) = watcher::wait(&mut watcher).await {
225 error!(e; "Failed to wait reconciliation procedure {procedure_id}");
226 return;
227 }
228
229 info!("Reconciliation procedure {procedure_id} is finished successfully!");
230 });
231 Ok(procedure_id)
232 }
233}
234
235fn normalize_parallelism(parallelism: usize) -> usize {
236 if parallelism == 0 {
237 warn!(
238 "Parallelism is 0, using default parallelism: {}",
239 DEFAULT_PARALLELISM
240 );
241 DEFAULT_PARALLELISM
242 } else {
243 parallelism
244 }
245}