query/optimizer/
parallelize_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BinaryHeap;
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use datafusion::config::ConfigOptions;
20use datafusion::physical_optimizer::PhysicalOptimizerRule;
21use datafusion::physical_plan::ExecutionPlan;
22use datafusion::physical_plan::sorts::sort::SortExec;
23use datafusion_common::tree_node::{Transformed, TreeNode};
24use datafusion_common::{DataFusionError, Result};
25use store_api::region_engine::PartitionRange;
26use table::table::scan::RegionScanExec;
27
/// Physical optimizer rule that spreads the [`PartitionRange`]s of each
/// [`RegionScanExec`] over the configured number of target partitions,
/// balancing the partitions by row count.
#[derive(Debug)]
pub struct ParallelizeScan;
30
31impl PhysicalOptimizerRule for ParallelizeScan {
32    fn optimize(
33        &self,
34        plan: Arc<dyn ExecutionPlan>,
35        config: &ConfigOptions,
36    ) -> Result<Arc<dyn ExecutionPlan>> {
37        Self::do_optimize(plan, config)
38    }
39
40    fn name(&self) -> &str {
41        "parallelize_scan"
42    }
43
44    fn schema_check(&self) -> bool {
45        true
46    }
47}
48
49impl ParallelizeScan {
50    fn do_optimize(
51        plan: Arc<dyn ExecutionPlan>,
52        config: &ConfigOptions,
53    ) -> Result<Arc<dyn ExecutionPlan>> {
54        let mut first_order_expr = None;
55
56        let result = plan
57            .transform_down(|plan| {
58                if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
59                    // save the first order expr
60                    first_order_expr = Some(sort_exec.expr().first()).cloned();
61                } else if let Some(region_scan_exec) =
62                    plan.as_any().downcast_ref::<RegionScanExec>()
63                {
64                    let expected_partition_num = config.execution.target_partitions;
65                    if region_scan_exec.is_partition_set()
66                        || region_scan_exec.scanner_type().as_str() == "SinglePartition"
67                    {
68                        return Ok(Transformed::no(plan));
69                    }
70
71                    let ranges = region_scan_exec.get_partition_ranges();
72                    let total_range_num = ranges.len();
73
74                    // assign ranges to each partition
75                    let mut partition_ranges =
76                        Self::assign_partition_range(ranges, expected_partition_num);
77                    debug!(
78                        "Assign {total_range_num} ranges to {expected_partition_num} partitions"
79                    );
80
81                    // Sort the ranges in each partition based on the order expr
82                    //
83                    // This optimistically assumes that the first order expr is on the time index column
84                    // to skip the validation of the order expr. As it's not harmful if this condition
85                    // is not met.
86                    if let Some(order_expr) = &first_order_expr
87                        && order_expr.options.descending
88                    {
89                        for ranges in partition_ranges.iter_mut() {
90                            ranges.sort_by(|a, b| b.end.cmp(&a.end));
91                        }
92                    } else {
93                        for ranges in partition_ranges.iter_mut() {
94                            ranges.sort_by(|a, b| a.start.cmp(&b.start));
95                        }
96                    }
97
98                    // update the partition ranges
99                    let new_exec = region_scan_exec
100                        .with_new_partitions(partition_ranges, expected_partition_num)
101                        .map_err(|e| DataFusionError::External(e.into_inner()))?;
102                    return Ok(Transformed::yes(Arc::new(new_exec)));
103                }
104
105                // The plan might be modified, but it's modified in-place so we always return
106                // Transformed::no(plan) to indicate there is no "new child"
107                Ok(Transformed::no(plan))
108            })?
109            .data;
110
111        Ok(result)
112    }
113
114    /// Distribute [`PartitionRange`]s to each partition.
115    ///
116    /// Currently we assign ranges to partitions according to their rows so each partition
117    /// has similar number of rows. This method always return `expected_partition_num` partitions.
118    fn assign_partition_range(
119        mut ranges: Vec<PartitionRange>,
120        expected_partition_num: usize,
121    ) -> Vec<Vec<PartitionRange>> {
122        if ranges.is_empty() {
123            // Returns a single partition with no range.
124            return vec![vec![]; expected_partition_num];
125        }
126
127        if ranges.len() == 1 {
128            let mut vec = vec![vec![]; expected_partition_num];
129            vec[0] = ranges;
130            return vec;
131        }
132
133        // Sort ranges by number of rows in descending order.
134        ranges.sort_by(|a, b| b.num_rows.cmp(&a.num_rows));
135        let mut partition_ranges = vec![vec![]; expected_partition_num];
136
137        #[derive(Eq, PartialEq)]
138        struct HeapNode {
139            num_rows: usize,
140            partition_idx: usize,
141        }
142
143        impl Ord for HeapNode {
144            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
145                // Reverse for min-heap.
146                self.num_rows.cmp(&other.num_rows).reverse()
147            }
148        }
149
150        impl PartialOrd for HeapNode {
151            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
152                Some(self.cmp(other))
153            }
154        }
155
156        let mut part_heap =
157            BinaryHeap::from_iter((0..expected_partition_num).map(|partition_idx| HeapNode {
158                num_rows: 0,
159                partition_idx,
160            }));
161
162        // Assigns the range to the partition with the smallest number of rows.
163        for range in ranges {
164            let mut node = part_heap.pop().unwrap();
165            let partition_idx = node.partition_idx;
166            node.num_rows += range.num_rows;
167            partition_ranges[partition_idx].push(range);
168            part_heap.push(node);
169        }
170
171        partition_ranges
172    }
173}
174
#[cfg(test)]
mod test {
    use common_time::Timestamp;
    use common_time::timestamp::TimeUnit;

