1use std::any::Any;
16use std::borrow::Cow;
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use catalog::table_source::DfTableSourceProvider;
21use common_error::ext::BoxedError;
22use common_telemetry::tracing;
23use datafusion::common::DFSchema;
24use datafusion::execution::context::SessionState;
25use datafusion::sql::planner::PlannerContext;
26use datafusion_expr::{Expr as DfExpr, LogicalPlan, LogicalPlanBuilder, col};
27use datafusion_sql::planner::{ParserOptions, SqlToRel};
28use log_query::LogQuery;
29use promql_parser::parser::EvalStmt;
30use session::context::QueryContextRef;
31use snafu::{ResultExt, ensure};
32use sql::CteContent;
33use sql::ast::Expr as SqlExpr;
34use sql::statements::query::Query;
35use sql::statements::statement::Statement;
36use sql::statements::tql::Tql;
37
38use crate::error::{
39 CteColumnSchemaMismatchSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu,
40 UnimplementedSnafu,
41};
42use crate::log_query::planner::LogQueryPlanner;
43use crate::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser, QueryStatement};
44use crate::promql::planner::PromPlanner;
45use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
46use crate::range_select::plan_rewrite::RangePlanRewriter;
47use crate::{DfContextProviderAdapter, QueryEngineContext};
48
49#[async_trait]
50pub trait LogicalPlanner: Send + Sync {
51 async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;
52
53 async fn plan_logs_query(
54 &self,
55 query: LogQuery,
56 query_ctx: QueryContextRef,
57 ) -> Result<LogicalPlan>;
58
59 fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;
60
61 fn as_any(&self) -> &dyn Any;
62}
63
64pub struct DfLogicalPlanner {
65 engine_state: Arc<QueryEngineState>,
66 session_state: SessionState,
67}
68
69impl DfLogicalPlanner {
70 pub fn new(engine_state: Arc<QueryEngineState>) -> Self {
71 let session_state = engine_state.session_state();
72 Self {
73 engine_state,
74 session_state,
75 }
76 }
77
78 #[tracing::instrument(skip_all)]
79 async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
80 let mut planner_context = PlannerContext::new();
81 let mut stmt = Cow::Borrowed(stmt);
82 let mut is_tql_cte = false;
83
84 if self.has_hybrid_ctes(stmt.as_ref()) {
86 let stmt_owned = stmt.into_owned();
87 let mut query = match stmt_owned {
88 Statement::Query(query) => query.as_ref().clone(),
89 _ => unreachable!("has_hybrid_ctes should only return true for Query statements"),
90 };
91 self.plan_query_with_hybrid_ctes(&query, query_ctx.clone(), &mut planner_context)
92 .await?;
93
94 query.hybrid_cte = None;
96 stmt = Cow::Owned(Statement::Query(Box::new(query)));
97 is_tql_cte = true;
98 }
99
100 let mut df_stmt = stmt.as_ref().try_into().context(SqlSnafu)?;
101
102 if let datafusion::sql::parser::Statement::Statement(
104 box datafusion::sql::sqlparser::ast::Statement::Explain { format, .. },
105 ) = &mut df_stmt
106 {
107 format.take();
108 }
109
110 let table_provider = DfTableSourceProvider::new(
111 self.engine_state.catalog_manager().clone(),
112 self.engine_state.disallow_cross_catalog_query(),
113 query_ctx.clone(),
114 Arc::new(DefaultPlanDecoder::new(
115 self.session_state.clone(),
116 &query_ctx,
117 )?),
118 self.session_state
119 .config_options()
120 .sql_parser
121 .enable_ident_normalization,
122 );
123
124 let context_provider = DfContextProviderAdapter::try_new(
125 self.engine_state.clone(),
126 self.session_state.clone(),
127 Some(&df_stmt),
128 query_ctx.clone(),
129 )
130 .await?;
131
132 let config_options = self.session_state.config().options();
133 let parser_options = &config_options.sql_parser;
134 let parser_options = ParserOptions {
135 map_string_types_to_utf8view: false,
136 ..parser_options.into()
137 };
138
139 let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
140
141 let result = if is_tql_cte {
143 let Statement::Query(query) = stmt.into_owned() else {
144 unreachable!("is_tql_cte should only be true for Query statements");
145 };
146 let sqlparser_stmt =
147 datafusion::sql::sqlparser::ast::Statement::Query(Box::new(query.inner.into()));
148 sql_to_rel
149 .sql_statement_to_plan_with_context(sqlparser_stmt, &mut planner_context)
150 .context(PlanSqlSnafu)?
151 } else {
152 sql_to_rel
153 .statement_to_plan(df_stmt)
154 .context(PlanSqlSnafu)?
155 };
156
157 common_telemetry::debug!("Logical planner, statement to plan result: {result}");
158 let plan = RangePlanRewriter::new(table_provider, query_ctx.clone())
159 .rewrite(result)
160 .await?;
161
162 let context = QueryEngineContext::new(self.session_state.clone(), query_ctx);
164 let plan = self
165 .engine_state
166 .optimize_by_extension_rules(plan, &context)?;
167 common_telemetry::debug!("Logical planner, optimize result: {plan}");
168
169 Ok(plan)
170 }
171
172 #[tracing::instrument(skip_all)]
174 pub(crate) async fn sql_to_expr(
175 &self,
176 sql: SqlExpr,
177 schema: &DFSchema,
178 normalize_ident: bool,
179 query_ctx: QueryContextRef,
180 ) -> Result<DfExpr> {
181 let context_provider = DfContextProviderAdapter::try_new(
182 self.engine_state.clone(),
183 self.session_state.clone(),
184 None,
185 query_ctx,
186 )
187 .await?;
188
189 let config_options = self.session_state.config().options();
190 let parser_options = &config_options.sql_parser;
191 let parser_options: ParserOptions = ParserOptions {
192 map_string_types_to_utf8view: false,
193 enable_ident_normalization: normalize_ident,
194 ..parser_options.into()
195 };
196
197 let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
198
199 Ok(sql_to_rel.sql_to_expr(sql.into(), schema, &mut PlannerContext::new())?)
200 }
201
202 #[tracing::instrument(skip_all)]
203 async fn plan_pql(&self, stmt: &EvalStmt, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
204 let plan_decoder = Arc::new(DefaultPlanDecoder::new(
205 self.session_state.clone(),
206 &query_ctx,
207 )?);
208 let table_provider = DfTableSourceProvider::new(
209 self.engine_state.catalog_manager().clone(),
210 self.engine_state.disallow_cross_catalog_query(),
211 query_ctx,
212 plan_decoder,
213 self.session_state
214 .config_options()
215 .sql_parser
216 .enable_ident_normalization,
217 );
218 PromPlanner::stmt_to_plan(table_provider, stmt, &self.engine_state)
219 .await
220 .map_err(BoxedError::new)
221 .context(QueryPlanSnafu)
222 }
223
224 #[tracing::instrument(skip_all)]
225 fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
226 Ok(self.engine_state.optimize_logical_plan(plan)?)
227 }
228
229 fn has_hybrid_ctes(&self, stmt: &Statement) -> bool {
231 if let Statement::Query(query) = stmt {
232 query
233 .hybrid_cte
234 .as_ref()
235 .map(|hybrid_cte| !hybrid_cte.cte_tables.is_empty())
236 .unwrap_or(false)
237 } else {
238 false
239 }
240 }
241
242 async fn plan_query_with_hybrid_ctes(
244 &self,
245 query: &Query,
246 query_ctx: QueryContextRef,
247 planner_context: &mut PlannerContext,
248 ) -> Result<()> {
249 let hybrid_cte = query.hybrid_cte.as_ref().unwrap();
250
251 for cte in &hybrid_cte.cte_tables {
252 match &cte.content {
253 CteContent::Tql(tql) => {
254 let mut logical_plan = self.tql_to_logical_plan(tql, query_ctx.clone()).await?;
256 if !cte.columns.is_empty() {
257 let schema = logical_plan.schema();
258 let schema_fields = schema.fields().to_vec();
259 ensure!(
260 schema_fields.len() == cte.columns.len(),
261 CteColumnSchemaMismatchSnafu {
262 cte_name: cte.name.value.clone(),
263 original: schema_fields
264 .iter()
265 .map(|field| field.name().to_string())
266 .collect::<Vec<_>>(),
267 expected: cte
268 .columns
269 .iter()
270 .map(|column| column.to_string())
271 .collect::<Vec<_>>(),
272 }
273 );
274 let aliases = cte
275 .columns
276 .iter()
277 .zip(schema_fields.iter())
278 .map(|(column, field)| col(field.name()).alias(column.to_string()));
279 logical_plan = LogicalPlanBuilder::from(logical_plan)
280 .project(aliases)
281 .context(PlanSqlSnafu)?
282 .build()
283 .context(PlanSqlSnafu)?;
284 }
285
286 logical_plan = LogicalPlan::SubqueryAlias(
288 datafusion_expr::SubqueryAlias::try_new(
289 Arc::new(logical_plan),
290 cte.name.value.clone(),
291 )
292 .context(PlanSqlSnafu)?,
293 );
294
295 planner_context.insert_cte(&cte.name.value, logical_plan);
296 }
297 CteContent::Sql(_) => {
298 unreachable!("SQL CTEs should not be in hybrid_cte.cte_tables");
301 }
302 }
303 }
304
305 Ok(())
306 }
307
308 async fn tql_to_logical_plan(
310 &self,
311 tql: &Tql,
312 query_ctx: QueryContextRef,
313 ) -> Result<LogicalPlan> {
314 match tql {
315 Tql::Eval(eval) => {
316 let prom_query = PromQuery {
318 query: eval.query.clone(),
319 start: eval.start.clone(),
320 end: eval.end.clone(),
321 step: eval.step.clone(),
322 lookback: eval
323 .lookback
324 .clone()
325 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
326 };
327 let stmt = QueryLanguageParser::parse_promql(&prom_query, &query_ctx)?;
328 self.plan(&stmt, query_ctx).await
329 }
330 Tql::Explain(_) => UnimplementedSnafu {
331 operation: "TQL EXPLAIN in CTEs",
332 }
333 .fail(),
334 Tql::Analyze(_) => UnimplementedSnafu {
335 operation: "TQL ANALYZE in CTEs",
336 }
337 .fail(),
338 }
339 }
340}
341
342#[async_trait]
343impl LogicalPlanner for DfLogicalPlanner {
344 #[tracing::instrument(skip_all)]
345 async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
346 match stmt {
347 QueryStatement::Sql(stmt) => self.plan_sql(stmt, query_ctx).await,
348 QueryStatement::Promql(stmt) => self.plan_pql(stmt, query_ctx).await,
349 }
350 }
351
352 async fn plan_logs_query(
353 &self,
354 query: LogQuery,
355 query_ctx: QueryContextRef,
356 ) -> Result<LogicalPlan> {
357 let plan_decoder = Arc::new(DefaultPlanDecoder::new(
358 self.session_state.clone(),
359 &query_ctx,
360 )?);
361 let table_provider = DfTableSourceProvider::new(
362 self.engine_state.catalog_manager().clone(),
363 self.engine_state.disallow_cross_catalog_query(),
364 query_ctx,
365 plan_decoder,
366 self.session_state
367 .config_options()
368 .sql_parser
369 .enable_ident_normalization,
370 );
371
372 let mut planner = LogQueryPlanner::new(table_provider, self.session_state.clone());
373 planner
374 .query_to_plan(query)
375 .await
376 .map_err(BoxedError::new)
377 .context(QueryPlanSnafu)
378 }
379
380 fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
381 self.optimize_logical_plan(plan)
382 }
383
384 fn as_any(&self) -> &dyn Any {
385 self
386 }
387}