Skip to main content

meta_srv/procedure/repartition/
plan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16
17use common_meta::rpc::router::RegionRoute;
18use partition::expr::PartitionExpr;
19use serde::{Deserialize, Serialize};
20use store_api::storage::{RegionId, RegionNumber, TableId};
21
22use crate::procedure::repartition::group::GroupId;
23
24/// Metadata describing a region involved in the plan.
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
26pub struct RegionDescriptor {
27    /// The region id of the region involved in the plan.
28    pub region_id: RegionId,
29    /// The partition expression of the region.
30    pub partition_expr: PartitionExpr,
31}
32
33/// A plan entry for the region allocation phase, describing source regions
34/// and target partition expressions before allocation.
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
36pub struct AllocationPlanEntry {
37    /// The group id for this plan entry.
38    pub group_id: GroupId,
39    /// Source region descriptors involved in the plan.
40    pub source_regions: Vec<RegionDescriptor>,
41    /// The target partition expressions for the new or changed regions.
42    pub target_partition_exprs: Vec<PartitionExpr>,
43    /// For each `source_regions[k]`, the corresponding vector contains global
44    /// `target_partition_exprs` that overlap with it.
45    pub transition_map: Vec<Vec<usize>>,
46}
47
48/// A plan entry for the dispatch phase after region allocation,
49/// with concrete source and target region descriptors.
50#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
51pub struct RepartitionPlanEntry {
52    /// The group id for this plan entry.
53    pub group_id: GroupId,
54    /// The source region descriptors involved in the plan.
55    pub source_regions: Vec<RegionDescriptor>,
56    /// The target region descriptors involved in the plan.
57    pub target_regions: Vec<RegionDescriptor>,
58    /// The region ids of the allocated regions.
59    pub allocated_region_ids: Vec<RegionId>,
60    /// The region ids of the regions that are pending deallocation.
61    pub pending_deallocate_region_ids: Vec<RegionId>,
62    /// For each `source_regions[k]`, the corresponding vector contains global
63    /// `target_regions` that overlap with it.
64    pub transition_map: Vec<Vec<usize>>,
65    /// Pre-staging target routes persisted for parent rollback and recovery.
66    #[serde(default)]
67    pub original_target_routes: Vec<RegionRoute>,
68}
69
70impl RepartitionPlanEntry {
71    /// Returns the target regions that are newly allocated.
72    pub(crate) fn allocate_regions(&self) -> Vec<&RegionDescriptor> {
73        self.target_regions
74            .iter()
75            .filter(|r| self.allocated_region_ids.contains(&r.region_id))
76            .collect()
77    }
78}
79
80/// Converts an allocation plan to a repartition plan.
81///
82/// Converts an [`AllocationPlanEntry`] (which contains abstract region allocation intents)
83/// into a [`RepartitionPlanEntry`] with concrete source and target region descriptors,
84/// plus records information on which regions are newly allocated and/or pending deallocation.
85///
86/// # Returns
87///
88/// A [`Result`] containing the [`RepartitionPlanEntry`] describing exactly the source regions,
89/// the target regions (including any that need to be newly allocated), and transition mappings.
90///
91/// # Notes
92/// - If new regions are needed, their region ids are constructed using `table_id` and incrementing
93///   from `next_region_number`.
94/// - For each target, associates the correct region descriptor; for new regions, the region id
95///   is assigned sequentially.
96pub fn convert_allocation_plan_to_repartition_plan(
97    table_id: TableId,
98    next_region_number: &mut RegionNumber,
99    AllocationPlanEntry {
100        group_id,
101        source_regions,
102        target_partition_exprs,
103        transition_map,
104        ..
105    }: &AllocationPlanEntry,
106) -> RepartitionPlanEntry {
107    match source_regions.len().cmp(&target_partition_exprs.len()) {
108        Ordering::Less => {
109            // requires to allocate regions
110            let pending_allocate_target_partition_exprs = target_partition_exprs
111                .iter()
112                .skip(source_regions.len())
113                .map(|target_partition_expr| {
114                    let desc = RegionDescriptor {
115                        region_id: RegionId::new(table_id, *next_region_number),
116                        partition_expr: target_partition_expr.clone(),
117                    };
118                    *next_region_number += 1;
119                    desc
120                })
121                .collect::<Vec<_>>();
122
123            let allocated_region_ids = pending_allocate_target_partition_exprs
124                .iter()
125                .map(|rd| rd.region_id)
126                .collect::<Vec<_>>();
127
128            let target_regions = source_regions
129                .iter()
130                .zip(target_partition_exprs.iter())
131                .map(|(source_region, target_partition_expr)| RegionDescriptor {
132                    region_id: source_region.region_id,
133                    partition_expr: target_partition_expr.clone(),
134                })
135                .chain(pending_allocate_target_partition_exprs)
136                .collect::<Vec<_>>();
137
138            RepartitionPlanEntry {
139                group_id: *group_id,
140                source_regions: source_regions.clone(),
141                target_regions,
142                allocated_region_ids,
143                pending_deallocate_region_ids: vec![],
144                transition_map: transition_map.clone(),
145                original_target_routes: vec![],
146            }
147        }
148        Ordering::Equal => {
149            let target_regions = source_regions
150                .iter()
151                .zip(target_partition_exprs.iter())
152                .map(|(source_region, target_partition_expr)| RegionDescriptor {
153                    region_id: source_region.region_id,
154                    partition_expr: target_partition_expr.clone(),
155                })
156                .collect::<Vec<_>>();
157
158            RepartitionPlanEntry {
159                group_id: *group_id,
160                source_regions: source_regions.clone(),
161                target_regions,
162                allocated_region_ids: vec![],
163                pending_deallocate_region_ids: vec![],
164                transition_map: transition_map.clone(),
165                original_target_routes: vec![],
166            }
167        }
168        Ordering::Greater => {
169            // requires to deallocate regions
170            let target_regions = source_regions
171                .iter()
172                .take(target_partition_exprs.len())
173                .zip(target_partition_exprs.iter())
174                .map(|(source_region, target_partition_expr)| RegionDescriptor {
175                    region_id: source_region.region_id,
176                    partition_expr: target_partition_expr.clone(),
177                })
178                .collect::<Vec<_>>();
179
180            let pending_deallocate_region_ids = source_regions
181                .iter()
182                .skip(target_partition_exprs.len())
183                .map(|source_region| source_region.region_id)
184                .collect::<Vec<_>>();
185
186            RepartitionPlanEntry {
187                group_id: *group_id,
188                source_regions: source_regions.clone(),
189                target_regions,
190                allocated_region_ids: vec![],
191                pending_deallocate_region_ids,
192                transition_map: transition_map.clone(),
193                original_target_routes: vec![],
194            }
195        }
196    }
197}
198
#[cfg(test)]
mod tests {
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::test_util::range_expr;

