query/optimizer/
parallelize_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BinaryHeap;
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use datafusion::config::ConfigOptions;
20use datafusion::physical_optimizer::PhysicalOptimizerRule;
21use datafusion::physical_plan::ExecutionPlan;
22use datafusion::physical_plan::sorts::sort::SortExec;
23use datafusion_common::tree_node::{Transformed, TreeNode};
24use datafusion_common::{DataFusionError, Result};
25use store_api::region_engine::PartitionRange;
26use table::table::scan::RegionScanExec;
27
/// Physical optimizer rule that redistributes the [`PartitionRange`]s of every eligible
/// [`RegionScanExec`] in a plan over `target_partitions` partitions, so the scan work is
/// split across multiple partitions.
#[derive(Debug)]
pub struct ParallelizeScan;
30
31impl PhysicalOptimizerRule for ParallelizeScan {
32    fn optimize(
33        &self,
34        plan: Arc<dyn ExecutionPlan>,
35        config: &ConfigOptions,
36    ) -> Result<Arc<dyn ExecutionPlan>> {
37        Self::do_optimize(plan, config)
38    }
39
40    fn name(&self) -> &str {
41        "parallelize_scan"
42    }
43
44    fn schema_check(&self) -> bool {
45        true
46    }
47}
48
49impl ParallelizeScan {
50    fn do_optimize(
51        plan: Arc<dyn ExecutionPlan>,
52        config: &ConfigOptions,
53    ) -> Result<Arc<dyn ExecutionPlan>> {
54        let mut first_order_expr = None;
55
56        let result = plan
57            .transform_down(|plan| {
58                if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
59                    // save the first order expr
60                    first_order_expr = Some(sort_exec.expr().first()).cloned();
61                } else if let Some(region_scan_exec) =
62                    plan.as_any().downcast_ref::<RegionScanExec>()
63                {
64                    let expected_partition_num = config.execution.target_partitions;
65                    if region_scan_exec.is_partition_set()
66                        || region_scan_exec.scanner_type().as_str() == "SinglePartition"
67                    {
68                        return Ok(Transformed::no(plan));
69                    }
70
71                    let ranges = region_scan_exec.get_partition_ranges();
72                    let total_range_num = ranges.len();
73
74                    // assign ranges to each partition
75                    let mut partition_ranges =
76                        Self::assign_partition_range(ranges, expected_partition_num);
77                    debug!(
78                        "Assign {total_range_num} ranges to {expected_partition_num} partitions"
79                    );
80
81                    // Sort the ranges in each partition based on the order expr
82                    //
83                    // This optimistically assumes that the first order expr is on the time index column
84                    // to skip the validation of the order expr. As it's not harmful if this condition
85                    // is not met.
86                    if let Some(order_expr) = &first_order_expr
87                        && order_expr.options.descending
88                    {
89                        for ranges in partition_ranges.iter_mut() {
90                            // Primary: end descending (larger end first)
91                            // Secondary: start descending (shorter range first when ends are equal)
92                            ranges.sort_by(|a, b| {
93                                b.end.cmp(&a.end).then_with(|| b.start.cmp(&a.start))
94                            });
95                        }
96                    } else {
97                        for ranges in partition_ranges.iter_mut() {
98                            // Primary: start ascending (smaller start first)
99                            // Secondary: end ascending (shorter range first when starts are equal)
100                            ranges.sort_by(|a, b| {
101                                a.start.cmp(&b.start).then_with(|| a.end.cmp(&b.end))
102                            });
103                        }
104                    }
105
106                    // update the partition ranges
107                    let new_exec = region_scan_exec
108                        .with_new_partitions(partition_ranges, expected_partition_num)
109                        .map_err(|e| DataFusionError::External(e.into_inner()))?;
110                    return Ok(Transformed::yes(Arc::new(new_exec)));
111                }
112
113                // The plan might be modified, but it's modified in-place so we always return
114                // Transformed::no(plan) to indicate there is no "new child"
115                Ok(Transformed::no(plan))
116            })?
117            .data;
118
119        Ok(result)
120    }
121
122    /// Distribute [`PartitionRange`]s to each partition.
123    ///
124    /// Currently we assign ranges to partitions according to their rows so each partition
125    /// has similar number of rows. This method always return `expected_partition_num` partitions.
126    fn assign_partition_range(
127        mut ranges: Vec<PartitionRange>,
128        expected_partition_num: usize,
129    ) -> Vec<Vec<PartitionRange>> {
130        if ranges.is_empty() {
131            // Returns a single partition with no range.
132            return vec![vec![]; expected_partition_num];
133        }
134
135        if ranges.len() == 1 {
136            let mut vec = vec![vec![]; expected_partition_num];
137            vec[0] = ranges;
138            return vec;
139        }
140
141        // Sort ranges by number of rows in descending order.
142        ranges.sort_by(|a, b| b.num_rows.cmp(&a.num_rows));
143        let mut partition_ranges = vec![vec![]; expected_partition_num];
144
145        #[derive(Eq, PartialEq)]
146        struct HeapNode {
147            num_rows: usize,
148            partition_idx: usize,
149        }
150
151        impl Ord for HeapNode {
152            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
153                // Reverse for min-heap.
154                self.num_rows.cmp(&other.num_rows).reverse()
155            }
156        }
157
158        impl PartialOrd for HeapNode {
159            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
160                Some(self.cmp(other))
161            }
162        }
163
164        let mut part_heap =
165            BinaryHeap::from_iter((0..expected_partition_num).map(|partition_idx| HeapNode {
166                num_rows: 0,
167                partition_idx,
168            }));
169
170        // Assigns the range to the partition with the smallest number of rows.
171        for range in ranges {
172            let mut node = part_heap.pop().unwrap();
173            let partition_idx = node.partition_idx;
174            node.num_rows += range.num_rows;
175            partition_ranges[partition_idx].push(range);
176            part_heap.push(node);
177        }
178
179        partition_ranges
180    }
181}
182
#[cfg(test)]
mod test {
    use common_time::Timestamp;
    use common_time::timestamp::TimeUnit;

