meta_srv/procedure/repartition/group/
repartition_start.rs1use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use common_meta::rpc::router::RegionRoute;
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::debug;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt, ensure};
23
24use crate::error::{self, Result};
25use crate::procedure::repartition::group::sync_region::SyncRegion;
26use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
27use crate::procedure::repartition::group::{
28 Context, GroupId, GroupPrepareResult, State, region_routes,
29};
30use crate::procedure::repartition::plan::RegionDescriptor;
31
32#[derive(Debug, Serialize, Deserialize)]
33pub struct RepartitionStart;
34
35fn ensure_region_route_expr_match(
37 region_route: &RegionRoute,
38 region_descriptor: &RegionDescriptor,
39) -> Result<RegionRoute> {
40 let actual = ®ion_route.region.partition_expr;
41 let expected = region_descriptor
42 .partition_expr
43 .as_json_str()
44 .context(error::SerializePartitionExprSnafu)?;
45 ensure!(
46 actual == &expected,
47 error::PartitionExprMismatchSnafu {
48 region_id: region_route.region.id,
49 expected,
50 actual,
51 }
52 );
53 Ok(region_route.clone())
54}
55
56impl RepartitionStart {
57 fn ensure_route_present(
61 group_id: GroupId,
62 region_routes: &[RegionRoute],
63 sources: &[RegionDescriptor],
64 targets: &[RegionDescriptor],
65 ) -> Result<GroupPrepareResult> {
66 ensure!(
67 !sources.is_empty(),
68 error::UnexpectedSnafu {
69 violated: "Sources are empty"
70 }
71 );
72
73 let region_routes_map = region_routes
74 .iter()
75 .map(|r| (r.region.id, r))
76 .collect::<HashMap<_, _>>();
77 let source_region_routes = sources
78 .iter()
79 .map(|s| {
80 region_routes_map
81 .get(&s.region_id)
82 .context(error::RepartitionSourceRegionMissingSnafu {
83 group_id,
84 region_id: s.region_id,
85 })
86 .and_then(|r| ensure_region_route_expr_match(r, s))
87 })
88 .collect::<Result<Vec<_>>>()?;
89 let target_region_routes = targets
90 .iter()
91 .map(|t| {
92 region_routes_map
93 .get(&t.region_id)
94 .context(error::RepartitionTargetRegionMissingSnafu {
95 group_id,
96 region_id: t.region_id,
97 })
98 .map(|r| (*r).clone())
99 })
100 .collect::<Result<Vec<_>>>()?;
101 for target_region_route in &target_region_routes {
102 ensure!(
103 target_region_route.leader_peer.is_some(),
104 error::UnexpectedSnafu {
105 violated: format!(
106 "Leader peer is not set for region: {}",
107 target_region_route.region.id
108 ),
109 }
110 );
111 }
112 let central_region = sources[0].region_id;
113 let central_region_datanode = source_region_routes[0]
114 .leader_peer
115 .as_ref()
116 .context(error::UnexpectedSnafu {
117 violated: format!(
118 "Leader peer is not set for central region: {}",
119 central_region
120 ),
121 })?
122 .clone();
123
124 Ok(GroupPrepareResult {
125 source_routes: source_region_routes,
126 target_routes: target_region_routes,
127 central_region,
128 central_region_datanode,
129 })
130 }
131}
132
133#[async_trait::async_trait]
134#[typetag::serde]
135impl State for RepartitionStart {
136 async fn next(
147 &mut self,
148 ctx: &mut Context,
149 _procedure_ctx: &ProcedureContext,
150 ) -> Result<(Box<dyn State>, Status)> {
151 if ctx.persistent_ctx.group_prepare_result.is_some() {
152 return Ok((
153 Box::new(UpdateMetadata::ApplyStaging),
154 Status::executing(true),
155 ));
156 }
157 let table_id = ctx.persistent_ctx.table_id;
158 let group_id = ctx.persistent_ctx.group_id;
159 let table_route_value = ctx.get_table_route_value().await?.into_inner();
160 let region_routes = region_routes(table_id, &table_route_value)?;
161 let group_prepare_result = Self::ensure_route_present(
162 group_id,
163 region_routes,
164 &ctx.persistent_ctx.sources,
165 &ctx.persistent_ctx.targets,
166 )?;
167 ctx.persistent_ctx.group_prepare_result = Some(group_prepare_result);
168 debug!(
169 "Repartition group {}: captured {} sources, {} targets",
170 group_id,
171 ctx.persistent_ctx.sources.len(),
172 ctx.persistent_ctx.targets.len()
173 );
174
175 if ctx.persistent_ctx.sync_region {
176 let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
177 let allocated_region_ids: HashSet<_> = ctx
178 .persistent_ctx
179 .allocated_region_ids
180 .iter()
181 .copied()
182 .collect();
183 let region_routes: Vec<_> = prepare_result
184 .target_routes
185 .iter()
186 .filter(|route| allocated_region_ids.contains(&route.region.id))
187 .cloned()
188 .collect();
189 if !region_routes.is_empty() {
190 return Ok((
191 Box::new(SyncRegion { region_routes }),
192 Status::executing(true),
193 ));
194 }
195 }
196
197 Ok((
198 Box::new(UpdateMetadata::ApplyStaging),
199 Status::executing(true),
200 ))
201 }
202
203 fn as_any(&self) -> &dyn Any {
204 self
205 }
206}
207
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use crate::error::Error;
    use crate::procedure::repartition::group::repartition_start::RepartitionStart;
    use crate::procedure::repartition::plan::RegionDescriptor;
    use crate::procedure::repartition::test_util::range_expr;

    /// Builds the descriptor pair shared by every test: source region
    /// `(1024, 1)` with `range_expr("x", 0, 100)` and target region
    /// `(1024, 2)` with `range_expr("x", 0, 10)`.
    fn source_and_target() -> (RegionDescriptor, RegionDescriptor) {
        let source_region = RegionDescriptor {
            region_id: RegionId::new(1024, 1),
            partition_expr: range_expr("x", 0, 100),
        };
        let target_region = RegionDescriptor {
            region_id: RegionId::new(1024, 2),
            partition_expr: range_expr("x", 0, 10),
        };
        (source_region, target_region)
    }

    /// Wraps `region` in a route whose leader is `Peer::empty(1)`; all other
    /// route fields keep their defaults.
    fn route_with_leader(region: Region) -> RegionRoute {
        RegionRoute {
            region,
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }
    }

    /// Runs `ensure_route_present` for the shared descriptor pair against
    /// `region_routes` and returns the error it is expected to produce.
    fn prepare_error(region_routes: &[RegionRoute]) -> Error {
        let (source_region, target_region) = source_and_target();
        RepartitionStart::ensure_route_present(
            Uuid::new_v4(),
            region_routes,
            &[source_region],
            &[target_region],
        )
        .unwrap_err()
    }

    #[test]
    fn test_ensure_route_present_missing_source_region() {
        // Only the target region has a route, so the source lookup fails.
        let region_routes = vec![route_with_leader(Region {
            id: RegionId::new(1024, 2),
            ..Default::default()
        })];
        let err = prepare_error(&region_routes);
        assert_matches!(err, Error::RepartitionSourceRegionMissing { .. });
    }

    #[test]
    fn test_ensure_route_present_partition_expr_mismatch() {
        // The source route exists, but its stored expression differs from the
        // descriptor's.
        let region_routes = vec![route_with_leader(Region {
            id: RegionId::new(1024, 1),
            partition_expr: range_expr("x", 0, 5).as_json_str().unwrap(),
            ..Default::default()
        })];
        let err = prepare_error(&region_routes);
        assert_matches!(err, Error::PartitionExprMismatch { .. });
    }

    #[test]
    fn test_ensure_route_present_missing_target_region() {
        // The source route matches, but no route exists for the target.
        let region_routes = vec![route_with_leader(Region {
            id: RegionId::new(1024, 1),
            partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
            ..Default::default()
        })];
        let err = prepare_error(&region_routes);
        assert_matches!(err, Error::RepartitionTargetRegionMissing { .. });
    }
}