meta_srv/procedure/repartition/
plan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16
17use partition::expr::PartitionExpr;
18use serde::{Deserialize, Serialize};
19use store_api::storage::{RegionId, RegionNumber, TableId};
20
21use crate::procedure::repartition::group::GroupId;
22
23/// Metadata describing a region involved in the plan.
24#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
25pub struct RegionDescriptor {
26    /// The region id of the region involved in the plan.
27    pub region_id: RegionId,
28    /// The partition expression of the region.
29    pub partition_expr: PartitionExpr,
30}
31
32/// A plan entry for the region allocation phase, describing source regions
33/// and target partition expressions before allocation.
34#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
35pub struct AllocationPlanEntry {
36    /// The group id for this plan entry.
37    pub group_id: GroupId,
38    /// Source region descriptors involved in the plan.
39    pub source_regions: Vec<RegionDescriptor>,
40    /// The target partition expressions for the new or changed regions.
41    pub target_partition_exprs: Vec<PartitionExpr>,
42    /// For each `source_regions[k]`, the corresponding vector contains global
43    /// `target_partition_exprs` that overlap with it.
44    pub transition_map: Vec<Vec<usize>>,
45}
46
47/// A plan entry for the dispatch phase after region allocation,
48/// with concrete source and target region descriptors.
49#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
50pub struct RepartitionPlanEntry {
51    /// The group id for this plan entry.
52    pub group_id: GroupId,
53    /// The source region descriptors involved in the plan.
54    pub source_regions: Vec<RegionDescriptor>,
55    /// The target region descriptors involved in the plan.
56    pub target_regions: Vec<RegionDescriptor>,
57    /// The region ids of the allocated regions.
58    pub allocated_region_ids: Vec<RegionId>,
59    /// The region ids of the regions that are pending deallocation.
60    pub pending_deallocate_region_ids: Vec<RegionId>,
61    /// For each `source_regions[k]`, the corresponding vector contains global
62    /// `target_regions` that overlap with it.
63    pub transition_map: Vec<Vec<usize>>,
64}
65
66impl RepartitionPlanEntry {
67    /// Returns the target regions that are newly allocated.
68    pub(crate) fn allocate_regions(&self) -> Vec<&RegionDescriptor> {
69        self.target_regions
70            .iter()
71            .filter(|r| self.allocated_region_ids.contains(&r.region_id))
72            .collect()
73    }
74}
75
76/// Converts an allocation plan to a repartition plan.
77///
78/// Converts an [`AllocationPlanEntry`] (which contains abstract region allocation intents)
79/// into a [`RepartitionPlanEntry`] with concrete source and target region descriptors,
80/// plus records information on which regions are newly allocated and/or pending deallocation.
81///
82/// # Returns
83///
84/// A [`Result`] containing the [`RepartitionPlanEntry`] describing exactly the source regions,
85/// the target regions (including any that need to be newly allocated), and transition mappings.
86///
87/// # Notes
88/// - If new regions are needed, their region ids are constructed using `table_id` and incrementing
89///   from `next_region_number`.
90/// - For each target, associates the correct region descriptor; for new regions, the region id
91///   is assigned sequentially.
92pub fn convert_allocation_plan_to_repartition_plan(
93    table_id: TableId,
94    next_region_number: &mut RegionNumber,
95    AllocationPlanEntry {
96        group_id,
97        source_regions,
98        target_partition_exprs,
99        transition_map,
100        ..
101    }: &AllocationPlanEntry,
102) -> RepartitionPlanEntry {
103    match source_regions.len().cmp(&target_partition_exprs.len()) {
104        Ordering::Less => {
105            // requires to allocate regions
106            let pending_allocate_target_partition_exprs = target_partition_exprs
107                .iter()
108                .skip(source_regions.len())
109                .map(|target_partition_expr| {
110                    let desc = RegionDescriptor {
111                        region_id: RegionId::new(table_id, *next_region_number),
112                        partition_expr: target_partition_expr.clone(),
113                    };
114                    *next_region_number += 1;
115                    desc
116                })
117                .collect::<Vec<_>>();
118
119            let allocated_region_ids = pending_allocate_target_partition_exprs
120                .iter()
121                .map(|rd| rd.region_id)
122                .collect::<Vec<_>>();
123
124            let target_regions = source_regions
125                .iter()
126                .zip(target_partition_exprs.iter())
127                .map(|(source_region, target_partition_expr)| RegionDescriptor {
128                    region_id: source_region.region_id,
129                    partition_expr: target_partition_expr.clone(),
130                })
131                .chain(pending_allocate_target_partition_exprs)
132                .collect::<Vec<_>>();
133
134            RepartitionPlanEntry {
135                group_id: *group_id,
136                source_regions: source_regions.clone(),
137                target_regions,
138                allocated_region_ids,
139                pending_deallocate_region_ids: vec![],
140                transition_map: transition_map.clone(),
141            }
142        }
143        Ordering::Equal => {
144            let target_regions = source_regions
145                .iter()
146                .zip(target_partition_exprs.iter())
147                .map(|(source_region, target_partition_expr)| RegionDescriptor {
148                    region_id: source_region.region_id,
149                    partition_expr: target_partition_expr.clone(),
150                })
151                .collect::<Vec<_>>();
152
153            RepartitionPlanEntry {
154                group_id: *group_id,
155                source_regions: source_regions.clone(),
156                target_regions,
157                allocated_region_ids: vec![],
158                pending_deallocate_region_ids: vec![],
159                transition_map: transition_map.clone(),
160            }
161        }
162        Ordering::Greater => {
163            // requires to deallocate regions
164            let target_regions = source_regions
165                .iter()
166                .take(target_partition_exprs.len())
167                .zip(target_partition_exprs.iter())
168                .map(|(source_region, target_partition_expr)| RegionDescriptor {
169                    region_id: source_region.region_id,
170                    partition_expr: target_partition_expr.clone(),
171                })
172                .collect::<Vec<_>>();
173
174            let pending_deallocate_region_ids = source_regions
175                .iter()
176                .skip(target_partition_exprs.len())
177                .map(|source_region| source_region.region_id)
178                .collect::<Vec<_>>();
179
180            RepartitionPlanEntry {
181                group_id: *group_id,
182                source_regions: source_regions.clone(),
183                target_regions,
184                allocated_region_ids: vec![],
185                pending_deallocate_region_ids,
186                transition_map: transition_map.clone(),
187            }
188        }
189    }
190}
191
#[cfg(test)]
mod tests {
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::test_util::range_expr;

