1use std::any::Any;
16use std::borrow::Cow;
17use std::collections::{HashMap, HashSet};
18use std::str::FromStr;
19use std::sync::Arc;
20
21use arrow_schema::DataType;
22use async_trait::async_trait;
23use catalog::table_source::DfTableSourceProvider;
24use common_error::ext::BoxedError;
25use common_telemetry::tracing;
26use datafusion::common::{DFSchema, plan_err};
27use datafusion::execution::context::SessionState;
28use datafusion::sql::planner::PlannerContext;
29use datafusion_common::ToDFSchema;
30use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
31use datafusion_expr::{
32 Analyze, Explain, ExplainFormat, Expr as DfExpr, LogicalPlan, LogicalPlanBuilder, PlanType,
33 ToStringifiedPlan, col,
34};
35use datafusion_sql::planner::{ParserOptions, SqlToRel};
36use log_query::LogQuery;
37use promql_parser::parser::EvalStmt;
38use session::context::QueryContextRef;
39use snafu::{ResultExt, ensure};
40use sql::CteContent;
41use sql::ast::Expr as SqlExpr;
42use sql::statements::explain::ExplainStatement;
43use sql::statements::query::Query;
44use sql::statements::statement::Statement;
45use sql::statements::tql::Tql;
46
47use crate::error::{
48 CteColumnSchemaMismatchSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu,
49 UnimplementedSnafu,
50};
51use crate::log_query::planner::LogQueryPlanner;
52use crate::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser, QueryStatement};
53use crate::promql::planner::PromPlanner;
54use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
55use crate::range_select::plan_rewrite::RangePlanRewriter;
56use crate::{DfContextProviderAdapter, QueryEngineContext};
57
58#[async_trait]
59pub trait LogicalPlanner: Send + Sync {
60 async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;
61
62 async fn plan_logs_query(
63 &self,
64 query: LogQuery,
65 query_ctx: QueryContextRef,
66 ) -> Result<LogicalPlan>;
67
68 fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;
69
70 fn as_any(&self) -> &dyn Any;
71}
72
73pub struct DfLogicalPlanner {
74 engine_state: Arc<QueryEngineState>,
75 session_state: SessionState,
76}
77
78impl DfLogicalPlanner {
79 pub fn new(engine_state: Arc<QueryEngineState>) -> Self {
80 let session_state = engine_state.session_state();
81 Self {
82 engine_state,
83 session_state,
84 }
85 }
86
87 async fn explain_to_plan(
90 &self,
91 explain: &ExplainStatement,
92 query_ctx: QueryContextRef,
93 ) -> Result<LogicalPlan> {
94 let plan = self.plan_sql(&explain.statement, query_ctx).await?;
95 if matches!(plan, LogicalPlan::Explain(_)) {
96 return plan_err!("Nested EXPLAINs are not supported").context(PlanSqlSnafu);
97 }
98
99 let verbose = explain.verbose;
100 let analyze = explain.analyze;
101 let format = explain.format.map(|f| f.to_string());
102
103 let plan = Arc::new(plan);
104 let schema = LogicalPlan::explain_schema();
105 let schema = ToDFSchema::to_dfschema_ref(schema)?;
106
107 if verbose && format.is_some() {
108 return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported").context(PlanSqlSnafu);
109 }
110
111 if analyze {
112 Ok(LogicalPlan::Analyze(Analyze {
114 verbose,
115 input: plan,
116 schema,
117 }))
118 } else {
119 let stringified_plans = vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
120
121 let options = self.session_state.config().options();
123 let format = format
124 .map(|x| ExplainFormat::from_str(&x))
125 .transpose()?
126 .unwrap_or_else(|| options.explain.format.clone());
127
128 Ok(LogicalPlan::Explain(Explain {
129 verbose,
130 explain_format: format,
131 plan,
132 stringified_plans,
133 schema,
134 logical_optimization_succeeded: false,
135 }))
136 }
137 }
138
139 #[tracing::instrument(skip_all)]
140 #[async_recursion::async_recursion]
141 async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
142 let mut planner_context = PlannerContext::new();
143 let mut stmt = Cow::Borrowed(stmt);
144 let mut is_tql_cte = false;
145
146 if let Statement::Explain(explain) = stmt.as_ref() {
148 return self.explain_to_plan(explain, query_ctx).await;
149 }
150
151 if self.has_hybrid_ctes(stmt.as_ref()) {
153 let stmt_owned = stmt.into_owned();
154 let mut query = match stmt_owned {
155 Statement::Query(query) => query.as_ref().clone(),
156 _ => unreachable!("has_hybrid_ctes should only return true for Query statements"),
157 };
158 self.plan_query_with_hybrid_ctes(&query, query_ctx.clone(), &mut planner_context)
159 .await?;
160
161 query.hybrid_cte = None;
163 stmt = Cow::Owned(Statement::Query(Box::new(query)));
164 is_tql_cte = true;
165 }
166
167 let mut df_stmt = stmt.as_ref().try_into().context(SqlSnafu)?;
168
169 if let datafusion::sql::parser::Statement::Statement(
171 box datafusion::sql::sqlparser::ast::Statement::Explain { .. },
172 ) = &mut df_stmt
173 {
174 UnimplementedSnafu {
175 operation: "EXPLAIN with FORMAT using raw datafusion planner",
176 }
177 .fail()?;
178 }
179
180 let table_provider = DfTableSourceProvider::new(
181 self.engine_state.catalog_manager().clone(),
182 self.engine_state.disallow_cross_catalog_query(),
183 query_ctx.clone(),
184 Arc::new(DefaultPlanDecoder::new(
185 self.session_state.clone(),
186 &query_ctx,
187 )?),
188 self.session_state
189 .config_options()
190 .sql_parser
191 .enable_ident_normalization,
192 );
193
194 let context_provider = DfContextProviderAdapter::try_new(
195 self.engine_state.clone(),
196 self.session_state.clone(),
197 Some(&df_stmt),
198 query_ctx.clone(),
199 )
200 .await?;
201
202 let config_options = self.session_state.config().options();
203 let parser_options = &config_options.sql_parser;
204 let parser_options = ParserOptions {
205 map_string_types_to_utf8view: false,
206 ..parser_options.into()
207 };
208
209 let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
210
211 let result = if is_tql_cte {
213 let Statement::Query(query) = stmt.into_owned() else {
214 unreachable!("is_tql_cte should only be true for Query statements");
215 };
216 let sqlparser_stmt = sqlparser::ast::Statement::Query(Box::new(query.inner));
217 sql_to_rel
218 .sql_statement_to_plan_with_context(sqlparser_stmt, &mut planner_context)
219 .context(PlanSqlSnafu)?
220 } else {
221 sql_to_rel
222 .statement_to_plan(df_stmt)
223 .context(PlanSqlSnafu)?
224 };
225
226 common_telemetry::debug!("Logical planner, statement to plan result: {result}");
227 let plan = RangePlanRewriter::new(table_provider, query_ctx.clone())
228 .rewrite(result)
229 .await?;
230
231 let context = QueryEngineContext::new(self.session_state.clone(), query_ctx);
233 let plan = self
234 .engine_state
235 .optimize_by_extension_rules(plan, &context)?;
236 common_telemetry::debug!("Logical planner, optimize result: {plan}");
237
238 Ok(plan)
239 }
240
241 #[tracing::instrument(skip_all)]
243 pub(crate) async fn sql_to_expr(
244 &self,
245 sql: SqlExpr,
246 schema: &DFSchema,
247 normalize_ident: bool,
248 query_ctx: QueryContextRef,
249 ) -> Result<DfExpr> {
250 let context_provider = DfContextProviderAdapter::try_new(
251 self.engine_state.clone(),
252 self.session_state.clone(),
253 None,
254 query_ctx,
255 )
256 .await?;
257
258 let config_options = self.session_state.config().options();
259 let parser_options = &config_options.sql_parser;
260 let parser_options: ParserOptions = ParserOptions {
261 map_string_types_to_utf8view: false,
262 enable_ident_normalization: normalize_ident,
263 ..parser_options.into()
264 };
265
266 let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
267
268 Ok(sql_to_rel.sql_to_expr(sql, schema, &mut PlannerContext::new())?)
269 }
270
271 #[tracing::instrument(skip_all)]
272 async fn plan_pql(&self, stmt: &EvalStmt, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
273 let plan_decoder = Arc::new(DefaultPlanDecoder::new(
274 self.session_state.clone(),
275 &query_ctx,
276 )?);
277 let table_provider = DfTableSourceProvider::new(
278 self.engine_state.catalog_manager().clone(),
279 self.engine_state.disallow_cross_catalog_query(),
280 query_ctx,
281 plan_decoder,
282 self.session_state
283 .config_options()
284 .sql_parser
285 .enable_ident_normalization,
286 );
287 PromPlanner::stmt_to_plan(table_provider, stmt, &self.engine_state)
288 .await
289 .map_err(BoxedError::new)
290 .context(QueryPlanSnafu)
291 }
292
293 #[tracing::instrument(skip_all)]
294 fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
295 Ok(self.engine_state.optimize_logical_plan(plan)?)
296 }
297
298 fn has_hybrid_ctes(&self, stmt: &Statement) -> bool {
300 if let Statement::Query(query) = stmt {
301 query
302 .hybrid_cte
303 .as_ref()
304 .map(|hybrid_cte| !hybrid_cte.cte_tables.is_empty())
305 .unwrap_or(false)
306 } else {
307 false
308 }
309 }
310
311 async fn plan_query_with_hybrid_ctes(
313 &self,
314 query: &Query,
315 query_ctx: QueryContextRef,
316 planner_context: &mut PlannerContext,
317 ) -> Result<()> {
318 let hybrid_cte = query.hybrid_cte.as_ref().unwrap();
319
320 for cte in &hybrid_cte.cte_tables {
321 match &cte.content {
322 CteContent::Tql(tql) => {
323 let mut logical_plan = self.tql_to_logical_plan(tql, query_ctx.clone()).await?;
325 if !cte.columns.is_empty() {
326 let schema = logical_plan.schema();
327 let schema_fields = schema.fields().to_vec();
328 ensure!(
329 schema_fields.len() == cte.columns.len(),
330 CteColumnSchemaMismatchSnafu {
331 cte_name: cte.name.value.clone(),
332 original: schema_fields
333 .iter()
334 .map(|field| field.name().clone())
335 .collect::<Vec<_>>(),
336 expected: cte
337 .columns
338 .iter()
339 .map(|column| column.to_string())
340 .collect::<Vec<_>>(),
341 }
342 );
343 let aliases = cte
344 .columns
345 .iter()
346 .zip(schema_fields.iter())
347 .map(|(column, field)| col(field.name()).alias(column.to_string()));
348 logical_plan = LogicalPlanBuilder::from(logical_plan)
349 .project(aliases)
350 .context(PlanSqlSnafu)?
351 .build()
352 .context(PlanSqlSnafu)?;
353 }
354
355 logical_plan = LogicalPlan::SubqueryAlias(
357 datafusion_expr::SubqueryAlias::try_new(
358 Arc::new(logical_plan),
359 cte.name.value.clone(),
360 )
361 .context(PlanSqlSnafu)?,
362 );
363
364 planner_context.insert_cte(&cte.name.value, logical_plan);
365 }
366 CteContent::Sql(_) => {
367 unreachable!("SQL CTEs should not be in hybrid_cte.cte_tables");
370 }
371 }
372 }
373
374 Ok(())
375 }
376
377 async fn tql_to_logical_plan(
379 &self,
380 tql: &Tql,
381 query_ctx: QueryContextRef,
382 ) -> Result<LogicalPlan> {
383 match tql {
384 Tql::Eval(eval) => {
385 let prom_query = PromQuery {
387 query: eval.query.clone(),
388 start: eval.start.clone(),
389 end: eval.end.clone(),
390 step: eval.step.clone(),
391 lookback: eval
392 .lookback
393 .clone()
394 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
395 alias: eval.alias.clone(),
396 };
397 let stmt = QueryLanguageParser::parse_promql(&prom_query, &query_ctx)?;
398
399 self.plan(&stmt, query_ctx).await
400 }
401 Tql::Explain(_) => UnimplementedSnafu {
402 operation: "TQL EXPLAIN in CTEs",
403 }
404 .fail(),
405 Tql::Analyze(_) => UnimplementedSnafu {
406 operation: "TQL ANALYZE in CTEs",
407 }
408 .fail(),
409 }
410 }
411
412 fn extract_placeholder_cast_types(
422 plan: &LogicalPlan,
423 ) -> Result<HashMap<String, Option<DataType>>> {
424 let mut placeholder_types = HashMap::new();
425 let mut casted_placeholders = HashSet::new();
426
427 plan.apply(|node| {
428 for expr in node.expressions() {
429 let _ = expr.apply(|e| {
430 if let DfExpr::Cast(cast) = e
431 && let DfExpr::Placeholder(ph) = &*cast.expr
432 {
433 placeholder_types.insert(ph.id.clone(), Some(cast.data_type.clone()));
434 casted_placeholders.insert(ph.id.clone());
435 }
436
437 if let DfExpr::Placeholder(ph) = e
438 && !casted_placeholders.contains(&ph.id)
439 && !placeholder_types.contains_key(&ph.id)
440 {
441 placeholder_types.insert(ph.id.clone(), None);
442 }
443
444 Ok(TreeNodeRecursion::Continue)
445 });
446 }
447 Ok(TreeNodeRecursion::Continue)
448 })?;
449
450 Ok(placeholder_types)
451 }
452
453 pub fn get_inferred_parameter_types(
467 plan: &LogicalPlan,
468 ) -> Result<HashMap<String, Option<DataType>>> {
469 let param_types = plan.get_parameter_types().context(PlanSqlSnafu)?;
470
471 let has_none = param_types.values().any(|v| v.is_none());
472
473 if !has_none {
474 Ok(param_types)
475 } else {
476 let cast_types = Self::extract_placeholder_cast_types(plan)?;
477
478 let mut merged = param_types;
479
480 for (id, opt_type) in cast_types {
481 merged
482 .entry(id)
483 .and_modify(|existing| {
484 if existing.is_none() {
485 *existing = opt_type.clone();
486 }
487 })
488 .or_insert(opt_type);
489 }
490
491 Ok(merged)
492 }
493 }
494}
495
496#[async_trait]
497impl LogicalPlanner for DfLogicalPlanner {
498 #[tracing::instrument(skip_all)]
499 async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
500 match stmt {
501 QueryStatement::Sql(stmt) => self.plan_sql(stmt, query_ctx).await,
502 QueryStatement::Promql(stmt, _alias) => self.plan_pql(stmt, query_ctx).await,
503 }
504 }
505
506 async fn plan_logs_query(
507 &self,
508 query: LogQuery,
509 query_ctx: QueryContextRef,
510 ) -> Result<LogicalPlan> {
511 let plan_decoder = Arc::new(DefaultPlanDecoder::new(
512 self.session_state.clone(),
513 &query_ctx,
514 )?);
515 let table_provider = DfTableSourceProvider::new(
516 self.engine_state.catalog_manager().clone(),
517 self.engine_state.disallow_cross_catalog_query(),
518 query_ctx,
519 plan_decoder,
520 self.session_state
521 .config_options()
522 .sql_parser
523 .enable_ident_normalization,
524 );
525
526 let mut planner = LogQueryPlanner::new(table_provider, self.session_state.clone());
527 planner
528 .query_to_plan(query)
529 .await
530 .map_err(BoxedError::new)
531 .context(QueryPlanSnafu)
532 }
533
534 fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
535 self.optimize_logical_plan(plan)
536 }
537
538 fn as_any(&self) -> &dyn Any {
539 self
540 }
541}
542
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::DataType;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use session::context::QueryContext;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
    use table::test_util::EmptyTable;

    use super::*;
    use crate::QueryEngineRef;
    use crate::parser::QueryLanguageParser;

    /// Builds a query engine holding one empty table, `test(id INT32 NOT NULL, name STRING)`,
    /// with primary-key index 0 (`id`).
    async fn create_test_engine() -> QueryEngineRef {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new("id", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("name", ConcreteDataType::string_datatype(), true),
        ]));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![1])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::new("test", table_meta).build().unwrap();

        crate::tests::new_query_engine_with_table(EmptyTable::from_table_info(&table_info))
    }

    /// Parses `sql` and plans it on a fresh test engine, panicking on any failure.
    async fn parse_sql_to_plan(sql: &str) -> LogicalPlan {
        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
        let engine = create_test_engine().await;
        engine.planner().plan(&stmt, QueryContext::arc()).await.unwrap()
    }

    #[tokio::test]
    async fn test_extract_placeholder_cast_types_multiple() {
        let plan = parse_sql_to_plan(
            "SELECT $1::INT, $2::TEXT, $3, $4::INTEGER FROM test WHERE $5::FLOAT > 0",
        )
        .await;
        let types = DfLogicalPlanner::extract_placeholder_cast_types(&plan).unwrap();

        let expected = [
            ("$1", Some(DataType::Int32)),
            ("$2", Some(DataType::Utf8)),
            ("$3", None),
            ("$4", Some(DataType::Int32)),
            ("$5", Some(DataType::Float32)),
        ];
        assert_eq!(types.len(), expected.len());
        for (id, expected_type) in expected {
            assert_eq!(types.get(id), Some(&expected_type), "unexpected type for {id}");
        }
    }

    #[tokio::test]
    async fn test_get_inferred_parameter_types_fallback_for_udf_args() {
        // `$1` is only passed to a UDF, so it stays untyped. `$2` takes its type from the
        // explicit cast, and `$3` from the comparison with `id`.
        let plan = parse_sql_to_plan(
            "SELECT parse_ident($1), parse_ident($2::TEXT) FROM test WHERE id > $3",
        )
        .await;
        let types = DfLogicalPlanner::get_inferred_parameter_types(&plan).unwrap();

        assert_eq!(types.len(), 3);
        assert!(types["$1"].is_none(), "Expected $1 to be None");
        assert_eq!(types["$2"], Some(DataType::Utf8));
        assert_eq!(types["$3"], Some(DataType::Int32));
    }
}