query/
planner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::borrow::Cow;
17use std::collections::{HashMap, HashSet};
18use std::str::FromStr;
19use std::sync::Arc;
20
21use arrow_schema::DataType;
22use async_trait::async_trait;
23use catalog::table_source::DfTableSourceProvider;
24use common_error::ext::BoxedError;
25use common_telemetry::tracing;
26use datafusion::common::{DFSchema, plan_err};
27use datafusion::execution::context::SessionState;
28use datafusion::sql::planner::PlannerContext;
29use datafusion_common::ToDFSchema;
30use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
31use datafusion_expr::{
32    Analyze, Explain, ExplainFormat, Expr as DfExpr, LogicalPlan, LogicalPlanBuilder, PlanType,
33    ToStringifiedPlan, col,
34};
35use datafusion_sql::planner::{ParserOptions, SqlToRel};
36use log_query::LogQuery;
37use promql_parser::parser::EvalStmt;
38use session::context::QueryContextRef;
39use snafu::{ResultExt, ensure};
40use sql::CteContent;
41use sql::ast::Expr as SqlExpr;
42use sql::statements::explain::ExplainStatement;
43use sql::statements::query::Query;
44use sql::statements::statement::Statement;
45use sql::statements::tql::Tql;
46
47use crate::error::{
48    CteColumnSchemaMismatchSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu,
49    UnimplementedSnafu,
50};
51use crate::log_query::planner::LogQueryPlanner;
52use crate::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser, QueryStatement};
53use crate::promql::planner::PromPlanner;
54use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
55use crate::range_select::plan_rewrite::RangePlanRewriter;
56use crate::{DfContextProviderAdapter, QueryEngineContext};
57
58#[async_trait]
59pub trait LogicalPlanner: Send + Sync {
60    async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;
61
62    async fn plan_logs_query(
63        &self,
64        query: LogQuery,
65        query_ctx: QueryContextRef,
66    ) -> Result<LogicalPlan>;
67
68    fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;
69
70    fn as_any(&self) -> &dyn Any;
71}
72
/// DataFusion-backed [`LogicalPlanner`] that also understands Greptime-specific
/// statements (TQL, hybrid SQL/TQL CTEs, range queries).
pub struct DfLogicalPlanner {
    /// Shared engine state: catalog manager, extension optimizer rules, etc.
    engine_state: Arc<QueryEngineState>,
    /// Session state taken from `engine_state` when the planner is constructed (see `new`).
    session_state: SessionState,
}
77
78impl DfLogicalPlanner {
79    pub fn new(engine_state: Arc<QueryEngineState>) -> Self {
80        let session_state = engine_state.session_state();
81        Self {
82            engine_state,
83            session_state,
84        }
85    }
86
87    /// Basically the same with `explain_to_plan` in DataFusion, but adapted to Greptime's
88    /// `plan_sql` to support Greptime Statements.
89    async fn explain_to_plan(
90        &self,
91        explain: &ExplainStatement,
92        query_ctx: QueryContextRef,
93    ) -> Result<LogicalPlan> {
94        let plan = self.plan_sql(&explain.statement, query_ctx).await?;
95        if matches!(plan, LogicalPlan::Explain(_)) {
96            return plan_err!("Nested EXPLAINs are not supported").context(PlanSqlSnafu);
97        }
98
99        let verbose = explain.verbose;
100        let analyze = explain.analyze;
101        let format = explain.format.map(|f| f.to_string());
102
103        let plan = Arc::new(plan);
104        let schema = LogicalPlan::explain_schema();
105        let schema = ToDFSchema::to_dfschema_ref(schema)?;
106
107        if verbose && format.is_some() {
108            return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported").context(PlanSqlSnafu);
109        }
110
111        if analyze {
112            // notice format is already set in query context, so can be ignore here
113            Ok(LogicalPlan::Analyze(Analyze {
114                verbose,
115                input: plan,
116                schema,
117            }))
118        } else {
119            let stringified_plans = vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
120
121            // default to configuration value
122            let options = self.session_state.config().options();
123            let format = format
124                .map(|x| ExplainFormat::from_str(&x))
125                .transpose()?
126                .unwrap_or_else(|| options.explain.format.clone());
127
128            Ok(LogicalPlan::Explain(Explain {
129                verbose,
130                explain_format: format,
131                plan,
132                stringified_plans,
133                schema,
134                logical_optimization_succeeded: false,
135            }))
136        }
137    }
138
139    #[tracing::instrument(skip_all)]
140    #[async_recursion::async_recursion]
141    async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
142        let mut planner_context = PlannerContext::new();
143        let mut stmt = Cow::Borrowed(stmt);
144        let mut is_tql_cte = false;
145
146        // handle explain before normal processing so we can explain Greptime Statements
147        if let Statement::Explain(explain) = stmt.as_ref() {
148            return self.explain_to_plan(explain, query_ctx).await;
149        }
150
151        // Check for hybrid CTEs before normal processing
152        if self.has_hybrid_ctes(stmt.as_ref()) {
153            let stmt_owned = stmt.into_owned();
154            let mut query = match stmt_owned {
155                Statement::Query(query) => query.as_ref().clone(),
156                _ => unreachable!("has_hybrid_ctes should only return true for Query statements"),
157            };
158            self.plan_query_with_hybrid_ctes(&query, query_ctx.clone(), &mut planner_context)
159                .await?;
160
161            // remove the processed TQL CTEs from the query
162            query.hybrid_cte = None;
163            stmt = Cow::Owned(Statement::Query(Box::new(query)));
164            is_tql_cte = true;
165        }
166
167        let mut df_stmt = stmt.as_ref().try_into().context(SqlSnafu)?;
168
169        // TODO(LFC): Remove this when Datafusion supports **both** the syntax and implementation of "explain with format".
170        if let datafusion::sql::parser::Statement::Statement(
171            box datafusion::sql::sqlparser::ast::Statement::Explain { .. },
172        ) = &mut df_stmt
173        {
174            UnimplementedSnafu {
175                operation: "EXPLAIN with FORMAT using raw datafusion planner",
176            }
177            .fail()?;
178        }
179
180        let table_provider = DfTableSourceProvider::new(
181            self.engine_state.catalog_manager().clone(),
182            self.engine_state.disallow_cross_catalog_query(),
183            query_ctx.clone(),
184            Arc::new(DefaultPlanDecoder::new(
185                self.session_state.clone(),
186                &query_ctx,
187            )?),
188            self.session_state
189                .config_options()
190                .sql_parser
191                .enable_ident_normalization,
192        );
193
194        let context_provider = DfContextProviderAdapter::try_new(
195            self.engine_state.clone(),
196            self.session_state.clone(),
197            Some(&df_stmt),
198            query_ctx.clone(),
199        )
200        .await?;
201
202        let config_options = self.session_state.config().options();
203        let parser_options = &config_options.sql_parser;
204        let parser_options = ParserOptions {
205            map_string_types_to_utf8view: false,
206            ..parser_options.into()
207        };
208
209        let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
210
211        // this IF is to handle different version of ASTs
212        let result = if is_tql_cte {
213            let Statement::Query(query) = stmt.into_owned() else {
214                unreachable!("is_tql_cte should only be true for Query statements");
215            };
216            let sqlparser_stmt = sqlparser::ast::Statement::Query(Box::new(query.inner));
217            sql_to_rel
218                .sql_statement_to_plan_with_context(sqlparser_stmt, &mut planner_context)
219                .context(PlanSqlSnafu)?
220        } else {
221            sql_to_rel
222                .statement_to_plan(df_stmt)
223                .context(PlanSqlSnafu)?
224        };
225
226        common_telemetry::debug!("Logical planner, statement to plan result: {result}");
227        let plan = RangePlanRewriter::new(table_provider, query_ctx.clone())
228            .rewrite(result)
229            .await?;
230
231        // Optimize logical plan by extension rules
232        let context = QueryEngineContext::new(self.session_state.clone(), query_ctx);
233        let plan = self
234            .engine_state
235            .optimize_by_extension_rules(plan, &context)?;
236        common_telemetry::debug!("Logical planner, optimize result: {plan}");
237
238        Ok(plan)
239    }
240
241    /// Generate a relational expression from a SQL expression
242    #[tracing::instrument(skip_all)]
243    pub(crate) async fn sql_to_expr(
244        &self,
245        sql: SqlExpr,
246        schema: &DFSchema,
247        normalize_ident: bool,
248        query_ctx: QueryContextRef,
249    ) -> Result<DfExpr> {
250        let context_provider = DfContextProviderAdapter::try_new(
251            self.engine_state.clone(),
252            self.session_state.clone(),
253            None,
254            query_ctx,
255        )
256        .await?;
257
258        let config_options = self.session_state.config().options();
259        let parser_options = &config_options.sql_parser;
260        let parser_options: ParserOptions = ParserOptions {
261            map_string_types_to_utf8view: false,
262            enable_ident_normalization: normalize_ident,
263            ..parser_options.into()
264        };
265
266        let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
267
268        Ok(sql_to_rel.sql_to_expr(sql, schema, &mut PlannerContext::new())?)
269    }
270
271    #[tracing::instrument(skip_all)]
272    async fn plan_pql(&self, stmt: &EvalStmt, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
273        let plan_decoder = Arc::new(DefaultPlanDecoder::new(
274            self.session_state.clone(),
275            &query_ctx,
276        )?);
277        let table_provider = DfTableSourceProvider::new(
278            self.engine_state.catalog_manager().clone(),
279            self.engine_state.disallow_cross_catalog_query(),
280            query_ctx,
281            plan_decoder,
282            self.session_state
283                .config_options()
284                .sql_parser
285                .enable_ident_normalization,
286        );
287        PromPlanner::stmt_to_plan(table_provider, stmt, &self.engine_state)
288            .await
289            .map_err(BoxedError::new)
290            .context(QueryPlanSnafu)
291    }
292
293    #[tracing::instrument(skip_all)]
294    fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
295        Ok(self.engine_state.optimize_logical_plan(plan)?)
296    }
297
298    /// Check if a statement contains hybrid CTEs (mix of SQL and TQL)
299    fn has_hybrid_ctes(&self, stmt: &Statement) -> bool {
300        if let Statement::Query(query) = stmt {
301            query
302                .hybrid_cte
303                .as_ref()
304                .map(|hybrid_cte| !hybrid_cte.cte_tables.is_empty())
305                .unwrap_or(false)
306        } else {
307            false
308        }
309    }
310
311    /// Plan a query with hybrid CTEs using DataFusion's native PlannerContext
312    async fn plan_query_with_hybrid_ctes(
313        &self,
314        query: &Query,
315        query_ctx: QueryContextRef,
316        planner_context: &mut PlannerContext,
317    ) -> Result<()> {
318        let hybrid_cte = query.hybrid_cte.as_ref().unwrap();
319
320        for cte in &hybrid_cte.cte_tables {
321            match &cte.content {
322                CteContent::Tql(tql) => {
323                    // Plan TQL and register in PlannerContext
324                    let mut logical_plan = self.tql_to_logical_plan(tql, query_ctx.clone()).await?;
325                    if !cte.columns.is_empty() {
326                        let schema = logical_plan.schema();
327                        let schema_fields = schema.fields().to_vec();
328                        ensure!(
329                            schema_fields.len() == cte.columns.len(),
330                            CteColumnSchemaMismatchSnafu {
331                                cte_name: cte.name.value.clone(),
332                                original: schema_fields
333                                    .iter()
334                                    .map(|field| field.name().clone())
335                                    .collect::<Vec<_>>(),
336                                expected: cte
337                                    .columns
338                                    .iter()
339                                    .map(|column| column.to_string())
340                                    .collect::<Vec<_>>(),
341                            }
342                        );
343                        let aliases = cte
344                            .columns
345                            .iter()
346                            .zip(schema_fields.iter())
347                            .map(|(column, field)| col(field.name()).alias(column.to_string()));
348                        logical_plan = LogicalPlanBuilder::from(logical_plan)
349                            .project(aliases)
350                            .context(PlanSqlSnafu)?
351                            .build()
352                            .context(PlanSqlSnafu)?;
353                    }
354
355                    // Wrap in SubqueryAlias to ensure proper table qualification for CTE
356                    logical_plan = LogicalPlan::SubqueryAlias(
357                        datafusion_expr::SubqueryAlias::try_new(
358                            Arc::new(logical_plan),
359                            cte.name.value.clone(),
360                        )
361                        .context(PlanSqlSnafu)?,
362                    );
363
364                    planner_context.insert_cte(&cte.name.value, logical_plan);
365                }
366                CteContent::Sql(_) => {
367                    // SQL CTEs should have been moved to the main query's WITH clause
368                    // during parsing, so we shouldn't encounter them here
369                    unreachable!("SQL CTEs should not be in hybrid_cte.cte_tables");
370                }
371            }
372        }
373
374        Ok(())
375    }
376
377    /// Convert TQL to LogicalPlan directly
378    async fn tql_to_logical_plan(
379        &self,
380        tql: &Tql,
381        query_ctx: QueryContextRef,
382    ) -> Result<LogicalPlan> {
383        match tql {
384            Tql::Eval(eval) => {
385                // Convert TqlEval to PromQuery then to QueryStatement::Promql
386                let prom_query = PromQuery {
387                    query: eval.query.clone(),
388                    start: eval.start.clone(),
389                    end: eval.end.clone(),
390                    step: eval.step.clone(),
391                    lookback: eval
392                        .lookback
393                        .clone()
394                        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
395                    alias: eval.alias.clone(),
396                };
397                let stmt = QueryLanguageParser::parse_promql(&prom_query, &query_ctx)?;
398
399                self.plan(&stmt, query_ctx).await
400            }
401            Tql::Explain(_) => UnimplementedSnafu {
402                operation: "TQL EXPLAIN in CTEs",
403            }
404            .fail(),
405            Tql::Analyze(_) => UnimplementedSnafu {
406                operation: "TQL ANALYZE in CTEs",
407            }
408            .fail(),
409        }
410    }
411
412    /// Extracts cast types for all placeholders in a logical plan.
413    /// Returns a map where each placeholder ID is mapped to:
414    /// - Some(DataType) if the placeholder is cast to a specific type
415    /// - None if the placeholder exists but has no cast
416    ///
417    /// Example: `$1::TEXT` returns `{"$1": Some(DataType::Utf8)}`
418    ///
419    /// This function walks through all expressions in the logical plan,
420    /// including subqueries, to identify placeholders and their cast types.
421    fn extract_placeholder_cast_types(
422        plan: &LogicalPlan,
423    ) -> Result<HashMap<String, Option<DataType>>> {
424        let mut placeholder_types = HashMap::new();
425        let mut casted_placeholders = HashSet::new();
426
427        plan.apply(|node| {
428            for expr in node.expressions() {
429                let _ = expr.apply(|e| {
430                    if let DfExpr::Cast(cast) = e
431                        && let DfExpr::Placeholder(ph) = &*cast.expr
432                    {
433                        placeholder_types.insert(ph.id.clone(), Some(cast.data_type.clone()));
434                        casted_placeholders.insert(ph.id.clone());
435                    }
436
437                    if let DfExpr::Placeholder(ph) = e
438                        && !casted_placeholders.contains(&ph.id)
439                        && !placeholder_types.contains_key(&ph.id)
440                    {
441                        placeholder_types.insert(ph.id.clone(), None);
442                    }
443
444                    Ok(TreeNodeRecursion::Continue)
445                });
446            }
447            Ok(TreeNodeRecursion::Continue)
448        })?;
449
450        Ok(placeholder_types)
451    }
452
453    /// Gets inferred parameter types from a logical plan.
454    /// Returns a map where each parameter ID is mapped to:
455    /// - Some(DataType) if the parameter type could be inferred
456    /// - None if the parameter type could not be inferred
457    ///
458    /// This function first uses DataFusion's `get_parameter_types()` to infer types.
459    /// If any parameters have `None` values (i.e., DataFusion couldn't infer their types),
460    /// it falls back to using `extract_placeholder_cast_types()` to detect explicit casts.
461    ///
462    /// This is because datafusion can only infer types for a limited cases.
463    ///
464    /// Example: For query `WHERE $1::TEXT AND $2`, DataFusion may not infer `$2`'s type,
465    /// but this function will return `{"$1": Some(DataType::Utf8), "$2": None}`.
466    pub fn get_inferred_parameter_types(
467        plan: &LogicalPlan,
468    ) -> Result<HashMap<String, Option<DataType>>> {
469        let param_types = plan.get_parameter_types().context(PlanSqlSnafu)?;
470
471        let has_none = param_types.values().any(|v| v.is_none());
472
473        if !has_none {
474            Ok(param_types)
475        } else {
476            let cast_types = Self::extract_placeholder_cast_types(plan)?;
477
478            let mut merged = param_types;
479
480            for (id, opt_type) in cast_types {
481                merged
482                    .entry(id)
483                    .and_modify(|existing| {
484                        if existing.is_none() {
485                            *existing = opt_type.clone();
486                        }
487                    })
488                    .or_insert(opt_type);
489            }
490
491            Ok(merged)
492        }
493    }
494}
495
496#[async_trait]
497impl LogicalPlanner for DfLogicalPlanner {
498    #[tracing::instrument(skip_all)]
499    async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
500        match stmt {
501            QueryStatement::Sql(stmt) => self.plan_sql(stmt, query_ctx).await,
502            QueryStatement::Promql(stmt, _alias) => self.plan_pql(stmt, query_ctx).await,
503        }
504    }
505
506    async fn plan_logs_query(
507        &self,
508        query: LogQuery,
509        query_ctx: QueryContextRef,
510    ) -> Result<LogicalPlan> {
511        let plan_decoder = Arc::new(DefaultPlanDecoder::new(
512            self.session_state.clone(),
513            &query_ctx,
514        )?);
515        let table_provider = DfTableSourceProvider::new(
516            self.engine_state.catalog_manager().clone(),
517            self.engine_state.disallow_cross_catalog_query(),
518            query_ctx,
519            plan_decoder,
520            self.session_state
521                .config_options()
522                .sql_parser
523                .enable_ident_normalization,
524        );
525
526        let mut planner = LogQueryPlanner::new(table_provider, self.session_state.clone());
527        planner
528            .query_to_plan(query)
529            .await
530            .map_err(BoxedError::new)
531            .context(QueryPlanSnafu)
532    }
533
534    fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
535        self.optimize_logical_plan(plan)
536    }
537
538    fn as_any(&self) -> &dyn Any {
539        self
540    }
541}
542
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::DataType;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use session::context::QueryContext;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
    use table::test_util::EmptyTable;

    use super::*;
    use crate::QueryEngineRef;
    use crate::parser::QueryLanguageParser;

    /// Builds a query engine that exposes one empty table,
    /// `test(id INT32 NOT NULL, name STRING NULL)`, keyed on `id`.
    async fn create_test_engine() -> QueryEngineRef {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new("id", ConcreteDataType::int32_datatype(), false),
            ColumnSchema::new("name", ConcreteDataType::string_datatype(), true),
        ]));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![1])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::new("test", table_meta)
            .build()
            .unwrap();

        crate::tests::new_query_engine_with_table(EmptyTable::from_table_info(&table_info))
    }

    /// Parses `sql` and plans it against the test engine; panics on any failure.
    async fn parse_sql_to_plan(sql: &str) -> LogicalPlan {
        let statement = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
        let engine = create_test_engine().await;
        let planner = engine.planner();
        planner
            .plan(&statement, QueryContext::arc())
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_extract_placeholder_cast_types_multiple() {
        let plan = parse_sql_to_plan(
            "SELECT $1::INT, $2::TEXT, $3, $4::INTEGER FROM test WHERE $5::FLOAT > 0",
        )
        .await;
        let types = DfLogicalPlanner::extract_placeholder_cast_types(&plan).unwrap();

        let expected = [
            ("$1", Some(DataType::Int32)),
            ("$2", Some(DataType::Utf8)),
            ("$3", None),
            ("$4", Some(DataType::Int32)),
            ("$5", Some(DataType::Float32)),
        ];
        assert_eq!(types.len(), expected.len());
        for (id, want) in expected {
            assert_eq!(types.get(id), Some(&want), "unexpected type for {id}");
        }
    }

    #[tokio::test]
    async fn test_get_inferred_parameter_types_fallback_for_udf_args() {
        // DataFusion cannot infer types for scalar function arguments on its own.
        let plan = parse_sql_to_plan(
            "SELECT parse_ident($1), parse_ident($2::TEXT) FROM test WHERE id > $3",
        )
        .await;
        let types = DfLogicalPlanner::get_inferred_parameter_types(&plan).unwrap();

        assert_eq!(types.len(), 3);
        assert!(types["$1"].is_none(), "Expected $1 to be None");
        assert_eq!(types["$2"], Some(DataType::Utf8));
        assert_eq!(types["$3"], Some(DataType::Int32));
    }
}