1use std::any::Any;
16use std::borrow::Cow;
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use catalog::table_source::DfTableSourceProvider;
21use common_error::ext::BoxedError;
22use common_telemetry::tracing;
23use datafusion::common::{DFSchema, plan_err};
24use datafusion::execution::context::SessionState;
25use datafusion::sql::planner::PlannerContext;
26use datafusion_common::ToDFSchema;
27use datafusion_expr::{
28 Analyze, Explain, ExplainFormat, Expr as DfExpr, LogicalPlan, LogicalPlanBuilder, PlanType,
29 ToStringifiedPlan, col,
30};
31use datafusion_sql::planner::{ParserOptions, SqlToRel};
32use log_query::LogQuery;
33use promql_parser::parser::EvalStmt;
34use session::context::QueryContextRef;
35use snafu::{ResultExt, ensure};
36use sql::CteContent;
37use sql::ast::Expr as SqlExpr;
38use sql::statements::explain::ExplainStatement;
39use sql::statements::query::Query;
40use sql::statements::statement::Statement;
41use sql::statements::tql::Tql;
42
43use crate::error::{
44 CteColumnSchemaMismatchSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu,
45 UnimplementedSnafu,
46};
47use crate::log_query::planner::LogQueryPlanner;
48use crate::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser, QueryStatement};
49use crate::promql::planner::PromPlanner;
50use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
51use crate::range_select::plan_rewrite::RangePlanRewriter;
52use crate::{DfContextProviderAdapter, QueryEngineContext};
53
/// Abstraction over a planner that turns parsed queries into DataFusion [`LogicalPlan`]s.
///
/// Implementations must be `Send + Sync` so they can be shared across threads.
#[async_trait]
pub trait LogicalPlanner: Send + Sync {
    /// Builds a logical plan for the given query statement under `query_ctx`.
    async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;

    /// Builds a logical plan for the given log query under `query_ctx`.
    async fn plan_logs_query(
        &self,
        query: LogQuery,
        query_ctx: QueryContextRef,
    ) -> Result<LogicalPlan>;

    /// Takes ownership of `plan` and returns its optimized form.
    fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;

