common_meta/reconciliation/reconcile_logical_tables/
resolve_table_metadatas.rs1use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, Status};
18use common_telemetry::{info, warn};
19use serde::{Deserialize, Serialize};
20use snafu::ensure;
21
22use crate::ddl::utils::region_metadata_lister::RegionMetadataLister;
23use crate::ddl::utils::table_info::get_all_table_info_values_by_table_ids;
24use crate::error::{self, Result};
25use crate::metrics;
26use crate::reconciliation::reconcile_logical_tables::reconcile_regions::ReconcileRegions;
27use crate::reconciliation::reconcile_logical_tables::{
28 ReconcileLogicalTablesContext, ReconcileLogicalTablesProcedure, State,
29};
30use crate::reconciliation::utils::{
31 check_column_metadatas_consistent, need_update_logical_table_info,
32};
33
34#[derive(Debug, Serialize, Deserialize)]
35pub struct ResolveTableMetadatas;
36
37#[async_trait::async_trait]
38#[typetag::serde]
39impl State for ResolveTableMetadatas {
40 async fn next(
41 &mut self,
42 ctx: &mut ReconcileLogicalTablesContext,
43 _procedure_ctx: &ProcedureContext,
44 ) -> Result<(Box<dyn State>, Status)> {
45 let table_names = ctx
46 .persistent_ctx
47 .logical_tables
48 .iter()
49 .map(|t| t.table_ref())
50 .collect::<Vec<_>>();
51 let table_ids = &ctx.persistent_ctx.logical_table_ids;
52
53 let mut create_tables = vec![];
54 let mut update_table_infos = vec![];
55
56 let table_info_values = get_all_table_info_values_by_table_ids(
57 ctx.table_metadata_manager.table_info_manager(),
58 table_ids,
59 &table_names,
60 )
61 .await?;
62
63 let region_routes = &ctx
65 .persistent_ctx
66 .physical_table_route
67 .as_ref()
68 .unwrap()
69 .region_routes;
70 let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone());
71 let mut metadata_consistent_count = 0;
72 let mut metadata_inconsistent_count = 0;
73 let mut create_tables_count = 0;
74 for (table_id, table_info_value) in table_ids.iter().zip(table_info_values.iter()) {
75 let region_metadatas = {
76 let _timer = metrics::METRIC_META_RECONCILIATION_LIST_REGION_METADATA_DURATION
77 .with_label_values(&[metrics::TABLE_TYPE_LOGICAL])
78 .start_timer();
79 region_metadata_lister
80 .list(*table_id, region_routes)
81 .await?
82 };
83
84 ensure!(!region_metadatas.is_empty(), {
85 metrics::METRIC_META_RECONCILIATION_STATS
86 .with_label_values(&[
87 ReconcileLogicalTablesProcedure::TYPE_NAME,
88 metrics::TABLE_TYPE_LOGICAL,
89 metrics::STATS_TYPE_NO_REGION_METADATA,
90 ])
91 .inc();
92
93 error::UnexpectedSnafu {
94 err_msg: format!(
95 "No region metadata found for table: {}, table_id: {}",
96 table_info_value.table_info.name, table_id
97 ),
98 }
99 });
100
101 if region_metadatas.iter().any(|r| r.is_none()) {
102 create_tables_count += 1;
103 create_tables.push((*table_id, table_info_value.table_info.clone()));
104 continue;
105 }
106
107 let region_metadatas = region_metadatas
109 .into_iter()
110 .map(|r| r.unwrap())
111 .collect::<Vec<_>>();
112 if let Some(column_metadatas) = check_column_metadatas_consistent(®ion_metadatas) {
113 metadata_consistent_count += 1;
114 if need_update_logical_table_info(&table_info_value.table_info, &column_metadatas) {
115 update_table_infos.push((*table_id, column_metadatas));
116 }
117 } else {
118 metadata_inconsistent_count += 1;
119 warn!(
122 "Found inconsistent column metadatas for table: {}, table_id: {}. Remaining the inconsistent column metadatas",
123 table_info_value.table_info.name, table_id
124 );
125 }
126 }
127
128 let table_id = ctx.table_id();
129 let table_name = ctx.table_name();
130 info!(
131 "Resolving table metadatas for physical table: {}, table_id: {}, updating table infos: {:?}, creating tables: {:?}",
132 table_name,
133 table_id,
134 update_table_infos
135 .iter()
136 .map(|(table_id, _)| *table_id)
137 .collect::<Vec<_>>(),
138 create_tables
139 .iter()
140 .map(|(table_id, _)| *table_id)
141 .collect::<Vec<_>>(),
142 );
143 ctx.persistent_ctx.update_table_infos = update_table_infos;
144 ctx.persistent_ctx.create_tables = create_tables;
145 let metrics = ctx.mut_metrics();
147 metrics.column_metadata_consistent_count = metadata_consistent_count;
148 metrics.column_metadata_inconsistent_count = metadata_inconsistent_count;
149 metrics.create_tables_count = create_tables_count;
150 Ok((Box::new(ReconcileRegions), Status::executing(true)))
151 }
152
153 fn as_any(&self) -> &dyn Any {
154 self
155 }
156}