Skip to main content

meta_srv/procedure/repartition/
plan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16
17use common_meta::rpc::router::RegionRoute;
18use partition::expr::PartitionExpr;
19use serde::{Deserialize, Deserializer, Serialize};
20use snafu::ResultExt;
21use store_api::storage::{RegionId, RegionNumber, TableId};
22
23use crate::error::{self, Result};
24use crate::procedure::repartition::group::GroupId;
25
26/// Metadata describing a source region involved in the plan.
27///
28/// Source regions may represent either an existing partitioned region or the
29/// default region of an unpartitioned table.
30#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
31pub enum SourceRegionDescriptor {
32    /// A regular partitioned source region.
33    Partitioned {
34        /// The region id of the source region.
35        region_id: RegionId,
36        /// The partition expression of the source region.
37        partition_expr: PartitionExpr,
38    },
39    /// The default source region of an unpartitioned table.
40    Default {
41        /// The region id of the default source region.
42        region_id: RegionId,
43    },
44}
45
46impl<'de> Deserialize<'de> for SourceRegionDescriptor {
47    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
48    where
49        D: Deserializer<'de>,
50    {
51        #[derive(Deserialize)]
52        #[serde(deny_unknown_fields)]
53        struct PartitionedSourceRegionDescriptor {
54            region_id: RegionId,
55            partition_expr: PartitionExpr,
56        }
57
58        #[derive(Deserialize)]
59        #[serde(untagged)]
60        enum SourceRegionDescriptorRepr {
61            Tagged(SourceRegionDescriptorTaggedRepr),
62            Legacy(PartitionedSourceRegionDescriptor),
63        }
64
65        #[derive(Deserialize)]
66        enum SourceRegionDescriptorTaggedRepr {
67            Partitioned {
68                region_id: RegionId,
69                partition_expr: PartitionExpr,
70            },
71            Default {
72                region_id: RegionId,
73            },
74        }
75
76        match SourceRegionDescriptorRepr::deserialize(deserializer)? {
77            SourceRegionDescriptorRepr::Tagged(SourceRegionDescriptorTaggedRepr::Partitioned {
78                region_id,
79                partition_expr,
80            }) => Ok(Self::Partitioned {
81                region_id,
82                partition_expr,
83            }),
84            SourceRegionDescriptorRepr::Tagged(SourceRegionDescriptorTaggedRepr::Default {
85                region_id,
86            }) => Ok(Self::Default { region_id }),
87            SourceRegionDescriptorRepr::Legacy(descriptor) => Ok(Self::Partitioned {
88                region_id: descriptor.region_id,
89                partition_expr: descriptor.partition_expr,
90            }),
91        }
92    }
93}
94
95impl SourceRegionDescriptor {
96    /// Creates a partitioned source region descriptor.
97    pub fn partitioned(region_id: RegionId, partition_expr: PartitionExpr) -> Self {
98        Self::Partitioned {
99            region_id,
100            partition_expr,
101        }
102    }
103
104    /// Returns the region id of this source descriptor.
105    pub fn region_id(&self) -> RegionId {
106        match self {
107            Self::Partitioned { region_id, .. } => *region_id,
108            Self::Default { region_id } => *region_id,
109        }
110    }
111
112    /// Returns the partition expression if this source is partitioned.
113    pub fn partition_expr(&self) -> Option<&PartitionExpr> {
114        match self {
115            Self::Partitioned { partition_expr, .. } => Some(partition_expr),
116            Self::Default { .. } => None,
117        }
118    }
119
120    /// Returns true if this source descriptor matches the route partition expression.
121    pub fn matches_route_expr(&self, route_expr: &str) -> Result<bool> {
122        match self {
123            Self::Partitioned { partition_expr, .. } => {
124                let expected = partition_expr
125                    .as_json_str()
126                    .context(error::SerializePartitionExprSnafu)?;
127                Ok(route_expr == expected)
128            }
129            Self::Default { .. } => Ok(route_expr.is_empty()),
130        }
131    }
132
133    /// Returns the route partition expression to restore during rollback.
134    pub fn route_expr_for_rollback(&self) -> Result<String> {
135        match self {
136            Self::Partitioned { partition_expr, .. } => partition_expr
137                .as_json_str()
138                .context(error::SerializePartitionExprSnafu),
139            Self::Default { .. } => Ok(String::new()),
140        }
141    }
142}
143
144/// Metadata describing a target region involved in the plan.
145#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
146pub struct TargetRegionDescriptor {
147    /// The region id of the target region.
148    pub region_id: RegionId,
149    /// The partition expression of the target region.
150    pub partition_expr: PartitionExpr,
151}
152
153/// A plan entry for the region allocation phase, describing source regions
154/// and target partition expressions before allocation.
155#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
156pub struct AllocationPlanEntry {
157    /// The group id for this plan entry.
158    pub group_id: GroupId,
159    /// Source region descriptors involved in the plan.
160    pub source_regions: Vec<SourceRegionDescriptor>,
161    /// The target partition expressions for the new or changed regions.
162    pub target_partition_exprs: Vec<PartitionExpr>,
163    /// For each `source_regions[k]`, the corresponding vector contains global
164    /// `target_partition_exprs` that overlap with it.
165    pub transition_map: Vec<Vec<usize>>,
166}
167
168/// A plan entry for the dispatch phase after region allocation,
169/// with concrete source and target region descriptors.
170#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
171pub struct RepartitionPlanEntry {
172    /// The group id for this plan entry.
173    pub group_id: GroupId,
174    /// The source region descriptors involved in the plan.
175    pub source_regions: Vec<SourceRegionDescriptor>,
176    /// The target region descriptors involved in the plan.
177    pub target_regions: Vec<TargetRegionDescriptor>,
178    /// The region ids of the allocated regions.
179    pub allocated_region_ids: Vec<RegionId>,
180    /// The region ids of the regions that are pending deallocation.
181    pub pending_deallocate_region_ids: Vec<RegionId>,
182    /// For each `source_regions[k]`, the corresponding vector contains global
183    /// `target_regions` that overlap with it.
184    pub transition_map: Vec<Vec<usize>>,
185    /// Pre-staging target routes persisted for parent rollback and recovery.
186    #[serde(default)]
187    pub original_target_routes: Vec<RegionRoute>,
188}
189
190impl RepartitionPlanEntry {
191    /// Returns the target regions that are newly allocated.
192    pub(crate) fn allocate_regions(&self) -> Vec<&TargetRegionDescriptor> {
193        self.target_regions
194            .iter()
195            .filter(|r| self.allocated_region_ids.contains(&r.region_id))
196            .collect()
197    }
198}
199
200/// Converts an allocation plan to a repartition plan.
201///
202/// Converts an [`AllocationPlanEntry`] (which contains abstract region allocation intents)
203/// into a [`RepartitionPlanEntry`] with concrete source and target region descriptors,
204/// plus records information on which regions are newly allocated and/or pending deallocation.
205///
206/// # Returns
207///
208/// A [`Result`] containing the [`RepartitionPlanEntry`] describing exactly the source regions,
209/// the target regions (including any that need to be newly allocated), and transition mappings.
210///
211/// # Notes
212/// - If new regions are needed, their region ids are constructed using `table_id` and incrementing
213///   from `next_region_number`.
214/// - For each target, associates the correct region descriptor; for new regions, the region id
215///   is assigned sequentially.
216pub fn convert_allocation_plan_to_repartition_plan(
217    table_id: TableId,
218    next_region_number: &mut RegionNumber,
219    AllocationPlanEntry {
220        group_id,
221        source_regions,
222        target_partition_exprs,
223        transition_map,
224        ..
225    }: &AllocationPlanEntry,
226) -> RepartitionPlanEntry {
227    match source_regions.len().cmp(&target_partition_exprs.len()) {
228        Ordering::Less => {
229            // requires to allocate regions
230            let pending_allocate_target_partition_exprs = target_partition_exprs
231                .iter()
232                .skip(source_regions.len())
233                .map(|target_partition_expr| {
234                    let desc = TargetRegionDescriptor {
235                        region_id: RegionId::new(table_id, *next_region_number),
236                        partition_expr: target_partition_expr.clone(),
237                    };
238                    *next_region_number += 1;
239                    desc
240                })
241                .collect::<Vec<_>>();
242
243            let allocated_region_ids = pending_allocate_target_partition_exprs
244                .iter()
245                .map(|rd| rd.region_id)
246                .collect::<Vec<_>>();
247
248            let target_regions = source_regions
249                .iter()
250                .zip(target_partition_exprs.iter())
251                .map(
252                    |(source_region, target_partition_expr)| TargetRegionDescriptor {
253                        region_id: source_region.region_id(),
254                        partition_expr: target_partition_expr.clone(),
255                    },
256                )
257                .chain(pending_allocate_target_partition_exprs)
258                .collect::<Vec<_>>();
259
260            RepartitionPlanEntry {
261                group_id: *group_id,
262                source_regions: source_regions.clone(),
263                target_regions,
264                allocated_region_ids,
265                pending_deallocate_region_ids: vec![],
266                transition_map: transition_map.clone(),
267                original_target_routes: vec![],
268            }
269        }
270        Ordering::Equal => {
271            let target_regions = source_regions
272                .iter()
273                .zip(target_partition_exprs.iter())
274                .map(
275                    |(source_region, target_partition_expr)| TargetRegionDescriptor {
276                        region_id: source_region.region_id(),
277                        partition_expr: target_partition_expr.clone(),
278                    },
279                )
280                .collect::<Vec<_>>();
281
282            RepartitionPlanEntry {
283                group_id: *group_id,
284                source_regions: source_regions.clone(),
285                target_regions,
286                allocated_region_ids: vec![],
287                pending_deallocate_region_ids: vec![],
288                transition_map: transition_map.clone(),
289                original_target_routes: vec![],
290            }
291        }
292        Ordering::Greater => {
293            // requires to deallocate regions
294            let target_regions = source_regions
295                .iter()
296                .take(target_partition_exprs.len())
297                .zip(target_partition_exprs.iter())
298                .map(
299                    |(source_region, target_partition_expr)| TargetRegionDescriptor {
300                        region_id: source_region.region_id(),
301                        partition_expr: target_partition_expr.clone(),
302                    },
303                )
304                .collect::<Vec<_>>();
305
306            let pending_deallocate_region_ids = source_regions
307                .iter()
308                .skip(target_partition_exprs.len())
309                .map(|source_region| source_region.region_id())
310                .collect::<Vec<_>>();
311
312            RepartitionPlanEntry {
313                group_id: *group_id,
314                source_regions: source_regions.clone(),
315                target_regions,
316                allocated_region_ids: vec![],
317                pending_deallocate_region_ids,
318                transition_map: transition_map.clone(),
319                original_target_routes: vec![],
320            }
321        }
322    }
323}
324
#[cfg(test)]
mod tests {
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::test_util::range_expr;