    use super::*;

    // Shorthand for a `PartitionRange` from `$start` to `$end` seconds.
    macro_rules! range {
        ($start:expr, $end:expr, $num_rows:expr, $identifier:expr) => {
            PartitionRange {
                start: Timestamp::new($start, TimeUnit::Second),
                end: Timestamp::new($end, TimeUnit::Second),
                num_rows: $num_rows,
                identifier: $identifier,
            }
        };
    }

    #[test]
    fn test_assign_partition_range() {
        let ranges = vec![
            range!(0, 10, 100, 1),
            range!(10, 20, 200, 2),
            range!(20, 30, 150, 3),
            range!(30, 40, 250, 4),
        ];

        // Two partitions: the greedy assignment ends up with 350 rows in each.
        let two = ParallelizeScan::assign_partition_range(ranges.clone(), 2);
        assert_eq!(
            two,
            vec![
                vec![range!(30, 40, 250, 4), range!(0, 10, 100, 1)],
                vec![range!(10, 20, 200, 2), range!(20, 30, 150, 3)],
            ]
        );

        // More partitions than ranges: one range per partition, one left empty.
        let five = ParallelizeScan::assign_partition_range(ranges, 5);
        assert_eq!(
            five,
            vec![
                vec![range!(30, 40, 250, 4)],
                vec![range!(0, 10, 100, 1)],
                vec![range!(10, 20, 200, 2)],
                vec![],
                vec![range!(20, 30, 150, 3)],
            ]
        );

        // No ranges at all: still one (empty) entry per requested partition.
        let empty = ParallelizeScan::assign_partition_range(vec![], 5);
        assert_eq!(empty.len(), 5);
    }

    #[test]
    fn test_assign_unbalance_partition_range() {
        let ranges = vec![
            range!(0, 10, 100, 1),
            range!(10, 20, 200, 2),
            range!(20, 30, 150, 3),
            range!(30, 40, 2500, 4),
        ];

        // The oversized range fills one partition alone; the rest share the other.
        let result = ParallelizeScan::assign_partition_range(ranges, 2);
        assert_eq!(
            result,
            vec![
                vec![range!(30, 40, 2500, 4)],
                vec![range!(10, 20, 200, 2), range!(20, 30, 150, 3), range!(0, 10, 100, 1)],
            ]
        );
    }
}