1use std::cmp::Ordering;
16
17use common_meta::rpc::router::RegionRoute;
18use partition::expr::PartitionExpr;
19use serde::{Deserialize, Serialize};
20use store_api::storage::{RegionId, RegionNumber, TableId};
21
22use crate::procedure::repartition::group::GroupId;
23
/// Pairs a region with the partition expression attached to it.
///
/// Used both for the source regions of a plan and for the target regions produced by
/// [`convert_allocation_plan_to_repartition_plan`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegionDescriptor {
    /// Id of the region being described.
    pub region_id: RegionId,
    /// Partition expression associated with `region_id`.
    pub partition_expr: PartitionExpr,
}
32
/// A plan entry in which target regions are still described only by their partition
/// expressions, i.e. no region ids have been assigned to the targets yet.
///
/// [`convert_allocation_plan_to_repartition_plan`] turns this into a
/// [`RepartitionPlanEntry`] by assigning a region id to every target expression.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AllocationPlanEntry {
    /// Identifier of the group this entry belongs to.
    pub group_id: GroupId,
    /// Existing regions (id plus current partition expression) the plan starts from.
    pub source_regions: Vec<RegionDescriptor>,
    /// Partition expressions the target regions should end up with, in positional order.
    pub target_partition_exprs: Vec<PartitionExpr>,
    /// Copied verbatim into the resulting [`RepartitionPlanEntry`].
    // NOTE(review): presumably maps each source region index to target indices; the exact
    // semantics are defined by the producer of this plan and are not visible in this file.
    pub transition_map: Vec<Vec<usize>>,
}
47
/// A plan entry in which every target region has a concrete region id.
///
/// Built from an [`AllocationPlanEntry`] by
/// [`convert_allocation_plan_to_repartition_plan`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RepartitionPlanEntry {
    /// Identifier of the group this entry belongs to.
    pub group_id: GroupId,
    /// Regions the plan starts from, copied from the allocation plan entry.
    pub source_regions: Vec<RegionDescriptor>,
    /// One descriptor per target partition expression, in the same order as the expressions.
    pub target_regions: Vec<RegionDescriptor>,
    /// Ids of the target regions that received a freshly generated region id during
    /// conversion (as opposed to reusing the id of a source region).
    pub allocated_region_ids: Vec<RegionId>,
    /// Ids of the source regions that have no positional counterpart among the targets.
    // NOTE(review): the name suggests these are deallocated later; that step is not in this file.
    pub pending_deallocate_region_ids: Vec<RegionId>,
    /// Copied verbatim from the allocation plan entry.
    pub transition_map: Vec<Vec<usize>>,
    /// Deserializes to an empty vector when the field is absent from the input
    /// (`#[serde(default)]`); the conversion function also initializes it as empty.
    // NOTE(review): presumably filled in by a later step of the procedure; confirm with callers.
    #[serde(default)]
    pub original_target_routes: Vec<RegionRoute>,
}
69
70impl RepartitionPlanEntry {
71 pub(crate) fn allocate_regions(&self) -> Vec<&RegionDescriptor> {
73 self.target_regions
74 .iter()
75 .filter(|r| self.allocated_region_ids.contains(&r.region_id))
76 .collect()
77 }
78}
79
80pub fn convert_allocation_plan_to_repartition_plan(
97 table_id: TableId,
98 next_region_number: &mut RegionNumber,
99 AllocationPlanEntry {
100 group_id,
101 source_regions,
102 target_partition_exprs,
103 transition_map,
104 ..
105 }: &AllocationPlanEntry,
106) -> RepartitionPlanEntry {
107 match source_regions.len().cmp(&target_partition_exprs.len()) {
108 Ordering::Less => {
109 let pending_allocate_target_partition_exprs = target_partition_exprs
111 .iter()
112 .skip(source_regions.len())
113 .map(|target_partition_expr| {
114 let desc = RegionDescriptor {
115 region_id: RegionId::new(table_id, *next_region_number),
116 partition_expr: target_partition_expr.clone(),
117 };
118 *next_region_number += 1;
119 desc
120 })
121 .collect::<Vec<_>>();
122
123 let allocated_region_ids = pending_allocate_target_partition_exprs
124 .iter()
125 .map(|rd| rd.region_id)
126 .collect::<Vec<_>>();
127
128 let target_regions = source_regions
129 .iter()
130 .zip(target_partition_exprs.iter())
131 .map(|(source_region, target_partition_expr)| RegionDescriptor {
132 region_id: source_region.region_id,
133 partition_expr: target_partition_expr.clone(),
134 })
135 .chain(pending_allocate_target_partition_exprs)
136 .collect::<Vec<_>>();
137
138 RepartitionPlanEntry {
139 group_id: *group_id,
140 source_regions: source_regions.clone(),
141 target_regions,
142 allocated_region_ids,
143 pending_deallocate_region_ids: vec![],
144 transition_map: transition_map.clone(),
145 original_target_routes: vec![],
146 }
147 }
148 Ordering::Equal => {
149 let target_regions = source_regions
150 .iter()
151 .zip(target_partition_exprs.iter())
152 .map(|(source_region, target_partition_expr)| RegionDescriptor {
153 region_id: source_region.region_id,
154 partition_expr: target_partition_expr.clone(),
155 })
156 .collect::<Vec<_>>();
157
158 RepartitionPlanEntry {
159 group_id: *group_id,
160 source_regions: source_regions.clone(),
161 target_regions,
162 allocated_region_ids: vec![],
163 pending_deallocate_region_ids: vec![],
164 transition_map: transition_map.clone(),
165 original_target_routes: vec![],
166 }
167 }
168 Ordering::Greater => {
169 let target_regions = source_regions
171 .iter()
172 .take(target_partition_exprs.len())
173 .zip(target_partition_exprs.iter())
174 .map(|(source_region, target_partition_expr)| RegionDescriptor {
175 region_id: source_region.region_id,
176 partition_expr: target_partition_expr.clone(),
177 })
178 .collect::<Vec<_>>();
179
180 let pending_deallocate_region_ids = source_regions
181 .iter()
182 .skip(target_partition_exprs.len())
183 .map(|source_region| source_region.region_id)
184 .collect::<Vec<_>>();
185
186 RepartitionPlanEntry {
187 group_id: *group_id,
188 source_regions: source_regions.clone(),
189 target_regions,
190 allocated_region_ids: vec![],
191 pending_deallocate_region_ids,
192 transition_map: transition_map.clone(),
193 original_target_routes: vec![],
194 }
195 }
196 }
197}
198
#[cfg(test)]
mod tests {
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::test_util::range_expr;

