1use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use common_meta::rpc::router::RegionRoute;
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::debug;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ensure};
23
24use crate::error::{self, Result};
25use crate::procedure::repartition::group::sync_region::SyncRegion;
26use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
27use crate::procedure::repartition::group::{
28 Context, GroupId, GroupPrepareResult, State, region_routes,
29};
30use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
31
32#[derive(Debug, Serialize, Deserialize)]
33pub struct RepartitionStart;
34
35fn ensure_source_region_route_expr_match(
37 region_route: &RegionRoute,
38 source: &SourceRegionDescriptor,
39) -> Result<RegionRoute> {
40 let actual = region_route.region.partition_expr();
41 let expected = source.route_expr_for_rollback()?;
42 ensure!(
43 actual == expected,
44 error::PartitionExprMismatchSnafu {
45 region_id: region_route.region.id,
46 expected,
47 actual,
48 }
49 );
50 Ok(region_route.clone())
51}
52
53impl RepartitionStart {
54 fn ensure_route_present(
58 group_id: GroupId,
59 region_routes: &[RegionRoute],
60 sources: &[SourceRegionDescriptor],
61 targets: &[TargetRegionDescriptor],
62 ) -> Result<GroupPrepareResult> {
63 ensure!(
64 !sources.is_empty(),
65 error::UnexpectedSnafu {
66 violated: "Sources are empty"
67 }
68 );
69
70 let region_routes_map = region_routes
71 .iter()
72 .map(|r| (r.region.id, r))
73 .collect::<HashMap<_, _>>();
74 let source_region_routes = sources
75 .iter()
76 .map(|s| {
77 region_routes_map
78 .get(&s.region_id())
79 .context(error::RepartitionSourceRegionMissingSnafu {
80 group_id,
81 region_id: s.region_id(),
82 })
83 .and_then(|r| ensure_source_region_route_expr_match(r, s))
84 })
85 .collect::<Result<Vec<_>>>()?;
86 let target_region_routes = targets
87 .iter()
88 .map(|t| {
89 region_routes_map
90 .get(&t.region_id)
91 .context(error::RepartitionTargetRegionMissingSnafu {
92 group_id,
93 region_id: t.region_id,
94 })
95 .map(|r| (*r).clone())
96 })
97 .collect::<Result<Vec<_>>>()?;
98 for target_region_route in &target_region_routes {
99 ensure!(
100 target_region_route.leader_peer.is_some(),
101 error::UnexpectedSnafu {
102 violated: format!(
103 "Leader peer is not set for region: {}",
104 target_region_route.region.id
105 ),
106 }
107 );
108 }
109 let central_region = sources[0].region_id();
110 let central_region_datanode = source_region_routes[0]
111 .leader_peer
112 .as_ref()
113 .context(error::UnexpectedSnafu {
114 violated: format!(
115 "Leader peer is not set for central region: {}",
116 central_region
117 ),
118 })?
119 .clone();
120
121 Ok(GroupPrepareResult {
122 source_routes: source_region_routes,
123 target_routes: target_region_routes,
124 central_region,
125 central_region_datanode,
126 })
127 }
128}
129
130#[async_trait::async_trait]
131#[typetag::serde]
132impl State for RepartitionStart {
133 async fn next(
144 &mut self,
145 ctx: &mut Context,
146 _procedure_ctx: &ProcedureContext,
147 ) -> Result<(Box<dyn State>, Status)> {
148 if ctx.persistent_ctx.group_prepare_result.is_some() {
149 return Ok((
150 Box::new(UpdateMetadata::ApplyStaging),
151 Status::executing(true),
152 ));
153 }
154 let table_id = ctx.persistent_ctx.table_id;
155 let group_id = ctx.persistent_ctx.group_id;
156 let table_route_value = ctx.get_table_route_value().await?.into_inner();
157 let region_routes = region_routes(table_id, &table_route_value)?;
158 let group_prepare_result = Self::ensure_route_present(
159 group_id,
160 region_routes,
161 &ctx.persistent_ctx.sources,
162 &ctx.persistent_ctx.targets,
163 )?;
164 ctx.persistent_ctx.group_prepare_result = Some(group_prepare_result);
165 debug!(
166 "Repartition group {}: captured {} sources, {} targets",
167 group_id,
168 ctx.persistent_ctx.sources.len(),
169 ctx.persistent_ctx.targets.len()
170 );
171
172 if ctx.persistent_ctx.sync_region {
173 let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
174 let allocated_region_ids: HashSet<_> = ctx
175 .persistent_ctx
176 .allocated_region_ids
177 .iter()
178 .copied()
179 .collect();
180 let region_routes: Vec<_> = prepare_result
181 .target_routes
182 .iter()
183 .filter(|route| allocated_region_ids.contains(&route.region.id))
184 .cloned()
185 .collect();
186 if !region_routes.is_empty() {
187 return Ok((
188 Box::new(SyncRegion { region_routes }),
189 Status::executing(true),
190 ));
191 }
192 }
193
194 Ok((
195 Box::new(UpdateMetadata::ApplyStaging),
196 Status::executing(true),
197 ))
198 }
199
200 fn as_any(&self) -> &dyn Any {
201 self
202 }
203}
204
205#[cfg(test)]
206mod tests {
207 use std::assert_matches;
208
209 use common_meta::peer::Peer;
210 use common_meta::rpc::router::{Region, RegionRoute};
211 use store_api::storage::RegionId;
212 use uuid::Uuid;
213
214 use crate::error::Error;
215 use crate::procedure::repartition::group::repartition_start::RepartitionStart;
216 use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
217 use crate::procedure::repartition::test_util::range_expr;
218
219 #[test]
220 fn test_ensure_route_present_missing_source_region() {
221 let source_region =
222 SourceRegionDescriptor::partitioned(RegionId::new(1024, 1), range_expr("x", 0, 100));
223 let target_region = TargetRegionDescriptor {
224 region_id: RegionId::new(1024, 2),
225 partition_expr: range_expr("x", 0, 10),
226 };
227 let region_routes = vec![RegionRoute {
228 region: Region {
229 id: RegionId::new(1024, 2),
230 ..Default::default()
231 },
232 leader_peer: Some(Peer::empty(1)),
233 ..Default::default()
234 }];
235 let err = RepartitionStart::ensure_route_present(
236 Uuid::new_v4(),
237 ®ion_routes,
238 &[source_region],
239 &[target_region],
240 )
241 .unwrap_err();
242 assert_matches!(err, Error::RepartitionSourceRegionMissing { .. });
243 }
244
245 #[test]
246 fn test_ensure_route_present_partition_expr_mismatch() {
247 let source_region =
248 SourceRegionDescriptor::partitioned(RegionId::new(1024, 1), range_expr("x", 0, 100));
249 let target_region = TargetRegionDescriptor {
250 region_id: RegionId::new(1024, 2),
251 partition_expr: range_expr("x", 0, 10),
252 };
253 let region_routes = vec![RegionRoute {
254 region: Region {
255 id: RegionId::new(1024, 1),
256 partition_expr: range_expr("x", 0, 5).as_json_str().unwrap(),
257 ..Default::default()
258 },
259 leader_peer: Some(Peer::empty(1)),
260 ..Default::default()
261 }];
262 let err = RepartitionStart::ensure_route_present(
263 Uuid::new_v4(),
264 ®ion_routes,
265 &[source_region],
266 &[target_region],
267 )
268 .unwrap_err();
269 assert_matches!(err, Error::PartitionExprMismatch { .. });
270 }
271
272 #[test]
273 fn test_ensure_route_present_default_source_matches_empty_partition_expr() {
274 let source_region = SourceRegionDescriptor::Default {
275 region_id: RegionId::new(1024, 1),
276 };
277 let target_region = TargetRegionDescriptor {
278 region_id: RegionId::new(1024, 1),
279 partition_expr: range_expr("x", 0, 10),
280 };
281 let region_routes = vec![RegionRoute {
282 region: Region {
283 id: RegionId::new(1024, 1),
284 partition_expr: String::new(),
285 ..Default::default()
286 },
287 leader_peer: Some(Peer::empty(1)),
288 ..Default::default()
289 }];
290
291 let result = RepartitionStart::ensure_route_present(
292 Uuid::new_v4(),
293 ®ion_routes,
294 &[source_region],
295 &[target_region],
296 );
297
298 assert!(result.is_ok());
299 }
300
301 #[test]
302 fn test_ensure_route_present_default_source_rejects_non_empty_partition_expr() {
303 let source_region = SourceRegionDescriptor::Default {
304 region_id: RegionId::new(1024, 1),
305 };
306 let target_region = TargetRegionDescriptor {
307 region_id: RegionId::new(1024, 1),
308 partition_expr: range_expr("x", 0, 10),
309 };
310 let region_routes = vec![RegionRoute {
311 region: Region {
312 id: RegionId::new(1024, 1),
313 partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
314 ..Default::default()
315 },
316 leader_peer: Some(Peer::empty(1)),
317 ..Default::default()
318 }];
319
320 let err = RepartitionStart::ensure_route_present(
321 Uuid::new_v4(),
322 ®ion_routes,
323 &[source_region],
324 &[target_region],
325 )
326 .unwrap_err();
327
328 assert_matches!(err, Error::PartitionExprMismatch { .. });
329 }
330
331 #[test]
332 fn test_ensure_route_present_missing_target_region() {
333 let source_region =
334 SourceRegionDescriptor::partitioned(RegionId::new(1024, 1), range_expr("x", 0, 100));
335 let target_region = TargetRegionDescriptor {
336 region_id: RegionId::new(1024, 2),
337 partition_expr: range_expr("x", 0, 10),
338 };
339 let region_routes = vec![RegionRoute {
340 region: Region {
341 id: RegionId::new(1024, 1),
342 partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
343 ..Default::default()
344 },
345 leader_peer: Some(Peer::empty(1)),
346 ..Default::default()
347 }];
348 let err = RepartitionStart::ensure_route_present(
349 Uuid::new_v4(),
350 ®ion_routes,
351 &[source_region],
352 &[target_region],
353 )
354 .unwrap_err();
355 assert_matches!(err, Error::RepartitionTargetRegionMissing { .. });
356 }
357
358 #[test]
359 fn test_ensure_route_present_legacy_partition_expr_source() {
360 let source_region =
361 SourceRegionDescriptor::partitioned(RegionId::new(1024, 1), range_expr("x", 0, 100));
362 let target_region = TargetRegionDescriptor {
363 region_id: RegionId::new(1024, 2),
364 partition_expr: range_expr("x", 0, 10),
365 };
366 let legacy_partition_expr = range_expr("x", 0, 100).as_json_str().unwrap();
367 let legacy_region_json = serde_json::json!({
368 "id": RegionId::new(1024, 1).as_u64(),
369 "name": "",
370 "partition": {
371 "column_list": ["x"],
372 "value_list": [legacy_partition_expr]
373 },
374 "partition_expr": "",
375 "attrs": {}
376 });
377
378 let region_routes = vec![
379 RegionRoute {
380 region: serde_json::from_value(legacy_region_json).unwrap(),
381 leader_peer: Some(Peer::empty(1)),
382 ..Default::default()
383 },
384 RegionRoute {
385 region: Region {
386 id: RegionId::new(1024, 2),
387 ..Default::default()
388 },
389 leader_peer: Some(Peer::empty(1)),
390 ..Default::default()
391 },
392 ];
393
394 let result = RepartitionStart::ensure_route_present(
395 Uuid::new_v4(),
396 ®ion_routes,
397 &[source_region],
398 &[target_region],
399 );
400 assert!(result.is_ok());
401 }
402}