    /// Table id shared by every fixture in this module.
    const TABLE_ID: TableId = 1024;

    /// Shorthand for a region id inside [`TABLE_ID`].
    fn region(region_number: RegionNumber) -> RegionId {
        RegionId::new(TABLE_ID, region_number)
    }

    /// Builds a partitioned source descriptor whose partition expression is
    /// `range_expr(col, start, end)`.
    fn create_region_descriptor(
        table_id: TableId,
        region_number: u32,
        col: &str,
        start: i64,
        end: i64,
    ) -> SourceRegionDescriptor {
        let region_id = RegionId::new(table_id, region_number);
        let partition_expr = range_expr(col, start, end);
        SourceRegionDescriptor::partitioned(region_id, partition_expr)
    }

    /// Expected target descriptor for `region_number` carrying `partition_expr`.
    fn target(region_number: RegionNumber, partition_expr: &PartitionExpr) -> TargetRegionDescriptor {
        TargetRegionDescriptor {
            region_id: region(region_number),
            partition_expr: partition_expr.clone(),
        }
    }

    /// Assembles an allocation plan from borrowed fixtures.
    fn allocation_plan(
        group_id: GroupId,
        source_regions: &[SourceRegionDescriptor],
        target_partition_exprs: &[PartitionExpr],
        transition_map: Vec<Vec<usize>>,
    ) -> AllocationPlanEntry {
        AllocationPlanEntry {
            group_id,
            source_regions: source_regions.to_vec(),
            target_partition_exprs: target_partition_exprs.to_vec(),
            transition_map,
        }
    }

