1use std::any::Any;
16use std::borrow::Cow;
17use std::str::FromStr;
18use std::sync::Arc;
19
20use async_trait::async_trait;
21use catalog::table_source::DfTableSourceProvider;
22use common_error::ext::BoxedError;
23use common_telemetry::tracing;
24use datafusion::common::{DFSchema, plan_err};
25use datafusion::execution::context::SessionState;
26use datafusion::sql::planner::PlannerContext;
27use datafusion_common::ToDFSchema;
28use datafusion_expr::{
29 Analyze, Explain, ExplainFormat, Expr as DfExpr, LogicalPlan, LogicalPlanBuilder, PlanType,
30 ToStringifiedPlan, col,
31};
32use datafusion_sql::planner::{ParserOptions, SqlToRel};
33use log_query::LogQuery;
34use promql_parser::parser::EvalStmt;
35use session::context::QueryContextRef;
36use snafu::{ResultExt, ensure};
37use sql::CteContent;
38use sql::ast::Expr as SqlExpr;
39use sql::statements::explain::ExplainStatement;
40use sql::statements::query::Query;
41use sql::statements::statement::Statement;
42use sql::statements::tql::Tql;
43
44use crate::error::{
45 CteColumnSchemaMismatchSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu,
46 UnimplementedSnafu,
47};
48use crate::log_query::planner::LogQueryPlanner;
49use crate::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser, QueryStatement};
50use crate::promql::planner::PromPlanner;
51use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
52use crate::range_select::plan_rewrite::RangePlanRewriter;
53use crate::{DfContextProviderAdapter, QueryEngineContext};
54
55#[async_trait]
56pub trait LogicalPlanner: Send + Sync {
57 async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;
58
59 async fn plan_logs_query(
60 &self,
61 query: LogQuery,
62 query_ctx: QueryContextRef,
63 ) -> Result<LogicalPlan>;
64
65 fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;
66
67 fn as_any(&self) -> &dyn Any;
68}
69
70pub struct DfLogicalPlanner {
71 engine_state: Arc<QueryEngineState>,
72 session_state: SessionState,
73}
74
75impl DfLogicalPlanner {
76 pub fn new(engine_state: Arc<QueryEngineState>) -> Self {
77 let session_state = engine_state.session_state();
78 Self {
79 engine_state,
80 session_state,
81 }
82 }
83
84 async fn explain_to_plan(
87 &self,
88 explain: &ExplainStatement,
89 query_ctx: QueryContextRef,
90 ) -> Result<LogicalPlan> {
91 let plan = self.plan_sql(&explain.statement, query_ctx).await?;
92 if matches!(plan, LogicalPlan::Explain(_)) {
93 return plan_err!("Nested EXPLAINs are not supported").context(PlanSqlSnafu);
94 }
95
96 let verbose = explain.verbose;
97 let analyze = explain.analyze;
98 let format = explain.format.map(|f| f.to_string());
99
100 let plan = Arc::new(plan);
101 let schema = LogicalPlan::explain_schema();
102 let schema = ToDFSchema::to_dfschema_ref(schema)?;
103
104 if verbose && format.is_some() {
105 return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported").context(PlanSqlSnafu);
106 }
107
108 if analyze {
109 Ok(LogicalPlan::Analyze(Analyze {
111 verbose,
112 input: plan,
113 schema,
114 }))
115 } else {
116 let stringified_plans = vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
117
118 let options = self.session_state.config().options();
120 let format = format
121 .map(|x| ExplainFormat::from_str(&x))
122 .transpose()?
123 .unwrap_or_else(|| options.explain.format.clone());
124
125 Ok(LogicalPlan::Explain(Explain {
126 verbose,
127 explain_format: format,
128 plan,
129 stringified_plans,
130 schema,
131 logical_optimization_succeeded: false,
132 }))
133 }
134 }
135
136 #[tracing::instrument(skip_all)]
137 #[async_recursion::async_recursion]
138 async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
139 let mut planner_context = PlannerContext::new();
140 let mut stmt = Cow::Borrowed(stmt);
141 let mut is_tql_cte = false;
142
143 if let Statement::Explain(explain) = stmt.as_ref() {
145 return self.explain_to_plan(explain, query_ctx).await;
146 }
147
148 if self.has_hybrid_ctes(stmt.as_ref()) {
150 let stmt_owned = stmt.into_owned();
151 let mut query = match stmt_owned {
152 Statement::Query(query) => query.as_ref().clone(),
153 _ => unreachable!("has_hybrid_ctes should only return true for Query statements"),
154 };
155 self.plan_query_with_hybrid_ctes(&query, query_ctx.clone(), &mut planner_context)
156 .await?;
157
158 query.hybrid_cte = None;
160 stmt = Cow::Owned(Statement::Query(Box::new(query)));
161 is_tql_cte = true;
162 }
163
164 let mut df_stmt = stmt.as_ref().try_into().context(SqlSnafu)?;
165
166 if let datafusion::sql::parser::Statement::Statement(
168 box datafusion::sql::sqlparser::ast::Statement::Explain { .. },
169 ) = &mut df_stmt
170 {
171 UnimplementedSnafu {
172 operation: "EXPLAIN with FORMAT using raw datafusion planner",
173 }
174 .fail()?;
175 }
176
177 let table_provider = DfTableSourceProvider::new(
178 self.engine_state.catalog_manager().clone(),
179 self.engine_state.disallow_cross_catalog_query(),
180 query_ctx.clone(),
181 Arc::new(DefaultPlanDecoder::new(
182 self.session_state.clone(),
183 &query_ctx,
184 )?),
185 self.session_state
186 .config_options()
187 .sql_parser
188 .enable_ident_normalization,
189 );
190
191 let context_provider = DfContextProviderAdapter::try_new(
192 self.engine_state.clone(),
193 self.session_state.clone(),
194 Some(&df_stmt),
195 query_ctx.clone(),
196 )
197 .await?;
198
199 let config_options = self.session_state.config().options();
200 let parser_options = &config_options.sql_parser;
201 let parser_options = ParserOptions {
202 map_string_types_to_utf8view: false,
203 ..parser_options.into()
204 };
205
206 let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
207
208 let result = if is_tql_cte {
210 let Statement::Query(query) = stmt.into_owned() else {
211 unreachable!("is_tql_cte should only be true for Query statements");
212 };
213 let sqlparser_stmt = sqlparser::ast::Statement::Query(Box::new(query.inner));
214 sql_to_rel
215 .sql_statement_to_plan_with_context(sqlparser_stmt, &mut planner_context)
216 .context(PlanSqlSnafu)?
217 } else {
218 sql_to_rel
219 .statement_to_plan(df_stmt)
220 .context(PlanSqlSnafu)?
221 };
222
223 common_telemetry::debug!("Logical planner, statement to plan result: {result}");
224 let plan = RangePlanRewriter::new(table_provider, query_ctx.clone())
225 .rewrite(result)
226 .await?;
227
228 let context = QueryEngineContext::new(self.session_state.clone(), query_ctx);
230 let plan = self
231 .engine_state
232 .optimize_by_extension_rules(plan, &context)?;
233 common_telemetry::debug!("Logical planner, optimize result: {plan}");
234
235 Ok(plan)
236 }
237
238 #[tracing::instrument(skip_all)]
240 pub(crate) async fn sql_to_expr(
241 &self,
242 sql: SqlExpr,
243 schema: &DFSchema,
244 normalize_ident: bool,
245 query_ctx: QueryContextRef,
246 ) -> Result<DfExpr> {
247 let context_provider = DfContextProviderAdapter::try_new(
248 self.engine_state.clone(),
249 self.session_state.clone(),
250 None,
251 query_ctx,
252 )
253 .await?;
254
255 let config_options = self.session_state.config().options();
256 let parser_options = &config_options.sql_parser;
257 let parser_options: ParserOptions = ParserOptions {
258 map_string_types_to_utf8view: false,
259 enable_ident_normalization: normalize_ident,
260 ..parser_options.into()
261 };
262
263 let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
264
265 Ok(sql_to_rel.sql_to_expr(sql, schema, &mut PlannerContext::new())?)
266 }
267
268 #[tracing::instrument(skip_all)]
269 async fn plan_pql(&self, stmt: &EvalStmt, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
270 let plan_decoder = Arc::new(DefaultPlanDecoder::new(
271 self.session_state.clone(),
272 &query_ctx,
273 )?);
274 let table_provider = DfTableSourceProvider::new(
275 self.engine_state.catalog_manager().clone(),
276 self.engine_state.disallow_cross_catalog_query(),
277 query_ctx,
278 plan_decoder,
279 self.session_state
280 .config_options()
281 .sql_parser
282 .enable_ident_normalization,
283 );
284 PromPlanner::stmt_to_plan(table_provider, stmt, &self.engine_state)
285 .await
286 .map_err(BoxedError::new)
287 .context(QueryPlanSnafu)
288 }
289
290 #[tracing::instrument(skip_all)]
291 fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
292 Ok(self.engine_state.optimize_logical_plan(plan)?)
293 }
294
295 fn has_hybrid_ctes(&self, stmt: &Statement) -> bool {
297 if let Statement::Query(query) = stmt {
298 query
299 .hybrid_cte
300 .as_ref()
301 .map(|hybrid_cte| !hybrid_cte.cte_tables.is_empty())
302 .unwrap_or(false)
303 } else {
304 false
305 }
306 }
307
308 async fn plan_query_with_hybrid_ctes(
310 &self,
311 query: &Query,
312 query_ctx: QueryContextRef,
313 planner_context: &mut PlannerContext,
314 ) -> Result<()> {
315 let hybrid_cte = query.hybrid_cte.as_ref().unwrap();
316
317 for cte in &hybrid_cte.cte_tables {
318 match &cte.content {
319 CteContent::Tql(tql) => {
320 let mut logical_plan = self.tql_to_logical_plan(tql, query_ctx.clone()).await?;
322 if !cte.columns.is_empty() {
323 let schema = logical_plan.schema();
324 let schema_fields = schema.fields().to_vec();
325 ensure!(
326 schema_fields.len() == cte.columns.len(),
327 CteColumnSchemaMismatchSnafu {
328 cte_name: cte.name.value.clone(),
329 original: schema_fields
330 .iter()
331 .map(|field| field.name().clone())
332 .collect::<Vec<_>>(),
333 expected: cte
334 .columns
335 .iter()
336 .map(|column| column.to_string())
337 .collect::<Vec<_>>(),
338 }
339 );
340 let aliases = cte
341 .columns
342 .iter()
343 .zip(schema_fields.iter())
344 .map(|(column, field)| col(field.name()).alias(column.to_string()));
345 logical_plan = LogicalPlanBuilder::from(logical_plan)
346 .project(aliases)
347 .context(PlanSqlSnafu)?
348 .build()
349 .context(PlanSqlSnafu)?;
350 }
351
352 logical_plan = LogicalPlan::SubqueryAlias(
354 datafusion_expr::SubqueryAlias::try_new(
355 Arc::new(logical_plan),
356 cte.name.value.clone(),
357 )
358 .context(PlanSqlSnafu)?,
359 );
360
361 planner_context.insert_cte(&cte.name.value, logical_plan);
362 }
363 CteContent::Sql(_) => {
364 unreachable!("SQL CTEs should not be in hybrid_cte.cte_tables");
367 }
368 }
369 }
370
371 Ok(())
372 }
373
374 async fn tql_to_logical_plan(
376 &self,
377 tql: &Tql,
378 query_ctx: QueryContextRef,
379 ) -> Result<LogicalPlan> {
380 match tql {
381 Tql::Eval(eval) => {
382 let prom_query = PromQuery {
384 query: eval.query.clone(),
385 start: eval.start.clone(),
386 end: eval.end.clone(),
387 step: eval.step.clone(),
388 lookback: eval
389 .lookback
390 .clone()
391 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
392 alias: eval.alias.clone(),
393 };
394 let stmt = QueryLanguageParser::parse_promql(&prom_query, &query_ctx)?;
395
396 self.plan(&stmt, query_ctx).await
397 }
398 Tql::Explain(_) => UnimplementedSnafu {
399 operation: "TQL EXPLAIN in CTEs",
400 }
401 .fail(),
402 Tql::Analyze(_) => UnimplementedSnafu {
403 operation: "TQL ANALYZE in CTEs",
404 }
405 .fail(),
406 }
407 }
408}
409
410#[async_trait]
411impl LogicalPlanner for DfLogicalPlanner {
412 #[tracing::instrument(skip_all)]
413 async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
414 match stmt {
415 QueryStatement::Sql(stmt) => self.plan_sql(stmt, query_ctx).await,
416 QueryStatement::Promql(stmt, _alias) => self.plan_pql(stmt, query_ctx).await,
417 }
418 }
419
420 async fn plan_logs_query(
421 &self,
422 query: LogQuery,
423 query_ctx: QueryContextRef,
424 ) -> Result<LogicalPlan> {
425 let plan_decoder = Arc::new(DefaultPlanDecoder::new(
426 self.session_state.clone(),
427 &query_ctx,
428 )?);
429 let table_provider = DfTableSourceProvider::new(
430 self.engine_state.catalog_manager().clone(),
431 self.engine_state.disallow_cross_catalog_query(),
432 query_ctx,
433 plan_decoder,
434 self.session_state
435 .config_options()
436 .sql_parser
437 .enable_ident_normalization,
438 );
439
440 let mut planner = LogQueryPlanner::new(table_provider, self.session_state.clone());
441 planner
442 .query_to_plan(query)
443 .await
444 .map_err(BoxedError::new)
445 .context(QueryPlanSnafu)
446 }
447
448 fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
449 self.optimize_logical_plan(plan)
450 }
451
452 fn as_any(&self) -> &dyn Any {
453 self
454 }
455}