query/
query_engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod context;
16mod default_serializer;
17pub mod options;
18mod state;
19use std::any::Any;
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use catalog::CatalogManagerRef;
24use common_base::Plugins;
25use common_function::function_factory::ScalarFunctionFactory;
26use common_function::function_registry::FUNCTION_REGISTRY;
27use common_function::handlers::{
28    FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
29};
30use common_query::Output;
31use datafusion::catalog::TableFunction;
32use datafusion::dataframe::DataFrame;
33use datafusion_expr::{AggregateUDF, LogicalPlan, WindowUDF};
34use datatypes::schema::Schema;
35pub use default_serializer::{DefaultPlanDecoder, DefaultSerializer};
36use partition::manager::PartitionRuleManagerRef;
37use session::context::QueryContextRef;
38use table::TableRef;
39
40use crate::datafusion::DatafusionQueryEngine;
41use crate::error::Result;
42use crate::options::QueryOptions;
43use crate::planner::LogicalPlanner;
44pub use crate::query_engine::context::QueryEngineContext;
45pub use crate::query_engine::state::QueryEngineState;
46use crate::region_query::RegionQueryHandlerRef;
47
/// Result of describing a statement, as returned by [`QueryEngine::describe`].
#[derive(Debug)]
pub struct DescribeResult {
    /// The schema of the statement.
    pub schema: Schema,
    /// The logical plan for the statement.
    pub logical_plan: LogicalPlan,
}
56
/// Abstraction over a query engine.
///
/// An implementation exposes a logical planner, can describe and execute a
/// [`LogicalPlan`], and accepts registration of scalar, aggregate, table and
/// window functions. Implementations must be `Send + Sync`.
#[async_trait]
pub trait QueryEngine: Send + Sync {
    /// Returns the query engine as [`Any`]
    /// so that it can be downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;

    /// Returns the logical planner.
    fn planner(&self) -> Arc<dyn LogicalPlanner>;

    /// Returns the query engine name.
    fn name(&self) -> &str;

    /// Describes the given [`LogicalPlan`], producing a [`DescribeResult`].
    async fn describe(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<DescribeResult>;

    /// Executes the given [`LogicalPlan`] and returns its [`Output`].
    async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output>;

    /// Registers an aggregate function.
    ///
    /// # Panics
    /// Will panic if a function with the same name is already registered.
    // NOTE(review): the panic contract is stated by this trait only; confirm that
    // each implementation actually upholds it.
    fn register_aggregate_function(&self, func: AggregateUDF);

    /// Registers a scalar function.
    ///
    /// Will override the existing one if a function with the same name is
    /// already registered.
    // NOTE(review): the override contract is stated by this trait only; confirm
    // against the implementation.
    fn register_scalar_function(&self, func: ScalarFunctionFactory);

    /// Registers a table function.
    fn register_table_function(&self, func: Arc<TableFunction>);

    /// Registers a window function (UDWF).
    fn register_window_function(&self, func: WindowUDF);

    /// Creates a [`DataFrame`] from a table.
    fn read_table(&self, table: TableRef) -> Result<DataFrame>;

    /// Creates a [`QueryEngineContext`] for the given query context.
    fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext;

    /// Retrieves the query engine state ([`QueryEngineState`]).
    fn engine_state(&self) -> &QueryEngineState;
}
104
105pub struct QueryEngineFactory {
106    query_engine: Arc<dyn QueryEngine>,
107}
108
109impl QueryEngineFactory {
110    pub fn new(
111        catalog_manager: CatalogManagerRef,
112        region_query_handler: Option<RegionQueryHandlerRef>,
113        table_mutation_handler: Option<TableMutationHandlerRef>,
114        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
115        flow_service_handler: Option<FlowServiceHandlerRef>,
116        with_dist_planner: bool,
117        options: QueryOptions,
118    ) -> Self {
119        Self::new_with_plugins(
120            catalog_manager,
121            None,
122            region_query_handler,
123            table_mutation_handler,
124            procedure_service_handler,
125            flow_service_handler,
126            with_dist_planner,
127            Default::default(),
128            options,
129        )
130    }
131
132    #[allow(clippy::too_many_arguments)]
133    pub fn new_with_plugins(
134        catalog_manager: CatalogManagerRef,
135        partition_rule_manager: Option<PartitionRuleManagerRef>,
136        region_query_handler: Option<RegionQueryHandlerRef>,
137        table_mutation_handler: Option<TableMutationHandlerRef>,
138        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
139        flow_service_handler: Option<FlowServiceHandlerRef>,
140        with_dist_planner: bool,
141        plugins: Plugins,
142        options: QueryOptions,
143    ) -> Self {
144        let state = Arc::new(QueryEngineState::new(
145            catalog_manager,
146            partition_rule_manager,
147            region_query_handler,
148            table_mutation_handler,
149            procedure_service_handler,
150            flow_service_handler,
151            with_dist_planner,
152            plugins.clone(),
153            options,
154        ));
155        let query_engine = Arc::new(DatafusionQueryEngine::new(state, plugins));
156        register_functions(&query_engine);
157        Self { query_engine }
158    }
159
160    pub fn query_engine(&self) -> QueryEngineRef {
161        self.query_engine.clone()
162    }
163}
164
165/// Register all functions implemented by GreptimeDB
166fn register_functions(query_engine: &Arc<DatafusionQueryEngine>) {
167    for func in FUNCTION_REGISTRY.scalar_functions() {
168        query_engine.register_scalar_function(func);
169    }
170
171    for accumulator in FUNCTION_REGISTRY.aggregate_functions() {
172        query_engine.register_aggregate_function(accumulator);
173    }
174
175    for table_function in FUNCTION_REGISTRY.table_functions() {
176        query_engine.register_table_function(table_function);
177    }
178
179    for window_function in FUNCTION_REGISTRY.window_functions() {
180        query_engine.register_window_function(window_function);
181    }
182}
183
/// Shared, reference-counted handle to a [`QueryEngine`] trait object.
pub type QueryEngineRef = Arc<dyn QueryEngine>;
185
#[cfg(test)]
mod tests {
    use super::*;

    /// A factory built with no optional handlers yields an engine named "datafusion".
    #[test]
    fn test_query_engine_factory() {
        let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap();
        let options = QueryOptions::default();

        let engine =
            QueryEngineFactory::new(catalog_manager, None, None, None, None, false, options)
                .query_engine();

        assert_eq!("datafusion", engine.name());
    }
}