Skip to main content

query/dist_plan/analyzer/
fallback.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Fallback dist plan analyzer, which only pushes down table scan nodes.
//! This is used when `PlanRewriter` produces errors while trying to rewrite the plan.
//! This is a temporary solution and will be removed once we have a more robust plan rewriter.
//!
19
20use std::collections::BTreeSet;
21
22use common_telemetry::debug;
23use datafusion::datasource::DefaultTableSource;
24use datafusion_common::Result as DfResult;
25use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
26use datafusion_expr::LogicalPlan;
27use table::metadata::TableType;
28use table::table::adapter::DfTableProviderAdapter;
29
30use crate::dist_plan::MergeScanLogicalPlan;
31use crate::dist_plan::analyzer::{AliasMapping, OTHER_PHY_PART_COL_PLACEHOLDER};
32
/// Plan rewriter that pushes down nothing but table scan nodes.
///
/// It walks the logical plan and replaces each table scan node with a merge scan node,
/// leaving every other node untouched. It is the fallback used when `PlanRewriter`
/// produces an error while rewriting a plan, and is a temporary measure until a more
/// robust plan rewriter is available.
#[derive(Clone, Debug, Default)]
pub struct FallbackPlanRewriter;
39
40impl TreeNodeRewriter for FallbackPlanRewriter {
41    type Node = LogicalPlan;
42
43    fn f_down(
44        &mut self,
45        plan: Self::Node,
46    ) -> DfResult<datafusion_common::tree_node::Transformed<Self::Node>> {
47        if let LogicalPlan::TableScan(table_scan) = &plan {
48            let partition_cols = if let Some(source) = table_scan
49                .source
50                .as_any()
51                .downcast_ref::<DefaultTableSource>()
52            {
53                if let Some(provider) = source
54                    .table_provider
55                    .as_any()
56                    .downcast_ref::<DfTableProviderAdapter>()
57                {
58                    if provider.table().table_type() == TableType::Base {
59                        let info = provider.table().table_info();
60                        let partition_key_indices = info.meta.partition_key_indices.clone();
61                        let schema = info.meta.schema.clone();
62                        let partition_cols = partition_key_indices
63                            .iter()
64                            .map(|index| schema.column_name_by_index(*index).to_string())
65                            .collect::<Vec<String>>();
66                        debug!(
67                            "FallbackPlanRewriter: loaded table partition metadata, table: {}, table_id: {}, partition_key_indices: {:?}, partition_columns: {:?}",
68                            info.name,
69                            info.ident.table_id,
70                            info.meta.partition_key_indices,
71                            partition_cols,
72                        );
73                        Some(partition_cols
74                                .into_iter()
75                                .map(|c| {
76                                    if c == OTHER_PHY_PART_COL_PLACEHOLDER {
77                                        // for placeholder, just return a empty alias
78                                        return Ok((c.clone(), BTreeSet::new()));
79                                    }
80                                    let index =
81                                        plan.schema().index_of_column_by_name(None, &c).ok_or_else(|| {
82                                            datafusion_common::DataFusionError::Internal(
83                                                format!(
84                                                    "PlanRewriter: maybe_set_partitions: column {c} not found in schema of plan: {plan}"
85                                                ),
86                                            )
87                                        })?;
88                                    let column = plan.schema().columns().get(index).cloned().ok_or_else(|| {
89                                        datafusion_common::DataFusionError::Internal(format!(
90                                            "PlanRewriter: maybe_set_partitions: column index {index} out of bounds in schema of plan: {plan}"
91                                        ))
92                                    })?;
93                                    Ok((c.clone(), BTreeSet::from([column])))
94                                })
95                                .collect::<DfResult<AliasMapping>>()?)
96                    } else {
97                        None
98                    }
99                } else {
100                    None
101                }
102            } else {
103                None
104            };
105            let node = MergeScanLogicalPlan::new(
106                plan,
107                false,
108                // at this stage, the partition cols should be set
109                // treat it as non-partitioned if None
110                partition_cols.clone().unwrap_or_default(),
111            )
112            .into_logical_plan();
113            Ok(Transformed::yes(node))
114        } else {
115            Ok(Transformed::no(plan))
116        }
117    }
118}