1use std::any::Any;
16use std::borrow::Cow;
17use std::str::FromStr;
18use std::sync::Arc;
19
20use async_trait::async_trait;
21use catalog::table_source::DfTableSourceProvider;
22use common_error::ext::BoxedError;
23use common_telemetry::tracing;
24use datafusion::common::{DFSchema, plan_err};
25use datafusion::execution::context::SessionState;
26use datafusion::sql::planner::PlannerContext;
27use datafusion_common::ToDFSchema;
28use datafusion_expr::{
29 Analyze, Explain, ExplainFormat, Expr as DfExpr, LogicalPlan, LogicalPlanBuilder, PlanType,
30 ToStringifiedPlan, col,
31};
32use datafusion_sql::planner::{ParserOptions, SqlToRel};
33use log_query::LogQuery;
34use promql_parser::parser::EvalStmt;
35use session::context::QueryContextRef;
36use snafu::{ResultExt, ensure};
37use sql::CteContent;
38use sql::ast::Expr as SqlExpr;
39use sql::statements::explain::ExplainStatement;
40use sql::statements::query::Query;
41use sql::statements::statement::Statement;
42use sql::statements::tql::Tql;
43
44use crate::error::{
45 CteColumnSchemaMismatchSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu,
46 UnimplementedSnafu,
47};
48use crate::log_query::planner::LogQueryPlanner;
49use crate::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser, QueryStatement};
50use crate::promql::planner::PromPlanner;
51use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
52use crate::range_select::plan_rewrite::RangePlanRewriter;
53use crate::{DfContextProviderAdapter, QueryEngineContext};
54
/// Planner interface that turns query statements into DataFusion [`LogicalPlan`]s.
///
/// Implementations must be `Send + Sync`.
#[async_trait]
pub trait LogicalPlanner: Send + Sync {
    /// Builds a logical plan for `stmt`, using `query_ctx` as the query context.
    async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;

    /// Builds a logical plan for the given [`LogQuery`], using `query_ctx` as the query context.
    async fn plan_logs_query(
        &self,
        query: LogQuery,
        query_ctx: QueryContextRef,
    ) -> Result<LogicalPlan>;

    /// Takes ownership of `plan` and returns its optimized form.
    fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;