    /// Exposes the planner as [`Any`], e.g. so it can be downcast to its concrete type.
    fn as_any(&self) -> &dyn Any;
}
68
/// DataFusion-backed planner for SQL, PromQL and log queries.
pub struct DfLogicalPlanner {
    /// Shared engine state; provides the catalog manager and the optimization entry points.
    engine_state: Arc<QueryEngineState>,
    /// Session state obtained from `engine_state` when the planner is constructed.
    session_state: SessionState,
}
73
74impl DfLogicalPlanner {
75 pub fn new(engine_state: Arc<QueryEngineState>) -> Self {
76 let session_state = engine_state.session_state();
77 Self {
78 engine_state,
79 session_state,
80 }
81 }
82
83 async fn explain_to_plan(
86 &self,
87 explain: &ExplainStatement,
88 query_ctx: QueryContextRef,
89 ) -> Result<LogicalPlan> {
90 let plan = self.plan_sql(&explain.statement, query_ctx).await?;
91 if matches!(plan, LogicalPlan::Explain(_)) {
92 return plan_err!("Nested EXPLAINs are not supported").context(PlanSqlSnafu);
93 }
94
95 let verbose = explain.verbose;
96 let analyze = explain.analyze;
97 let format = explain.format.map(|f| f.to_string());
98
99 let plan = Arc::new(plan);
100 let schema = LogicalPlan::explain_schema();
101 let schema = ToDFSchema::to_dfschema_ref(schema)?;
102
103 if verbose && format.is_some() {
104 return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported").context(PlanSqlSnafu);
105 }
106
107 if analyze {
108 Ok(LogicalPlan::Analyze(Analyze {
110 verbose,
111 input: plan,
112 schema,
113 }))
114 } else {
115 let stringified_plans = vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
116
117 let options = self.session_state.config().options();
119 let format = format.as_ref().unwrap_or(&options.explain.format);
120
121 let format: ExplainFormat = format.parse()?;
122
123 Ok(LogicalPlan::Explain(Explain {
124 verbose,
125 explain_format: format,
126 plan,
127 stringified_plans,
128 schema,
129 logical_optimization_succeeded: false,
130 }))
131 }
132 }
133
134 #[tracing::instrument(skip_all)]
135 #[async_recursion::async_recursion]
136 async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
137 let mut planner_context = PlannerContext::new();
138 let mut stmt = Cow::Borrowed(stmt);
139 let mut is_tql_cte = false;
140
141 if let Statement::Explain(explain) = stmt.as_ref() {
143 return self.explain_to_plan(explain, query_ctx).await;
144 }
145
146 if self.has_hybrid_ctes(stmt.as_ref()) {
148 let stmt_owned = stmt.into_owned();
149 let mut query = match stmt_owned {
150 Statement::Query(query) => query.as_ref().clone(),
151 _ => unreachable!("has_hybrid_ctes should only return true for Query statements"),
152 };
153 self.plan_query_with_hybrid_ctes(&query, query_ctx.clone(), &mut planner_context)
154 .await?;
155
156 query.hybrid_cte = None;
158 stmt = Cow::Owned(Statement::Query(Box::new(query)));
159 is_tql_cte = true;
160 }
161
162 let mut df_stmt = stmt.as_ref().try_into().context(SqlSnafu)?;
163
164 if let datafusion::sql::parser::Statement::Statement(
166 box datafusion::sql::sqlparser::ast::Statement::Explain { .. },
167 ) = &mut df_stmt
168 {
169 UnimplementedSnafu {
170 operation: "EXPLAIN with FORMAT using raw datafusion planner",
171 }
172 .fail()?;
173 }
174
175 let table_provider = DfTableSourceProvider::new(
176 self.engine_state.catalog_manager().clone(),
177 self.engine_state.disallow_cross_catalog_query(),
178 query_ctx.clone(),
179 Arc::new(DefaultPlanDecoder::new(
180 self.session_state.clone(),
181 &query_ctx,
182 )?),
183 self.session_state
184 .config_options()
185 .sql_parser
186 .enable_ident_normalization,
187 );
188
189 let context_provider = DfContextProviderAdapter::try_new(
190 self.engine_state.clone(),
191 self.session_state.clone(),
192 Some(&df_stmt),
193 query_ctx.clone(),
194 )
195 .await?;
196
197 let config_options = self.session_state.config().options();
198 let parser_options = &config_options.sql_parser;
199 let parser_options = ParserOptions {
200 map_string_types_to_utf8view: false,
201 ..parser_options.into()
202 };
203
204 let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
205
206 let result = if is_tql_cte {
208 let Statement::Query(query) = stmt.into_owned() else {
209 unreachable!("is_tql_cte should only be true for Query statements");
210 };
211 let sqlparser_stmt =
212 datafusion::sql::sqlparser::ast::Statement::Query(Box::new(query.inner.into()));
213 sql_to_rel
214 .sql_statement_to_plan_with_context(sqlparser_stmt, &mut planner_context)
215 .context(PlanSqlSnafu)?
216 } else {
217 sql_to_rel
218 .statement_to_plan(df_stmt)
219 .context(PlanSqlSnafu)?
220 };
221
222 common_telemetry::debug!("Logical planner, statement to plan result: {result}");
223 let plan = RangePlanRewriter::new(table_provider, query_ctx.clone())
224 .rewrite(result)
225 .await?;
226
227 let context = QueryEngineContext::new(self.session_state.clone(), query_ctx);
229 let plan = self
230 .engine_state
231 .optimize_by_extension_rules(plan, &context)?;
232 common_telemetry::debug!("Logical planner, optimize result: {plan}");
233
234 Ok(plan)
235 }
236
237 #[tracing::instrument(skip_all)]
239 pub(crate) async fn sql_to_expr(
240 &self,
241 sql: SqlExpr,
242 schema: &DFSchema,
243 normalize_ident: bool,
244 query_ctx: QueryContextRef,
245 ) -> Result<DfExpr> {
246 let context_provider = DfContextProviderAdapter::try_new(
247 self.engine_state.clone(),
248 self.session_state.clone(),
249 None,
250 query_ctx,
251 )
252 .await?;
253
254 let config_options = self.session_state.config().options();
255 let parser_options = &config_options.sql_parser;
256 let parser_options: ParserOptions = ParserOptions {
257 map_string_types_to_utf8view: false,
258 enable_ident_normalization: normalize_ident,
259 ..parser_options.into()
260 };
261
262 let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
263
264 Ok(sql_to_rel.sql_to_expr(sql.into(), schema, &mut PlannerContext::new())?)
265 }
266
267 #[tracing::instrument(skip_all)]
268 async fn plan_pql(
269 &self,
270 stmt: &EvalStmt,
271 alias: Option<String>,
272 query_ctx: QueryContextRef,
273 ) -> Result<LogicalPlan> {
274 let plan_decoder = Arc::new(DefaultPlanDecoder::new(
275 self.session_state.clone(),
276 &query_ctx,
277 )?);
278 let table_provider = DfTableSourceProvider::new(
279 self.engine_state.catalog_manager().clone(),
280 self.engine_state.disallow_cross_catalog_query(),
281 query_ctx,
282 plan_decoder,
283 self.session_state
284 .config_options()
285 .sql_parser
286 .enable_ident_normalization,
287 );
288 PromPlanner::stmt_to_plan_with_alias(table_provider, stmt, alias, &self.engine_state)
289 .await
290 .map_err(BoxedError::new)
291 .context(QueryPlanSnafu)
292 }
293
294 #[tracing::instrument(skip_all)]
295 fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
296 Ok(self.engine_state.optimize_logical_plan(plan)?)
297 }
298
299 fn has_hybrid_ctes(&self, stmt: &Statement) -> bool {
301 if let Statement::Query(query) = stmt {
302 query
303 .hybrid_cte
304 .as_ref()
305 .map(|hybrid_cte| !hybrid_cte.cte_tables.is_empty())
306 .unwrap_or(false)
307 } else {
308 false
309 }
310 }
311
312 async fn plan_query_with_hybrid_ctes(
314 &self,
315 query: &Query,
316 query_ctx: QueryContextRef,
317 planner_context: &mut PlannerContext,
318 ) -> Result<()> {
319 let hybrid_cte = query.hybrid_cte.as_ref().unwrap();
320
321 for cte in &hybrid_cte.cte_tables {
322 match &cte.content {
323 CteContent::Tql(tql) => {
324 let mut logical_plan = self.tql_to_logical_plan(tql, query_ctx.clone()).await?;
326 if !cte.columns.is_empty() {
327 let schema = logical_plan.schema();
328 let schema_fields = schema.fields().to_vec();
329 ensure!(
330 schema_fields.len() == cte.columns.len(),
331 CteColumnSchemaMismatchSnafu {
332 cte_name: cte.name.value.clone(),
333 original: schema_fields
334 .iter()
335 .map(|field| field.name().clone())
336 .collect::<Vec<_>>(),
337 expected: cte
338 .columns
339 .iter()
340 .map(|column| column.to_string())
341 .collect::<Vec<_>>(),
342 }
343 );
344 let aliases = cte
345 .columns
346 .iter()
347 .zip(schema_fields.iter())
348 .map(|(column, field)| col(field.name()).alias(column.to_string()));
349 logical_plan = LogicalPlanBuilder::from(logical_plan)
350 .project(aliases)
351 .context(PlanSqlSnafu)?
352 .build()
353 .context(PlanSqlSnafu)?;
354 }
355
356 logical_plan = LogicalPlan::SubqueryAlias(
358 datafusion_expr::SubqueryAlias::try_new(
359 Arc::new(logical_plan),
360 cte.name.value.clone(),
361 )
362 .context(PlanSqlSnafu)?,
363 );
364
365 planner_context.insert_cte(&cte.name.value, logical_plan);
366 }
367 CteContent::Sql(_) => {
368 unreachable!("SQL CTEs should not be in hybrid_cte.cte_tables");
371 }
372 }
373 }
374
375 Ok(())
376 }
377
378 async fn tql_to_logical_plan(
380 &self,
381 tql: &Tql,
382 query_ctx: QueryContextRef,
383 ) -> Result<LogicalPlan> {
384 match tql {
385 Tql::Eval(eval) => {
386 let prom_query = PromQuery {
388 query: eval.query.clone(),
389 start: eval.start.clone(),
390 end: eval.end.clone(),
391 step: eval.step.clone(),
392 lookback: eval
393 .lookback
394 .clone()
395 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
396 alias: eval.alias.clone(),
397 };
398 let stmt = QueryLanguageParser::parse_promql(&prom_query, &query_ctx)?;
399
400 self.plan(&stmt, query_ctx).await
401 }
402 Tql::Explain(_) => UnimplementedSnafu {
403 operation: "TQL EXPLAIN in CTEs",
404 }
405 .fail(),
406 Tql::Analyze(_) => UnimplementedSnafu {
407 operation: "TQL ANALYZE in CTEs",
408 }
409 .fail(),
410 }
411 }
412}
413
414#[async_trait]
415impl LogicalPlanner for DfLogicalPlanner {
416 #[tracing::instrument(skip_all)]
417 async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
418 match stmt {
419 QueryStatement::Sql(stmt) => self.plan_sql(stmt, query_ctx).await,
420 QueryStatement::Promql(stmt, alias) => {
421 self.plan_pql(stmt, alias.clone(), query_ctx).await
422 }
423 }
424 }
425
426 async fn plan_logs_query(
427 &self,
428 query: LogQuery,
429 query_ctx: QueryContextRef,
430 ) -> Result<LogicalPlan> {
431 let plan_decoder = Arc::new(DefaultPlanDecoder::new(
432 self.session_state.clone(),
433 &query_ctx,
434 )?);
435 let table_provider = DfTableSourceProvider::new(
436 self.engine_state.catalog_manager().clone(),
437 self.engine_state.disallow_cross_catalog_query(),
438 query_ctx,
439 plan_decoder,
440 self.session_state
441 .config_options()
442 .sql_parser
443 .enable_ident_normalization,
444 );
445
446 let mut planner = LogQueryPlanner::new(table_provider, self.session_state.clone());
447 planner
448 .query_to_plan(query)
449 .await
450 .map_err(BoxedError::new)
451 .context(QueryPlanSnafu)
452 }
453
454 fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
455 self.optimize_logical_plan(plan)
456 }
457
458 fn as_any(&self) -> &dyn Any {
459 self
460 }
461}