    /// Table id shared by every region built in these tests.
    const TABLE_ID: TableId = 1024;

    /// Builds a descriptor for region `region_number` of [`TABLE_ID`] whose partition
    /// expression is `range_expr("x", start, end)`.
    fn source_region(region_number: u32, start: i64, end: i64) -> RegionDescriptor {
        RegionDescriptor {
            region_id: RegionId::new(TABLE_ID, region_number),
            partition_expr: range_expr("x", start, end),
        }
    }

    /// Maps region numbers to region ids of [`TABLE_ID`].
    fn region_ids(region_numbers: &[u32]) -> Vec<RegionId> {
        region_numbers
            .iter()
            .map(|&region_number| RegionId::new(TABLE_ID, region_number))
            .collect()
    }

    /// Builds an allocation plan entry with an empty transition map.
    fn allocation_plan(
        group_id: GroupId,
        source_regions: &[RegionDescriptor],
        target_partition_exprs: &[PartitionExpr],
    ) -> AllocationPlanEntry {
        AllocationPlanEntry {
            group_id,
            source_regions: source_regions.to_vec(),
            target_partition_exprs: target_partition_exprs.to_vec(),
            transition_map: Vec::new(),
        }
    }

    /// Asserts that `plan.target_regions` consists of exactly the given region numbers, each
    /// paired with the partition expression at the same position in `expected_exprs`.
    fn assert_target_regions(
        plan: &RepartitionPlanEntry,
        expected_region_numbers: &[u32],
        expected_exprs: &[PartitionExpr],
    ) {
        assert_eq!(plan.target_regions.len(), expected_region_numbers.len());
        assert_eq!(expected_exprs.len(), expected_region_numbers.len());

        let expectations = expected_region_numbers.iter().zip(expected_exprs.iter());
        for (target_region, (&region_number, expected_expr)) in
            plan.target_regions.iter().zip(expectations)
        {
            assert_eq!(
                target_region.region_id,
                RegionId::new(TABLE_ID, region_number)
            );
            assert_eq!(&target_region.partition_expr, expected_expr);
        }
    }

