1use std::cmp::Ordering;
16
17use common_meta::rpc::router::RegionRoute;
18use partition::expr::PartitionExpr;
19use serde::{Deserialize, Deserializer, Serialize};
20use snafu::ResultExt;
21use store_api::storage::{RegionId, RegionNumber, TableId};
22
23use crate::error::{self, Result};
24use crate::procedure::repartition::group::GroupId;
25
/// Describes one source region of a repartition.
///
/// A source region either carries the partition expression it currently serves
/// (`Partitioned`) or has no partition expression at all (`Default`).
///
/// The derived `Serialize` emits serde's default externally tagged enum form, e.g.
/// `{"Partitioned": {"region_id": ..., "partition_expr": ...}}`. `Deserialize` is implemented
/// by hand for this type rather than derived.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub enum SourceRegionDescriptor {
    /// A source region that has a partition expression.
    Partitioned {
        /// The id of the source region.
        region_id: RegionId,
        /// The partition expression of the source region.
        partition_expr: PartitionExpr,
    },
    /// A source region without a partition expression.
    Default {
        /// The id of the source region.
        region_id: RegionId,
    },
}
45
46impl<'de> Deserialize<'de> for SourceRegionDescriptor {
47 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
48 where
49 D: Deserializer<'de>,
50 {
51 #[derive(Deserialize)]
52 #[serde(deny_unknown_fields)]
53 struct PartitionedSourceRegionDescriptor {
54 region_id: RegionId,
55 partition_expr: PartitionExpr,
56 }
57
58 #[derive(Deserialize)]
59 #[serde(untagged)]
60 enum SourceRegionDescriptorRepr {
61 Tagged(SourceRegionDescriptorTaggedRepr),
62 Legacy(PartitionedSourceRegionDescriptor),
63 }
64
65 #[derive(Deserialize)]
66 enum SourceRegionDescriptorTaggedRepr {
67 Partitioned {
68 region_id: RegionId,
69 partition_expr: PartitionExpr,
70 },
71 Default {
72 region_id: RegionId,
73 },
74 }
75
76 match SourceRegionDescriptorRepr::deserialize(deserializer)? {
77 SourceRegionDescriptorRepr::Tagged(SourceRegionDescriptorTaggedRepr::Partitioned {
78 region_id,
79 partition_expr,
80 }) => Ok(Self::Partitioned {
81 region_id,
82 partition_expr,
83 }),
84 SourceRegionDescriptorRepr::Tagged(SourceRegionDescriptorTaggedRepr::Default {
85 region_id,
86 }) => Ok(Self::Default { region_id }),
87 SourceRegionDescriptorRepr::Legacy(descriptor) => Ok(Self::Partitioned {
88 region_id: descriptor.region_id,
89 partition_expr: descriptor.partition_expr,
90 }),
91 }
92 }
93}
94
95impl SourceRegionDescriptor {
96 pub fn partitioned(region_id: RegionId, partition_expr: PartitionExpr) -> Self {
98 Self::Partitioned {
99 region_id,
100 partition_expr,
101 }
102 }
103
104 pub fn region_id(&self) -> RegionId {
106 match self {
107 Self::Partitioned { region_id, .. } => *region_id,
108 Self::Default { region_id } => *region_id,
109 }
110 }
111
112 pub fn partition_expr(&self) -> Option<&PartitionExpr> {
114 match self {
115 Self::Partitioned { partition_expr, .. } => Some(partition_expr),
116 Self::Default { .. } => None,
117 }
118 }
119
120 pub fn matches_route_expr(&self, route_expr: &str) -> Result<bool> {
122 match self {
123 Self::Partitioned { partition_expr, .. } => {
124 let expected = partition_expr
125 .as_json_str()
126 .context(error::SerializePartitionExprSnafu)?;
127 Ok(route_expr == expected)
128 }
129 Self::Default { .. } => Ok(route_expr.is_empty()),
130 }
131 }
132
133 pub fn route_expr_for_rollback(&self) -> Result<String> {
135 match self {
136 Self::Partitioned { partition_expr, .. } => partition_expr
137 .as_json_str()
138 .context(error::SerializePartitionExprSnafu),
139 Self::Default { .. } => Ok(String::new()),
140 }
141 }
142}
143
/// Describes one target region of a repartition: a region id together with a partition
/// expression.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TargetRegionDescriptor {
    /// The id of the target region.
    pub region_id: RegionId,
    /// The partition expression of the target region.
    pub partition_expr: PartitionExpr,
}
152
/// A repartition plan entry in which the targets are known only by partition expression and
/// have no region ids yet.
///
/// [`convert_allocation_plan_to_repartition_plan`] turns it into a [`RepartitionPlanEntry`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AllocationPlanEntry {
    /// The id of the group this entry belongs to.
    pub group_id: GroupId,
    /// The source regions of the group.
    pub source_regions: Vec<SourceRegionDescriptor>,
    /// The partition expressions of the targets.
    pub target_partition_exprs: Vec<PartitionExpr>,
    /// NOTE(review): presumably one list of target indices per source region; this file only
    /// copies the value through unchanged, so confirm against the producer.
    pub transition_map: Vec<Vec<usize>>,
}
167
/// A repartition plan entry in which every target has a region id.
///
/// Produced from an [`AllocationPlanEntry`] by [`convert_allocation_plan_to_repartition_plan`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RepartitionPlanEntry {
    /// The id of the group this entry belongs to.
    pub group_id: GroupId,
    /// The source regions of the group.
    pub source_regions: Vec<SourceRegionDescriptor>,
    /// The target regions of the group.
    pub target_regions: Vec<TargetRegionDescriptor>,
    /// Ids of the target regions that were newly numbered for this plan instead of reusing a
    /// source region id.
    pub allocated_region_ids: Vec<RegionId>,
    /// Ids of the source regions that have no target at the same position.
    pub pending_deallocate_region_ids: Vec<RegionId>,
    /// NOTE(review): presumably one list of target indices per source region; this file only
    /// copies the value through unchanged, so confirm against the producer.
    pub transition_map: Vec<Vec<usize>>,
    /// Deserializes to an empty list when the field is absent from the input.
    /// NOTE(review): presumably the target routes as they were before the repartition; this
    /// file only ever sets it to empty, so confirm against the code that fills it.
    #[serde(default)]
    pub original_target_routes: Vec<RegionRoute>,
}
189
190impl RepartitionPlanEntry {
191 pub(crate) fn allocate_regions(&self) -> Vec<&TargetRegionDescriptor> {
193 self.target_regions
194 .iter()
195 .filter(|r| self.allocated_region_ids.contains(&r.region_id))
196 .collect()
197 }
198}
199
200pub fn convert_allocation_plan_to_repartition_plan(
217 table_id: TableId,
218 next_region_number: &mut RegionNumber,
219 AllocationPlanEntry {
220 group_id,
221 source_regions,
222 target_partition_exprs,
223 transition_map,
224 ..
225 }: &AllocationPlanEntry,
226) -> RepartitionPlanEntry {
227 match source_regions.len().cmp(&target_partition_exprs.len()) {
228 Ordering::Less => {
229 let pending_allocate_target_partition_exprs = target_partition_exprs
231 .iter()
232 .skip(source_regions.len())
233 .map(|target_partition_expr| {
234 let desc = TargetRegionDescriptor {
235 region_id: RegionId::new(table_id, *next_region_number),
236 partition_expr: target_partition_expr.clone(),
237 };
238 *next_region_number += 1;
239 desc
240 })
241 .collect::<Vec<_>>();
242
243 let allocated_region_ids = pending_allocate_target_partition_exprs
244 .iter()
245 .map(|rd| rd.region_id)
246 .collect::<Vec<_>>();
247
248 let target_regions = source_regions
249 .iter()
250 .zip(target_partition_exprs.iter())
251 .map(
252 |(source_region, target_partition_expr)| TargetRegionDescriptor {
253 region_id: source_region.region_id(),
254 partition_expr: target_partition_expr.clone(),
255 },
256 )
257 .chain(pending_allocate_target_partition_exprs)
258 .collect::<Vec<_>>();
259
260 RepartitionPlanEntry {
261 group_id: *group_id,
262 source_regions: source_regions.clone(),
263 target_regions,
264 allocated_region_ids,
265 pending_deallocate_region_ids: vec![],
266 transition_map: transition_map.clone(),
267 original_target_routes: vec![],
268 }
269 }
270 Ordering::Equal => {
271 let target_regions = source_regions
272 .iter()
273 .zip(target_partition_exprs.iter())
274 .map(
275 |(source_region, target_partition_expr)| TargetRegionDescriptor {
276 region_id: source_region.region_id(),
277 partition_expr: target_partition_expr.clone(),
278 },
279 )
280 .collect::<Vec<_>>();
281
282 RepartitionPlanEntry {
283 group_id: *group_id,
284 source_regions: source_regions.clone(),
285 target_regions,
286 allocated_region_ids: vec![],
287 pending_deallocate_region_ids: vec![],
288 transition_map: transition_map.clone(),
289 original_target_routes: vec![],
290 }
291 }
292 Ordering::Greater => {
293 let target_regions = source_regions
295 .iter()
296 .take(target_partition_exprs.len())
297 .zip(target_partition_exprs.iter())
298 .map(
299 |(source_region, target_partition_expr)| TargetRegionDescriptor {
300 region_id: source_region.region_id(),
301 partition_expr: target_partition_expr.clone(),
302 },
303 )
304 .collect::<Vec<_>>();
305
306 let pending_deallocate_region_ids = source_regions
307 .iter()
308 .skip(target_partition_exprs.len())
309 .map(|source_region| source_region.region_id())
310 .collect::<Vec<_>>();
311
312 RepartitionPlanEntry {
313 group_id: *group_id,
314 source_regions: source_regions.clone(),
315 target_regions,
316 allocated_region_ids: vec![],
317 pending_deallocate_region_ids,
318 transition_map: transition_map.clone(),
319 original_target_routes: vec![],
320 }
321 }
322 }
323}
324
#[cfg(test)]
mod tests {
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::test_util::range_expr;