    use super::*;

    /// Builds a [`PartitionRange`] from `start` to `end` seconds holding `num_rows` rows.
    fn range(start: i64, end: i64, num_rows: usize, identifier: usize) -> PartitionRange {
        PartitionRange {
            start: Timestamp::new(start, TimeUnit::Second),
            end: Timestamp::new(end, TimeUnit::Second),
            num_rows,
            identifier,
        }
    }

    #[test]
    fn test_assign_partition_range() {
        let ranges = vec![
            range(0, 10, 100, 1),
            range(10, 20, 200, 2),
            range(20, 30, 150, 3),
            range(30, 40, 250, 4),
        ];

        // assign to 2 partitions
        let result = ParallelizeScan::assign_partition_range(ranges.clone(), 2);
        let expected = vec![
            vec![range(30, 40, 250, 4), range(0, 10, 100, 1)],
            vec![range(10, 20, 200, 2), range(20, 30, 150, 3)],
        ];
        assert_eq!(result, expected);

        // assign 4 ranges to 5 partitions.
        let result = ParallelizeScan::assign_partition_range(ranges, 5);
        let expected = vec![
            vec![range(30, 40, 250, 4)],
            vec![range(0, 10, 100, 1)],
            vec![range(10, 20, 200, 2)],
            vec![],
            vec![range(20, 30, 150, 3)],
        ];
        assert_eq!(result, expected);

        // assign 0 ranges to 5 partitions. Should return 5 empty partitions.
        let result = ParallelizeScan::assign_partition_range(vec![], 5);
        assert_eq!(result.len(), 5);
        assert!(result.iter().all(Vec::is_empty));
    }

    #[test]
    fn test_assign_single_range() {
        // A single range always lands in the first partition; the rest stay empty.
        let result = ParallelizeScan::assign_partition_range(vec![range(0, 10, 100, 1)], 3);
        assert_eq!(result, vec![vec![range(0, 10, 100, 1)], vec![], vec![]]);
    }

    #[test]
    fn test_assign_unbalance_partition_range() {
        let ranges = vec![
            range(0, 10, 100, 1),
            range(10, 20, 200, 2),
            range(20, 30, 150, 3),
            range(30, 40, 2500, 4),
        ];

        // assign to 2 partitions
        let result = ParallelizeScan::assign_partition_range(ranges, 2);
        let expected = vec![
            vec![range(30, 40, 2500, 4)],
            vec![
                range(10, 20, 200, 2),
                range(20, 30, 150, 3),
                range(0, 10, 100, 1),
            ],
        ];
        assert_eq!(result, expected);
    }
}