    /// Runs the conversion under test against [`TABLE_ID`].
    fn convert(
        next_region_number: &mut RegionNumber,
        plan: &AllocationPlanEntry,
    ) -> RepartitionPlanEntry {
        convert_allocation_plan_to_repartition_plan(TABLE_ID, next_region_number, plan)
    }

    /// Asserts that `value` fails to deserialize with the untagged-enum error.
    fn assert_matches_no_variant(value: serde_json::Value) {
        let err = serde_json::from_value::<SourceRegionDescriptor>(value).unwrap_err();

        assert!(err.to_string().contains("data did not match any variant"));
    }

    /// Asserts that `descriptor` serializes to `expected` and decodes back to
    /// itself.
    fn assert_roundtrip(descriptor: &SourceRegionDescriptor, expected: serde_json::Value) {
        let value = serde_json::to_value(descriptor).unwrap();
        let decoded = serde_json::from_value::<SourceRegionDescriptor>(value.clone()).unwrap();

        assert_eq!(value, expected);
        assert_eq!(&decoded, descriptor);
    }

    #[test]
    fn test_source_region_descriptor_deserializes_legacy_partitioned_shape() {
        let region_id = region(1);
        let partition_expr = range_expr("x", 0, 100);
        let legacy_json = serde_json::json!({
            "region_id": region_id,
            "partition_expr": partition_expr,
        });

        let descriptor = serde_json::from_value::<SourceRegionDescriptor>(legacy_json).unwrap();

        assert_eq!(
            descriptor,
            SourceRegionDescriptor::partitioned(region_id, partition_expr)
        );
    }

