// common_meta/reconciliation/reconcile_logical_tables/reconciliation_start.rs
use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, Status};
18use common_telemetry::info;
19use serde::{Deserialize, Serialize};
20use snafu::{ensure, OptionExt};
21use store_api::storage::TableId;
22use table::table_name::TableName;
23
24use crate::ddl::utils::region_metadata_lister::RegionMetadataLister;
25use crate::ddl::utils::table_id::get_all_table_ids_by_names;
26use crate::ddl::utils::table_info::all_logical_table_routes_have_same_physical_id;
27use crate::error::{self, Result};
28use crate::metrics;
29use crate::reconciliation::reconcile_logical_tables::resolve_table_metadatas::ResolveTableMetadatas;
30use crate::reconciliation::reconcile_logical_tables::{
31 ReconcileLogicalTablesContext, ReconcileLogicalTablesProcedure, State,
32};
33use crate::reconciliation::utils::check_column_metadatas_consistent;
34
35#[derive(Debug, Serialize, Deserialize)]
37pub struct ReconciliationStart;
38
39#[async_trait::async_trait]
40#[typetag::serde]
41impl State for ReconciliationStart {
42 async fn next(
43 &mut self,
44 ctx: &mut ReconcileLogicalTablesContext,
45 procedure_ctx: &ProcedureContext,
46 ) -> Result<(Box<dyn State>, Status)> {
47 let table_id = ctx.table_id();
48 let table_name = ctx.table_name();
49 let (physical_table_id, physical_table_route) = ctx
50 .table_metadata_manager
51 .table_route_manager()
52 .get_physical_table_route(table_id)
53 .await?;
54 ensure!(
55 physical_table_id == table_id,
56 error::UnexpectedSnafu {
57 err_msg: format!(
58 "Expected physical table: {}, but it's a logical table of table: {}",
59 table_name, physical_table_id
60 ),
61 }
62 );
63
64 let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone());
65 let region_metadatas = {
66 let _timer = metrics::METRIC_META_RECONCILIATION_LIST_REGION_METADATA_DURATION
67 .with_label_values(&[metrics::TABLE_TYPE_PHYSICAL])
68 .start_timer();
69 region_metadata_lister
70 .list(physical_table_id, &physical_table_route.region_routes)
71 .await?
72 };
73
74 ensure!(!region_metadatas.is_empty(), {
75 metrics::METRIC_META_RECONCILIATION_STATS
76 .with_label_values(&[
77 ReconcileLogicalTablesProcedure::TYPE_NAME,
78 metrics::TABLE_TYPE_PHYSICAL,
79 metrics::STATS_TYPE_NO_REGION_METADATA,
80 ])
81 .inc();
82
83 error::UnexpectedSnafu {
84 err_msg: format!(
85 "No region metadata found for physical table: {}, table_id: {}",
86 table_name, table_id
87 ),
88 }
89 });
90
91 ensure!(region_metadatas.iter().all(|r| r.is_some()), {
92 metrics::METRIC_META_RECONCILIATION_STATS
93 .with_label_values(&[
94 ReconcileLogicalTablesProcedure::TYPE_NAME,
95 metrics::TABLE_TYPE_PHYSICAL,
96 metrics::STATS_TYPE_REGION_NOT_OPEN,
97 ])
98 .inc();
99 error::UnexpectedSnafu {
100 err_msg: format!(
101 "Some regions of the physical table are not open. physical table: {}, table_id: {}",
102 table_name, table_id
103 ),
104 }
105 });
106
107 let region_metadatas = region_metadatas
109 .into_iter()
110 .map(|r| r.unwrap())
111 .collect::<Vec<_>>();
112 let _region_metadata = check_column_metadatas_consistent(®ion_metadatas).context(
113 error::UnexpectedSnafu {
114 err_msg: format!(
115 "Column metadatas are not consistent for physical table: {}, table_id: {}",
116 table_name, table_id
117 ),
118 },
119 )?;
120
121 Self::validate_schema(&ctx.persistent_ctx.logical_tables)?;
124 let table_refs = ctx
125 .persistent_ctx
126 .logical_tables
127 .iter()
128 .map(|t| t.table_ref())
129 .collect::<Vec<_>>();
130 let table_ids = get_all_table_ids_by_names(
131 ctx.table_metadata_manager.table_name_manager(),
132 &table_refs,
133 )
134 .await?;
135 Self::validate_logical_table_routes(ctx, &table_ids).await?;
136
137 let table_name = ctx.table_name();
138 info!(
139 "Starting reconciliation for logical tables: {:?}, physical_table_id: {}, table_name: {}, procedure_id: {}",
140 table_ids, table_id, table_name, procedure_ctx.procedure_id
141 );
142
143 ctx.persistent_ctx.physical_table_route = Some(physical_table_route);
144 ctx.persistent_ctx.logical_table_ids = table_ids;
145 Ok((Box::new(ResolveTableMetadatas), Status::executing(true)))
146 }
147
148 fn as_any(&self) -> &dyn Any {
149 self
150 }
151}
152
153impl ReconciliationStart {
154 fn validate_schema(logical_tables: &[TableName]) -> Result<()> {
156 let is_same_schema = logical_tables.windows(2).all(|pair| {
157 pair[0].catalog_name == pair[1].catalog_name
158 && pair[0].schema_name == pair[1].schema_name
159 });
160
161 ensure!(
162 is_same_schema,
163 error::UnexpectedSnafu {
164 err_msg: "The logical tables have different schemas",
165 }
166 );
167
168 Ok(())
169 }
170
171 async fn validate_logical_table_routes(
172 ctx: &mut ReconcileLogicalTablesContext,
173 table_ids: &[TableId],
174 ) -> Result<()> {
175 let all_logical_table_routes_have_same_physical_id =
176 all_logical_table_routes_have_same_physical_id(
177 ctx.table_metadata_manager.table_route_manager(),
178 table_ids,
179 ctx.table_id(),
180 )
181 .await?;
182
183 ensure!(
184 all_logical_table_routes_have_same_physical_id,
185 error::UnexpectedSnafu {
186 err_msg: "All the logical tables should have the same physical table id",
187 }
188 );
189
190 Ok(())
191 }
192}