    /// Builds a descriptor for region `region_number` of `table_id` whose partition
    /// expression is `range_expr(col, start, end)`.
    fn create_region_descriptor(
        table_id: TableId,
        region_number: u32,
        col: &str,
        start: i64,
        end: i64,
    ) -> RegionDescriptor {
        RegionDescriptor {
            region_id: RegionId::new(table_id, region_number),
            partition_expr: range_expr(col, start, end),
        }
    }

    /// Builds an allocation plan with a fresh random group id.
    fn create_allocation_plan(
        source_regions: &[RegionDescriptor],
        target_partition_exprs: &[PartitionExpr],
        transition_map: Vec<Vec<usize>>,
    ) -> AllocationPlanEntry {
        AllocationPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions: source_regions.to_vec(),
            target_partition_exprs: target_partition_exprs.to_vec(),
            transition_map,
        }
    }

    /// Maps region numbers of `table_id` to region ids.
    fn region_ids(table_id: TableId, region_numbers: &[RegionNumber]) -> Vec<RegionId> {
        region_numbers
            .iter()
            .map(|region_number| RegionId::new(table_id, *region_number))
            .collect()
    }

    /// Asserts that `result.target_regions[i]` has region number `region_numbers[i]`
    /// and partition expression `target_partition_exprs[i]` for every `i`.
    fn assert_target_regions(
        result: &RepartitionPlanEntry,
        table_id: TableId,
        region_numbers: &[RegionNumber],
        target_partition_exprs: &[PartitionExpr],
    ) {
        assert_eq!(result.target_regions.len(), region_numbers.len());
        assert_eq!(result.target_regions.len(), target_partition_exprs.len());
        for ((target, region_number), expr) in result
            .target_regions
            .iter()
            .zip(region_numbers)
            .zip(target_partition_exprs)
        {
            assert_eq!(target.region_id, RegionId::new(table_id, *region_number));
            assert_eq!(&target.partition_expr, expr);
        }
    }

    #[test]
    fn test_convert_plan_equal_regions() {
        let table_id = 1024;
        let mut next_region_number = 10;
        let source_regions = vec![
            create_region_descriptor(table_id, 1, "x", 0, 100),
            create_region_descriptor(table_id, 2, "x", 100, 200),
        ];
        let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 200)];
        let plan = create_allocation_plan(&source_regions, &target_partition_exprs, vec![]);

        let result = convert_allocation_plan_to_repartition_plan(
            table_id,
            &mut next_region_number,
            &plan,
        );

        assert_eq!(result.group_id, plan.group_id);
        assert_eq!(result.source_regions, source_regions);
        assert_target_regions(&result, table_id, &[1, 2], &target_partition_exprs);
        assert!(result.allocated_region_ids.is_empty());
        assert!(result.pending_deallocate_region_ids.is_empty());
        // Nothing is allocated, so the counter must not move.
        assert_eq!(next_region_number, 10);
    }

    #[test]
    fn test_convert_plan_allocate_regions() {
        let table_id = 1024;
        let mut next_region_number = 10;
        // 3 source regions -> 5 target partition expressions
        let source_regions = vec![
            create_region_descriptor(table_id, 1, "x", 0, 100),
            create_region_descriptor(table_id, 2, "x", 100, 200),
            create_region_descriptor(table_id, 3, "x", 200, 300),
        ];
        let target_partition_exprs = vec![
            range_expr("x", 0, 50),
            range_expr("x", 50, 100),
            range_expr("x", 100, 150),
            range_expr("x", 150, 200),
            range_expr("x", 200, 300),
        ];
        let plan = create_allocation_plan(&source_regions, &target_partition_exprs, vec![]);

        let result = convert_allocation_plan_to_repartition_plan(
            table_id,
            &mut next_region_number,
            &plan,
        );

        assert_eq!(result.group_id, plan.group_id);
        assert_eq!(result.source_regions, source_regions);
        // The first 3 targets reuse the source region ids; the last 2 are new.
        assert_target_regions(
            &result,
            table_id,
            &[1, 2, 3, 10, 11],
            &target_partition_exprs,
        );
        assert_eq!(result.allocated_region_ids, region_ids(table_id, &[10, 11]));
        assert!(result.pending_deallocate_region_ids.is_empty());
        assert_eq!(next_region_number, 12);
    }

    #[test]
    fn test_convert_plan_deallocate_regions() {
        let table_id = 1024;
        let mut next_region_number = 10;
        // 5 source regions -> 3 target partition expressions
        let source_regions = vec![
            create_region_descriptor(table_id, 1, "x", 0, 50),
            create_region_descriptor(table_id, 2, "x", 50, 100),
            create_region_descriptor(table_id, 3, "x", 100, 150),
            create_region_descriptor(table_id, 4, "x", 150, 200),
            create_region_descriptor(table_id, 5, "x", 200, 300),
        ];
        let target_partition_exprs = vec![
            range_expr("x", 0, 100),
            range_expr("x", 100, 200),
            range_expr("x", 200, 300),
        ];
        let plan = create_allocation_plan(&source_regions, &target_partition_exprs, vec![]);

        let result = convert_allocation_plan_to_repartition_plan(
            table_id,
            &mut next_region_number,
            &plan,
        );

        assert_eq!(next_region_number, 10);
        assert_eq!(result.group_id, plan.group_id);
        assert_eq!(result.source_regions, source_regions);
        // The first 3 source regions are kept with the new partition exprs.
        assert_target_regions(&result, table_id, &[1, 2, 3], &target_partition_exprs);
        assert!(result.allocated_region_ids.is_empty());
        // The last 2 source regions are pending deallocation.
        assert_eq!(
            result.pending_deallocate_region_ids,
            region_ids(table_id, &[4, 5])
        );
    }

    #[test]
    fn test_convert_plan_allocate_single_region() {
        let table_id = 1024;
        let mut next_region_number = 5;
        // 1 source region -> 2 target partition expressions
        let source_regions = vec![create_region_descriptor(table_id, 1, "x", 0, 100)];
        let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
        let plan = create_allocation_plan(&source_regions, &target_partition_exprs, vec![]);

        let result = convert_allocation_plan_to_repartition_plan(
            table_id,
            &mut next_region_number,
            &plan,
        );

        // The first target reuses the source region id; the second is new.
        assert_target_regions(&result, table_id, &[1, 5], &target_partition_exprs);
        assert_eq!(result.allocated_region_ids, region_ids(table_id, &[5]));
        assert!(result.pending_deallocate_region_ids.is_empty());
        assert_eq!(next_region_number, 6);
    }

    #[test]
    fn test_convert_plan_deallocate_to_single_region() {
        let table_id = 1024;
        let mut next_region_number = 10;
        // 3 source regions -> 1 target partition expression
        let source_regions = vec![
            create_region_descriptor(table_id, 1, "x", 0, 100),
            create_region_descriptor(table_id, 2, "x", 100, 200),
            create_region_descriptor(table_id, 3, "x", 200, 300),
        ];
        let target_partition_exprs = vec![range_expr("x", 0, 300)];
        let plan = create_allocation_plan(&source_regions, &target_partition_exprs, vec![]);

        let result = convert_allocation_plan_to_repartition_plan(
            table_id,
            &mut next_region_number,
            &plan,
        );

        // Only the first source region is kept.
        assert_target_regions(&result, table_id, &[1], &target_partition_exprs);
        assert!(result.allocated_region_ids.is_empty());
        // The other regions are pending deallocation.
        assert_eq!(
            result.pending_deallocate_region_ids,
            region_ids(table_id, &[2, 3])
        );
        assert_eq!(next_region_number, 10);
    }

    #[test]
    fn test_convert_plan_preserves_transition_map() {
        let table_id = 1024;
        let mut next_region_number = 10;
        let source_regions = vec![
            create_region_descriptor(table_id, 1, "x", 0, 100),
            create_region_descriptor(table_id, 2, "x", 100, 200),
        ];
        let target_partition_exprs = vec![
            range_expr("x", 0, 50),
            range_expr("x", 50, 150),
            range_expr("x", 150, 200),
        ];
        // Source 0 overlaps targets 0 and 1; source 1 overlaps targets 1 and 2.
        let transition_map = vec![vec![0, 1], vec![1, 2]];
        let plan = create_allocation_plan(
            &source_regions,
            &target_partition_exprs,
            transition_map.clone(),
        );

        let result = convert_allocation_plan_to_repartition_plan(
            table_id,
            &mut next_region_number,
            &plan,
        );

        // Target order matches the expression order, so indices stay valid.
        assert_eq!(result.transition_map, transition_map);
        assert!(result.original_target_routes.is_empty());
        assert_target_regions(&result, table_id, &[1, 2, 10], &target_partition_exprs);

        let allocated = result.allocate_regions();
        assert_eq!(allocated.len(), 1);
        assert_eq!(allocated[0].region_id, RegionId::new(table_id, 10));
        assert_eq!(allocated[0].partition_expr, target_partition_exprs[2]);
    }

    #[test]
    fn test_convert_plan_without_source_regions() {
        let table_id = 1024;
        let mut next_region_number = 7;
        let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
        let plan = create_allocation_plan(&[], &target_partition_exprs, vec![]);

        let result = convert_allocation_plan_to_repartition_plan(
            table_id,
            &mut next_region_number,
            &plan,
        );

        // With no source to reuse, every target is newly allocated.
        assert!(result.source_regions.is_empty());
        assert_target_regions(&result, table_id, &[7, 8], &target_partition_exprs);
        assert_eq!(result.allocated_region_ids, region_ids(table_id, &[7, 8]));
        assert!(result.pending_deallocate_region_ids.is_empty());
        assert_eq!(next_region_number, 9);
    }
}