query/
planner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::borrow::Cow;
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use catalog::table_source::DfTableSourceProvider;
21use common_error::ext::BoxedError;
22use common_telemetry::tracing;
23use datafusion::common::DFSchema;
24use datafusion::execution::context::SessionState;
25use datafusion::sql::planner::PlannerContext;
26use datafusion_expr::{col, Expr as DfExpr, LogicalPlan, LogicalPlanBuilder};
27use datafusion_sql::planner::{ParserOptions, SqlToRel};
28use log_query::LogQuery;
29use promql_parser::parser::EvalStmt;
30use session::context::QueryContextRef;
31use snafu::{ensure, ResultExt};
32use sql::ast::Expr as SqlExpr;
33use sql::statements::query::Query;
34use sql::statements::statement::Statement;
35use sql::statements::tql::Tql;
36use sql::CteContent;
37
38use crate::error::{
39    CteColumnSchemaMismatchSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu,
40    UnimplementedSnafu,
41};
42use crate::log_query::planner::LogQueryPlanner;
43use crate::parser::{PromQuery, QueryLanguageParser, QueryStatement, DEFAULT_LOOKBACK_STRING};
44use crate::promql::planner::PromPlanner;
45use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
46use crate::range_select::plan_rewrite::RangePlanRewriter;
47use crate::{DfContextProviderAdapter, QueryEngineContext};
48
/// Interface for turning parsed queries into DataFusion [`LogicalPlan`]s and
/// optimizing them.
#[async_trait]
pub trait LogicalPlanner: Send + Sync {
    /// Builds a logical plan for the given [`QueryStatement`] under `query_ctx`.
    async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;

    /// Builds a logical plan for the given [`LogQuery`] under `query_ctx`.
    async fn plan_logs_query(
        &self,
        query: LogQuery,
        query_ctx: QueryContextRef,
    ) -> Result<LogicalPlan>;

    /// Optimizes an already built logical plan and returns the optimized plan.
    fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;