    #[test]
    fn test_source_region_descriptor_rejects_legacy_default_shape() {
        let region_id = region(1);

        assert_matches_no_variant(serde_json::json!({
            "region_id": region_id,
        }));
    }

    #[test]
    fn test_source_region_descriptor_roundtrip_tagged_partitioned_shape() {
        let region_id = region(1);
        let partition_expr = range_expr("x", 0, 100);
        let descriptor = SourceRegionDescriptor::partitioned(region_id, partition_expr.clone());

        assert_roundtrip(
            &descriptor,
            serde_json::json!({
                "Partitioned": {
                    "region_id": region_id,
                    "partition_expr": partition_expr,
                }
            }),
        );
    }

    #[test]
    fn test_source_region_descriptor_roundtrip_tagged_default_shape() {
        let region_id = region(1);
        let descriptor = SourceRegionDescriptor::Default { region_id };

        assert_roundtrip(
            &descriptor,
            serde_json::json!({
                "Default": {
                    "region_id": region_id,
                }
            }),
        );
    }

    #[test]
    fn test_source_region_descriptor_rejects_invalid_partition_expr_shape() {
        let region_id = region(1);

        assert_matches_no_variant(serde_json::json!({
            "region_id": region_id,
            "partition_expr": 42,
        }));
    }