    /// Builds a partitioned source descriptor for region `region_number` of `table_id` whose
    /// partition expression is `range_expr(col, start, end)`.
    fn create_region_descriptor(
        table_id: TableId,
        region_number: u32,
        col: &str,
        start: i64,
        end: i64,
    ) -> SourceRegionDescriptor {
        let region_id = RegionId::new(table_id, region_number);
        SourceRegionDescriptor::partitioned(region_id, range_expr(col, start, end))
    }

    /// Returns the region ids of `table_id` for the given region numbers, in order.
    fn region_ids(table_id: TableId, region_numbers: &[RegionNumber]) -> Vec<RegionId> {
        region_numbers
            .iter()
            .map(|region_number| RegionId::new(table_id, *region_number))
            .collect()
    }

    /// Builds the expected target descriptors by pairing each region number with the partition
    /// expression at the same position.
    fn expected_targets(
        table_id: TableId,
        region_numbers: &[RegionNumber],
        partition_exprs: &[PartitionExpr],
    ) -> Vec<TargetRegionDescriptor> {
        // Guard against a test passing lists of different lengths, which `zip` would hide.
        assert_eq!(region_numbers.len(), partition_exprs.len());

        region_numbers
            .iter()
            .zip(partition_exprs)
            .map(|(region_number, partition_expr)| TargetRegionDescriptor {
                region_id: RegionId::new(table_id, *region_number),
                partition_expr: partition_expr.clone(),
            })
            .collect()
    }

