1use std::collections::BinaryHeap;
16use std::sync::Arc;
17
18use common_telemetry::debug;
19use datafusion::config::ConfigOptions;
20use datafusion::physical_optimizer::PhysicalOptimizerRule;
21use datafusion::physical_plan::sorts::sort::SortExec;
22use datafusion::physical_plan::ExecutionPlan;
23use datafusion_common::tree_node::{Transformed, TreeNode};
24use datafusion_common::{DataFusionError, Result};
25use store_api::region_engine::PartitionRange;
26use table::table::scan::RegionScanExec;
27
/// Physical optimizer rule that redistributes the [`PartitionRange`]s of each
/// [`RegionScanExec`] across `target_partitions` partitions, balancing them by row count.
#[derive(Debug)]
pub struct ParallelizeScan;
31impl PhysicalOptimizerRule for ParallelizeScan {
32 fn optimize(
33 &self,
34 plan: Arc<dyn ExecutionPlan>,
35 config: &ConfigOptions,
36 ) -> Result<Arc<dyn ExecutionPlan>> {
37 Self::do_optimize(plan, config)
38 }
39
40 fn name(&self) -> &str {
41 "parallelize_scan"
42 }
43
44 fn schema_check(&self) -> bool {
45 true
46 }
47}
48
49impl ParallelizeScan {
50 fn do_optimize(
51 plan: Arc<dyn ExecutionPlan>,
52 config: &ConfigOptions,
53 ) -> Result<Arc<dyn ExecutionPlan>> {
54 let mut first_order_expr = None;
55
56 let result = plan
57 .transform_down(|plan| {
58 if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
59 first_order_expr = sort_exec.expr().first().cloned();
61 } else if let Some(region_scan_exec) =
62 plan.as_any().downcast_ref::<RegionScanExec>()
63 {
64 let expected_partition_num = config.execution.target_partitions;
65 if region_scan_exec.is_partition_set() {
66 return Ok(Transformed::no(plan));
67 }
68
69 let ranges = region_scan_exec.get_partition_ranges();
70 let total_range_num = ranges.len();
71
72 let mut partition_ranges =
74 Self::assign_partition_range(ranges, expected_partition_num);
75 debug!(
76 "Assign {total_range_num} ranges to {expected_partition_num} partitions"
77 );
78
79 if let Some(order_expr) = &first_order_expr
85 && order_expr.options.descending
86 {
87 for ranges in partition_ranges.iter_mut() {
88 ranges.sort_by(|a, b| b.end.cmp(&a.end));
89 }
90 } else {
91 for ranges in partition_ranges.iter_mut() {
92 ranges.sort_by(|a, b| a.start.cmp(&b.start));
93 }
94 }
95
96 let new_exec = region_scan_exec
98 .with_new_partitions(partition_ranges, expected_partition_num)
99 .map_err(|e| DataFusionError::External(e.into_inner()))?;
100 return Ok(Transformed::yes(Arc::new(new_exec)));
101 }
102
103 Ok(Transformed::no(plan))
106 })?
107 .data;
108
109 Ok(result)
110 }
111
112 fn assign_partition_range(
117 mut ranges: Vec<PartitionRange>,
118 expected_partition_num: usize,
119 ) -> Vec<Vec<PartitionRange>> {
120 if ranges.is_empty() {
121 return vec![vec![]; expected_partition_num];
123 }
124
125 if ranges.len() == 1 {
126 let mut vec = vec![vec![]; expected_partition_num];
127 vec[0] = ranges;
128 return vec;
129 }
130
131 ranges.sort_by(|a, b| b.num_rows.cmp(&a.num_rows));
133 let mut partition_ranges = vec![vec![]; expected_partition_num];
134
135 #[derive(Eq, PartialEq)]
136 struct HeapNode {
137 num_rows: usize,
138 partition_idx: usize,
139 }
140
141 impl Ord for HeapNode {
142 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
143 self.num_rows.cmp(&other.num_rows).reverse()
145 }
146 }
147
148 impl PartialOrd for HeapNode {
149 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
150 Some(self.cmp(other))
151 }
152 }
153
154 let mut part_heap =
155 BinaryHeap::from_iter((0..expected_partition_num).map(|partition_idx| HeapNode {
156 num_rows: 0,
157 partition_idx,
158 }));
159
160 for range in ranges {
162 let mut node = part_heap.pop().unwrap();
163 let partition_idx = node.partition_idx;
164 node.num_rows += range.num_rows;
165 partition_ranges[partition_idx].push(range);
166 part_heap.push(node);
167 }
168
169 partition_ranges
170 }
171}
172
173#[cfg(test)]
174mod test {
175 use common_time::timestamp::TimeUnit;
176 use common_time::Timestamp;
177
178 use super::*;
179
180 #[test]
181 fn test_assign_partition_range() {
182 let ranges = vec![
183 PartitionRange {
184 start: Timestamp::new(0, TimeUnit::Second),
185 end: Timestamp::new(10, TimeUnit::Second),
186 num_rows: 100,
187 identifier: 1,
188 },
189 PartitionRange {
190 start: Timestamp::new(10, TimeUnit::Second),
191 end: Timestamp::new(20, TimeUnit::Second),
192 num_rows: 200,
193 identifier: 2,
194 },
195 PartitionRange {
196 start: Timestamp::new(20, TimeUnit::Second),
197 end: Timestamp::new(30, TimeUnit::Second),
198 num_rows: 150,
199 identifier: 3,
200 },
201 PartitionRange {
202 start: Timestamp::new(30, TimeUnit::Second),
203 end: Timestamp::new(40, TimeUnit::Second),
204 num_rows: 250,
205 identifier: 4,
206 },
207 ];
208
209 let expected_partition_num = 2;
211 let result =
212 ParallelizeScan::assign_partition_range(ranges.clone(), expected_partition_num);
213 let expected = vec![
214 vec![
215 PartitionRange {
216 start: Timestamp::new(30, TimeUnit::Second),
217 end: Timestamp::new(40, TimeUnit::Second),
218 num_rows: 250,
219 identifier: 4,
220 },
221 PartitionRange {
222 start: Timestamp::new(0, TimeUnit::Second),
223 end: Timestamp::new(10, TimeUnit::Second),
224 num_rows: 100,
225 identifier: 1,
226 },
227 ],
228 vec![
229 PartitionRange {
230 start: Timestamp::new(10, TimeUnit::Second),
231 end: Timestamp::new(20, TimeUnit::Second),
232 num_rows: 200,
233 identifier: 2,
234 },
235 PartitionRange {
236 start: Timestamp::new(20, TimeUnit::Second),
237 end: Timestamp::new(30, TimeUnit::Second),
238 num_rows: 150,
239 identifier: 3,
240 },
241 ],
242 ];
243 assert_eq!(result, expected);
244
245 let expected_partition_num = 5;
247 let result = ParallelizeScan::assign_partition_range(ranges, expected_partition_num);
248 let expected = vec![
249 vec![PartitionRange {
250 start: Timestamp::new(30, TimeUnit::Second),
251 end: Timestamp::new(40, TimeUnit::Second),
252 num_rows: 250,
253 identifier: 4,
254 }],
255 vec![PartitionRange {
256 start: Timestamp::new(0, TimeUnit::Second),
257 end: Timestamp::new(10, TimeUnit::Second),
258 num_rows: 100,
259 identifier: 1,
260 }],
261 vec![PartitionRange {
262 start: Timestamp::new(10, TimeUnit::Second),
263 end: Timestamp::new(20, TimeUnit::Second),
264 num_rows: 200,
265 identifier: 2,
266 }],
267 vec![],
268 vec![PartitionRange {
269 start: Timestamp::new(20, TimeUnit::Second),
270 end: Timestamp::new(30, TimeUnit::Second),
271 num_rows: 150,
272 identifier: 3,
273 }],
274 ];
275 assert_eq!(result, expected);
276
277 let result = ParallelizeScan::assign_partition_range(vec![], 5);
279 assert_eq!(result.len(), 5);
280 }
281
282 #[test]
283 fn test_assign_unbalance_partition_range() {
284 let ranges = vec![
285 PartitionRange {
286 start: Timestamp::new(0, TimeUnit::Second),
287 end: Timestamp::new(10, TimeUnit::Second),
288 num_rows: 100,
289 identifier: 1,
290 },
291 PartitionRange {
292 start: Timestamp::new(10, TimeUnit::Second),
293 end: Timestamp::new(20, TimeUnit::Second),
294 num_rows: 200,
295 identifier: 2,
296 },
297 PartitionRange {
298 start: Timestamp::new(20, TimeUnit::Second),
299 end: Timestamp::new(30, TimeUnit::Second),
300 num_rows: 150,
301 identifier: 3,
302 },
303 PartitionRange {
304 start: Timestamp::new(30, TimeUnit::Second),
305 end: Timestamp::new(40, TimeUnit::Second),
306 num_rows: 2500,
307 identifier: 4,
308 },
309 ];
310
311 let expected_partition_num = 2;
313 let result =
314 ParallelizeScan::assign_partition_range(ranges.clone(), expected_partition_num);
315 let expected = vec![
316 vec![PartitionRange {
317 start: Timestamp::new(30, TimeUnit::Second),
318 end: Timestamp::new(40, TimeUnit::Second),
319 num_rows: 2500,
320 identifier: 4,
321 }],
322 vec![
323 PartitionRange {
324 start: Timestamp::new(10, TimeUnit::Second),
325 end: Timestamp::new(20, TimeUnit::Second),
326 num_rows: 200,
327 identifier: 2,
328 },
329 PartitionRange {
330 start: Timestamp::new(20, TimeUnit::Second),
331 end: Timestamp::new(30, TimeUnit::Second),
332 num_rows: 150,
333 identifier: 3,
334 },
335 PartitionRange {
336 start: Timestamp::new(0, TimeUnit::Second),
337 end: Timestamp::new(10, TimeUnit::Second),
338 num_rows: 100,
339 identifier: 1,
340 },
341 ],
342 ];
343 assert_eq!(result, expected);
344 }
345}