1use std::collections::BinaryHeap;
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use datafusion::config::ConfigOptions;
20use datafusion::physical_optimizer::PhysicalOptimizerRule;
21use datafusion::physical_plan::ExecutionPlan;
22use datafusion::physical_plan::sorts::sort::SortExec;
23use datafusion_common::tree_node::{Transformed, TreeNode};
24use datafusion_common::{DataFusionError, Result};
25use store_api::region_engine::PartitionRange;
26use table::table::scan::RegionScanExec;
27
/// Physical optimizer rule, registered under the name `"parallelize_scan"`, that
/// distributes the partition ranges of a `RegionScanExec` across several partitions.
///
/// The rule carries no state, so it is cheap to copy and can be built with
/// [`Default::default`].
#[derive(Debug, Clone, Copy, Default)]
pub struct ParallelizeScan;
30
31impl PhysicalOptimizerRule for ParallelizeScan {
32 fn optimize(
33 &self,
34 plan: Arc<dyn ExecutionPlan>,
35 config: &ConfigOptions,
36 ) -> Result<Arc<dyn ExecutionPlan>> {
37 Self::do_optimize(plan, config)
38 }
39
40 fn name(&self) -> &str {
41 "parallelize_scan"
42 }
43
44 fn schema_check(&self) -> bool {
45 true
46 }
47}
48
49impl ParallelizeScan {
50 fn do_optimize(
51 plan: Arc<dyn ExecutionPlan>,
52 config: &ConfigOptions,
53 ) -> Result<Arc<dyn ExecutionPlan>> {
54 let mut first_order_expr = None;
55
56 let result = plan
57 .transform_down(|plan| {
58 if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
59 first_order_expr = Some(sort_exec.expr().first()).cloned();
61 } else if let Some(region_scan_exec) =
62 plan.as_any().downcast_ref::<RegionScanExec>()
63 {
64 let expected_partition_num = config.execution.target_partitions;
65 if region_scan_exec.is_partition_set()
66 || region_scan_exec.scanner_type().as_str() == "SinglePartition"
67 {
68 return Ok(Transformed::no(plan));
69 }
70
71 let ranges = region_scan_exec.get_partition_ranges();
72 let total_range_num = ranges.len();
73
74 let mut partition_ranges =
76 Self::assign_partition_range(ranges, expected_partition_num);
77 debug!(
78 "Assign {total_range_num} ranges to {expected_partition_num} partitions"
79 );
80
81 if let Some(order_expr) = &first_order_expr
87 && order_expr.options.descending
88 {
89 for ranges in partition_ranges.iter_mut() {
90 ranges.sort_by(|a, b| b.end.cmp(&a.end));
91 }
92 } else {
93 for ranges in partition_ranges.iter_mut() {
94 ranges.sort_by(|a, b| a.start.cmp(&b.start));
95 }
96 }
97
98 let new_exec = region_scan_exec
100 .with_new_partitions(partition_ranges, expected_partition_num)
101 .map_err(|e| DataFusionError::External(e.into_inner()))?;
102 return Ok(Transformed::yes(Arc::new(new_exec)));
103 }
104
105 Ok(Transformed::no(plan))
108 })?
109 .data;
110
111 Ok(result)
112 }
113
114 fn assign_partition_range(
119 mut ranges: Vec<PartitionRange>,
120 expected_partition_num: usize,
121 ) -> Vec<Vec<PartitionRange>> {
122 if ranges.is_empty() {
123 return vec![vec![]; expected_partition_num];
125 }
126
127 if ranges.len() == 1 {
128 let mut vec = vec![vec![]; expected_partition_num];
129 vec[0] = ranges;
130 return vec;
131 }
132
133 ranges.sort_by(|a, b| b.num_rows.cmp(&a.num_rows));
135 let mut partition_ranges = vec![vec![]; expected_partition_num];
136
137 #[derive(Eq, PartialEq)]
138 struct HeapNode {
139 num_rows: usize,
140 partition_idx: usize,
141 }
142
143 impl Ord for HeapNode {
144 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
145 self.num_rows.cmp(&other.num_rows).reverse()
147 }
148 }
149
150 impl PartialOrd for HeapNode {
151 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
152 Some(self.cmp(other))
153 }
154 }
155
156 let mut part_heap =
157 BinaryHeap::from_iter((0..expected_partition_num).map(|partition_idx| HeapNode {
158 num_rows: 0,
159 partition_idx,
160 }));
161
162 for range in ranges {
164 let mut node = part_heap.pop().unwrap();
165 let partition_idx = node.partition_idx;
166 node.num_rows += range.num_rows;
167 partition_ranges[partition_idx].push(range);
168 part_heap.push(node);
169 }
170
171 partition_ranges
172 }
173}
174
#[cfg(test)]
mod test {
    use common_time::Timestamp;
    use common_time::timestamp::TimeUnit;

    use super::*;

    /// Builds a [`PartitionRange`] whose `start` and `end` are given in seconds.
    macro_rules! range {
        ($start:expr, $end:expr, $num_rows:expr, $identifier:expr) => {
            PartitionRange {
                start: Timestamp::new($start, TimeUnit::Second),
                end: Timestamp::new($end, TimeUnit::Second),
                num_rows: $num_rows,
                identifier: $identifier,
            }
        };
    }

    /// Ranges with comparable row counts are balanced across the partitions.
    #[test]
    fn test_assign_partition_range() {
        let ranges = vec![
            range!(0, 10, 100, 1),
            range!(10, 20, 200, 2),
            range!(20, 30, 150, 3),
            range!(30, 40, 250, 4),
        ];

        // Two partitions: 250 + 100 rows versus 200 + 150 rows.
        let two_partitions = ParallelizeScan::assign_partition_range(ranges.clone(), 2);
        assert_eq!(
            two_partitions,
            vec![
                vec![range!(30, 40, 250, 4), range!(0, 10, 100, 1)],
                vec![range!(10, 20, 200, 2), range!(20, 30, 150, 3)],
            ]
        );

        // More partitions than ranges: one partition stays empty.
        let five_partitions = ParallelizeScan::assign_partition_range(ranges, 5);
        assert_eq!(
            five_partitions,
            vec![
                vec![range!(30, 40, 250, 4)],
                vec![range!(0, 10, 100, 1)],
                vec![range!(10, 20, 200, 2)],
                vec![],
                vec![range!(20, 30, 150, 3)],
            ]
        );

        // No ranges at all: the requested number of partitions is still produced.
        let no_ranges = ParallelizeScan::assign_partition_range(vec![], 5);
        assert_eq!(no_ranges.len(), 5);
    }

    /// One dominant range gets a partition to itself; the rest share the other one.
    #[test]
    fn test_assign_unbalance_partition_range() {
        let ranges = vec![
            range!(0, 10, 100, 1),
            range!(10, 20, 200, 2),
            range!(20, 30, 150, 3),
            range!(30, 40, 2500, 4),
        ];

        let assigned = ParallelizeScan::assign_partition_range(ranges.clone(), 2);
        assert_eq!(
            assigned,
            vec![
                vec![range!(30, 40, 2500, 4)],
                vec![
                    range!(10, 20, 200, 2),
                    range!(20, 30, 150, 3),
                    range!(0, 10, 100, 1),
                ],
            ]
        );
    }
}