common_meta/reconciliation/reconcile_logical_tables/
reconcile_regions.rs1use std::any::Any;
16use std::collections::HashMap;
17
18use api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader};
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::info;
21use common_telemetry::tracing_context::TracingContext;
22use futures::future;
23use serde::{Deserialize, Serialize};
24use store_api::storage::{RegionId, RegionNumber, TableId};
25use table::metadata::RawTableInfo;
26
27use crate::ddl::utils::{add_peer_context_if_needed, region_storage_path};
28use crate::ddl::{build_template_from_raw_table_info, CreateRequestBuilder};
29use crate::error::Result;
30use crate::reconciliation::reconcile_logical_tables::update_table_infos::UpdateTableInfos;
31use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State};
32use crate::rpc::router::{find_leaders, region_distribution};
33
34#[derive(Debug, Serialize, Deserialize)]
35pub struct ReconcileRegions;
36
37#[async_trait::async_trait]
38#[typetag::serde]
39impl State for ReconcileRegions {
40 async fn next(
41 &mut self,
42 ctx: &mut ReconcileLogicalTablesContext,
43 _procedure_ctx: &ProcedureContext,
44 ) -> Result<(Box<dyn State>, Status)> {
45 if ctx.persistent_ctx.create_tables.is_empty() {
46 return Ok((Box::new(UpdateTableInfos), Status::executing(false)));
47 }
48
49 let region_routes = &ctx
51 .persistent_ctx
52 .physical_table_route
53 .as_ref()
54 .unwrap()
55 .region_routes;
56
57 let region_distribution = region_distribution(region_routes);
58 let leaders = find_leaders(region_routes)
59 .into_iter()
60 .map(|p| (p.id, p))
61 .collect::<HashMap<_, _>>();
62 let mut create_table_tasks = Vec::with_capacity(leaders.len());
63 for (datanode_id, region_role_set) in region_distribution {
64 if region_role_set.leader_regions.is_empty() {
65 continue;
66 }
67 let peer = leaders.get(&datanode_id).unwrap().clone();
69 let request = self.make_request(®ion_role_set.leader_regions, ctx)?;
70 let requester = ctx.node_manager.datanode(&peer).await;
71 create_table_tasks.push(async move {
72 requester
73 .handle(request)
74 .await
75 .map_err(add_peer_context_if_needed(peer))
76 });
77 }
78
79 future::join_all(create_table_tasks)
80 .await
81 .into_iter()
82 .collect::<Result<Vec<_>>>()?;
83 let table_id = ctx.table_id();
84 let table_name = ctx.table_name();
85 info!(
86 "Reconciled regions for logical tables: {:?}, physical table: {}, table_id: {}",
87 ctx.persistent_ctx
88 .create_tables
89 .iter()
90 .map(|(table_id, _)| table_id)
91 .collect::<Vec<_>>(),
92 table_id,
93 table_name
94 );
95 ctx.persistent_ctx.create_tables.clear();
96 return Ok((Box::new(UpdateTableInfos), Status::executing(true)));
97 }
98
99 fn as_any(&self) -> &dyn Any {
100 self
101 }
102}
103
104impl ReconcileRegions {
105 fn make_request(
106 &self,
107 region_numbers: &[u32],
108 ctx: &ReconcileLogicalTablesContext,
109 ) -> Result<RegionRequest> {
110 let physical_table_id = ctx.table_id();
111 let table_name = ctx.table_name();
112 let create_tables = &ctx.persistent_ctx.create_tables;
113
114 let mut requests = Vec::with_capacity(region_numbers.len() * create_tables.len());
115
116 for (table_id, table_info) in create_tables {
117 let request_builder =
118 create_region_request_from_raw_table_info(table_info, physical_table_id)?;
119 let storage_path =
120 region_storage_path(&table_name.catalog_name, &table_name.schema_name);
121 let partition_exprs = prepare_partition_exprs(ctx, *table_id);
122
123 for region_number in region_numbers {
124 let region_id = RegionId::new(*table_id, *region_number);
125
126 let one_region_request = request_builder.build_one(
127 region_id,
128 storage_path.clone(),
129 &HashMap::new(),
130 &partition_exprs,
131 );
132 requests.push(one_region_request);
133 }
134 }
135
136 Ok(RegionRequest {
137 header: Some(RegionRequestHeader {
138 tracing_context: TracingContext::from_current_span().to_w3c(),
139 ..Default::default()
140 }),
141 body: Some(region_request::Body::Creates(CreateRequests { requests })),
142 })
143 }
144}
145
146fn create_region_request_from_raw_table_info(
148 raw_table_info: &RawTableInfo,
149 physical_table_id: TableId,
150) -> Result<CreateRequestBuilder> {
151 let template = build_template_from_raw_table_info(raw_table_info)?;
152 Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
153}
154
155fn prepare_partition_exprs(
156 ctx: &ReconcileLogicalTablesContext,
157 table_id: TableId,
158) -> HashMap<RegionNumber, String> {
159 ctx.persistent_ctx
160 .physical_table_route
161 .as_ref()
162 .map(|r| {
163 r.region_routes
164 .iter()
165 .filter(|r| r.region.id.table_id() == table_id)
166 .map(|r| (r.region.id.region_number(), r.region.partition_expr()))
167 .collect::<HashMap<_, _>>()
168 })
169 .unwrap_or_default()
170}