    #[test]
    fn test_source_region_descriptor_deserializes_legacy_partitioned_shape() {
        let region_id = RegionId::new(1024, 1);
        let partition_expr = range_expr("x", 0, 100);
        // Flat object without a variant tag.
        let legacy_json = serde_json::json!({
            "region_id": region_id,
            "partition_expr": partition_expr,
        });

        let decoded = serde_json::from_value::<SourceRegionDescriptor>(legacy_json).unwrap();

        let expected = SourceRegionDescriptor::partitioned(region_id, partition_expr);
        assert_eq!(decoded, expected);
    }

    #[test]
    fn test_source_region_descriptor_rejects_legacy_default_shape() {
        // A flat object with only `region_id` is neither tagged nor a complete legacy shape.
        let default_json = serde_json::json!({
            "region_id": RegionId::new(1024, 1),
        });

        let result = serde_json::from_value::<SourceRegionDescriptor>(default_json);

        let message = result.unwrap_err().to_string();
        assert!(message.contains("data did not match any variant"));
    }

    #[test]
    fn test_source_region_descriptor_roundtrip_tagged_partitioned_shape() {
        let region_id = RegionId::new(1024, 1);
        let partition_expr = range_expr("x", 0, 100);
        let descriptor = SourceRegionDescriptor::partitioned(region_id, partition_expr.clone());

        let encoded = serde_json::to_value(&descriptor).unwrap();
        let expected_json = serde_json::json!({
            "Partitioned": {
                "region_id": region_id,
                "partition_expr": partition_expr,
            }
        });
        assert_eq!(encoded, expected_json);

        let decoded = serde_json::from_value::<SourceRegionDescriptor>(encoded).unwrap();
        assert_eq!(decoded, descriptor);
    }

