query/
planner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::borrow::Cow;
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use catalog::table_source::DfTableSourceProvider;
21use common_error::ext::BoxedError;
22use common_telemetry::tracing;
23use datafusion::common::DFSchema;
24use datafusion::execution::context::SessionState;
25use datafusion::sql::planner::PlannerContext;
26use datafusion_expr::{Expr as DfExpr, LogicalPlan, LogicalPlanBuilder, col};
27use datafusion_sql::planner::{ParserOptions, SqlToRel};
28use log_query::LogQuery;
29use promql_parser::parser::EvalStmt;
30use session::context::QueryContextRef;
31use snafu::{ResultExt, ensure};
32use sql::CteContent;
33use sql::ast::Expr as SqlExpr;
34use sql::statements::query::Query;
35use sql::statements::statement::Statement;
36use sql::statements::tql::Tql;
37
38use crate::error::{
39    CteColumnSchemaMismatchSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu,
40    UnimplementedSnafu,
41};
42use crate::log_query::planner::LogQueryPlanner;
43use crate::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser, QueryStatement};
44use crate::promql::planner::PromPlanner;
45use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
46use crate::range_select::plan_rewrite::RangePlanRewriter;
47use crate::{DfContextProviderAdapter, QueryEngineContext};
48
/// Planning interface that turns query statements into DataFusion [`LogicalPlan`]s.
#[async_trait]
pub trait LogicalPlanner: Send + Sync {
    /// Builds a logical plan for `stmt` under the given query context.
    async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;

    /// Builds a logical plan for a log query under the given query context.
    async fn plan_logs_query(
        &self,
        query: LogQuery,
        query_ctx: QueryContextRef,
    ) -> Result<LogicalPlan>;

    /// Optimizes an already-built logical plan and returns the optimized plan.
    fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;

