common_meta/reconciliation/reconcile_database/
reconcile_logical_tables.rs1use std::any::Any;
16use std::collections::HashMap;
17
18use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
19use common_telemetry::info;
20use futures::TryStreamExt;
21use serde::{Deserialize, Serialize};
22use snafu::OptionExt;
23use table::metadata::TableId;
24use table::table_name::TableName;
25use table::table_reference::TableReference;
26
27use crate::error::{Result, TableInfoNotFoundSnafu};
28use crate::key::table_route::TableRouteValue;
29use crate::reconciliation::reconcile_database::end::ReconcileDatabaseEnd;
30use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State};
31use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure;
32use crate::reconciliation::utils::{Context, SubprocedureMeta};
33
34#[derive(Debug, Serialize, Deserialize)]
35pub(crate) struct ReconcileLogicalTables;
36
37#[async_trait::async_trait]
38#[typetag::serde]
39impl State for ReconcileLogicalTables {
40 async fn next(
41 &mut self,
42 ctx: &mut ReconcileDatabaseContext,
43 procedure_ctx: &ProcedureContext,
44 ) -> Result<(Box<dyn State>, Status)> {
45 info!(
46 "Reconcile logical tables in database: {}, catalog: {}, inflight_subprocedures: {}",
47 ctx.persistent_ctx.schema,
48 ctx.persistent_ctx.catalog,
49 ctx.volatile_ctx.inflight_subprocedures.len()
50 );
51 ctx.wait_for_inflight_subprocedures(procedure_ctx).await?;
53
54 let catalog = &ctx.persistent_ctx.catalog;
55 let schema = &ctx.persistent_ctx.schema;
56 let parallelism = ctx.persistent_ctx.parallelism;
57 if ctx.volatile_ctx.tables.as_deref().is_none() {
58 let tables = ctx
59 .table_metadata_manager
60 .table_name_manager()
61 .tables(catalog, schema);
62 ctx.volatile_ctx.tables = Some(tables);
63 }
64
65 let pending_logical_tables = &mut ctx.volatile_ctx.pending_logical_tables;
66 let mut pending_procedures = Vec::with_capacity(parallelism);
67 let context = Context {
68 node_manager: ctx.node_manager.clone(),
69 table_metadata_manager: ctx.table_metadata_manager.clone(),
70 cache_invalidator: ctx.cache_invalidator.clone(),
71 };
72 while let Some((table_name, table_name_value)) =
74 ctx.volatile_ctx.tables.as_mut().unwrap().try_next().await?
75 {
76 let table_id = table_name_value.table_id();
77 let Some(table_route) = ctx
78 .table_metadata_manager
79 .table_route_manager()
80 .table_route_storage()
81 .get(table_id)
82 .await?
83 else {
84 continue;
85 };
86
87 let table_ref = TableReference::full(catalog, schema, &table_name);
88 Self::enqueue_logical_table(pending_logical_tables, table_id, table_ref, table_route);
89 if let Some(procedure) = Self::try_build_reconcile_logical_tables_procedure(
91 &context,
92 pending_logical_tables,
93 parallelism,
94 )
95 .await?
96 {
97 pending_procedures.push(procedure);
98 }
99 if Self::should_schedule_reconcile_logical_tables(&pending_procedures, parallelism) {
102 return Self::schedule_reconcile_logical_tables(ctx, &mut pending_procedures);
103 }
104 }
105
106 Self::build_remaining_procedures(
108 &context,
109 pending_logical_tables,
110 &mut pending_procedures,
111 parallelism,
112 )
113 .await?;
114 if !pending_procedures.is_empty() {
116 return Self::schedule_reconcile_logical_tables(ctx, &mut pending_procedures);
117 }
118
119 ctx.volatile_ctx.tables.take();
120 Ok((Box::new(ReconcileDatabaseEnd), Status::executing(true)))
121 }
122
123 fn as_any(&self) -> &dyn Any {
124 self
125 }
126}
127
128impl ReconcileLogicalTables {
129 fn schedule_reconcile_logical_tables(
130 ctx: &mut ReconcileDatabaseContext,
131 buffer: &mut Vec<(ProcedureWithId, SubprocedureMeta)>,
132 ) -> Result<(Box<dyn State>, Status)> {
133 let buffer = std::mem::take(buffer);
134 let (procedures, meta): (Vec<_>, Vec<_>) = buffer.into_iter().unzip();
135
136 ctx.volatile_ctx.inflight_subprocedures.extend(meta);
137 Ok((
138 Box::new(ReconcileLogicalTables),
139 Status::suspended(procedures, false),
140 ))
141 }
142
143 fn should_schedule_reconcile_logical_tables(
144 buffer: &[(ProcedureWithId, SubprocedureMeta)],
145 parallelism: usize,
146 ) -> bool {
147 buffer.len() >= parallelism
148 }
149
150 async fn try_build_reconcile_logical_tables_procedure(
151 ctx: &Context,
152 pending_logical_tables: &mut HashMap<TableId, Vec<(TableId, TableName)>>,
153 parallelism: usize,
154 ) -> Result<Option<(ProcedureWithId, SubprocedureMeta)>> {
155 let mut physical_table_id = None;
156 for (table_id, tables) in pending_logical_tables.iter() {
157 if tables.len() >= parallelism {
158 physical_table_id = Some(*table_id);
159 break;
160 }
161 }
162
163 if let Some(physical_table_id) = physical_table_id {
164 let tables = pending_logical_tables.remove(&physical_table_id).unwrap();
166 return Ok(Some(
167 Self::build_reconcile_logical_tables_procedure(ctx, physical_table_id, tables)
168 .await?,
169 ));
170 }
171
172 Ok(None)
173 }
174
175 async fn build_remaining_procedures(
176 ctx: &Context,
177 pending_logical_tables: &mut HashMap<TableId, Vec<(TableId, TableName)>>,
178 pending_procedures: &mut Vec<(ProcedureWithId, SubprocedureMeta)>,
179 parallelism: usize,
180 ) -> Result<()> {
181 if pending_logical_tables.is_empty() {
182 return Ok(());
183 }
184
185 while let Some(physical_table_id) = pending_logical_tables.keys().next().cloned() {
186 if pending_procedures.len() >= parallelism {
187 return Ok(());
188 }
189
190 let tables = pending_logical_tables.remove(&physical_table_id).unwrap();
192 pending_procedures.push(
193 Self::build_reconcile_logical_tables_procedure(ctx, physical_table_id, tables)
194 .await?,
195 );
196 }
197
198 Ok(())
199 }
200
201 async fn build_reconcile_logical_tables_procedure(
202 ctx: &Context,
203 physical_table_id: TableId,
204 logical_tables: Vec<(TableId, TableName)>,
205 ) -> Result<(ProcedureWithId, SubprocedureMeta)> {
206 let table_info = ctx
207 .table_metadata_manager
208 .table_info_manager()
209 .get(physical_table_id)
210 .await?
211 .context(TableInfoNotFoundSnafu {
212 table: format!("table_id: {}", physical_table_id),
213 })?;
214
215 let physical_table_name = table_info.table_name();
216 let procedure = ReconcileLogicalTablesProcedure::new(
217 ctx.clone(),
218 physical_table_id,
219 physical_table_name.clone(),
220 logical_tables.clone(),
221 true,
222 );
223 let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
224 let subprocedure_meta = SubprocedureMeta::new_logical_table(
225 procedure_with_id.id,
226 physical_table_id,
227 physical_table_name,
228 logical_tables,
229 );
230 Ok((procedure_with_id, subprocedure_meta))
231 }
232
233 fn enqueue_logical_table(
234 tables: &mut HashMap<TableId, Vec<(TableId, TableName)>>,
235 table_id: TableId,
236 table_ref: TableReference<'_>,
237 table_route: TableRouteValue,
238 ) {
239 if !table_route.is_physical() {
240 let logical_table_route = table_route.into_logical_table_route();
241 let physical_table_id = logical_table_route.physical_table_id();
242 tables
243 .entry(physical_table_id)
244 .or_default()
245 .push((table_id, table_ref.into()));
246 }
247 }
248}