// common_meta/reconciliation/reconcile_database/reconcile_tables.rs
use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
18use common_telemetry::info;
19use futures::TryStreamExt;
20use serde::{Deserialize, Serialize};
21use store_api::storage::TableId;
22use table::table_name::TableName;
23use table::table_reference::TableReference;
24
25use crate::error::Result;
26use crate::key::table_route::TableRouteValue;
27use crate::reconciliation::reconcile_database::reconcile_logical_tables::ReconcileLogicalTables;
28use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State};
29use crate::reconciliation::reconcile_table::ReconcileTableProcedure;
30use crate::reconciliation::utils::{Context, SubprocedureMeta};
31
32#[derive(Debug, Serialize, Deserialize)]
33pub(crate) struct ReconcileTables;
34
35#[async_trait::async_trait]
36#[typetag::serde]
37impl State for ReconcileTables {
38 async fn next(
39 &mut self,
40 ctx: &mut ReconcileDatabaseContext,
41 procedure_ctx: &ProcedureContext,
42 ) -> Result<(Box<dyn State>, Status)> {
43 info!(
44 "Reconcile tables in database: {}, catalog: {}, inflight_subprocedures: {}",
45 ctx.persistent_ctx.schema,
46 ctx.persistent_ctx.catalog,
47 ctx.volatile_ctx.inflight_subprocedures.len()
48 );
49 ctx.wait_for_inflight_subprocedures(procedure_ctx).await?;
51
52 let catalog = &ctx.persistent_ctx.catalog;
53 let schema = &ctx.persistent_ctx.schema;
54 let parallelism = ctx.persistent_ctx.parallelism;
55 if ctx.volatile_ctx.tables.as_deref().is_none() {
56 let tables = ctx
57 .table_metadata_manager
58 .table_name_manager()
59 .tables(catalog, schema);
60 ctx.volatile_ctx.tables = Some(tables);
61 }
62
63 let pending_tables = &mut ctx.volatile_ctx.pending_tables;
64 while let Some((table_name, table_name_value)) =
66 ctx.volatile_ctx.tables.as_mut().unwrap().try_next().await?
67 {
68 let table_id = table_name_value.table_id();
69 let Some(table_route) = ctx
70 .table_metadata_manager
71 .table_route_manager()
72 .table_route_storage()
73 .get(table_id)
74 .await?
75 else {
76 continue;
77 };
78
79 let table_ref = TableReference::full(catalog, schema, &table_name);
80 Self::enqueue_table(pending_tables, table_id, table_ref, table_route);
82 if Self::should_schedule_reconcile_tables(pending_tables, parallelism) {
85 return Self::schedule_reconcile_tables(ctx);
86 }
87 }
88
89 if !pending_tables.is_empty() {
91 return Self::schedule_reconcile_tables(ctx);
92 }
93 ctx.volatile_ctx.tables.take();
94 Ok((Box::new(ReconcileLogicalTables), Status::executing(true)))
95 }
96
97 fn as_any(&self) -> &dyn Any {
98 self
99 }
100}
101
102impl ReconcileTables {
103 fn schedule_reconcile_tables(
104 ctx: &mut ReconcileDatabaseContext,
105 ) -> Result<(Box<dyn State>, Status)> {
106 let tables = std::mem::take(&mut ctx.volatile_ctx.pending_tables);
107 let (procedures, meta): (Vec<_>, Vec<_>) =
108 Self::build_reconcile_table_procedures(ctx, tables)
109 .into_iter()
110 .unzip();
111 ctx.volatile_ctx.inflight_subprocedures.extend(meta);
112 Ok((
113 Box::new(ReconcileTables),
114 Status::suspended(procedures, false),
115 ))
116 }
117
118 fn should_schedule_reconcile_tables(
119 pending_tables: &[(TableId, TableName)],
120 parallelism: usize,
121 ) -> bool {
122 pending_tables.len() >= parallelism
123 }
124
125 fn build_reconcile_table_procedures(
126 ctx: &ReconcileDatabaseContext,
127 tables: Vec<(TableId, TableName)>,
128 ) -> Vec<(ProcedureWithId, SubprocedureMeta)> {
129 let mut procedures = Vec::with_capacity(tables.len());
130 for (table_id, table_name) in tables {
131 let context = Context {
132 node_manager: ctx.node_manager.clone(),
133 table_metadata_manager: ctx.table_metadata_manager.clone(),
134 cache_invalidator: ctx.cache_invalidator.clone(),
135 };
136 let procedure = ReconcileTableProcedure::new(
137 context,
138 table_id,
139 table_name.clone(),
140 ctx.persistent_ctx.resolve_strategy,
141 true,
142 );
143 let procedure = ProcedureWithId::with_random_id(Box::new(procedure));
144 let meta =
145 SubprocedureMeta::new_physical_table(procedure.id, table_id, table_name.clone());
146 info!(
147 "Reconcile table: {}, table_id: {}, procedure_id: {}",
148 table_name, table_id, procedure.id
149 );
150 procedures.push((procedure, meta));
151 }
152
153 procedures
154 }
155
156 fn enqueue_table(
157 tables: &mut Vec<(TableId, TableName)>,
158 table_id: TableId,
159 table_ref: TableReference<'_>,
160 table_route: TableRouteValue,
161 ) {
162 if table_route.is_physical() {
163 tables.push((table_id, table_ref.into()));
164 }
165 }
166}