Skip to main content

flow/
df_optimizer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Datafusion optimizer for flow plan
16
17#![warn(unused)]
18
19use std::collections::HashSet;
20use std::sync::Arc;
21
22use common_error::ext::BoxedError;
23use common_telemetry::debug;
24use datafusion::config::ConfigOptions;
25use datafusion::error::DataFusionError;
26use datafusion::optimizer::analyzer::type_coercion::TypeCoercion;
27use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
28use datafusion::optimizer::optimize_projections::OptimizeProjections;
29use datafusion::optimizer::simplify_expressions::SimplifyExpressions;
30use datafusion::optimizer::{Analyzer, AnalyzerRule, Optimizer, OptimizerContext};
31use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
32use query::QueryEngine;
33use query::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
34use query::parser::QueryLanguageParser;
35use query::query_engine::DefaultSerializer;
36use session::context::QueryContextRef;
37use snafu::ResultExt;
38/// note here we are using the `substrait_proto_df` crate from the `substrait` module and
39/// rename it to `substrait_proto`
40use substrait::DFLogicalSubstraitConvertor;
41
42use crate::adapter::FlownodeContext;
43use crate::error::{DatafusionSnafu, Error, ExternalSnafu, UnexpectedSnafu};
44use crate::plan::TypedPlan;
45
46// TODO(discord9): use `Analyzer` to manage rules if more `AnalyzerRule` is needed
47pub async fn apply_df_optimizer(
48    plan: datafusion_expr::LogicalPlan,
49    query_ctx: &QueryContextRef,
50) -> Result<datafusion_expr::LogicalPlan, Error> {
51    let cfg = query_ctx.create_config_options();
52    let analyzer = Analyzer::with_rules(vec![
53        Arc::new(CountWildcardToTimeIndexRule),
54        Arc::new(CheckGroupByRule::new()),
55        Arc::new(TypeCoercion::new()),
56    ]);
57    let plan = analyzer
58        .execute_and_check(plan, &cfg, |p, r| {
59            debug!("After apply rule {}, get plan: \n{:?}", r.name(), p);
60        })
61        .context(DatafusionSnafu {
62            context: "Fail to apply analyzer",
63        })?;
64
65    let mut ctx = OptimizerContext::new();
66    let scheduled_time = query::options::parse_scheduled_time_datetime(&query_ctx.extensions())
67        .map_err(BoxedError::new)
68        .context(ExternalSnafu)?;
69    if let Some(dt) = scheduled_time {
70        ctx = ctx.with_query_execution_start_time(dt);
71    }
72    let optimizer = Optimizer::with_rules(vec![
73        Arc::new(OptimizeProjections::new()),
74        Arc::new(CommonSubexprEliminate::new()),
75        Arc::new(SimplifyExpressions::new()),
76    ]);
77    let plan = optimizer
78        .optimize(plan, &ctx, |_, _| {})
79        .context(DatafusionSnafu {
80            context: "Fail to apply optimizer",
81        })?;
82
83    Ok(plan)
84}
85
86/// To reuse existing code for parse sql, the sql is first parsed into a datafusion logical plan,
87/// then to a substrait plan, and finally to a flow plan.
88pub async fn sql_to_flow_plan(
89    ctx: &mut FlownodeContext,
90    engine: &Arc<dyn QueryEngine>,
91    sql: &str,
92) -> Result<TypedPlan, Error> {
93    let query_ctx = ctx.query_context.clone().ok_or_else(|| {
94        UnexpectedSnafu {
95            reason: "Query context is missing",
96        }
97        .build()
98    })?;
99    let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
100        .map_err(BoxedError::new)
101        .context(ExternalSnafu)?;
102    let plan = engine
103        .planner()
104        .plan(&stmt, query_ctx.clone())
105        .await
106        .map_err(BoxedError::new)
107        .context(ExternalSnafu)?;
108
109    let opted_plan = apply_df_optimizer(plan, &query_ctx).await?;
110
111    // TODO(discord9): add df optimization
112    let sub_plan = DFLogicalSubstraitConvertor {}
113        .to_sub_plan(&opted_plan, DefaultSerializer)
114        .map_err(BoxedError::new)
115        .context(ExternalSnafu)?;
116
117    let flow_plan = TypedPlan::from_substrait_plan(ctx, &sub_plan).await?;
118
119    Ok(flow_plan)
120}
121
/// Analyzer rule that checks all group-by expressions of an aggregate query and makes sure
/// they also appear in the select clause.
#[derive(Debug)]
struct CheckGroupByRule {}

impl CheckGroupByRule {
    /// Creates the rule; it carries no state.
    pub fn new() -> Self {
        CheckGroupByRule {}
    }
}
131
132impl AnalyzerRule for CheckGroupByRule {
133    fn analyze(
134        &self,
135        plan: datafusion_expr::LogicalPlan,
136        _config: &ConfigOptions,
137    ) -> datafusion_common::Result<datafusion_expr::LogicalPlan> {
138        let transformed = plan
139            .transform_up_with_subqueries(check_group_by_analyzer)?
140            .data;
141        Ok(transformed)
142    }
143
144    fn name(&self) -> &str {
145        "check_groupby"
146    }
147}
148
149/// make sure everything in group by's expr is in select
150fn check_group_by_analyzer(
151    plan: datafusion_expr::LogicalPlan,
152) -> Result<Transformed<datafusion_expr::LogicalPlan>, DataFusionError> {
153    if let datafusion_expr::LogicalPlan::Projection(proj) = &plan
154        && let datafusion_expr::LogicalPlan::Aggregate(aggr) = proj.input.as_ref()
155    {
156        let mut found_column_used = FindColumn::new();
157        proj.expr
158            .iter()
159            .map(|i| i.visit(&mut found_column_used))
160            .count();
161        for expr in aggr.group_expr.iter() {
162            if !found_column_used
163                .names_for_alias
164                .contains(&expr.name_for_alias()?)
165            {
166                return Err(DataFusionError::Plan(format!(
167                    "Expect {} expr in group by also exist in select list, but select list only contain {:?}",
168                    expr.name_for_alias()?,
169                    found_column_used.names_for_alias
170                )));
171            }
172        }
173    }
174
175    Ok(Transformed::no(plan))
176}
177
/// Visitor state that accumulates the alias names of the column expressions it encounters.
#[derive(Debug, Default)]
struct FindColumn {
    /// Alias names of every column expression seen so far.
    names_for_alias: HashSet<String>,
}

impl FindColumn {
    /// Creates a visitor with an empty name set.
    fn new() -> Self {
        Self {
            names_for_alias: HashSet::new(),
        }
    }
}
189
190impl TreeNodeVisitor<'_> for FindColumn {
191    type Node = datafusion_expr::Expr;
192    fn f_down(
193        &mut self,
194        node: &datafusion_expr::Expr,
195    ) -> Result<TreeNodeRecursion, DataFusionError> {
196        if let datafusion_expr::Expr::Column(_) = node {
197            self.names_for_alias.insert(node.name_for_alias()?);
198        }
199        Ok(TreeNodeRecursion::Continue)
200    }
201}