    #[test]
    fn test_repartition_plan_entry_deserializes_legacy_source_regions() {
        let group_id = Uuid::new_v4();
        let source_region_id = region(1);
        let target_region_id = region(2);
        let source_partition_expr = range_expr("x", 0, 100);
        let target_partition_expr = range_expr("x", 0, 50);
        let legacy_json = serde_json::json!({
            "group_id": group_id,
            "source_regions": [{
                "region_id": source_region_id,
                "partition_expr": source_partition_expr,
            }],
            "target_regions": [{
                "region_id": target_region_id,
                "partition_expr": target_partition_expr,
            }],
            "allocated_region_ids": [target_region_id],
            "pending_deallocate_region_ids": [],
            "transition_map": [[0]],
        });

        let plan = serde_json::from_value::<RepartitionPlanEntry>(legacy_json).unwrap();

        assert_eq!(plan.group_id, group_id);
        assert_eq!(
            plan.source_regions,
            vec![SourceRegionDescriptor::partitioned(
                source_region_id,
                source_partition_expr
            )]
        );
        assert_eq!(
            plan.target_regions,
            vec![TargetRegionDescriptor {
                region_id: target_region_id,
                partition_expr: target_partition_expr,
            }]
        );
        assert_eq!(plan.allocated_region_ids, vec![target_region_id]);
        assert!(plan.pending_deallocate_region_ids.is_empty());
        assert_eq!(plan.transition_map, vec![vec![0]]);
        assert!(plan.original_target_routes.is_empty());
    }

