// query/optimizer/parallelize_scan.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::Arc;

use common_telemetry::debug;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{DataFusionError, Result};
use store_api::region_engine::PartitionRange;
use table::table::scan::RegionScanExec;

/// Physical optimizer rule that spreads the [`PartitionRange`]s of a [`RegionScanExec`]
/// over the configured number of target partitions.
#[derive(Debug)]
pub struct ParallelizeScan;

31impl PhysicalOptimizerRule for ParallelizeScan {
32    fn optimize(
33        &self,
34        plan: Arc<dyn ExecutionPlan>,
35        config: &ConfigOptions,
36    ) -> Result<Arc<dyn ExecutionPlan>> {
37        Self::do_optimize(plan, config)
38    }
39
40    fn name(&self) -> &str {
41        "parallelize_scan"
42    }
43
44    fn schema_check(&self) -> bool {
45        true
46    }
47}
48
49impl ParallelizeScan {
50    fn do_optimize(
51        plan: Arc<dyn ExecutionPlan>,
52        config: &ConfigOptions,
53    ) -> Result<Arc<dyn ExecutionPlan>> {
54        let mut first_order_expr = None;
55
56        let result = plan
57            .transform_down(|plan| {
58                if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
59                    // save the first order expr
60                    first_order_expr = sort_exec.expr().first().cloned();
61                } else if let Some(region_scan_exec) =
62                    plan.as_any().downcast_ref::<RegionScanExec>()
63                {
64                    let expected_partition_num = config.execution.target_partitions;
65                    if region_scan_exec.is_partition_set() {
66                        return Ok(Transformed::no(plan));
67                    }
68
69                    let ranges = region_scan_exec.get_partition_ranges();
70                    let total_range_num = ranges.len();
71
72                    // assign ranges to each partition
73                    let mut partition_ranges =
74                        Self::assign_partition_range(ranges, expected_partition_num);
75                    debug!(
76                        "Assign {total_range_num} ranges to {expected_partition_num} partitions"
77                    );
78
79                    // Sort the ranges in each partition based on the order expr
80                    //
81                    // This optimistically assumes that the first order expr is on the time index column
82                    // to skip the validation of the order expr. As it's not harmful if this condition
83                    // is not met.
84                    if let Some(order_expr) = &first_order_expr
85                        && order_expr.options.descending
86                    {
87                        for ranges in partition_ranges.iter_mut() {
88                            ranges.sort_by(|a, b| b.end.cmp(&a.end));
89                        }
90                    } else {
91                        for ranges in partition_ranges.iter_mut() {
92                            ranges.sort_by(|a, b| a.start.cmp(&b.start));
93                        }
94                    }
95
96                    // update the partition ranges
97                    let new_exec = region_scan_exec
98                        .with_new_partitions(partition_ranges, expected_partition_num)
99                        .map_err(|e| DataFusionError::External(e.into_inner()))?;
100                    return Ok(Transformed::yes(Arc::new(new_exec)));
101                }
102
103                // The plan might be modified, but it's modified in-place so we always return
104                // Transformed::no(plan) to indicate there is no "new child"
105                Ok(Transformed::no(plan))
106            })?
107            .data;
108
109        Ok(result)
110    }
111
112    /// Distribute [`PartitionRange`]s to each partition.
113    ///
114    /// Currently we assign ranges to partitions according to their rows so each partition
115    /// has similar number of rows. This method always return `expected_partition_num` partitions.
116    fn assign_partition_range(
117        mut ranges: Vec<PartitionRange>,
118        expected_partition_num: usize,
119    ) -> Vec<Vec<PartitionRange>> {
120        if ranges.is_empty() {
121            // Returns a single partition with no range.
122            return vec![vec![]; expected_partition_num];
123        }
124
125        if ranges.len() == 1 {
126            let mut vec = vec![vec![]; expected_partition_num];
127            vec[0] = ranges;
128            return vec;
129        }
130
131        // Sort ranges by number of rows in descending order.
132        ranges.sort_by(|a, b| b.num_rows.cmp(&a.num_rows));
133        let mut partition_ranges = vec![vec![]; expected_partition_num];
134
135        #[derive(Eq, PartialEq)]
136        struct HeapNode {
137            num_rows: usize,
138            partition_idx: usize,
139        }
140
141        impl Ord for HeapNode {
142            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
143                // Reverse for min-heap.
144                self.num_rows.cmp(&other.num_rows).reverse()
145            }
146        }
147
148        impl PartialOrd for HeapNode {
149            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
150                Some(self.cmp(other))
151            }
152        }
153
154        let mut part_heap =
155            BinaryHeap::from_iter((0..expected_partition_num).map(|partition_idx| HeapNode {
156                num_rows: 0,
157                partition_idx,
158            }));
159
160        // Assigns the range to the partition with the smallest number of rows.
161        for range in ranges {
162            let mut node = part_heap.pop().unwrap();
163            let partition_idx = node.partition_idx;
164            node.num_rows += range.num_rows;
165            partition_ranges[partition_idx].push(range);
166            part_heap.push(node);
167        }
168
169        partition_ranges
170    }
171}
172
#[cfg(test)]
mod test {
    use common_time::timestamp::TimeUnit;
    use common_time::Timestamp;

