1use std::any::Any;
16use std::borrow::Cow;
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use catalog::table_source::DfTableSourceProvider;
21use common_error::ext::BoxedError;
22use common_telemetry::tracing;
23use datafusion::common::DFSchema;
24use datafusion::execution::context::SessionState;
25use datafusion::sql::planner::PlannerContext;
26use datafusion_expr::{col, Expr as DfExpr, LogicalPlan, LogicalPlanBuilder};
27use datafusion_sql::planner::{ParserOptions, SqlToRel};
28use log_query::LogQuery;
29use promql_parser::parser::EvalStmt;
30use session::context::QueryContextRef;
31use snafu::{ensure, ResultExt};
32use sql::ast::Expr as SqlExpr;
33use sql::statements::query::Query;
34use sql::statements::statement::Statement;
35use sql::statements::tql::Tql;
36use sql::CteContent;
37
38use crate::error::{
39 CteColumnSchemaMismatchSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu,
40 UnimplementedSnafu,
41};
42use crate::log_query::planner::LogQueryPlanner;
43use crate::parser::{PromQuery, QueryLanguageParser, QueryStatement, DEFAULT_LOOKBACK_STRING};
44use crate::promql::planner::PromPlanner;
45use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
46use crate::range_select::plan_rewrite::RangePlanRewriter;
47use crate::{DfContextProviderAdapter, QueryEngineContext};
48
/// Turns parsed query statements into DataFusion [`LogicalPlan`]s and optimizes them.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait LogicalPlanner: Send + Sync {
    /// Builds a logical plan for `stmt`, using `query_ctx` as the query context.
    async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;

    /// Builds a logical plan for the given [`LogQuery`], using `query_ctx` as the query
    /// context.
    async fn plan_logs_query(
        &self,
        query: LogQuery,
        query_ctx: QueryContextRef,
    ) -> Result<LogicalPlan>;

    /// Optimizes an already built logical plan and returns the resulting plan.
    fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;

    /// Exposes the planner as [`Any`], which allows downcasting to the concrete type.
    fn as_any(&self) -> &dyn Any;
}
63
/// [`LogicalPlanner`] implementation backed by DataFusion.
pub struct DfLogicalPlanner {
    /// Shared query engine state; the planner reads the catalog manager, the cross-catalog
    /// policy and the optimization entry points from it.
    engine_state: Arc<QueryEngineState>,
    /// Session state obtained from `engine_state` when the planner is constructed.
    session_state: SessionState,
}
68
69impl DfLogicalPlanner {
70 pub fn new(engine_state: Arc<QueryEngineState>) -> Self {
71 let session_state = engine_state.session_state();
72 Self {
73 engine_state,
74 session_state,
75 }
76 }
77
78 #[tracing::instrument(skip_all)]
79 async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
80 let mut planner_context = PlannerContext::new();
81 let mut stmt = Cow::Borrowed(stmt);
82 let mut is_tql_cte = false;
83
84 if self.has_hybrid_ctes(stmt.as_ref()) {
86 let stmt_owned = stmt.into_owned();
87 let mut query = match stmt_owned {
88 Statement::Query(query) => query.as_ref().clone(),
89 _ => unreachable!("has_hybrid_ctes should only return true for Query statements"),
90 };
91 self.plan_query_with_hybrid_ctes(&query, query_ctx.clone(), &mut planner_context)
92 .await?;
93
94 query.hybrid_cte = None;
96 stmt = Cow::Owned(Statement::Query(Box::new(query)));
97 is_tql_cte = true;
98 }
99
100 let mut df_stmt = stmt.as_ref().try_into().context(SqlSnafu)?;
101
102 if let datafusion::sql::parser::Statement::Statement(
104 box datafusion::sql::sqlparser::ast::Statement::Explain { format, .. },
105 ) = &mut df_stmt
106 {
107 format.take();
108 }
109
110 let table_provider = DfTableSourceProvider::new(
111 self.engine_state.catalog_manager().clone(),
112 self.engine_state.disallow_cross_catalog_query(),
113 query_ctx.clone(),
114 Arc::new(DefaultPlanDecoder::new(
115 self.session_state.clone(),
116 &query_ctx,
117 )?),
118 self.session_state
119 .config_options()
120 .sql_parser
121 .enable_ident_normalization,
122 );
123
124 let context_provider = DfContextProviderAdapter::try_new(
125 self.engine_state.clone(),
126 self.session_state.clone(),
127 Some(&df_stmt),
128 query_ctx.clone(),
129 )
130 .await?;
131
132 let config_options = self.session_state.config().options();
133 let parser_options = &config_options.sql_parser;
134 let parser_options = ParserOptions {
135 map_string_types_to_utf8view: false,
136 ..parser_options.into()
137 };
138
139 let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
140
141 let result = if is_tql_cte {
143 let Statement::Query(query) = stmt.into_owned() else {
144 unreachable!("is_tql_cte should only be true for Query statements");
145 };
146 let sqlparser_stmt =
147 datafusion::sql::sqlparser::ast::Statement::Query(Box::new(query.inner.into()));
148 sql_to_rel
149 .sql_statement_to_plan_with_context(sqlparser_stmt, &mut planner_context)
150 .context(PlanSqlSnafu)?
151 } else {
152 sql_to_rel
153 .statement_to_plan(df_stmt)
154 .context(PlanSqlSnafu)?
155 };
156
157 common_telemetry::debug!("Logical planner, statement to plan result: {result}");
158 let plan = RangePlanRewriter::new(table_provider, query_ctx.clone())
159 .rewrite(result)
160 .await?;
161
162 let context = QueryEngineContext::new(self.session_state.clone(), query_ctx);
164 let plan = self
165 .engine_state
166 .optimize_by_extension_rules(plan, &context)?;
167 common_telemetry::debug!("Logical planner, optimize result: {plan}");
168
169 Ok(plan)
170 }
171
172 #[tracing::instrument(skip_all)]
174 pub(crate) async fn sql_to_expr(
175 &self,
176 sql: SqlExpr,
177 schema: &DFSchema,
178 normalize_ident: bool,
179 query_ctx: QueryContextRef,
180 ) -> Result<DfExpr> {
181 let context_provider = DfContextProviderAdapter::try_new(
182 self.engine_state.clone(),
183 self.session_state.clone(),
184 None,
185 query_ctx,
186 )
187 .await?;
188
189 let config_options = self.session_state.config().options();
190 let parser_options = &config_options.sql_parser;
191 let parser_options: ParserOptions = ParserOptions {
192 map_string_types_to_utf8view: false,
193 enable_ident_normalization: normalize_ident,
194 ..parser_options.into()
195 };
196
197 let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
198
199 Ok(sql_to_rel.sql_to_expr(sql.into(), schema, &mut PlannerContext::new())?)
200 }
201
202 #[tracing::instrument(skip_all)]
203 async fn plan_pql(&self, stmt: &EvalStmt, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
204 let plan_decoder = Arc::new(DefaultPlanDecoder::new(
205 self.session_state.clone(),
206 &query_ctx,
207 )?);
208 let table_provider = DfTableSourceProvider::new(
209 self.engine_state.catalog_manager().clone(),
210 self.engine_state.disallow_cross_catalog_query(),
211 query_ctx,
212 plan_decoder,
213 self.session_state
214 .config_options()
215 .sql_parser
216 .enable_ident_normalization,
217 );
218 PromPlanner::stmt_to_plan(table_provider, stmt, &self.engine_state)
219 .await
220 .map_err(BoxedError::new)
221 .context(QueryPlanSnafu)
222 }
223
224 #[tracing::instrument(skip_all)]
225 fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
226 Ok(self.engine_state.optimize_logical_plan(plan)?)
227 }
228
229 fn has_hybrid_ctes(&self, stmt: &Statement) -> bool {
231 if let Statement::Query(query) = stmt {
232 query
233 .hybrid_cte
234 .as_ref()
235 .map(|hybrid_cte| !hybrid_cte.cte_tables.is_empty())
236 .unwrap_or(false)
237 } else {
238 false
239 }
240 }
241
242 async fn plan_query_with_hybrid_ctes(
244 &self,
245 query: &Query,
246 query_ctx: QueryContextRef,
247 planner_context: &mut PlannerContext,
248 ) -> Result<()> {
249 let hybrid_cte = query.hybrid_cte.as_ref().unwrap();
250
251 for cte in &hybrid_cte.cte_tables {
252 match &cte.content {
253 CteContent::Tql(tql) => {
254 let mut logical_plan = self.tql_to_logical_plan(tql, query_ctx.clone()).await?;
256 if !cte.columns.is_empty() {
257 let schema = logical_plan.schema();
258 let schema_fields = schema.fields().to_vec();
259 ensure!(
260 schema_fields.len() == cte.columns.len(),
261 CteColumnSchemaMismatchSnafu {
262 cte_name: cte.name.value.clone(),
263 original: schema_fields
264 .iter()
265 .map(|field| field.name().to_string())
266 .collect::<Vec<_>>(),
267 expected: cte
268 .columns
269 .iter()
270 .map(|column| column.to_string())
271 .collect::<Vec<_>>(),
272 }
273 );
274 let aliases = cte
275 .columns
276 .iter()
277 .zip(schema_fields.iter())
278 .map(|(column, field)| col(field.name()).alias(column.to_string()));
279 logical_plan = LogicalPlanBuilder::from(logical_plan)
280 .project(aliases)
281 .context(PlanSqlSnafu)?
282 .build()
283 .context(PlanSqlSnafu)?;
284 }
285 planner_context.insert_cte(&cte.name.value, logical_plan);
286 }
287 CteContent::Sql(_) => {
288 unreachable!("SQL CTEs should not be in hybrid_cte.cte_tables");
291 }
292 }
293 }
294
295 Ok(())
296 }
297
298 async fn tql_to_logical_plan(
300 &self,
301 tql: &Tql,
302 query_ctx: QueryContextRef,
303 ) -> Result<LogicalPlan> {
304 match tql {
305 Tql::Eval(eval) => {
306 let prom_query = PromQuery {
308 query: eval.query.clone(),
309 start: eval.start.clone(),
310 end: eval.end.clone(),
311 step: eval.step.clone(),
312 lookback: eval
313 .lookback
314 .clone()
315 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
316 };
317 let stmt = QueryLanguageParser::parse_promql(&prom_query, &query_ctx)?;
318 self.plan(&stmt, query_ctx).await
319 }
320 Tql::Explain(_) => UnimplementedSnafu {
321 operation: "TQL EXPLAIN in CTEs",
322 }
323 .fail(),
324 Tql::Analyze(_) => UnimplementedSnafu {
325 operation: "TQL ANALYZE in CTEs",
326 }
327 .fail(),
328 }
329 }
330}
331
332#[async_trait]
333impl LogicalPlanner for DfLogicalPlanner {
334 #[tracing::instrument(skip_all)]
335 async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
336 match stmt {
337 QueryStatement::Sql(stmt) => self.plan_sql(stmt, query_ctx).await,
338 QueryStatement::Promql(stmt) => self.plan_pql(stmt, query_ctx).await,
339 }
340 }
341
342 async fn plan_logs_query(
343 &self,
344 query: LogQuery,
345 query_ctx: QueryContextRef,
346 ) -> Result<LogicalPlan> {
347 let plan_decoder = Arc::new(DefaultPlanDecoder::new(
348 self.session_state.clone(),
349 &query_ctx,
350 )?);
351 let table_provider = DfTableSourceProvider::new(
352 self.engine_state.catalog_manager().clone(),
353 self.engine_state.disallow_cross_catalog_query(),
354 query_ctx,
355 plan_decoder,
356 self.session_state
357 .config_options()
358 .sql_parser
359 .enable_ident_normalization,
360 );
361
362 let mut planner = LogQueryPlanner::new(table_provider, self.session_state.clone());
363 planner
364 .query_to_plan(query)
365 .await
366 .map_err(BoxedError::new)
367 .context(QueryPlanSnafu)
368 }
369
370 fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
371 self.optimize_logical_plan(plan)
372 }
373
374 fn as_any(&self) -> &dyn Any {
375 self
376 }
377}