    /// Returns the planner as `&dyn Any` (presumably so callers can downcast to the concrete
    /// planner type — confirm against call sites).
    fn as_any(&self) -> &dyn Any;
}
69
/// [`LogicalPlanner`] implementation backed by DataFusion.
pub struct DfLogicalPlanner {
    /// Shared query engine state; the planner reads the catalog manager, the cross-catalog
    /// policy and the optimizer entry points from it.
    engine_state: Arc<QueryEngineState>,
    /// DataFusion session state obtained from `engine_state` when the planner is constructed.
    session_state: SessionState,
}
74
75impl DfLogicalPlanner {
76 pub fn new(engine_state: Arc<QueryEngineState>) -> Self {
77 let session_state = engine_state.session_state();
78 Self {
79 engine_state,
80 session_state,
81 }
82 }
83
84 async fn explain_to_plan(
87 &self,
88 explain: &ExplainStatement,
89 query_ctx: QueryContextRef,
90 ) -> Result<LogicalPlan> {
91 let plan = self.plan_sql(&explain.statement, query_ctx).await?;
92 if matches!(plan, LogicalPlan::Explain(_)) {
93 return plan_err!("Nested EXPLAINs are not supported").context(PlanSqlSnafu);
94 }
95
96 let verbose = explain.verbose;
97 let analyze = explain.analyze;
98 let format = explain.format.map(|f| f.to_string());
99
100 let plan = Arc::new(plan);
101 let schema = LogicalPlan::explain_schema();
102 let schema = ToDFSchema::to_dfschema_ref(schema)?;
103
104 if verbose && format.is_some() {
105 return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported").context(PlanSqlSnafu);
106 }
107
108 if analyze {
109 Ok(LogicalPlan::Analyze(Analyze {
111 verbose,
112 input: plan,
113 schema,
114 }))
115 } else {
116 let stringified_plans = vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
117
118 let options = self.session_state.config().options();
120 let format = format
121 .map(|x| ExplainFormat::from_str(&x))
122 .transpose()?
123 .unwrap_or_else(|| options.explain.format.clone());
124
125 Ok(LogicalPlan::Explain(Explain {
126 verbose,
127 explain_format: format,
128 plan,
129 stringified_plans,
130 schema,
131 logical_optimization_succeeded: false,
132 }))
133 }
134 }
135
136 #[tracing::instrument(skip_all)]
137 #[async_recursion::async_recursion]
138 async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
139 let mut planner_context = PlannerContext::new();
140 let mut stmt = Cow::Borrowed(stmt);
141 let mut is_tql_cte = false;
142
143 if let Statement::Explain(explain) = stmt.as_ref() {
145 return self.explain_to_plan(explain, query_ctx).await;
146 }
147
148 if self.has_hybrid_ctes(stmt.as_ref()) {
150 let stmt_owned = stmt.into_owned();
151 let mut query = match stmt_owned {
152 Statement::Query(query) => query.as_ref().clone(),
153 _ => unreachable!("has_hybrid_ctes should only return true for Query statements"),
154 };
155 self.plan_query_with_hybrid_ctes(&query, query_ctx.clone(), &mut planner_context)
156 .await?;
157
158 query.hybrid_cte = None;
160 stmt = Cow::Owned(Statement::Query(Box::new(query)));
161 is_tql_cte = true;
162 }
163
164 let mut df_stmt = stmt.as_ref().try_into().context(SqlSnafu)?;
165
166 if let datafusion::sql::parser::Statement::Statement(
168 box datafusion::sql::sqlparser::ast::Statement::Explain { .. },
169 ) = &mut df_stmt
170 {
171 UnimplementedSnafu {
172 operation: "EXPLAIN with FORMAT using raw datafusion planner",
173 }
174 .fail()?;
175 }
176
177 let table_provider = DfTableSourceProvider::new(
178 self.engine_state.catalog_manager().clone(),
179 self.engine_state.disallow_cross_catalog_query(),
180 query_ctx.clone(),
181 Arc::new(DefaultPlanDecoder::new(
182 self.session_state.clone(),
183 &query_ctx,
184 )?),
185 self.session_state
186 .config_options()
187 .sql_parser
188 .enable_ident_normalization,
189 );
190
191 let context_provider = DfContextProviderAdapter::try_new(
192 self.engine_state.clone(),
193 self.session_state.clone(),
194 Some(&df_stmt),
195 query_ctx.clone(),
196 )
197 .await?;
198
199 let config_options = self.session_state.config().options();
200 let parser_options = &config_options.sql_parser;
201 let parser_options = ParserOptions {
202 map_string_types_to_utf8view: false,
203 ..parser_options.into()
204 };
205
206 let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
207
208 let result = if is_tql_cte {
210 let Statement::Query(query) = stmt.into_owned() else {
211 unreachable!("is_tql_cte should only be true for Query statements");
212 };
213 let sqlparser_stmt = sqlparser::ast::Statement::Query(Box::new(query.inner));
214 sql_to_rel
215 .sql_statement_to_plan_with_context(sqlparser_stmt, &mut planner_context)
216 .context(PlanSqlSnafu)?
217 } else {
218 sql_to_rel
219 .statement_to_plan(df_stmt)
220 .context(PlanSqlSnafu)?
221 };
222
223 common_telemetry::debug!("Logical planner, statement to plan result: {result}");
224 let plan = RangePlanRewriter::new(table_provider, query_ctx.clone())
225 .rewrite(result)
226 .await?;
227
228 let context = QueryEngineContext::new(self.session_state.clone(), query_ctx);
230 let plan = self
231 .engine_state
232 .optimize_by_extension_rules(plan, &context)?;
233 common_telemetry::debug!("Logical planner, optimize result: {plan}");
234
235 Ok(plan)
236 }
237
238 #[tracing::instrument(skip_all)]
240 pub(crate) async fn sql_to_expr(
241 &self,
242 sql: SqlExpr,
243 schema: &DFSchema,
244 normalize_ident: bool,
245 query_ctx: QueryContextRef,
246 ) -> Result<DfExpr> {
247 let context_provider = DfContextProviderAdapter::try_new(
248 self.engine_state.clone(),
249 self.session_state.clone(),
250 None,
251 query_ctx,
252 )
253 .await?;
254
255 let config_options = self.session_state.config().options();
256 let parser_options = &config_options.sql_parser;
257 let parser_options: ParserOptions = ParserOptions {
258 map_string_types_to_utf8view: false,
259 enable_ident_normalization: normalize_ident,
260 ..parser_options.into()
261 };
262
263 let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
264
265 Ok(sql_to_rel.sql_to_expr(sql, schema, &mut PlannerContext::new())?)
266 }
267
268 #[tracing::instrument(skip_all)]
269 async fn plan_pql(
270 &self,
271 stmt: &EvalStmt,
272 alias: Option<String>,
273 query_ctx: QueryContextRef,
274 ) -> Result<LogicalPlan> {
275 let plan_decoder = Arc::new(DefaultPlanDecoder::new(
276 self.session_state.clone(),
277 &query_ctx,
278 )?);
279 let table_provider = DfTableSourceProvider::new(
280 self.engine_state.catalog_manager().clone(),
281 self.engine_state.disallow_cross_catalog_query(),
282 query_ctx,
283 plan_decoder,
284 self.session_state
285 .config_options()
286 .sql_parser
287 .enable_ident_normalization,
288 );
289 PromPlanner::stmt_to_plan_with_alias(table_provider, stmt, alias, &self.engine_state)
290 .await
291 .map_err(BoxedError::new)
292 .context(QueryPlanSnafu)
293 }
294
295 #[tracing::instrument(skip_all)]
296 fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
297 Ok(self.engine_state.optimize_logical_plan(plan)?)
298 }
299
300 fn has_hybrid_ctes(&self, stmt: &Statement) -> bool {
302 if let Statement::Query(query) = stmt {
303 query
304 .hybrid_cte
305 .as_ref()
306 .map(|hybrid_cte| !hybrid_cte.cte_tables.is_empty())
307 .unwrap_or(false)
308 } else {
309 false
310 }
311 }
312
313 async fn plan_query_with_hybrid_ctes(
315 &self,
316 query: &Query,
317 query_ctx: QueryContextRef,
318 planner_context: &mut PlannerContext,
319 ) -> Result<()> {
320 let hybrid_cte = query.hybrid_cte.as_ref().unwrap();
321
322 for cte in &hybrid_cte.cte_tables {
323 match &cte.content {
324 CteContent::Tql(tql) => {
325 let mut logical_plan = self.tql_to_logical_plan(tql, query_ctx.clone()).await?;
327 if !cte.columns.is_empty() {
328 let schema = logical_plan.schema();
329 let schema_fields = schema.fields().to_vec();
330 ensure!(
331 schema_fields.len() == cte.columns.len(),
332 CteColumnSchemaMismatchSnafu {
333 cte_name: cte.name.value.clone(),
334 original: schema_fields
335 .iter()
336 .map(|field| field.name().clone())
337 .collect::<Vec<_>>(),
338 expected: cte
339 .columns
340 .iter()
341 .map(|column| column.to_string())
342 .collect::<Vec<_>>(),
343 }
344 );
345 let aliases = cte
346 .columns
347 .iter()
348 .zip(schema_fields.iter())
349 .map(|(column, field)| col(field.name()).alias(column.to_string()));
350 logical_plan = LogicalPlanBuilder::from(logical_plan)
351 .project(aliases)
352 .context(PlanSqlSnafu)?
353 .build()
354 .context(PlanSqlSnafu)?;
355 }
356
357 logical_plan = LogicalPlan::SubqueryAlias(
359 datafusion_expr::SubqueryAlias::try_new(
360 Arc::new(logical_plan),
361 cte.name.value.clone(),
362 )
363 .context(PlanSqlSnafu)?,
364 );
365
366 planner_context.insert_cte(&cte.name.value, logical_plan);
367 }
368 CteContent::Sql(_) => {
369 unreachable!("SQL CTEs should not be in hybrid_cte.cte_tables");
372 }
373 }
374 }
375
376 Ok(())
377 }
378
379 async fn tql_to_logical_plan(
381 &self,
382 tql: &Tql,
383 query_ctx: QueryContextRef,
384 ) -> Result<LogicalPlan> {
385 match tql {
386 Tql::Eval(eval) => {
387 let prom_query = PromQuery {
389 query: eval.query.clone(),
390 start: eval.start.clone(),
391 end: eval.end.clone(),
392 step: eval.step.clone(),
393 lookback: eval
394 .lookback
395 .clone()
396 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
397 alias: eval.alias.clone(),
398 };
399 let stmt = QueryLanguageParser::parse_promql(&prom_query, &query_ctx)?;
400
401 self.plan(&stmt, query_ctx).await
402 }
403 Tql::Explain(_) => UnimplementedSnafu {
404 operation: "TQL EXPLAIN in CTEs",
405 }
406 .fail(),
407 Tql::Analyze(_) => UnimplementedSnafu {
408 operation: "TQL ANALYZE in CTEs",
409 }
410 .fail(),
411 }
412 }
413}
414
415#[async_trait]
416impl LogicalPlanner for DfLogicalPlanner {
417 #[tracing::instrument(skip_all)]
418 async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
419 match stmt {
420 QueryStatement::Sql(stmt) => self.plan_sql(stmt, query_ctx).await,
421 QueryStatement::Promql(stmt, alias) => {
422 self.plan_pql(stmt, alias.clone(), query_ctx).await
423 }
424 }
425 }
426
427 async fn plan_logs_query(
428 &self,
429 query: LogQuery,
430 query_ctx: QueryContextRef,
431 ) -> Result<LogicalPlan> {
432 let plan_decoder = Arc::new(DefaultPlanDecoder::new(
433 self.session_state.clone(),
434 &query_ctx,
435 )?);
436 let table_provider = DfTableSourceProvider::new(
437 self.engine_state.catalog_manager().clone(),
438 self.engine_state.disallow_cross_catalog_query(),
439 query_ctx,
440 plan_decoder,
441 self.session_state
442 .config_options()
443 .sql_parser
444 .enable_ident_normalization,
445 );
446
447 let mut planner = LogQueryPlanner::new(table_provider, self.session_state.clone());
448 planner
449 .query_to_plan(query)
450 .await
451 .map_err(BoxedError::new)
452 .context(QueryPlanSnafu)
453 }
454
455 fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
456 self.optimize_logical_plan(plan)
457 }
458
459 fn as_any(&self) -> &dyn Any {
460 self
461 }
462}