1#![warn(unused)]
18
19use std::collections::HashSet;
20use std::sync::Arc;
21
22use common_error::ext::BoxedError;
23use common_telemetry::debug;
24use datafusion::config::ConfigOptions;
25use datafusion::error::DataFusionError;
26use datafusion::optimizer::analyzer::type_coercion::TypeCoercion;
27use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
28use datafusion::optimizer::optimize_projections::OptimizeProjections;
29use datafusion::optimizer::simplify_expressions::SimplifyExpressions;
30use datafusion::optimizer::{Analyzer, AnalyzerRule, Optimizer, OptimizerContext};
31use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
32use query::QueryEngine;
33use query::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
34use query::parser::QueryLanguageParser;
35use query::query_engine::DefaultSerializer;
36use session::context::QueryContextRef;
37use snafu::ResultExt;
38use substrait::DFLogicalSubstraitConvertor;
41
42use crate::adapter::FlownodeContext;
43use crate::error::{DatafusionSnafu, Error, ExternalSnafu, UnexpectedSnafu};
44use crate::plan::TypedPlan;
45
46pub async fn apply_df_optimizer(
48 plan: datafusion_expr::LogicalPlan,
49 query_ctx: &QueryContextRef,
50) -> Result<datafusion_expr::LogicalPlan, Error> {
51 let cfg = query_ctx.create_config_options();
52 let analyzer = Analyzer::with_rules(vec![
53 Arc::new(CountWildcardToTimeIndexRule),
54 Arc::new(CheckGroupByRule::new()),
55 Arc::new(TypeCoercion::new()),
56 ]);
57 let plan = analyzer
58 .execute_and_check(plan, &cfg, |p, r| {
59 debug!("After apply rule {}, get plan: \n{:?}", r.name(), p);
60 })
61 .context(DatafusionSnafu {
62 context: "Fail to apply analyzer",
63 })?;
64
65 let mut ctx = OptimizerContext::new();
66 let scheduled_time = query::options::parse_scheduled_time_datetime(&query_ctx.extensions())
67 .map_err(BoxedError::new)
68 .context(ExternalSnafu)?;
69 if let Some(dt) = scheduled_time {
70 ctx = ctx.with_query_execution_start_time(dt);
71 }
72 let optimizer = Optimizer::with_rules(vec![
73 Arc::new(OptimizeProjections::new()),
74 Arc::new(CommonSubexprEliminate::new()),
75 Arc::new(SimplifyExpressions::new()),
76 ]);
77 let plan = optimizer
78 .optimize(plan, &ctx, |_, _| {})
79 .context(DatafusionSnafu {
80 context: "Fail to apply optimizer",
81 })?;
82
83 Ok(plan)
84}
85
86pub async fn sql_to_flow_plan(
89 ctx: &mut FlownodeContext,
90 engine: &Arc<dyn QueryEngine>,
91 sql: &str,
92) -> Result<TypedPlan, Error> {
93 let query_ctx = ctx.query_context.clone().ok_or_else(|| {
94 UnexpectedSnafu {
95 reason: "Query context is missing",
96 }
97 .build()
98 })?;
99 let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
100 .map_err(BoxedError::new)
101 .context(ExternalSnafu)?;
102 let plan = engine
103 .planner()
104 .plan(&stmt, query_ctx.clone())
105 .await
106 .map_err(BoxedError::new)
107 .context(ExternalSnafu)?;
108
109 let opted_plan = apply_df_optimizer(plan, &query_ctx).await?;
110
111 let sub_plan = DFLogicalSubstraitConvertor {}
113 .to_sub_plan(&opted_plan, DefaultSerializer)
114 .map_err(BoxedError::new)
115 .context(ExternalSnafu)?;
116
117 let flow_plan = TypedPlan::from_substrait_plan(ctx, &sub_plan).await?;
118
119 Ok(flow_plan)
120}
121
/// Analyzer rule that rejects plans where a `GROUP BY` expression is missing from the
/// select list of the projection directly above the aggregate.
#[derive(Debug)]
struct CheckGroupByRule {}

impl CheckGroupByRule {
    /// Creates the (stateless) rule.
    pub fn new() -> Self {
        CheckGroupByRule {}
    }
}
131
132impl AnalyzerRule for CheckGroupByRule {
133 fn analyze(
134 &self,
135 plan: datafusion_expr::LogicalPlan,
136 _config: &ConfigOptions,
137 ) -> datafusion_common::Result<datafusion_expr::LogicalPlan> {
138 let transformed = plan
139 .transform_up_with_subqueries(check_group_by_analyzer)?
140 .data;
141 Ok(transformed)
142 }
143
144 fn name(&self) -> &str {
145 "check_groupby"
146 }
147}
148
149fn check_group_by_analyzer(
151 plan: datafusion_expr::LogicalPlan,
152) -> Result<Transformed<datafusion_expr::LogicalPlan>, DataFusionError> {
153 if let datafusion_expr::LogicalPlan::Projection(proj) = &plan
154 && let datafusion_expr::LogicalPlan::Aggregate(aggr) = proj.input.as_ref()
155 {
156 let mut found_column_used = FindColumn::new();
157 proj.expr
158 .iter()
159 .map(|i| i.visit(&mut found_column_used))
160 .count();
161 for expr in aggr.group_expr.iter() {
162 if !found_column_used
163 .names_for_alias
164 .contains(&expr.name_for_alias()?)
165 {
166 return Err(DataFusionError::Plan(format!(
167 "Expect {} expr in group by also exist in select list, but select list only contain {:?}",
168 expr.name_for_alias()?,
169 found_column_used.names_for_alias
170 )));
171 }
172 }
173 }
174
175 Ok(Transformed::no(plan))
176}
177
/// Expression visitor that collects the alias names of every column reference it visits.
#[derive(Debug, Default)]
struct FindColumn {
    /// `name_for_alias()` of each `Expr::Column` seen so far.
    names_for_alias: HashSet<String>,
}

impl FindColumn {
    /// Creates a visitor with no names collected yet.
    fn new() -> Self {
        Self {
            names_for_alias: HashSet::new(),
        }
    }
}
189
190impl TreeNodeVisitor<'_> for FindColumn {
191 type Node = datafusion_expr::Expr;
192 fn f_down(
193 &mut self,
194 node: &datafusion_expr::Expr,
195 ) -> Result<TreeNodeRecursion, DataFusionError> {
196 if let datafusion_expr::Expr::Column(_) = node {
197 self.names_for_alias.insert(node.name_for_alias()?);
198 }
199 Ok(TreeNodeRecursion::Continue)
200 }
201}