    /// Exposes the planner as [`Any`] (presumably so callers can downcast to
    /// the concrete planner type).
    fn as_any(&self) -> &dyn Any;
}
63
/// [`LogicalPlanner`] implementation built on top of DataFusion.
pub struct DfLogicalPlanner {
    /// Shared query engine state; used to reach the catalog manager and the
    /// engine's optimization entry points.
    engine_state: Arc<QueryEngineState>,
    /// Session state obtained from `engine_state` when the planner is created.
    session_state: SessionState,
}
68
69impl DfLogicalPlanner {
70    pub fn new(engine_state: Arc<QueryEngineState>) -> Self {
71        let session_state = engine_state.session_state();
72        Self {
73            engine_state,
74            session_state,
75        }
76    }
77
78    #[tracing::instrument(skip_all)]
79    async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
80        let mut planner_context = PlannerContext::new();
81        let mut stmt = Cow::Borrowed(stmt);
82        let mut is_tql_cte = false;
83
84        // Check for hybrid CTEs before normal processing
85        if self.has_hybrid_ctes(stmt.as_ref()) {
86            let stmt_owned = stmt.into_owned();
87            let mut query = match stmt_owned {
88                Statement::Query(query) => query.as_ref().clone(),
89                _ => unreachable!("has_hybrid_ctes should only return true for Query statements"),
90            };
91            self.plan_query_with_hybrid_ctes(&query, query_ctx.clone(), &mut planner_context)
92                .await?;
93
94            // remove the processed TQL CTEs from the query
95            query.hybrid_cte = None;
96            stmt = Cow::Owned(Statement::Query(Box::new(query)));
97            is_tql_cte = true;
98        }
99
100        let mut df_stmt = stmt.as_ref().try_into().context(SqlSnafu)?;
101
102        // TODO(LFC): Remove this when Datafusion supports **both** the syntax and implementation of "explain with format".
103        if let datafusion::sql::parser::Statement::Statement(
104            box datafusion::sql::sqlparser::ast::Statement::Explain { format, .. },
105        ) = &mut df_stmt
106        {
107            format.take();
108        }
109
110        let table_provider = DfTableSourceProvider::new(
111            self.engine_state.catalog_manager().clone(),
112            self.engine_state.disallow_cross_catalog_query(),
113            query_ctx.clone(),
114            Arc::new(DefaultPlanDecoder::new(
115                self.session_state.clone(),
116                &query_ctx,
117            )?),
118            self.session_state
119                .config_options()
120                .sql_parser
121                .enable_ident_normalization,
122        );
123
124        let context_provider = DfContextProviderAdapter::try_new(
125            self.engine_state.clone(),
126            self.session_state.clone(),
127            Some(&df_stmt),
128            query_ctx.clone(),
129        )
130        .await?;
131
132        let config_options = self.session_state.config().options();
133        let parser_options = &config_options.sql_parser;
134        let parser_options = ParserOptions {
135            map_string_types_to_utf8view: false,
136            ..parser_options.into()
137        };
138
139        let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
140
141        // this IF is to handle different version of ASTs
142        let result = if is_tql_cte {
143            let Statement::Query(query) = stmt.into_owned() else {
144                unreachable!("is_tql_cte should only be true for Query statements");
145            };
146            let sqlparser_stmt =
147                datafusion::sql::sqlparser::ast::Statement::Query(Box::new(query.inner.into()));
148            sql_to_rel
149                .sql_statement_to_plan_with_context(sqlparser_stmt, &mut planner_context)
150                .context(PlanSqlSnafu)?
151        } else {
152            sql_to_rel
153                .statement_to_plan(df_stmt)
154                .context(PlanSqlSnafu)?
155        };
156
157        common_telemetry::debug!("Logical planner, statement to plan result: {result}");
158        let plan = RangePlanRewriter::new(table_provider, query_ctx.clone())
159            .rewrite(result)
160            .await?;
161
162        // Optimize logical plan by extension rules
163        let context = QueryEngineContext::new(self.session_state.clone(), query_ctx);
164        let plan = self
165            .engine_state
166            .optimize_by_extension_rules(plan, &context)?;
167        common_telemetry::debug!("Logical planner, optimize result: {plan}");
168
169        Ok(plan)
170    }
171
172    /// Generate a relational expression from a SQL expression
173    #[tracing::instrument(skip_all)]
174    pub(crate) async fn sql_to_expr(
175        &self,
176        sql: SqlExpr,
177        schema: &DFSchema,
178        normalize_ident: bool,
179        query_ctx: QueryContextRef,
180    ) -> Result<DfExpr> {
181        let context_provider = DfContextProviderAdapter::try_new(
182            self.engine_state.clone(),
183            self.session_state.clone(),
184            None,
185            query_ctx,
186        )
187        .await?;
188
189        let config_options = self.session_state.config().options();
190        let parser_options = &config_options.sql_parser;
191        let parser_options: ParserOptions = ParserOptions {
192            map_string_types_to_utf8view: false,
193            enable_ident_normalization: normalize_ident,
194            ..parser_options.into()
195        };
196
197        let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
198
199        Ok(sql_to_rel.sql_to_expr(sql.into(), schema, &mut PlannerContext::new())?)
200    }
201
202    #[tracing::instrument(skip_all)]
203    async fn plan_pql(&self, stmt: &EvalStmt, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
204        let plan_decoder = Arc::new(DefaultPlanDecoder::new(
205            self.session_state.clone(),
206            &query_ctx,
207        )?);
208        let table_provider = DfTableSourceProvider::new(
209            self.engine_state.catalog_manager().clone(),
210            self.engine_state.disallow_cross_catalog_query(),
211            query_ctx,
212            plan_decoder,
213            self.session_state
214                .config_options()
215                .sql_parser
216                .enable_ident_normalization,
217        );
218        PromPlanner::stmt_to_plan(table_provider, stmt, &self.engine_state)
219            .await
220            .map_err(BoxedError::new)
221            .context(QueryPlanSnafu)
222    }
223
224    #[tracing::instrument(skip_all)]
225    fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
226        Ok(self.engine_state.optimize_logical_plan(plan)?)
227    }
228
229    /// Check if a statement contains hybrid CTEs (mix of SQL and TQL)
230    fn has_hybrid_ctes(&self, stmt: &Statement) -> bool {
231        if let Statement::Query(query) = stmt {
232            query
233                .hybrid_cte
234                .as_ref()
235                .map(|hybrid_cte| !hybrid_cte.cte_tables.is_empty())
236                .unwrap_or(false)
237        } else {
238            false
239        }
240    }
241
242    /// Plan a query with hybrid CTEs using DataFusion's native PlannerContext
243    async fn plan_query_with_hybrid_ctes(
244        &self,
245        query: &Query,
246        query_ctx: QueryContextRef,
247        planner_context: &mut PlannerContext,
248    ) -> Result<()> {
249        let hybrid_cte = query.hybrid_cte.as_ref().unwrap();
250
251        for cte in &hybrid_cte.cte_tables {
252            match &cte.content {
253                CteContent::Tql(tql) => {
254                    // Plan TQL and register in PlannerContext
255                    let mut logical_plan = self.tql_to_logical_plan(tql, query_ctx.clone()).await?;
256                    if !cte.columns.is_empty() {
257                        let schema = logical_plan.schema();
258                        let schema_fields = schema.fields().to_vec();
259                        ensure!(
260                            schema_fields.len() == cte.columns.len(),
261                            CteColumnSchemaMismatchSnafu {
262                                cte_name: cte.name.value.clone(),
263                                original: schema_fields
264                                    .iter()
265                                    .map(|field| field.name().to_string())
266                                    .collect::<Vec<_>>(),
267                                expected: cte
268                                    .columns
269                                    .iter()
270                                    .map(|column| column.to_string())
271                                    .collect::<Vec<_>>(),
272                            }
273                        );
274                        let aliases = cte
275                            .columns
276                            .iter()
277                            .zip(schema_fields.iter())
278                            .map(|(column, field)| col(field.name()).alias(column.to_string()));
279                        logical_plan = LogicalPlanBuilder::from(logical_plan)
280                            .project(aliases)
281                            .context(PlanSqlSnafu)?
282                            .build()
283                            .context(PlanSqlSnafu)?;
284                    }
285                    planner_context.insert_cte(&cte.name.value, logical_plan);
286                }
287                CteContent::Sql(_) => {
288                    // SQL CTEs should have been moved to the main query's WITH clause
289                    // during parsing, so we shouldn't encounter them here
290                    unreachable!("SQL CTEs should not be in hybrid_cte.cte_tables");
291                }
292            }
293        }
294
295        Ok(())
296    }
297
298    /// Convert TQL to LogicalPlan directly
299    async fn tql_to_logical_plan(
300        &self,
301        tql: &Tql,
302        query_ctx: QueryContextRef,
303    ) -> Result<LogicalPlan> {
304        match tql {
305            Tql::Eval(eval) => {
306                // Convert TqlEval to PromQuery then to QueryStatement::Promql
307                let prom_query = PromQuery {
308                    query: eval.query.clone(),
309                    start: eval.start.clone(),
310                    end: eval.end.clone(),
311                    step: eval.step.clone(),
312                    lookback: eval
313                        .lookback
314                        .clone()
315                        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
316                };
317                let stmt = QueryLanguageParser::parse_promql(&prom_query, &query_ctx)?;
318                self.plan(&stmt, query_ctx).await
319            }
320            Tql::Explain(_) => UnimplementedSnafu {
321                operation: "TQL EXPLAIN in CTEs",
322            }
323            .fail(),
324            Tql::Analyze(_) => UnimplementedSnafu {
325                operation: "TQL ANALYZE in CTEs",
326            }
327            .fail(),
328        }
329    }
330}
331
332#[async_trait]
333impl LogicalPlanner for DfLogicalPlanner {
334    #[tracing::instrument(skip_all)]
335    async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
336        match stmt {
337            QueryStatement::Sql(stmt) => self.plan_sql(stmt, query_ctx).await,
338            QueryStatement::Promql(stmt) => self.plan_pql(stmt, query_ctx).await,
339        }
340    }
341
342    async fn plan_logs_query(
343        &self,
344        query: LogQuery,
345        query_ctx: QueryContextRef,
346    ) -> Result<LogicalPlan> {
347        let plan_decoder = Arc::new(DefaultPlanDecoder::new(
348            self.session_state.clone(),
349            &query_ctx,
350        )?);
351        let table_provider = DfTableSourceProvider::new(
352            self.engine_state.catalog_manager().clone(),
353            self.engine_state.disallow_cross_catalog_query(),
354            query_ctx,
355            plan_decoder,
356            self.session_state
357                .config_options()
358                .sql_parser
359                .enable_ident_normalization,
360        );
361
362        let mut planner = LogQueryPlanner::new(table_provider, self.session_state.clone());
363        planner
364            .query_to_plan(query)
365            .await
366            .map_err(BoxedError::new)
367            .context(QueryPlanSnafu)
368    }
369
    /// Optimizes `plan` by delegating to `optimize_logical_plan`.
    fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
        self.optimize_logical_plan(plan)
    }
373
    /// Returns this planner as a `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }
377}