query/
planner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::table_source::DfTableSourceProvider;
20use common_error::ext::BoxedError;
21use common_telemetry::tracing;
22use datafusion::common::DFSchema;
23use datafusion::execution::context::SessionState;
24use datafusion::sql::planner::PlannerContext;
25use datafusion_expr::{Expr as DfExpr, LogicalPlan};
26use datafusion_sql::planner::{ParserOptions, SqlToRel};
27use log_query::LogQuery;
28use promql_parser::parser::EvalStmt;
29use session::context::QueryContextRef;
30use snafu::ResultExt;
31use sql::ast::Expr as SqlExpr;
32use sql::statements::statement::Statement;
33
34use crate::error::{PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu};
35use crate::log_query::planner::LogQueryPlanner;
36use crate::parser::QueryStatement;
37use crate::promql::planner::PromPlanner;
38use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
39use crate::range_select::plan_rewrite::RangePlanRewriter;
40use crate::{DfContextProviderAdapter, QueryEngineContext};
41
/// Turns parsed queries into logical plans and optimizes those plans.
///
/// Implementations must be `Send + Sync` so a planner can be shared across threads.
#[async_trait]
pub trait LogicalPlanner: Send + Sync {
    /// Builds a [`LogicalPlan`] for the given query statement, evaluated under `query_ctx`.
    async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;

    /// Builds a [`LogicalPlan`] for the given [`LogQuery`], evaluated under `query_ctx`.
    async fn plan_logs_query(
        &self,
        query: LogQuery,
        query_ctx: QueryContextRef,
    ) -> Result<LogicalPlan>;

    /// Consumes a logical plan and returns its optimized form.
    fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;

