query/dist_plan/analyzer/
fallback.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Fallback dist plan analyzer, which only pushes down table scan nodes.
//! It is used when `PlanRewriter` produces errors while trying to rewrite the plan.
//! This is a temporary solution and will be removed once we have a more robust plan rewriter.
18//!
19
20use common_telemetry::debug;
21use datafusion::datasource::DefaultTableSource;
22use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
23use datafusion_expr::LogicalPlan;
24use table::metadata::TableType;
25use table::table::adapter::DfTableProviderAdapter;
26
27use crate::dist_plan::MergeScanLogicalPlan;
28
/// A minimal plan rewriter that pushes down nothing but table scan nodes.
///
/// It serves as the fallback when `PlanRewriter` produces errors while rewriting a plan:
/// the logical plan is traversed and every table scan node is rewritten into a merge scan
/// node, leaving all other nodes untouched.
///
/// This is a temporary solution, and will be removed once we have a more robust plan rewriter.
#[derive(Clone, Debug, Default)]
pub struct FallbackPlanRewriter;
35
36impl TreeNodeRewriter for FallbackPlanRewriter {
37    type Node = LogicalPlan;
38
39    fn f_down(
40        &mut self,
41        node: Self::Node,
42    ) -> datafusion_common::Result<datafusion_common::tree_node::Transformed<Self::Node>> {
43        if let LogicalPlan::TableScan(table_scan) = &node {
44            let partition_cols = if let Some(source) = table_scan
45                .source
46                .as_any()
47                .downcast_ref::<DefaultTableSource>()
48            {
49                if let Some(provider) = source
50                    .table_provider
51                    .as_any()
52                    .downcast_ref::<DfTableProviderAdapter>()
53                {
54                    if provider.table().table_type() == TableType::Base {
55                        let info = provider.table().table_info();
56                        let partition_key_indices = info.meta.partition_key_indices.clone();
57                        let schema = info.meta.schema.clone();
58                        let partition_cols = partition_key_indices
59                            .into_iter()
60                            .map(|index| schema.column_name_by_index(index).to_string())
61                            .collect::<Vec<String>>();
62                        debug!(
63                            "FallbackPlanRewriter: table {} has partition columns: {:?}",
64                            info.name, partition_cols
65                        );
66                        Some(partition_cols)
67                    } else {
68                        None
69                    }
70                } else {
71                    None
72                }
73            } else {
74                None
75            };
76            let node = MergeScanLogicalPlan::new(
77                node,
78                false,
79                // at this stage, the partition cols should be set
80                // treat it as non-partitioned if None
81                partition_cols.clone().unwrap_or_default(),
82            )
83            .into_logical_plan();
84            Ok(Transformed::yes(node))
85        } else {
86            Ok(Transformed::no(node))
87        }
88    }
89}