query/
planner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::borrow::Cow;
17use std::sync::Arc;
18
19use async_trait::async_trait;
20use catalog::table_source::DfTableSourceProvider;
21use common_error::ext::BoxedError;
22use common_telemetry::tracing;
23use datafusion::common::{DFSchema, plan_err};
24use datafusion::execution::context::SessionState;
25use datafusion::sql::planner::PlannerContext;
26use datafusion_common::ToDFSchema;
27use datafusion_expr::{
28    Analyze, Explain, ExplainFormat, Expr as DfExpr, LogicalPlan, LogicalPlanBuilder, PlanType,
29    ToStringifiedPlan, col,
30};
31use datafusion_sql::planner::{ParserOptions, SqlToRel};
32use log_query::LogQuery;
33use promql_parser::parser::EvalStmt;
34use session::context::QueryContextRef;
35use snafu::{ResultExt, ensure};
36use sql::CteContent;
37use sql::ast::Expr as SqlExpr;
38use sql::statements::explain::ExplainStatement;
39use sql::statements::query::Query;
40use sql::statements::statement::Statement;
41use sql::statements::tql::Tql;
42
43use crate::error::{
44    CteColumnSchemaMismatchSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu,
45    UnimplementedSnafu,
46};
47use crate::log_query::planner::LogQueryPlanner;
48use crate::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser, QueryStatement};
49use crate::promql::planner::PromPlanner;
50use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
51use crate::range_select::plan_rewrite::RangePlanRewriter;
52use crate::{DfContextProviderAdapter, QueryEngineContext};
53
/// Planner interface that turns parsed queries into DataFusion [`LogicalPlan`]s.
#[async_trait]
pub trait LogicalPlanner: Send + Sync {
    /// Builds a [`LogicalPlan`] for the given [`QueryStatement`] under `query_ctx`.
    async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;

    /// Builds a [`LogicalPlan`] for the given [`LogQuery`] under `query_ctx`.
    async fn plan_logs_query(
        &self,
        query: LogQuery,
        query_ctx: QueryContextRef,
    ) -> Result<LogicalPlan>;

    /// Optimizes an already-built [`LogicalPlan`] and returns the optimized plan.
    fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;