    #[test]
    fn test_source_region_descriptor_roundtrip_tagged_default_shape() {
        let region_id = RegionId::new(1024, 1);
        let descriptor = SourceRegionDescriptor::Default { region_id };

        let encoded = serde_json::to_value(&descriptor).unwrap();
        let expected_json = serde_json::json!({
            "Default": {
                "region_id": region_id,
            }
        });
        assert_eq!(encoded, expected_json);

        let decoded = serde_json::from_value::<SourceRegionDescriptor>(encoded).unwrap();
        assert_eq!(decoded, descriptor);
    }

    #[test]
    fn test_source_region_descriptor_rejects_invalid_partition_expr_shape() {
        // The legacy shape with a `partition_expr` that is not a partition expression.
        let invalid_json = serde_json::json!({
            "region_id": RegionId::new(1024, 1),
            "partition_expr": 42,
        });

        let result = serde_json::from_value::<SourceRegionDescriptor>(invalid_json);

        let message = result.unwrap_err().to_string();
        assert!(message.contains("data did not match any variant"));
    }

    #[test]
    fn test_repartition_plan_entry_deserializes_legacy_source_regions() {
        let group_id = Uuid::new_v4();
        let table_id = 1024;
        let source_region_id = RegionId::new(table_id, 1);
        let target_region_id = RegionId::new(table_id, 2);
        let source_partition_expr = range_expr("x", 0, 100);
        let target_partition_expr = range_expr("x", 0, 50);
        // Legacy flat source regions, and no `original_target_routes` field at all.
        let legacy_json = serde_json::json!({
            "group_id": group_id,
            "source_regions": [{
                "region_id": source_region_id,
                "partition_expr": source_partition_expr,
            }],
            "target_regions": [{
                "region_id": target_region_id,
                "partition_expr": target_partition_expr,
            }],
            "allocated_region_ids": [target_region_id],
            "pending_deallocate_region_ids": [],
            "transition_map": [[0]],
        });

        let plan = serde_json::from_value::<RepartitionPlanEntry>(legacy_json).unwrap();

        let expected_sources = vec![SourceRegionDescriptor::partitioned(
            source_region_id,
            source_partition_expr,
        )];
        let expected_targets = vec![TargetRegionDescriptor {
            region_id: target_region_id,
            partition_expr: target_partition_expr,
        }];
        assert_eq!(plan.group_id, group_id);
        assert_eq!(plan.source_regions, expected_sources);
        assert_eq!(plan.target_regions, expected_targets);
        assert_eq!(plan.allocated_region_ids, vec![target_region_id]);
        assert!(plan.pending_deallocate_region_ids.is_empty());
        assert_eq!(plan.transition_map, vec![vec![0]]);
        assert!(plan.original_target_routes.is_empty());
    }

