query/optimizer/
remove_duplicate.rs1use std::sync::Arc;
16
17use datafusion::config::ConfigOptions;
18use datafusion::physical_optimizer::PhysicalOptimizerRule;
19use datafusion::physical_plan::ExecutionPlan;
20use datafusion::physical_plan::repartition::RepartitionExec;
21use datafusion_common::Result as DfResult;
22use datafusion_common::tree_node::{Transformed, TreeNode};
23
/// Physical optimizer rule that removes a [`RepartitionExec`] sitting directly
/// beneath another [`RepartitionExec`].
///
/// The rule carries no state; all of the work happens in its
/// [`PhysicalOptimizerRule::optimize`] implementation.
#[derive(Debug)]
pub struct RemoveDuplicate;
31
32impl PhysicalOptimizerRule for RemoveDuplicate {
33 fn optimize(
34 &self,
35 plan: Arc<dyn ExecutionPlan>,
36 _config: &ConfigOptions,
37 ) -> DfResult<Arc<dyn ExecutionPlan>> {
38 Self::do_optimize(plan)
39 }
40
41 fn name(&self) -> &str {
42 "RemoveDuplicateRule"
43 }
44
45 fn schema_check(&self) -> bool {
46 false
47 }
48}
49
50impl RemoveDuplicate {
51 fn do_optimize(plan: Arc<dyn ExecutionPlan>) -> DfResult<Arc<dyn ExecutionPlan>> {
52 let result = plan
53 .transform_down(|plan| {
54 if plan.as_any().is::<RepartitionExec>() {
55 let child = plan.children()[0].clone();
57 if child.as_any().type_id() == plan.as_any().type_id() {
58 let grand_child = child.children()[0].clone();
60 let new_plan = plan.with_new_children(vec![grand_child])?;
61 return Ok(Transformed::yes(new_plan));
62 }
63 }
64
65 Ok(Transformed::no(plan))
66 })?
67 .data;
68
69 Ok(result)
70 }
71}