    /// Builds a descriptor for `region_number` of `table_id` covering `[start, end)` on `col`.
    fn create_region_descriptor(
        table_id: TableId,
        region_number: u32,
        col: &str,
        start: i64,
        end: i64,
    ) -> RegionDescriptor {
        let region_id = RegionId::new(table_id, region_number);
        let partition_expr = range_expr(col, start, end);
        RegionDescriptor {
            region_id,
            partition_expr,
        }
    }

    /// Wraps the inputs in an `AllocationPlanEntry` (with an empty transition map), runs the
    /// conversion and returns the resulting plan together with the final region counter.
    fn run_conversion(
        group_id: GroupId,
        table_id: TableId,
        mut next_region_number: RegionNumber,
        source_regions: &[RegionDescriptor],
        target_partition_exprs: &[PartitionExpr],
    ) -> (RepartitionPlanEntry, RegionNumber) {
        let allocation_plan = AllocationPlanEntry {
            group_id,
            source_regions: source_regions.to_vec(),
            target_partition_exprs: target_partition_exprs.to_vec(),
            transition_map: Vec::new(),
        };
        let plan = convert_allocation_plan_to_repartition_plan(
            table_id,
            &mut next_region_number,
            &allocation_plan,
        );
        (plan, next_region_number)
    }

    /// Region ids of `table_id` for the given region numbers, in order.
    fn region_ids(table_id: TableId, region_numbers: &[u32]) -> Vec<RegionId> {
        region_numbers
            .iter()
            .map(|&number| RegionId::new(table_id, number))
            .collect()
    }

    /// Expected target descriptors: the i-th region number paired with the i-th expression.
    fn expected_targets(
        table_id: TableId,
        region_numbers: &[u32],
        exprs: &[PartitionExpr],
    ) -> Vec<RegionDescriptor> {
        region_ids(table_id, region_numbers)
            .into_iter()
            .zip(exprs)
            .map(|(region_id, expr)| RegionDescriptor {
                region_id,
                partition_expr: expr.clone(),
            })
            .collect()
    }

