1use std::any::Any;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::table_source::DfTableSourceProvider;
20use common_error::ext::BoxedError;
21use common_telemetry::tracing;
22use datafusion::common::DFSchema;
23use datafusion::execution::context::SessionState;
24use datafusion::sql::planner::PlannerContext;
25use datafusion_expr::{Expr as DfExpr, LogicalPlan};
26use datafusion_sql::planner::{ParserOptions, SqlToRel};
27use log_query::LogQuery;
28use promql_parser::parser::EvalStmt;
29use session::context::QueryContextRef;
30use snafu::ResultExt;
31use sql::ast::Expr as SqlExpr;
32use sql::statements::statement::Statement;
33
34use crate::error::{PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu};
35use crate::log_query::planner::LogQueryPlanner;
36use crate::parser::QueryStatement;
37use crate::promql::planner::PromPlanner;
38use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
39use crate::range_select::plan_rewrite::RangePlanRewriter;
40use crate::{DfContextProviderAdapter, QueryEngineContext};
41
42#[async_trait]
43pub trait LogicalPlanner: Send + Sync {
44 async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;
45
46 async fn plan_logs_query(
47 &self,
48 query: LogQuery,
49 query_ctx: QueryContextRef,
50 ) -> Result<LogicalPlan>;
51
52 fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;
53
54 fn as_any(&self) -> &dyn Any;
55}
56
57pub struct DfLogicalPlanner {
58 engine_state: Arc<QueryEngineState>,
59 session_state: SessionState,
60}
61
62impl DfLogicalPlanner {
63 pub fn new(engine_state: Arc<QueryEngineState>) -> Self {
64 let session_state = engine_state.session_state();
65 Self {
66 engine_state,
67 session_state,
68 }
69 }
70
71 #[tracing::instrument(skip_all)]
72 async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
73 let df_stmt = stmt.try_into().context(SqlSnafu)?;
74
75 let table_provider = DfTableSourceProvider::new(
76 self.engine_state.catalog_manager().clone(),
77 self.engine_state.disallow_cross_catalog_query(),
78 query_ctx.clone(),
79 Arc::new(DefaultPlanDecoder::new(
80 self.session_state.clone(),
81 &query_ctx,
82 )?),
83 self.session_state
84 .config_options()
85 .sql_parser
86 .enable_ident_normalization,
87 );
88
89 let context_provider = DfContextProviderAdapter::try_new(
90 self.engine_state.clone(),
91 self.session_state.clone(),
92 Some(&df_stmt),
93 query_ctx.clone(),
94 )
95 .await?;
96
97 let config_options = self.session_state.config().options();
98 let parser_options = &config_options.sql_parser;
99 let parser_options = ParserOptions {
100 enable_ident_normalization: parser_options.enable_ident_normalization,
101 parse_float_as_decimal: parser_options.parse_float_as_decimal,
102 support_varchar_with_length: parser_options.support_varchar_with_length,
103 enable_options_value_normalization: parser_options.enable_options_value_normalization,
104 collect_spans: parser_options.collect_spans,
105 };
106
107 let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
108
109 let result = sql_to_rel
110 .statement_to_plan(df_stmt)
111 .context(PlanSqlSnafu)?;
112 common_telemetry::debug!("Logical planner, statement to plan result: {result}");
113 let plan = RangePlanRewriter::new(table_provider, query_ctx.clone())
114 .rewrite(result)
115 .await?;
116
117 let context = QueryEngineContext::new(self.session_state.clone(), query_ctx);
119 let plan = self
120 .engine_state
121 .optimize_by_extension_rules(plan, &context)?;
122 common_telemetry::debug!("Logical planner, optimize result: {plan}");
123
124 Ok(plan)
125 }
126
127 #[tracing::instrument(skip_all)]
129 pub(crate) async fn sql_to_expr(
130 &self,
131 sql: SqlExpr,
132 schema: &DFSchema,
133 normalize_ident: bool,
134 query_ctx: QueryContextRef,
135 ) -> Result<DfExpr> {
136 let context_provider = DfContextProviderAdapter::try_new(
137 self.engine_state.clone(),
138 self.session_state.clone(),
139 None,
140 query_ctx,
141 )
142 .await?;
143
144 let config_options = self.session_state.config().options();
145 let parser_options = &config_options.sql_parser;
146 let parser_options = ParserOptions {
147 enable_ident_normalization: normalize_ident,
148 parse_float_as_decimal: parser_options.parse_float_as_decimal,
149 support_varchar_with_length: parser_options.support_varchar_with_length,
150 enable_options_value_normalization: parser_options.enable_options_value_normalization,
151 collect_spans: parser_options.collect_spans,
152 };
153
154 let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
155
156 Ok(sql_to_rel.sql_to_expr(sql.into(), schema, &mut PlannerContext::new())?)
157 }
158
159 #[tracing::instrument(skip_all)]
160 async fn plan_pql(&self, stmt: &EvalStmt, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
161 let plan_decoder = Arc::new(DefaultPlanDecoder::new(
162 self.session_state.clone(),
163 &query_ctx,
164 )?);
165 let table_provider = DfTableSourceProvider::new(
166 self.engine_state.catalog_manager().clone(),
167 self.engine_state.disallow_cross_catalog_query(),
168 query_ctx,
169 plan_decoder,
170 self.session_state
171 .config_options()
172 .sql_parser
173 .enable_ident_normalization,
174 );
175 PromPlanner::stmt_to_plan(table_provider, stmt, &self.session_state)
176 .await
177 .map_err(BoxedError::new)
178 .context(QueryPlanSnafu)
179 }
180
181 #[tracing::instrument(skip_all)]
182 fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
183 Ok(self.engine_state.optimize_logical_plan(plan)?)
184 }
185}
186
187#[async_trait]
188impl LogicalPlanner for DfLogicalPlanner {
189 #[tracing::instrument(skip_all)]
190 async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
191 match stmt {
192 QueryStatement::Sql(stmt) => self.plan_sql(stmt, query_ctx).await,
193 QueryStatement::Promql(stmt) => self.plan_pql(stmt, query_ctx).await,
194 }
195 }
196
197 async fn plan_logs_query(
198 &self,
199 query: LogQuery,
200 query_ctx: QueryContextRef,
201 ) -> Result<LogicalPlan> {
202 let plan_decoder = Arc::new(DefaultPlanDecoder::new(
203 self.session_state.clone(),
204 &query_ctx,
205 )?);
206 let table_provider = DfTableSourceProvider::new(
207 self.engine_state.catalog_manager().clone(),
208 self.engine_state.disallow_cross_catalog_query(),
209 query_ctx,
210 plan_decoder,
211 self.session_state
212 .config_options()
213 .sql_parser
214 .enable_ident_normalization,
215 );
216
217 let mut planner = LogQueryPlanner::new(table_provider, self.session_state.clone());
218 planner
219 .query_to_plan(query)
220 .await
221 .map_err(BoxedError::new)
222 .context(QueryPlanSnafu)
223 }
224
225 fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
226 self.optimize_logical_plan(plan)
227 }
228
229 fn as_any(&self) -> &dyn Any {
230 self
231 }
232}