query/optimizer/
pass_distribution.rs1use std::sync::Arc;
16
17use datafusion::config::ConfigOptions;
18use datafusion::physical_optimizer::PhysicalOptimizerRule;
19use datafusion::physical_plan::ExecutionPlan;
20use datafusion_common::Result as DfResult;
21use datafusion_physical_expr::Distribution;
22
23use crate::dist_plan::MergeScanExec;
24
25#[derive(Debug)]
33pub struct PassDistribution;
34
35impl PhysicalOptimizerRule for PassDistribution {
36 fn optimize(
37 &self,
38 plan: Arc<dyn ExecutionPlan>,
39 config: &ConfigOptions,
40 ) -> DfResult<Arc<dyn ExecutionPlan>> {
41 Self::do_optimize(plan, config)
42 }
43
44 fn name(&self) -> &str {
45 "PassDistributionRule"
46 }
47
48 fn schema_check(&self) -> bool {
49 false
50 }
51}
52
53impl PassDistribution {
54 fn do_optimize(
55 plan: Arc<dyn ExecutionPlan>,
56 _config: &ConfigOptions,
57 ) -> DfResult<Arc<dyn ExecutionPlan>> {
58 Self::rewrite_with_distribution(plan, None)
60 }
61
62 fn rewrite_with_distribution(
64 plan: Arc<dyn ExecutionPlan>,
65 current_req: Option<Distribution>,
66 ) -> DfResult<Arc<dyn ExecutionPlan>> {
67 if let Some(merge_scan) = plan.as_any().downcast_ref::<MergeScanExec>()
69 && let Some(distribution) = current_req.as_ref()
70 && let Some(new_plan) = merge_scan.try_with_new_distribution(distribution.clone())
71 {
72 return Ok(Arc::new(new_plan) as _);
74 }
75
76 let children = plan.children();
78 if children.is_empty() {
79 return Ok(plan);
80 }
81
82 let required = plan.required_input_distribution();
83 let mut new_children = Vec::with_capacity(children.len());
84 for (idx, child) in children.into_iter().enumerate() {
85 let child_req = match required.get(idx) {
86 Some(Distribution::UnspecifiedDistribution) => None,
87 None => current_req.clone(),
88 Some(req) => Some(req.clone()),
89 };
90 let new_child = Self::rewrite_with_distribution(child.clone(), child_req)?;
91 new_children.push(new_child);
92 }
93
94 let unchanged = plan
96 .children()
97 .into_iter()
98 .zip(new_children.iter())
99 .all(|(old, new)| Arc::ptr_eq(old, new));
100 if unchanged {
101 Ok(plan)
102 } else {
103 plan.with_new_children(new_children)
104 }
105 }
106}