    #[test]
    fn test_convert_plan_equal_regions() {
        let group_id = Uuid::new_v4();
        let mut next_region_number = 10;
        let source_regions = vec![source_region(1, 0, 100), source_region(2, 100, 200)];
        let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 200)];
        let plan = allocation_plan(group_id, &source_regions, &target_partition_exprs);

        let result =
            convert_allocation_plan_to_repartition_plan(TABLE_ID, &mut next_region_number, &plan);

        assert_eq!(result.group_id, group_id);
        assert_eq!(result.source_regions, source_regions);
        assert!(result.allocated_region_ids.is_empty());
        assert!(result.pending_deallocate_region_ids.is_empty());
        // Nothing is allocated, so the counter must stay where it was.
        assert_eq!(next_region_number, 10);
        // Both targets reuse the source region ids.
        assert_target_regions(&result, &[1, 2], &target_partition_exprs);
    }

    #[test]
    fn test_convert_plan_allocate_regions() {
        let group_id = Uuid::new_v4();
        let mut next_region_number = 10;

        // 3 source regions -> 5 target partition expressions
        let source_regions = vec![
            source_region(1, 0, 100),
            source_region(2, 100, 200),
            source_region(3, 200, 300),
        ];
        let target_partition_exprs = vec![
            range_expr("x", 0, 50),
            range_expr("x", 50, 100),
            range_expr("x", 100, 150),
            range_expr("x", 150, 200),
            range_expr("x", 200, 300),
        ];
        let plan = allocation_plan(group_id, &source_regions, &target_partition_exprs);

        let result =
            convert_allocation_plan_to_repartition_plan(TABLE_ID, &mut next_region_number, &plan);

        assert_eq!(result.group_id, group_id);
        assert_eq!(result.source_regions, source_regions);
        assert!(result.pending_deallocate_region_ids.is_empty());
        assert_eq!(next_region_number, 12);

        // The first 3 targets reuse the source region ids; the last 2 are newly allocated.
        assert_target_regions(&result, &[1, 2, 3, 10, 11], &target_partition_exprs);
        assert_eq!(result.allocated_region_ids, region_ids(&[10, 11]));
    }

    #[test]
    fn test_convert_plan_deallocate_regions() {
        let group_id = Uuid::new_v4();

        // 5 source regions -> 3 target partition expressions
        let source_regions = vec![
            source_region(1, 0, 50),
            source_region(2, 50, 100),
            source_region(3, 100, 150),
            source_region(4, 150, 200),
            source_region(5, 200, 300),
        ];
        let target_partition_exprs = vec![
            range_expr("x", 0, 100),
            range_expr("x", 100, 200),
            range_expr("x", 200, 300),
        ];
        let plan = allocation_plan(group_id, &source_regions, &target_partition_exprs);
        let mut next_region_number = 10;

        let result =
            convert_allocation_plan_to_repartition_plan(TABLE_ID, &mut next_region_number, &plan);

        assert_eq!(next_region_number, 10);
        assert_eq!(result.group_id, group_id);
        assert_eq!(result.source_regions, source_regions);
        assert!(result.allocated_region_ids.is_empty());

        // The first 3 source regions are kept as targets with the new partition exprs.
        assert_target_regions(&result, &[1, 2, 3], &target_partition_exprs);
        // The last 2 source regions are pending deallocation.
        assert_eq!(result.pending_deallocate_region_ids, region_ids(&[4, 5]));
    }

    #[test]
    fn test_convert_plan_allocate_single_region() {
        let group_id = Uuid::new_v4();
        let mut next_region_number = 5;

        // 1 source region -> 2 target partition expressions
        let source_regions = vec![source_region(1, 0, 100)];
        let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
        let plan = allocation_plan(group_id, &source_regions, &target_partition_exprs);

        let result =
            convert_allocation_plan_to_repartition_plan(TABLE_ID, &mut next_region_number, &plan);

        assert_eq!(result.allocated_region_ids.len(), 1);
        assert_eq!(result.pending_deallocate_region_ids.len(), 0);
        // The first target reuses the source region id, the second is newly allocated.
        assert_target_regions(&result, &[1, 5], &target_partition_exprs);
        assert_eq!(next_region_number, 6);
    }

    #[test]
    fn test_convert_plan_deallocate_to_single_region() {
        let group_id = Uuid::new_v4();

        // 3 source regions -> 1 target partition expression
        let source_regions = vec![
            source_region(1, 0, 100),
            source_region(2, 100, 200),
            source_region(3, 200, 300),
        ];
        let target_partition_exprs = vec![range_expr("x", 0, 300)];
        let plan = allocation_plan(group_id, &source_regions, &target_partition_exprs);
        let mut next_region_number = 10;

        let result =
            convert_allocation_plan_to_repartition_plan(TABLE_ID, &mut next_region_number, &plan);

        assert_eq!(result.allocated_region_ids.len(), 0);

        // Only the first source region is kept.
        assert_target_regions(&result, &[1], &target_partition_exprs);
        // The remaining regions are pending deallocation.
        assert_eq!(result.pending_deallocate_region_ids, region_ids(&[2, 3]));
    }
}