query/optimizer/
remove_duplicate.rs1use std::sync::Arc;
16
17use datafusion::config::ConfigOptions;
18use datafusion::physical_optimizer::PhysicalOptimizerRule;
19use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
20use datafusion::physical_plan::repartition::RepartitionExec;
21use datafusion::physical_plan::ExecutionPlan;
22use datafusion_common::tree_node::{Transformed, TreeNode};
23use datafusion_common::Result as DfResult;
24
/// Physical optimizer rule that drops the inner operator when a
/// [`CoalesceBatchesExec`] or [`RepartitionExec`] is stacked directly on top
/// of another operator of the same kind.
///
/// The outer operator is kept and re-attached to the inner operator's input,
/// so the outer operator's own settings are the ones that remain in the plan.
#[derive(Debug)]
pub struct RemoveDuplicate;
32
33impl PhysicalOptimizerRule for RemoveDuplicate {
34 fn optimize(
35 &self,
36 plan: Arc<dyn ExecutionPlan>,
37 _config: &ConfigOptions,
38 ) -> DfResult<Arc<dyn ExecutionPlan>> {
39 Self::do_optimize(plan)
40 }
41
42 fn name(&self) -> &str {
43 "RemoveDuplicateRule"
44 }
45
46 fn schema_check(&self) -> bool {
47 false
48 }
49}
50
51impl RemoveDuplicate {
52 fn do_optimize(plan: Arc<dyn ExecutionPlan>) -> DfResult<Arc<dyn ExecutionPlan>> {
53 let result = plan
54 .transform_down(|plan| {
55 if plan.as_any().is::<CoalesceBatchesExec>()
56 || plan.as_any().is::<RepartitionExec>()
57 {
58 let child = plan.children()[0].clone();
60 if child.as_any().type_id() == plan.as_any().type_id() {
61 let grand_child = child.children()[0].clone();
63 let new_plan = plan.with_new_children(vec![grand_child])?;
64 return Ok(Transformed::yes(new_plan));
65 }
66 }
67
68 Ok(Transformed::no(plan))
69 })?
70 .data;
71
72 Ok(result)
73 }
74}
75
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use arrow_schema::Schema;
    use datafusion::physical_plan::displayable;
    use datafusion::physical_plan::empty::EmptyExec;
    use datafusion_physical_expr::Partitioning;

    use super::*;

    /// Two directly stacked `CoalesceBatchesExec` nodes: the inner one (1024)
    /// is removed and the outer one (8192) is kept.
    #[test]
    fn remove_coalesce_batches() {
        let empty = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
        let coalesce_batches = Arc::new(CoalesceBatchesExec::new(empty, 1024));
        let another_coalesce_batches = Arc::new(CoalesceBatchesExec::new(coalesce_batches, 8192));

        let optimized = RemoveDuplicate::do_optimize(another_coalesce_batches).unwrap();
        let formatted = displayable(optimized.as_ref()).indent(true).to_string();
        // The indent renderer emits two spaces per depth level; the literal
        // must match that whitespace exactly.
        let expected = "CoalesceBatchesExec: target_batch_size=8192\
        \n  EmptyExec\n";

        assert_eq!(expected, formatted);
    }

    /// Two `CoalesceBatchesExec` nodes separated by a `RepartitionExec`: no
    /// node has a child of its own kind, so the plan is left unchanged.
    #[test]
    fn non_continuous_coalesce_batches() {
        let empty = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
        let coalesce_batches = Arc::new(CoalesceBatchesExec::new(empty, 1024));
        let repartition = Arc::new(
            RepartitionExec::try_new(coalesce_batches, Partitioning::UnknownPartitioning(1))
                .unwrap(),
        );
        let another_coalesce_batches = Arc::new(CoalesceBatchesExec::new(repartition, 8192));

        let optimized = RemoveDuplicate::do_optimize(another_coalesce_batches).unwrap();
        let formatted = displayable(optimized.as_ref()).indent(true).to_string();
        // Depth levels 1, 2 and 3 are rendered with 2, 4 and 6 leading spaces.
        let expected = "CoalesceBatchesExec: target_batch_size=8192\
        \n  RepartitionExec: partitioning=UnknownPartitioning(1), input_partitions=1\
        \n    CoalesceBatchesExec: target_batch_size=1024\
        \n      EmptyExec\n";

        assert_eq!(expected, formatted);
    }
}