query/optimizer/
remove_duplicate.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use datafusion::config::ConfigOptions;
18use datafusion::physical_optimizer::PhysicalOptimizerRule;
19use datafusion::physical_plan::ExecutionPlan;
20use datafusion::physical_plan::repartition::RepartitionExec;
21use datafusion_common::Result as DfResult;
22use datafusion_common::tree_node::{Transformed, TreeNode};
23
24/// This is [PhysicalOptimizerRule] to remove duplicate physical plans such as two
25/// adjoining [RepartitionExec]. They won't have any effect
26/// if one runs right after another.
27///
28/// This rule is expected to be run in the final stage of the optimization process.
29#[derive(Debug)]
30pub struct RemoveDuplicate;
31
32impl PhysicalOptimizerRule for RemoveDuplicate {
33    fn optimize(
34        &self,
35        plan: Arc<dyn ExecutionPlan>,
36        _config: &ConfigOptions,
37    ) -> DfResult<Arc<dyn ExecutionPlan>> {
38        Self::do_optimize(plan)
39    }
40
41    fn name(&self) -> &str {
42        "RemoveDuplicateRule"
43    }
44
45    fn schema_check(&self) -> bool {
46        false
47    }
48}
49
50impl RemoveDuplicate {
51    fn do_optimize(plan: Arc<dyn ExecutionPlan>) -> DfResult<Arc<dyn ExecutionPlan>> {
52        let result = plan
53            .transform_down(|plan| {
54                if plan.as_any().is::<RepartitionExec>() {
55                    // check child
56                    let child = plan.children()[0].clone();
57                    if child.as_any().type_id() == plan.as_any().type_id() {
58                        // remove child
59                        let grand_child = child.children()[0].clone();
60                        let new_plan = plan.with_new_children(vec![grand_child])?;
61                        return Ok(Transformed::yes(new_plan));
62                    }
63                }
64
65                Ok(Transformed::no(plan))
66            })?
67            .data;
68
69        Ok(result)
70    }
71}