1use std::collections::BinaryHeap;
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use datafusion::config::ConfigOptions;
20use datafusion::physical_optimizer::PhysicalOptimizerRule;
21use datafusion::physical_plan::ExecutionPlan;
22use datafusion::physical_plan::sorts::sort::SortExec;
23use datafusion_common::tree_node::{Transformed, TreeNode};
24use datafusion_common::{DataFusionError, Result};
25use store_api::region_engine::PartitionRange;
26use table::table::scan::RegionScanExec;
27
28#[derive(Debug)]
29pub struct ParallelizeScan;
30
31impl PhysicalOptimizerRule for ParallelizeScan {
32 fn optimize(
33 &self,
34 plan: Arc<dyn ExecutionPlan>,
35 config: &ConfigOptions,
36 ) -> Result<Arc<dyn ExecutionPlan>> {
37 Self::do_optimize(plan, config)
38 }
39
40 fn name(&self) -> &str {
41 "parallelize_scan"
42 }
43
44 fn schema_check(&self) -> bool {
45 true
46 }
47}
48
49impl ParallelizeScan {
50 fn do_optimize(
51 plan: Arc<dyn ExecutionPlan>,
52 config: &ConfigOptions,
53 ) -> Result<Arc<dyn ExecutionPlan>> {
54 let mut first_order_expr = None;
55
56 let result = plan
57 .transform_down(|plan| {
58 if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
59 first_order_expr = Some(sort_exec.expr().first()).cloned();
61 } else if let Some(region_scan_exec) =
62 plan.as_any().downcast_ref::<RegionScanExec>()
63 {
64 let expected_partition_num = config.execution.target_partitions;
65 if region_scan_exec.is_partition_set()
66 || region_scan_exec.scanner_type().as_str() == "SinglePartition"
67 {
68 return Ok(Transformed::no(plan));
69 }
70
71 let ranges = region_scan_exec.get_partition_ranges();
72 let total_range_num = ranges.len();
73
74 let mut partition_ranges =
76 Self::assign_partition_range(ranges, expected_partition_num);
77 debug!(
78 "Assign {total_range_num} ranges to {expected_partition_num} partitions"
79 );
80
81 if let Some(order_expr) = &first_order_expr
87 && order_expr.options.descending
88 {
89 for ranges in partition_ranges.iter_mut() {
90 ranges.sort_by(|a, b| {
93 b.end.cmp(&a.end).then_with(|| b.start.cmp(&a.start))
94 });
95 }
96 } else {
97 for ranges in partition_ranges.iter_mut() {
98 ranges.sort_by(|a, b| {
101 a.start.cmp(&b.start).then_with(|| a.end.cmp(&b.end))
102 });
103 }
104 }
105
106 let new_exec = region_scan_exec
108 .with_new_partitions(partition_ranges, expected_partition_num)
109 .map_err(|e| DataFusionError::External(e.into_inner()))?;
110 return Ok(Transformed::yes(Arc::new(new_exec)));
111 }
112
113 Ok(Transformed::no(plan))
116 })?
117 .data;
118
119 Ok(result)
120 }
121
122 fn assign_partition_range(
127 mut ranges: Vec<PartitionRange>,
128 expected_partition_num: usize,
129 ) -> Vec<Vec<PartitionRange>> {
130 if ranges.is_empty() {
131 return vec![vec![]; expected_partition_num];
133 }
134
135 if ranges.len() == 1 {
136 let mut vec = vec![vec![]; expected_partition_num];
137 vec[0] = ranges;
138 return vec;
139 }
140
141 ranges.sort_by(|a, b| b.num_rows.cmp(&a.num_rows));
143 let mut partition_ranges = vec![vec![]; expected_partition_num];
144
145 #[derive(Eq, PartialEq)]
146 struct HeapNode {
147 num_rows: usize,
148 partition_idx: usize,
149 }
150
151 impl Ord for HeapNode {
152 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
153 self.num_rows.cmp(&other.num_rows).reverse()
155 }
156 }
157
158 impl PartialOrd for HeapNode {
159 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
160 Some(self.cmp(other))
161 }
162 }
163
164 let mut part_heap =
165 BinaryHeap::from_iter((0..expected_partition_num).map(|partition_idx| HeapNode {
166 num_rows: 0,
167 partition_idx,
168 }));
169
170 for range in ranges {
172 let mut node = part_heap.pop().unwrap();
173 let partition_idx = node.partition_idx;
174 node.num_rows += range.num_rows;
175 partition_ranges[partition_idx].push(range);
176 part_heap.push(node);
177 }
178
179 partition_ranges
180 }
181}
182
#[cfg(test)]
mod test {
    use common_time::Timestamp;
    use common_time::timestamp::TimeUnit;

    use super::*;

    /// Four consecutive 10-second ranges (0-10, 10-20, 20-30, 30-40) with identifiers 1..=4.
    ///
    /// Their row counts are 100, 200, 150 and `last_num_rows`.
    fn sample_ranges(last_num_rows: usize) -> Vec<PartitionRange> {
        let make = |start: i64, num_rows: usize, identifier| PartitionRange {
            start: Timestamp::new(start, TimeUnit::Second),
            end: Timestamp::new(start + 10, TimeUnit::Second),
            num_rows,
            identifier,
        };
        vec![
            make(0, 100, 1),
            make(10, 200, 2),
            make(20, 150, 3),
            make(30, last_num_rows, 4),
        ]
    }

    #[test]
    fn test_assign_partition_range() {
        let ranges = sample_ranges(250);
        let [r1, r2, r3, r4]: [PartitionRange; 4] = ranges.clone().try_into().unwrap();

        let result = ParallelizeScan::assign_partition_range(ranges.clone(), 2);
        let expected = vec![
            vec![r4.clone(), r1.clone()],
            vec![r2.clone(), r3.clone()],
        ];
        assert_eq!(result, expected);

        // The placement across the five initially empty partitions follows `BinaryHeap`'s
        // pop order among equally loaded partitions.
        let result = ParallelizeScan::assign_partition_range(ranges, 5);
        let expected = vec![vec![r4], vec![r1], vec![r2], vec![], vec![r3]];
        assert_eq!(result, expected);

        let result = ParallelizeScan::assign_partition_range(vec![], 5);
        assert_eq!(result.len(), 5);
        assert!(result.iter().all(Vec::is_empty));
    }

    #[test]
    fn test_assign_single_partition_range() {
        let single = sample_ranges(250).remove(0);
        let result = ParallelizeScan::assign_partition_range(vec![single.clone()], 3);
        assert_eq!(result, vec![vec![single], vec![], vec![]]);
    }

    #[test]
    fn test_assign_unbalance_partition_range() {
        let ranges = sample_ranges(2500);
        let [r1, r2, r3, r4]: [PartitionRange; 4] = ranges.clone().try_into().unwrap();

        // The dominant range fills one partition alone; all the others share the second.
        let result = ParallelizeScan::assign_partition_range(ranges, 2);
        let expected = vec![vec![r4], vec![r2, r3, r1]];
        assert_eq!(result, expected);
    }
}