    #[test]
    fn test_convert_plan_equal_regions() {
        let table_id = 1024;
        let group_id = Uuid::new_v4();
        let source_regions = vec![
            create_region_descriptor(table_id, 1, "x", 0, 100),
            create_region_descriptor(table_id, 2, "x", 100, 200),
        ];
        let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 200)];

        let (result, next_region_number) = run_conversion(
            group_id,
            table_id,
            10,
            &source_regions,
            &target_partition_exprs,
        );

        assert_eq!(result.group_id, group_id);
        assert_eq!(result.source_regions, source_regions);
        assert!(result.allocated_region_ids.is_empty());
        assert!(result.pending_deallocate_region_ids.is_empty());
        // No region was created, so the counter must be untouched.
        assert_eq!(next_region_number, 10);
        // Both targets reuse the source region ids positionally.
        assert_eq!(
            result.target_regions,
            expected_targets(table_id, &[1, 2], &target_partition_exprs)
        );
    }

    #[test]
    fn test_convert_plan_allocate_regions() {
        let table_id = 1024;
        let group_id = Uuid::new_v4();
        let source_regions = vec![
            create_region_descriptor(table_id, 1, "x", 0, 100),
            create_region_descriptor(table_id, 2, "x", 100, 200),
            create_region_descriptor(table_id, 3, "x", 200, 300),
        ];
        let target_partition_exprs = vec![
            range_expr("x", 0, 50),
            range_expr("x", 50, 100),
            range_expr("x", 100, 150),
            range_expr("x", 150, 200),
            range_expr("x", 200, 300),
        ];

        let (result, next_region_number) = run_conversion(
            group_id,
            table_id,
            10,
            &source_regions,
            &target_partition_exprs,
        );

        assert_eq!(result.group_id, group_id);
        assert_eq!(result.source_regions, source_regions);
        assert!(result.pending_deallocate_region_ids.is_empty());
        // Two regions were created, consuming region numbers 10 and 11.
        assert_eq!(next_region_number, 12);
        // The first three targets reuse source ids; the last two are freshly allocated.
        assert_eq!(
            result.target_regions,
            expected_targets(table_id, &[1, 2, 3, 10, 11], &target_partition_exprs)
        );
        assert_eq!(result.allocated_region_ids, region_ids(table_id, &[10, 11]));
    }

    #[test]
    fn test_convert_plan_deallocate_regions() {
        let table_id = 1024;
        let group_id = Uuid::new_v4();
        let source_regions = vec![
            create_region_descriptor(table_id, 1, "x", 0, 50),
            create_region_descriptor(table_id, 2, "x", 50, 100),
            create_region_descriptor(table_id, 3, "x", 100, 150),
            create_region_descriptor(table_id, 4, "x", 150, 200),
            create_region_descriptor(table_id, 5, "x", 200, 300),
        ];
        let target_partition_exprs = vec![
            range_expr("x", 0, 100),
            range_expr("x", 100, 200),
            range_expr("x", 200, 300),
        ];

        let (result, next_region_number) = run_conversion(
            group_id,
            table_id,
            10,
            &source_regions,
            &target_partition_exprs,
        );

        assert_eq!(next_region_number, 10);
        assert_eq!(result.group_id, group_id);
        assert_eq!(result.source_regions, source_regions);
        assert!(result.allocated_region_ids.is_empty());
        // The first three sources keep their ids under the new expressions.
        assert_eq!(
            result.target_regions,
            expected_targets(table_id, &[1, 2, 3], &target_partition_exprs)
        );
        // The two trailing sources are left without a target.
        assert_eq!(
            result.pending_deallocate_region_ids,
            region_ids(table_id, &[4, 5])
        );
    }

    #[test]
    fn test_convert_plan_allocate_single_region() {
        let table_id = 1024;
        let group_id = Uuid::new_v4();
        let source_regions = vec![create_region_descriptor(table_id, 1, "x", 0, 100)];
        let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];

        let (result, next_region_number) = run_conversion(
            group_id,
            table_id,
            5,
            &source_regions,
            &target_partition_exprs,
        );

        assert_eq!(result.allocated_region_ids.len(), 1);
        assert_eq!(result.pending_deallocate_region_ids.len(), 0);
        // The first target reuses region 1; the second takes the next free number (5).
        assert_eq!(
            result.target_regions,
            expected_targets(table_id, &[1, 5], &target_partition_exprs)
        );
        assert_eq!(next_region_number, 6);
    }

    #[test]
    fn test_convert_plan_deallocate_to_single_region() {
        let table_id = 1024;
        let group_id = Uuid::new_v4();
        let source_regions = vec![
            create_region_descriptor(table_id, 1, "x", 0, 100),
            create_region_descriptor(table_id, 2, "x", 100, 200),
            create_region_descriptor(table_id, 3, "x", 200, 300),
        ];
        let target_partition_exprs = vec![range_expr("x", 0, 300)];

        let (result, _next_region_number) = run_conversion(
            group_id,
            table_id,
            10,
            &source_regions,
            &target_partition_exprs,
        );

        assert_eq!(result.allocated_region_ids.len(), 0);
        // Only the first source survives, now carrying the merged expression.
        assert_eq!(
            result.target_regions,
            expected_targets(table_id, &[1], &target_partition_exprs)
        );
        // The remaining sources are left without a target.
        assert_eq!(
            result.pending_deallocate_region_ids,
            region_ids(table_id, &[2, 3])
        );
    }
}