1use std::cmp::Ordering;
16
17use partition::expr::PartitionExpr;
18use serde::{Deserialize, Serialize};
19use store_api::storage::{RegionId, RegionNumber, TableId};
20
21use crate::procedure::repartition::group::GroupId;
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
25pub struct RegionDescriptor {
26 pub region_id: RegionId,
28 pub partition_expr: PartitionExpr,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
35pub struct AllocationPlanEntry {
36 pub group_id: GroupId,
38 pub source_regions: Vec<RegionDescriptor>,
40 pub target_partition_exprs: Vec<PartitionExpr>,
42 pub transition_map: Vec<Vec<usize>>,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
50pub struct RepartitionPlanEntry {
51 pub group_id: GroupId,
53 pub source_regions: Vec<RegionDescriptor>,
55 pub target_regions: Vec<RegionDescriptor>,
57 pub allocated_region_ids: Vec<RegionId>,
59 pub pending_deallocate_region_ids: Vec<RegionId>,
61 pub transition_map: Vec<Vec<usize>>,
64}
65
66impl RepartitionPlanEntry {
67 pub(crate) fn allocate_regions(&self) -> Vec<&RegionDescriptor> {
69 self.target_regions
70 .iter()
71 .filter(|r| self.allocated_region_ids.contains(&r.region_id))
72 .collect()
73 }
74}
75
76pub fn convert_allocation_plan_to_repartition_plan(
93 table_id: TableId,
94 next_region_number: &mut RegionNumber,
95 AllocationPlanEntry {
96 group_id,
97 source_regions,
98 target_partition_exprs,
99 transition_map,
100 ..
101 }: &AllocationPlanEntry,
102) -> RepartitionPlanEntry {
103 match source_regions.len().cmp(&target_partition_exprs.len()) {
104 Ordering::Less => {
105 let pending_allocate_target_partition_exprs = target_partition_exprs
107 .iter()
108 .skip(source_regions.len())
109 .map(|target_partition_expr| {
110 let desc = RegionDescriptor {
111 region_id: RegionId::new(table_id, *next_region_number),
112 partition_expr: target_partition_expr.clone(),
113 };
114 *next_region_number += 1;
115 desc
116 })
117 .collect::<Vec<_>>();
118
119 let allocated_region_ids = pending_allocate_target_partition_exprs
120 .iter()
121 .map(|rd| rd.region_id)
122 .collect::<Vec<_>>();
123
124 let target_regions = source_regions
125 .iter()
126 .zip(target_partition_exprs.iter())
127 .map(|(source_region, target_partition_expr)| RegionDescriptor {
128 region_id: source_region.region_id,
129 partition_expr: target_partition_expr.clone(),
130 })
131 .chain(pending_allocate_target_partition_exprs)
132 .collect::<Vec<_>>();
133
134 RepartitionPlanEntry {
135 group_id: *group_id,
136 source_regions: source_regions.clone(),
137 target_regions,
138 allocated_region_ids,
139 pending_deallocate_region_ids: vec![],
140 transition_map: transition_map.clone(),
141 }
142 }
143 Ordering::Equal => {
144 let target_regions = source_regions
145 .iter()
146 .zip(target_partition_exprs.iter())
147 .map(|(source_region, target_partition_expr)| RegionDescriptor {
148 region_id: source_region.region_id,
149 partition_expr: target_partition_expr.clone(),
150 })
151 .collect::<Vec<_>>();
152
153 RepartitionPlanEntry {
154 group_id: *group_id,
155 source_regions: source_regions.clone(),
156 target_regions,
157 allocated_region_ids: vec![],
158 pending_deallocate_region_ids: vec![],
159 transition_map: transition_map.clone(),
160 }
161 }
162 Ordering::Greater => {
163 let target_regions = source_regions
165 .iter()
166 .take(target_partition_exprs.len())
167 .zip(target_partition_exprs.iter())
168 .map(|(source_region, target_partition_expr)| RegionDescriptor {
169 region_id: source_region.region_id,
170 partition_expr: target_partition_expr.clone(),
171 })
172 .collect::<Vec<_>>();
173
174 let pending_deallocate_region_ids = source_regions
175 .iter()
176 .skip(target_partition_exprs.len())
177 .map(|source_region| source_region.region_id)
178 .collect::<Vec<_>>();
179
180 RepartitionPlanEntry {
181 group_id: *group_id,
182 source_regions: source_regions.clone(),
183 target_regions,
184 allocated_region_ids: vec![],
185 pending_deallocate_region_ids,
186 transition_map: transition_map.clone(),
187 }
188 }
189 }
190}
191
192#[cfg(test)]
193mod tests {
194 use store_api::storage::RegionId;
195 use uuid::Uuid;
196
197 use super::*;
198 use crate::procedure::repartition::test_util::range_expr;
199
200 fn create_region_descriptor(
201 table_id: TableId,
202 region_number: u32,
203 col: &str,
204 start: i64,
205 end: i64,
206 ) -> RegionDescriptor {
207 RegionDescriptor {
208 region_id: RegionId::new(table_id, region_number),
209 partition_expr: range_expr(col, start, end),
210 }
211 }
212
213 #[test]
214 fn test_convert_plan_equal_regions() {
215 let group_id = Uuid::new_v4();
216 let table_id = 1024;
217 let mut next_region_number = 10;
218 let source_regions = vec![
219 create_region_descriptor(table_id, 1, "x", 0, 100),
220 create_region_descriptor(table_id, 2, "x", 100, 200),
221 ];
222 let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 200)];
223 let allocation_plan = AllocationPlanEntry {
224 group_id,
225 source_regions: source_regions.clone(),
226 target_partition_exprs: target_partition_exprs.clone(),
227 transition_map: Vec::new(),
228 };
229 let result = convert_allocation_plan_to_repartition_plan(
230 table_id,
231 &mut next_region_number,
232 &allocation_plan,
233 );
234 assert_eq!(result.group_id, group_id);
235 assert_eq!(result.source_regions, source_regions);
236 assert_eq!(result.target_regions.len(), 2);
237 assert!(result.allocated_region_ids.is_empty());
238 assert!(result.pending_deallocate_region_ids.is_empty());
239 assert_eq!(next_region_number, 10);
241 assert_eq!(
243 result.target_regions[0].region_id,
244 RegionId::new(table_id, 1)
245 );
246 assert_eq!(
247 result.target_regions[0].partition_expr,
248 target_partition_exprs[0]
249 );
250 assert_eq!(
251 result.target_regions[1].region_id,
252 RegionId::new(table_id, 2)
253 );
254 assert_eq!(
255 result.target_regions[1].partition_expr,
256 target_partition_exprs[1]
257 );
258 }
259
260 #[test]
261 fn test_convert_plan_allocate_regions() {
262 let group_id = Uuid::new_v4();
263 let table_id = 1024;
264 let mut next_region_number = 10;
265
266 let source_regions = vec![
268 create_region_descriptor(table_id, 1, "x", 0, 100),
269 create_region_descriptor(table_id, 2, "x", 100, 200),
270 create_region_descriptor(table_id, 3, "x", 200, 300),
271 ];
272 let target_partition_exprs = vec![
273 range_expr("x", 0, 50),
274 range_expr("x", 50, 100),
275 range_expr("x", 100, 150),
276 range_expr("x", 150, 200),
277 range_expr("x", 200, 300),
278 ];
279 let allocation_plan = AllocationPlanEntry {
280 group_id,
281 source_regions: source_regions.clone(),
282 target_partition_exprs: target_partition_exprs.clone(),
283 transition_map: vec![],
284 };
285 let result = convert_allocation_plan_to_repartition_plan(
286 table_id,
287 &mut next_region_number,
288 &allocation_plan,
289 );
290 assert_eq!(result.group_id, group_id);
291 assert_eq!(result.source_regions, source_regions);
292 assert_eq!(result.target_regions.len(), 5);
293 assert_eq!(result.allocated_region_ids.len(), 2);
294 assert!(result.pending_deallocate_region_ids.is_empty());
295 assert_eq!(next_region_number, 12);
296
297 assert_eq!(
299 result.target_regions[0].region_id,
300 RegionId::new(table_id, 1)
301 );
302 assert_eq!(
303 result.target_regions[0].partition_expr,
304 target_partition_exprs[0]
305 );
306 assert_eq!(
307 result.target_regions[1].region_id,
308 RegionId::new(table_id, 2)
309 );
310 assert_eq!(
311 result.target_regions[1].partition_expr,
312 target_partition_exprs[1]
313 );
314 assert_eq!(
315 result.target_regions[2].region_id,
316 RegionId::new(table_id, 3)
317 );
318 assert_eq!(
319 result.target_regions[2].partition_expr,
320 target_partition_exprs[2]
321 );
322
323 assert_eq!(
325 result.target_regions[3].region_id,
326 RegionId::new(table_id, 10)
327 );
328 assert_eq!(
329 result.target_regions[3].partition_expr,
330 target_partition_exprs[3]
331 );
332 assert_eq!(
333 result.target_regions[4].region_id,
334 RegionId::new(table_id, 11)
335 );
336 assert_eq!(
337 result.target_regions[4].partition_expr,
338 target_partition_exprs[4]
339 );
340
341 assert_eq!(result.allocated_region_ids[0], RegionId::new(table_id, 10));
343 assert_eq!(result.allocated_region_ids[1], RegionId::new(table_id, 11));
344 }
345
346 #[test]
347 fn test_convert_plan_deallocate_regions() {
348 let group_id = Uuid::new_v4();
349 let table_id = 1024;
350
351 let source_regions = vec![
353 create_region_descriptor(table_id, 1, "x", 0, 50),
354 create_region_descriptor(table_id, 2, "x", 50, 100),
355 create_region_descriptor(table_id, 3, "x", 100, 150),
356 create_region_descriptor(table_id, 4, "x", 150, 200),
357 create_region_descriptor(table_id, 5, "x", 200, 300),
358 ];
359 let target_partition_exprs = vec![
360 range_expr("x", 0, 100),
361 range_expr("x", 100, 200),
362 range_expr("x", 200, 300),
363 ];
364 let allocation_plan = AllocationPlanEntry {
365 group_id,
366 source_regions: source_regions.clone(),
367 target_partition_exprs: target_partition_exprs.clone(),
368 transition_map: vec![],
369 };
370 let mut next_region_number = 10;
371 let result = convert_allocation_plan_to_repartition_plan(
372 table_id,
373 &mut next_region_number,
374 &allocation_plan,
375 );
376 assert_eq!(next_region_number, 10);
377 assert_eq!(result.group_id, group_id);
378 assert_eq!(result.source_regions, source_regions);
379 assert_eq!(result.target_regions.len(), 3);
380 assert!(result.allocated_region_ids.is_empty());
381 assert_eq!(result.pending_deallocate_region_ids.len(), 2);
382
383 assert_eq!(
385 result.target_regions[0].region_id,
386 RegionId::new(table_id, 1)
387 );
388 assert_eq!(
389 result.target_regions[0].partition_expr,
390 target_partition_exprs[0]
391 );
392 assert_eq!(
393 result.target_regions[1].region_id,
394 RegionId::new(table_id, 2)
395 );
396 assert_eq!(
397 result.target_regions[1].partition_expr,
398 target_partition_exprs[1]
399 );
400 assert_eq!(
401 result.target_regions[2].region_id,
402 RegionId::new(table_id, 3)
403 );
404 assert_eq!(
405 result.target_regions[2].partition_expr,
406 target_partition_exprs[2]
407 );
408
409 assert_eq!(
411 result.pending_deallocate_region_ids[0],
412 RegionId::new(table_id, 4)
413 );
414 assert_eq!(
415 result.pending_deallocate_region_ids[1],
416 RegionId::new(table_id, 5)
417 );
418 }
419
420 #[test]
421 fn test_convert_plan_allocate_single_region() {
422 let group_id = Uuid::new_v4();
423 let table_id = 1024;
424 let mut next_region_number = 5;
425 let source_regions = vec![create_region_descriptor(table_id, 1, "x", 0, 100)];
427 let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
428 let allocation_plan = AllocationPlanEntry {
429 group_id,
430 source_regions: source_regions.clone(),
431 target_partition_exprs: target_partition_exprs.clone(),
432 transition_map: vec![],
433 };
434
435 let result = convert_allocation_plan_to_repartition_plan(
436 table_id,
437 &mut next_region_number,
438 &allocation_plan,
439 );
440 assert_eq!(result.target_regions.len(), 2);
441 assert_eq!(result.allocated_region_ids.len(), 1);
442 assert_eq!(result.pending_deallocate_region_ids.len(), 0);
443 assert_eq!(
445 result.target_regions[0].region_id,
446 RegionId::new(table_id, 1)
447 );
448 assert_eq!(
449 result.target_regions[0].partition_expr,
450 target_partition_exprs[0]
451 );
452 assert_eq!(
454 result.target_regions[1].region_id,
455 RegionId::new(table_id, 5)
456 );
457 assert_eq!(
458 result.target_regions[1].partition_expr,
459 target_partition_exprs[1]
460 );
461 assert_eq!(next_region_number, 6);
462 }
463
464 #[test]
465 fn test_convert_plan_deallocate_to_single_region() {
466 let group_id = Uuid::new_v4();
467 let table_id = 1024;
468
469 let source_regions = vec![
471 create_region_descriptor(table_id, 1, "x", 0, 100),
472 create_region_descriptor(table_id, 2, "x", 100, 200),
473 create_region_descriptor(table_id, 3, "x", 200, 300),
474 ];
475 let target_partition_exprs = vec![range_expr("x", 0, 300)];
476 let allocation_plan = AllocationPlanEntry {
477 group_id,
478 source_regions: source_regions.clone(),
479 target_partition_exprs: target_partition_exprs.clone(),
480 transition_map: vec![],
481 };
482 let mut next_region_number = 10;
483 let result = convert_allocation_plan_to_repartition_plan(
484 table_id,
485 &mut next_region_number,
486 &allocation_plan,
487 );
488 assert_eq!(result.target_regions.len(), 1);
489 assert_eq!(result.allocated_region_ids.len(), 0);
490 assert_eq!(result.pending_deallocate_region_ids.len(), 2);
491
492 assert_eq!(
494 result.target_regions[0].region_id,
495 RegionId::new(table_id, 1)
496 );
497 assert_eq!(
498 result.target_regions[0].partition_expr,
499 target_partition_exprs[0]
500 );
501
502 assert_eq!(
504 result.pending_deallocate_region_ids[0],
505 RegionId::new(table_id, 2)
506 );
507 assert_eq!(
508 result.pending_deallocate_region_ids[1],
509 RegionId::new(table_id, 3)
510 );
511 }
512}