use std::sync::Arc;
use common_telemetry::debug;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{DataFusionError, Result};
use store_api::region_engine::PartitionRange;
use table::table::scan::RegionScanExec;
/// Physical optimizer rule that redistributes the partition ranges of each
/// `RegionScanExec` in a plan over the session's target partition count.
#[derive(Debug)]
pub struct ParallelizeScan;
impl PhysicalOptimizerRule for ParallelizeScan {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
Self::do_optimize(plan, config)
}
fn name(&self) -> &str {
"parallelize_scan"
}
fn schema_check(&self) -> bool {
true
}
}
impl ParallelizeScan {
    /// Walks `plan` top-down and replaces every `RegionScanExec` with a copy whose
    /// partition ranges are redistributed over `config.execution.target_partitions`
    /// partitions (see `assign_partition_range`).
    ///
    /// # Errors
    ///
    /// Returns `DataFusionError::External` if `RegionScanExec::with_new_partitions`
    /// fails, and propagates any error raised during the tree traversal.
    fn do_optimize(
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let result = plan
            .transform_down(|plan| {
                if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
                    let ranges = region_scan_exec.get_partition_ranges();
                    let total_range_num = ranges.len();
                    let expected_partition_num = config.execution.target_partitions;
                    let partition_ranges =
                        Self::assign_partition_range(ranges, expected_partition_num);
                    debug!(
                        "Assign {total_range_num} ranges to {expected_partition_num} partitions"
                    );
                    let new_exec = region_scan_exec
                        .with_new_partitions(partition_ranges)
                        .map_err(|e| DataFusionError::External(e.into_inner()))?;
                    return Ok(Transformed::yes(Arc::new(new_exec)));
                }
                Ok(Transformed::no(plan))
            })?
            .data;
        Ok(result)
    }

    /// Distributes `ranges` round-robin over at most `expected_partition_num`
    /// partitions.
    ///
    /// The number of returned partitions is `expected_partition_num` clamped to
    /// `[1, ranges.len()]`. No partition is empty unless `ranges` itself is empty,
    /// in which case a single empty partition is returned. Range `i` goes to
    /// partition `i % partition_count`, so each partition keeps the input order.
    ///
    /// An `expected_partition_num` of `0` is treated as `1` rather than panicking.
    fn assign_partition_range(
        ranges: Vec<PartitionRange>,
        expected_partition_num: usize,
    ) -> Vec<Vec<PartitionRange>> {
        let actual_partition_num = expected_partition_num.min(ranges.len()).max(1);
        let mut partition_ranges = vec![vec![]; actual_partition_num];
        for (i, range) in ranges.into_iter().enumerate() {
            // Use the clamped count as the modulus: `expected_partition_num` may be 0,
            // which would panic with a division by zero. For every non-zero input the
            // result is identical, since either the counts match or `i < actual`.
            let partition_idx = i % actual_partition_num;
            partition_ranges[partition_idx].push(range);
        }
        partition_ranges
    }
}
#[cfg(test)]
mod test {
    use common_time::timestamp::TimeUnit;
    use common_time::Timestamp;

    use super::*;

    #[test]
    fn test_assign_partition_range() {
        // Builds a range spanning `[start, end)` seconds.
        let range = |start, end, num_rows, identifier| PartitionRange {
            start: Timestamp::new(start, TimeUnit::Second),
            end: Timestamp::new(end, TimeUnit::Second),
            num_rows,
            identifier,
        };

        let ranges = vec![
            range(0, 10, 100, 1),
            range(10, 20, 200, 2),
            range(20, 30, 150, 3),
            range(30, 40, 250, 4),
        ];

        // Two partitions: ranges are dealt round-robin, even positions to the
        // first partition and odd positions to the second.
        let two_way = ParallelizeScan::assign_partition_range(ranges.clone(), 2);
        assert_eq!(
            two_way,
            vec![
                vec![range(0, 10, 100, 1), range(20, 30, 150, 3)],
                vec![range(10, 20, 200, 2), range(30, 40, 250, 4)],
            ]
        );

        // More partitions requested than ranges available: capped at one range each.
        let five_way = ParallelizeScan::assign_partition_range(ranges, 5);
        assert_eq!(
            five_way,
            vec![
                vec![range(0, 10, 100, 1)],
                vec![range(10, 20, 200, 2)],
                vec![range(20, 30, 150, 3)],
                vec![range(30, 40, 250, 4)],
            ]
        );

        // No ranges still yields exactly one (empty) partition.
        let empty = ParallelizeScan::assign_partition_range(vec![], 5);
        assert_eq!(empty.len(), 1);
    }
}