    /// Exposes the planner as [`Any`] so callers can downcast to the concrete implementation.
    fn as_any(&self) -> &dyn Any;
}
56
/// [`LogicalPlanner`] implementation built on top of DataFusion.
pub struct DfLogicalPlanner {
    /// Shared query engine state; supplies the catalog manager and the optimizer entry points.
    engine_state: Arc<QueryEngineState>,
    /// Session state obtained from `engine_state` when the planner is constructed.
    session_state: SessionState,
}
61
62impl DfLogicalPlanner {
63    pub fn new(engine_state: Arc<QueryEngineState>) -> Self {
64        let session_state = engine_state.session_state();
65        Self {
66            engine_state,
67            session_state,
68        }
69    }
70
71    #[tracing::instrument(skip_all)]
72    async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
73        let df_stmt = stmt.try_into().context(SqlSnafu)?;
74
75        let table_provider = DfTableSourceProvider::new(
76            self.engine_state.catalog_manager().clone(),
77            self.engine_state.disallow_cross_catalog_query(),
78            query_ctx.clone(),
79            Arc::new(DefaultPlanDecoder::new(
80                self.session_state.clone(),
81                &query_ctx,
82            )?),
83            self.session_state
84                .config_options()
85                .sql_parser
86                .enable_ident_normalization,
87        );
88
89        let context_provider = DfContextProviderAdapter::try_new(
90            self.engine_state.clone(),
91            self.session_state.clone(),
92            Some(&df_stmt),
93            query_ctx.clone(),
94        )
95        .await?;
96
97        let config_options = self.session_state.config().options();
98        let parser_options = &config_options.sql_parser;
99        let parser_options = ParserOptions {
100            enable_ident_normalization: parser_options.enable_ident_normalization,
101            parse_float_as_decimal: parser_options.parse_float_as_decimal,
102            support_varchar_with_length: parser_options.support_varchar_with_length,
103            enable_options_value_normalization: parser_options.enable_options_value_normalization,
104            collect_spans: parser_options.collect_spans,
105        };
106
107        let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
108
109        let result = sql_to_rel
110            .statement_to_plan(df_stmt)
111            .context(PlanSqlSnafu)?;
112        common_telemetry::debug!("Logical planner, statement to plan result: {result}");
113        let plan = RangePlanRewriter::new(table_provider, query_ctx.clone())
114            .rewrite(result)
115            .await?;
116
117        // Optimize logical plan by extension rules
118        let context = QueryEngineContext::new(self.session_state.clone(), query_ctx);
119        let plan = self
120            .engine_state
121            .optimize_by_extension_rules(plan, &context)?;
122        common_telemetry::debug!("Logical planner, optimize result: {plan}");
123
124        Ok(plan)
125    }
126
127    /// Generate a relational expression from a SQL expression
128    #[tracing::instrument(skip_all)]
129    pub(crate) async fn sql_to_expr(
130        &self,
131        sql: SqlExpr,
132        schema: &DFSchema,
133        normalize_ident: bool,
134        query_ctx: QueryContextRef,
135    ) -> Result<DfExpr> {
136        let context_provider = DfContextProviderAdapter::try_new(
137            self.engine_state.clone(),
138            self.session_state.clone(),
139            None,
140            query_ctx,
141        )
142        .await?;
143
144        let config_options = self.session_state.config().options();
145        let parser_options = &config_options.sql_parser;
146        let parser_options = ParserOptions {
147            enable_ident_normalization: normalize_ident,
148            parse_float_as_decimal: parser_options.parse_float_as_decimal,
149            support_varchar_with_length: parser_options.support_varchar_with_length,
150            enable_options_value_normalization: parser_options.enable_options_value_normalization,
151            collect_spans: parser_options.collect_spans,
152        };
153
154        let sql_to_rel = SqlToRel::new_with_options(&context_provider, parser_options);
155
156        Ok(sql_to_rel.sql_to_expr(sql.into(), schema, &mut PlannerContext::new())?)
157    }
158
159    #[tracing::instrument(skip_all)]
160    async fn plan_pql(&self, stmt: &EvalStmt, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
161        let plan_decoder = Arc::new(DefaultPlanDecoder::new(
162            self.session_state.clone(),
163            &query_ctx,
164        )?);
165        let table_provider = DfTableSourceProvider::new(
166            self.engine_state.catalog_manager().clone(),
167            self.engine_state.disallow_cross_catalog_query(),
168            query_ctx,
169            plan_decoder,
170            self.session_state
171                .config_options()
172                .sql_parser
173                .enable_ident_normalization,
174        );
175        PromPlanner::stmt_to_plan(table_provider, stmt, &self.session_state)
176            .await
177            .map_err(BoxedError::new)
178            .context(QueryPlanSnafu)
179    }
180
181    #[tracing::instrument(skip_all)]
182    fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
183        Ok(self.engine_state.optimize_logical_plan(plan)?)
184    }
185}
186
187#[async_trait]
188impl LogicalPlanner for DfLogicalPlanner {
189    #[tracing::instrument(skip_all)]
190    async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
191        match stmt {
192            QueryStatement::Sql(stmt) => self.plan_sql(stmt, query_ctx).await,
193            QueryStatement::Promql(stmt) => self.plan_pql(stmt, query_ctx).await,
194        }
195    }
196
197    async fn plan_logs_query(
198        &self,
199        query: LogQuery,
200        query_ctx: QueryContextRef,
201    ) -> Result<LogicalPlan> {
202        let plan_decoder = Arc::new(DefaultPlanDecoder::new(
203            self.session_state.clone(),
204            &query_ctx,
205        )?);
206        let table_provider = DfTableSourceProvider::new(
207            self.engine_state.catalog_manager().clone(),
208            self.engine_state.disallow_cross_catalog_query(),
209            query_ctx,
210            plan_decoder,
211            self.session_state
212                .config_options()
213                .sql_parser
214                .enable_ident_normalization,
215        );
216
217        let mut planner = LogQueryPlanner::new(table_provider, self.session_state.clone());
218        planner
219            .query_to_plan(query)
220            .await
221            .map_err(BoxedError::new)
222            .context(QueryPlanSnafu)
223    }
224
225    fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
226        self.optimize_logical_plan(plan)
227    }
228
229    fn as_any(&self) -> &dyn Any {
230        self
231    }
232}