    /// Exposes the planner as [`Any`], presumably so callers can downcast to the
    /// concrete implementation.
    fn as_any(&self) -> &dyn Any;
}
68
/// [`LogicalPlanner`] implementation backed by DataFusion.
pub struct DfLogicalPlanner {
    /// Shared query engine state (catalog manager, optimizer entry points, ...).
    engine_state: Arc<QueryEngineState>,
    /// Session state obtained from `engine_state` when the planner is constructed.
    session_state: SessionState,
}
73
74impl DfLogicalPlanner {
75    pub fn new(engine_state: Arc<QueryEngineState>) -> Self {
76        let session_state = engine_state.session_state();
77        Self {
78            engine_state,
79            session_state,
80        }
81    }
82
83    /// Basically the same with `explain_to_plan` in DataFusion, but adapted to Greptime's
84    /// `plan_sql` to support Greptime Statements.
85    async fn explain_to_plan(
86        &self,
87        explain: &ExplainStatement,
88        query_ctx: QueryContextRef,
89    ) -> Result<LogicalPlan> {
90        let plan = self.plan_sql(&explain.statement, query_ctx).await?;
91        if matches!(plan, LogicalPlan::Explain(_)) {
92            return plan_err!("Nested EXPLAINs are not supported").context(PlanSqlSnafu);
93        }
94
95        let verbose = explain.verbose;
96        let analyze = explain.analyze;
97        let format = explain.format.map(|f| f.to_string());
98
99        let plan = Arc::new(plan);
100        let schema = LogicalPlan::explain_schema();
101        let schema = ToDFSchema::to_dfschema_ref(schema)?;
102
103        if verbose && format.is_some() {
104            return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported").context(PlanSqlSnafu);
105        }
106
107        if analyze {
108            // notice format is already set in query context, so can be ignore here
109            Ok(LogicalPlan::Analyze(Analyze {
110                verbose,
111                input: plan,
112                schema,
113            }))
114        } else {
115            let stringified_plans = vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
116
117            // default to configuration value
118            let options = self.session_state.config().options();
119            let format = format.as_ref().unwrap_or(&options.explain.format);
120
121            let format: ExplainFormat = format.parse()?;
122
123            Ok(LogicalPlan::Explain(Explain {
124                verbose,
125                explain_format: format,
126                plan,
127                stringified_plans,
128                schema,
129                logical_optimization_succeeded: false,
130            }))
131        }
132    }
133
134    #[tracing::instrument(skip_all)]
135    #[async_recursion::async_recursion]
136    async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
137        let mut planner_context = PlannerContext::new();
138        let mut stmt = Cow::Borrowed(stmt);
139        let mut is_tql_cte = false;
140
141        // handle explain before normal processing so we can explain Greptime Statements
142        if let Statement::Explain(explain) = stmt.as_ref() {
143            return self.explain_to_plan(explain, query_ctx).await;
144        }
145
146        // Check for hybrid CTEs before normal processing
147        if self.has_hybrid_ctes(stmt.as_ref()) {
148            let stmt_owned = stmt.into_owned();
149            let mut query = match stmt_owned {
150                Statement::Query(query) => query.as_ref().clone(),
151                _ => unreachable!("has_hybrid_ctes should only return true for Query statements"),
152            };
153            self.plan_query_with_hybrid_ctes(&query, query_ctx.clone(), &mut planner_context)
154                .await?;
155
156            // remove the processed TQL CTEs from the query
157            query.hybrid_cte = None;
158            stmt = Cow::Owned(Statement::Query(Box::new(query)));
159            is_tql_cte = true;
160        }
161
162        let mut df_stmt = stmt.as_ref().try_into().context(SqlSnafu)?;
163
164        // TODO(LFC): Remove this when Datafusion supports **both** the syntax and implementation of "explain with format".
165        if let datafusion::sql::parser::Statement::Statement(
166            box datafusion::sql::sqlparser::ast::Statement::Explain { .. },
167        ) = &mut df_stmt
168        {
169            UnimplementedSnafu {
170                operation: "EXPLAIN with FORMAT using raw datafusion planner",
171            }
172            .fail()?;
173        }
174
175        let table_provider = DfTableSourceProvider::new(
176            self.engine_state.catalog_manager().clone(),
177            self.engine_state.disallow_cross_catalog_query(),
178            query_ctx.clone(),
179            Arc::new(DefaultPlanDecoder::new(
180                self.session_state.clone(),
181                &query_ctx,
182            )?),
183            self.session_state
184                .config_options()
185                .sql_parser
186                .enable_ident_normalization,
187        );
188
189        let context_provider = DfContextProviderAdapter::try_new(
190            self.engine_state.clone(),
191            self.session_state.clone(),
192            Some(&df_stmt),
193            query_ctx.clone(),
194        )
195        .await?;
196
197        let config_options = self.session_state.config().options();
198        let parser_options = &config_options.sql_parser;
199        let parser_options = ParserOptions {
200            map_string_types_to_utf8view: false,
201            ..parser_options.into()
202        };
203
204        let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
205
206        // this IF is to handle different version of ASTs
207        let result = if is_tql_cte {
208            let Statement::Query(query) = stmt.into_owned() else {
209                unreachable!("is_tql_cte should only be true for Query statements");
210            };
211            let sqlparser_stmt =
212                datafusion::sql::sqlparser::ast::Statement::Query(Box::new(query.inner.into()));
213            sql_to_rel
214                .sql_statement_to_plan_with_context(sqlparser_stmt, &mut planner_context)
215                .context(PlanSqlSnafu)?
216        } else {
217            sql_to_rel
218                .statement_to_plan(df_stmt)
219                .context(PlanSqlSnafu)?
220        };
221
222        common_telemetry::debug!("Logical planner, statement to plan result: {result}");
223        let plan = RangePlanRewriter::new(table_provider, query_ctx.clone())
224            .rewrite(result)
225            .await?;
226
227        // Optimize logical plan by extension rules
228        let context = QueryEngineContext::new(self.session_state.clone(), query_ctx);
229        let plan = self
230            .engine_state
231            .optimize_by_extension_rules(plan, &context)?;
232        common_telemetry::debug!("Logical planner, optimize result: {plan}");
233
234        Ok(plan)
235    }
236
237    /// Generate a relational expression from a SQL expression
238    #[tracing::instrument(skip_all)]
239    pub(crate) async fn sql_to_expr(
240        &self,
241        sql: SqlExpr,
242        schema: &DFSchema,
243        normalize_ident: bool,
244        query_ctx: QueryContextRef,
245    ) -> Result<DfExpr> {
246        let context_provider = DfContextProviderAdapter::try_new(
247            self.engine_state.clone(),
248            self.session_state.clone(),
249            None,
250            query_ctx,
251        )
252        .await?;
253
254        let config_options = self.session_state.config().options();
255        let parser_options = &config_options.sql_parser;
256        let parser_options: ParserOptions = ParserOptions {
257            map_string_types_to_utf8view: false,
258            enable_ident_normalization: normalize_ident,
259            ..parser_options.into()
260        };
261
262        let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
263
264        Ok(sql_to_rel.sql_to_expr(sql.into(), schema, &mut PlannerContext::new())?)
265    }
266
267    #[tracing::instrument(skip_all)]
268    async fn plan_pql(
269        &self,
270        stmt: &EvalStmt,
271        alias: Option<String>,
272        query_ctx: QueryContextRef,
273    ) -> Result<LogicalPlan> {
274        let plan_decoder = Arc::new(DefaultPlanDecoder::new(
275            self.session_state.clone(),
276            &query_ctx,
277        )?);
278        let table_provider = DfTableSourceProvider::new(
279            self.engine_state.catalog_manager().clone(),
280            self.engine_state.disallow_cross_catalog_query(),
281            query_ctx,
282            plan_decoder,
283            self.session_state
284                .config_options()
285                .sql_parser
286                .enable_ident_normalization,
287        );
288        PromPlanner::stmt_to_plan_with_alias(table_provider, stmt, alias, &self.engine_state)
289            .await
290            .map_err(BoxedError::new)
291            .context(QueryPlanSnafu)
292    }
293
294    #[tracing::instrument(skip_all)]
295    fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
296        Ok(self.engine_state.optimize_logical_plan(plan)?)
297    }
298
299    /// Check if a statement contains hybrid CTEs (mix of SQL and TQL)
300    fn has_hybrid_ctes(&self, stmt: &Statement) -> bool {
301        if let Statement::Query(query) = stmt {
302            query
303                .hybrid_cte
304                .as_ref()
305                .map(|hybrid_cte| !hybrid_cte.cte_tables.is_empty())
306                .unwrap_or(false)
307        } else {
308            false
309        }
310    }
311
312    /// Plan a query with hybrid CTEs using DataFusion's native PlannerContext
313    async fn plan_query_with_hybrid_ctes(
314        &self,
315        query: &Query,
316        query_ctx: QueryContextRef,
317        planner_context: &mut PlannerContext,
318    ) -> Result<()> {
319        let hybrid_cte = query.hybrid_cte.as_ref().unwrap();
320
321        for cte in &hybrid_cte.cte_tables {
322            match &cte.content {
323                CteContent::Tql(tql) => {
324                    // Plan TQL and register in PlannerContext
325                    let mut logical_plan = self.tql_to_logical_plan(tql, query_ctx.clone()).await?;
326                    if !cte.columns.is_empty() {
327                        let schema = logical_plan.schema();
328                        let schema_fields = schema.fields().to_vec();
329                        ensure!(
330                            schema_fields.len() == cte.columns.len(),
331                            CteColumnSchemaMismatchSnafu {
332                                cte_name: cte.name.value.clone(),
333                                original: schema_fields
334                                    .iter()
335                                    .map(|field| field.name().clone())
336                                    .collect::<Vec<_>>(),
337                                expected: cte
338                                    .columns
339                                    .iter()
340                                    .map(|column| column.to_string())
341                                    .collect::<Vec<_>>(),
342                            }
343                        );
344                        let aliases = cte
345                            .columns
346                            .iter()
347                            .zip(schema_fields.iter())
348                            .map(|(column, field)| col(field.name()).alias(column.to_string()));
349                        logical_plan = LogicalPlanBuilder::from(logical_plan)
350                            .project(aliases)
351                            .context(PlanSqlSnafu)?
352                            .build()
353                            .context(PlanSqlSnafu)?;
354                    }
355
356                    // Wrap in SubqueryAlias to ensure proper table qualification for CTE
357                    logical_plan = LogicalPlan::SubqueryAlias(
358                        datafusion_expr::SubqueryAlias::try_new(
359                            Arc::new(logical_plan),
360                            cte.name.value.clone(),
361                        )
362                        .context(PlanSqlSnafu)?,
363                    );
364
365                    planner_context.insert_cte(&cte.name.value, logical_plan);
366                }
367                CteContent::Sql(_) => {
368                    // SQL CTEs should have been moved to the main query's WITH clause
369                    // during parsing, so we shouldn't encounter them here
370                    unreachable!("SQL CTEs should not be in hybrid_cte.cte_tables");
371                }
372            }
373        }
374
375        Ok(())
376    }
377
378    /// Convert TQL to LogicalPlan directly
379    async fn tql_to_logical_plan(
380        &self,
381        tql: &Tql,
382        query_ctx: QueryContextRef,
383    ) -> Result<LogicalPlan> {
384        match tql {
385            Tql::Eval(eval) => {
386                // Convert TqlEval to PromQuery then to QueryStatement::Promql
387                let prom_query = PromQuery {
388                    query: eval.query.clone(),
389                    start: eval.start.clone(),
390                    end: eval.end.clone(),
391                    step: eval.step.clone(),
392                    lookback: eval
393                        .lookback
394                        .clone()
395                        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
396                    alias: eval.alias.clone(),
397                };
398                let stmt = QueryLanguageParser::parse_promql(&prom_query, &query_ctx)?;
399
400                self.plan(&stmt, query_ctx).await
401            }
402            Tql::Explain(_) => UnimplementedSnafu {
403                operation: "TQL EXPLAIN in CTEs",
404            }
405            .fail(),
406            Tql::Analyze(_) => UnimplementedSnafu {
407                operation: "TQL ANALYZE in CTEs",
408            }
409            .fail(),
410        }
411    }
412}
413
414#[async_trait]
415impl LogicalPlanner for DfLogicalPlanner {
416    #[tracing::instrument(skip_all)]
417    async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
418        match stmt {
419            QueryStatement::Sql(stmt) => self.plan_sql(stmt, query_ctx).await,
420            QueryStatement::Promql(stmt, alias) => {
421                self.plan_pql(stmt, alias.clone(), query_ctx).await
422            }
423        }
424    }
425
426    async fn plan_logs_query(
427        &self,
428        query: LogQuery,
429        query_ctx: QueryContextRef,
430    ) -> Result<LogicalPlan> {
431        let plan_decoder = Arc::new(DefaultPlanDecoder::new(
432            self.session_state.clone(),
433            &query_ctx,
434        )?);
435        let table_provider = DfTableSourceProvider::new(
436            self.engine_state.catalog_manager().clone(),
437            self.engine_state.disallow_cross_catalog_query(),
438            query_ctx,
439            plan_decoder,
440            self.session_state
441                .config_options()
442                .sql_parser
443                .enable_ident_normalization,
444        );
445
446        let mut planner = LogQueryPlanner::new(table_provider, self.session_state.clone());
447        planner
448            .query_to_plan(query)
449            .await
450            .map_err(BoxedError::new)
451            .context(QueryPlanSnafu)
452    }
453
454    fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
455        self.optimize_logical_plan(plan)
456    }
457
458    fn as_any(&self) -> &dyn Any {
459        self
460    }
461}