    use super::*;

    #[test]
    fn test_assign_partition_range() {
        // (start_sec, end_sec, num_rows, identifier) for each input range, in input order.
        let ranges: Vec<PartitionRange> = [
            (0, 10, 100, 1),
            (10, 20, 200, 2),
            (20, 30, 150, 3),
            (30, 40, 250, 4),
        ]
        .iter()
        .map(|&(start, end, num_rows, identifier)| PartitionRange {
            start: Timestamp::new(start, TimeUnit::Second),
            end: Timestamp::new(end, TimeUnit::Second),
            num_rows,
            identifier,
        })
        .collect();

        // Two partitions: 250 and 200 open the partitions, then 150 and 100 fill the gaps.
        let two_parts = ParallelizeScan::assign_partition_range(ranges.clone(), 2);
        assert_eq!(
            two_parts,
            vec![
                vec![ranges[3].clone(), ranges[0].clone()],
                vec![ranges[1].clone(), ranges[2].clone()],
            ]
        );

        // Five partitions for four ranges: one range per partition and one partition left
        // empty. Which empty partition wins a tie is decided by `BinaryHeap` internals.
        let five_parts = ParallelizeScan::assign_partition_range(ranges.clone(), 5);
        assert_eq!(
            five_parts,
            vec![
                vec![ranges[3].clone()],
                vec![ranges[0].clone()],
                vec![ranges[1].clone()],
                vec![],
                vec![ranges[2].clone()],
            ]
        );

        // No ranges: still one (empty) entry per requested partition.
        let empty = ParallelizeScan::assign_partition_range(vec![], 5);
        assert_eq!(empty.len(), 5);
    }

    #[test]
    fn test_assign_unbalance_partition_range() {
        // (start_sec, end_sec, num_rows, identifier); the last range dominates by row count.
        let ranges: Vec<PartitionRange> = [
            (0, 10, 100, 1),
            (10, 20, 200, 2),
            (20, 30, 150, 3),
            (30, 40, 2500, 4),
        ]
        .iter()
        .map(|&(start, end, num_rows, identifier)| PartitionRange {
            start: Timestamp::new(start, TimeUnit::Second),
            end: Timestamp::new(end, TimeUnit::Second),
            num_rows,
            identifier,
        })
        .collect();

        // The huge range gets a partition to itself; all others share the second one.
        let result = ParallelizeScan::assign_partition_range(ranges.clone(), 2);
        assert_eq!(
            result,
            vec![
                vec![ranges[3].clone()],
                vec![ranges[1].clone(), ranges[2].clone(), ranges[0].clone()],
            ]
        );
    }
}