    #[test]
    fn test_convert_plan_equal_regions() {
        let group_id = Uuid::new_v4();
        let mut next_region_number = 10;
        let source_regions = vec![
            create_region_descriptor(TABLE_ID, 1, "x", 0, 100),
            create_region_descriptor(TABLE_ID, 2, "x", 100, 200),
        ];
        let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 200)];
        let plan = allocation_plan(group_id, &source_regions, &target_partition_exprs, vec![]);

        let result = convert(&mut next_region_number, &plan);

        assert_eq!(result.group_id, group_id);
        assert_eq!(result.source_regions, source_regions);
        assert!(result.allocated_region_ids.is_empty());
        assert!(result.pending_deallocate_region_ids.is_empty());
        // Nothing is allocated when both sides have the same size.
        assert_eq!(next_region_number, 10);
        // Each target keeps the region id of the source at the same position.
        assert_eq!(
            result.target_regions,
            vec![
                target(1, &target_partition_exprs[0]),
                target(2, &target_partition_exprs[1]),
            ]
        );
    }

    #[test]
    fn test_convert_plan_allocate_regions() {
        let group_id = Uuid::new_v4();
        let mut next_region_number = 10;
        // 3 source regions -> 5 target partition expressions
        let source_regions = vec![
            create_region_descriptor(TABLE_ID, 1, "x", 0, 100),
            create_region_descriptor(TABLE_ID, 2, "x", 100, 200),
            create_region_descriptor(TABLE_ID, 3, "x", 200, 300),
        ];
        let target_partition_exprs = vec![
            range_expr("x", 0, 50),
            range_expr("x", 50, 100),
            range_expr("x", 100, 150),
            range_expr("x", 150, 200),
            range_expr("x", 200, 300),
        ];
        let plan = allocation_plan(group_id, &source_regions, &target_partition_exprs, vec![]);

        let result = convert(&mut next_region_number, &plan);

        assert_eq!(result.group_id, group_id);
        assert_eq!(result.source_regions, source_regions);
        assert!(result.pending_deallocate_region_ids.is_empty());
        assert_eq!(next_region_number, 12);
        // The first 3 targets reuse source region ids, the last 2 are new.
        assert_eq!(
            result.target_regions,
            vec![
                target(1, &target_partition_exprs[0]),
                target(2, &target_partition_exprs[1]),
                target(3, &target_partition_exprs[2]),
                target(10, &target_partition_exprs[3]),
                target(11, &target_partition_exprs[4]),
            ]
        );
        assert_eq!(result.allocated_region_ids, vec![region(10), region(11)]);
    }

    #[test]
    fn test_convert_plan_deallocate_regions() {
        let group_id = Uuid::new_v4();
        // 5 source regions -> 3 target partition expressions
        let source_regions = vec![
            create_region_descriptor(TABLE_ID, 1, "x", 0, 50),
            create_region_descriptor(TABLE_ID, 2, "x", 50, 100),
            create_region_descriptor(TABLE_ID, 3, "x", 100, 150),
            create_region_descriptor(TABLE_ID, 4, "x", 150, 200),
            create_region_descriptor(TABLE_ID, 5, "x", 200, 300),
        ];
        let target_partition_exprs = vec![
            range_expr("x", 0, 100),
            range_expr("x", 100, 200),
            range_expr("x", 200, 300),
        ];
        let plan = allocation_plan(group_id, &source_regions, &target_partition_exprs, vec![]);
        let mut next_region_number = 10;

        let result = convert(&mut next_region_number, &plan);

        assert_eq!(next_region_number, 10);
        assert_eq!(result.group_id, group_id);
        assert_eq!(result.source_regions, source_regions);
        assert!(result.allocated_region_ids.is_empty());
        // The first 3 source regions survive with the new partition exprs.
        assert_eq!(
            result.target_regions,
            vec![
                target(1, &target_partition_exprs[0]),
                target(2, &target_partition_exprs[1]),
                target(3, &target_partition_exprs[2]),
            ]
        );
        // The last 2 source regions are pending deallocation.
        assert_eq!(
            result.pending_deallocate_region_ids,
            vec![region(4), region(5)]
        );
    }

    #[test]
    fn test_convert_plan_allocate_single_region() {
        let group_id = Uuid::new_v4();
        let mut next_region_number = 5;
        // 1 source region -> 2 target partition expressions
        let source_regions = vec![create_region_descriptor(TABLE_ID, 1, "x", 0, 100)];
        let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
        let plan = allocation_plan(group_id, &source_regions, &target_partition_exprs, vec![]);

        let result = convert(&mut next_region_number, &plan);

        assert_eq!(result.allocated_region_ids.len(), 1);
        assert_eq!(result.pending_deallocate_region_ids.len(), 0);
        // The first target reuses the source region id, the second is new.
        assert_eq!(
            result.target_regions,
            vec![
                target(1, &target_partition_exprs[0]),
                target(5, &target_partition_exprs[1]),
            ]
        );
        assert_eq!(next_region_number, 6);
    }

    #[test]
    fn test_convert_plan_allocate_default_source_region() {
        let group_id = Uuid::new_v4();
        let mut next_region_number = 5;
        let source_regions = vec![SourceRegionDescriptor::Default {
            region_id: region(1),
        }];
        let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
        let plan = allocation_plan(
            group_id,
            &source_regions,
            &target_partition_exprs,
            vec![vec![0, 1]],
        );

        let result = convert(&mut next_region_number, &plan);

        assert_eq!(result.source_regions, source_regions);
        assert_eq!(
            result.target_regions,
            vec![
                target(1, &target_partition_exprs[0]),
                target(5, &target_partition_exprs[1]),
            ]
        );
        assert_eq!(result.allocated_region_ids, vec![region(5)]);
        assert!(result.pending_deallocate_region_ids.is_empty());
        assert_eq!(result.transition_map, vec![vec![0, 1]]);
        assert_eq!(next_region_number, 6);
    }

    #[test]
    fn test_convert_plan_deallocate_to_single_region() {
        let group_id = Uuid::new_v4();
        // 3 source regions -> 1 target partition expression
        let source_regions = vec![
            create_region_descriptor(TABLE_ID, 1, "x", 0, 100),
            create_region_descriptor(TABLE_ID, 2, "x", 100, 200),
            create_region_descriptor(TABLE_ID, 3, "x", 200, 300),
        ];
        let target_partition_exprs = vec![range_expr("x", 0, 300)];
        let plan = allocation_plan(group_id, &source_regions, &target_partition_exprs, vec![]);
        let mut next_region_number = 10;

        let result = convert(&mut next_region_number, &plan);

        assert_eq!(result.allocated_region_ids.len(), 0);
        // Only the first source region is kept.
        assert_eq!(
            result.target_regions,
            vec![target(1, &target_partition_exprs[0])]
        );
        // The remaining source regions are pending deallocation.
        assert_eq!(
            result.pending_deallocate_region_ids,
            vec![region(2), region(3)]
        );
    }
}