    /// Exposes the planner as `&dyn Any`.
    fn as_any(&self) -> &dyn Any;
}
63
/// Logical planner backed by the query engine state and a DataFusion session state.
pub struct DfLogicalPlanner {
    /// Shared query engine state (catalog manager, optimizer entry points, ...).
    engine_state: Arc<QueryEngineState>,
    /// DataFusion session state taken from `engine_state` when the planner is created.
    session_state: SessionState,
}
68
69impl DfLogicalPlanner {
70    pub fn new(engine_state: Arc<QueryEngineState>) -> Self {
71        let session_state = engine_state.session_state();
72        Self {
73            engine_state,
74            session_state,
75        }
76    }
77
78    #[tracing::instrument(skip_all)]
79    async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
80        let mut planner_context = PlannerContext::new();
81        let mut stmt = Cow::Borrowed(stmt);
82        let mut is_tql_cte = false;
83
84        // Check for hybrid CTEs before normal processing
85        if self.has_hybrid_ctes(stmt.as_ref()) {
86            let stmt_owned = stmt.into_owned();
87            let mut query = match stmt_owned {
88                Statement::Query(query) => query.as_ref().clone(),
89                _ => unreachable!("has_hybrid_ctes should only return true for Query statements"),
90            };
91            self.plan_query_with_hybrid_ctes(&query, query_ctx.clone(), &mut planner_context)
92                .await?;
93
94            // remove the processed TQL CTEs from the query
95            query.hybrid_cte = None;
96            stmt = Cow::Owned(Statement::Query(Box::new(query)));
97            is_tql_cte = true;
98        }
99
100        let mut df_stmt = stmt.as_ref().try_into().context(SqlSnafu)?;
101
102        // TODO(LFC): Remove this when Datafusion supports **both** the syntax and implementation of "explain with format".
103        if let datafusion::sql::parser::Statement::Statement(
104            box datafusion::sql::sqlparser::ast::Statement::Explain { format, .. },
105        ) = &mut df_stmt
106        {
107            format.take();
108        }
109
110        let table_provider = DfTableSourceProvider::new(
111            self.engine_state.catalog_manager().clone(),
112            self.engine_state.disallow_cross_catalog_query(),
113            query_ctx.clone(),
114            Arc::new(DefaultPlanDecoder::new(
115                self.session_state.clone(),
116                &query_ctx,
117            )?),
118            self.session_state
119                .config_options()
120                .sql_parser
121                .enable_ident_normalization,
122        );
123
124        let context_provider = DfContextProviderAdapter::try_new(
125            self.engine_state.clone(),
126            self.session_state.clone(),
127            Some(&df_stmt),
128            query_ctx.clone(),
129        )
130        .await?;
131
132        let config_options = self.session_state.config().options();
133        let parser_options = &config_options.sql_parser;
134        let parser_options = ParserOptions {
135            map_string_types_to_utf8view: false,
136            ..parser_options.into()
137        };
138
139        let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
140
141        // this IF is to handle different version of ASTs
142        let result = if is_tql_cte {
143            let Statement::Query(query) = stmt.into_owned() else {
144                unreachable!("is_tql_cte should only be true for Query statements");
145            };
146            let sqlparser_stmt =
147                datafusion::sql::sqlparser::ast::Statement::Query(Box::new(query.inner.into()));
148            sql_to_rel
149                .sql_statement_to_plan_with_context(sqlparser_stmt, &mut planner_context)
150                .context(PlanSqlSnafu)?
151        } else {
152            sql_to_rel
153                .statement_to_plan(df_stmt)
154                .context(PlanSqlSnafu)?
155        };
156
157        common_telemetry::debug!("Logical planner, statement to plan result: {result}");
158        let plan = RangePlanRewriter::new(table_provider, query_ctx.clone())
159            .rewrite(result)
160            .await?;
161
162        // Optimize logical plan by extension rules
163        let context = QueryEngineContext::new(self.session_state.clone(), query_ctx);
164        let plan = self
165            .engine_state
166            .optimize_by_extension_rules(plan, &context)?;
167        common_telemetry::debug!("Logical planner, optimize result: {plan}");
168
169        Ok(plan)
170    }
171
172    /// Generate a relational expression from a SQL expression
173    #[tracing::instrument(skip_all)]
174    pub(crate) async fn sql_to_expr(
175        &self,
176        sql: SqlExpr,
177        schema: &DFSchema,
178        normalize_ident: bool,
179        query_ctx: QueryContextRef,
180    ) -> Result<DfExpr> {
181        let context_provider = DfContextProviderAdapter::try_new(
182            self.engine_state.clone(),
183            self.session_state.clone(),
184            None,
185            query_ctx,
186        )
187        .await?;
188
189        let config_options = self.session_state.config().options();
190        let parser_options = &config_options.sql_parser;
191        let parser_options: ParserOptions = ParserOptions {
192            map_string_types_to_utf8view: false,
193            enable_ident_normalization: normalize_ident,
194            ..parser_options.into()
195        };
196
197        let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
198
199        Ok(sql_to_rel.sql_to_expr(sql.into(), schema, &mut PlannerContext::new())?)
200    }
201
202    #[tracing::instrument(skip_all)]
203    async fn plan_pql(&self, stmt: &EvalStmt, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
204        let plan_decoder = Arc::new(DefaultPlanDecoder::new(
205            self.session_state.clone(),
206            &query_ctx,
207        )?);
208        let table_provider = DfTableSourceProvider::new(
209            self.engine_state.catalog_manager().clone(),
210            self.engine_state.disallow_cross_catalog_query(),
211            query_ctx,
212            plan_decoder,
213            self.session_state
214                .config_options()
215                .sql_parser
216                .enable_ident_normalization,
217        );
218        PromPlanner::stmt_to_plan(table_provider, stmt, &self.engine_state)
219            .await
220            .map_err(BoxedError::new)
221            .context(QueryPlanSnafu)
222    }
223
224    #[tracing::instrument(skip_all)]
225    fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
226        Ok(self.engine_state.optimize_logical_plan(plan)?)
227    }
228
229    /// Check if a statement contains hybrid CTEs (mix of SQL and TQL)
230    fn has_hybrid_ctes(&self, stmt: &Statement) -> bool {
231        if let Statement::Query(query) = stmt {
232            query
233                .hybrid_cte
234                .as_ref()
235                .map(|hybrid_cte| !hybrid_cte.cte_tables.is_empty())
236                .unwrap_or(false)
237        } else {
238            false
239        }
240    }
241
242    /// Plan a query with hybrid CTEs using DataFusion's native PlannerContext
243    async fn plan_query_with_hybrid_ctes(
244        &self,
245        query: &Query,
246        query_ctx: QueryContextRef,
247        planner_context: &mut PlannerContext,
248    ) -> Result<()> {
249        let hybrid_cte = query.hybrid_cte.as_ref().unwrap();
250
251        for cte in &hybrid_cte.cte_tables {
252            match &cte.content {
253                CteContent::Tql(tql) => {
254                    // Plan TQL and register in PlannerContext
255                    let mut logical_plan = self.tql_to_logical_plan(tql, query_ctx.clone()).await?;
256                    if !cte.columns.is_empty() {
257                        let schema = logical_plan.schema();
258                        let schema_fields = schema.fields().to_vec();
259                        ensure!(
260                            schema_fields.len() == cte.columns.len(),
261                            CteColumnSchemaMismatchSnafu {
262                                cte_name: cte.name.value.clone(),
263                                original: schema_fields
264                                    .iter()
265                                    .map(|field| field.name().to_string())
266                                    .collect::<Vec<_>>(),
267                                expected: cte
268                                    .columns
269                                    .iter()
270                                    .map(|column| column.to_string())
271                                    .collect::<Vec<_>>(),
272                            }
273                        );
274                        let aliases = cte
275                            .columns
276                            .iter()
277                            .zip(schema_fields.iter())
278                            .map(|(column, field)| col(field.name()).alias(column.to_string()));
279                        logical_plan = LogicalPlanBuilder::from(logical_plan)
280                            .project(aliases)
281                            .context(PlanSqlSnafu)?
282                            .build()
283                            .context(PlanSqlSnafu)?;
284                    }
285
286                    // Wrap in SubqueryAlias to ensure proper table qualification for CTE
287                    logical_plan = LogicalPlan::SubqueryAlias(
288                        datafusion_expr::SubqueryAlias::try_new(
289                            Arc::new(logical_plan),
290                            cte.name.value.clone(),
291                        )
292                        .context(PlanSqlSnafu)?,
293                    );
294
295                    planner_context.insert_cte(&cte.name.value, logical_plan);
296                }
297                CteContent::Sql(_) => {
298                    // SQL CTEs should have been moved to the main query's WITH clause
299                    // during parsing, so we shouldn't encounter them here
300                    unreachable!("SQL CTEs should not be in hybrid_cte.cte_tables");
301                }
302            }
303        }
304
305        Ok(())
306    }
307
308    /// Convert TQL to LogicalPlan directly
309    async fn tql_to_logical_plan(
310        &self,
311        tql: &Tql,
312        query_ctx: QueryContextRef,
313    ) -> Result<LogicalPlan> {
314        match tql {
315            Tql::Eval(eval) => {
316                // Convert TqlEval to PromQuery then to QueryStatement::Promql
317                let prom_query = PromQuery {
318                    query: eval.query.clone(),
319                    start: eval.start.clone(),
320                    end: eval.end.clone(),
321                    step: eval.step.clone(),
322                    lookback: eval
323                        .lookback
324                        .clone()
325                        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
326                };
327                let stmt = QueryLanguageParser::parse_promql(&prom_query, &query_ctx)?;
328                self.plan(&stmt, query_ctx).await
329            }
330            Tql::Explain(_) => UnimplementedSnafu {
331                operation: "TQL EXPLAIN in CTEs",
332            }
333            .fail(),
334            Tql::Analyze(_) => UnimplementedSnafu {
335                operation: "TQL ANALYZE in CTEs",
336            }
337            .fail(),
338        }
339    }
340}
341
342#[async_trait]
343impl LogicalPlanner for DfLogicalPlanner {
344    #[tracing::instrument(skip_all)]
345    async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
346        match stmt {
347            QueryStatement::Sql(stmt) => self.plan_sql(stmt, query_ctx).await,
348            QueryStatement::Promql(stmt) => self.plan_pql(stmt, query_ctx).await,
349        }
350    }
351
352    async fn plan_logs_query(
353        &self,
354        query: LogQuery,
355        query_ctx: QueryContextRef,
356    ) -> Result<LogicalPlan> {
357        let plan_decoder = Arc::new(DefaultPlanDecoder::new(
358            self.session_state.clone(),
359            &query_ctx,
360        )?);
361        let table_provider = DfTableSourceProvider::new(
362            self.engine_state.catalog_manager().clone(),
363            self.engine_state.disallow_cross_catalog_query(),
364            query_ctx,
365            plan_decoder,
366            self.session_state
367                .config_options()
368                .sql_parser
369                .enable_ident_normalization,
370        );
371
372        let mut planner = LogQueryPlanner::new(table_provider, self.session_state.clone());
373        planner
374            .query_to_plan(query)
375            .await
376            .map_err(BoxedError::new)
377            .context(QueryPlanSnafu)
378    }
379
380    fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
381        self.optimize_logical_plan(plan)
382    }
383
384    fn as_any(&self) -> &dyn Any {
385        self
386    }
387}