common_meta/reconciliation/reconcile_table/
reconciliation_start.rs1use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, Status};
18use common_telemetry::info;
19use serde::{Deserialize, Serialize};
20use snafu::ensure;
21
22use crate::ddl::utils::region_metadata_lister::RegionMetadataLister;
23use crate::error::{self, Result};
24use crate::metrics::{self};
25use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveColumnMetadata;
26use crate::reconciliation::reconcile_table::{
27 ReconcileTableContext, ReconcileTableProcedure, State,
28};
29
30#[derive(Debug, Serialize, Deserialize)]
38pub struct ReconciliationStart;
39
40#[async_trait::async_trait]
41#[typetag::serde]
42impl State for ReconciliationStart {
43 async fn next(
44 &mut self,
45 ctx: &mut ReconcileTableContext,
46 procedure_ctx: &ProcedureContext,
47 ) -> Result<(Box<dyn State>, Status)> {
48 let table_id = ctx.table_id();
49 let table_name = ctx.table_name();
50
51 let (physical_table_id, physical_table_route) = ctx
52 .table_metadata_manager
53 .table_route_manager()
54 .get_physical_table_route(table_id)
55 .await?;
56 ensure!(
57 physical_table_id == table_id,
58 error::UnexpectedSnafu {
59 err_msg: format!(
60 "Reconcile table only works for physical table, but got logical table: {}, table_id: {}",
61 table_name, table_id
62 ),
63 }
64 );
65
66 info!(
67 "Reconciling table: {}, table_id: {}, procedure_id: {}",
68 table_name, table_id, procedure_ctx.procedure_id
69 );
70 let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone());
72
73 let region_metadatas = {
74 let _timer = metrics::METRIC_META_RECONCILIATION_LIST_REGION_METADATA_DURATION
75 .with_label_values(&[metrics::TABLE_TYPE_PHYSICAL])
76 .start_timer();
77 region_metadata_lister
79 .list(physical_table_id, &physical_table_route.region_routes)
80 .await?
81 };
82
83 ensure!(!region_metadatas.is_empty(), {
84 metrics::METRIC_META_RECONCILIATION_STATS
85 .with_label_values(&[
86 ReconcileTableProcedure::TYPE_NAME,
87 metrics::TABLE_TYPE_PHYSICAL,
88 metrics::STATS_TYPE_NO_REGION_METADATA,
89 ])
90 .inc();
91
92 error::UnexpectedSnafu {
93 err_msg: format!(
94 "No region metadata found for table: {}, table_id: {}",
95 table_name, table_id
96 ),
97 }
98 });
99
100 ensure!(region_metadatas.iter().all(|r| r.is_some()), {
101 metrics::METRIC_META_RECONCILIATION_STATS
102 .with_label_values(&[
103 ReconcileTableProcedure::TYPE_NAME,
104 metrics::TABLE_TYPE_PHYSICAL,
105 metrics::STATS_TYPE_REGION_NOT_OPEN,
106 ])
107 .inc();
108
109 error::UnexpectedSnafu {
110 err_msg: format!(
111 "Some regions are not opened, table: {}, table_id: {}",
112 table_name, table_id
113 ),
114 }
115 });
116
117 ctx.persistent_ctx.physical_table_route = Some(physical_table_route);
120 let region_metadatas = region_metadatas.into_iter().map(|r| r.unwrap()).collect();
121 Ok((
122 Box::new(ResolveColumnMetadata::new(
123 ctx.persistent_ctx.resolve_strategy,
124 region_metadatas,
125 )),
126 Status::executing(false),
128 ))
129 }
130
131 fn as_any(&self) -> &dyn Any {
132 self
133 }
134}