common_meta/reconciliation/reconcile_table/
reconcile_regions.rs1use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use api::v1::column_def::try_as_column_def;
19use api::v1::region::region_request::Body;
20use api::v1::region::{
21 alter_request, AlterRequest, RegionColumnDef, RegionRequest, RegionRequestHeader, SyncColumns,
22};
23use api::v1::{ColumnDef, SemanticType};
24use async_trait::async_trait;
25use common_procedure::{Context as ProcedureContext, Status};
26use common_telemetry::info;
27use common_telemetry::tracing_context::TracingContext;
28use futures::future;
29use serde::{Deserialize, Serialize};
30use snafu::{OptionExt, ResultExt};
31use store_api::metadata::ColumnMetadata;
32use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
33use store_api::storage::{ColumnId, RegionId};
34
35use crate::ddl::utils::{add_peer_context_if_needed, extract_column_metadatas};
36use crate::error::{ConvertColumnDefSnafu, Result, UnexpectedSnafu};
37use crate::reconciliation::reconcile_table::reconciliation_end::ReconciliationEnd;
38use crate::reconciliation::reconcile_table::update_table_info::UpdateTableInfo;
39use crate::reconciliation::reconcile_table::{ReconcileTableContext, State};
40use crate::rpc::router::{find_leaders, region_distribution};
41
42#[derive(Debug, Serialize, Deserialize)]
43pub struct ReconcileRegions {
44 column_metadatas: Vec<ColumnMetadata>,
45 region_ids: HashSet<RegionId>,
46}
47
48impl ReconcileRegions {
49 pub fn new(column_metadatas: Vec<ColumnMetadata>, region_ids: Vec<RegionId>) -> Self {
50 Self {
51 column_metadatas,
52 region_ids: region_ids.into_iter().collect(),
53 }
54 }
55}
56
57#[async_trait]
58#[typetag::serde]
59impl State for ReconcileRegions {
60 async fn next(
61 &mut self,
62 ctx: &mut ReconcileTableContext,
63 _procedure_ctx: &ProcedureContext,
64 ) -> Result<(Box<dyn State>, Status)> {
65 let table_meta = ctx.build_table_meta(&self.column_metadatas)?;
66 ctx.volatile_ctx.table_meta = Some(table_meta);
67 let table_id = ctx.table_id();
68 let table_name = ctx.table_name();
69
70 let primary_keys = self
71 .column_metadatas
72 .iter()
73 .filter(|c| c.semantic_type == SemanticType::Tag)
74 .map(|c| c.column_schema.name.to_string())
75 .collect::<HashSet<_>>();
76 let column_defs = self
77 .column_metadatas
78 .iter()
79 .map(|c| {
80 let column_def = try_as_column_def(
81 &c.column_schema,
82 primary_keys.contains(&c.column_schema.name),
83 )
84 .context(ConvertColumnDefSnafu {
85 column: &c.column_schema.name,
86 })?;
87
88 Ok((c.column_id, column_def))
89 })
90 .collect::<Result<Vec<_>>>()?;
91
92 let region_routes = &ctx
95 .persistent_ctx
96 .physical_table_route
97 .as_ref()
98 .unwrap()
99 .region_routes;
100 let region_distribution = region_distribution(region_routes);
101 let leaders = find_leaders(region_routes)
102 .into_iter()
103 .map(|p| (p.id, p))
104 .collect::<HashMap<_, _>>();
105 let mut sync_column_tsks = Vec::with_capacity(self.region_ids.len());
106 for (datanode_id, region_role_set) in region_distribution {
107 if region_role_set.leader_regions.is_empty() {
108 continue;
109 }
110 let peer = leaders.get(&datanode_id).unwrap();
112 for region_id in region_role_set.leader_regions {
113 let region_id = RegionId::new(ctx.persistent_ctx.table_id, region_id);
114 if self.region_ids.contains(®ion_id) {
115 let requester = ctx.node_manager.datanode(peer).await;
116 let request = make_alter_region_request(region_id, &column_defs);
117 let peer = peer.clone();
118
119 sync_column_tsks.push(async move {
120 requester
121 .handle(request)
122 .await
123 .map_err(add_peer_context_if_needed(peer))
124 });
125 }
126 }
127 }
128
129 let mut results = future::join_all(sync_column_tsks)
130 .await
131 .into_iter()
132 .collect::<Result<Vec<_>>>()?;
133
134 let column_metadatas =
136 extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?.context(
137 UnexpectedSnafu {
138 err_msg: format!(
139 "The table column metadata schemas from datanodes are not the same, table: {}, table_id: {}",
140 table_name,
141 table_id
142 ),
143 },
144 )?;
145
146 if column_metadatas != self.column_metadatas {
148 info!("Datanode column metadatas are not consistent with metasrv, updating metasrv's column metadatas, table: {}, table_id: {}", table_name, table_id);
149 let table_info_value = ctx.persistent_ctx.table_info_value.clone().unwrap();
151 return Ok((
152 Box::new(UpdateTableInfo::new(table_info_value, column_metadatas)),
153 Status::executing(true),
154 ));
155 }
156
157 Ok((Box::new(ReconciliationEnd), Status::executing(false)))
158 }
159
160 fn as_any(&self) -> &dyn Any {
161 self
162 }
163}
164
165fn make_alter_region_request(
167 region_id: RegionId,
168 column_defs: &[(ColumnId, ColumnDef)],
169) -> RegionRequest {
170 let kind = alter_request::Kind::SyncColumns(to_region_sync_columns(column_defs));
171
172 let alter_request = AlterRequest {
173 region_id: region_id.as_u64(),
174 schema_version: 0,
175 kind: Some(kind),
176 };
177
178 RegionRequest {
179 header: Some(RegionRequestHeader {
180 tracing_context: TracingContext::from_current_span().to_w3c(),
181 ..Default::default()
182 }),
183 body: Some(Body::Alter(alter_request)),
184 }
185}
186
187fn to_region_sync_columns(column_defs: &[(ColumnId, ColumnDef)]) -> SyncColumns {
188 let region_column_defs = column_defs
189 .iter()
190 .map(|(column_id, column_def)| RegionColumnDef {
191 column_id: *column_id,
192 column_def: Some(column_def.clone()),
193 })
194 .collect::<Vec<_>>();
195
196 SyncColumns {
197 column_defs: region_column_defs,
198 }
199}