    #[test]
    fn test_convert_plan_equal_regions() {
        let group_id = Uuid::new_v4();
        let table_id = 1024;
        let source_regions = vec![
            create_region_descriptor(table_id, 1, "x", 0, 100),
            create_region_descriptor(table_id, 2, "x", 100, 200),
        ];
        let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 200)];
        let allocation_plan = AllocationPlanEntry {
            group_id,
            source_regions: source_regions.clone(),
            target_partition_exprs: target_partition_exprs.clone(),
            transition_map: Vec::new(),
        };

        let mut next_region_number = 10;
        let plan = convert_allocation_plan_to_repartition_plan(
            table_id,
            &mut next_region_number,
            &allocation_plan,
        );

        assert_eq!(plan.group_id, group_id);
        assert_eq!(plan.source_regions, source_regions);
        assert!(plan.allocated_region_ids.is_empty());
        assert!(plan.pending_deallocate_region_ids.is_empty());
        assert_eq!(next_region_number, 10);
        // Every target reuses the id of the source at the same position.
        assert_eq!(
            plan.target_regions,
            expected_targets(table_id, &[1, 2], &target_partition_exprs)
        );
    }

    #[test]
    fn test_convert_plan_allocate_regions() {
        let group_id = Uuid::new_v4();
        let table_id = 1024;
        // 3 source regions -> 5 target partition expressions.
        let source_regions = vec![
            create_region_descriptor(table_id, 1, "x", 0, 100),
            create_region_descriptor(table_id, 2, "x", 100, 200),
            create_region_descriptor(table_id, 3, "x", 200, 300),
        ];
        let target_partition_exprs = vec![
            range_expr("x", 0, 50),
            range_expr("x", 50, 100),
            range_expr("x", 100, 150),
            range_expr("x", 150, 200),
            range_expr("x", 200, 300),
        ];
        let allocation_plan = AllocationPlanEntry {
            group_id,
            source_regions: source_regions.clone(),
            target_partition_exprs: target_partition_exprs.clone(),
            transition_map: vec![],
        };

        let mut next_region_number = 10;
        let plan = convert_allocation_plan_to_repartition_plan(
            table_id,
            &mut next_region_number,
            &allocation_plan,
        );

        assert_eq!(plan.group_id, group_id);
        assert_eq!(plan.source_regions, source_regions);
        assert!(plan.pending_deallocate_region_ids.is_empty());
        assert_eq!(next_region_number, 12);
        // The first three targets reuse source ids; the last two are numbered 10 and 11.
        assert_eq!(
            plan.target_regions,
            expected_targets(table_id, &[1, 2, 3, 10, 11], &target_partition_exprs)
        );
        assert_eq!(plan.allocated_region_ids, region_ids(table_id, &[10, 11]));
    }

    #[test]
    fn test_convert_plan_deallocate_regions() {
        let group_id = Uuid::new_v4();
        let table_id = 1024;
        // 5 source regions -> 3 target partition expressions.
        let source_regions = vec![
            create_region_descriptor(table_id, 1, "x", 0, 50),
            create_region_descriptor(table_id, 2, "x", 50, 100),
            create_region_descriptor(table_id, 3, "x", 100, 150),
            create_region_descriptor(table_id, 4, "x", 150, 200),
            create_region_descriptor(table_id, 5, "x", 200, 300),
        ];
        let target_partition_exprs = vec![
            range_expr("x", 0, 100),
            range_expr("x", 100, 200),
            range_expr("x", 200, 300),
        ];
        let allocation_plan = AllocationPlanEntry {
            group_id,
            source_regions: source_regions.clone(),
            target_partition_exprs: target_partition_exprs.clone(),
            transition_map: vec![],
        };

        let mut next_region_number = 10;
        let plan = convert_allocation_plan_to_repartition_plan(
            table_id,
            &mut next_region_number,
            &allocation_plan,
        );

        assert_eq!(next_region_number, 10);
        assert_eq!(plan.group_id, group_id);
        assert_eq!(plan.source_regions, source_regions);
        assert!(plan.allocated_region_ids.is_empty());
        // The first three sources are reused as targets; the last two are left over.
        assert_eq!(
            plan.target_regions,
            expected_targets(table_id, &[1, 2, 3], &target_partition_exprs)
        );
        assert_eq!(
            plan.pending_deallocate_region_ids,
            region_ids(table_id, &[4, 5])
        );
    }

    #[test]
    fn test_convert_plan_allocate_single_region() {
        let group_id = Uuid::new_v4();
        let table_id = 1024;
        // 1 source region -> 2 target partition expressions.
        let source_regions = vec![create_region_descriptor(table_id, 1, "x", 0, 100)];
        let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
        let allocation_plan = AllocationPlanEntry {
            group_id,
            source_regions: source_regions.clone(),
            target_partition_exprs: target_partition_exprs.clone(),
            transition_map: vec![],
        };

        let mut next_region_number = 5;
        let plan = convert_allocation_plan_to_repartition_plan(
            table_id,
            &mut next_region_number,
            &allocation_plan,
        );

        assert_eq!(plan.allocated_region_ids.len(), 1);
        assert_eq!(plan.pending_deallocate_region_ids.len(), 0);
        // The first target reuses the source id; the second is numbered 5.
        assert_eq!(
            plan.target_regions,
            expected_targets(table_id, &[1, 5], &target_partition_exprs)
        );
        assert_eq!(next_region_number, 6);
    }

    #[test]
    fn test_convert_plan_allocate_default_source_region() {
        let group_id = Uuid::new_v4();
        let table_id = 1024;
        let source_regions = vec![SourceRegionDescriptor::Default {
            region_id: RegionId::new(table_id, 1),
        }];
        let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
        let allocation_plan = AllocationPlanEntry {
            group_id,
            source_regions: source_regions.clone(),
            target_partition_exprs: target_partition_exprs.clone(),
            transition_map: vec![vec![0, 1]],
        };

        let mut next_region_number = 5;
        let plan = convert_allocation_plan_to_repartition_plan(
            table_id,
            &mut next_region_number,
            &allocation_plan,
        );

        assert_eq!(plan.source_regions, source_regions);
        assert_eq!(
            plan.target_regions,
            expected_targets(table_id, &[1, 5], &target_partition_exprs)
        );
        assert_eq!(plan.allocated_region_ids, region_ids(table_id, &[5]));
        assert!(plan.pending_deallocate_region_ids.is_empty());
        assert_eq!(plan.transition_map, vec![vec![0, 1]]);
        assert_eq!(next_region_number, 6);
    }

    #[test]
    fn test_convert_plan_deallocate_to_single_region() {
        let group_id = Uuid::new_v4();
        let table_id = 1024;
        // 3 source regions -> 1 target partition expression.
        let source_regions = vec![
            create_region_descriptor(table_id, 1, "x", 0, 100),
            create_region_descriptor(table_id, 2, "x", 100, 200),
            create_region_descriptor(table_id, 3, "x", 200, 300),
        ];
        let target_partition_exprs = vec![range_expr("x", 0, 300)];
        let allocation_plan = AllocationPlanEntry {
            group_id,
            source_regions: source_regions.clone(),
            target_partition_exprs: target_partition_exprs.clone(),
            transition_map: vec![],
        };

        let mut next_region_number = 10;
        let plan = convert_allocation_plan_to_repartition_plan(
            table_id,
            &mut next_region_number,
            &allocation_plan,
        );

        assert_eq!(plan.allocated_region_ids.len(), 0);
        // Only the first source is reused; the other two are left over.
        assert_eq!(
            plan.target_regions,
            expected_targets(table_id, &[1], &target_partition_exprs)
        );
        assert_eq!(
            plan.pending_deallocate_region_ids,
            region_ids(table_id, &[2, 3])
        );
    }
}