query/
planner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::borrow::Cow;
17use std::str::FromStr;
18use std::sync::Arc;
19
20use async_trait::async_trait;
21use catalog::table_source::DfTableSourceProvider;
22use common_error::ext::BoxedError;
23use common_telemetry::tracing;
24use datafusion::common::{DFSchema, plan_err};
25use datafusion::execution::context::SessionState;
26use datafusion::sql::planner::PlannerContext;
27use datafusion_common::ToDFSchema;
28use datafusion_expr::{
29    Analyze, Explain, ExplainFormat, Expr as DfExpr, LogicalPlan, LogicalPlanBuilder, PlanType,
30    ToStringifiedPlan, col,
31};
32use datafusion_sql::planner::{ParserOptions, SqlToRel};
33use log_query::LogQuery;
34use promql_parser::parser::EvalStmt;
35use session::context::QueryContextRef;
36use snafu::{ResultExt, ensure};
37use sql::CteContent;
38use sql::ast::Expr as SqlExpr;
39use sql::statements::explain::ExplainStatement;
40use sql::statements::query::Query;
41use sql::statements::statement::Statement;
42use sql::statements::tql::Tql;
43
44use crate::error::{
45    CteColumnSchemaMismatchSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu,
46    UnimplementedSnafu,
47};
48use crate::log_query::planner::LogQueryPlanner;
49use crate::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser, QueryStatement};
50use crate::promql::planner::PromPlanner;
51use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
52use crate::range_select::plan_rewrite::RangePlanRewriter;
53use crate::{DfContextProviderAdapter, QueryEngineContext};
54
55#[async_trait]
56pub trait LogicalPlanner: Send + Sync {
57    async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;
58
59    async fn plan_logs_query(
60        &self,
61        query: LogQuery,
62        query_ctx: QueryContextRef,
63    ) -> Result<LogicalPlan>;
64
65    fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;
66
67    fn as_any(&self) -> &dyn Any;
68}
69
70pub struct DfLogicalPlanner {
71    engine_state: Arc<QueryEngineState>,
72    session_state: SessionState,
73}
74
75impl DfLogicalPlanner {
76    pub fn new(engine_state: Arc<QueryEngineState>) -> Self {
77        let session_state = engine_state.session_state();
78        Self {
79            engine_state,
80            session_state,
81        }
82    }
83
84    /// Basically the same with `explain_to_plan` in DataFusion, but adapted to Greptime's
85    /// `plan_sql` to support Greptime Statements.
86    async fn explain_to_plan(
87        &self,
88        explain: &ExplainStatement,
89        query_ctx: QueryContextRef,
90    ) -> Result<LogicalPlan> {
91        let plan = self.plan_sql(&explain.statement, query_ctx).await?;
92        if matches!(plan, LogicalPlan::Explain(_)) {
93            return plan_err!("Nested EXPLAINs are not supported").context(PlanSqlSnafu);
94        }
95
96        let verbose = explain.verbose;
97        let analyze = explain.analyze;
98        let format = explain.format.map(|f| f.to_string());
99
100        let plan = Arc::new(plan);
101        let schema = LogicalPlan::explain_schema();
102        let schema = ToDFSchema::to_dfschema_ref(schema)?;
103
104        if verbose && format.is_some() {
105            return plan_err!("EXPLAIN VERBOSE with FORMAT is not supported").context(PlanSqlSnafu);
106        }
107
108        if analyze {
109            // notice format is already set in query context, so can be ignore here
110            Ok(LogicalPlan::Analyze(Analyze {
111                verbose,
112                input: plan,
113                schema,
114            }))
115        } else {
116            let stringified_plans = vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
117
118            // default to configuration value
119            let options = self.session_state.config().options();
120            let format = format
121                .map(|x| ExplainFormat::from_str(&x))
122                .transpose()?
123                .unwrap_or_else(|| options.explain.format.clone());
124
125            Ok(LogicalPlan::Explain(Explain {
126                verbose,
127                explain_format: format,
128                plan,
129                stringified_plans,
130                schema,
131                logical_optimization_succeeded: false,
132            }))
133        }
134    }
135
136    #[tracing::instrument(skip_all)]
137    #[async_recursion::async_recursion]
138    async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
139        let mut planner_context = PlannerContext::new();
140        let mut stmt = Cow::Borrowed(stmt);
141        let mut is_tql_cte = false;
142
143        // handle explain before normal processing so we can explain Greptime Statements
144        if let Statement::Explain(explain) = stmt.as_ref() {
145            return self.explain_to_plan(explain, query_ctx).await;
146        }
147
148        // Check for hybrid CTEs before normal processing
149        if self.has_hybrid_ctes(stmt.as_ref()) {
150            let stmt_owned = stmt.into_owned();
151            let mut query = match stmt_owned {
152                Statement::Query(query) => query.as_ref().clone(),
153                _ => unreachable!("has_hybrid_ctes should only return true for Query statements"),
154            };
155            self.plan_query_with_hybrid_ctes(&query, query_ctx.clone(), &mut planner_context)
156                .await?;
157
158            // remove the processed TQL CTEs from the query
159            query.hybrid_cte = None;
160            stmt = Cow::Owned(Statement::Query(Box::new(query)));
161            is_tql_cte = true;
162        }
163
164        let mut df_stmt = stmt.as_ref().try_into().context(SqlSnafu)?;
165
166        // TODO(LFC): Remove this when Datafusion supports **both** the syntax and implementation of "explain with format".
167        if let datafusion::sql::parser::Statement::Statement(
168            box datafusion::sql::sqlparser::ast::Statement::Explain { .. },
169        ) = &mut df_stmt
170        {
171            UnimplementedSnafu {
172                operation: "EXPLAIN with FORMAT using raw datafusion planner",
173            }
174            .fail()?;
175        }
176
177        let table_provider = DfTableSourceProvider::new(
178            self.engine_state.catalog_manager().clone(),
179            self.engine_state.disallow_cross_catalog_query(),
180            query_ctx.clone(),
181            Arc::new(DefaultPlanDecoder::new(
182                self.session_state.clone(),
183                &query_ctx,
184            )?),
185            self.session_state
186                .config_options()
187                .sql_parser
188                .enable_ident_normalization,
189        );
190
191        let context_provider = DfContextProviderAdapter::try_new(
192            self.engine_state.clone(),
193            self.session_state.clone(),
194            Some(&df_stmt),
195            query_ctx.clone(),
196        )
197        .await?;
198
199        let config_options = self.session_state.config().options();
200        let parser_options = &config_options.sql_parser;
201        let parser_options = ParserOptions {
202            map_string_types_to_utf8view: false,
203            ..parser_options.into()
204        };
205
206        let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
207
208        // this IF is to handle different version of ASTs
209        let result = if is_tql_cte {
210            let Statement::Query(query) = stmt.into_owned() else {
211                unreachable!("is_tql_cte should only be true for Query statements");
212            };
213            let sqlparser_stmt = sqlparser::ast::Statement::Query(Box::new(query.inner));
214            sql_to_rel
215                .sql_statement_to_plan_with_context(sqlparser_stmt, &mut planner_context)
216                .context(PlanSqlSnafu)?
217        } else {
218            sql_to_rel
219                .statement_to_plan(df_stmt)
220                .context(PlanSqlSnafu)?
221        };
222
223        common_telemetry::debug!("Logical planner, statement to plan result: {result}");
224        let plan = RangePlanRewriter::new(table_provider, query_ctx.clone())
225            .rewrite(result)
226            .await?;
227
228        // Optimize logical plan by extension rules
229        let context = QueryEngineContext::new(self.session_state.clone(), query_ctx);
230        let plan = self
231            .engine_state
232            .optimize_by_extension_rules(plan, &context)?;
233        common_telemetry::debug!("Logical planner, optimize result: {plan}");
234
235        Ok(plan)
236    }
237
238    /// Generate a relational expression from a SQL expression
239    #[tracing::instrument(skip_all)]
240    pub(crate) async fn sql_to_expr(
241        &self,
242        sql: SqlExpr,
243        schema: &DFSchema,
244        normalize_ident: bool,
245        query_ctx: QueryContextRef,
246    ) -> Result<DfExpr> {
247        let context_provider = DfContextProviderAdapter::try_new(
248            self.engine_state.clone(),
249            self.session_state.clone(),
250            None,
251            query_ctx,
252        )
253        .await?;
254
255        let config_options = self.session_state.config().options();
256        let parser_options = &config_options.sql_parser;
257        let parser_options: ParserOptions = ParserOptions {
258            map_string_types_to_utf8view: false,
259            enable_ident_normalization: normalize_ident,
260            ..parser_options.into()
261        };
262
263        let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
264
265        Ok(sql_to_rel.sql_to_expr(sql, schema, &mut PlannerContext::new())?)
266    }
267
268    #[tracing::instrument(skip_all)]
269    async fn plan_pql(
270        &self,
271        stmt: &EvalStmt,
272        alias: Option<String>,
273        query_ctx: QueryContextRef,
274    ) -> Result<LogicalPlan> {
275        let plan_decoder = Arc::new(DefaultPlanDecoder::new(
276            self.session_state.clone(),
277            &query_ctx,
278        )?);
279        let table_provider = DfTableSourceProvider::new(
280            self.engine_state.catalog_manager().clone(),
281            self.engine_state.disallow_cross_catalog_query(),
282            query_ctx,
283            plan_decoder,
284            self.session_state
285                .config_options()
286                .sql_parser
287                .enable_ident_normalization,
288        );
289        PromPlanner::stmt_to_plan_with_alias(table_provider, stmt, alias, &self.engine_state)
290            .await
291            .map_err(BoxedError::new)
292            .context(QueryPlanSnafu)
293    }
294
295    #[tracing::instrument(skip_all)]
296    fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
297        Ok(self.engine_state.optimize_logical_plan(plan)?)
298    }
299
300    /// Check if a statement contains hybrid CTEs (mix of SQL and TQL)
301    fn has_hybrid_ctes(&self, stmt: &Statement) -> bool {
302        if let Statement::Query(query) = stmt {
303            query
304                .hybrid_cte
305                .as_ref()
306                .map(|hybrid_cte| !hybrid_cte.cte_tables.is_empty())
307                .unwrap_or(false)
308        } else {
309            false
310        }
311    }
312
313    /// Plan a query with hybrid CTEs using DataFusion's native PlannerContext
314    async fn plan_query_with_hybrid_ctes(
315        &self,
316        query: &Query,
317        query_ctx: QueryContextRef,
318        planner_context: &mut PlannerContext,
319    ) -> Result<()> {
320        let hybrid_cte = query.hybrid_cte.as_ref().unwrap();
321
322        for cte in &hybrid_cte.cte_tables {
323            match &cte.content {
324                CteContent::Tql(tql) => {
325                    // Plan TQL and register in PlannerContext
326                    let mut logical_plan = self.tql_to_logical_plan(tql, query_ctx.clone()).await?;
327                    if !cte.columns.is_empty() {
328                        let schema = logical_plan.schema();
329                        let schema_fields = schema.fields().to_vec();
330                        ensure!(
331                            schema_fields.len() == cte.columns.len(),
332                            CteColumnSchemaMismatchSnafu {
333                                cte_name: cte.name.value.clone(),
334                                original: schema_fields
335                                    .iter()
336                                    .map(|field| field.name().clone())
337                                    .collect::<Vec<_>>(),
338                                expected: cte
339                                    .columns
340                                    .iter()
341                                    .map(|column| column.to_string())
342                                    .collect::<Vec<_>>(),
343                            }
344                        );
345                        let aliases = cte
346                            .columns
347                            .iter()
348                            .zip(schema_fields.iter())
349                            .map(|(column, field)| col(field.name()).alias(column.to_string()));
350                        logical_plan = LogicalPlanBuilder::from(logical_plan)
351                            .project(aliases)
352                            .context(PlanSqlSnafu)?
353                            .build()
354                            .context(PlanSqlSnafu)?;
355                    }
356
357                    // Wrap in SubqueryAlias to ensure proper table qualification for CTE
358                    logical_plan = LogicalPlan::SubqueryAlias(
359                        datafusion_expr::SubqueryAlias::try_new(
360                            Arc::new(logical_plan),
361                            cte.name.value.clone(),
362                        )
363                        .context(PlanSqlSnafu)?,
364                    );
365
366                    planner_context.insert_cte(&cte.name.value, logical_plan);
367                }
368                CteContent::Sql(_) => {
369                    // SQL CTEs should have been moved to the main query's WITH clause
370                    // during parsing, so we shouldn't encounter them here
371                    unreachable!("SQL CTEs should not be in hybrid_cte.cte_tables");
372                }
373            }
374        }
375
376        Ok(())
377    }
378
379    /// Convert TQL to LogicalPlan directly
380    async fn tql_to_logical_plan(
381        &self,
382        tql: &Tql,
383        query_ctx: QueryContextRef,
384    ) -> Result<LogicalPlan> {
385        match tql {
386            Tql::Eval(eval) => {
387                // Convert TqlEval to PromQuery then to QueryStatement::Promql
388                let prom_query = PromQuery {
389                    query: eval.query.clone(),
390                    start: eval.start.clone(),
391                    end: eval.end.clone(),
392                    step: eval.step.clone(),
393                    lookback: eval
394                        .lookback
395                        .clone()
396                        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
397                    alias: eval.alias.clone(),
398                };
399                let stmt = QueryLanguageParser::parse_promql(&prom_query, &query_ctx)?;
400
401                self.plan(&stmt, query_ctx).await
402            }
403            Tql::Explain(_) => UnimplementedSnafu {
404                operation: "TQL EXPLAIN in CTEs",
405            }
406            .fail(),
407            Tql::Analyze(_) => UnimplementedSnafu {
408                operation: "TQL ANALYZE in CTEs",
409            }
410            .fail(),
411        }
412    }
413}
414
415#[async_trait]
416impl LogicalPlanner for DfLogicalPlanner {
417    #[tracing::instrument(skip_all)]
418    async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
419        match stmt {
420            QueryStatement::Sql(stmt) => self.plan_sql(stmt, query_ctx).await,
421            QueryStatement::Promql(stmt, alias) => {
422                self.plan_pql(stmt, alias.clone(), query_ctx).await
423            }
424        }
425    }
426
427    async fn plan_logs_query(
428        &self,
429        query: LogQuery,
430        query_ctx: QueryContextRef,
431    ) -> Result<LogicalPlan> {
432        let plan_decoder = Arc::new(DefaultPlanDecoder::new(
433            self.session_state.clone(),
434            &query_ctx,
435        )?);
436        let table_provider = DfTableSourceProvider::new(
437            self.engine_state.catalog_manager().clone(),
438            self.engine_state.disallow_cross_catalog_query(),
439            query_ctx,
440            plan_decoder,
441            self.session_state
442                .config_options()
443                .sql_parser
444                .enable_ident_normalization,
445        );
446
447        let mut planner = LogQueryPlanner::new(table_provider, self.session_state.clone());
448        planner
449            .query_to_plan(query)
450            .await
451            .map_err(BoxedError::new)
452            .context(QueryPlanSnafu)
453    }
454
455    fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
456        self.optimize_logical_plan(plan)
457    }
458
459    fn as_any(&self) -> &dyn Any {
460        self
461    }
462}